diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index ae52043..44b9615 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -185,6 +185,7 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat( if rpcErr != nil { return nil, rpcErr } + artifactSinceUnixMs := time.Now().Add(-1 * time.Second).UnixMilli() sendResult := o.server.gateway.RequestByMode( gatewayProvider, "chat.send", @@ -218,7 +219,7 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat( if output == "" { output = "OpenClaw completed without displayable output." } - return map[string]any{ + result := map[string]any{ "success": true, "output": output, "message": output, @@ -227,7 +228,17 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat( "runId": runID, "mode": router.ExecutionTargetGatewayChat, "resolvedGatewayProviderId": gatewayProvider, - }, nil + } + mergeOpenClawArtifactPayload(result, waitPayload) + mergeOpenClawArtifactPayload(result, collector.artifactPayload()) + mergeOpenClawArtifactPayload(result, o.openClawArtifactExport( + gatewayProvider, + chatParams, + runID, + artifactSinceUnixMs, + notifyWithCollection, + )) + return result, nil } func isSessionTaskMethod(method string) bool { @@ -271,6 +282,89 @@ func openClawSessionKey(params map[string]any, turnID string) string { return "main" } +func (o *SessionOrchestrator) openClawArtifactExport( + gatewayProvider string, + chatParams map[string]any, + runID string, + sinceUnixMs int64, + notify func(map[string]any), +) map[string]any { + sessionKey := strings.TrimSpace(shared.StringArg(chatParams, "sessionKey", "")) + if sessionKey == "" || strings.TrimSpace(runID) == "" { + return nil + } + exportResult := o.server.gateway.RequestByMode( + gatewayProvider, + "xworkmate.artifacts.export", + map[string]any{ + "sessionKey": sessionKey, + "runId": strings.TrimSpace(runID), + "sinceUnixMs": sinceUnixMs, + "maxFiles": 64, + "maxInlineBytes": 10 * 1024 * 1024, + }, + 30*time.Second, + notify, + ) + if exportResult.OK { + return shared.AsMap(exportResult.Payload) + } + message := strings.TrimSpace(shared.StringArg(exportResult.Error, "message", "")) + if message == "" { + message = "openclaw artifact export unavailable" + } + return map[string]any{ + "artifactWarnings": []any{message}, + } +} + +func mergeOpenClawArtifactPayload(result map[string]any, source map[string]any) { + if result == nil || len(source) == 0 { + return + } + if strings.TrimSpace(shared.StringArg(result, "remoteWorkingDirectory", "")) == "" { + if remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(source, "remoteWorkingDirectory", "")); remoteWorkingDirectory != "" { + result["remoteWorkingDirectory"] = remoteWorkingDirectory + } + } + if strings.TrimSpace(shared.StringArg(result, "remoteWorkspaceRefKind", "")) == "" { + if remoteWorkspaceRefKind := strings.TrimSpace(shared.StringArg(source, "remoteWorkspaceRefKind", "")); remoteWorkspaceRefKind != "" { + result["remoteWorkspaceRefKind"] = remoteWorkspaceRefKind + } + } + for _, key := range []string{"artifacts", "files", "attachments", "artifactWarnings", "warnings"} { + merged := appendArtifactList(result[key], source[key]) + if len(merged) > 0 { + if key == "warnings" { + result["artifactWarnings"] = appendArtifactList(result["artifactWarnings"], source[key]) + continue + } + result[key] = merged + } + } +} + +func appendArtifactList(existing any, incoming any) []any { + merged := make([]any, 0) + switch typed := existing.(type) { + case []any: + merged = append(merged, typed...) + case []map[string]any: + for _, item := range typed { + merged = append(merged, item) + } + } + switch typed := incoming.(type) { + case []any: + merged = append(merged, typed...) + case []map[string]any: + for _, item := range typed { + merged = append(merged, item) + } + } + return merged +} + func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError { message := strings.TrimSpace(shared.StringArg(errorPayload, "message", fallback)) if message == "" { @@ -289,8 +383,9 @@ func firstNonEmptyString(values map[string]any, keys ...string) string { } type openClawChatCollector struct { - parts []string - final string + parts []string + final string + artifactPayloads []map[string]any } func newOpenClawChatCollector() *openClawChatCollector { @@ -302,10 +397,16 @@ func (c *openClawChatCollector) observe(notification map[string]any) { return } event := shared.AsMap(shared.AsMap(notification["params"])["event"]) - if len(event) == 0 || strings.TrimSpace(shared.StringArg(event, "event", "")) != "chat.run" { + if len(event) == 0 { return } payload := shared.AsMap(event["payload"]) + if hasArtifactPayload(payload) { + c.artifactPayloads = append(c.artifactPayloads, payload) + } + if strings.TrimSpace(shared.StringArg(event, "event", "")) != "chat.run" { + return + } text := firstNonEmptyString(payload, "assistantText", "text", "message", "output", "summary") if text == "" { return @@ -327,6 +428,29 @@ func (c *openClawChatCollector) output() string { return strings.TrimSpace(strings.Join(c.parts, "")) } +func (c *openClawChatCollector) artifactPayload() map[string]any { + if c == nil || len(c.artifactPayloads) == 0 { + return nil + } + result := map[string]any{} + for _, payload := range c.artifactPayloads { + mergeOpenClawArtifactPayload(result, payload) + } + return result +} + +func hasArtifactPayload(payload map[string]any) bool { + if len(payload) == 0 { + return false + } + for _, key := range []string{"artifacts", "files", "attachments", "remoteWorkingDirectory", "remoteWorkspaceRefKind"} { + if _, ok := payload[key]; ok { + return true + } + } + return false +} + func isTerminalGatewayPayload(payload map[string]any) bool { if payload == nil { return false diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index 1ab4c98..0f0701e 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "fmt" "net" "net/http" "net/http/httptest" @@ -496,8 +497,11 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { if gateway.AgentWaitCount() != 1 { t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount()) } - if got := gateway.Methods(); len(got) != 3 || got[0] != "connect" || got[1] != "chat.send" || got[2] != "agent.wait" { - t.Fatalf("expected connect, chat.send, then agent.wait, got %#v", got) + if gateway.ArtifactExportCount() != 1 { + t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount()) + } + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got) } client := gateway.LastConnectClient() if got := client["id"]; got != "openclaw-macos" { @@ -544,8 +548,11 @@ func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) { if gateway.AgentWaitCount() != 1 { t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount()) } - if got := gateway.Methods(); len(got) != 3 || got[0] != "connect" || got[1] != "chat.send" || got[2] != "agent.wait" { - t.Fatalf("expected connect, chat.send, then agent.wait, got %#v", got) + if gateway.ArtifactExportCount() != 1 { + t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount()) + } + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got) } } @@ -619,6 +626,154 @@ func TestExecuteSessionTaskGatewaySurfacesOpenClawAgentWaitError(t *testing.T) { } } +func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + defer gateway.Close() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + response, rpcErr := server.executeSessionTask(task{ + req: shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-openclaw-artifact", + "threadId": "thread-openclaw-artifact", + "taskPrompt": "make artifact", + "workingDirectory": t.TempDir(), + "routing": map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "gateway", + "preferredGatewayProviderId": "openclaw", + }, + }, + }, + }) + if rpcErr != nil { + t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr) + } + if gateway.ArtifactExportCount() != 1 { + t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount()) + } + if got := response["remoteWorkingDirectory"]; got != "/remote/openclaw/workspace" { + t.Fatalf("expected remote working directory from manifest, got %#v", response) + } + artifacts, ok := response["artifacts"].([]map[string]any) + if !ok { + raw, ok := response["artifacts"].([]any) + if !ok { + t.Fatalf("expected artifacts payload, got %#v", response["artifacts"]) + } + artifacts = make([]map[string]any, 0, len(raw)) + for _, item := range raw { + artifacts = append(artifacts, shared.AsMap(item)) + } + } + if len(artifacts) != 1 { + t.Fatalf("expected one artifact from manifest, got %#v", artifacts) + } + if got := artifacts[0]["relativePath"]; got != "reports/final.md" { + t.Fatalf("expected manifest artifact relative path, got %#v", artifacts[0]) + } + if got := artifacts[0]["encoding"]; got != "base64" { + t.Fatalf("expected inline base64 artifact, got %#v", artifacts[0]) + } + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { + t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got) + } +} + +func TestExecuteSessionTaskGatewayCollectsOpenClawEventArtifacts(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + gateway.artifactMode = "unknown" + defer gateway.Close() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + response, rpcErr := server.executeSessionTask(task{ + req: shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-openclaw-event-artifact", + "threadId": "thread-openclaw-event-artifact", + "taskPrompt": "event artifact", + "workingDirectory": t.TempDir(), + "routing": map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "gateway", + "preferredGatewayProviderId": "openclaw", + }, + }, + }, + }) + if rpcErr != nil { + t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr) + } + artifacts, ok := response["artifacts"].([]map[string]any) + if !ok { + raw, ok := response["artifacts"].([]any) + if !ok { + t.Fatalf("expected artifacts payload, got %#v", response["artifacts"]) + } + artifacts = make([]map[string]any, 0, len(raw)) + for _, item := range raw { + artifacts = append(artifacts, shared.AsMap(item)) + } + } + if len(artifacts) != 1 { + t.Fatalf("expected one event artifact, got %#v", artifacts) + } + if got := artifacts[0]["relativePath"]; got != "events/live.txt" { + t.Fatalf("expected event artifact relative path, got %#v", artifacts[0]) + } + if got := response["remoteWorkingDirectory"]; got != "/remote/openclaw/events" { + t.Fatalf("expected remote working directory from event, got %#v", response) + } +} + +func TestExecuteSessionTaskGatewayKeepsTextWhenArtifactExportUnavailable(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + gateway.artifactMode = "unknown" + defer gateway.Close() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + response, rpcErr := server.executeSessionTask(task{ + req: shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-openclaw-artifact-missing", + "threadId": "thread-openclaw-artifact-missing", + "taskPrompt": "say pong", + "workingDirectory": t.TempDir(), + "routing": map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "gateway", + "preferredGatewayProviderId": "openclaw", + }, + }, + }, + }) + if rpcErr != nil { + t.Fatalf("expected gateway text response despite artifact export failure, got rpc error: %#v", rpcErr) + } + if got := response["output"]; got != "gateway pong" { + t.Fatalf("expected gateway pong output, got %#v", response) + } + if gateway.ArtifactExportCount() != 1 { + t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount()) + } + warnings := response["artifactWarnings"].([]any) + if len(warnings) != 1 || !strings.Contains(fmt.Sprint(warnings[0]), "unknown method") { + t.Fatalf("expected artifact warning for unknown method, got %#v", response["artifactWarnings"]) + } +} + func TestExecuteSessionTaskDefaultsExplicitGatewayToOpenClaw(t *testing.T) { server := NewServer() @@ -698,10 +853,12 @@ type acpFakeOpenClawGateway struct { connectCount atomic.Int32 chatSendCount atomic.Int32 agentWaitCount atomic.Int32 + artifactCount atomic.Int32 lastConnectClient atomic.Value mu sync.Mutex methods []string runMessages map[string]string + artifactMode string } func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { @@ -863,6 +1020,27 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { }, }, }) + if fake.runMessage(runID) == "event artifact" { + _ = conn.WriteJSON(map[string]any{ + "type": "event", + "event": "chat", + "seq": 2, + "payload": map[string]any{ + "runId": runID, + "state": "final", + "remoteWorkingDirectory": "/remote/openclaw/events", + "remoteWorkspaceRefKind": "remotePath", + "artifacts": []any{ + map[string]any{ + "relativePath": "events/live.txt", + "contentType": "text/plain", + "encoding": "base64", + "content": "bGl2ZSBldmVudA==", + }, + }, + }, + }) + } _ = conn.WriteJSON(map[string]any{ "type": "res", "id": id, @@ -872,6 +1050,49 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { "status": "ok", }, }) + case "xworkmate.artifacts.export": + fake.artifactCount.Add(1) + if fake.artifactMode == "unknown" { + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": false, + "error": map[string]any{ + "code": "UNKNOWN_METHOD", + "message": "unknown method: xworkmate.artifacts.export", + }, + }) + continue + } + params := shared.AsMap(frame["params"]) + runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run")) + payload := map[string]any{ + "runId": runID, + "sessionKey": strings.TrimSpace(shared.StringArg(params, "sessionKey", "")), + "remoteWorkingDirectory": "/remote/openclaw/workspace", + "remoteWorkspaceRefKind": "remotePath", + "artifacts": []any{}, + "warnings": []any{}, + } + if fake.runMessage(runID) == "make artifact" { + payload["artifacts"] = []any{ + map[string]any{ + "relativePath": "reports/final.md", + "label": "final.md", + "contentType": "text/markdown", + "sizeBytes": 12, + "sha256": "fake-sha256", + "encoding": "base64", + "content": "ZmluYWwgcmVwb3J0", + }, + } + } + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": payload, + }) case "chat.run": _ = conn.WriteJSON(map[string]any{ "type": "res", @@ -949,6 +1170,10 @@ func (f *acpFakeOpenClawGateway) AgentWaitCount() int { return int(f.agentWaitCount.Load()) } +func (f *acpFakeOpenClawGateway) ArtifactExportCount() int { + return int(f.artifactCount.Load()) +} + func (f *acpFakeOpenClawGateway) LastConnectClient() map[string]any { value := f.lastConnectClient.Load() if value == nil { @@ -961,6 +1186,18 @@ func (f *acpFakeOpenClawGateway) Close() { _ = f.server.Close() } +func sameMethods(got []string, want []string) bool { + if len(got) != len(want) { + return false + } + for index := range got { + if got[index] != want[index] { + return false + } + } + return true +} + func TestExecuteSessionTaskAutoRoutingUsesBridgeProductionProviderOrder(t *testing.T) { workspaceDir := filepath.Join(t.TempDir(), "workspace") if err := os.MkdirAll(workspaceDir, 0o755); err != nil {