From 733ef26c588d537897d772448a5c31771e2d20f9 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Wed, 3 Jun 2026 07:47:30 +0800 Subject: [PATCH] Keep OpenClaw probes running until terminal state --- internal/acp/openclaw_async_tasks.go | 52 ++++++++++++++----- internal/acp/orchestrator.go | 10 +++- internal/acp/routing_test.go | 78 ++++++++++++++++++++++++++-- 3 files changed, 122 insertions(+), 18 deletions(-) diff --git a/internal/acp/openclaw_async_tasks.go b/internal/acp/openclaw_async_tasks.go index b129ef7..53e2a2f 100644 --- a/internal/acp/openclaw_async_tasks.go +++ b/internal/acp/openclaw_async_tasks.go @@ -295,25 +295,49 @@ func (o *SessionOrchestrator) probeOpenClawTask(ctx context.Context, sess *sessi ) if !waitResult.OK { if openClawProbeStillRunning(waitResult.Error) { - sess.mu.Lock() - if sess.openClaw != nil { - sess.openClaw.ProgressStage = "running" - sess.openClaw.ProgressMessage = "OpenClaw task is still running" - sess.openClaw.ProbeInFlight = false - } - sess.task.ProgressStage = "running" - sess.task.ProgressMessage = "OpenClaw task is still running" - sess.task.UpdatedAt = time.Now() - result := openClawRunningTaskResult(sess.openClaw) - sess.lastResult = cloneMap(result) - sess.mu.Unlock() - return result + return openClawMarkProbeRunning(sess) } code := strings.TrimSpace(shared.StringArg(waitResult.Error, "code", "OPENCLAW_WAIT_FAILED")) message := strings.TrimSpace(shared.StringArg(waitResult.Error, "message", "openclaw wait failed")) return o.failOpenClawTask(sess, code, message) } - return o.completeOpenClawTask(sess, shared.AsMap(waitResult.Payload), collector, notify) + waitPayload := shared.AsMap(waitResult.Payload) + if !openClawWaitPayloadTerminal(waitPayload) && !collector.isTerminal() { + return openClawMarkProbeRunning(sess) + } + return o.completeOpenClawTask(sess, waitPayload, collector, notify) +} + +func openClawMarkProbeRunning(sess *session) map[string]any { + sess.mu.Lock() + if sess.openClaw != nil { + sess.openClaw.ProgressStage = "running" + sess.openClaw.ProgressMessage = "OpenClaw task is still running" + sess.openClaw.ProbeInFlight = false + } + sess.task.ProgressStage = "running" + sess.task.ProgressMessage = "OpenClaw task is still running" + sess.task.UpdatedAt = time.Now() + result := openClawRunningTaskResult(sess.openClaw) + sess.lastResult = cloneMap(result) + sess.mu.Unlock() + return result +} + +func openClawWaitPayloadTerminal(payload map[string]any) bool { + if payload == nil { + return false + } + if value, ok := payload["terminal"].(bool); ok { + return value + } + for _, key := range []string{"status", "state", "phase"} { + switch strings.TrimSpace(strings.ToLower(shared.StringArg(payload, key, ""))) { + case "complete", "completed", "done", "final", "success", "succeeded", "failed", "failure", "error", "timeout", "timed_out", "cancelled", "canceled": + return true + } + } + return false } func openClawProbeStillRunning(errorPayload map[string]any) bool { diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index f0b6e2c..ff3d15a 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -1606,6 +1606,7 @@ func firstNonEmptyString(values map[string]any, keys ...string) string { type openClawChatCollector struct { parts []string final string + terminal bool artifactPayloads []map[string]any } @@ -1628,6 +1629,9 @@ func (c *openClawChatCollector) observe(notification map[string]any) { if strings.TrimSpace(shared.StringArg(event, "event", "")) != "chat.run" { return } + if isTerminalGatewayPayload(payload) { + c.terminal = true + } text := firstNonEmptyString(payload, "assistantText", "text", "message", "output", "summary") if text == "" { return @@ -1649,6 +1653,10 @@ func (c *openClawChatCollector) output() string { return strings.TrimSpace(strings.Join(c.parts, "")) } +func (c *openClawChatCollector) isTerminal() bool { + return c != nil && c.terminal +} + func (c *openClawChatCollector) artifactPayload() map[string]any { if c == nil || len(c.artifactPayloads) == 0 { return nil @@ -1714,7 +1722,7 @@ func isTerminalGatewayPayload(payload map[string]any) bool { return true } switch strings.TrimSpace(strings.ToLower(shared.StringArg(payload, "state", ""))) { - case "complete", "completed", "done", "ok", "success", "failed", "error", "timeout", "timed_out", "cancelled", "canceled": + case "complete", "completed", "done", "final", "ok", "success", "failed", "error", "timeout", "timed_out", "cancelled", "canceled": return true default: return false diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index e74e0fa..98c5972 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -733,7 +733,7 @@ func TestExecuteSessionTaskGatewayNoDisplayableOutputFails(t *testing.T) { Params: map[string]any{ "sessionId": "session-openclaw-no-output", "threadId": "thread-openclaw-no-output", - "taskPrompt": "silent-turn", + "taskPrompt": "completed-empty", "workingDirectory": t.TempDir(), "routing": map[string]any{ "routingMode": "explicit", @@ -899,6 +899,55 @@ func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testi } } +func TestExecuteSessionTaskGatewayKeepsRunningOnNonTerminalWaitPayload(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + defer gateway.Close() + gateway.artifactWorkspaceRoot = t.TempDir() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + response, rpcErr := server.executeSessionTask(task{ + req: shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-openclaw-running-wait", + "threadId": "thread-openclaw-running-wait", + "taskPrompt": "wait-running", + "workingDirectory": t.TempDir(), + "metadata": map[string]any{ + "taskLoadClass": "complex_long_chain_task", + "expectedArtifactExtensions": []any{"pdf"}, + }, + "routing": map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "gateway", + "preferredGatewayProviderId": "openclaw", + }, + }, + }, + }) + if rpcErr != nil { + t.Fatalf("expected running wait payload to keep task running, got rpc error: %#v", rpcErr) + } + if got := response["status"]; got != string(TaskStateRunning) { + t.Fatalf("expected running status from non-terminal wait payload, got %#v", response) + } + if got := gateway.ChatSendCount(); got != 1 { + t.Fatalf("expected no repair turn, got %d", got) + } + if got := gateway.AgentWaitCount(); got != 1 { + t.Fatalf("expected one status probe, got %d", got) + } + if got := gateway.ArtifactExportCount(); got != 0 { + t.Fatalf("expected no artifact export before terminal wait payload, got %d", got) + } + if _, ok := response["code"]; ok { + t.Fatalf("expected no terminal failure code, got %#v", response) + } +} + func TestExecuteSessionTaskGatewayArtifactContractNoFilesRequiresFinalArtifact(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() @@ -913,7 +962,7 @@ func TestExecuteSessionTaskGatewayArtifactContractNoFilesRequiresFinalArtifact(t Params: map[string]any{ "sessionId": "session-openclaw-no-complex-output", "threadId": "thread-openclaw-no-complex-output", - "taskPrompt": "silent-turn", + "taskPrompt": "completed-empty", "workingDirectory": t.TempDir(), "metadata": map[string]any{ "taskLoadClass": "complex_long_chain_task", @@ -964,7 +1013,7 @@ func TestExecuteSessionTaskGatewaySimpleArtifactContractNoFilesRequiresFinalArti Params: map[string]any{ "sessionId": "session-openclaw-simple-md", "threadId": "thread-openclaw-simple-md", - "taskPrompt": "silent-turn", + "taskPrompt": "completed-empty", "workingDirectory": t.TempDir(), "metadata": map[string]any{ "expectedArtifactExtensions": []any{"md"}, @@ -2943,6 +2992,29 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { }, }) continue + case "wait-running": + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": map[string]any{ + "runId": runID, + "status": "running", + "terminal": false, + }, + }) + continue + case "completed-empty": + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": map[string]any{ + "runId": runID, + "status": "completed", + }, + }) + continue } message := "gateway pong" if strings.Contains(fake.runMessage(runID), "hallucinate-files") {