Compare commits

...

2 Commits

Author SHA1 Message Date
Haitao Pan
6acdb01eb4 Handle legacy OpenClaw prepare gateways 2026-06-06 06:52:11 +08:00
Haitao Pan
1f617e9c63 Recover OpenClaw smoke handle from SSE 2026-06-06 06:39:21 +08:00
3 changed files with 172 additions and 9 deletions

View File

@ -593,6 +593,17 @@ func (o *SessionOrchestrator) openClawArtifactPrepare(
notify, notify,
) )
if !prepareResult.OK { if !prepareResult.OK {
if openClawPrepareUnsupported(prepareResult.Error) {
prepared := openClawLegacyPreparedArtifactScope(params, sessionKey, runID)
log.Printf(
"level=warn component=openclaw_gateway event=session_prepare_legacy_fallback provider=%q sessionId=%q runId=%q artifactScope=%q",
gatewayProvider,
sessionKey,
runID,
prepared.ArtifactScope,
)
return prepared, nil
}
return nil, gatewayRPCError(prepareResult.Error, "openclaw artifact prepare failed") return nil, gatewayRPCError(prepareResult.Error, "openclaw artifact prepare failed")
} }
prepared := openClawPreparedArtifactScopeFromPayload(shared.AsMap(prepareResult.Payload)) prepared := openClawPreparedArtifactScopeFromPayload(shared.AsMap(prepareResult.Payload))
@ -602,6 +613,48 @@ func (o *SessionOrchestrator) openClawArtifactPrepare(
return prepared, nil return prepared, nil
} }
func openClawPrepareUnsupported(errorPayload map[string]any) bool {
code := strings.ToUpper(strings.TrimSpace(shared.StringArg(errorPayload, "code", "")))
message := strings.ToLower(strings.TrimSpace(shared.StringArg(errorPayload, "message", "")))
if !strings.Contains(message, "xworkmate.session.prepare") {
return false
}
return code == "INVALID_REQUEST" ||
code == "METHOD_NOT_FOUND" ||
code == "UNKNOWN_METHOD" ||
strings.Contains(message, "unknown method") ||
strings.Contains(message, "method not found")
}
func openClawLegacyPreparedArtifactScope(params map[string]any, sessionKey string, runID string) *openClawPreparedArtifactScope {
sessionKey = strings.TrimSpace(sessionKey)
runID = strings.TrimSpace(runID)
artifactScope := "tasks/" + sessionKey + "/" + runID
workspaceRoot := openClawLegacyArtifactWorkspaceRoot(params)
return &openClawPreparedArtifactScope{
RemoteWorkingDirectory: workspaceRoot,
RemoteWorkspaceRefKind: "remotePath",
ArtifactScope: artifactScope,
ArtifactDirectory: filepath.Join(workspaceRoot, filepath.FromSlash(artifactScope)),
RelativeArtifactDirectory: artifactScope,
ScopeKind: "task",
}
}
func openClawLegacyArtifactWorkspaceRoot(params map[string]any) string {
for _, key := range []string{"remoteWorkingDirectoryHint", "remoteWorkingDirectory"} {
value := strings.TrimSpace(shared.StringArg(params, key, ""))
if value == "" {
continue
}
cleaned := filepath.Clean(value)
if strings.HasPrefix(cleaned, "/home/ubuntu/.openclaw/workspace") {
return strings.TrimRight(cleaned, string(os.PathSeparator))
}
}
return "/home/ubuntu/.openclaw/workspace"
}
func openClawSessionPrepareParams(params map[string]any, openClawSessionKey string, runID string, artifactContract openClawArtifactContract) map[string]any { func openClawSessionPrepareParams(params map[string]any, openClawSessionKey string, runID string, artifactContract openClawArtifactContract) map[string]any {
appThreadKey := openClawAppThreadKey(params) appThreadKey := openClawAppThreadKey(params)
result := map[string]any{ result := map[string]any{

View File

@ -808,6 +808,54 @@ func TestExecuteSessionTaskGatewayNoDisplayableOutputFails(t *testing.T) {
} }
} }
func TestExecuteSessionTaskGatewayFallsBackWhenPrepareUnsupported(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.unsupportedSessionPrepare.Store(true)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-legacy-prepare",
"threadId": "thread-openclaw-legacy-prepare",
"taskPrompt": "say pong",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected legacy prepare fallback response, got rpc error: %#v", rpcErr)
}
if got := response["output"]; got != "gateway pong" {
t.Fatalf("expected gateway pong output after legacy prepare fallback, got %#v", response)
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export", "xworkmate.artifacts.collect-and-snapshot"}) {
t.Fatalf("expected legacy prepare attempt to continue through chat/send and export, got %#v", got)
}
chatParams := gateway.LastChatSendParams()
receipt := strings.TrimSpace(shared.StringArg(chatParams, "systemProvenanceReceipt", ""))
sessionKey := shared.StringArg(chatParams, "sessionKey", "")
runID := shared.StringArg(chatParams, "idempotencyKey", "")
for _, expected := range []string{
"artifactDirectory: /home/ubuntu/.openclaw/workspace/tasks/" + sessionKey + "/" + runID,
"artifactScope: tasks/" + sessionKey + "/" + runID,
} {
if !strings.Contains(receipt, expected) {
t.Fatalf("expected fallback provenance receipt to include %q, got %q", expected, receipt)
}
}
}
func TestExecuteSessionTaskGatewayFailsClosedWhenOpenClawAcceptsDifferentSession(t *testing.T) { func TestExecuteSessionTaskGatewayFailsClosedWhenOpenClawAcceptsDifferentSession(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t) gateway := newAcpFakeOpenClawGateway(t)
gateway.alternateSessionKey = "dashboard:c061bfeb-ad08-45f5-971d-d9018f745d7a" gateway.alternateSessionKey = "dashboard:c061bfeb-ad08-45f5-971d-d9018f745d7a"
@ -2657,6 +2705,7 @@ type acpFakeOpenClawGateway struct {
artifactWorkspaceRoot string artifactWorkspaceRoot string
alternateRunID string alternateRunID string
alternateSessionKey string alternateSessionKey string
unsupportedSessionPrepare atomic.Bool
} }
func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
@ -2795,6 +2844,18 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
fake.artifactPrepareCount.Add(1) fake.artifactPrepareCount.Add(1)
params := shared.AsMap(frame["params"]) params := shared.AsMap(frame["params"])
fake.lastArtifactPrepareParams.Store(params) fake.lastArtifactPrepareParams.Store(params)
if fake.unsupportedSessionPrepare.Load() {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "INVALID_REQUEST",
"message": "unknown method: xworkmate.session.prepare",
},
})
continue
}
runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run")) runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run"))
sessionKey := strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "main")) sessionKey := strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "main"))
artifactScope := "tasks/" + sessionKey + "/" + runID artifactScope := "tasks/" + sessionKey + "/" + runID

