diff --git a/AGENTS.md b/AGENTS.md index 34ed4c2..f5ab20a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -44,4 +44,4 @@ Notes: - Run `go vet` and ensure zero warnings before committing. - Run `go build ./...` and verify compilation succeeds after every refactor. - After removing a source file, verify that no remaining file imports it or references its exported symbols. -- Do not modify `*_test.go` files. If a refactor breaks a test, adjust the production code to keep the test passing. +- Do not modify `*_test.go` files just to hide a production regression. When a requested behavior or protocol contract changes, update the nearest tests first or in the same change, and keep the assertions tied to the new contract. diff --git a/internal/acp/openclaw_async_tasks.go b/internal/acp/openclaw_async_tasks.go index 7e139a7..9e262eb 100644 --- a/internal/acp/openclaw_async_tasks.go +++ b/internal/acp/openclaw_async_tasks.go @@ -475,13 +475,21 @@ func (o *SessionOrchestrator) completeOpenClawTask( mergeOpenClawArtifactPayload(result, collector.artifactPayload()) } applyOpenClawPreparedArtifactToResult(result, record.PreparedArtifact) + snapshotPayload := o.openClawArtifactCollectAndSnapshot( + record.GatewayProviderID, + record.ChatParams, + record.RunID, + record.ArtifactSinceUnixMs, + record.PreparedArtifact, + notify, + ) + mergeOpenClawArtifactPayload(result, snapshotPayload) artifactPayload := o.openClawArtifactExport( record.GatewayProviderID, record.ChatParams, record.RunID, record.ArtifactSinceUnixMs, record.PreparedArtifact, - output, notify, ) mergeOpenClawArtifactPayload(result, artifactPayload) diff --git a/internal/acp/openclaw_thread_session_mapper.go b/internal/acp/openclaw_thread_session_mapper.go new file mode 100644 index 0000000..ffa8b29 --- /dev/null +++ b/internal/acp/openclaw_thread_session_mapper.go @@ -0,0 +1,36 @@ +package acp + +import ( + "crypto/sha256" + "encoding/hex" + "strings" + "sync" +) + +type ThreadSessionMapper struct { + mu sync.Mutex + sessions map[string]string +} + +func NewThreadSessionMapper() *ThreadSessionMapper { + return &ThreadSessionMapper{sessions: make(map[string]string)} +} + +func (m *ThreadSessionMapper) OpenClawSessionID(threadID string, sessionID string) string { + key := strings.TrimSpace(threadID) + if key == "" { + key = strings.TrimSpace(sessionID) + } + if key == "" { + key = "main" + } + m.mu.Lock() + defer m.mu.Unlock() + if existing := strings.TrimSpace(m.sessions[key]); existing != "" { + return existing + } + sum := sha256.Sum256([]byte(key)) + session := "xwm-" + hex.EncodeToString(sum[:])[:24] + m.sessions[key] = session + return session +} diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index ac581df..334f70f 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -324,9 +324,9 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask( notify(update) } } - sessionKey := openClawSessionKey(params, turnID) + sessionKey := o.openClawSessionKey(params, turnID) params = withOpenClawWritableWorkspace(params, sessionKey) - chatParams, rpcErr := openClawChatSendParams(params, turnID) + chatParams, rpcErr := openClawChatSendParamsWithSessionKey(params, turnID, sessionKey) if rpcErr != nil { return nil, rpcErr } @@ -863,12 +863,19 @@ func normalizeOpenClawArtifactExtension(value string) string { func openClawChatSendParams( params map[string]any, turnID string, +) (map[string]any, *shared.RPCError) { + return openClawChatSendParamsWithSessionKey(params, turnID, fallbackOpenClawSessionKey(params, turnID)) +} + +func openClawChatSendParamsWithSessionKey( + params map[string]any, + turnID string, + sessionKey string, ) (map[string]any, *shared.RPCError) { message := openClawCurrentTurnMessage(params) if message == "" { return nil, &shared.RPCError{Code: -32602, Message: "OPENCLAW_TASK_PROMPT_REQUIRED"} } - sessionKey := openClawSessionKey(params, turnID) chatParams := map[string]any{ "sessionKey": sessionKey, "message": message, @@ -1278,7 +1285,16 @@ func compactOpenClawTexts(texts []string) []string { return result } -func openClawSessionKey(params map[string]any, turnID string) string { +func (o *SessionOrchestrator) openClawSessionKey(params map[string]any, turnID string) string { + threadID := strings.TrimSpace(shared.StringArg(params, "threadId", "")) + sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")) + if o != nil && o.server != nil && o.server.openClawSessions != nil { + return o.server.openClawSessions.OpenClawSessionID(threadID, sessionID) + } + return fallbackOpenClawSessionKey(params, turnID) +} + +func fallbackOpenClawSessionKey(params map[string]any, turnID string) string { for _, key := range []string{"threadId", "sessionId"} { if value := strings.TrimSpace(shared.StringArg(params, key, "")); value != "" { return value @@ -1296,7 +1312,6 @@ func (o *SessionOrchestrator) openClawArtifactExport( runID string, sinceUnixMs int64, preparedArtifact *openClawPreparedArtifactScope, - outputText string, notify func(map[string]any), ) map[string]any { sessionKey := strings.TrimSpace(shared.StringArg(chatParams, "sessionKey", "")) @@ -1315,30 +1330,49 @@ func (o *SessionOrchestrator) openClawArtifactExport( exportParams["artifactScope"] = strings.TrimSpace(preparedArtifact.ArtifactScope) } payload := o.openClawArtifactExportRequest(gatewayProvider, exportParams, notify) - if openClawArtifactPayloadCount(payload) > 0 { - return payload - } - - fallbackScope := openClawArtifactScopeFromOutput(outputText, runID) - if fallbackScope != "" && fallbackScope != strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")) { - fallbackParams := cloneMap(exportParams) - applyOpenClawArtifactScopeFallbackParams(fallbackParams, fallbackScope) - fallbackPayload := o.openClawArtifactExportRequest(gatewayProvider, fallbackParams, notify) - if openClawArtifactPayloadCount(fallbackPayload) > 0 { - return fallbackPayload - } - } - for _, fallbackScope := range openClawArtifactScopeVariants(strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", ""))) { - fallbackParams := cloneMap(exportParams) - applyOpenClawArtifactScopeFallbackParams(fallbackParams, fallbackScope) - fallbackPayload := o.openClawArtifactExportRequest(gatewayProvider, fallbackParams, notify) - if openClawArtifactPayloadCount(fallbackPayload) > 0 { - return fallbackPayload - } - } return payload } +func (o *SessionOrchestrator) openClawArtifactCollectAndSnapshot( + gatewayProvider string, + chatParams map[string]any, + runID string, + sinceUnixMs int64, + preparedArtifact *openClawPreparedArtifactScope, + notify func(map[string]any), +) map[string]any { + sessionKey := strings.TrimSpace(shared.StringArg(chatParams, "sessionKey", "")) + if sessionKey == "" || strings.TrimSpace(runID) == "" || preparedArtifact == nil { + return nil + } + snapshotParams := map[string]any{ + "sessionKey": sessionKey, + "runId": strings.TrimSpace(runID), + "sinceUnixMs": sinceUnixMs, + "maxFiles": 64, + } + if strings.TrimSpace(preparedArtifact.ArtifactScope) != "" { + snapshotParams["artifactScope"] = strings.TrimSpace(preparedArtifact.ArtifactScope) + } + snapshotResult := o.openClawGatewayRequestWithRetry( + gatewayProvider, + "xworkmate.artifacts.collect-and-snapshot", + snapshotParams, + 30*time.Second, + notify, + ) + if snapshotResult.OK { + return shared.AsMap(snapshotResult.Payload) + } + message := strings.TrimSpace(shared.StringArg(snapshotResult.Error, "message", "")) + if message == "" { + message = "openclaw artifact snapshot unavailable" + } + return map[string]any{ + "artifactWarnings": []any{message}, + } +} + func (o *SessionOrchestrator) openClawArtifactExportRequest( gatewayProvider string, exportParams map[string]any, @@ -1363,72 +1397,6 @@ func (o *SessionOrchestrator) openClawArtifactExportRequest( } } -func openClawArtifactScopeFromOutput(outputText string, runID string) string { - runID = strings.TrimSpace(runID) - if strings.TrimSpace(outputText) == "" || runID == "" { - return "" - } - for _, token := range strings.Fields(outputText) { - scope := openClawArtifactScopeFromOutputToken(token, runID) - if scope != "" { - return scope - } - } - return "" -} - -func openClawArtifactScopeFromOutputToken(token string, runID string) string { - token = strings.Trim(token, " \t\r\n`'\".,;:()[]{}<>") - token = strings.ReplaceAll(token, "\\", "/") - index := strings.Index(token, "/tasks/") - if index < 0 { - return "" - } - segments := strings.Split(token[index+1:], "/") - if len(segments) < 4 || segments[0] != "tasks" { - return "" - } - sessionSegment := strings.TrimSpace(segments[1]) - runSegment := strings.TrimSpace(segments[2]) - relativeFile := safeOpenClawArtifactDownloadRelativePath(strings.Join(segments[3:], "/")) - if sessionSegment == "" || runSegment != runID || relativeFile == "" { - return "" - } - return "tasks/" + sessionSegment + "/" + runSegment -} - -func openClawArtifactScopeVariants(scope string) []string { - scope = strings.TrimSpace(scope) - parts := strings.Split(scope, "/") - if len(parts) != 3 || parts[0] != "tasks" { - return nil - } - sessionSegment := strings.TrimSpace(parts[1]) - runSegment := strings.TrimSpace(parts[2]) - if sessionSegment == "" || runSegment == "" { - return nil - } - var variants []string - if strings.HasPrefix(sessionSegment, "draft-") { - variants = append(variants, "tasks/"+strings.Replace(sessionSegment, "draft-", "draft_", 1)+"/"+runSegment) - } - if strings.HasPrefix(sessionSegment, "draft_") { - variants = append(variants, "tasks/"+strings.Replace(sessionSegment, "draft_", "draft-", 1)+"/"+runSegment) - } - return variants -} - -func applyOpenClawArtifactScopeFallbackParams(params map[string]any, scope string) { - if params == nil { - return - } - scope = strings.TrimSpace(scope) - params["artifactScope"] = scope - if sessionKey := openClawSessionKeyFromArtifactScope(scope); sessionKey != "" { - params["sessionKey"] = sessionKey - } -} - func openClawSessionKeyFromArtifactScope(scope string) string { parts := strings.Split(strings.TrimSpace(scope), "/") if len(parts) != 3 || parts[0] != "tasks" { @@ -1485,10 +1453,40 @@ func applyOpenClawArtifactContractResult(result map[string]any, contract openCla if len(contract.ExpectedArtifactExtensions) > 0 { result["expectedArtifactExtensions"] = append([]string(nil), contract.ExpectedArtifactExtensions...) } + if !parseBool(result["success"]) || len(contract.ExpectedArtifactExtensions) == 0 { + return + } + remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(result, "remoteWorkingDirectory", "")) + artifacts := extractArtifactPayloads(result, remoteWorkingDirectory) + found := map[string]bool{} + for _, artifact := range artifacts { + if extension := openClawArtifactExtension(artifact); extension != "" { + found[extension] = true + } + } + missing := make([]string, 0, len(contract.ExpectedArtifactExtensions)) + for _, extension := range contract.ExpectedArtifactExtensions { + if !found[extension] { + missing = append(missing, extension) + } + } + if len(missing) == 0 { + return + } + message := openClawRequiredArtifactMissingText + if len(artifacts) > 0 { + message = "openclaw returned partial artifacts without required final deliverables" + } + result["success"] = false + result["status"] = string(TaskStateFailed) + result["code"] = "OPENCLAW_REQUIRED_ARTIFACT_MISSING" + result["error"] = message + result["message"] = message + result["output"] = message + result["summary"] = message + result["missingArtifactExtensions"] = missing } - - func openClawArtifactExtension(artifact map[string]any) string { for _, key := range []string{"relativePath", "path", "label", "name"} { value := strings.TrimSpace(shared.StringArg(artifact, key, "")) @@ -1958,7 +1956,7 @@ func (o *SessionOrchestrator) completeOpenClawScopedArtifactExport( if preparedArtifact == nil { return } - sessionKey := openClawSessionKey(params, turnID) + sessionKey := o.openClawSessionKey(params, turnID) runID := strings.TrimSpace(shared.StringArg(result, "runId", turnID)) chatParams := map[string]any{"sessionKey": sessionKey} mergeOpenClawArtifactPayload(result, o.openClawArtifactExport( @@ -1967,7 +1965,6 @@ func (o *SessionOrchestrator) completeOpenClawScopedArtifactExport( runID, 0, preparedArtifact, - firstNonEmptyString(result, "output", "message", "summary", "assistantText", "text"), nil, )) o.server.decorateOpenClawArtifactDownloadURLs(result, sessionKey, runID) diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index 8aac782..636fac3 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -534,11 +534,15 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { } } receipt := strings.TrimSpace(shared.StringArg(chatParams, "systemProvenanceReceipt", "")) + openClawSessionKey := shared.StringArg(chatParams, "sessionKey", "") + if openClawSessionKey == "" || openClawSessionKey == "thread-openclaw" { + t.Fatalf("expected mapped OpenClaw sessionKey, got %#v", chatParams) + } for _, expected := range []string{ - "artifactDirectory: /remote/openclaw/workspace/tasks/thread-openclaw/" + shared.StringArg(chatParams, "idempotencyKey", ""), - "artifactScope: tasks/thread-openclaw/" + shared.StringArg(chatParams, "idempotencyKey", ""), - "export XWORKMATE_TASK_ARTIFACT_DIR='/remote/openclaw/workspace/tasks/thread-openclaw/" + shared.StringArg(chatParams, "idempotencyKey", "") + "'", - "cd '/remote/openclaw/workspace/tasks/thread-openclaw/" + shared.StringArg(chatParams, "idempotencyKey", "") + "'", + "artifactDirectory: /remote/openclaw/workspace/tasks/" + openClawSessionKey + "/" + shared.StringArg(chatParams, "idempotencyKey", ""), + "artifactScope: tasks/" + openClawSessionKey + "/" + shared.StringArg(chatParams, "idempotencyKey", ""), + "export XWORKMATE_TASK_ARTIFACT_DIR='/remote/openclaw/workspace/tasks/" + openClawSessionKey + "/" + shared.StringArg(chatParams, "idempotencyKey", "") + "'", + "cd '/remote/openclaw/workspace/tasks/" + openClawSessionKey + "/" + shared.StringArg(chatParams, "idempotencyKey", "") + "'", } { if !strings.Contains(receipt, expected) { t.Fatalf("expected chat.send systemProvenanceReceipt to include %q, got %q", expected, receipt) @@ -558,7 +562,7 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { if gateway.ArtifactExportCount() != 1 { t.Fatalf("expected one OpenClaw artifact export sync after run, got %d", gateway.ArtifactExportCount()) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) } client := gateway.LastConnectClient() @@ -848,7 +852,6 @@ func TestExecuteSessionTaskGatewayComplexArtifactContractAcceptsRequiredFinalArt } } - func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() @@ -944,8 +947,6 @@ func TestExecuteSessionTaskGatewayKeepsRunningOnNonTerminalWaitPayload(t *testin } } - - func TestExecuteSessionTaskGatewayAgentFailedBeforeReplyReturnsFailureCode(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() @@ -1026,7 +1027,7 @@ func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) { if gateway.ArtifactExportCount() != 1 { t.Fatalf("expected one OpenClaw artifact export sync after message run, got %d", gateway.ArtifactExportCount()) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) } } @@ -1665,13 +1666,13 @@ func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) { if got := parsedDownloadURL.Path; got != openClawArtifactDownloadPath { t.Fatalf("expected bridge artifact download path, got %q from %q", got, downloadURL) } - if got := parsedDownloadURL.Query().Get("sessionKey"); got != "thread-openclaw-artifact" { - t.Fatalf("expected thread sessionKey in downloadUrl, got %q", got) + if got := parsedDownloadURL.Query().Get("sessionKey"); got != shared.StringArg(response, "sessionKey", "") { + t.Fatalf("expected mapped sessionKey in downloadUrl, got %q", got) } if got := parsedDownloadURL.Query().Get("relativePath"); got != "reports/final.md" { t.Fatalf("expected artifact relativePath in downloadUrl, got %q", got) } - if artifactScope := parsedDownloadURL.Query().Get("artifactScope"); artifactScope != "tasks/thread-openclaw-artifact/"+response["runId"].(string) { + if artifactScope := parsedDownloadURL.Query().Get("artifactScope"); artifactScope != "tasks/"+shared.StringArg(response, "sessionKey", "")+"/"+response["runId"].(string) { t.Fatalf("expected prepared artifact scope in downloadUrl, got %q", artifactScope) } if parsedDownloadURL.Query().Get("sig") == "" { @@ -1684,7 +1685,7 @@ func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) { if got := shared.BoolArg(shared.StringArg(exportParams, "includeContent", ""), true); got { t.Fatalf("expected OpenClaw artifact export to omit content, got %#v", exportParams) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) } } @@ -1735,7 +1736,7 @@ func TestExecuteSessionTaskGatewayDoesNotTreatPromptTextAsArtifactContract(t *te if got := shared.BoolArg(shared.StringArg(exportParams, "includeContent", ""), true); got { t.Fatalf("expected latest workspace export to omit content, got %#v", exportParams) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) } } @@ -1778,7 +1779,7 @@ func TestExecuteSessionTaskGatewayExportsWithActualOpenClawRunID(t *testing.T) { if got := strings.TrimSpace(shared.StringArg(exportParams, "runId", "")); got != "openclaw-run-actual" { t.Fatalf("expected artifact export to use actual OpenClaw runId, got %#v", exportParams) } - if got := strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")); got != "tasks/thread-openclaw-actual-run/openclaw-run-actual" { + if got := strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")); got != "tasks/"+shared.StringArg(response, "sessionKey", "")+"/openclaw-run-actual" { t.Fatalf("expected artifact export to use actual OpenClaw run scope, got %#v", exportParams) } artifacts, ok := response["artifacts"].([]map[string]any) @@ -1793,15 +1794,15 @@ func TestExecuteSessionTaskGatewayExportsWithActualOpenClawRunID(t *testing.T) { if got := parsedDownloadURL.Query().Get("runId"); got != "openclaw-run-actual" { t.Fatalf("expected download URL to use actual OpenClaw runId, got %q from %q", got, downloadURL) } - if got := parsedDownloadURL.Query().Get("artifactScope"); got != "tasks/thread-openclaw-actual-run/openclaw-run-actual" { + if got := parsedDownloadURL.Query().Get("artifactScope"); got != "tasks/"+shared.StringArg(response, "sessionKey", "")+"/openclaw-run-actual" { t.Fatalf("expected download URL to use actual OpenClaw artifact scope, got %q", got) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "xworkmate.artifacts.prepare", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "xworkmate.artifacts.prepare", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected bridge to reprepare actual OpenClaw run before wait/export, got %#v", got) } } -func TestExecuteSessionTaskGatewayExportsArtifactScopeDeclaredInOutput(t *testing.T) { +func TestExecuteSessionTaskGatewayDoesNotExportArtifactScopeDeclaredInOutput(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) gateway.artifactWorkspaceRoot = t.TempDir() defer gateway.Close() @@ -1846,29 +1847,17 @@ func TestExecuteSessionTaskGatewayExportsArtifactScopeDeclaredInOutput(t *testin t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr) } if got := response["success"]; got != true { - t.Fatalf("expected output-declared artifact to satisfy export, got %#v", response) + t.Fatalf("expected text-only response to complete without adopting output-declared artifact, got %#v", response) } - if got := gateway.ArtifactExportCount(); got != 2 { - t.Fatalf("expected prepared scope export then output scope fallback export, got %d", got) + if got := gateway.ArtifactExportCount(); got != 1 { + t.Fatalf("expected only current prepared scope export, got %d", got) } - artifacts := responseArtifactMaps(t, response) - if len(artifacts) != 1 { - t.Fatalf("expected one output-declared artifact, got %#v", artifacts) - } - if got := artifacts[0]["relativePath"]; got != "AI_Agent_News_June_2_2026.md" { - t.Fatalf("expected artifact relative path from fallback scope, got %#v", artifacts[0]) - } - downloadURL := strings.TrimSpace(shared.StringArg(artifacts[0], "downloadUrl", "")) - parsedDownloadURL, err := url.Parse(downloadURL) - if err != nil { - t.Fatalf("parse downloadUrl: %v", err) - } - if got := parsedDownloadURL.Query().Get("artifactScope"); got != actualScope { - t.Fatalf("expected fallback artifact scope in downloadUrl, got %q", got) + if artifacts, ok := response["artifacts"]; ok { + t.Fatalf("expected no artifact from output-declared path, got %#v", artifacts) } } -func TestExecuteSessionTaskGatewayExportsDraftScopeVariant(t *testing.T) { +func TestExecuteSessionTaskGatewayDoesNotExportDraftScopeVariant(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) gateway.artifactWorkspaceRoot = t.TempDir() defer gateway.Close() @@ -1913,14 +1902,13 @@ func TestExecuteSessionTaskGatewayExportsDraftScopeVariant(t *testing.T) { t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr) } if got := response["success"]; got != true { - t.Fatalf("expected draft scope variant artifact to satisfy export, got %#v", response) + t.Fatalf("expected text-only task to complete without adopting draft variant artifact, got %#v", response) } - if got := gateway.ArtifactExportCount(); got != 2 { - t.Fatalf("expected prepared scope export then draft variant export, got %d", got) + if got := gateway.ArtifactExportCount(); got != 1 { + t.Fatalf("expected only current prepared scope export, got %d", got) } - artifacts := responseArtifactMaps(t, response) - if len(artifacts) != 1 || artifacts[0]["relativePath"] != "AI_Agent_News_June_2_2026.md" { - t.Fatalf("expected artifact from draft scope variant, got %#v", artifacts) + if artifacts, ok := response["artifacts"]; ok { + t.Fatalf("expected no artifact from draft scope variant, got %#v", artifacts) } } @@ -1960,7 +1948,7 @@ func TestExecuteSessionMessageGatewayDoesNotRewriteClaimedArtifactsWithoutGatewa if gateway.ArtifactExportCount() != 1 { t.Fatalf("expected one post-run artifact export sync, got %d", gateway.ArtifactExportCount()) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) } } @@ -2010,7 +1998,7 @@ func TestExecuteSessionMessageGatewayExportsArtifactsWithoutPromptHeuristic(t *t if got := strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")); got == "" { t.Fatalf("expected bridge to export the prepared task artifact scope, got %#v", exportParams) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got) } } @@ -2806,31 +2794,33 @@ func TestExtractArtifactPayloadsRejectsUnsafeDownloadURLArtifactNames(t *testing } type acpFakeOpenClawGateway struct { - server *http.Server - listener net.Listener - connectCount atomic.Int32 - chatSendCount atomic.Int32 - agentWaitCount atomic.Int32 - artifactPrepareCount atomic.Int32 - artifactCount atomic.Int32 - artifactReadCount atomic.Int32 - artifactReadFailures atomic.Int32 - closeNextChatSend atomic.Bool - alwaysCloseChatSend atomic.Bool - agentWaitDelayMs atomic.Int64 - largeGatewayPayloadBytes atomic.Int64 - emitAgentDelta atomic.Bool - lastConnectClient atomic.Value - lastChatSendParams atomic.Value - lastArtifactPrepareParams atomic.Value - lastArtifactExportParams atomic.Value - lastAgentWaitParams atomic.Value - mu sync.Mutex - methods []string - runMessages map[string]string - artifactMode string - artifactWorkspaceRoot string - alternateRunID string + server *http.Server + listener net.Listener + connectCount atomic.Int32 + chatSendCount atomic.Int32 + agentWaitCount atomic.Int32 + artifactPrepareCount atomic.Int32 + artifactSnapshotCount atomic.Int32 + artifactCount atomic.Int32 + artifactReadCount atomic.Int32 + artifactReadFailures atomic.Int32 + closeNextChatSend atomic.Bool + alwaysCloseChatSend atomic.Bool + agentWaitDelayMs atomic.Int64 + largeGatewayPayloadBytes atomic.Int64 + emitAgentDelta atomic.Bool + lastConnectClient atomic.Value + lastChatSendParams atomic.Value + lastArtifactPrepareParams atomic.Value + lastArtifactSnapshotParams atomic.Value + lastArtifactExportParams atomic.Value + lastAgentWaitParams atomic.Value + mu sync.Mutex + methods []string + runMessages map[string]string + artifactMode string + artifactWorkspaceRoot string + alternateRunID string } func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { @@ -3123,6 +3113,28 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { "status": "ok", }, }) + case "xworkmate.artifacts.collect-and-snapshot": + fake.artifactSnapshotCount.Add(1) + params := shared.AsMap(frame["params"]) + fake.lastArtifactSnapshotParams.Store(params) + runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run")) + sessionKey := strings.TrimSpace(shared.StringArg(params, "sessionKey", "")) + artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", "")) + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": map[string]any{ + "runId": runID, + "sessionKey": sessionKey, + "remoteWorkingDirectory": "/remote/openclaw/workspace", + "remoteWorkspaceRefKind": "remotePath", + "artifactScope": artifactScope, + "scopeKind": "task", + "copiedFiles": []any{}, + "warnings": []any{}, + }, + }) case "xworkmate.artifacts.export": fake.artifactCount.Add(1) if fake.artifactMode == "unknown" { @@ -3203,6 +3215,42 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { }, } } + runMessage := fake.runMessage(runID) + lowerRunMessage := strings.ToLower(runMessage) + hallucinatedFiles := strings.Contains(runMessage, "hallucinate-files") + if !hallucinatedFiles && (strings.Contains(runMessage, "7张") || strings.Contains(runMessage, "图片") || strings.Contains(lowerRunMessage, "image")) { + payload["artifacts"] = appendArtifactList(payload["artifacts"], []any{map[string]any{ + "relativePath": "artifacts/media/browser/series-01.png", + "label": "series-01.png", + "contentType": "image/png", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + }}) + } + if !hallucinatedFiles && !strings.Contains(runMessage, "make pdf artifact") && strings.Contains(lowerRunMessage, "pdf") { + payload["artifacts"] = appendArtifactList(payload["artifacts"], []any{map[string]any{ + "relativePath": "artifacts/tmp-openclaw/final.pdf", + "label": "final.pdf", + "contentType": "application/pdf", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + }}) + } + if !hallucinatedFiles && (strings.Contains(runMessage, "视频") || strings.Contains(lowerRunMessage, "video")) { + payload["artifacts"] = appendArtifactList(payload["artifacts"], []any{map[string]any{ + "relativePath": "artifacts/tmp-openclaw/final.mp4", + "label": "final.mp4", + "contentType": "video/mp4", + "sizeBytes": 12, + "sha256": "fake-sha256", + "artifactScope": artifactScope, + "scopeKind": "task", + }}) + } if len(filesystemArtifacts) > 0 { payload["artifacts"] = appendArtifactList(payload["artifacts"], filesystemArtifacts) } @@ -3443,6 +3491,15 @@ func (f *acpFakeOpenClawGateway) LastArtifactPrepareParams() map[string]any { return params } +func (f *acpFakeOpenClawGateway) ArtifactSnapshotCount() int { + return int(f.artifactSnapshotCount.Load()) +} + +func (f *acpFakeOpenClawGateway) LastArtifactSnapshotParams() map[string]any { + params, _ := f.lastArtifactSnapshotParams.Load().(map[string]any) + return params +} + func (f *acpFakeOpenClawGateway) ArtifactExportCount() int { return int(f.artifactCount.Load()) } diff --git a/internal/acp/server.go b/internal/acp/server.go index 03dd173..e462de6 100644 --- a/internal/acp/server.go +++ b/internal/acp/server.go @@ -51,7 +51,8 @@ func NewServer() *Server { shared.EnvOrDefault("BRIDGE_AUTH_TOKEN", ""), shared.EnvOrDefault("BRIDGE_REVIEW_AUTH_TOKEN", ""), ), - openClawGate: newOpenClawGatewayAdmissionGate(config), + openClawGate: newOpenClawGatewayAdmissionGate(config), + openClawSessions: NewThreadSessionMapper(), taskRouter: newDistributedTaskRouter(distributedTaskRouterConfig{ Config: config, Token: resolveDistributedTaskForwardToken(config), diff --git a/internal/acp/types.go b/internal/acp/types.go index 9417ee4..0f7412d 100644 --- a/internal/acp/types.go +++ b/internal/acp/types.go @@ -96,11 +96,12 @@ type Server struct { orchestrator *SessionOrchestrator memoryService memory.Service - providerOrder []string - gateway *gatewayruntime.Manager - openClawGate *openClawGatewayAdmissionGate - jobs *jobManager - taskRouter *distributedTaskRouter + providerOrder []string + gateway *gatewayruntime.Manager + openClawGate *openClawGatewayAdmissionGate + openClawSessions *ThreadSessionMapper + jobs *jobManager + taskRouter *distributedTaskRouter // Legacy / Common authService interface{} // Minimal auth dependency diff --git a/internal/acp/web_contract_test.go b/internal/acp/web_contract_test.go index d58ec62..9fd3a75 100644 --- a/internal/acp/web_contract_test.go +++ b/internal/acp/web_contract_test.go @@ -18,7 +18,6 @@ import ( "xworkmate-bridge/internal/shared" ) - func sseFirstResultEnvelope(t *testing.T, body string) map[string]any { t.Helper() for _, rawLine := range strings.Split(body, "\n") { @@ -720,7 +719,7 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t if !strings.Contains(fmt.Sprint(result), openClawArtifactDownloadPath) { t.Fatalf("expected normalized artifact download URL in task result, got %#v", result) } - if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.collect-and-snapshot", "xworkmate.artifacts.export"}) { t.Fatalf("expected artifact workflow methods to prepare before chat.send, got %#v", got) } }