From 28a7eb3343329c5ebb867df3dda10047d05e7042 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Thu, 18 Jun 2026 10:01:19 +0800 Subject: [PATCH] fix: add session prepare fallback --- internal/acp/orchestrator.go | 51 ++++++++++++++++++++++++++++++++++++ internal/acp/routing_test.go | 47 ++++++++++++++++++++++++++++----- internal/acp/rpc_handler.go | 33 +++++++++++++++++++++++ 3 files changed, 124 insertions(+), 7 deletions(-) diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index 5393c9c..da33adb 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -599,6 +599,9 @@ func (o *SessionOrchestrator) openClawArtifactPrepare( notify, ) if !prepareResult.OK { + if isOpenClawUnknownMethodError(prepareResult.Error, "xworkmate.session.prepare") { + return openClawPreparedArtifactScopeFromPayload(openClawFallbackSessionPreparePayload(prepareParams)), nil + } return nil, gatewayRPCError(prepareResult.Error, "openclaw artifact prepare failed") } prepared := openClawPreparedArtifactScopeFromPayload(shared.AsMap(prepareResult.Payload)) @@ -608,6 +611,54 @@ func (o *SessionOrchestrator) openClawArtifactPrepare( return prepared, nil } +func isOpenClawUnknownMethodError(errorPayload map[string]any, method string) bool { + message := strings.ToLower(strings.TrimSpace(shared.StringArg(errorPayload, "message", ""))) + code := strings.ToUpper(strings.TrimSpace(shared.StringArg(errorPayload, "code", ""))) + if message == "" { + return false + } + return strings.Contains(message, "unknown method") && + strings.Contains(message, strings.ToLower(strings.TrimSpace(method))) && + (code == "" || code == "INVALID_REQUEST" || code == "METHOD_NOT_FOUND") +} + +func openClawFallbackSessionPreparePayload(params map[string]any) map[string]any { + sessionKey := strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) + if sessionKey == "" { + sessionKey = strings.TrimSpace(shared.StringArg(params, "sessionKey", "")) + } + if sessionKey == "" { + sessionKey = "main" + } + runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) + if runID == "" { + runID = strings.TrimSpace(shared.StringArg(params, "taskId", "")) + } + if runID == "" { + runID = strings.TrimSpace(shared.StringArg(params, "requestId", "")) + } + if runID == "" { + runID = "default" + } + relativeArtifactDirectory := filepath.Join("tasks", sessionKey, runID) + workspaceDir := openClawArtifactWorkspaceDir(params) + artifactDirectory := filepath.Join(workspaceDir, relativeArtifactDirectory) + return map[string]any{ + "ok": true, + "fallback": true, + "compatibilityMode": "local-session-prepare", + "runId": runID, + "sessionKey": sessionKey, + "openclawSessionKey": sessionKey, + "remoteWorkingDirectory": workspaceDir, + "remoteWorkspaceRefKind": "path", + "artifactScope": relativeArtifactDirectory, + "artifactDirectory": artifactDirectory, + "relativeArtifactDirectory": relativeArtifactDirectory, + "scopeKind": "task", + } +} + func openClawSessionPrepareParams(params map[string]any, openClawSessionKey string, runID string, artifactContract openClawArtifactContract) map[string]any { appThreadKey := openClawAppThreadKey(params) result := map[string]any{ diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index 769a1f9..ba946cd 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -833,7 +833,7 @@ func TestGatewayRequestSkillsStatusAutoConnectsOpenClaw(t *testing.T) { } } -func TestExecuteSessionTaskGatewayFailsWhenPrepareUnsupported(t *testing.T) { +func TestExecuteSessionTaskGatewayFallsBackWhenPrepareUnsupported(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) gateway.unsupportedSessionPrepare.Store(true) defer gateway.Close() @@ -858,15 +858,48 @@ func TestExecuteSessionTaskGatewayFailsWhenPrepareUnsupported(t *testing.T) { }, }, }) - if rpcErr == nil { - t.Fatalf("expected prepare error without legacy fallback, got response: %#v", response) - return + if rpcErr != nil { + t.Fatalf("expected prepare compatibility fallback, got error: %#v", rpcErr) } - if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "unknown method: xworkmate.session.prepare") { - t.Fatalf("expected surfaced prepare unsupported error, got %#v", rpcErr) + if response["success"] != true { + t.Fatalf("expected successful gateway task with prepare fallback, got %#v", response) + } + if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) { + t.Fatalf("expected bridge to continue to chat.send when prepare is unsupported, got %#v", got) + } +} + +func TestHandleSessionPrepareFallsBackWhenGatewayMethodUnsupported(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + gateway.unsupportedSessionPrepare.Store(true) + defer gateway.Close() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + response, rpcErr := server.handleRequest( + shared.RPCRequest{ + Method: "xworkmate.session.prepare", + Params: map[string]any{ + "openclawSessionKey": "thread-prepare", + "runId": "run-prepare", + "workspaceDir": "/remote/openclaw/workspace", + }, + }, + func(map[string]any) {}, + ) + if rpcErr != nil { + t.Fatalf("expected fallback prepare response, got error: %#v", rpcErr) + } + if response["fallback"] != true { + t.Fatalf("expected fallback marker, got %#v", response) + } + if response["artifactScope"] != "tasks/thread-prepare/run-prepare" { + t.Fatalf("expected fallback task artifact scope, got %#v", response) } if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare"}) { - t.Fatalf("expected bridge to stop before chat.send when prepare is unsupported, got %#v", got) + t.Fatalf("expected bridge to try gateway prepare before fallback, got %#v", got) } } diff --git a/internal/acp/rpc_handler.go b/internal/acp/rpc_handler.go index df2ef9e..4f1a1ba 100644 --- a/internal/acp/rpc_handler.go +++ b/internal/acp/rpc_handler.go @@ -76,6 +76,9 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string case "xworkmate.tasks.get": return s.handleTaskGet(ctx, request.Params, notify), nil + case "xworkmate.session.prepare": + return s.handleSessionPrepare(ctx, request.Params, notify) + case "xworkmate.tasks.cancel": return s.handleTaskCancel(ctx, request.Params, notify), nil @@ -90,6 +93,36 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string } } +func (s *Server) handleSessionPrepare(ctx context.Context, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) { + gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", "")) + if gatewayProvider == "" { + gatewayProvider = strings.TrimSpace(shared.StringArg(params, "resolvedGatewayProviderId", "")) + } + if gatewayProvider == "" { + gatewayProvider = "openclaw" + } + if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil { + return openClawFallbackSessionPreparePayload(params), nil + } + result := s.gateway.RequestByMode( + gatewayProvider, + "xworkmate.session.prepare", + params, + 30*time.Second, + notify, + ) + if result.OK { + payload := shared.AsMap(result.Payload) + if openClawPreparedArtifactScopeFromPayload(payload) != nil { + return payload, nil + } + } + if !result.OK && !isOpenClawUnknownMethodError(result.Error, "xworkmate.session.prepare") { + return nil, gatewayRPCError(result.Error, "openclaw artifact prepare failed") + } + return openClawFallbackSessionPreparePayload(params), nil +} + func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { params = s.taskGetParamsWithSessionScope(params) gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", ""))