View File

@ -94,7 +94,8 @@ def terminal_result(payload):
if not isinstance(payload, dict): if not isinstance(payload, dict):
return {} return {}
nested = payload.get("result") nested = payload.get("result")
if isinstance(nested, dict) and str(payload.get("status", "")).lower() in { status = str(payload.get("status", "")).lower()
if isinstance(nested, dict) and nested and status in {
"completed", "completed",
"failed", "failed",
"cancelled", "cancelled",
@ -136,6 +137,45 @@ def require_nonempty(payload, key):
raise SystemExit(f"OpenClaw smoke result missing {key}: {json.dumps(payload, ensure_ascii=False, sort_keys=True)[:1000]}") raise SystemExit(f"OpenClaw smoke result missing {key}: {json.dumps(payload, ensure_ascii=False, sort_keys=True)[:1000]}")
def first_nonempty(payload, *keys):
if not isinstance(payload, dict):
return ""
for key in keys:
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return ""
def task_handle_from_payload(payload):
if not isinstance(payload, dict):
return {}
candidates = []
if isinstance(payload.get("result"), dict):
candidates.append(payload["result"])
if isinstance(payload.get("payload"), dict):
candidates.append(payload["payload"])
if isinstance(payload.get("params"), dict):
candidates.append(payload["params"])
candidates.append(payload)
for candidate in candidates:
if not isinstance(candidate, dict):
continue
if first_nonempty(candidate, "sessionId") and first_nonempty(candidate, "threadId") and (
first_nonempty(candidate, "turnId") or first_nonempty(candidate, "runId")
):
return candidate
return {}
def find_task_handle(payloads, final):
for payload in reversed(payloads):
handle = task_handle_from_payload(payload)
if handle:
return handle
return task_handle_from_payload(final)
def is_valid_no_displayable_contract(payload): def is_valid_no_displayable_contract(payload):
if not isinstance(payload, dict): if not isinstance(payload, dict):
return False return False
@ -143,8 +183,12 @@ def is_valid_no_displayable_contract(payload):
return False return False
if payload.get("resolvedGatewayProviderId") != "openclaw": if payload.get("resolvedGatewayProviderId") != "openclaw":
return False return False
for key in ("sessionId", "threadId", "runId", "openclawSessionKey", "artifactScope"): for key in ("sessionId", "threadId", "runId", "artifactScope"):
require_nonempty(payload, key) require_nonempty(payload, key)
if not first_nonempty(payload, "openclawSessionKey", "sessionKey"):
artifact_scope = first_nonempty(payload, "artifactScope")
if not artifact_scope.startswith("tasks/"):
raise SystemExit(f"OpenClaw smoke result missing session scope: {json.dumps(payload, ensure_ascii=False, sort_keys=True)[:1000]}")
return True return True
@ -158,11 +202,14 @@ if not payloads or payloads[-1].get("done") is not True:
raise SystemExit("missing SSE done marker") raise SystemExit("missing SSE done marker")
result = terminal_result(final.get("result") or final.get("payload") or {}) result = terminal_result(final.get("result") or final.get("payload") or {})
if result.get("status") == "running": handle = result
session_id = result.get("sessionId") if result.get("status") != "running":
thread_id = result.get("threadId") handle = find_task_handle(payloads, final)
turn_id = result.get("turnId") if handle.get("status") == "running" or (not result and handle):
run_id = result.get("runId") session_id = first_nonempty(handle, "sessionId")
thread_id = first_nonempty(handle, "threadId")
turn_id = first_nonempty(handle, "turnId")
run_id = first_nonempty(handle, "runId")
deadline = time.time() + poll_timeout deadline = time.time() + poll_timeout
while time.time() < deadline: while time.time() < deadline:
@ -192,7 +239,8 @@ if result.get("status") == "running":
poll_result = resp_data.get("result") or {} poll_result = resp_data.get("result") or {}
status = poll_result.get("status") status = poll_result.get("status")
if status in ("completed", "failed", "cancelled"): if status in ("completed", "failed", "cancelled"):
result = terminal_result(poll_result) terminal = terminal_result(poll_result)
result = terminal if terminal else poll_result
final["result"] = poll_result final["result"] = poll_result
break break
except Exception as exc: except Exception as exc:
@ -227,7 +275,8 @@ if "pong" not in output_text.lower():
print("OpenClaw smoke OK: session contract completed without displayable output") print("OpenClaw smoke OK: session contract completed without displayable output")
sys.exit(0) sys.exit(0)
result_preview = json.dumps(result, ensure_ascii=False, sort_keys=True)[:1000] result_preview = json.dumps(result, ensure_ascii=False, sort_keys=True)[:1000]
raise SystemExit(f"OpenClaw smoke did not return pong: {output_text[:500]}\nresult preview: {result_preview}") payload_preview = json.dumps(payloads[:6], ensure_ascii=False, sort_keys=True)[:1500]
raise SystemExit(f"OpenClaw smoke did not return pong: {output_text[:500]}\nresult preview: {result_preview}\nSSE preview: {payload_preview}")
print("OpenClaw smoke OK: pong received from session contract") print("OpenClaw smoke OK: pong received from session contract")
PY PY