fix: export openclaw gateway artifacts

This commit is contained in:
Haitao Pan 2026-05-05 12:48:04 +08:00
parent 9944cffe3f
commit f5a3c6f829
2 changed files with 370 additions and 9 deletions

View File

@ -185,6 +185,7 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
if rpcErr != nil {
return nil, rpcErr
}
artifactSinceUnixMs := time.Now().Add(-1 * time.Second).UnixMilli()
sendResult := o.server.gateway.RequestByMode(
gatewayProvider,
"chat.send",
@ -218,7 +219,7 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
if output == "" {
output = "OpenClaw completed without displayable output."
}
return map[string]any{
result := map[string]any{
"success": true,
"output": output,
"message": output,
@ -227,7 +228,17 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
"runId": runID,
"mode": router.ExecutionTargetGatewayChat,
"resolvedGatewayProviderId": gatewayProvider,
}, nil
}
mergeOpenClawArtifactPayload(result, waitPayload)
mergeOpenClawArtifactPayload(result, collector.artifactPayload())
mergeOpenClawArtifactPayload(result, o.openClawArtifactExport(
gatewayProvider,
chatParams,
runID,
artifactSinceUnixMs,
notifyWithCollection,
))
return result, nil
}
func isSessionTaskMethod(method string) bool {
@ -271,6 +282,89 @@ func openClawSessionKey(params map[string]any, turnID string) string {
return "main"
}
func (o *SessionOrchestrator) openClawArtifactExport(
gatewayProvider string,
chatParams map[string]any,
runID string,
sinceUnixMs int64,
notify func(map[string]any),
) map[string]any {
sessionKey := strings.TrimSpace(shared.StringArg(chatParams, "sessionKey", ""))
if sessionKey == "" || strings.TrimSpace(runID) == "" {
return nil
}
exportResult := o.server.gateway.RequestByMode(
gatewayProvider,
"xworkmate.artifacts.export",
map[string]any{
"sessionKey": sessionKey,
"runId": strings.TrimSpace(runID),
"sinceUnixMs": sinceUnixMs,
"maxFiles": 64,
"maxInlineBytes": 10 * 1024 * 1024,
},
30*time.Second,
notify,
)
if exportResult.OK {
return shared.AsMap(exportResult.Payload)
}
message := strings.TrimSpace(shared.StringArg(exportResult.Error, "message", ""))
if message == "" {
message = "openclaw artifact export unavailable"
}
return map[string]any{
"artifactWarnings": []any{message},
}
}
func mergeOpenClawArtifactPayload(result map[string]any, source map[string]any) {
if result == nil || len(source) == 0 {
return
}
if strings.TrimSpace(shared.StringArg(result, "remoteWorkingDirectory", "")) == "" {
if remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(source, "remoteWorkingDirectory", "")); remoteWorkingDirectory != "" {
result["remoteWorkingDirectory"] = remoteWorkingDirectory
}
}
if strings.TrimSpace(shared.StringArg(result, "remoteWorkspaceRefKind", "")) == "" {
if remoteWorkspaceRefKind := strings.TrimSpace(shared.StringArg(source, "remoteWorkspaceRefKind", "")); remoteWorkspaceRefKind != "" {
result["remoteWorkspaceRefKind"] = remoteWorkspaceRefKind
}
}
for _, key := range []string{"artifacts", "files", "attachments", "artifactWarnings", "warnings"} {
merged := appendArtifactList(result[key], source[key])
if len(merged) > 0 {
if key == "warnings" {
result["artifactWarnings"] = appendArtifactList(result["artifactWarnings"], source[key])
continue
}
result[key] = merged
}
}
}
func appendArtifactList(existing any, incoming any) []any {
merged := make([]any, 0)
switch typed := existing.(type) {
case []any:
merged = append(merged, typed...)
case []map[string]any:
for _, item := range typed {
merged = append(merged, item)
}
}
switch typed := incoming.(type) {
case []any:
merged = append(merged, typed...)
case []map[string]any:
for _, item := range typed {
merged = append(merged, item)
}
}
return merged
}
func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError {
message := strings.TrimSpace(shared.StringArg(errorPayload, "message", fallback))
if message == "" {
@ -289,8 +383,9 @@ func firstNonEmptyString(values map[string]any, keys ...string) string {
}
type openClawChatCollector struct {
parts []string
final string
parts []string
final string
artifactPayloads []map[string]any
}
func newOpenClawChatCollector() *openClawChatCollector {
@ -302,10 +397,16 @@ func (c *openClawChatCollector) observe(notification map[string]any) {
return
}
event := shared.AsMap(shared.AsMap(notification["params"])["event"])
if len(event) == 0 || strings.TrimSpace(shared.StringArg(event, "event", "")) != "chat.run" {
if len(event) == 0 {
return
}
payload := shared.AsMap(event["payload"])
if hasArtifactPayload(payload) {
c.artifactPayloads = append(c.artifactPayloads, payload)
}
if strings.TrimSpace(shared.StringArg(event, "event", "")) != "chat.run" {
return
}
text := firstNonEmptyString(payload, "assistantText", "text", "message", "output", "summary")
if text == "" {
return
@ -327,6 +428,29 @@ func (c *openClawChatCollector) output() string {
return strings.TrimSpace(strings.Join(c.parts, ""))
}
func (c *openClawChatCollector) artifactPayload() map[string]any {
if c == nil || len(c.artifactPayloads) == 0 {
return nil
}
result := map[string]any{}
for _, payload := range c.artifactPayloads {
mergeOpenClawArtifactPayload(result, payload)
}
return result
}
func hasArtifactPayload(payload map[string]any) bool {
if len(payload) == 0 {
return false
}
for _, key := range []string{"artifacts", "files", "attachments", "remoteWorkingDirectory", "remoteWorkspaceRefKind"} {
if _, ok := payload[key]; ok {
return true
}
}
return false
}
func isTerminalGatewayPayload(payload map[string]any) bool {
if payload == nil {
return false

View File

@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
@ -496,8 +497,11 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) {
if gateway.AgentWaitCount() != 1 {
t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount())
}
if got := gateway.Methods(); len(got) != 3 || got[0] != "connect" || got[1] != "chat.send" || got[2] != "agent.wait" {
t.Fatalf("expected connect, chat.send, then agent.wait, got %#v", got)
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got)
}
client := gateway.LastConnectClient()
if got := client["id"]; got != "openclaw-macos" {
@ -544,8 +548,11 @@ func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) {
if gateway.AgentWaitCount() != 1 {
t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount())
}
if got := gateway.Methods(); len(got) != 3 || got[0] != "connect" || got[1] != "chat.send" || got[2] != "agent.wait" {
t.Fatalf("expected connect, chat.send, then agent.wait, got %#v", got)
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got)
}
}
@ -619,6 +626,154 @@ func TestExecuteSessionTaskGatewaySurfacesOpenClawAgentWaitError(t *testing.T) {
}
}
func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-artifact",
"threadId": "thread-openclaw-artifact",
"taskPrompt": "make artifact",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
if got := response["remoteWorkingDirectory"]; got != "/remote/openclaw/workspace" {
t.Fatalf("expected remote working directory from manifest, got %#v", response)
}
artifacts, ok := response["artifacts"].([]map[string]any)
if !ok {
raw, ok := response["artifacts"].([]any)
if !ok {
t.Fatalf("expected artifacts payload, got %#v", response["artifacts"])
}
artifacts = make([]map[string]any, 0, len(raw))
for _, item := range raw {
artifacts = append(artifacts, shared.AsMap(item))
}
}
if len(artifacts) != 1 {
t.Fatalf("expected one artifact from manifest, got %#v", artifacts)
}
if got := artifacts[0]["relativePath"]; got != "reports/final.md" {
t.Fatalf("expected manifest artifact relative path, got %#v", artifacts[0])
}
if got := artifacts[0]["encoding"]; got != "base64" {
t.Fatalf("expected inline base64 artifact, got %#v", artifacts[0])
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got)
}
}
func TestExecuteSessionTaskGatewayCollectsOpenClawEventArtifacts(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.artifactMode = "unknown"
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-event-artifact",
"threadId": "thread-openclaw-event-artifact",
"taskPrompt": "event artifact",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
artifacts, ok := response["artifacts"].([]map[string]any)
if !ok {
raw, ok := response["artifacts"].([]any)
if !ok {
t.Fatalf("expected artifacts payload, got %#v", response["artifacts"])
}
artifacts = make([]map[string]any, 0, len(raw))
for _, item := range raw {
artifacts = append(artifacts, shared.AsMap(item))
}
}
if len(artifacts) != 1 {
t.Fatalf("expected one event artifact, got %#v", artifacts)
}
if got := artifacts[0]["relativePath"]; got != "events/live.txt" {
t.Fatalf("expected event artifact relative path, got %#v", artifacts[0])
}
if got := response["remoteWorkingDirectory"]; got != "/remote/openclaw/events" {
t.Fatalf("expected remote working directory from event, got %#v", response)
}
}
func TestExecuteSessionTaskGatewayKeepsTextWhenArtifactExportUnavailable(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.artifactMode = "unknown"
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-artifact-missing",
"threadId": "thread-openclaw-artifact-missing",
"taskPrompt": "say pong",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway text response despite artifact export failure, got rpc error: %#v", rpcErr)
}
if got := response["output"]; got != "gateway pong" {
t.Fatalf("expected gateway pong output, got %#v", response)
}
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
warnings := response["artifactWarnings"].([]any)
if len(warnings) != 1 || !strings.Contains(fmt.Sprint(warnings[0]), "unknown method") {
t.Fatalf("expected artifact warning for unknown method, got %#v", response["artifactWarnings"])
}
}
func TestExecuteSessionTaskDefaultsExplicitGatewayToOpenClaw(t *testing.T) {
server := NewServer()
@ -698,10 +853,12 @@ type acpFakeOpenClawGateway struct {
connectCount atomic.Int32
chatSendCount atomic.Int32
agentWaitCount atomic.Int32
artifactCount atomic.Int32
lastConnectClient atomic.Value
mu sync.Mutex
methods []string
runMessages map[string]string
artifactMode string
}
func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
@ -863,6 +1020,27 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
},
},
})
if fake.runMessage(runID) == "event artifact" {
_ = conn.WriteJSON(map[string]any{
"type": "event",
"event": "chat",
"seq": 2,
"payload": map[string]any{
"runId": runID,
"state": "final",
"remoteWorkingDirectory": "/remote/openclaw/events",
"remoteWorkspaceRefKind": "remotePath",
"artifacts": []any{
map[string]any{
"relativePath": "events/live.txt",
"contentType": "text/plain",
"encoding": "base64",
"content": "bGl2ZSBldmVudA==",
},
},
},
})
}
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
@ -872,6 +1050,49 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
"status": "ok",
},
})
case "xworkmate.artifacts.export":
fake.artifactCount.Add(1)
if fake.artifactMode == "unknown" {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "UNKNOWN_METHOD",
"message": "unknown method: xworkmate.artifacts.export",
},
})
continue
}
params := shared.AsMap(frame["params"])
runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run"))
payload := map[string]any{
"runId": runID,
"sessionKey": strings.TrimSpace(shared.StringArg(params, "sessionKey", "")),
"remoteWorkingDirectory": "/remote/openclaw/workspace",
"remoteWorkspaceRefKind": "remotePath",
"artifacts": []any{},
"warnings": []any{},
}
if fake.runMessage(runID) == "make artifact" {
payload["artifacts"] = []any{
map[string]any{
"relativePath": "reports/final.md",
"label": "final.md",
"contentType": "text/markdown",
"sizeBytes": 12,
"sha256": "fake-sha256",
"encoding": "base64",
"content": "ZmluYWwgcmVwb3J0",
},
}
}
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": payload,
})
case "chat.run":
_ = conn.WriteJSON(map[string]any{
"type": "res",
@ -949,6 +1170,10 @@ func (f *acpFakeOpenClawGateway) AgentWaitCount() int {
return int(f.agentWaitCount.Load())
}
func (f *acpFakeOpenClawGateway) ArtifactExportCount() int {
return int(f.artifactCount.Load())
}
func (f *acpFakeOpenClawGateway) LastConnectClient() map[string]any {
value := f.lastConnectClient.Load()
if value == nil {
@ -961,6 +1186,18 @@ func (f *acpFakeOpenClawGateway) Close() {
_ = f.server.Close()
}
func sameMethods(got []string, want []string) bool {
if len(got) != len(want) {
return false
}
for index := range got {
if got[index] != want[index] {
return false
}
}
return true
}
func TestExecuteSessionTaskAutoRoutingUsesBridgeProductionProviderOrder(t *testing.T) {
workspaceDir := filepath.Join(t.TempDir(), "workspace")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {