From 31220154cce84d79b08c947e067a77381e706844 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Tue, 2 Jun 2026 11:58:40 +0800 Subject: [PATCH] Implement async OpenClaw task control plane --- internal/acp/openclaw_async_tasks.go | 518 +++++++++++++++++++++++++++ internal/acp/orchestrator.go | 147 ++++---- internal/acp/routing_test.go | 60 +++- internal/acp/rpc_handler.go | 207 ++++++++--- internal/acp/types.go | 30 +- internal/acp/web_contract_test.go | 268 +++++++++----- 6 files changed, 999 insertions(+), 231 deletions(-) create mode 100644 internal/acp/openclaw_async_tasks.go diff --git a/internal/acp/openclaw_async_tasks.go b/internal/acp/openclaw_async_tasks.go new file mode 100644 index 0000000..b129ef7 --- /dev/null +++ b/internal/acp/openclaw_async_tasks.go @@ -0,0 +1,518 @@ +package acp + +import ( + "context" + "strings" + "time" + + "xworkmate-bridge/internal/router" + "xworkmate-bridge/internal/shared" +) + +const ( + openClawTaskProbeTimeout = 2 * time.Second + openClawTaskProbeTimeoutMs = 1000 + openClawTaskMonitorInterval = time.Second + openClawShortTaskMinutes = 10 + openClawLongTaskMinutes = 30 + openClawComplexTaskMinutes = 60 +) + +type OpenClawTaskRecord struct { + SessionID string + ThreadID string + TurnID string + RunID string + SessionKey string + GatewayProviderID string + TaskLoadClass string + ArtifactSinceUnixMs int64 + RuntimeBudgetMinutes int + StartedAt time.Time + DeadlineAt time.Time + LastProbeAt time.Time + ProgressStage string + ProgressMessage string + ProgressTerminal bool + ChatParams map[string]any + PreparedArtifact *openClawPreparedArtifactScope + ArtifactContract openClawArtifactContract + ResolvedModel string + ResolvedSkills []string + MonitorStarted bool + ProbeInFlight bool + AdmissionRelease func() +} + +func openClawTaskRuntimePolicy(params map[string]any, chatParams map[string]any, contract openClawArtifactContract) (string, int) { + message := strings.TrimSpace(shared.StringArg(chatParams, "message", "")) + if message == "" { + message = openClawCurrentTurnMessage(params) + } + lower := strings.ToLower(message) + metadataClass := strings.TrimSpace(shared.StringArg(shared.AsMap(params["metadata"]), "taskLoadClass", "")) + if metadataClass == "complex_long_chain_task" || contract.ComplexLongChain || openClawMessageContainsAny(lower, []string{ + "复杂链路", "多章节", "每章", "拆章节", "汇总排版", "gpt images", "images2", "image generation", "视频", "渲染", "hyperframes", "remotion", "ffmpeg", + }) { + return "complex_chain_task", openClawComplexTaskMinutes + } + if len(contract.RequiredFinalExtensions) > 0 || openClawMessageContainsAny(lower, []string{ + "生成文件", "同步生成文件", "产物", "附件", "pdf", "docx", "ppt", "pptx", "markdown", ".md", "png", "jpg", "jpeg", "mp4", + }) || len(shared.ListArg(params, "attachments"))+len(shared.ListArg(params, "inlineAttachments")) >= 2 { + return "long_task", openClawLongTaskMinutes + } + return "short_task", openClawShortTaskMinutes +} + +func openClawRunningTaskResult(record *OpenClawTaskRecord) map[string]any { + if record == nil { + return map[string]any{"success": true, "status": "running", "mode": router.ExecutionTargetGatewayChat} + } + result := map[string]any{ + "success": true, + "status": string(TaskStateRunning), + "turnId": record.TurnID, + "runId": record.RunID, + "sessionId": record.SessionID, + "threadId": record.ThreadID, + "sessionKey": record.SessionKey, + "mode": router.ExecutionTargetGatewayChat, + "resolvedGatewayProviderId": record.GatewayProviderID, + "taskLoadClass": record.TaskLoadClass, + "runtimeBudgetMinutes": record.RuntimeBudgetMinutes, + "startedAt": record.StartedAt.UTC().Format(time.RFC3339Nano), + "deadlineAt": record.DeadlineAt.UTC().Format(time.RFC3339Nano), + "progress": openClawTaskProgress(record), + } + if record.PreparedArtifact != nil { + applyOpenClawPreparedArtifactToResult(result, record.PreparedArtifact) + } + if len(record.ArtifactContract.RequiredFinalExtensions) > 0 { + result["requiredArtifactExtensions"] = append([]string(nil), record.ArtifactContract.RequiredFinalExtensions...) + } + if len(record.ArtifactContract.ExpectedArtifactExtensions) > 0 { + result["expectedArtifactExtensions"] = append([]string(nil), record.ArtifactContract.ExpectedArtifactExtensions...) + } + return result +} + +func openClawTaskProgress(record *OpenClawTaskRecord) map[string]any { + now := time.Now() + stage := strings.TrimSpace(record.ProgressStage) + if stage == "" { + stage = "running" + } + message := strings.TrimSpace(record.ProgressMessage) + if message == "" { + message = "OpenClaw task is running" + } + return map[string]any{ + "stage": stage, + "message": message, + "elapsedMs": maxInt64(0, now.Sub(record.StartedAt).Milliseconds()), + "budgetMs": (time.Duration(record.RuntimeBudgetMinutes) * time.Minute).Milliseconds(), + "lastProbeAtMs": record.LastProbeAt.UnixMilli(), + "terminal": record.ProgressTerminal, + } +} + +func maxInt64(a int64, b int64) int64 { + if a > b { + return a + } + return b +} + +func (o *SessionOrchestrator) startOpenClawTaskMonitor(sess *session) { + if sess == nil { + return + } + sess.mu.Lock() + record := sess.openClaw + if record == nil || record.MonitorStarted || isTerminalTaskState(sess.task.State) { + sess.mu.Unlock() + return + } + record.MonitorStarted = true + sess.mu.Unlock() + + go func() { + defer o.releaseOpenClawAdmission(sess) + time.Sleep(openClawTaskMonitorInterval) + for { + sess.mu.Lock() + state := sess.task.State + deadline := sess.task.DeadlineAt + sess.mu.Unlock() + if isTerminalTaskState(state) { + return + } + if !deadline.IsZero() && time.Now().After(deadline) { + o.failOpenClawTask(sess, "TASK_SLA_EXPIRED", "OpenClaw task exceeded its runtime SLA") + return + } + o.probeOpenClawTask(context.Background(), sess, nil) + sess.mu.Lock() + state = sess.task.State + sess.mu.Unlock() + if isTerminalTaskState(state) { + return + } + time.Sleep(openClawTaskMonitorInterval) + } + }() +} + +func isTerminalTaskState(state TaskState) bool { + return state == TaskStateCompleted || state == TaskStateFailed || state == TaskStateCancelled +} + +func (o *SessionOrchestrator) releaseOpenClawAdmission(sess *session) { + if sess == nil { + return + } + var release func() + sess.mu.Lock() + if sess.openClaw != nil { + release = sess.openClaw.AdmissionRelease + sess.openClaw.AdmissionRelease = nil + } + sess.mu.Unlock() + if release != nil { + release() + } +} + +func (o *SessionOrchestrator) failOpenClawTask(sess *session, code string, message string) map[string]any { + if sess == nil { + return map[string]any{"success": false, "status": string(TaskStateFailed), "code": code, "message": message} + } + if strings.TrimSpace(message) == "" { + message = code + } + sess.mu.Lock() + turnID := sess.task.TurnID + runID := sess.task.RunID + gatewayProviderID := sess.task.GatewayProviderID + sess.task.State = TaskStateFailed + sess.task.UpdatedAt = time.Now() + sess.task.ProgressStage = "failed" + sess.task.ProgressMessage = message + sess.task.ProgressTerminal = true + if sess.openClaw != nil { + sess.openClaw.ProgressStage = "failed" + sess.openClaw.ProgressMessage = message + sess.openClaw.ProgressTerminal = true + sess.openClaw.ProbeInFlight = false + } + result := map[string]any{ + "success": false, + "status": string(TaskStateFailed), + "code": code, + "error": message, + "message": message, + "summary": message, + "output": message, + "turnId": turnID, + "runId": runID, + "mode": router.ExecutionTargetGatewayChat, + "resolvedGatewayProviderId": gatewayProviderID, + } + sess.lastResult = cloneMap(result) + sess.mu.Unlock() + o.releaseOpenClawAdmission(sess) + return result +} + +func (o *SessionOrchestrator) probeOpenClawTask(ctx context.Context, sess *session, notify func(map[string]any)) map[string]any { + if sess == nil { + return map[string]any{"status": "not_found"} + } + sess.mu.Lock() + record := sess.openClaw + if record == nil { + snapshot := openClawSessionSnapshotLocked(sess) + sess.mu.Unlock() + return snapshot + } + if isTerminalTaskState(sess.task.State) { + snapshot := openClawSessionSnapshotLocked(sess) + sess.mu.Unlock() + return snapshot + } + if !record.DeadlineAt.IsZero() && time.Now().After(record.DeadlineAt) { + sess.mu.Unlock() + return o.failOpenClawTask(sess, "TASK_SLA_EXPIRED", "OpenClaw task exceeded its runtime SLA") + } + if record.ProbeInFlight { + result := openClawRunningTaskResult(record) + sess.lastResult = cloneMap(result) + sess.mu.Unlock() + return result + } + record.ProbeInFlight = true + record.LastProbeAt = time.Now() + record.ProgressStage = "probing" + record.ProgressMessage = "Checking OpenClaw task status" + sess.task.LastProbeAt = record.LastProbeAt + sess.task.ProgressStage = record.ProgressStage + sess.task.ProgressMessage = record.ProgressMessage + gatewayProvider := record.GatewayProviderID + runID := record.RunID + sessionID := record.SessionID + threadID := record.ThreadID + turnID := record.TurnID + sess.mu.Unlock() + + collector := newOpenClawChatCollector() + notifyWithCollection := func(message map[string]any) { + collector.observe(message) + if notify == nil { + return + } + if update := openClawGatewaySessionUpdate(message, sessionID, threadID, turnID); update != nil { + notify(update) + } + } + waitStarted := time.Now() + waitResult := o.openClawGatewayRequestWithRetry( + gatewayProvider, + "agent.wait", + map[string]any{ + "runId": runID, + "timeoutMs": openClawTaskProbeTimeoutMs, + }, + openClawTaskProbeTimeout, + notifyWithCollection, + ) + logOpenClawGatewayTiming( + gatewayProvider, + "agent.wait.probe", + sessionID, + runID, + time.Since(waitStarted), + waitResult.OK, + ) + if !waitResult.OK { + if openClawProbeStillRunning(waitResult.Error) { + sess.mu.Lock() + if sess.openClaw != nil { + sess.openClaw.ProgressStage = "running" + sess.openClaw.ProgressMessage = "OpenClaw task is still running" + sess.openClaw.ProbeInFlight = false + } + sess.task.ProgressStage = "running" + sess.task.ProgressMessage = "OpenClaw task is still running" + sess.task.UpdatedAt = time.Now() + result := openClawRunningTaskResult(sess.openClaw) + sess.lastResult = cloneMap(result) + sess.mu.Unlock() + return result + } + code := strings.TrimSpace(shared.StringArg(waitResult.Error, "code", "OPENCLAW_WAIT_FAILED")) + message := strings.TrimSpace(shared.StringArg(waitResult.Error, "message", "openclaw wait failed")) + return o.failOpenClawTask(sess, code, message) + } + return o.completeOpenClawTask(sess, shared.AsMap(waitResult.Payload), collector, notify) +} + +func openClawProbeStillRunning(errorPayload map[string]any) bool { + code := strings.TrimSpace(strings.ToUpper(shared.StringArg(errorPayload, "code", ""))) + if code == "TIMEOUT" || code == "RPC_TIMEOUT" || code == "REQUEST_TIMEOUT" || + code == "OFFLINE" || code == "SOCKET_FAILURE" || code == "SOCKET_CLOSED" { + return true + } + message := strings.TrimSpace(strings.ToLower(shared.StringArg(errorPayload, "message", ""))) + return strings.Contains(message, "timeout") || strings.Contains(message, "timed out") +} + +func (o *SessionOrchestrator) completeOpenClawTask( + sess *session, + waitPayload map[string]any, + collector *openClawChatCollector, + notify func(map[string]any), +) map[string]any { + if sess == nil { + return map[string]any{"status": "not_found"} + } + sess.mu.Lock() + record := sess.openClaw + if record == nil { + snapshot := openClawSessionSnapshotLocked(sess) + sess.mu.Unlock() + return snapshot + } + if isTerminalTaskState(sess.task.State) { + record.ProbeInFlight = false + snapshot := openClawSessionSnapshotLocked(sess) + sess.mu.Unlock() + return snapshot + } + sess.mu.Unlock() + + output := "" + if collector != nil { + output = collector.output() + } + if output == "" { + output = firstNonEmptyString(waitPayload, "output", "message", "summary", "assistantText", "text") + } + noDisplayableOutput := strings.TrimSpace(output) == "" + if output == "" { + output = openClawNoDisplayableText + } + result := map[string]any{ + "success": true, + "status": string(TaskStateCompleted), + "output": output, + "message": output, + "summary": output, + "turnId": record.TurnID, + "runId": record.RunID, + "sessionId": record.SessionID, + "threadId": record.ThreadID, + "sessionKey": record.SessionKey, + "mode": router.ExecutionTargetGatewayChat, + "resolvedExecutionTarget": router.ExecutionTargetGatewayChat, + "resolvedProviderId": record.GatewayProviderID, + "resolvedGatewayProviderId": record.GatewayProviderID, + "resolvedModel": record.ResolvedModel, + "resolvedSkills": append([]string(nil), record.ResolvedSkills...), + "taskLoadClass": record.TaskLoadClass, + "runtimeBudgetMinutes": record.RuntimeBudgetMinutes, + } + mergeOpenClawArtifactPayload(result, waitPayload) + if collector != nil { + mergeOpenClawArtifactPayload(result, collector.artifactPayload()) + } + applyOpenClawPreparedArtifactToResult(result, record.PreparedArtifact) + artifactPayload := o.openClawArtifactExport( + record.GatewayProviderID, + record.ChatParams, + record.RunID, + record.ArtifactSinceUnixMs, + record.PreparedArtifact, + notify, + ) + mergeOpenClawArtifactPayload(result, artifactPayload) + result[openClawArtifactExportAttemptedField] = true + exportedCount := openClawArtifactPayloadCount(result) + logOpenClawArtifactSync(record.GatewayProviderID, record.SessionKey, record.RunID, "export", record.PreparedArtifact != nil, exportedCount > 0, exportedCount == 0) + o.server.decorateOpenClawArtifactDownloadURLs(result, record.SessionKey, record.RunID) + stripOpenClawArtifactInlineContent(result) + applyOpenClawArtifactContractResult(result, record.ArtifactContract) + guardOpenClawAgentFailedBeforeReplyResult(result) + guardOpenClawNoDisplayableResult(result, noDisplayableOutput) + delete(result, openClawArtifactExportAttemptedField) + + success := parseBool(result["success"]) + state := TaskStateCompleted + stage := "completed" + if !success { + state = TaskStateFailed + stage = "failed" + } + artifactRecord := buildArtifactRecord(sess, result, output) + if len(artifactRecord.Artifacts) > 0 { + result["artifacts"] = artifactRecord.Artifacts + } + if artifactRecord.RemoteWorkingDirectory != "" { + result["remoteWorkingDirectory"] = artifactRecord.RemoteWorkingDirectory + } + if artifactRecord.RemoteWorkspaceRefKind != "" { + result["remoteWorkspaceRefKind"] = artifactRecord.RemoteWorkspaceRefKind + } + if artifactRecord.ResultSummary != "" && strings.TrimSpace(shared.StringArg(result, "resultSummary", "")) == "" { + result["resultSummary"] = artifactRecord.ResultSummary + } + + sess.mu.Lock() + sess.task.State = state + sess.task.UpdatedAt = time.Now() + sess.task.ProgressStage = stage + sess.task.ProgressMessage = output + sess.task.ProgressTerminal = true + if sess.openClaw != nil { + sess.openClaw.ProgressStage = stage + sess.openClaw.ProgressMessage = output + sess.openClaw.ProgressTerminal = true + sess.openClaw.ProbeInFlight = false + } + if output != "" { + sess.history = append(sess.history, "ASSISTANT: "+output) + } + sess.artifacts = artifactRecord + sess.lastResult = cloneMap(result) + sess.mu.Unlock() + o.releaseOpenClawAdmission(sess) + if notify != nil { + notify(shared.NotificationEnvelope("session.update", openClawGatewayCompletedResultUpdate(record.SessionID, record.ThreadID, record.TurnID, result))) + } + return result +} + +func openClawSessionSnapshotLocked(sess *session) map[string]any { + payload := map[string]any{ + "status": string(sess.task.State), + "sessionId": sess.sessionID, + "threadId": sess.threadID, + "task": openClawTaskMapLocked(sess), + } + if len(sess.lastResult) > 0 { + payload["result"] = cloneMap(sess.lastResult) + } + if len(sess.artifacts.Artifacts) > 0 || + sess.artifacts.RemoteWorkingDirectory != "" || + sess.artifacts.RemoteWorkspaceRefKind != "" || + sess.artifacts.ResultSummary != "" { + payload["artifacts"] = map[string]any{ + "items": cloneMapSlice(sess.artifacts.Artifacts), + "remoteWorkingDirectory": sess.artifacts.RemoteWorkingDirectory, + "remoteWorkspaceRefKind": sess.artifacts.RemoteWorkspaceRefKind, + "resultSummary": sess.artifacts.ResultSummary, + "updatedAt": sess.artifacts.UpdatedAt.UTC().Format(time.RFC3339Nano), + } + } + if sess.openClaw != nil { + payload["progress"] = openClawTaskProgress(sess.openClaw) + } + return payload +} + +func openClawTaskMapLocked(sess *session) map[string]any { + task := sess.task + payload := map[string]any{ + "sessionId": task.SessionID, + "threadId": task.ThreadID, + "turnId": task.TurnID, + "runId": task.RunID, + "sessionKey": task.SessionKey, + "provider": task.Provider, + "target": task.Target, + "gatewayProviderId": task.GatewayProviderID, + "state": string(task.State), + "kind": string(task.Kind), + "taskLoadClass": task.TaskLoadClass, + "artifactScope": task.ArtifactScope, + "artifactDirectory": task.ArtifactDirectory, + "runtimeBudgetMinutes": task.RuntimeBudgetMinutes, + "updatedAt": task.UpdatedAt.UTC().Format(time.RFC3339Nano), + } + if !task.StartedAt.IsZero() { + payload["startedAt"] = task.StartedAt.UTC().Format(time.RFC3339Nano) + } + if !task.DeadlineAt.IsZero() { + payload["deadlineAt"] = task.DeadlineAt.UTC().Format(time.RFC3339Nano) + } + if !task.LastProbeAt.IsZero() { + payload["lastProbeAt"] = task.LastProbeAt.UTC().Format(time.RFC3339Nano) + } + if task.ProgressStage != "" || task.ProgressMessage != "" { + payload["progress"] = map[string]any{ + "stage": task.ProgressStage, + "message": task.ProgressMessage, + "terminal": task.ProgressTerminal, + } + } + return payload +} diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index 4abc127..f0b6e2c 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -262,11 +262,16 @@ func (o *SessionOrchestrator) runGateway( if rpcErr != nil { return nil, rpcErr } - defer release() if rpcErr := ensureProductionGatewayConnected(o.server, gatewayProvider, notify); rpcErr != nil { + release() return nil, rpcErr } - return o.runOpenClawGatewayChat(ctx, params, gatewayProvider, turnID, notify) + result, rpcErr := o.startOpenClawGatewayTask(ctx, params, routing, gatewayProvider, turnID, release, notify) + if rpcErr != nil { + release() + return nil, rpcErr + } + return result, nil } if rpcErr := ensureProductionGatewayConnected(o.server, gatewayProvider, notify); rpcErr != nil { return nil, rpcErr @@ -299,11 +304,13 @@ func (o *SessionOrchestrator) runGateway( return payload, nil } -func (o *SessionOrchestrator) runOpenClawGatewayChat( +func (o *SessionOrchestrator) startOpenClawGatewayTask( _ context.Context, params map[string]any, + routing RoutingResult, gatewayProvider string, turnID string, + releaseAdmission func(), notify func(map[string]any), ) (map[string]any, *shared.RPCError) { collector := newOpenClawChatCollector() @@ -374,72 +381,65 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat( logOpenClawArtifactSync(gatewayProvider, sessionKey, runID, "prepare", true, false, false) applyOpenClawPreparedArtifactToChatParams(chatParams, preparedArtifact, sessionKey, runID, artifactContract) } - waitTimeout := openClawAgentWaitTimeout(params, chatParams) - waitStarted := time.Now() - waitResult := o.openClawGatewayRequestWithRetry( - gatewayProvider, - "agent.wait", - map[string]any{ - "runId": runID, - "timeoutMs": waitTimeout.Milliseconds(), - }, - waitTimeout, - notifyWithCollection, - ) - logOpenClawGatewayTiming( - gatewayProvider, - "agent.wait", - sessionKey, - runID, - time.Since(waitStarted), - waitResult.OK, - ) - if !waitResult.OK { - return nil, gatewayRPCError(waitResult.Error, "openclaw agent.wait failed") + taskLoadClass, runtimeBudgetMinutes := openClawTaskRuntimePolicy(params, chatParams, artifactContract) + startedAt := time.Now() + record := &OpenClawTaskRecord{ + SessionID: sessionID, + ThreadID: threadID, + TurnID: turnID, + RunID: runID, + SessionKey: sessionKey, + GatewayProviderID: gatewayProvider, + TaskLoadClass: taskLoadClass, + ArtifactSinceUnixMs: artifactSinceUnixMs, + RuntimeBudgetMinutes: runtimeBudgetMinutes, + StartedAt: startedAt, + DeadlineAt: startedAt.Add(time.Duration(runtimeBudgetMinutes) * time.Minute), + ProgressStage: "running", + ProgressMessage: "OpenClaw task accepted", + ChatParams: cloneMap(chatParams), + PreparedArtifact: preparedArtifact, + ArtifactContract: artifactContract, + ResolvedModel: routing.Model, + ResolvedSkills: append([]string(nil), routing.Skills...), + AdmissionRelease: releaseAdmission, } - waitPayload := shared.AsMap(waitResult.Payload) - output := collector.output() - if output == "" { - output = firstNonEmptyString(waitPayload, "output", "message", "summary", "assistantText", "text") - } - noDisplayableOutput := strings.TrimSpace(output) == "" - if output == "" { - output = openClawNoDisplayableText - } - result := map[string]any{ - "success": true, - "output": output, - "message": output, - "summary": output, - "turnId": turnID, - "runId": runID, - "mode": router.ExecutionTargetGatewayChat, - "resolvedGatewayProviderId": gatewayProvider, - } - mergeOpenClawArtifactPayload(result, waitPayload) - mergeOpenClawArtifactPayload(result, collector.artifactPayload()) - applyOpenClawPreparedArtifactToResult(result, preparedArtifact) - artifactPayload := o.openClawArtifactExport( - gatewayProvider, - chatParams, - runID, - artifactSinceUnixMs, - preparedArtifact, - notifyWithCollection, - ) - mergeOpenClawArtifactPayload(result, artifactPayload) - result[openClawArtifactExportAttemptedField] = true - exportedCount := openClawArtifactPayloadCount(result) - logOpenClawArtifactSync(gatewayProvider, sessionKey, runID, "export", preparedArtifact != nil, exportedCount > 0, exportedCount == 0) - o.server.decorateOpenClawArtifactDownloadURLs(result, shared.StringArg(chatParams, "sessionKey", ""), runID) - stripOpenClawArtifactInlineContent(result) - applyOpenClawArtifactContractResult(result, artifactContract) - guardOpenClawAgentFailedBeforeReplyResult(result) - guardOpenClawNoDisplayableResult(result, noDisplayableOutput) + sess := o.server.getOrCreateSession(sessionID, threadID) + sess.mu.Lock() + sess.task.RunID = runID + sess.task.SessionKey = sessionKey + sess.task.GatewayProviderID = gatewayProvider + sess.task.TaskLoadClass = taskLoadClass + sess.task.ArtifactScope = strings.TrimSpace(preparedArtifact.ArtifactScope) + sess.task.ArtifactDirectory = strings.TrimSpace(preparedArtifact.ArtifactDirectory) + sess.task.RuntimeBudgetMinutes = runtimeBudgetMinutes + sess.task.StartedAt = startedAt + sess.task.DeadlineAt = record.DeadlineAt + sess.task.ProgressStage = "running" + sess.task.ProgressMessage = "OpenClaw task accepted" + sess.openClaw = record + running := openClawRunningTaskResult(record) + sess.lastResult = cloneMap(running) + sess.mu.Unlock() + o.startOpenClawTaskMonitor(sess) if notify != nil { - notify(shared.NotificationEnvelope("session.update", openClawGatewayCompletedResultUpdate(sessionID, threadID, turnID, result))) + notify(shared.NotificationEnvelope("session.update", map[string]any{ + "sessionId": sessionID, + "threadId": threadID, + "turnId": turnID, + "runId": runID, + "type": "status", + "event": "running", + "message": "OpenClaw task accepted", + "pending": true, + "error": false, + "status": string(TaskStateRunning), + "runtimeBudgetMinutes": runtimeBudgetMinutes, + "progress": running["progress"], + })) } - return result, nil + _ = collector + return running, nil } func openClawGatewayCompletedResultUpdate(sessionID string, threadID string, turnID string, result map[string]any) map[string]any { @@ -1773,6 +1773,21 @@ func (o *SessionOrchestrator) normalizeResult(sess *session, result map[string]a if result == nil { result = map[string]any{} } + if routing.TargetID == "gateway" && strings.TrimSpace(shared.StringArg(result, "status", "")) == string(TaskStateRunning) { + result["turnId"] = turnID + result["success"] = true + result["resolvedExecutionTarget"] = routing.TargetID + result["resolvedProviderId"] = routing.ProviderID + result["resolvedGatewayProviderId"] = routing.GatewayProviderID + result["resolvedModel"] = routing.Model + result["resolvedSkills"] = append([]string(nil), routing.Skills...) + sess.mu.Lock() + sess.task.State = TaskStateRunning + sess.task.UpdatedAt = time.Now() + sess.lastResult = cloneMap(result) + sess.mu.Unlock() + return result + } if openClawArtifactResponse(result, routing, params) { o.completeOpenClawScopedArtifactExport(result, params, openClawGatewayProviderForArtifacts(result, routing, params), turnID) } diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index 4a594a1..e74e0fa 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -55,7 +55,27 @@ func (s *Server) executeSessionTask(t task) (map[string]any, *shared.RPCError) { if t.req.Method == "" { t.req.Method = "session.start" } - return s.handleRequest(t.req, t.notify) + response, rpcErr := s.handleRequest(t.req, t.notify) + if rpcErr != nil || strings.TrimSpace(shared.StringArg(response, "status", "")) != string(TaskStateRunning) { + return response, rpcErr + } + return s.handleRequest(shared.RPCRequest{ + Method: "xworkmate.tasks.get", + Params: map[string]any{ + "sessionId": shared.StringArg(response, "sessionId", ""), + "threadId": shared.StringArg(response, "threadId", ""), + "turnId": shared.StringArg(response, "turnId", ""), + "runId": shared.StringArg(response, "runId", ""), + "sessionKey": shared.StringArg(response, "sessionKey", ""), + "artifactScope": shared.StringArg(response, "artifactScope", ""), + "artifactDirectory": shared.StringArg(response, "artifactDirectory", ""), + "gatewayProviderId": shared.StringArg(response, "resolvedGatewayProviderId", ""), + "runtimeBudgetMinutes": shared.StringArg(response, "runtimeBudgetMinutes", ""), + "taskLoadClass": shared.StringArg(response, "taskLoadClass", ""), + "expectedArtifactExtensions": shared.ListArg(response, "expectedArtifactExtensions"), + "requiredArtifactExtensions": shared.ListArg(response, "requiredArtifactExtensions"), + }, + }, t.notify) } func newExternalSingleAgentProvider( @@ -532,11 +552,8 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) { if !ok { t.Fatalf("expected numeric OpenClaw agent.wait timeoutMs, got %#v", waitParams) } - if got := int64(timeoutMs); got != openClawAgentWaitDefaultTimeout.Milliseconds() { - t.Fatalf("expected default OpenClaw agent.wait timeoutMs %d, got %#v", openClawAgentWaitDefaultTimeout.Milliseconds(), waitParams) - } - if got := int64(timeoutMs); got <= 120000 { - t.Fatalf("expected OpenClaw agent.wait timeout to exceed the previous 120s cap, got %#v", waitParams) + if got := int64(timeoutMs); got != openClawTaskProbeTimeoutMs { + t.Fatalf("expected OpenClaw probe timeoutMs %d, got %#v", openClawTaskProbeTimeoutMs, waitParams) } if gateway.ArtifactExportCount() != 1 { t.Fatalf("expected one OpenClaw artifact export sync after run, got %d", gateway.ArtifactExportCount()) @@ -865,17 +882,20 @@ func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testi }, }, }) - if rpcErr == nil { - t.Fatalf("expected wait-timeout rpc error, got response: %#v", response) + if rpcErr != nil { + t.Fatalf("expected wait-timeout probe to keep task running, got rpc error: %#v", rpcErr) } - if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "openclaw wait timeout") { - t.Fatalf("expected surfaced wait timeout, got %#v", rpcErr) + if got := response["status"]; got != string(TaskStateRunning) { + t.Fatalf("expected wait-timeout probe to keep running status, got %#v", response) } if got := gateway.ChatSendCount(); got != 1 { t.Fatalf("expected no automatic repair model turn, got %d", got) } if got := gateway.AgentWaitCount(); got != 1 { - t.Fatalf("expected one failed wait, got %d", got) + t.Fatalf("expected one status probe, got %d", got) + } + if got := gateway.ArtifactExportCount(); got != 0 { + t.Fatalf("expected no artifact export before terminal state, got %d", got) } } @@ -1551,19 +1571,25 @@ func TestExecuteSessionTaskGatewaySurfacesOpenClawAgentWaitError(t *testing.T) { }, }, }) - if rpcErr == nil { - t.Fatalf("expected OpenClaw agent.wait error, got response: %#v", response) + if rpcErr != nil { + t.Fatalf("expected OpenClaw agent.wait error as task result, got rpc error: %#v", rpcErr) } - if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "openclaw wait failed") { - t.Fatalf("expected surfaced agent.wait failure, got %#v", rpcErr) + if got := response["status"]; got != string(TaskStateFailed) { + t.Fatalf("expected failed task result, got %#v", response) + } + if got := response["code"]; got != "OPENCLAW_WAIT_FAILED" { + t.Fatalf("expected OpenClaw wait failure code, got %#v", response) + } + if got := shared.StringArg(response, "message", ""); !strings.Contains(got, "openclaw wait failed") { + t.Fatalf("expected surfaced agent.wait failure, got %#v", response) } if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait"}) { t.Fatalf("expected connect, artifact prepare, chat.send, then agent.wait, got %#v", got) } - snapshot := server.handleSessionGet(map[string]any{ + snapshot := server.handleTaskGet(context.Background(), map[string]any{ "sessionId": "session-openclaw-wait-fail", "threadId": "thread-openclaw-wait-fail", - }) + }, nil) if got := snapshot["status"]; got != string(TaskStateFailed) { t.Fatalf("expected failed session snapshot, got %#v from %#v", got, snapshot) } diff --git a/internal/acp/rpc_handler.go b/internal/acp/rpc_handler.go index 9a3e0a6..53f2f92 100644 --- a/internal/acp/rpc_handler.go +++ b/internal/acp/rpc_handler.go @@ -59,8 +59,11 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string case "xworkmate.jobs.submit", "xworkmate.jobs.get", "xworkmate.jobs.list", "xworkmate.jobs.stats": return s.handleJobMethod(ctx, method, request.Params, notify) - case "xworkmate.sessions.get": - return s.handleSessionGet(request.Params), nil + case "xworkmate.tasks.get": + return s.handleTaskGet(ctx, request.Params, notify), nil + + case "xworkmate.tasks.cancel": + return s.handleTaskCancel(ctx, request.Params, notify), nil case "xworkmate.tools.invoke": return s.invokeOpenClawTool(ctx, request.Params) @@ -73,63 +76,167 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string } } -func (s *Server) handleSessionGet(params map[string]any) map[string]any { - sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")) - threadID := strings.TrimSpace(shared.StringArg(params, "threadId", "")) - if sessionID == "" && threadID == "" { +func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { + sess := s.findTaskSession(params) + if sess == nil { + sess = s.reassociateOpenClawTask(params) + } + if sess == nil { return map[string]any{"status": "not_found"} } - s.mu.RLock() - sess := s.sessions[sessionID] - if sess == nil && threadID != "" { - for _, candidate := range s.sessions { - if candidate != nil && candidate.threadID == threadID { - sess = candidate - break - } - } - } - s.mu.RUnlock() + return s.orchestrator.probeOpenClawTask(ctx, sess, notify) +} + +func (s *Server) handleTaskCancel(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { + sess := s.findTaskSession(params) if sess == nil { - return map[string]any{ - "status": "not_found", - "sessionId": sessionID, - "threadId": threadID, - } + sess = s.reassociateOpenClawTask(params) + } + if sess == nil { + return map[string]any{"accepted": false, "status": "not_found"} } sess.mu.Lock() - defer sess.mu.Unlock() - payload := map[string]any{ - "status": string(sess.task.State), - "sessionId": sess.sessionID, - "threadId": sess.threadID, - "task": map[string]any{ - "sessionId": sess.task.SessionID, - "threadId": sess.task.ThreadID, - "turnId": sess.task.TurnID, - "provider": sess.task.Provider, - "target": sess.task.Target, - "state": string(sess.task.State), - "kind": string(sess.task.Kind), - "updatedAt": sess.task.UpdatedAt.UTC().Format(time.RFC3339Nano), - }, + gatewayProvider := sess.task.GatewayProviderID + runID := sess.task.RunID + sess.task.State = TaskStateCancelled + sess.task.UpdatedAt = time.Now() + sess.task.ProgressStage = "cancelled" + sess.task.ProgressMessage = "OpenClaw task cancelled" + sess.task.ProgressTerminal = true + if sess.openClaw != nil { + sess.openClaw.ProgressStage = "cancelled" + sess.openClaw.ProgressMessage = "OpenClaw task cancelled" + sess.openClaw.ProgressTerminal = true } - if len(sess.lastResult) > 0 { - payload["result"] = cloneMap(sess.lastResult) + snapshot := openClawSessionSnapshotLocked(sess) + sess.mu.Unlock() + s.orchestrator.releaseOpenClawAdmission(sess) + if strings.TrimSpace(gatewayProvider) != "" && strings.TrimSpace(runID) != "" && s.gateway != nil { + _ = s.gateway.RequestByMode( + gatewayProvider, + "agent.cancel", + map[string]any{"runId": runID}, + 5*time.Second, + notify, + ) } - if len(sess.artifacts.Artifacts) > 0 || - sess.artifacts.RemoteWorkingDirectory != "" || - sess.artifacts.RemoteWorkspaceRefKind != "" || - sess.artifacts.ResultSummary != "" { - payload["artifacts"] = map[string]any{ - "items": cloneMapSlice(sess.artifacts.Artifacts), - "remoteWorkingDirectory": sess.artifacts.RemoteWorkingDirectory, - "remoteWorkspaceRefKind": sess.artifacts.RemoteWorkspaceRefKind, - "resultSummary": sess.artifacts.ResultSummary, - "updatedAt": sess.artifacts.UpdatedAt.UTC().Format(time.RFC3339Nano), + snapshot["accepted"] = true + return snapshot +} + +func (s *Server) findTaskSession(params map[string]any) *session { + sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")) + threadID := strings.TrimSpace(shared.StringArg(params, "threadId", "")) + turnID := strings.TrimSpace(shared.StringArg(params, "turnId", "")) + runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) + artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", "")) + s.mu.RLock() + defer s.mu.RUnlock() + if sessionID != "" && s.sessions[sessionID] != nil { + return s.sessions[sessionID] + } + for _, candidate := range s.sessions { + if candidate == nil { + continue + } + candidate.mu.Lock() + matches := (threadID != "" && candidate.threadID == threadID) || + (turnID != "" && candidate.task.TurnID == turnID) || + (runID != "" && candidate.task.RunID == runID) || + (artifactScope != "" && candidate.task.ArtifactScope == artifactScope) + candidate.mu.Unlock() + if matches { + return candidate } } - return payload + return nil +} + +func (s *Server) reassociateOpenClawTask(params map[string]any) *session { + runID := strings.TrimSpace(shared.StringArg(params, "runId", "")) + artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", "")) + if runID == "" || artifactScope == "" { + return nil + } + sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")) + threadID := strings.TrimSpace(shared.StringArg(params, "threadId", sessionID)) + if sessionID == "" { + sessionID = threadID + } + if sessionID == "" { + sessionID = strings.TrimSpace(shared.StringArg(params, "sessionKey", "")) + } + if sessionID == "" { + sessionID = "openclaw:" + runID + } + if threadID == "" { + threadID = sessionID + } + turnID := strings.TrimSpace(shared.StringArg(params, "turnId", runID)) + sessionKey := strings.TrimSpace(shared.StringArg(params, "sessionKey", threadID)) + gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", "openclaw")) + budget := shared.IntArg(shared.StringArg(params, "runtimeBudgetMinutes", ""), openClawComplexTaskMinutes) + now := time.Now() + prepared := &openClawPreparedArtifactScope{ + ArtifactScope: artifactScope, + ArtifactDirectory: strings.TrimSpace(shared.StringArg(params, "artifactDirectory", "")), + RelativeArtifactDirectory: artifactScope, + ScopeKind: "task", + RemoteWorkingDirectory: strings.TrimSpace(shared.StringArg(params, "remoteWorkingDirectory", "")), + RemoteWorkspaceRefKind: strings.TrimSpace(shared.StringArg(params, "remoteWorkspaceRefKind", "")), + } + contract := openClawArtifactContract{ + TaskLoadClass: strings.TrimSpace(shared.StringArg(params, "taskLoadClass", "")), + ExpectedArtifactExtensions: normalizeOpenClawExtensionList(shared.ListArg(params, "expectedArtifactExtensions")), + RequiredFinalExtensions: normalizeOpenClawExtensionList(shared.ListArg(params, "requiredArtifactExtensions")), + } + sess := s.getOrCreateSession(sessionID, threadID) + sess.mu.Lock() + sess.provider = gatewayProvider + sess.target = "gateway" + sess.mode = "gateway" + sess.task = QueuedTask{ + SessionID: sessionID, + ThreadID: threadID, + TurnID: turnID, + RunID: runID, + SessionKey: sessionKey, + Provider: gatewayProvider, + Target: "gateway", + GatewayProviderID: gatewayProvider, + State: TaskStateRunning, + Kind: TaskKindGateway, + TaskLoadClass: contract.TaskLoadClass, + ArtifactScope: artifactScope, + ArtifactDirectory: prepared.ArtifactDirectory, + RuntimeBudgetMinutes: budget, + StartedAt: now, + DeadlineAt: now.Add(time.Duration(budget) * time.Minute), + UpdatedAt: now, + ProgressStage: "reassociated", + ProgressMessage: "OpenClaw task reassociated from task handle", + } + sess.openClaw = &OpenClawTaskRecord{ + SessionID: sessionID, + ThreadID: threadID, + TurnID: turnID, + RunID: runID, + SessionKey: sessionKey, + GatewayProviderID: gatewayProvider, + TaskLoadClass: contract.TaskLoadClass, + ArtifactSinceUnixMs: 0, + RuntimeBudgetMinutes: budget, + StartedAt: now, + DeadlineAt: now.Add(time.Duration(budget) * time.Minute), + ProgressStage: "reassociated", + ProgressMessage: "OpenClaw task reassociated from task handle", + ChatParams: map[string]any{"sessionKey": sessionKey}, + PreparedArtifact: prepared, + ArtifactContract: contract, + } + sess.lastResult = openClawRunningTaskResult(sess.openClaw) + sess.mu.Unlock() + return sess } func (s *Server) cancelSession(ctx context.Context, sessionID string) { diff --git a/internal/acp/types.go b/internal/acp/types.go index fbc87f5..c8ebbec 100644 --- a/internal/acp/types.go +++ b/internal/acp/types.go @@ -36,14 +36,27 @@ type ControlPlaneSession struct { } type QueuedTask struct { - SessionID string - ThreadID string - TurnID string - Provider string - Target string - State TaskState - Kind TaskKind - UpdatedAt time.Time + SessionID string + ThreadID string + TurnID string + RunID string + SessionKey string + Provider string + Target string + GatewayProviderID string + State TaskState + Kind TaskKind + TaskLoadClass string + ArtifactScope string + ArtifactDirectory string + RuntimeBudgetMinutes int + StartedAt time.Time + UpdatedAt time.Time + DeadlineAt time.Time + LastProbeAt time.Time + ProgressStage string + ProgressMessage string + ProgressTerminal bool } type ArtifactRecord struct { @@ -69,6 +82,7 @@ type session struct { task QueuedTask artifacts ArtifactRecord lastResult map[string]any + openClaw *OpenClawTaskRecord } type Server struct { diff --git a/internal/acp/web_contract_test.go b/internal/acp/web_contract_test.go index 8ab8034..e83ba8d 100644 --- a/internal/acp/web_contract_test.go +++ b/internal/acp/web_contract_test.go @@ -18,6 +18,104 @@ import ( "xworkmate-bridge/internal/shared" ) +func sseResultEnvelope(t *testing.T, body string, id string) map[string]any { + t.Helper() + for _, rawLine := range strings.Split(body, "\n") { + line := strings.TrimSpace(rawLine) + if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" { + continue + } + var envelope map[string]any + if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &envelope); err != nil { + t.Fatalf("decode SSE envelope %q: %v", line, err) + } + if strings.TrimSpace(shared.StringArg(envelope, "id", "")) == id { + return envelope + } + } + t.Fatalf("missing SSE result envelope %q in body: %s", id, body) + return nil +} + +func sseFirstResultEnvelope(t *testing.T, body string) map[string]any { + t.Helper() + for _, rawLine := range strings.Split(body, "\n") { + line := strings.TrimSpace(rawLine) + if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" { + continue + } + var envelope map[string]any + if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &envelope); err != nil { + t.Fatalf("decode SSE envelope %q: %v", line, err) + } + if _, ok := envelope["result"]; ok { + return envelope + } + } + t.Fatalf("missing SSE result envelope in body: %s", body) + return nil +} + +func taskGetHTTPResult(t *testing.T, handler http.Handler, handle map[string]any) map[string]any { + t.Helper() + body := fmt.Sprintf( + `{"jsonrpc":"2.0","id":"task-get","method":"xworkmate.tasks.get","params":{"sessionId":%q,"threadId":%q,"turnId":%q,"runId":%q,"sessionKey":%q,"artifactScope":%q,"artifactDirectory":%q,"gatewayProviderId":%q,"runtimeBudgetMinutes":%q,"taskLoadClass":%q,"expectedArtifactExtensions":%s,"requiredArtifactExtensions":%s}}`, + shared.StringArg(handle, "sessionId", ""), + shared.StringArg(handle, "threadId", ""), + shared.StringArg(handle, "turnId", ""), + shared.StringArg(handle, "runId", ""), + shared.StringArg(handle, "sessionKey", ""), + shared.StringArg(handle, "artifactScope", ""), + shared.StringArg(handle, "artifactDirectory", ""), + shared.StringArg(handle, "resolvedGatewayProviderId", "openclaw"), + shared.StringArg(handle, "runtimeBudgetMinutes", ""), + shared.StringArg(handle, "taskLoadClass", ""), + jsonArrayString(t, shared.ListArg(handle, "expectedArtifactExtensions")), + jsonArrayString(t, shared.ListArg(handle, "requiredArtifactExtensions")), + ) + recorder := httptest.NewRecorder() + request := httptest.NewRequest(http.MethodPost, "http://127.0.0.1/acp/rpc", strings.NewReader(body)) + request.Header.Set("Content-Type", "application/json") + request.Header.Set("Authorization", "Bearer bridge-test-token") + handler.ServeHTTP(recorder, request) + if recorder.Code != http.StatusOK { + t.Fatalf("expected task get 200, got %d: %s", recorder.Code, recorder.Body.String()) + } + var decoded map[string]any + if err := json.Unmarshal(recorder.Body.Bytes(), &decoded); err != nil { + t.Fatalf("decode task get response: %v", err) + } + return shared.AsMap(decoded["result"]) +} + +func taskGetHTTPTerminalResult(t *testing.T, handler http.Handler, handle map[string]any) map[string]any { + t.Helper() + deadline := time.Now().Add(5 * time.Second) + for { + result := taskGetHTTPResult(t, handler, handle) + switch result["status"] { + case "completed", "failed", "cancelled": + return result + } + if time.Now().After(deadline) { + t.Fatalf("task did not reach terminal state: %#v", result) + } + time.Sleep(25 * time.Millisecond) + } +} + +func jsonArrayString(t *testing.T, values []any) string { + t.Helper() + if values == nil { + return "[]" + } + encoded, err := json.Marshal(values) + if err != nil { + t.Fatalf("encode array: %v", err) + } + return string(encoded) +} + func TestHTTPHandlerRootAndPingExposeRuntimeVersionInfo(t *testing.T) { t.Setenv("BRIDGE_AUTH_TOKEN", "") SetRuntimeVersionInfo(RuntimeVersionInfo{ @@ -156,19 +254,13 @@ func TestHTTPHandlerRPCSSEWritesFinalEnvelopeAndDone(t *testing.T) { } } -func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *testing.T) { +func TestHTTPHandlerGatewayOpenClawReturnsRunningEnvelopeAndDone(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() - gateway.agentWaitDelayMs.Store(50) t.Setenv("GATEWAY_RPC_URL", gateway.URL()) t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-test-token") t.Setenv("BRIDGE_CONFIG_PATH", filepath.Join(t.TempDir(), "missing-config.yaml")) - previousInterval := httpSSEKeepaliveInterval - httpSSEKeepaliveInterval = 10 * time.Millisecond - t.Cleanup(func() { - httpSSEKeepaliveInterval = previousInterval - }) server := NewServer() httpServer := httptest.NewServer(server.Handler()) @@ -202,15 +294,15 @@ func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *tes t.Fatalf("expected event-stream content type, got %q", contentType) } - events := strings.Split(strings.TrimSpace(string(body)), "\n\n") - if len(events) < 4 { - t.Fatalf("expected accepted, keepalive, final envelope, and done events, got %q", string(body)) + bodyText := string(body) + events := strings.Split(strings.TrimSpace(bodyText), "\n\n") + if len(events) < 3 { + t.Fatalf("expected accepted, running envelope, and done events, got %q", bodyText) } if events[len(events)-1] != "data: [DONE]" { t.Fatalf("expected done event, got %q", events[len(events)-1]) } - var sawAcceptedBeforeKeepalive bool - var sawKeepaliveBeforeFinal bool + var sawAcceptedBeforeFinal bool var sawFinal bool for _, event := range events[:len(events)-1] { if !strings.HasPrefix(event, "data: ") { @@ -220,27 +312,25 @@ func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *tes if err := json.Unmarshal([]byte(strings.TrimPrefix(event, "data: ")), &envelope); err != nil { t.Fatalf("decode event %q: %v", event, err) } - if envelope["method"] == "xworkmate.bridge.accepted" && !sawKeepaliveBeforeFinal && !sawFinal { - sawAcceptedBeforeKeepalive = true - } - if envelope["method"] == "xworkmate.bridge.keepalive" && !sawFinal { - sawKeepaliveBeforeFinal = true + if envelope["method"] == "xworkmate.bridge.accepted" && !sawFinal { + sawAcceptedBeforeFinal = true } if envelope["id"] == "task-keepalive" { sawFinal = true - if _, ok := envelope["result"].(map[string]any); !ok { - t.Fatalf("expected result envelope, got %#v", envelope) + result := shared.AsMap(envelope["result"]) + if got := result["status"]; got != "running" { + t.Fatalf("expected running task handle, got %#v", result) + } + if got := result["runtimeBudgetMinutes"]; got != float64(openClawShortTaskMinutes) { + t.Fatalf("expected short task budget in running handle, got %#v", result) } } } - if !sawAcceptedBeforeKeepalive { - t.Fatalf("expected accepted event before keepalive/final envelope, got %q", string(body)) - } - if !sawKeepaliveBeforeFinal { - t.Fatalf("expected keepalive event before final envelope, got %q", string(body)) + if !sawAcceptedBeforeFinal { + t.Fatalf("expected accepted event before final envelope, got %q", bodyText) } if !sawFinal { - t.Fatalf("expected final task envelope, got %q", string(body)) + t.Fatalf("expected running task envelope, got %q", bodyText) } } @@ -311,7 +401,7 @@ func TestHTTPHandlerGatewayOpenClawAdmissionQueuesExcessConcurrentSSE(t *testing close(results) var sawQueued bool - var finalCount int + var runningHandleCount int for item := range results { if item.err != nil { t.Fatalf("concurrent request failed: %v", item.err) @@ -319,15 +409,17 @@ func TestHTTPHandlerGatewayOpenClawAdmissionQueuesExcessConcurrentSSE(t *testing if strings.Contains(item.body, `"event":"queued"`) { sawQueued = true } - if strings.Contains(item.body, `"result"`) && strings.Contains(item.body, `data: [DONE]`) { - finalCount += 1 + envelope := sseFirstResultEnvelope(t, item.body) + result := shared.AsMap(envelope["result"]) + if result["status"] == "running" && strings.TrimSpace(shared.StringArg(result, "runId", "")) != "" { + runningHandleCount += 1 } } if !sawQueued { t.Fatalf("expected one queued session.update event") } - if finalCount != 2 { - t.Fatalf("expected both requests to return final result, got %d", finalCount) + if runningHandleCount != 2 { + t.Fatalf("expected both requests to return running handles, got %d", runningHandleCount) } if got := gateway.ChatSendCount(); got != 2 { t.Fatalf("expected queued request to run after a slot releases, got %d chat.send calls", got) @@ -412,7 +504,7 @@ func TestHTTPHandlerGatewayOpenClawHandlesFiveConcurrentE2ECases(t *testing.T) { wg.Wait() close(results) - var finalCount int + var runningHandleCount int var missingFinalArtifactCount int for item := range results { if item.err != nil { @@ -431,15 +523,19 @@ func TestHTTPHandlerGatewayOpenClawHandlesFiveConcurrentE2ECases(t *testing.T) { t.Fatalf("unexpected gateway stability error %q in body: %s", unexpected, item.body) } } - if strings.Contains(item.body, "OPENCLAW_REQUIRED_ARTIFACT_MISSING") { + envelope := sseFirstResultEnvelope(t, item.body) + handle := shared.AsMap(envelope["result"]) + if got := handle["status"]; got != "running" { + t.Fatalf("expected running task handle, got %#v", handle) + } + runningHandleCount += 1 + result := taskGetHTTPTerminalResult(t, httpServer.Config.Handler, handle) + if result["code"] == "OPENCLAW_REQUIRED_ARTIFACT_MISSING" { missingFinalArtifactCount += 1 } - if strings.Contains(item.body, `"result"`) && strings.Contains(item.body, `data: [DONE]`) { - finalCount += 1 - } } - if finalCount != len(prompts) { - t.Fatalf("expected all five e2e requests to return final result, got %d", finalCount) + if runningHandleCount != len(prompts) { + t.Fatalf("expected all five e2e requests to return running handles, got %d", runningHandleCount) } if missingFinalArtifactCount != len(prompts) { t.Fatalf("expected all artifact-producing prompts to fail without real final artifacts, got %d", missingFinalArtifactCount) @@ -584,15 +680,14 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t events := strings.Split(strings.TrimSpace(bodyText), "\n\n") if len(events) < 4 { - t.Fatalf("expected accepted, session.update, final envelope, and done events, got %q", bodyText) + t.Fatalf("expected accepted, session.update, running envelope, and done events, got %q", bodyText) } if events[len(events)-1] != "data: [DONE]" { t.Fatalf("expected done event, got %q", events[len(events)-1]) } var sawAccepted bool - var sawDelta bool - var sawCompletedSnapshot bool var sawFinal bool + var handle map[string]any for _, event := range events[:len(events)-1] { if !strings.HasPrefix(event, "data: ") { t.Fatalf("expected data event, got %q", event) @@ -606,8 +701,7 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t sawAccepted = true case "session.update": params := shared.AsMap(envelope["params"]) - if params["type"] == "delta" && params["delta"] == "streamed delta" { - sawDelta = true + if params["type"] == "status" && params["event"] == "running" { if got := params["sessionId"]; got != "session-filter" { t.Fatalf("expected session-filter session update, got %#v", params) } @@ -615,39 +709,33 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t t.Fatalf("expected thread-filter session update, got %#v", params) } } - result := shared.AsMap(params["result"]) - if params["type"] == "status" && params["event"] == "completed" && len(result) > 0 { - sawCompletedSnapshot = true - if got := result["resolvedGatewayProviderId"]; got != "openclaw" { - t.Fatalf("expected completed snapshot to carry openclaw final result, got %#v", result) - } - if !strings.Contains(bodyText, openClawArtifactDownloadPath) { - t.Fatalf("expected completed snapshot to include normalized artifact download URL, got %s", bodyText) - } - } } if envelope["id"] == "task-filter" { sawFinal = true - result := shared.AsMap(envelope["result"]) - if got := result["resolvedGatewayProviderId"]; got != "openclaw" { - t.Fatalf("expected openclaw final result, got %#v", result) + handle = shared.AsMap(envelope["result"]) + if got := handle["status"]; got != "running" { + t.Fatalf("expected running task handle, got %#v", handle) } - if !strings.Contains(bodyText, openClawArtifactDownloadPath) { - t.Fatalf("expected normalized artifact download URL in final result, got %s", bodyText) + if got := handle["resolvedGatewayProviderId"]; got != "openclaw" { + t.Fatalf("expected openclaw running result, got %#v", handle) } } } if !sawAccepted { t.Fatalf("expected accepted event, got %q", bodyText) } - if !sawDelta { - t.Fatalf("expected compact session.update delta, got %q", bodyText) - } - if !sawCompletedSnapshot { - t.Fatalf("expected completed session.update result snapshot, got %q", bodyText) - } if !sawFinal { - t.Fatalf("expected final result envelope, got %q", bodyText) + t.Fatalf("expected running result envelope, got %q", bodyText) + } + result := taskGetHTTPTerminalResult(t, httpServer.Config.Handler, handle) + if got := result["resolvedGatewayProviderId"]; got != "openclaw" { + t.Fatalf("expected openclaw final result, got %#v", result) + } + if got := result["status"]; got != "completed" { + t.Fatalf("expected completed task result, got %#v", result) + } + if !strings.Contains(fmt.Sprint(result), openClawArtifactDownloadPath) { + t.Fatalf("expected normalized artifact download URL in task result, got %#v", result) } if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { t.Fatalf("expected artifact workflow methods to prepare before chat.send, got %#v", got) @@ -680,15 +768,27 @@ func TestHTTPHandlerGatewayOpenClawForcesGatewayRouting(t *testing.T) { if !strings.Contains(recorder.Body.String(), `"resolvedGatewayProviderId":"openclaw"`) { t.Fatalf("expected forced OpenClaw gateway result, got %q", recorder.Body.String()) } + var decoded map[string]any + if err := json.Unmarshal(recorder.Body.Bytes(), &decoded); err != nil { + t.Fatalf("decode start response: %v", err) + } + handle := shared.AsMap(decoded["result"]) + if got := handle["status"]; got != "running" { + t.Fatalf("expected running task handle, got %#v", handle) + } if gateway.ChatSendCount() != 1 { t.Fatalf("expected one OpenClaw chat.send, got %d", gateway.ChatSendCount()) } + result := taskGetHTTPTerminalResult(t, handler, handle) + if got := result["status"]; got != "completed" { + t.Fatalf("expected completed task result, got %#v", result) + } if gateway.AgentWaitCount() != 1 { t.Fatalf("expected one OpenClaw agent.wait, got %d", gateway.AgentWaitCount()) } } -func TestHTTPHandlerSessionGetReturnsCompletedOpenClawResult(t *testing.T) { +func TestHTTPHandlerTasksGetReturnsCompletedOpenClawResult(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() @@ -710,38 +810,26 @@ func TestHTTPHandlerSessionGetReturnsCompletedOpenClawResult(t *testing.T) { if startRecorder.Code != http.StatusOK { t.Fatalf("expected start 200, got %d: %s", startRecorder.Code, startRecorder.Body.String()) } - - getRecorder := httptest.NewRecorder() - getRequest := httptest.NewRequest( - http.MethodPost, - "http://127.0.0.1/acp/rpc", - strings.NewReader(`{"jsonrpc":"2.0","id":"get-1","method":"xworkmate.sessions.get","params":{"sessionId":"s1","threadId":"t1"}}`), - ) - getRequest.Header.Set("Content-Type", "application/json") - getRequest.Header.Set("Authorization", "Bearer bridge-test-token") - handler.ServeHTTP(getRecorder, getRequest) - - if getRecorder.Code != http.StatusOK { - t.Fatalf("expected get 200, got %d: %s", getRecorder.Code, getRecorder.Body.String()) - } var decoded map[string]any - if err := json.Unmarshal(getRecorder.Body.Bytes(), &decoded); err != nil { - t.Fatalf("decode session get response: %v", err) + if err := json.Unmarshal(startRecorder.Body.Bytes(), &decoded); err != nil { + t.Fatalf("decode start response: %v", err) } - result := shared.AsMap(decoded["result"]) + handle := shared.AsMap(decoded["result"]) + if got := handle["status"]; got != "running" { + t.Fatalf("expected running task handle, got %#v", handle) + } + result := taskGetHTTPTerminalResult(t, handler, handle) if got := result["status"]; got != "completed" { t.Fatalf("expected completed status, got %#v from %#v", got, result) } - task := shared.AsMap(result["task"]) - if got := task["turnId"]; got == "" { - t.Fatalf("expected retained task turn id, got %#v", task) + if got := result["turnId"]; got == "" { + t.Fatalf("expected retained task turn id, got %#v", result) } - snapshot := shared.AsMap(result["result"]) - if got := snapshot["resolvedGatewayProviderId"]; got != "openclaw" { - t.Fatalf("expected OpenClaw snapshot, got %#v", snapshot) + if got := result["resolvedGatewayProviderId"]; got != "openclaw" { + t.Fatalf("expected OpenClaw snapshot, got %#v", result) } - if got := snapshot["output"]; got == "" { - t.Fatalf("expected output in session snapshot, got %#v", snapshot) + if got := result["output"]; got == "" { + t.Fatalf("expected output in task result, got %#v", result) } }