Keep OpenClaw probes running until terminal state

This commit is contained in:
Haitao Pan 2026-06-03 07:47:30 +08:00
parent 5833378794
commit 733ef26c58
3 changed files with 122 additions and 18 deletions

View File

@ -295,25 +295,49 @@ func (o *SessionOrchestrator) probeOpenClawTask(ctx context.Context, sess *sessi
)
if !waitResult.OK {
if openClawProbeStillRunning(waitResult.Error) {
sess.mu.Lock()
if sess.openClaw != nil {
sess.openClaw.ProgressStage = "running"
sess.openClaw.ProgressMessage = "OpenClaw task is still running"
sess.openClaw.ProbeInFlight = false
}
sess.task.ProgressStage = "running"
sess.task.ProgressMessage = "OpenClaw task is still running"
sess.task.UpdatedAt = time.Now()
result := openClawRunningTaskResult(sess.openClaw)
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
return result
return openClawMarkProbeRunning(sess)
}
code := strings.TrimSpace(shared.StringArg(waitResult.Error, "code", "OPENCLAW_WAIT_FAILED"))
message := strings.TrimSpace(shared.StringArg(waitResult.Error, "message", "openclaw wait failed"))
return o.failOpenClawTask(sess, code, message)
}
return o.completeOpenClawTask(sess, shared.AsMap(waitResult.Payload), collector, notify)
waitPayload := shared.AsMap(waitResult.Payload)
if !openClawWaitPayloadTerminal(waitPayload) && !collector.isTerminal() {
return openClawMarkProbeRunning(sess)
}
return o.completeOpenClawTask(sess, waitPayload, collector, notify)
}
func openClawMarkProbeRunning(sess *session) map[string]any {
sess.mu.Lock()
if sess.openClaw != nil {
sess.openClaw.ProgressStage = "running"
sess.openClaw.ProgressMessage = "OpenClaw task is still running"
sess.openClaw.ProbeInFlight = false
}
sess.task.ProgressStage = "running"
sess.task.ProgressMessage = "OpenClaw task is still running"
sess.task.UpdatedAt = time.Now()
result := openClawRunningTaskResult(sess.openClaw)
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
return result
}
func openClawWaitPayloadTerminal(payload map[string]any) bool {
if payload == nil {
return false
}
if value, ok := payload["terminal"].(bool); ok {
return value
}
for _, key := range []string{"status", "state", "phase"} {
switch strings.TrimSpace(strings.ToLower(shared.StringArg(payload, key, ""))) {
case "complete", "completed", "done", "final", "success", "succeeded", "failed", "failure", "error", "timeout", "timed_out", "cancelled", "canceled":
return true
}
}
return false
}
func openClawProbeStillRunning(errorPayload map[string]any) bool {

View File

@ -1606,6 +1606,7 @@ func firstNonEmptyString(values map[string]any, keys ...string) string {
type openClawChatCollector struct {
parts []string
final string
terminal bool
artifactPayloads []map[string]any
}
@ -1628,6 +1629,9 @@ func (c *openClawChatCollector) observe(notification map[string]any) {
if strings.TrimSpace(shared.StringArg(event, "event", "")) != "chat.run" {
return
}
if isTerminalGatewayPayload(payload) {
c.terminal = true
}
text := firstNonEmptyString(payload, "assistantText", "text", "message", "output", "summary")
if text == "" {
return
@ -1649,6 +1653,10 @@ func (c *openClawChatCollector) output() string {
return strings.TrimSpace(strings.Join(c.parts, ""))
}
func (c *openClawChatCollector) isTerminal() bool {
return c != nil && c.terminal
}
func (c *openClawChatCollector) artifactPayload() map[string]any {
if c == nil || len(c.artifactPayloads) == 0 {
return nil
@ -1714,7 +1722,7 @@ func isTerminalGatewayPayload(payload map[string]any) bool {
return true
}
switch strings.TrimSpace(strings.ToLower(shared.StringArg(payload, "state", ""))) {
case "complete", "completed", "done", "ok", "success", "failed", "error", "timeout", "timed_out", "cancelled", "canceled":
case "complete", "completed", "done", "final", "ok", "success", "failed", "error", "timeout", "timed_out", "cancelled", "canceled":
return true
default:
return false

View File

@ -733,7 +733,7 @@ func TestExecuteSessionTaskGatewayNoDisplayableOutputFails(t *testing.T) {
Params: map[string]any{
"sessionId": "session-openclaw-no-output",
"threadId": "thread-openclaw-no-output",
"taskPrompt": "silent-turn",
"taskPrompt": "completed-empty",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
@ -899,6 +899,55 @@ func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testi
}
}
func TestExecuteSessionTaskGatewayKeepsRunningOnNonTerminalWaitPayload(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
gateway.artifactWorkspaceRoot = t.TempDir()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-running-wait",
"threadId": "thread-openclaw-running-wait",
"taskPrompt": "wait-running",
"workingDirectory": t.TempDir(),
"metadata": map[string]any{
"taskLoadClass": "complex_long_chain_task",
"expectedArtifactExtensions": []any{"pdf"},
},
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected running wait payload to keep task running, got rpc error: %#v", rpcErr)
}
if got := response["status"]; got != string(TaskStateRunning) {
t.Fatalf("expected running status from non-terminal wait payload, got %#v", response)
}
if got := gateway.ChatSendCount(); got != 1 {
t.Fatalf("expected no repair turn, got %d", got)
}
if got := gateway.AgentWaitCount(); got != 1 {
t.Fatalf("expected one status probe, got %d", got)
}
if got := gateway.ArtifactExportCount(); got != 0 {
t.Fatalf("expected no artifact export before terminal wait payload, got %d", got)
}
if _, ok := response["code"]; ok {
t.Fatalf("expected no terminal failure code, got %#v", response)
}
}
func TestExecuteSessionTaskGatewayArtifactContractNoFilesRequiresFinalArtifact(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
@ -913,7 +962,7 @@ func TestExecuteSessionTaskGatewayArtifactContractNoFilesRequiresFinalArtifact(t
Params: map[string]any{
"sessionId": "session-openclaw-no-complex-output",
"threadId": "thread-openclaw-no-complex-output",
"taskPrompt": "silent-turn",
"taskPrompt": "completed-empty",
"workingDirectory": t.TempDir(),
"metadata": map[string]any{
"taskLoadClass": "complex_long_chain_task",
@ -964,7 +1013,7 @@ func TestExecuteSessionTaskGatewaySimpleArtifactContractNoFilesRequiresFinalArti
Params: map[string]any{
"sessionId": "session-openclaw-simple-md",
"threadId": "thread-openclaw-simple-md",
"taskPrompt": "silent-turn",
"taskPrompt": "completed-empty",
"workingDirectory": t.TempDir(),
"metadata": map[string]any{
"expectedArtifactExtensions": []any{"md"},
@ -2943,6 +2992,29 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
},
})
continue
case "wait-running":
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"runId": runID,
"status": "running",
"terminal": false,
},
})
continue
case "completed-empty":
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"runId": runID,
"status": "completed",
},
})
continue
}
message := "gateway pong"
if strings.Contains(fake.runMessage(runID), "hallucinate-files") {