diff --git a/internal/acp/openclaw_async_tasks.go b/internal/acp/openclaw_async_tasks.go index edcafb4..a8d7fac 100644 --- a/internal/acp/openclaw_async_tasks.go +++ b/internal/acp/openclaw_async_tasks.go @@ -15,21 +15,22 @@ const ( ) type OpenClawTaskRecord struct { - SessionID string - ThreadID string - TurnID string - RunID string - SessionKey string - GatewayProviderID string - TaskLoadClass string - RuntimeBudgetMinutes int - StartedAt time.Time - DeadlineAt time.Time - ProgressStage string - ProgressMessage string - PreparedArtifact *openClawPreparedArtifactScope - ResolvedModel string - ResolvedSkills []string + SessionID string + ThreadID string + TurnID string + RunID string + SessionKey string + GatewayProviderID string + TaskLoadClass string + RuntimeBudgetMinutes int + StartedAt time.Time + DeadlineAt time.Time + ProgressStage string + ProgressMessage string + PreparedArtifact *openClawPreparedArtifactScope + RequiresArtifactExport bool + ResolvedModel string + ResolvedSkills []string } func openClawTaskRuntimePolicy(params map[string]any, chatParams map[string]any, contract openClawArtifactContract) (string, int) { @@ -84,6 +85,7 @@ func openClawRunningTaskResult(record *OpenClawTaskRecord) map[string]any { "resolvedGatewayProviderId": record.GatewayProviderID, "taskLoadClass": record.TaskLoadClass, "runtimeBudgetMinutes": record.RuntimeBudgetMinutes, + "requiresArtifactExport": record.RequiresArtifactExport, "startedAt": record.StartedAt.UTC().Format(time.RFC3339Nano), "deadlineAt": record.DeadlineAt.UTC().Format(time.RFC3339Nano), "progress": openClawTaskProgress(record), diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index b2d96f9..b5b8bc8 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -384,21 +384,22 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask( taskLoadClass, runtimeBudgetMinutes := openClawTaskRuntimePolicy(params, chatParams, artifactContract) startedAt := time.Now() record := &OpenClawTaskRecord{ - SessionID: sessionID, - ThreadID: threadID, - TurnID: turnID, - RunID: runID, - SessionKey: sessionKey, - GatewayProviderID: gatewayProvider, - TaskLoadClass: taskLoadClass, - RuntimeBudgetMinutes: runtimeBudgetMinutes, - StartedAt: startedAt, - DeadlineAt: startedAt.Add(time.Duration(runtimeBudgetMinutes) * time.Minute), - ProgressStage: "running", - ProgressMessage: "OpenClaw task accepted", - PreparedArtifact: preparedArtifact, - ResolvedModel: routing.Model, - ResolvedSkills: append([]string(nil), routing.Skills...), + SessionID: sessionID, + ThreadID: threadID, + TurnID: turnID, + RunID: runID, + SessionKey: sessionKey, + GatewayProviderID: gatewayProvider, + TaskLoadClass: taskLoadClass, + RuntimeBudgetMinutes: runtimeBudgetMinutes, + StartedAt: startedAt, + DeadlineAt: startedAt.Add(time.Duration(runtimeBudgetMinutes) * time.Minute), + ProgressStage: "running", + ProgressMessage: "OpenClaw task accepted", + PreparedArtifact: preparedArtifact, + RequiresArtifactExport: artifactContract.RequiresArtifactExport, + ResolvedModel: routing.Model, + ResolvedSkills: append([]string(nil), routing.Skills...), } sess := o.server.getOrCreateSession(sessionID, threadID) sess.mu.Lock() @@ -617,6 +618,9 @@ func openClawSessionPrepareParams(params map[string]any, openClawSessionKey stri if len(artifactContract.ExpectedArtifactDirs) > 0 { result["expectedArtifactDirs"] = append([]string(nil), artifactContract.ExpectedArtifactDirs...) } + if artifactContract.RequiresArtifactExport { + result["requiresArtifactExport"] = true + } if workspaceDir := openClawArtifactWorkspaceDir(params); workspaceDir != "" { result["workspaceDir"] = workspaceDir } @@ -770,10 +774,11 @@ func shellSingleQuote(value string) string { } type openClawArtifactContract struct { - TaskLoadClass string - ComplexLongChain bool - ExpectedArtifactDirs []string - SourceMessage string + TaskLoadClass string + ComplexLongChain bool + RequiresArtifactExport bool + ExpectedArtifactDirs []string + SourceMessage string } func openClawArtifactContractForParams(params map[string]any, chatParams map[string]any) openClawArtifactContract { @@ -786,12 +791,14 @@ func openClawArtifactContractForParams(params map[string]any, chatParams map[str lowerMessage := strings.ToLower(message) contract := shared.AsMap(metadata["xworkmateTaskArtifactContract"]) expectedDirs := normalizeOpenClawDirList(shared.ListArg(contract, "expectedArtifactDirs")) + requiresExport := parseBool(contract["requiresExportBeforeFinalResponse"]) || len(expectedDirs) > 0 complex := taskLoadClass == "complex_long_chain_task" || isOpenClawLongArtifactTask(lowerMessage) return openClawArtifactContract{ - TaskLoadClass: taskLoadClass, - ComplexLongChain: complex, - ExpectedArtifactDirs: expectedDirs, - SourceMessage: message, + TaskLoadClass: taskLoadClass, + ComplexLongChain: complex, + RequiresArtifactExport: requiresExport, + ExpectedArtifactDirs: expectedDirs, + SourceMessage: message, } } diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index fa43b3e..24c858b 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -598,11 +598,11 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { if gateway.AgentWaitCount() != 0 { t.Fatalf("expected native task lookup to avoid Bridge-owned agent.wait, got %d", gateway.AgentWaitCount()) } - if gateway.ArtifactExportCount() != 0 { - t.Fatalf("expected native task lookup to avoid Bridge-owned artifact export, got %d", gateway.ArtifactExportCount()) + if gateway.ArtifactExportCount() != 1 { + t.Fatalf("expected empty terminal task lookup to fall back to artifact export, got %d", gateway.ArtifactExportCount()) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { - t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get", "xworkmate.artifacts.export"}) { + t.Fatalf("expected connect, prepare, chat.send, native task lookup, then artifact export fallback, got %#v", got) } client := gateway.LastConnectClient() if got := client["id"]; got != "openclaw-macos" { @@ -2593,7 +2593,7 @@ func TestExecuteSessionTaskGatewayCollectsOpenClawEventArtifacts(t *testing.T) { } } -func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing.T) { +func TestExecuteSessionTaskGatewayKeepsRunningWhenTerminalLookupExportsNoArtifacts(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() @@ -2609,6 +2609,11 @@ func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing "threadId": "thread-openclaw-artifact-missing", "taskPrompt": "say pong", "workingDirectory": t.TempDir(), + "metadata": map[string]any{ + "xworkmateTaskArtifactContract": map[string]any{ + "requiresExportBeforeFinalResponse": true, + }, + }, "routing": map[string]any{ "routingMode": "explicit", "explicitExecutionTarget": "gateway", @@ -2618,16 +2623,74 @@ func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing }, }) if rpcErr != nil { - t.Fatalf("expected gateway text response despite artifact export failure, got rpc error: %#v", rpcErr) + t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr) } - if got := response["output"]; got != "gateway pong" { - t.Fatalf("expected gateway pong output, got %#v", response) + if got := response["status"]; got != string(TaskStateRunning) { + t.Fatalf("expected empty terminal artifact export to keep task running, got %#v", response) } - if gateway.ArtifactExportCount() != 0 { - t.Fatalf("expected native task-registry lookup without Bridge artifact export sync, got %d", gateway.ArtifactExportCount()) + if got := shared.StringArg(shared.AsMap(response["progress"]), "stage", ""); got != "syncing-artifacts" { + t.Fatalf("expected syncing-artifacts progress, got %#v", response) } - if warnings := shared.ListArg(response, "artifactWarnings"); len(warnings) != 0 { - t.Fatalf("expected no artifact warnings when gateway export succeeds empty, got %#v", warnings) + if gateway.ArtifactExportCount() != 1 { + t.Fatalf("expected artifact export fallback after empty terminal lookup, got %d", gateway.ArtifactExportCount()) + } +} + +func TestTaskGetBackfillsArtifactScopeFromBridgeSession(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + defer gateway.Close() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + start, rpcErr := server.handleRequest(shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-openclaw-scope-backfill", + "threadId": "thread-openclaw-scope-backfill", + "taskPrompt": "say pong", + "workingDirectory": t.TempDir(), + "metadata": map[string]any{ + "xworkmateTaskArtifactContract": map[string]any{ + "requiresExportBeforeFinalResponse": true, + }, + }, + "routing": map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "gateway", + "preferredGatewayProviderId": "openclaw", + }, + }, + }, nil) + if rpcErr != nil { + t.Fatalf("expected running task handle, got rpc error: %#v", rpcErr) + } + if got := start["status"]; got != string(TaskStateRunning) { + t.Fatalf("expected running start response, got %#v", start) + } + + response, rpcErr := server.handleRequest(shared.RPCRequest{ + Method: "xworkmate.tasks.get", + Params: map[string]any{ + "runId": shared.StringArg(start, "runId", ""), + "appThreadKey": shared.StringArg(start, "appThreadKey", ""), + "openclawSessionKey": shared.StringArg(start, "openclawSessionKey", ""), + "includeArtifacts": true, + }, + }, nil) + if rpcErr != nil { + t.Fatalf("expected task lookup response, got rpc error: %#v", rpcErr) + } + if got := response["status"]; got != string(TaskStateRunning) { + t.Fatalf("expected empty terminal lookup to remain running, got %#v", response) + } + exportParams := gateway.LastArtifactExportParams() + if got := shared.StringArg(exportParams, "artifactScope", ""); got == "" { + t.Fatalf("expected Bridge to backfill artifactScope for export, got %#v", exportParams) + } + if got, want := shared.StringArg(exportParams, "artifactScope", ""), shared.StringArg(start, "artifactScope", ""); got != want { + t.Fatalf("expected export artifactScope %q, got %#v", want, exportParams) } } diff --git a/internal/acp/rpc_handler.go b/internal/acp/rpc_handler.go index 1f3d108..4a03180 100644 --- a/internal/acp/rpc_handler.go +++ b/internal/acp/rpc_handler.go @@ -91,6 +91,7 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string } func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { + params = s.taskGetParamsWithSessionScope(params) gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", "")) if gatewayProvider == "" { gatewayProvider = strings.TrimSpace(shared.StringArg(params, "resolvedGatewayProviderId", "")) @@ -114,7 +115,9 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif notify, ) if result.OK { - return shared.AsMap(result.Payload) + payload := shared.AsMap(result.Payload) + s.mergeOpenClawTaskGetArtifactExport(payload, params, gatewayProvider, notify) + return normalizeOpenClawTaskGetResult(params, payload, gatewayProvider) } message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed")) code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED")) @@ -126,6 +129,184 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif } } +func (s *Server) taskGetParamsWithSessionScope(params map[string]any) map[string]any { + next := make(map[string]any, len(params)+8) + for key, value := range params { + next[key] = value + } + sess := s.findTaskSession(params) + if sess == nil { + return next + } + sess.mu.Lock() + defer sess.mu.Unlock() + if strings.TrimSpace(shared.StringArg(next, "runId", "")) == "" { + next["runId"] = sess.task.RunID + } + if strings.TrimSpace(shared.StringArg(next, "taskId", "")) == "" { + next["taskId"] = sess.task.RunID + } + if strings.TrimSpace(shared.StringArg(next, "gatewayProviderId", "")) == "" { + next["gatewayProviderId"] = sess.task.GatewayProviderID + } + if strings.TrimSpace(shared.StringArg(next, "artifactScope", "")) == "" { + next["artifactScope"] = sess.task.ArtifactScope + } + if strings.TrimSpace(shared.StringArg(next, "artifactDirectory", "")) == "" { + next["artifactDirectory"] = sess.task.ArtifactDirectory + } + if _, ok := next["requiresArtifactExport"]; !ok && sess.openClaw != nil && sess.openClaw.RequiresArtifactExport { + next["requiresArtifactExport"] = true + } + if strings.TrimSpace(shared.StringArg(next, "openclawSessionKey", "")) == "" { + next["openclawSessionKey"] = sess.task.SessionKey + } + if strings.TrimSpace(shared.StringArg(next, "appThreadKey", "")) == "" { + next["appThreadKey"] = sess.threadID + } + return next +} + +func (s *Server) mergeOpenClawTaskGetArtifactExport(payload map[string]any, params map[string]any, gatewayProvider string, notify func(map[string]any)) { + if len(payload) == 0 || s == nil || s.orchestrator == nil { + return + } + status := strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) + if status == string(TaskStateRunning) || status == string(TaskStateFailed) || status == string(TaskStateCancelled) { + return + } + if !openClawTaskGetRequiresArtifactExport(params, payload) { + return + } + success := true + if value, ok := payload["success"]; ok { + success = parseBool(value) + } + if !success { + return + } + remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(payload, "remoteWorkingDirectory", "")) + if len(extractArtifactPayloads(payload, remoteWorkingDirectory)) > 0 { + return + } + sessionKey := firstNonEmptyString(payload, "openclawSessionKey", "sessionKey") + if sessionKey == "" { + sessionKey = strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) + } + runID := firstNonEmptyString(payload, "runId", "taskId") + if runID == "" { + runID = strings.TrimSpace(shared.StringArg(params, "runId", "")) + } + artifactScope := firstNonEmptyString(payload, "artifactScope") + if artifactScope == "" { + artifactScope = strings.TrimSpace(shared.StringArg(params, "artifactScope", "")) + } + artifactDirectory := firstNonEmptyString(payload, "artifactDirectory") + if artifactDirectory == "" { + artifactDirectory = strings.TrimSpace(shared.StringArg(params, "artifactDirectory", "")) + } + if sessionKey == "" || runID == "" || artifactScope == "" || artifactDirectory == "" { + return + } + prepared := &openClawPreparedArtifactScope{ + RemoteWorkingDirectory: remoteWorkingDirectory, + RemoteWorkspaceRefKind: strings.TrimSpace(shared.StringArg(payload, "remoteWorkspaceRefKind", "")), + ArtifactScope: artifactScope, + ArtifactDirectory: artifactDirectory, + ScopeKind: "task", + } + exportParams := map[string]any{ + "openclawSessionKey": sessionKey, + "runId": runID, + "artifactScope": artifactScope, + "sinceUnixMs": 0, + "maxFiles": 64, + "maxInlineBytes": 0, + "includeContent": false, + } + if expectedDirs := shared.ListArg(params, "expectedArtifactDirs"); len(expectedDirs) > 0 { + exportParams["expectedArtifactDirs"] = expectedDirs + } + mergeOpenClawArtifactPayload(payload, s.orchestrator.openClawArtifactExportRequest(gatewayProvider, exportParams, notify)) + applyOpenClawPreparedArtifactToResult(payload, prepared) + s.decorateOpenClawArtifactDownloadURLs(payload, sessionKey, runID) + stripOpenClawArtifactInlineContent(payload) +} + +func normalizeOpenClawTaskGetResult(params map[string]any, payload map[string]any, gatewayProvider string) map[string]any { + if len(payload) == 0 { + return payload + } + artifactScope := firstNonEmptyString(payload, "artifactScope") + if artifactScope == "" { + artifactScope = strings.TrimSpace(shared.StringArg(params, "artifactScope", "")) + } + artifactDirectory := firstNonEmptyString(payload, "artifactDirectory") + if artifactDirectory == "" { + artifactDirectory = strings.TrimSpace(shared.StringArg(params, "artifactDirectory", "")) + } + if artifactScope == "" && artifactDirectory == "" { + return payload + } + remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(payload, "remoteWorkingDirectory", "")) + if len(extractArtifactPayloads(payload, remoteWorkingDirectory)) > 0 { + return payload + } + status := strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) + success := true + if value, ok := payload["success"]; ok { + success = parseBool(value) + } + if !success || status == string(TaskStateRunning) || status == string(TaskStateFailed) || status == string(TaskStateCancelled) { + return payload + } + if !openClawTaskGetRequiresArtifactExport(params, payload) { + return payload + } + runID := firstNonEmptyString(payload, "runId", "taskId") + if runID == "" { + runID = strings.TrimSpace(shared.StringArg(params, "runId", "")) + } + sessionKey := firstNonEmptyString(payload, "openclawSessionKey", "sessionKey") + if sessionKey == "" { + sessionKey = strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) + } + payload["success"] = true + payload["status"] = string(TaskStateRunning) + payload["event"] = string(TaskStateRunning) + payload["pending"] = true + payload["artifactSyncStatus"] = "syncing" + payload["message"] = "OpenClaw task completed; waiting for artifact export." + payload["runId"] = runID + payload["taskId"] = runID + payload["openclawSessionKey"] = sessionKey + if strings.TrimSpace(shared.StringArg(payload, "appThreadKey", "")) == "" { + payload["appThreadKey"] = strings.TrimSpace(shared.StringArg(params, "appThreadKey", "")) + } + payload["artifactScope"] = artifactScope + payload["artifactDirectory"] = artifactDirectory + if strings.TrimSpace(shared.StringArg(payload, "resolvedGatewayProviderId", "")) == "" { + payload["resolvedGatewayProviderId"] = gatewayProvider + } + payload["progress"] = map[string]any{ + "stage": "syncing-artifacts", + "message": "Waiting for OpenClaw artifact export.", + "terminal": false, + } + return payload +} + +func openClawTaskGetRequiresArtifactExport(params map[string]any, payload map[string]any) bool { + if parseBool(params["requiresArtifactExport"]) || parseBool(payload["requiresArtifactExport"]) { + return true + } + if parseBool(params["requiresExportBeforeFinalResponse"]) || parseBool(payload["requiresExportBeforeFinalResponse"]) { + return true + } + return len(shared.ListArg(params, "expectedArtifactDirs")) > 0 || + len(shared.ListArg(payload, "expectedArtifactDirs")) > 0 +} + func (s *Server) handleTaskCancel(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { sess := s.findTaskSession(params) runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) @@ -204,6 +385,9 @@ func openClawTaskLookupParams(params map[string]any) map[string]any { "includeContent", "expectedArtifactDirs", "workspaceDir", + "artifactScope", + "artifactDirectory", + "requiresArtifactExport", } { if value, ok := params[key]; ok { result[key] = value