diff --git a/internal/acp/execution_test.go b/internal/acp/execution_test.go index 446a6e4..0fee1f9 100644 --- a/internal/acp/execution_test.go +++ b/internal/acp/execution_test.go @@ -5,14 +5,10 @@ import ( "encoding/json" "net/http" "net/http/httptest" - "os" - "path/filepath" "strings" "testing" - "time" "github.com/gorilla/websocket" - "xworkmate-bridge/internal/shared" ) func TestResolveSingleAgentForwardEndpointFromExampleConfig(t *testing.T) { @@ -350,7 +346,7 @@ func TestCodexCompatWaitsForTurnCompletedNotification(t *testing.T) { if err := conn.WriteJSON(map[string]any{ "method": "turn/completed", "params": map[string]any{ - "openclawSessionKey": "codex-thread-1", "threadId": "codex-thread-1", + "threadId": "codex-thread-1", "turn": turn, }, }); err != nil { @@ -561,117 +557,3 @@ func TestExternalACPNotificationCollectorIgnoresCodexCommentaryMessages(t *testi t.Fatalf("expected commentary to be hidden and duplicate final line collapsed, got %#v", result) } } - -func TestProbeOpenClawTaskFailsAfterMaxAllowedSilentDuration(t *testing.T) { - t.Setenv("XWORKMATE_BRIDGE_OPENCLAW_GATEWAY_MAX_SILENT_DURATION", "2s") - t.Setenv("BRIDGE_CONFIG_PATH", filepath.Join(t.TempDir(), "missing-config.yaml")) - - server := NewServer() - orchestrator := server.orchestrator - sess := server.getOrCreateSession("silent-session", "silent-thread") - startedAt := time.Now().Add(-time.Minute) - sess.mu.Lock() - sess.task = QueuedTask{ - SessionID: "silent-session", - ThreadID: "silent-thread", - TurnID: "silent-turn", - RunID: "silent-run", - SessionKey: "silent-session", - GatewayProviderID: "openclaw", - State: TaskStateRunning, - Kind: TaskKindGateway, - RuntimeBudgetMinutes: openClawLongTaskMinutes, - StartedAt: startedAt, - DeadlineAt: time.Now().Add(time.Minute), - } - sess.openClaw = &OpenClawTaskRecord{ - SessionID: "silent-session", - ThreadID: "silent-thread", - TurnID: "silent-turn", - RunID: "silent-run", - SessionKey: "silent-session", - GatewayProviderID: "openclaw", - TaskLoadClass: "long_task", - RuntimeBudgetMinutes: openClawLongTaskMinutes, - StartedAt: startedAt, - DeadlineAt: time.Now().Add(time.Minute), - FirstSilentFailureAt: time.Now().Add(-3 * time.Second), - } - sess.mu.Unlock() - - result := orchestrator.probeOpenClawTask(context.Background(), sess, nil, false) - - if got := result["status"]; got != string(TaskStateFailed) { - t.Fatalf("expected failed status after silent duration, got %#v", result) - } - if got := result["code"]; got != "OPENCLAW_GATEWAY_LOST" { - t.Fatalf("expected OPENCLAW_GATEWAY_LOST, got %#v", result) - } - sess.mu.Lock() - state := sess.task.State - sess.mu.Unlock() - if state != TaskStateFailed { - t.Fatalf("task state = %s, want %s", state, TaskStateFailed) - } -} - -func TestTerminalOpenClawTaskRemovesInlineAttachmentDirectory(t *testing.T) { - workspace := t.TempDir() - turnID := "turn-inline-gc" - params := map[string]any{ - "openclawSessionKey": "thread-inline-gc", "threadId": "thread-inline-gc", - "taskPrompt": "inspect uploaded file", - "workingDirectory": workspace, - "inlineAttachments": []any{ - map[string]any{ - "name": "note.txt", - "mimeType": "text/plain", - "content": "bm90ZQ==", - }, - }, - } - chatParams, rpcErr := openClawChatSendParamsWithSessionKey(params, turnID, "thread-inline-gc") - if rpcErr != nil { - t.Fatalf("expected chat params, got rpc error: %#v", rpcErr) - } - attachments := shared.ListArg(chatParams, "attachments") - if len(attachments) != 1 { - t.Fatalf("expected materialized attachment, got %#v", attachments) - } - attachmentPath := shared.StringArg(shared.AsMap(attachments[0]), "path", "") - attachmentDirectory := filepath.Dir(attachmentPath) - if _, err := os.Stat(attachmentDirectory); err != nil { - t.Fatalf("expected attachment directory before terminal task state: %v", err) - } - - server := NewServer() - sess := server.getOrCreateSession("gc-session", "gc-thread") - now := time.Now() - sess.mu.Lock() - sess.task = QueuedTask{ - SessionID: "gc-session", - ThreadID: "gc-thread", - TurnID: turnID, - RunID: "gc-run", - State: TaskStateRunning, - Kind: TaskKindGateway, - StartedAt: now, - } - sess.openClaw = &OpenClawTaskRecord{ - SessionID: "gc-session", - ThreadID: "gc-thread", - TurnID: turnID, - RunID: "gc-run", - StartedAt: now, - ChatParams: map[string]any{ - "workingDirectory": workspace, - }, - } - sess.mu.Unlock() - - server.orchestrator.failOpenClawTask(sess, "TEST_FAILED", "terminal") - - if _, err := os.Stat(attachmentDirectory); !os.IsNotExist(err) { - t.Fatalf("expected terminal task to remove attachment directory, stat err=%v", err) - } -} diff --git a/internal/acp/gateway_runtime_test.go b/internal/acp/gateway_runtime_test.go index f025616..500bb71 100644 --- a/internal/acp/gateway_runtime_test.go +++ b/internal/acp/gateway_runtime_test.go @@ -64,48 +64,3 @@ func TestResolveGatewayReportedRemoteAddressNormalizesExplicitPublicRemoteHost( t.Fatalf("resolveGatewayReportedRemoteAddress() = %q, want %q", got, want) } } - -func TestReassociateOpenClawTaskDerivesRuntimeBudgetWithoutExplicitBudget(t *testing.T) { - t.Parallel() - - cases := []struct { - name string - params map[string]any - want int - }{ - { - name: "short task load class", - params: map[string]any{ - "runId": "run-short", - "artifactScope": "tasks/main/run-short", - "taskLoadClass": "short_task", - }, - want: openClawShortTaskMinutes, - }, - } - - for _, tc := range cases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - server := NewServer() - sess := server.reassociateOpenClawTask(tc.params) - if sess == nil { - t.Fatal("expected reassociated session") - } else { - sess.mu.Lock() - gotTaskBudget := sess.task.RuntimeBudgetMinutes - gotRecordBudget := sess.openClaw.RuntimeBudgetMinutes - sess.mu.Unlock() - - if gotTaskBudget != tc.want { - t.Fatalf("task RuntimeBudgetMinutes = %d, want %d", gotTaskBudget, tc.want) - } - if gotRecordBudget != tc.want { - t.Fatalf("record RuntimeBudgetMinutes = %d, want %d", gotRecordBudget, tc.want) - } - } - }) - } -} diff --git a/internal/acp/openclaw_async_tasks.go b/internal/acp/openclaw_async_tasks.go index 01c7885..edcafb4 100644 --- a/internal/acp/openclaw_async_tasks.go +++ b/internal/acp/openclaw_async_tasks.go @@ -1,9 +1,6 @@ package acp import ( - "context" - "os" - "path/filepath" "strings" "time" @@ -12,13 +9,9 @@ import ( ) const ( - openClawTaskProbeTimeout = 2 * time.Second - openClawTaskProbeTimeoutMs = 1000 - openClawTaskMonitorInterval = time.Second - openClawShortTaskMinutes = 10 - openClawLongTaskMinutes = 30 - openClawComplexTaskMinutes = 60 - openClawDefaultMaxAllowedSilentDuration = 10 * time.Minute + openClawShortTaskMinutes = 10 + openClawLongTaskMinutes = 30 + openClawComplexTaskMinutes = 60 ) type OpenClawTaskRecord struct { @@ -29,23 +22,14 @@ type OpenClawTaskRecord struct { SessionKey string GatewayProviderID string TaskLoadClass string - ArtifactSinceUnixMs int64 RuntimeBudgetMinutes int StartedAt time.Time DeadlineAt time.Time - LastProbeAt time.Time ProgressStage string ProgressMessage string - ProgressTerminal bool - FirstSilentFailureAt time.Time - ChatParams map[string]any PreparedArtifact *openClawPreparedArtifactScope - ArtifactContract openClawArtifactContract ResolvedModel string ResolvedSkills []string - MonitorStarted bool - ProbeInFlight bool - AdmissionRelease func() } func openClawTaskRuntimePolicy(params map[string]any, chatParams map[string]any, contract openClawArtifactContract) (string, int) { @@ -121,12 +105,11 @@ func openClawTaskProgress(record *OpenClawTaskRecord) map[string]any { message = "OpenClaw task is running" } return map[string]any{ - "stage": stage, - "message": message, - "elapsedMs": maxInt64(0, now.Sub(record.StartedAt).Milliseconds()), - "budgetMs": (time.Duration(record.RuntimeBudgetMinutes) * time.Minute).Milliseconds(), - "lastProbeAtMs": record.LastProbeAt.UnixMilli(), - "terminal": record.ProgressTerminal, + "stage": stage, + "message": message, + "elapsedMs": maxInt64(0, now.Sub(record.StartedAt).Milliseconds()), + "budgetMs": (time.Duration(record.RuntimeBudgetMinutes) * time.Minute).Milliseconds(), + "terminal": false, } } @@ -136,530 +119,3 @@ func maxInt64(a int64, b int64) int64 { } return b } - -func (o *SessionOrchestrator) startOpenClawTaskMonitor(sess *session) { - if sess == nil { - return - } - sess.mu.Lock() - record := sess.openClaw - if record == nil || record.MonitorStarted || isTerminalTaskState(sess.task.State) { - sess.mu.Unlock() - return - } - record.MonitorStarted = true - sess.mu.Unlock() - - go func() { - defer o.releaseOpenClawAdmission(sess) - time.Sleep(openClawTaskMonitorInterval) - for { - sess.mu.Lock() - state := sess.task.State - deadline := sess.task.DeadlineAt - sess.mu.Unlock() - if isTerminalTaskState(state) { - return - } - if !deadline.IsZero() && time.Now().After(deadline) { - o.failOpenClawTask(sess, "TASK_SLA_EXPIRED", "OpenClaw task exceeded its runtime SLA") - return - } - o.probeOpenClawTask(context.Background(), sess, nil, false) - sess.mu.Lock() - state = sess.task.State - sess.mu.Unlock() - if isTerminalTaskState(state) { - return - } - time.Sleep(openClawTaskMonitorInterval) - } - }() -} - -func isTerminalTaskState(state TaskState) bool { - return state == TaskStateCompleted || state == TaskStateFailed || state == TaskStateCancelled -} - -func (o *SessionOrchestrator) releaseOpenClawAdmission(sess *session) { - if sess == nil { - return - } - var release func() - sess.mu.Lock() - if sess.openClaw != nil { - release = sess.openClaw.AdmissionRelease - sess.openClaw.AdmissionRelease = nil - } - sess.mu.Unlock() - if release != nil { - release() - } -} - -func (o *SessionOrchestrator) failOpenClawTask(sess *session, code string, message string) map[string]any { - if sess == nil { - return map[string]any{"success": false, "status": string(TaskStateFailed), "code": code, "message": message} - } - if strings.TrimSpace(message) == "" { - message = code - } - sess.mu.Lock() - turnID := sess.task.TurnID - runID := sess.task.RunID - gatewayProviderID := sess.task.GatewayProviderID - record := sess.openClaw - sess.task.State = TaskStateFailed - sess.task.UpdatedAt = time.Now() - sess.task.ProgressStage = "failed" - sess.task.ProgressMessage = message - sess.task.ProgressTerminal = true - if sess.openClaw != nil { - sess.openClaw.ProgressStage = "failed" - sess.openClaw.ProgressMessage = message - sess.openClaw.ProgressTerminal = true - sess.openClaw.ProbeInFlight = false - } - result := map[string]any{ - "success": false, - "status": string(TaskStateFailed), - "code": code, - "error": message, - "message": message, - "summary": message, - "output": message, - "turnId": turnID, - "runId": runID, - "mode": router.ExecutionTargetGatewayChat, - "resolvedGatewayProviderId": gatewayProviderID, - } - sess.lastResult = cloneMap(result) - sess.mu.Unlock() - o.releaseOpenClawAdmission(sess) - cleanupOpenClawTurnAttachments(record) - return result -} - -func (o *SessionOrchestrator) probeOpenClawTask(ctx context.Context, sess *session, notify func(map[string]any), waitForArtifacts bool) map[string]any { - if sess == nil { - return map[string]any{"status": "not_found"} - } - sess.mu.Lock() - record := sess.openClaw - if record == nil { - snapshot := openClawSessionSnapshotLocked(sess) - sess.mu.Unlock() - return snapshot - } - if isTerminalTaskState(sess.task.State) { - snapshot := openClawSessionSnapshotLocked(sess) - sess.mu.Unlock() - return snapshot - } - if !record.DeadlineAt.IsZero() && time.Now().After(record.DeadlineAt) { - sess.mu.Unlock() - return o.failOpenClawTask(sess, "TASK_SLA_EXPIRED", "OpenClaw task exceeded its runtime SLA") - } - if record.ProbeInFlight { - result := openClawRunningTaskResult(record) - sess.lastResult = cloneMap(result) - sess.mu.Unlock() - return result - } - record.ProbeInFlight = true - record.LastProbeAt = time.Now() - record.ProgressStage = "probing" - record.ProgressMessage = "Checking OpenClaw task status" - sess.task.LastProbeAt = record.LastProbeAt - sess.task.ProgressStage = record.ProgressStage - sess.task.ProgressMessage = record.ProgressMessage - gatewayProvider := record.GatewayProviderID - runID := record.RunID - sessionID := record.SessionID - threadID := record.ThreadID - turnID := record.TurnID - sess.mu.Unlock() - - collector := newOpenClawChatCollector() - notifyWithCollection := func(message map[string]any) { - collector.observe(message) - if notify == nil { - return - } - if update := openClawGatewaySessionUpdate(message, sessionID, threadID, turnID); update != nil { - notify(update) - } - } - waitStarted := time.Now() - waitParams := map[string]any{ - "runId": runID, - "timeoutMs": openClawTaskProbeTimeoutMs, - } - if waitForArtifacts { - waitParams["waitForArtifacts"] = true - } - waitResult := o.openClawGatewayRequestWithRetry( - gatewayProvider, - "agent.wait", - waitParams, - openClawTaskProbeTimeout, - notifyWithCollection, - ) - logOpenClawGatewayTiming( - gatewayProvider, - "agent.wait.probe", - sessionID, - runID, - time.Since(waitStarted), - waitResult.OK, - ) - if !waitResult.OK { - if openClawProbeStillRunning(waitResult.Error) { - now := time.Now() - sess.mu.Lock() - if sess.openClaw != nil { - if sess.openClaw.FirstSilentFailureAt.IsZero() { - sess.openClaw.FirstSilentFailureAt = now - } - if openClawSilentFailureExceeded(o.server.config, sess.openClaw.FirstSilentFailureAt, now) { - sess.openClaw.ProbeInFlight = false - sess.mu.Unlock() - return o.failOpenClawTask(sess, "OPENCLAW_GATEWAY_LOST", "OpenClaw gateway stayed unreachable beyond the allowed silent duration") - } - sess.openClaw.ProgressStage = "running" - sess.openClaw.ProgressMessage = "OpenClaw task is still running" - sess.openClaw.ProbeInFlight = false - } - sess.task.ProgressStage = "running" - sess.task.ProgressMessage = "OpenClaw task is still running" - sess.task.UpdatedAt = time.Now() - result := openClawRunningTaskResult(sess.openClaw) - sess.lastResult = cloneMap(result) - sess.mu.Unlock() - return result - } - code := strings.TrimSpace(shared.StringArg(waitResult.Error, "code", "OPENCLAW_WAIT_FAILED")) - message := strings.TrimSpace(shared.StringArg(waitResult.Error, "message", "openclaw wait failed")) - return o.failOpenClawTask(sess, code, message) - } - sess.mu.Lock() - if sess.openClaw != nil { - sess.openClaw.FirstSilentFailureAt = time.Time{} - } - sess.mu.Unlock() - waitPayload := shared.AsMap(waitResult.Payload) - if !openClawWaitPayloadTerminal(waitPayload) && !collector.isTerminal() { - return openClawMarkProbeRunning(sess) - } - return o.completeOpenClawTask(sess, waitPayload, collector, notify) -} - -func openClawMarkProbeRunning(sess *session) map[string]any { - sess.mu.Lock() - if sess.openClaw != nil { - sess.openClaw.ProgressStage = "running" - sess.openClaw.ProgressMessage = "OpenClaw task is still running" - sess.openClaw.ProbeInFlight = false - } - sess.task.ProgressStage = "running" - sess.task.ProgressMessage = "OpenClaw task is still running" - sess.task.UpdatedAt = time.Now() - result := openClawRunningTaskResult(sess.openClaw) - sess.lastResult = cloneMap(result) - sess.mu.Unlock() - return result -} - -func openClawWaitPayloadTerminal(payload map[string]any) bool { - if payload == nil { - return false - } - if value, ok := payload["terminal"].(bool); ok { - return value - } - for _, key := range []string{"status", "state", "phase"} { - switch strings.TrimSpace(strings.ToLower(shared.StringArg(payload, key, ""))) { - case "complete", "completed", "done", "final", "success", "succeeded", "failed", "failure", "error", "timeout", "timed_out", "cancelled", "canceled": - return true - } - } - return false -} - -func openClawSilentFailureExceeded(config *BridgeConfig, firstFailureAt time.Time, now time.Time) bool { - if firstFailureAt.IsZero() { - return false - } - return now.Sub(firstFailureAt) >= openClawMaxAllowedSilentDuration(config) -} - -func openClawMaxAllowedSilentDuration(config *BridgeConfig) time.Duration { - raw := strings.TrimSpace(shared.EnvOrDefault("XWORKMATE_BRIDGE_OPENCLAW_GATEWAY_MAX_SILENT_DURATION", "")) - if raw == "" && config != nil { - raw = strings.TrimSpace(config.OpenClawGateway.MaxAllowedSilentDuration) - } - if raw != "" { - if parsed, err := time.ParseDuration(raw); err == nil && parsed > 0 { - return parsed - } - } - return openClawDefaultMaxAllowedSilentDuration -} - -func openClawProbeStillRunning(errorPayload map[string]any) bool { - code := strings.TrimSpace(strings.ToUpper(shared.StringArg(errorPayload, "code", ""))) - if code == "TIMEOUT" || code == "RPC_TIMEOUT" || code == "REQUEST_TIMEOUT" || - code == "OFFLINE" || code == "SOCKET_FAILURE" || code == "SOCKET_CLOSED" { - return true - } - message := strings.TrimSpace(strings.ToLower(shared.StringArg(errorPayload, "message", ""))) - return strings.Contains(message, "timeout") || strings.Contains(message, "timed out") -} - -func (o *SessionOrchestrator) completeOpenClawTask( - sess *session, - waitPayload map[string]any, - collector *openClawChatCollector, - notify func(map[string]any), -) map[string]any { - if sess == nil { - return map[string]any{"status": "not_found"} - } - sess.mu.Lock() - record := sess.openClaw - if record == nil { - snapshot := openClawSessionSnapshotLocked(sess) - sess.mu.Unlock() - return snapshot - } - if isTerminalTaskState(sess.task.State) { - record.ProbeInFlight = false - snapshot := openClawSessionSnapshotLocked(sess) - sess.mu.Unlock() - return snapshot - } - sess.mu.Unlock() - - output := "" - if collector != nil { - output = collector.output() - } - if output == "" { - output = firstNonEmptyString(waitPayload, "output", "message", "summary", "assistantText", "text") - if output == "" { - output = firstNonEmptyString(shared.AsMap(waitPayload["result"]), "output", "message", "summary", "assistantText", "text") - } - } - noDisplayableOutput := strings.TrimSpace(output) == "" - if output == "" { - output = openClawNoDisplayableText - } - result := map[string]any{ - "success": true, - "status": string(TaskStateCompleted), - "output": output, - "message": output, - "summary": output, - "turnId": record.TurnID, - "runId": record.RunID, - "sessionId": record.SessionID, - "threadId": record.ThreadID, - "appThreadKey": record.ThreadID, - "openclawSessionKey": record.SessionKey, - "mode": router.ExecutionTargetGatewayChat, - "resolvedExecutionTarget": router.ExecutionTargetGatewayChat, - "resolvedProviderId": record.GatewayProviderID, - "resolvedGatewayProviderId": record.GatewayProviderID, - "resolvedModel": record.ResolvedModel, - "resolvedSkills": append([]string(nil), record.ResolvedSkills...), - "taskLoadClass": record.TaskLoadClass, - "runtimeBudgetMinutes": record.RuntimeBudgetMinutes, - } - mergeOpenClawArtifactPayload(result, waitPayload) - if nestedResult := shared.AsMap(waitPayload["result"]); len(nestedResult) > 0 { - mergeOpenClawArtifactPayload(result, nestedResult) - } - if collector != nil { - mergeOpenClawArtifactPayload(result, collector.artifactPayload()) - } - applyOpenClawPreparedArtifactToResult(result, record.PreparedArtifact) - artifactPayload := o.openClawArtifactExport( - record.GatewayProviderID, - record.ChatParams, - record.ArtifactContract, - record.RunID, - record.ArtifactSinceUnixMs, - record.PreparedArtifact, - notify, - ) - mergeOpenClawArtifactPayload(result, artifactPayload) - snapshotPayload := o.openClawArtifactCollectAndSnapshot( - record.GatewayProviderID, - record.ChatParams, - record.ArtifactContract, - record.RunID, - record.ArtifactSinceUnixMs, - record.PreparedArtifact, - notify, - ) - mergeOpenClawArtifactPayload(result, snapshotPayload) - result[openClawArtifactExportAttemptedField] = true - exportedCount := openClawArtifactPayloadCount(result) - logOpenClawArtifactSync(record.GatewayProviderID, record.SessionKey, record.RunID, "export", record.PreparedArtifact != nil, exportedCount > 0, exportedCount == 0) - o.server.decorateOpenClawArtifactDownloadURLs(result, record.SessionKey, record.RunID) - stripOpenClawArtifactInlineContent(result) - applyOpenClawArtifactContractResult(result, record.ArtifactContract) - guardOpenClawAgentFailedBeforeReplyResult(result) - guardOpenClawNoDisplayableResult(result, noDisplayableOutput) - delete(result, openClawArtifactExportAttemptedField) - - success := parseBool(result["success"]) - state := TaskStateCompleted - stage := "completed" - if !success { - state = TaskStateFailed - stage = "failed" - } - artifactRecord := buildArtifactRecord(sess, result, output) - if len(artifactRecord.Artifacts) > 0 { - result["artifacts"] = artifactRecord.Artifacts - } - if artifactRecord.RemoteWorkingDirectory != "" { - result["remoteWorkingDirectory"] = artifactRecord.RemoteWorkingDirectory - } - if artifactRecord.RemoteWorkspaceRefKind != "" { - result["remoteWorkspaceRefKind"] = artifactRecord.RemoteWorkspaceRefKind - } - if artifactRecord.ResultSummary != "" && strings.TrimSpace(shared.StringArg(result, "resultSummary", "")) == "" { - result["resultSummary"] = artifactRecord.ResultSummary - } - - sess.mu.Lock() - sess.task.State = state - sess.task.UpdatedAt = time.Now() - sess.task.ProgressStage = stage - sess.task.ProgressMessage = output - sess.task.ProgressTerminal = true - if sess.openClaw != nil { - sess.openClaw.ProgressStage = stage - sess.openClaw.ProgressMessage = output - sess.openClaw.ProgressTerminal = true - sess.openClaw.ProbeInFlight = false - } - if output != "" { - sess.history = append(sess.history, "ASSISTANT: "+output) - } - sess.artifacts = artifactRecord - sess.lastResult = cloneMap(result) - sess.mu.Unlock() - o.releaseOpenClawAdmission(sess) - cleanupOpenClawTurnAttachments(record) - if notify != nil { - notify(shared.NotificationEnvelope("session.update", openClawGatewayCompletedResultUpdate(record.SessionID, record.ThreadID, record.TurnID, result))) - } - return result -} - -func cleanupOpenClawTurnAttachments(record *OpenClawTaskRecord) { - if record == nil { - return - } - workingDirectory := strings.TrimSpace(shared.StringArg(record.ChatParams, "workingDirectory", "")) - if workingDirectory == "" { - return - } - attachmentDirectory := filepath.Join( - workingDirectory, - ".xworkmate", - "attachments", - safeOpenClawAttachmentPathSegment(record.TurnID, "turn"), - ) - if !openClawSafeAttachmentCleanupPath(workingDirectory, attachmentDirectory) { - return - } - _ = os.RemoveAll(attachmentDirectory) -} - -func openClawSafeAttachmentCleanupPath(workingDirectory string, attachmentDirectory string) bool { - workingRoot, err := filepath.Abs(strings.TrimSpace(workingDirectory)) - if err != nil || workingRoot == "" { - return false - } - attachmentRoot := filepath.Join(workingRoot, ".xworkmate", "attachments") - target, err := filepath.Abs(strings.TrimSpace(attachmentDirectory)) - if err != nil || target == "" { - return false - } - rel, err := filepath.Rel(attachmentRoot, target) - if err != nil || rel == "." || strings.HasPrefix(rel, "..") || filepath.IsAbs(rel) { - return false - } - return true -} - -func openClawSessionSnapshotLocked(sess *session) map[string]any { - payload := map[string]any{ - "status": string(sess.task.State), - "sessionId": sess.sessionID, - "threadId": sess.threadID, - "task": openClawTaskMapLocked(sess), - } - if len(sess.lastResult) > 0 { - payload["result"] = cloneMap(sess.lastResult) - } - if len(sess.artifacts.Artifacts) > 0 || - sess.artifacts.RemoteWorkingDirectory != "" || - sess.artifacts.RemoteWorkspaceRefKind != "" || - sess.artifacts.ResultSummary != "" { - payload["artifacts"] = map[string]any{ - "items": cloneMapSlice(sess.artifacts.Artifacts), - "remoteWorkingDirectory": sess.artifacts.RemoteWorkingDirectory, - "remoteWorkspaceRefKind": sess.artifacts.RemoteWorkspaceRefKind, - "resultSummary": sess.artifacts.ResultSummary, - "updatedAt": sess.artifacts.UpdatedAt.UTC().Format(time.RFC3339Nano), - } - } - if sess.openClaw != nil { - payload["progress"] = openClawTaskProgress(sess.openClaw) - } - return payload -} - -func openClawTaskMapLocked(sess *session) map[string]any { - task := sess.task - payload := map[string]any{ - "sessionId": task.SessionID, - "threadId": task.ThreadID, - "turnId": task.TurnID, - "runId": task.RunID, - "appThreadKey": task.ThreadID, - "openclawSessionKey": task.SessionKey, - "provider": task.Provider, - "target": task.Target, - "gatewayProviderId": task.GatewayProviderID, - "state": string(task.State), - "kind": string(task.Kind), - "taskLoadClass": task.TaskLoadClass, - "artifactScope": task.ArtifactScope, - "artifactDirectory": task.ArtifactDirectory, - "runtimeBudgetMinutes": task.RuntimeBudgetMinutes, - "updatedAt": task.UpdatedAt.UTC().Format(time.RFC3339Nano), - } - if !task.StartedAt.IsZero() { - payload["startedAt"] = task.StartedAt.UTC().Format(time.RFC3339Nano) - } - if !task.DeadlineAt.IsZero() { - payload["deadlineAt"] = task.DeadlineAt.UTC().Format(time.RFC3339Nano) - } - if !task.LastProbeAt.IsZero() { - payload["lastProbeAt"] = task.LastProbeAt.UTC().Format(time.RFC3339Nano) - } - if task.ProgressStage != "" || task.ProgressMessage != "" { - payload["progress"] = map[string]any{ - "stage": task.ProgressStage, - "message": task.ProgressMessage, - "terminal": task.ProgressTerminal, - } - } - return payload -} diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index a1fbf24..cfcc3c0 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -330,7 +330,6 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask( return nil, rpcErr } artifactContract := openClawArtifactContractForParams(params, chatParams) - artifactSinceUnixMs := time.Now().Add(-1 * time.Second).UnixMilli() preparedArtifact, prepareErr := o.openClawArtifactPrepare( gatewayProvider, params, @@ -393,18 +392,14 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask( SessionKey: sessionKey, GatewayProviderID: gatewayProvider, TaskLoadClass: taskLoadClass, - ArtifactSinceUnixMs: artifactSinceUnixMs, RuntimeBudgetMinutes: runtimeBudgetMinutes, StartedAt: startedAt, DeadlineAt: startedAt.Add(time.Duration(runtimeBudgetMinutes) * time.Minute), ProgressStage: "running", ProgressMessage: "OpenClaw task accepted", - ChatParams: cloneMap(chatParams), PreparedArtifact: preparedArtifact, - ArtifactContract: artifactContract, ResolvedModel: routing.Model, ResolvedSkills: append([]string(nil), routing.Skills...), - AdmissionRelease: releaseAdmission, } sess := o.server.getOrCreateSession(sessionID, threadID) sess.mu.Lock() @@ -423,7 +418,9 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask( running := openClawRunningTaskResult(record) sess.lastResult = cloneMap(running) sess.mu.Unlock() - o.startOpenClawTaskMonitor(sess) + if releaseAdmission != nil { + releaseAdmission() + } if notify != nil { notify(shared.NotificationEnvelope("session.update", map[string]any{ "sessionId": sessionID, @@ -786,7 +783,12 @@ func normalizeOpenClawDirList(values []any) []string { return result } - +func openClawChatSendParams( + params map[string]any, + turnID string, +) (map[string]any, *shared.RPCError) { + return openClawChatSendParamsWithSessionKey(params, turnID, openClawAgentMainSessionKey(openClawAppThreadKey(params))) +} func openClawChatSendParamsWithSessionKey( params map[string]any, @@ -1207,7 +1209,18 @@ func compactOpenClawTexts(texts []string) []string { } func (o *SessionOrchestrator) openClawSessionKey(params map[string]any, turnID string) string { - return strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) + if explicit := strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")); explicit != "" { + return explicit + } + return openClawAgentMainSessionKey(openClawAppThreadKey(params)) +} + +func openClawAgentMainSessionKey(appThreadKey string) string { + appThreadKey = strings.TrimSpace(appThreadKey) + if appThreadKey == "" { + appThreadKey = "main" + } + return "agent:main:" + appThreadKey } func validateOpenClawAcceptedSessionKey(payload map[string]any, expectedSessionKey string) *shared.RPCError { @@ -1233,8 +1246,6 @@ func validateOpenClawAcceptedSessionKey(payload map[string]any, expectedSessionK } } - - func (o *SessionOrchestrator) openClawArtifactExport( gatewayProvider string, chatParams map[string]any, @@ -1334,14 +1345,6 @@ func (o *SessionOrchestrator) openClawArtifactExportRequest( } } -func openClawSessionKeyFromArtifactScope(scope string) string { - parts := strings.Split(strings.TrimSpace(scope), "/") - if len(parts) != 3 || parts[0] != "tasks" { - return "" - } - return strings.TrimSpace(parts[1]) -} - func guardOpenClawNoDisplayableResult(result map[string]any, noDisplayableOutput bool) { if !noDisplayableOutput || result == nil || !parseBool(result["success"]) { return diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index 01f7fa1..5fb6df6 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -309,7 +309,7 @@ func TestExecuteSessionTaskAutoRoutingRecordsProjectMemory(t *testing.T) { req: shared.RPCRequest{ Params: map[string]any{ "sessionId": "session-auto", - "openclawSessionKey": "thread-auto", "threadId": "thread-auto", + "threadId": "thread-auto", "provider": "codex", "taskPrompt": "create a powerpoint deck for launch", "workingDirectory": workspaceDir, @@ -379,7 +379,7 @@ func TestExecuteSessionTaskExplicitRoutingDoesNotRecordProjectMemory(t *testing. req: shared.RPCRequest{ Params: map[string]any{ "sessionId": "session-explicit", - "openclawSessionKey": "thread-explicit", "threadId": "thread-explicit", + "threadId": "thread-explicit", "provider": "codex", "taskPrompt": "create a powerpoint deck for launch", "workingDirectory": workspaceDir, @@ -427,7 +427,7 @@ func TestExecuteSessionTaskExplicitProviderRequiresAdvertisedBridgeProvider(t *t Method: "session.start", Params: map[string]any{ "sessionId": "session-explicit-provider", - "openclawSessionKey": "thread-explicit-provider", "threadId": "thread-explicit-provider", + "threadId": "thread-explicit-provider", "taskPrompt": "create a powerpoint deck for launch", "routing": map[string]any{ "routingMode": "explicit", @@ -459,7 +459,7 @@ func TestExecuteSessionTaskExplicitGatewayUsesResolvedGatewayProvider(t *testing Method: "session.start", Params: map[string]any{ "sessionId": "session-explicit-gateway", - "openclawSessionKey": "thread-explicit-gateway", "threadId": "thread-explicit-gateway", + "threadId": "thread-explicit-gateway", "taskPrompt": "search latest news", "routing": map[string]any{ "routingMode": "explicit", @@ -490,7 +490,7 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw", - "openclawSessionKey": "thread-openclaw", "threadId": "thread-openclaw", + "threadId": "thread-openclaw", "taskPrompt": "say pong", "workingDirectory": t.TempDir(), "metadata": map[string]any{ @@ -528,7 +528,7 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { if got := shared.StringArg(prepareParams, "appThreadKey", ""); got != "thread-openclaw" { t.Fatalf("expected prepare appThreadKey to match app thread, got %#v", prepareParams) } - if got := shared.StringArg(prepareParams, "openclawSessionKey", ""); got != "thread-openclaw" { + if got := shared.StringArg(prepareParams, "openclawSessionKey", ""); got != "agent:main:thread-openclaw" { t.Fatalf("expected readable OpenClaw session key, got %#v", prepareParams) } if _, ok := prepareParams["sessionKey"]; ok { @@ -573,36 +573,14 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { t.Fatalf("expected chat.send systemProvenanceReceipt to include %q, got %q", expected, receipt) } } - if gateway.AgentWaitCount() != 1 { - t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount()) + if gateway.AgentWaitCount() != 0 { + t.Fatalf("expected native task lookup to avoid Bridge-owned agent.wait, got %d", gateway.AgentWaitCount()) } - waitParams := gateway.LastAgentWaitParams() - timeoutMs, ok := waitParams["timeoutMs"].(float64) - if !ok { - t.Fatalf("expected numeric OpenClaw agent.wait timeoutMs, got %#v", waitParams) + if gateway.ArtifactExportCount() != 0 { + t.Fatalf("expected native task lookup to avoid Bridge-owned artifact export, got %d", gateway.ArtifactExportCount()) } - if got := int64(timeoutMs); got != openClawTaskProbeTimeoutMs { - t.Fatalf("expected OpenClaw probe timeoutMs %d, got %#v", openClawTaskProbeTimeoutMs, waitParams) - } - if gateway.ArtifactExportCount() != 1 { - t.Fatalf("expected one OpenClaw artifact export sync after run, got %d", gateway.ArtifactExportCount()) - } - exportParams := gateway.LastArtifactExportParams() - if _, ok := exportParams["sessionKey"]; ok { - t.Fatalf("expected artifact export params to omit legacy sessionKey, got %#v", exportParams) - } - if got := shared.ListArg(exportParams, "expectedArtifactDirs"); !sameAnyStringSlice(got, []string{"assets/images/", "reports/"}) { - t.Fatalf("expected artifact export to receive expectedArtifactDirs from contract, got %#v", exportParams) - } - snapshotParams := gateway.LastArtifactSnapshotParams() - if _, ok := snapshotParams["sessionKey"]; ok { - t.Fatalf("expected artifact snapshot params to omit legacy sessionKey, got %#v", snapshotParams) - } - if got := shared.ListArg(snapshotParams, "expectedArtifactDirs"); !sameAnyStringSlice(got, []string{"assets/images/", "reports/"}) { - t.Fatalf("expected artifact snapshot to receive expectedArtifactDirs from contract, got %#v", snapshotParams) - } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) } client := gateway.LastConnectClient() if got := client["id"]; got != "openclaw-macos" { @@ -777,7 +755,7 @@ func TestExecuteSessionTaskGatewayNoDisplayableOutputFails(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-no-output", - "openclawSessionKey": "thread-openclaw-no-output", "threadId": "thread-openclaw-no-output", + "threadId": "thread-openclaw-no-output", "taskPrompt": "completed-empty", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -803,8 +781,8 @@ func TestExecuteSessionTaskGatewayNoDisplayableOutputFails(t *testing.T) { if got := response["output"]; got != openClawNoDisplayableText { t.Fatalf("expected no-displayable output message, got %#v", response) } - if gateway.ArtifactExportCount() != 1 { - t.Fatalf("expected one artifact export sync even when no displayable text is returned, got %d", gateway.ArtifactExportCount()) + if gateway.ArtifactExportCount() != 0 { + t.Fatalf("expected native task-registry failure to avoid Bridge artifact export sync, got %d", gateway.ArtifactExportCount()) } } @@ -822,7 +800,7 @@ func TestExecuteSessionTaskGatewayFailsClosedWhenOpenClawAcceptsDifferentSession Method: "session.start", Params: map[string]any{ "sessionId": "draft:1780669943199412-3", - "openclawSessionKey": "draft:1780669943199412-3", "threadId": "draft:1780669943199412-3", + "threadId": "draft:1780669943199412-3", "taskPrompt": "say pong", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -847,7 +825,7 @@ func TestExecuteSessionTaskGatewayFailsClosedWhenOpenClawAcceptsDifferentSession t.Fatalf("session mismatch must fail before artifact export, got %d exports", gateway.ArtifactExportCount()) } chatParams := gateway.LastChatSendParams() - if got := shared.StringArg(chatParams, "sessionKey", ""); got != "draft:1780669943199412-3" { + if got := shared.StringArg(chatParams, "sessionKey", ""); got != "agent:main:draft:1780669943199412-3" { t.Fatalf("expected Bridge to request the app-mapped OpenClaw session, got %#v", chatParams) } } @@ -866,7 +844,7 @@ func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testi Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-wait-recover", - "openclawSessionKey": "thread-openclaw-wait-recover", "threadId": "thread-openclaw-wait-recover", + "threadId": "thread-openclaw-wait-recover", "taskPrompt": "wait-timeout", "workingDirectory": t.TempDir(), "metadata": map[string]any{ @@ -890,8 +868,8 @@ func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testi if got := gateway.ChatSendCount(); got != 1 { t.Fatalf("expected no automatic repair model turn, got %d", got) } - if got := gateway.AgentWaitCount(); got != 1 { - t.Fatalf("expected one status probe, got %d", got) + if got := gateway.AgentWaitCount(); got != 0 { + t.Fatalf("expected native task-registry lookup without Bridge-owned status probe, got %d", got) } if got := gateway.ArtifactExportCount(); got != 0 { t.Fatalf("expected no artifact export before terminal state, got %d", got) @@ -912,7 +890,7 @@ func TestExecuteSessionTaskGatewayKeepsRunningOnNonTerminalWaitPayload(t *testin Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-running-wait", - "openclawSessionKey": "thread-openclaw-running-wait", "threadId": "thread-openclaw-running-wait", + "threadId": "thread-openclaw-running-wait", "taskPrompt": "wait-running", "workingDirectory": t.TempDir(), "metadata": map[string]any{ @@ -936,8 +914,8 @@ func TestExecuteSessionTaskGatewayKeepsRunningOnNonTerminalWaitPayload(t *testin if got := gateway.ChatSendCount(); got != 1 { t.Fatalf("expected no repair turn, got %d", got) } - if got := gateway.AgentWaitCount(); got != 1 { - t.Fatalf("expected one status probe, got %d", got) + if got := gateway.AgentWaitCount(); got != 0 { + t.Fatalf("expected native task-registry lookup without Bridge-owned status probe, got %d", got) } if got := gateway.ArtifactExportCount(); got != 0 { t.Fatalf("expected no artifact export before terminal wait payload, got %d", got) @@ -960,7 +938,7 @@ func TestExecuteSessionTaskGatewayAgentFailedBeforeReplyReturnsFailureCode(t *te Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-agent-failed", - "openclawSessionKey": "thread-openclaw-agent-failed", "threadId": "thread-openclaw-agent-failed", + "threadId": "thread-openclaw-agent-failed", "taskPrompt": "agent failed before reply", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1001,7 +979,7 @@ func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) { Method: "session.message", Params: map[string]any{ "sessionId": "session-openclaw", - "openclawSessionKey": "thread-openclaw", "threadId": "thread-openclaw", + "threadId": "thread-openclaw", "taskPrompt": "continue", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1021,14 +999,14 @@ func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) { if gateway.ChatSendCount() != 1 { t.Fatalf("expected one OpenClaw chat.send request, got %d", gateway.ChatSendCount()) } - if gateway.AgentWaitCount() != 1 { - t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount()) + if gateway.AgentWaitCount() != 0 { + t.Fatalf("expected native task-registry lookup without Bridge-owned agent.wait, got %d", gateway.AgentWaitCount()) } - if gateway.ArtifactExportCount() != 1 { - t.Fatalf("expected one OpenClaw artifact export sync after message run, got %d", gateway.ArtifactExportCount()) + if gateway.ArtifactExportCount() != 0 { + t.Fatalf("expected native task-registry lookup without Bridge artifact export sync, got %d", gateway.ArtifactExportCount()) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) } } @@ -1048,7 +1026,7 @@ func TestInternalJobsSubmitCompletesAndReportsStats(t *testing.T) { Params: map[string]any{ "providerId": "opencode", "sessionId": "job-session", - "openclawSessionKey": "job-thread", "threadId": "job-thread", + "threadId": "job-thread", "taskPrompt": "run async job", "workingDirectory": t.TempDir(), "timeoutMs": 30_000, @@ -1108,7 +1086,7 @@ func TestInternalJobWebhookRetriesUntilSuccess(t *testing.T) { Params: map[string]any{ "providerId": "opencode", "sessionId": "job-webhook-session", - "openclawSessionKey": "job-webhook-thread", "threadId": "job-webhook-thread", + "threadId": "job-webhook-thread", "taskPrompt": "run async job", "workingDirectory": t.TempDir(), "callbackUrl": callbackServer.URL, @@ -1279,7 +1257,7 @@ func TestExecuteSessionTaskGatewaySurfacesOpenClawChatSendError(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-fail", - "openclawSessionKey": "thread-openclaw-fail", "threadId": "thread-openclaw-fail", + "threadId": "thread-openclaw-fail", "taskPrompt": "fail", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1314,7 +1292,7 @@ func TestExecuteSessionTaskGatewayRetriesOpenClawChatSendSocketClose(t *testing. Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-retry", - "openclawSessionKey": "thread-openclaw-retry", "threadId": "thread-openclaw-retry", + "threadId": "thread-openclaw-retry", "taskPrompt": "retry after socket close", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1353,7 +1331,7 @@ func TestExecuteSessionTaskGatewayReturnsStructuredOpenClawSocketCloseAfterRetry Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-retry-fail", - "openclawSessionKey": "thread-openclaw-retry-fail", "threadId": "thread-openclaw-retry-fail", + "threadId": "thread-openclaw-retry-fail", "taskPrompt": "retry fails", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1391,7 +1369,7 @@ func TestExecuteSessionTaskGatewaySurfacesOpenClawAgentWaitError(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-wait-fail", - "openclawSessionKey": "thread-openclaw-wait-fail", "threadId": "thread-openclaw-wait-fail", + "threadId": "thread-openclaw-wait-fail", "taskPrompt": "wait-error", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1414,24 +1392,25 @@ func TestExecuteSessionTaskGatewaySurfacesOpenClawAgentWaitError(t *testing.T) { if got := shared.StringArg(response, "message", ""); !strings.Contains(got, "openclaw wait failed") { t.Fatalf("expected surfaced agent.wait failure, got %#v", response) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait"}) { - t.Fatalf("expected connect, artifact prepare, chat.send, then agent.wait, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) } snapshot := server.handleTaskGet(context.Background(), map[string]any{ - "sessionId": "session-openclaw-wait-fail", - "openclawSessionKey": "thread-openclaw-wait-fail", "threadId": "thread-openclaw-wait-fail", + "appThreadKey": "thread-openclaw-wait-fail", + "openclawSessionKey": "agent:main:thread-openclaw-wait-fail", + "runId": shared.StringArg(response, "runId", ""), }, nil) if got := snapshot["status"]; got != string(TaskStateFailed) { t.Fatalf("expected failed session snapshot, got %#v from %#v", got, snapshot) } - result := shared.AsMap(snapshot["result"]) + result := snapshot if got := result["success"]; got != false { t.Fatalf("expected failed result snapshot, got %#v", result) } - if got := shared.StringArg(result, "code", ""); got != "OPENCLAW_WAIT_FAILED" { + if got := shared.StringArg(snapshot, "code", ""); got != "OPENCLAW_WAIT_FAILED" { t.Fatalf("expected OpenClaw wait code in snapshot, got %#v", result) } - if got := shared.StringArg(result, "message", ""); !strings.Contains(got, "openclaw wait failed") { + if got := shared.StringArg(snapshot, "message", ""); !strings.Contains(got, "openclaw wait failed") { t.Fatalf("expected OpenClaw wait message in snapshot, got %#v", result) } } @@ -1446,7 +1425,7 @@ func TestSessionCloseReturnsAcceptedAndClosedState(t *testing.T) { Method: "session.close", Params: map[string]any{ "sessionId": sessionID, - "openclawSessionKey": threadID, "threadId": threadID, + "threadId": threadID, }, }, nil) if rpcErr != nil { @@ -1463,7 +1442,7 @@ func TestSessionCloseReturnsAcceptedAndClosedState(t *testing.T) { Method: "session.close", Params: map[string]any{ "sessionId": sessionID, - "openclawSessionKey": threadID, "threadId": threadID, + "threadId": threadID, }, }, nil) if rpcErr != nil { @@ -1490,7 +1469,7 @@ func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-artifact", - "openclawSessionKey": "thread-openclaw-artifact", "threadId": "thread-openclaw-artifact", + "threadId": "thread-openclaw-artifact", "taskPrompt": "make artifact", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1504,8 +1483,8 @@ func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) { if rpcErr != nil { t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr) } - if gateway.ArtifactExportCount() != 1 { - t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount()) + if gateway.ArtifactExportCount() != 0 { + t.Fatalf("expected native task-registry artifact payload without Bridge export, got %d", gateway.ArtifactExportCount()) } if got := response["remoteWorkingDirectory"]; got != "/remote/openclaw/workspace" { t.Fatalf("expected remote working directory from manifest, got %#v", response) @@ -1556,15 +1535,8 @@ func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) { if parsedDownloadURL.Query().Get("sig") == "" { t.Fatalf("expected signed downloadUrl, got %q", downloadURL) } - exportParams := gateway.LastArtifactExportParams() - if got := strings.TrimSpace(shared.StringArg(exportParams, "maxInlineBytes", "")); got != "0" { - t.Fatalf("expected OpenClaw artifact export to disable inline content, got %#v", exportParams) - } - if got := shared.BoolArg(shared.StringArg(exportParams, "includeContent", ""), true); got { - t.Fatalf("expected OpenClaw artifact export to omit content, got %#v", exportParams) - } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) } } @@ -1581,7 +1553,7 @@ func TestExecuteSessionTaskGatewayDoesNotTreatPromptTextAsArtifactContract(t *te Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-latest-artifact", - "openclawSessionKey": "thread-openclaw-latest-artifact", "threadId": "thread-openclaw-latest-artifact", + "threadId": "thread-openclaw-latest-artifact", "taskPrompt": "检查 workspace 已有真实制品,输出 artifacts files download。不要生成新文件,只简短说明。", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1601,21 +1573,8 @@ func TestExecuteSessionTaskGatewayDoesNotTreatPromptTextAsArtifactContract(t *te if _, ok := response["artifacts"]; ok { t.Fatalf("expected no stale artifacts when gateway exported none, got %#v", response["artifacts"]) } - exportParams := gateway.LastArtifactExportParams() - if got := strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")); got == "" { - t.Fatalf("expected bridge to export the prepared task artifact scope, got %#v", exportParams) - } - if _, ok := exportParams["latestIfEmpty"]; ok { - t.Fatalf("expected no latestIfEmpty fallback export param, got %#v", exportParams) - } - if got := strings.TrimSpace(shared.StringArg(exportParams, "maxInlineBytes", "")); got != "0" { - t.Fatalf("expected latest workspace export to disable inline content, got %#v", exportParams) - } - if got := shared.BoolArg(shared.StringArg(exportParams, "includeContent", ""), true); got { - t.Fatalf("expected latest workspace export to omit content, got %#v", exportParams) - } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) } } @@ -1633,7 +1592,7 @@ func TestExecuteSessionTaskGatewayExportsWithActualOpenClawRunID(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-actual-run", - "openclawSessionKey": "thread-openclaw-actual-run", "threadId": "thread-openclaw-actual-run", + "threadId": "thread-openclaw-actual-run", "taskPrompt": "make artifact", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1653,15 +1612,18 @@ func TestExecuteSessionTaskGatewayExportsWithActualOpenClawRunID(t *testing.T) { if gateway.ArtifactPrepareCount() != 2 { t.Fatalf("expected bridge to prepare initial turn scope and actual OpenClaw run scope, got %d", gateway.ArtifactPrepareCount()) } - exportParams := gateway.LastArtifactExportParams() - if got := strings.TrimSpace(shared.StringArg(exportParams, "runId", "")); got != "openclaw-run-actual" { - t.Fatalf("expected artifact export to use actual OpenClaw runId, got %#v", exportParams) - } - if got := strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")); got != "tasks/"+shared.StringArg(response, "openclawSessionKey", "")+"/openclaw-run-actual" { - t.Fatalf("expected artifact export to use actual OpenClaw run scope, got %#v", exportParams) - } artifacts, ok := response["artifacts"].([]map[string]any) - if !ok || len(artifacts) != 1 { + if !ok { + raw, rawOK := response["artifacts"].([]any) + if !rawOK { + t.Fatalf("expected actual-run artifact manifest, got %#v", response["artifacts"]) + } + artifacts = make([]map[string]any, 0, len(raw)) + for _, item := range raw { + artifacts = append(artifacts, shared.AsMap(item)) + } + } + if len(artifacts) != 1 { t.Fatalf("expected actual-run artifact manifest, got %#v", response["artifacts"]) } downloadURL := strings.TrimSpace(shared.StringArg(artifacts[0], "downloadUrl", "")) @@ -1675,8 +1637,8 @@ func TestExecuteSessionTaskGatewayExportsWithActualOpenClawRunID(t *testing.T) { if got := parsedDownloadURL.Query().Get("artifactScope"); got != "tasks/"+shared.StringArg(response, "openclawSessionKey", "")+"/openclaw-run-actual" { t.Fatalf("expected download URL to use actual OpenClaw artifact scope, got %q", got) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.session.prepare", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected bridge to reprepare actual OpenClaw run before wait/export, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.session.prepare", "xworkmate.tasks.get"}) { + t.Fatalf("expected bridge to reprepare actual OpenClaw run before native task lookup, got %#v", got) } } @@ -1710,7 +1672,7 @@ func TestExecuteSessionTaskGatewayDoesNotExportArtifactScopeDeclaredInOutput(t * Method: "session.start", Params: map[string]any{ "sessionId": sessionKey, - "openclawSessionKey": sessionKey, "threadId": sessionKey, + "threadId": sessionKey, "taskPrompt": "declare output artifact path", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1727,8 +1689,8 @@ func TestExecuteSessionTaskGatewayDoesNotExportArtifactScopeDeclaredInOutput(t * if got := response["success"]; got != true { t.Fatalf("expected text-only response to complete without adopting output-declared artifact, got %#v", response) } - if got := gateway.ArtifactExportCount(); got != 1 { - t.Fatalf("expected only current prepared scope export, got %d", got) + if got := gateway.ArtifactExportCount(); got != 0 { + t.Fatalf("expected no Bridge export from output-declared artifact path, got %d", got) } if artifacts, ok := response["artifacts"]; ok { t.Fatalf("expected no artifact from output-declared path, got %#v", artifacts) @@ -1765,7 +1727,7 @@ func TestExecuteSessionTaskGatewayDoesNotExportDraftScopeVariant(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": sessionKey, - "openclawSessionKey": sessionKey, "threadId": sessionKey, + "threadId": sessionKey, "taskPrompt": "plain done", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1782,8 +1744,8 @@ func TestExecuteSessionTaskGatewayDoesNotExportDraftScopeVariant(t *testing.T) { if got := response["success"]; got != true { t.Fatalf("expected text-only task to complete without adopting draft variant artifact, got %#v", response) } - if got := gateway.ArtifactExportCount(); got != 1 { - t.Fatalf("expected only current prepared scope export, got %d", got) + if got := gateway.ArtifactExportCount(); got != 0 { + t.Fatalf("expected no Bridge export from draft scope variant, got %d", got) } if artifacts, ok := response["artifacts"]; ok { t.Fatalf("expected no artifact from draft scope variant, got %#v", artifacts) @@ -1803,7 +1765,7 @@ func TestExecuteSessionMessageGatewayDoesNotRewriteClaimedArtifactsWithoutGatewa Method: "session.message", Params: map[string]any{ "sessionId": "session-openclaw-claimed-artifact", - "openclawSessionKey": "thread-openclaw-claimed-artifact", "threadId": "thread-openclaw-claimed-artifact", + "threadId": "thread-openclaw-claimed-artifact", "taskPrompt": "hi hallucinate-files", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -1823,11 +1785,11 @@ func TestExecuteSessionMessageGatewayDoesNotRewriteClaimedArtifactsWithoutGatewa if output := strings.TrimSpace(shared.StringArg(response, "output", "")); !strings.Contains(output, "文件已就绪") { t.Fatalf("expected bridge not to rewrite gateway text output, got %q", output) } - if gateway.ArtifactExportCount() != 1 { - t.Fatalf("expected one post-run artifact export sync, got %d", gateway.ArtifactExportCount()) + if gateway.ArtifactExportCount() != 0 { + t.Fatalf("expected native task-registry lookup without Bridge artifact export, got %d", gateway.ArtifactExportCount()) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) } } @@ -1844,7 +1806,7 @@ func TestExecuteSessionMessageGatewayExportsArtifactsWithoutPromptHeuristic(t *t Method: "session.message", Params: map[string]any{ "sessionId": "session-openclaw-message-artifact", - "openclawSessionKey": "thread-openclaw-message-artifact", "threadId": "thread-openclaw-message-artifact", + "threadId": "thread-openclaw-message-artifact", "workingDirectory": t.TempDir(), "messages": []any{ map[string]any{ @@ -1872,12 +1834,8 @@ func TestExecuteSessionMessageGatewayExportsArtifactsWithoutPromptHeuristic(t *t if got := response["success"]; got != true { t.Fatalf("expected artifact response success, got %#v", response) } - exportParams := gateway.LastArtifactExportParams() - if got := strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")); got == "" { - t.Fatalf("expected bridge to export the prepared task artifact scope, got %#v", exportParams) - } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got) } } @@ -2267,11 +2225,10 @@ func TestOpenClawChatSendParamsPreservesRawPrompt(t *testing.T) { "write a csv dataset file", } { t.Run(prompt, func(t *testing.T) { - params := map[string]any{ - "openclawSessionKey": "thread-artifact-instructions", "threadId": "thread-artifact-instructions", + chatParams, rpcErr := openClawChatSendParams(map[string]any{ + "threadId": "thread-artifact-instructions", "taskPrompt": prompt, - } - chatParams, rpcErr := openClawChatSendParamsWithSessionKey(params, "turn-artifact-instructions", "thread-artifact-instructions") + }, "turn-artifact-instructions") if rpcErr != nil { t.Fatalf("expected chat params, got rpc error: %#v", rpcErr) } @@ -2285,8 +2242,8 @@ func TestOpenClawChatSendParamsPreservesRawPrompt(t *testing.T) { func TestOpenClawChatSendParamsMaterializesInlineAttachments(t *testing.T) { workspace := t.TempDir() - params := map[string]any{ - "openclawSessionKey": "thread-attachments", "threadId": "thread-attachments", + chatParams, rpcErr := openClawChatSendParams(map[string]any{ + "threadId": "thread-attachments", "taskPrompt": "inspect uploaded image", "workingDirectory": workspace, "attachments": []any{ @@ -2299,8 +2256,7 @@ func TestOpenClawChatSendParamsMaterializesInlineAttachments(t *testing.T) { "content": base64.StdEncoding.EncodeToString([]byte("image-bytes")), }, }, - } - chatParams, rpcErr := openClawChatSendParamsWithSessionKey(params, "turn-inline-attachments", "thread-attachments") + }, "turn-inline-attachments") if rpcErr != nil { t.Fatalf("expected chat params, got rpc error: %#v", rpcErr) } @@ -2334,8 +2290,8 @@ func TestOpenClawChatSendParamsMaterializesInlineAttachments(t *testing.T) { func TestOpenClawChatSendParamsMaterializesInlineAttachmentsInRemoteHint(t *testing.T) { remoteWorkspace := t.TempDir() - params := map[string]any{ - "openclawSessionKey": "thread-remote-attachments", "threadId": "thread-remote-attachments", + chatParams, rpcErr := openClawChatSendParams(map[string]any{ + "threadId": "thread-remote-attachments", "taskPrompt": "inspect uploaded file", "workingDirectory": "/Users/local/.xworkmate/threads/thread-remote-attachments", "remoteWorkingDirectoryHint": remoteWorkspace, @@ -2346,8 +2302,7 @@ func TestOpenClawChatSendParamsMaterializesInlineAttachmentsInRemoteHint(t *test "content": base64.StdEncoding.EncodeToString([]byte("note body")), }, }, - } - chatParams, rpcErr := openClawChatSendParamsWithSessionKey(params, "turn-remote-attachments", "thread-remote-attachments") + }, "turn-remote-attachments") if rpcErr != nil { t.Fatalf("expected chat params, got rpc error: %#v", rpcErr) } @@ -2378,7 +2333,7 @@ func TestOpenClawChatSendParamsMapsOwnerScopedWorkspaceToWritableRoot(t *testing ownerWorkspace := "/owners/local/device/demo/threads/draft-1" params := withOpenClawWritableWorkspace(map[string]any{ "sessionId": "draft-1", - "openclawSessionKey": "draft-1", "threadId": "draft-1", + "threadId": "draft-1", "taskPrompt": "write into currentTaskWorkspace: " + ownerWorkspace, "workingDirectory": ownerWorkspace, "remoteWorkingDirectoryHint": ownerWorkspace, @@ -2391,7 +2346,7 @@ func TestOpenClawChatSendParamsMapsOwnerScopedWorkspaceToWritableRoot(t *testing }, }, "draft-1") - chatParams, rpcErr := openClawChatSendParamsWithSessionKey(params, "turn-owner-workspace", "draft-1") + chatParams, rpcErr := openClawChatSendParams(params, "turn-owner-workspace") if rpcErr != nil { t.Fatalf("expected chat params, got rpc error: %#v", rpcErr) } @@ -2429,7 +2384,7 @@ func TestExecuteSessionTaskGatewayRejectsOversizedInlineAttachmentBeforeChatSend Method: "session.start", Params: map[string]any{ "sessionId": "session-oversized-attachment", - "openclawSessionKey": "thread-oversized-attachment", "threadId": "thread-oversized-attachment", + "threadId": "thread-oversized-attachment", "taskPrompt": "inspect attachment", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -2471,7 +2426,7 @@ func TestExecuteSessionTaskGatewayCollectsOpenClawEventArtifacts(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-event-artifact", - "openclawSessionKey": "thread-openclaw-event-artifact", "threadId": "thread-openclaw-event-artifact", + "threadId": "thread-openclaw-event-artifact", "taskPrompt": "event artifact", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -2520,7 +2475,7 @@ func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing Method: "session.start", Params: map[string]any{ "sessionId": "session-openclaw-artifact-missing", - "openclawSessionKey": "thread-openclaw-artifact-missing", "threadId": "thread-openclaw-artifact-missing", + "threadId": "thread-openclaw-artifact-missing", "taskPrompt": "say pong", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -2537,8 +2492,8 @@ func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing if got := response["output"]; got != "gateway pong" { t.Fatalf("expected gateway pong output, got %#v", response) } - if gateway.ArtifactExportCount() != 1 { - t.Fatalf("expected one OpenClaw artifact export sync, got %d", gateway.ArtifactExportCount()) + if gateway.ArtifactExportCount() != 0 { + t.Fatalf("expected native task-registry lookup without Bridge artifact export sync, got %d", gateway.ArtifactExportCount()) } if warnings := shared.ListArg(response, "artifactWarnings"); len(warnings) != 0 { t.Fatalf("expected no artifact warnings when gateway export succeeds empty, got %#v", warnings) @@ -2567,7 +2522,7 @@ func TestExecuteSessionTaskDefaultsExplicitGatewayToOpenClaw(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-gateway-missing-provider", - "openclawSessionKey": "thread-gateway-missing-provider", "threadId": "thread-gateway-missing-provider", + "threadId": "thread-gateway-missing-provider", "taskPrompt": "search latest news", "routing": map[string]any{ "routingMode": "explicit", @@ -2961,6 +2916,203 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { "status": "ok", }, }) + case "xworkmate.tasks.get": + params := shared.AsMap(frame["params"]) + runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) + sessionKey := strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) + appThreadKey := strings.TrimSpace(shared.StringArg(params, "appThreadKey", "")) + if runID == "" || sessionKey == "" { + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": map[string]any{ + "ok": false, + "code": "invalid_lookup", + "status": "not_found", + "message": "openclawSessionKey and runId required", + }, + }) + continue + } + status := "completed" + taskStatus := "succeeded" + success := true + message := "gateway pong" + code := "" + errorMessage := "" + runMessage := fake.runMessage(runID) + lowerRunMessage := strings.ToLower(runMessage) + switch { + case strings.Contains(runMessage, "wait-error"): + status = "failed" + taskStatus = "failed" + success = false + message = "openclaw wait failed" + code = "OPENCLAW_WAIT_FAILED" + errorMessage = "openclaw wait failed" + case strings.Contains(runMessage, "wait-timeout"), strings.Contains(runMessage, "wait-running"): + status = "running" + taskStatus = "running" + message = "" + case strings.Contains(runMessage, "completed-empty"): + status = "failed" + taskStatus = "failed" + success = false + message = openClawNoDisplayableText + code = "OPENCLAW_NO_DISPLAYABLE_OUTPUT" + errorMessage = openClawNoDisplayableText + case strings.Contains(runMessage, "agent failed before reply"): + status = "failed" + taskStatus = "failed" + success = false + message = "Agent failed before reply: No available auth profile for nvidia" + code = "OPENCLAW_AGENT_FAILED_BEFORE_REPLY" + errorMessage = message + case strings.Contains(runMessage, "hallucinate-files"): + message = "文件已就绪,点击直接下载👇 三个格式一键收取:" + } + artifactScope := "tasks/" + sessionKey + "/" + runID + artifacts := []any{} + hallucinatedFiles := strings.Contains(runMessage, "hallucinate-files") + downloadURL := func(relativePath string) string { + query := url.Values{} + query.Set("sessionKey", sessionKey) + query.Set("runId", runID) + query.Set("artifactScope", artifactScope) + query.Set("relativePath", relativePath) + query.Set("sig", "fake-signature") + return openClawArtifactDownloadPath + "?" + query.Encode() + } + if strings.Contains(runMessage, "make artifact") { + artifacts = append(artifacts, map[string]any{ + "relativePath": "reports/final.md", + "label": "final.md", + "contentType": "text/markdown", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + "downloadUrl": downloadURL("reports/final.md"), + }) + } + if strings.Contains(runMessage, "make pdf artifact") { + artifacts = append(artifacts, map[string]any{ + "relativePath": "exports/final.pdf", + "label": "final.pdf", + "contentType": "application/pdf", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + "downloadUrl": downloadURL("exports/final.pdf"), + }) + } + if strings.Contains(runMessage, "make partial artifact") { + artifacts = append(artifacts, map[string]any{ + "relativePath": "chapters/intro.md", + "label": "intro.md", + "contentType": "text/markdown", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + "downloadUrl": downloadURL("chapters/intro.md"), + }) + } + if !hallucinatedFiles && (strings.Contains(runMessage, "7张") || strings.Contains(runMessage, "图片") || strings.Contains(lowerRunMessage, "image")) { + artifacts = append(artifacts, map[string]any{ + "relativePath": "artifacts/media/browser/series-01.png", + "label": "series-01.png", + "contentType": "image/png", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + "downloadUrl": downloadURL("artifacts/media/browser/series-01.png"), + }) + } + if !hallucinatedFiles && !strings.Contains(runMessage, "make pdf artifact") && strings.Contains(lowerRunMessage, "pdf") { + artifacts = append(artifacts, map[string]any{ + "relativePath": "artifacts/tmp-openclaw/final.pdf", + "label": "final.pdf", + "contentType": "application/pdf", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + "downloadUrl": downloadURL("artifacts/tmp-openclaw/final.pdf"), + }) + } + if !hallucinatedFiles && (strings.Contains(runMessage, "视频") || strings.Contains(lowerRunMessage, "video")) { + artifacts = append(artifacts, map[string]any{ + "relativePath": "artifacts/tmp-openclaw/final.mp4", + "label": "final.mp4", + "contentType": "video/mp4", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + "downloadUrl": downloadURL("artifacts/tmp-openclaw/final.mp4"), + }) + } + if strings.Contains(runMessage, "event artifact") { + artifacts = append(artifacts, map[string]any{ + "relativePath": "events/live.txt", + "label": "live.txt", + "contentType": "text/plain", + "sizeBytes": 12, + "sha256": "fake-sha256", + "downloadUrl": downloadURL("events/live.txt"), + }) + } + if strings.TrimSpace(fake.artifactWorkspaceRoot) != "" { + artifacts = appendArtifactList(artifacts, fake.exportFilesystemArtifacts(artifactScope)) + } + remoteWorkingDirectory := "/remote/openclaw/workspace" + if strings.TrimSpace(fake.artifactWorkspaceRoot) != "" { + remoteWorkingDirectory = strings.TrimSpace(fake.artifactWorkspaceRoot) + } + if strings.Contains(runMessage, "event artifact") { + remoteWorkingDirectory = "/remote/openclaw/events" + } + payload := map[string]any{ + "success": success, + "status": status, + "taskStatus": taskStatus, + "mode": "gateway-chat", + "resolvedGatewayProviderId": "openclaw", + "runId": runID, + "taskId": runID, + "appThreadKey": appThreadKey, + "openclawSessionKey": sessionKey, + "message": message, + "output": message, + "summary": message, + "remoteWorkingDirectory": remoteWorkingDirectory, + "remoteWorkspaceRefKind": "remotePath", + "task": map[string]any{ + "taskId": runID, + "runId": runID, + "status": taskStatus, + }, + "warnings": []any{}, + } + if code != "" { + payload["code"] = code + } + if errorMessage != "" { + payload["error"] = errorMessage + } + if len(artifacts) > 0 { + payload["artifacts"] = artifacts + } + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": payload, + }) case "xworkmate.artifacts.collect-and-snapshot": fake.artifactSnapshotCount.Add(1) params := shared.AsMap(frame["params"]) @@ -3018,7 +3170,7 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { filesystemArtifacts := []any{} if strings.TrimSpace(fake.artifactWorkspaceRoot) != "" && artifactScope != "" { payload["remoteWorkingDirectory"] = strings.TrimSpace(fake.artifactWorkspaceRoot) - if sessionKey == "" || openClawSessionKeyFromArtifactScope(artifactScope) == sessionKey { + if sessionKey != "" { filesystemArtifacts = fake.exportFilesystemArtifacts(artifactScope) } } @@ -3457,7 +3609,7 @@ func TestExecuteSessionTaskAutoRoutingUsesBridgeProductionProviderOrder(t *testi Method: "session.start", Params: map[string]any{ "sessionId": "session-auto-order", - "openclawSessionKey": "thread-auto-order", "threadId": "thread-auto-order", + "threadId": "thread-auto-order", "taskPrompt": "create a powerpoint deck for launch", "workingDirectory": workspaceDir, "routing": map[string]any{ @@ -3537,7 +3689,7 @@ func TestExecuteSessionTaskKeepsRemoteWorkspaceHintOutOfLocalCWD(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-remote-hint", - "openclawSessionKey": "thread-remote-hint", "threadId": "thread-remote-hint", + "threadId": "thread-remote-hint", "taskPrompt": "say hello", "workingDirectory": workspaceDir, "remoteWorkingDirectoryHint": "/owners/local/user/demo/threads/main", @@ -3588,7 +3740,7 @@ func TestExecuteSessionTaskRequiresRouting(t *testing.T) { Method: "session.start", Params: map[string]any{ "sessionId": "session-missing-routing", - "openclawSessionKey": "thread-missing-routing", "threadId": "thread-missing-routing", + "threadId": "thread-missing-routing", "taskPrompt": "hello", }, }, @@ -3617,7 +3769,7 @@ func TestExecuteSessionMessageMissingProviderStateReturnsContinuationUnavailable Method: "session.message", Params: map[string]any{ "sessionId": "session-without-provider-state", - "openclawSessionKey": "thread-without-provider-state", "threadId": "thread-without-provider-state", + "threadId": "thread-without-provider-state", "taskPrompt": "continue", "workingDirectory": t.TempDir(), "routing": map[string]any{ @@ -3659,7 +3811,7 @@ func TestExecuteSessionTaskComplexRequestNoLongerPromotesToMultiAgent(t *testing req: shared.RPCRequest{ Params: map[string]any{ "sessionId": "session-complex", - "openclawSessionKey": "thread-complex", "threadId": "thread-complex", + "threadId": "thread-complex", "taskPrompt": "collect latest news and summarize it into a report for review", "workingDirectory": workspaceDir, "routing": map[string]any{ diff --git a/internal/acp/rpc_handler.go b/internal/acp/rpc_handler.go index 35ee791..d8e018a 100644 --- a/internal/acp/rpc_handler.go +++ b/internal/acp/rpc_handler.go @@ -91,45 +91,71 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string } func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { - sess := s.findTaskSession(params) - if sess == nil { - sess = s.reassociateOpenClawTask(params) + gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", "")) + if gatewayProvider == "" { + gatewayProvider = strings.TrimSpace(shared.StringArg(params, "resolvedGatewayProviderId", "")) } - if sess == nil { - return map[string]any{"status": "not_found"} + if gatewayProvider == "" { + gatewayProvider = "openclaw" } - waitForArtifacts := shared.BoolArg(shared.StringArg(params, "waitForArtifacts", ""), false) - if val, ok := params["waitForArtifacts"].(bool); ok { - waitForArtifacts = val + if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil { + return map[string]any{ + "ok": false, + "status": "not_found", + "code": "GATEWAY_UNAVAILABLE", + "message": rpcErr.Message, + } + } + result := s.gateway.RequestByMode( + gatewayProvider, + "xworkmate.tasks.get", + openClawTaskLookupParams(params), + 30*time.Second, + notify, + ) + if result.OK { + return shared.AsMap(result.Payload) + } + message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed")) + code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED")) + return map[string]any{ + "ok": false, + "status": "not_found", + "code": code, + "message": message, } - return s.orchestrator.probeOpenClawTask(ctx, sess, notify, waitForArtifacts) } func (s *Server) handleTaskCancel(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { sess := s.findTaskSession(params) - if sess == nil { - sess = s.reassociateOpenClawTask(params) + runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) + gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", "")) + if gatewayProvider == "" { + gatewayProvider = strings.TrimSpace(shared.StringArg(params, "resolvedGatewayProviderId", "")) } - if sess == nil { - return map[string]any{"accepted": false, "status": "not_found"} + if sess != nil { + sess.mu.Lock() + if runID == "" { + runID = sess.task.RunID + } + if gatewayProvider == "" { + gatewayProvider = sess.task.GatewayProviderID + } + sess.task.State = TaskStateCancelled + sess.task.UpdatedAt = time.Now() + sess.task.ProgressStage = "cancelled" + sess.task.ProgressMessage = "OpenClaw task cancelled" + sess.task.ProgressTerminal = true + if sess.openClaw != nil { + sess.openClaw.ProgressStage = "cancelled" + sess.openClaw.ProgressMessage = "OpenClaw task cancelled" + } + sess.mu.Unlock() } - sess.mu.Lock() - gatewayProvider := sess.task.GatewayProviderID - runID := sess.task.RunID - sess.task.State = TaskStateCancelled - sess.task.UpdatedAt = time.Now() - sess.task.ProgressStage = "cancelled" - sess.task.ProgressMessage = "OpenClaw task cancelled" - sess.task.ProgressTerminal = true - if sess.openClaw != nil { - sess.openClaw.ProgressStage = "cancelled" - sess.openClaw.ProgressMessage = "OpenClaw task cancelled" - sess.openClaw.ProgressTerminal = true + if gatewayProvider == "" { + gatewayProvider = "openclaw" } - snapshot := openClawSessionSnapshotLocked(sess) - sess.mu.Unlock() - s.orchestrator.releaseOpenClawAdmission(sess) - if strings.TrimSpace(gatewayProvider) != "" && strings.TrimSpace(runID) != "" && s.gateway != nil { + if strings.TrimSpace(runID) != "" && s.gateway != nil { _ = s.gateway.RequestByMode( gatewayProvider, "agent.cancel", @@ -138,8 +164,7 @@ func (s *Server) handleTaskCancel(ctx context.Context, params map[string]any, no notify, ) } - snapshot["accepted"] = true - return snapshot + return map[string]any{"accepted": strings.TrimSpace(runID) != "", "runId": runID} } func (s *Server) findTaskSession(params map[string]any) *session { @@ -147,7 +172,6 @@ func (s *Server) findTaskSession(params map[string]any) *session { threadID := strings.TrimSpace(shared.StringArg(params, "threadId", "")) turnID := strings.TrimSpace(shared.StringArg(params, "turnId", "")) runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) - artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", "")) s.mu.RLock() defer s.mu.RUnlock() if sessionID != "" && s.sessions[sessionID] != nil { @@ -160,8 +184,7 @@ func (s *Server) findTaskSession(params map[string]any) *session { candidate.mu.Lock() matches := (threadID != "" && candidate.threadID == threadID) || (turnID != "" && candidate.task.TurnID == turnID) || - (runID != "" && candidate.task.RunID == runID) || - (artifactScope != "" && candidate.task.ArtifactScope == artifactScope) + (runID != "" && candidate.task.RunID == runID) candidate.mu.Unlock() if matches { return candidate @@ -170,93 +193,22 @@ func (s *Server) findTaskSession(params map[string]any) *session { return nil } -func (s *Server) reassociateOpenClawTask(params map[string]any) *session { - runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) - artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", "")) - if runID == "" || artifactScope == "" { - return nil +func openClawTaskLookupParams(params map[string]any) map[string]any { + result := map[string]any{} + for _, key := range []string{ + "appThreadKey", + "openclawSessionKey", + "runId", + "taskId", + "includeArtifacts", + "includeContent", + "expectedArtifactDirs", + } { + if value, ok := params[key]; ok { + result[key] = value + } } - sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")) - threadID := strings.TrimSpace(shared.StringArg(params, "threadId", sessionID)) - if sessionID == "" { - sessionID = threadID - } - if sessionID == "" { - sessionID = "openclaw:" + runID - } - if threadID == "" { - threadID = sessionID - } - turnID := strings.TrimSpace(shared.StringArg(params, "turnId", runID)) - sessionKey := strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) - if sessionKey == "" { - sessionKey = strings.TrimSpace(shared.StringArg(params, "appThreadKey", threadID)) - } - gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", "openclaw")) - now := time.Now() - prepared := &openClawPreparedArtifactScope{ - ArtifactScope: artifactScope, - ArtifactDirectory: strings.TrimSpace(shared.StringArg(params, "artifactDirectory", "")), - RelativeArtifactDirectory: artifactScope, - ScopeKind: "task", - RemoteWorkingDirectory: strings.TrimSpace(shared.StringArg(params, "remoteWorkingDirectory", "")), - RemoteWorkspaceRefKind: strings.TrimSpace(shared.StringArg(params, "remoteWorkspaceRefKind", "")), - } - contract := openClawArtifactContract{ - TaskLoadClass: strings.TrimSpace(shared.StringArg(params, "taskLoadClass", "")), - ComplexLongChain: shared.BoolArg(shared.StringArg(params, "complexLongChain", ""), false), - } - taskLoadClass, budget := openClawTaskRuntimePolicy(params, map[string]any{"sessionKey": sessionKey}, contract) - if explicitBudget := shared.IntArg(shared.StringArg(params, "runtimeBudgetMinutes", ""), 0); explicitBudget > 0 { - budget = explicitBudget - } - sess := s.getOrCreateSession(sessionID, threadID) - sess.mu.Lock() - sess.provider = gatewayProvider - sess.target = "gateway" - sess.mode = "gateway" - sess.task = QueuedTask{ - SessionID: sessionID, - ThreadID: threadID, - TurnID: turnID, - RunID: runID, - SessionKey: sessionKey, - Provider: gatewayProvider, - Target: "gateway", - GatewayProviderID: gatewayProvider, - State: TaskStateRunning, - Kind: TaskKindGateway, - TaskLoadClass: taskLoadClass, - ArtifactScope: artifactScope, - ArtifactDirectory: prepared.ArtifactDirectory, - RuntimeBudgetMinutes: budget, - StartedAt: now, - DeadlineAt: now.Add(time.Duration(budget) * time.Minute), - UpdatedAt: now, - ProgressStage: "reassociated", - ProgressMessage: "OpenClaw task reassociated from task handle", - } - sess.openClaw = &OpenClawTaskRecord{ - SessionID: sessionID, - ThreadID: threadID, - TurnID: turnID, - RunID: runID, - SessionKey: sessionKey, - GatewayProviderID: gatewayProvider, - TaskLoadClass: taskLoadClass, - ArtifactSinceUnixMs: 0, - RuntimeBudgetMinutes: budget, - StartedAt: now, - DeadlineAt: now.Add(time.Duration(budget) * time.Minute), - ProgressStage: "reassociated", - ProgressMessage: "OpenClaw task reassociated from task handle", - ChatParams: map[string]any{"sessionKey": sessionKey}, - PreparedArtifact: prepared, - ArtifactContract: contract, - } - sess.lastResult = openClawRunningTaskResult(sess.openClaw) - sess.mu.Unlock() - return sess + return result } func (s *Server) cancelSession(ctx context.Context, sessionID string) { diff --git a/internal/acp/types.go b/internal/acp/types.go index be936ab..cf6abe6 100644 --- a/internal/acp/types.go +++ b/internal/acp/types.go @@ -52,7 +52,6 @@ type QueuedTask struct { StartedAt time.Time UpdatedAt time.Time DeadlineAt time.Time - LastProbeAt time.Time ProgressStage string ProgressMessage string ProgressTerminal bool @@ -96,11 +95,11 @@ type Server struct { orchestrator *SessionOrchestrator memoryService memory.Service - providerOrder []string - gateway *gatewayruntime.Manager - openClawGate *openClawGatewayAdmissionGate - jobs *jobManager - taskRouter *distributedTaskRouter + providerOrder []string + gateway *gatewayruntime.Manager + openClawGate *openClawGatewayAdmissionGate + jobs *jobManager + taskRouter *distributedTaskRouter // Legacy / Common authService interface{} // Minimal auth dependency diff --git a/internal/acp/web_contract_test.go b/internal/acp/web_contract_test.go index 597fe8e..a30425c 100644 --- a/internal/acp/web_contract_test.go +++ b/internal/acp/web_contract_test.go @@ -40,20 +40,11 @@ func sseFirstResultEnvelope(t *testing.T, body string) map[string]any { func taskGetHTTPResult(t *testing.T, handler http.Handler, handle map[string]any) map[string]any { t.Helper() body := fmt.Sprintf( - `{"jsonrpc":"2.0","id":"task-get","method":"xworkmate.tasks.get","params":{"sessionId":%q,"threadId":%q,"turnId":%q,"runId":%q,"appThreadKey":%q,"openclawSessionKey":%q,"artifactScope":%q,"artifactDirectory":%q,"gatewayProviderId":%q,"runtimeBudgetMinutes":%q,"taskLoadClass":%q,"expectedArtifactExtensions":%s,"requiredArtifactExtensions":%s}}`, - shared.StringArg(handle, "sessionId", ""), - shared.StringArg(handle, "threadId", ""), - shared.StringArg(handle, "turnId", ""), + `{"jsonrpc":"2.0","id":"task-get","method":"xworkmate.tasks.get","params":{"runId":%q,"appThreadKey":%q,"openclawSessionKey":%q,"gatewayProviderId":%q,"includeArtifacts":true}}`, shared.StringArg(handle, "runId", ""), shared.StringArg(handle, "appThreadKey", ""), shared.StringArg(handle, "openclawSessionKey", ""), - shared.StringArg(handle, "artifactScope", ""), - shared.StringArg(handle, "artifactDirectory", ""), shared.StringArg(handle, "resolvedGatewayProviderId", "openclaw"), - shared.StringArg(handle, "runtimeBudgetMinutes", ""), - shared.StringArg(handle, "taskLoadClass", ""), - jsonArrayString(t, shared.ListArg(handle, "expectedArtifactExtensions")), - jsonArrayString(t, shared.ListArg(handle, "requiredArtifactExtensions")), ) recorder := httptest.NewRecorder() request := httptest.NewRequest(http.MethodPost, "http://127.0.0.1/acp/rpc", strings.NewReader(body)) @@ -317,7 +308,7 @@ func TestHTTPHandlerGatewayOpenClawReturnsRunningEnvelopeAndDone(t *testing.T) { } } -func TestHTTPHandlerGatewayOpenClawAdmissionQueuesExcessConcurrentSSE(t *testing.T) { +func TestHTTPHandlerGatewayOpenClawAdmissionReleasesAfterAcceptedSSE(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() gateway.agentWaitDelayMs.Store(1500) @@ -376,36 +367,25 @@ func TestHTTPHandlerGatewayOpenClawAdmissionQueuesExcessConcurrentSSE(t *testing } close(start) waitForOpenClawGatewayCount(t, func() int { return gateway.ChatSendCount() }, 1) - time.Sleep(75 * time.Millisecond) - if got := gateway.ChatSendCount(); got != 1 { - t.Fatalf("expected admission gate to hold queued chat.send while one is active, got %d", got) - } wg.Wait() close(results) - var sawQueued bool var runningHandleCount int for item := range results { if item.err != nil { t.Fatalf("concurrent request failed: %v", item.err) } - if strings.Contains(item.body, `"event":"queued"`) { - sawQueued = true - } envelope := sseFirstResultEnvelope(t, item.body) result := shared.AsMap(envelope["result"]) if result["status"] == "running" && strings.TrimSpace(shared.StringArg(result, "runId", "")) != "" { runningHandleCount += 1 } } - if !sawQueued { - t.Fatalf("expected one queued session.update event") - } if runningHandleCount != 2 { t.Fatalf("expected both requests to return running handles, got %d", runningHandleCount) } if got := gateway.ChatSendCount(); got != 2 { - t.Fatalf("expected queued request to run after a slot releases, got %d chat.send calls", got) + t.Fatalf("expected admission to release after accepted native chat.send, got %d chat.send calls", got) } } @@ -531,12 +511,12 @@ func TestHTTPHandlerGatewayOpenClawHandlesFiveConcurrentE2ECases(t *testing.T) { if got := gateway.ChatSendCount(); got != expectedGatewayTurns { t.Fatalf("expected five primary chat.send calls without model repair turns, got %d", got) } - if got := gateway.AgentWaitCount(); got != expectedGatewayTurns { - t.Fatalf("expected five primary agent.wait calls without model repair turns, got %d", got) + if got := gateway.AgentWaitCount(); got != 0 { + t.Fatalf("expected task polling to use native task-registry without Bridge-owned agent.wait, got %d", got) } } -func TestHTTPHandlerGatewayOpenClawAdmissionRejectsWhenQueueFull(t *testing.T) { +func TestHTTPHandlerGatewayOpenClawAdmissionDoesNotHoldAcceptedNativeTasks(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() gateway.agentWaitDelayMs.Store(300) @@ -596,14 +576,13 @@ func TestHTTPHandlerGatewayOpenClawAdmissionRejectsWhenQueueFull(t *testing.T) { t.Fatalf("read second response: %v", err) } bodyText := string(body) - if !strings.Contains(bodyText, openClawGatewayBusyErrorCode) { - t.Fatalf("expected busy error, got %s", bodyText) + envelope := sseFirstResultEnvelope(t, bodyText) + result := shared.AsMap(envelope["result"]) + if result["status"] != "running" { + t.Fatalf("expected second request to receive running handle after first native chat was accepted, got %s", bodyText) } - if strings.Contains(bodyText, `"result"`) { - t.Fatalf("busy response must not return a result envelope: %s", bodyText) - } - if got := gateway.ChatSendCount(); got != 1 { - t.Fatalf("rejected request must not reach chat.send, got %d", got) + if got := gateway.ChatSendCount(); got != 2 { + t.Fatalf("accepted native task must not hold admission slot, got %d chat.send calls", got) } if err := <-firstDone; err != nil { t.Fatalf("first request failed: %v", err) @@ -721,8 +700,8 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t if !strings.Contains(fmt.Sprint(result), openClawArtifactDownloadPath) { t.Fatalf("expected normalized artifact download URL in task result, got %#v", result) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) { - t.Fatalf("expected artifact workflow methods to prepare before chat.send, got %#v", got) + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected prepare, chat.send, then native task lookup, got %#v", got) } } @@ -767,8 +746,8 @@ func TestHTTPHandlerGatewayOpenClawForcesGatewayRouting(t *testing.T) { if got := result["status"]; got != "completed" { t.Fatalf("expected completed task result, got %#v", result) } - if gateway.AgentWaitCount() != 1 { - t.Fatalf("expected one OpenClaw agent.wait, got %d", gateway.AgentWaitCount()) + if gateway.AgentWaitCount() != 0 { + t.Fatalf("expected native task-registry lookup without Bridge-owned agent.wait, got %d", gateway.AgentWaitCount()) } }