Implement async OpenClaw task control plane

This commit is contained in:
Haitao Pan 2026-06-02 11:58:40 +08:00
parent d2d32f554d
commit 31220154cc
6 changed files with 999 additions and 231 deletions

View File

@ -0,0 +1,518 @@
package acp
import (
"context"
"strings"
"time"
"xworkmate-bridge/internal/router"
"xworkmate-bridge/internal/shared"
)
const (
openClawTaskProbeTimeout = 2 * time.Second
openClawTaskProbeTimeoutMs = 1000
openClawTaskMonitorInterval = time.Second
openClawShortTaskMinutes = 10
openClawLongTaskMinutes = 30
openClawComplexTaskMinutes = 60
)
type OpenClawTaskRecord struct {
SessionID string
ThreadID string
TurnID string
RunID string
SessionKey string
GatewayProviderID string
TaskLoadClass string
ArtifactSinceUnixMs int64
RuntimeBudgetMinutes int
StartedAt time.Time
DeadlineAt time.Time
LastProbeAt time.Time
ProgressStage string
ProgressMessage string
ProgressTerminal bool
ChatParams map[string]any
PreparedArtifact *openClawPreparedArtifactScope
ArtifactContract openClawArtifactContract
ResolvedModel string
ResolvedSkills []string
MonitorStarted bool
ProbeInFlight bool
AdmissionRelease func()
}
func openClawTaskRuntimePolicy(params map[string]any, chatParams map[string]any, contract openClawArtifactContract) (string, int) {
message := strings.TrimSpace(shared.StringArg(chatParams, "message", ""))
if message == "" {
message = openClawCurrentTurnMessage(params)
}
lower := strings.ToLower(message)
metadataClass := strings.TrimSpace(shared.StringArg(shared.AsMap(params["metadata"]), "taskLoadClass", ""))
if metadataClass == "complex_long_chain_task" || contract.ComplexLongChain || openClawMessageContainsAny(lower, []string{
"复杂链路", "多章节", "每章", "拆章节", "汇总排版", "gpt images", "images2", "image generation", "视频", "渲染", "hyperframes", "remotion", "ffmpeg",
}) {
return "complex_chain_task", openClawComplexTaskMinutes
}
if len(contract.RequiredFinalExtensions) > 0 || openClawMessageContainsAny(lower, []string{
"生成文件", "同步生成文件", "产物", "附件", "pdf", "docx", "ppt", "pptx", "markdown", ".md", "png", "jpg", "jpeg", "mp4",
}) || len(shared.ListArg(params, "attachments"))+len(shared.ListArg(params, "inlineAttachments")) >= 2 {
return "long_task", openClawLongTaskMinutes
}
return "short_task", openClawShortTaskMinutes
}
func openClawRunningTaskResult(record *OpenClawTaskRecord) map[string]any {
if record == nil {
return map[string]any{"success": true, "status": "running", "mode": router.ExecutionTargetGatewayChat}
}
result := map[string]any{
"success": true,
"status": string(TaskStateRunning),
"turnId": record.TurnID,
"runId": record.RunID,
"sessionId": record.SessionID,
"threadId": record.ThreadID,
"sessionKey": record.SessionKey,
"mode": router.ExecutionTargetGatewayChat,
"resolvedGatewayProviderId": record.GatewayProviderID,
"taskLoadClass": record.TaskLoadClass,
"runtimeBudgetMinutes": record.RuntimeBudgetMinutes,
"startedAt": record.StartedAt.UTC().Format(time.RFC3339Nano),
"deadlineAt": record.DeadlineAt.UTC().Format(time.RFC3339Nano),
"progress": openClawTaskProgress(record),
}
if record.PreparedArtifact != nil {
applyOpenClawPreparedArtifactToResult(result, record.PreparedArtifact)
}
if len(record.ArtifactContract.RequiredFinalExtensions) > 0 {
result["requiredArtifactExtensions"] = append([]string(nil), record.ArtifactContract.RequiredFinalExtensions...)
}
if len(record.ArtifactContract.ExpectedArtifactExtensions) > 0 {
result["expectedArtifactExtensions"] = append([]string(nil), record.ArtifactContract.ExpectedArtifactExtensions...)
}
return result
}
func openClawTaskProgress(record *OpenClawTaskRecord) map[string]any {
now := time.Now()
stage := strings.TrimSpace(record.ProgressStage)
if stage == "" {
stage = "running"
}
message := strings.TrimSpace(record.ProgressMessage)
if message == "" {
message = "OpenClaw task is running"
}
return map[string]any{
"stage": stage,
"message": message,
"elapsedMs": maxInt64(0, now.Sub(record.StartedAt).Milliseconds()),
"budgetMs": (time.Duration(record.RuntimeBudgetMinutes) * time.Minute).Milliseconds(),
"lastProbeAtMs": record.LastProbeAt.UnixMilli(),
"terminal": record.ProgressTerminal,
}
}
func maxInt64(a int64, b int64) int64 {
if a > b {
return a
}
return b
}
func (o *SessionOrchestrator) startOpenClawTaskMonitor(sess *session) {
if sess == nil {
return
}
sess.mu.Lock()
record := sess.openClaw
if record == nil || record.MonitorStarted || isTerminalTaskState(sess.task.State) {
sess.mu.Unlock()
return
}
record.MonitorStarted = true
sess.mu.Unlock()
go func() {
defer o.releaseOpenClawAdmission(sess)
time.Sleep(openClawTaskMonitorInterval)
for {
sess.mu.Lock()
state := sess.task.State
deadline := sess.task.DeadlineAt
sess.mu.Unlock()
if isTerminalTaskState(state) {
return
}
if !deadline.IsZero() && time.Now().After(deadline) {
o.failOpenClawTask(sess, "TASK_SLA_EXPIRED", "OpenClaw task exceeded its runtime SLA")
return
}
o.probeOpenClawTask(context.Background(), sess, nil)
sess.mu.Lock()
state = sess.task.State
sess.mu.Unlock()
if isTerminalTaskState(state) {
return
}
time.Sleep(openClawTaskMonitorInterval)
}
}()
}
func isTerminalTaskState(state TaskState) bool {
return state == TaskStateCompleted || state == TaskStateFailed || state == TaskStateCancelled
}
func (o *SessionOrchestrator) releaseOpenClawAdmission(sess *session) {
if sess == nil {
return
}
var release func()
sess.mu.Lock()
if sess.openClaw != nil {
release = sess.openClaw.AdmissionRelease
sess.openClaw.AdmissionRelease = nil
}
sess.mu.Unlock()
if release != nil {
release()
}
}
func (o *SessionOrchestrator) failOpenClawTask(sess *session, code string, message string) map[string]any {
if sess == nil {
return map[string]any{"success": false, "status": string(TaskStateFailed), "code": code, "message": message}
}
if strings.TrimSpace(message) == "" {
message = code
}
sess.mu.Lock()
turnID := sess.task.TurnID
runID := sess.task.RunID
gatewayProviderID := sess.task.GatewayProviderID
sess.task.State = TaskStateFailed
sess.task.UpdatedAt = time.Now()
sess.task.ProgressStage = "failed"
sess.task.ProgressMessage = message
sess.task.ProgressTerminal = true
if sess.openClaw != nil {
sess.openClaw.ProgressStage = "failed"
sess.openClaw.ProgressMessage = message
sess.openClaw.ProgressTerminal = true
sess.openClaw.ProbeInFlight = false
}
result := map[string]any{
"success": false,
"status": string(TaskStateFailed),
"code": code,
"error": message,
"message": message,
"summary": message,
"output": message,
"turnId": turnID,
"runId": runID,
"mode": router.ExecutionTargetGatewayChat,
"resolvedGatewayProviderId": gatewayProviderID,
}
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
o.releaseOpenClawAdmission(sess)
return result
}
func (o *SessionOrchestrator) probeOpenClawTask(ctx context.Context, sess *session, notify func(map[string]any)) map[string]any {
if sess == nil {
return map[string]any{"status": "not_found"}
}
sess.mu.Lock()
record := sess.openClaw
if record == nil {
snapshot := openClawSessionSnapshotLocked(sess)
sess.mu.Unlock()
return snapshot
}
if isTerminalTaskState(sess.task.State) {
snapshot := openClawSessionSnapshotLocked(sess)
sess.mu.Unlock()
return snapshot
}
if !record.DeadlineAt.IsZero() && time.Now().After(record.DeadlineAt) {
sess.mu.Unlock()
return o.failOpenClawTask(sess, "TASK_SLA_EXPIRED", "OpenClaw task exceeded its runtime SLA")
}
if record.ProbeInFlight {
result := openClawRunningTaskResult(record)
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
return result
}
record.ProbeInFlight = true
record.LastProbeAt = time.Now()
record.ProgressStage = "probing"
record.ProgressMessage = "Checking OpenClaw task status"
sess.task.LastProbeAt = record.LastProbeAt
sess.task.ProgressStage = record.ProgressStage
sess.task.ProgressMessage = record.ProgressMessage
gatewayProvider := record.GatewayProviderID
runID := record.RunID
sessionID := record.SessionID
threadID := record.ThreadID
turnID := record.TurnID
sess.mu.Unlock()
collector := newOpenClawChatCollector()
notifyWithCollection := func(message map[string]any) {
collector.observe(message)
if notify == nil {
return
}
if update := openClawGatewaySessionUpdate(message, sessionID, threadID, turnID); update != nil {
notify(update)
}
}
waitStarted := time.Now()
waitResult := o.openClawGatewayRequestWithRetry(
gatewayProvider,
"agent.wait",
map[string]any{
"runId": runID,
"timeoutMs": openClawTaskProbeTimeoutMs,
},
openClawTaskProbeTimeout,
notifyWithCollection,
)
logOpenClawGatewayTiming(
gatewayProvider,
"agent.wait.probe",
sessionID,
runID,
time.Since(waitStarted),
waitResult.OK,
)
if !waitResult.OK {
if openClawProbeStillRunning(waitResult.Error) {
sess.mu.Lock()
if sess.openClaw != nil {
sess.openClaw.ProgressStage = "running"
sess.openClaw.ProgressMessage = "OpenClaw task is still running"
sess.openClaw.ProbeInFlight = false
}
sess.task.ProgressStage = "running"
sess.task.ProgressMessage = "OpenClaw task is still running"
sess.task.UpdatedAt = time.Now()
result := openClawRunningTaskResult(sess.openClaw)
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
return result
}
code := strings.TrimSpace(shared.StringArg(waitResult.Error, "code", "OPENCLAW_WAIT_FAILED"))
message := strings.TrimSpace(shared.StringArg(waitResult.Error, "message", "openclaw wait failed"))
return o.failOpenClawTask(sess, code, message)
}
return o.completeOpenClawTask(sess, shared.AsMap(waitResult.Payload), collector, notify)
}
func openClawProbeStillRunning(errorPayload map[string]any) bool {
code := strings.TrimSpace(strings.ToUpper(shared.StringArg(errorPayload, "code", "")))
if code == "TIMEOUT" || code == "RPC_TIMEOUT" || code == "REQUEST_TIMEOUT" ||
code == "OFFLINE" || code == "SOCKET_FAILURE" || code == "SOCKET_CLOSED" {
return true
}
message := strings.TrimSpace(strings.ToLower(shared.StringArg(errorPayload, "message", "")))
return strings.Contains(message, "timeout") || strings.Contains(message, "timed out")
}
func (o *SessionOrchestrator) completeOpenClawTask(
sess *session,
waitPayload map[string]any,
collector *openClawChatCollector,
notify func(map[string]any),
) map[string]any {
if sess == nil {
return map[string]any{"status": "not_found"}
}
sess.mu.Lock()
record := sess.openClaw
if record == nil {
snapshot := openClawSessionSnapshotLocked(sess)
sess.mu.Unlock()
return snapshot
}
if isTerminalTaskState(sess.task.State) {
record.ProbeInFlight = false
snapshot := openClawSessionSnapshotLocked(sess)
sess.mu.Unlock()
return snapshot
}
sess.mu.Unlock()
output := ""
if collector != nil {
output = collector.output()
}
if output == "" {
output = firstNonEmptyString(waitPayload, "output", "message", "summary", "assistantText", "text")
}
noDisplayableOutput := strings.TrimSpace(output) == ""
if output == "" {
output = openClawNoDisplayableText
}
result := map[string]any{
"success": true,
"status": string(TaskStateCompleted),
"output": output,
"message": output,
"summary": output,
"turnId": record.TurnID,
"runId": record.RunID,
"sessionId": record.SessionID,
"threadId": record.ThreadID,
"sessionKey": record.SessionKey,
"mode": router.ExecutionTargetGatewayChat,
"resolvedExecutionTarget": router.ExecutionTargetGatewayChat,
"resolvedProviderId": record.GatewayProviderID,
"resolvedGatewayProviderId": record.GatewayProviderID,
"resolvedModel": record.ResolvedModel,
"resolvedSkills": append([]string(nil), record.ResolvedSkills...),
"taskLoadClass": record.TaskLoadClass,
"runtimeBudgetMinutes": record.RuntimeBudgetMinutes,
}
mergeOpenClawArtifactPayload(result, waitPayload)
if collector != nil {
mergeOpenClawArtifactPayload(result, collector.artifactPayload())
}
applyOpenClawPreparedArtifactToResult(result, record.PreparedArtifact)
artifactPayload := o.openClawArtifactExport(
record.GatewayProviderID,
record.ChatParams,
record.RunID,
record.ArtifactSinceUnixMs,
record.PreparedArtifact,
notify,
)
mergeOpenClawArtifactPayload(result, artifactPayload)
result[openClawArtifactExportAttemptedField] = true
exportedCount := openClawArtifactPayloadCount(result)
logOpenClawArtifactSync(record.GatewayProviderID, record.SessionKey, record.RunID, "export", record.PreparedArtifact != nil, exportedCount > 0, exportedCount == 0)
o.server.decorateOpenClawArtifactDownloadURLs(result, record.SessionKey, record.RunID)
stripOpenClawArtifactInlineContent(result)
applyOpenClawArtifactContractResult(result, record.ArtifactContract)
guardOpenClawAgentFailedBeforeReplyResult(result)
guardOpenClawNoDisplayableResult(result, noDisplayableOutput)
delete(result, openClawArtifactExportAttemptedField)
success := parseBool(result["success"])
state := TaskStateCompleted
stage := "completed"
if !success {
state = TaskStateFailed
stage = "failed"
}
artifactRecord := buildArtifactRecord(sess, result, output)
if len(artifactRecord.Artifacts) > 0 {
result["artifacts"] = artifactRecord.Artifacts
}
if artifactRecord.RemoteWorkingDirectory != "" {
result["remoteWorkingDirectory"] = artifactRecord.RemoteWorkingDirectory
}
if artifactRecord.RemoteWorkspaceRefKind != "" {
result["remoteWorkspaceRefKind"] = artifactRecord.RemoteWorkspaceRefKind
}
if artifactRecord.ResultSummary != "" && strings.TrimSpace(shared.StringArg(result, "resultSummary", "")) == "" {
result["resultSummary"] = artifactRecord.ResultSummary
}
sess.mu.Lock()
sess.task.State = state
sess.task.UpdatedAt = time.Now()
sess.task.ProgressStage = stage
sess.task.ProgressMessage = output
sess.task.ProgressTerminal = true
if sess.openClaw != nil {
sess.openClaw.ProgressStage = stage
sess.openClaw.ProgressMessage = output
sess.openClaw.ProgressTerminal = true
sess.openClaw.ProbeInFlight = false
}
if output != "" {
sess.history = append(sess.history, "ASSISTANT: "+output)
}
sess.artifacts = artifactRecord
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
o.releaseOpenClawAdmission(sess)
if notify != nil {
notify(shared.NotificationEnvelope("session.update", openClawGatewayCompletedResultUpdate(record.SessionID, record.ThreadID, record.TurnID, result)))
}
return result
}
func openClawSessionSnapshotLocked(sess *session) map[string]any {
payload := map[string]any{
"status": string(sess.task.State),
"sessionId": sess.sessionID,
"threadId": sess.threadID,
"task": openClawTaskMapLocked(sess),
}
if len(sess.lastResult) > 0 {
payload["result"] = cloneMap(sess.lastResult)
}
if len(sess.artifacts.Artifacts) > 0 ||
sess.artifacts.RemoteWorkingDirectory != "" ||
sess.artifacts.RemoteWorkspaceRefKind != "" ||
sess.artifacts.ResultSummary != "" {
payload["artifacts"] = map[string]any{
"items": cloneMapSlice(sess.artifacts.Artifacts),
"remoteWorkingDirectory": sess.artifacts.RemoteWorkingDirectory,
"remoteWorkspaceRefKind": sess.artifacts.RemoteWorkspaceRefKind,
"resultSummary": sess.artifacts.ResultSummary,
"updatedAt": sess.artifacts.UpdatedAt.UTC().Format(time.RFC3339Nano),
}
}
if sess.openClaw != nil {
payload["progress"] = openClawTaskProgress(sess.openClaw)
}
return payload
}
func openClawTaskMapLocked(sess *session) map[string]any {
task := sess.task
payload := map[string]any{
"sessionId": task.SessionID,
"threadId": task.ThreadID,
"turnId": task.TurnID,
"runId": task.RunID,
"sessionKey": task.SessionKey,
"provider": task.Provider,
"target": task.Target,
"gatewayProviderId": task.GatewayProviderID,
"state": string(task.State),
"kind": string(task.Kind),
"taskLoadClass": task.TaskLoadClass,
"artifactScope": task.ArtifactScope,
"artifactDirectory": task.ArtifactDirectory,
"runtimeBudgetMinutes": task.RuntimeBudgetMinutes,
"updatedAt": task.UpdatedAt.UTC().Format(time.RFC3339Nano),
}
if !task.StartedAt.IsZero() {
payload["startedAt"] = task.StartedAt.UTC().Format(time.RFC3339Nano)
}
if !task.DeadlineAt.IsZero() {
payload["deadlineAt"] = task.DeadlineAt.UTC().Format(time.RFC3339Nano)
}
if !task.LastProbeAt.IsZero() {
payload["lastProbeAt"] = task.LastProbeAt.UTC().Format(time.RFC3339Nano)
}
if task.ProgressStage != "" || task.ProgressMessage != "" {
payload["progress"] = map[string]any{
"stage": task.ProgressStage,
"message": task.ProgressMessage,
"terminal": task.ProgressTerminal,
}
}
return payload
}

View File

@ -262,11 +262,16 @@ func (o *SessionOrchestrator) runGateway(
if rpcErr != nil { if rpcErr != nil {
return nil, rpcErr return nil, rpcErr
} }
defer release()
if rpcErr := ensureProductionGatewayConnected(o.server, gatewayProvider, notify); rpcErr != nil { if rpcErr := ensureProductionGatewayConnected(o.server, gatewayProvider, notify); rpcErr != nil {
release()
return nil, rpcErr return nil, rpcErr
} }
return o.runOpenClawGatewayChat(ctx, params, gatewayProvider, turnID, notify) result, rpcErr := o.startOpenClawGatewayTask(ctx, params, routing, gatewayProvider, turnID, release, notify)
if rpcErr != nil {
release()
return nil, rpcErr
}
return result, nil
} }
if rpcErr := ensureProductionGatewayConnected(o.server, gatewayProvider, notify); rpcErr != nil { if rpcErr := ensureProductionGatewayConnected(o.server, gatewayProvider, notify); rpcErr != nil {
return nil, rpcErr return nil, rpcErr
@ -299,11 +304,13 @@ func (o *SessionOrchestrator) runGateway(
return payload, nil return payload, nil
} }
func (o *SessionOrchestrator) runOpenClawGatewayChat( func (o *SessionOrchestrator) startOpenClawGatewayTask(
_ context.Context, _ context.Context,
params map[string]any, params map[string]any,
routing RoutingResult,
gatewayProvider string, gatewayProvider string,
turnID string, turnID string,
releaseAdmission func(),
notify func(map[string]any), notify func(map[string]any),
) (map[string]any, *shared.RPCError) { ) (map[string]any, *shared.RPCError) {
collector := newOpenClawChatCollector() collector := newOpenClawChatCollector()
@ -374,72 +381,65 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
logOpenClawArtifactSync(gatewayProvider, sessionKey, runID, "prepare", true, false, false) logOpenClawArtifactSync(gatewayProvider, sessionKey, runID, "prepare", true, false, false)
applyOpenClawPreparedArtifactToChatParams(chatParams, preparedArtifact, sessionKey, runID, artifactContract) applyOpenClawPreparedArtifactToChatParams(chatParams, preparedArtifact, sessionKey, runID, artifactContract)
} }
waitTimeout := openClawAgentWaitTimeout(params, chatParams) taskLoadClass, runtimeBudgetMinutes := openClawTaskRuntimePolicy(params, chatParams, artifactContract)
waitStarted := time.Now() startedAt := time.Now()
waitResult := o.openClawGatewayRequestWithRetry( record := &OpenClawTaskRecord{
gatewayProvider, SessionID: sessionID,
"agent.wait", ThreadID: threadID,
map[string]any{ TurnID: turnID,
"runId": runID, RunID: runID,
"timeoutMs": waitTimeout.Milliseconds(), SessionKey: sessionKey,
}, GatewayProviderID: gatewayProvider,
waitTimeout, TaskLoadClass: taskLoadClass,
notifyWithCollection, ArtifactSinceUnixMs: artifactSinceUnixMs,
) RuntimeBudgetMinutes: runtimeBudgetMinutes,
logOpenClawGatewayTiming( StartedAt: startedAt,
gatewayProvider, DeadlineAt: startedAt.Add(time.Duration(runtimeBudgetMinutes) * time.Minute),
"agent.wait", ProgressStage: "running",
sessionKey, ProgressMessage: "OpenClaw task accepted",
runID, ChatParams: cloneMap(chatParams),
time.Since(waitStarted), PreparedArtifact: preparedArtifact,
waitResult.OK, ArtifactContract: artifactContract,
) ResolvedModel: routing.Model,
if !waitResult.OK { ResolvedSkills: append([]string(nil), routing.Skills...),
return nil, gatewayRPCError(waitResult.Error, "openclaw agent.wait failed") AdmissionRelease: releaseAdmission,
} }
waitPayload := shared.AsMap(waitResult.Payload) sess := o.server.getOrCreateSession(sessionID, threadID)
output := collector.output() sess.mu.Lock()
if output == "" { sess.task.RunID = runID
output = firstNonEmptyString(waitPayload, "output", "message", "summary", "assistantText", "text") sess.task.SessionKey = sessionKey
} sess.task.GatewayProviderID = gatewayProvider
noDisplayableOutput := strings.TrimSpace(output) == "" sess.task.TaskLoadClass = taskLoadClass
if output == "" { sess.task.ArtifactScope = strings.TrimSpace(preparedArtifact.ArtifactScope)
output = openClawNoDisplayableText sess.task.ArtifactDirectory = strings.TrimSpace(preparedArtifact.ArtifactDirectory)
} sess.task.RuntimeBudgetMinutes = runtimeBudgetMinutes
result := map[string]any{ sess.task.StartedAt = startedAt
"success": true, sess.task.DeadlineAt = record.DeadlineAt
"output": output, sess.task.ProgressStage = "running"
"message": output, sess.task.ProgressMessage = "OpenClaw task accepted"
"summary": output, sess.openClaw = record
"turnId": turnID, running := openClawRunningTaskResult(record)
"runId": runID, sess.lastResult = cloneMap(running)
"mode": router.ExecutionTargetGatewayChat, sess.mu.Unlock()
"resolvedGatewayProviderId": gatewayProvider, o.startOpenClawTaskMonitor(sess)
}
mergeOpenClawArtifactPayload(result, waitPayload)
mergeOpenClawArtifactPayload(result, collector.artifactPayload())
applyOpenClawPreparedArtifactToResult(result, preparedArtifact)
artifactPayload := o.openClawArtifactExport(
gatewayProvider,
chatParams,
runID,
artifactSinceUnixMs,
preparedArtifact,
notifyWithCollection,
)
mergeOpenClawArtifactPayload(result, artifactPayload)
result[openClawArtifactExportAttemptedField] = true
exportedCount := openClawArtifactPayloadCount(result)
logOpenClawArtifactSync(gatewayProvider, sessionKey, runID, "export", preparedArtifact != nil, exportedCount > 0, exportedCount == 0)
o.server.decorateOpenClawArtifactDownloadURLs(result, shared.StringArg(chatParams, "sessionKey", ""), runID)
stripOpenClawArtifactInlineContent(result)
applyOpenClawArtifactContractResult(result, artifactContract)
guardOpenClawAgentFailedBeforeReplyResult(result)
guardOpenClawNoDisplayableResult(result, noDisplayableOutput)
if notify != nil { if notify != nil {
notify(shared.NotificationEnvelope("session.update", openClawGatewayCompletedResultUpdate(sessionID, threadID, turnID, result))) notify(shared.NotificationEnvelope("session.update", map[string]any{
"sessionId": sessionID,
"threadId": threadID,
"turnId": turnID,
"runId": runID,
"type": "status",
"event": "running",
"message": "OpenClaw task accepted",
"pending": true,
"error": false,
"status": string(TaskStateRunning),
"runtimeBudgetMinutes": runtimeBudgetMinutes,
"progress": running["progress"],
}))
} }
return result, nil _ = collector
return running, nil
} }
func openClawGatewayCompletedResultUpdate(sessionID string, threadID string, turnID string, result map[string]any) map[string]any { func openClawGatewayCompletedResultUpdate(sessionID string, threadID string, turnID string, result map[string]any) map[string]any {
@ -1773,6 +1773,21 @@ func (o *SessionOrchestrator) normalizeResult(sess *session, result map[string]a
if result == nil { if result == nil {
result = map[string]any{} result = map[string]any{}
} }
if routing.TargetID == "gateway" && strings.TrimSpace(shared.StringArg(result, "status", "")) == string(TaskStateRunning) {
result["turnId"] = turnID
result["success"] = true
result["resolvedExecutionTarget"] = routing.TargetID
result["resolvedProviderId"] = routing.ProviderID
result["resolvedGatewayProviderId"] = routing.GatewayProviderID
result["resolvedModel"] = routing.Model
result["resolvedSkills"] = append([]string(nil), routing.Skills...)
sess.mu.Lock()
sess.task.State = TaskStateRunning
sess.task.UpdatedAt = time.Now()
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
return result
}
if openClawArtifactResponse(result, routing, params) { if openClawArtifactResponse(result, routing, params) {
o.completeOpenClawScopedArtifactExport(result, params, openClawGatewayProviderForArtifacts(result, routing, params), turnID) o.completeOpenClawScopedArtifactExport(result, params, openClawGatewayProviderForArtifacts(result, routing, params), turnID)
} }

View File

@ -55,7 +55,27 @@ func (s *Server) executeSessionTask(t task) (map[string]any, *shared.RPCError) {
if t.req.Method == "" { if t.req.Method == "" {
t.req.Method = "session.start" t.req.Method = "session.start"
} }
return s.handleRequest(t.req, t.notify) response, rpcErr := s.handleRequest(t.req, t.notify)
if rpcErr != nil || strings.TrimSpace(shared.StringArg(response, "status", "")) != string(TaskStateRunning) {
return response, rpcErr
}
return s.handleRequest(shared.RPCRequest{
Method: "xworkmate.tasks.get",
Params: map[string]any{
"sessionId": shared.StringArg(response, "sessionId", ""),
"threadId": shared.StringArg(response, "threadId", ""),
"turnId": shared.StringArg(response, "turnId", ""),
"runId": shared.StringArg(response, "runId", ""),
"sessionKey": shared.StringArg(response, "sessionKey", ""),
"artifactScope": shared.StringArg(response, "artifactScope", ""),
"artifactDirectory": shared.StringArg(response, "artifactDirectory", ""),
"gatewayProviderId": shared.StringArg(response, "resolvedGatewayProviderId", ""),
"runtimeBudgetMinutes": shared.StringArg(response, "runtimeBudgetMinutes", ""),
"taskLoadClass": shared.StringArg(response, "taskLoadClass", ""),
"expectedArtifactExtensions": shared.ListArg(response, "expectedArtifactExtensions"),
"requiredArtifactExtensions": shared.ListArg(response, "requiredArtifactExtensions"),
},
}, t.notify)
} }
func newExternalSingleAgentProvider( func newExternalSingleAgentProvider(
@ -532,11 +552,8 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) {
if !ok { if !ok {
t.Fatalf("expected numeric OpenClaw agent.wait timeoutMs, got %#v", waitParams) t.Fatalf("expected numeric OpenClaw agent.wait timeoutMs, got %#v", waitParams)
} }
if got := int64(timeoutMs); got != openClawAgentWaitDefaultTimeout.Milliseconds() { if got := int64(timeoutMs); got != openClawTaskProbeTimeoutMs {
t.Fatalf("expected default OpenClaw agent.wait timeoutMs %d, got %#v", openClawAgentWaitDefaultTimeout.Milliseconds(), waitParams) t.Fatalf("expected OpenClaw probe timeoutMs %d, got %#v", openClawTaskProbeTimeoutMs, waitParams)
}
if got := int64(timeoutMs); got <= 120000 {
t.Fatalf("expected OpenClaw agent.wait timeout to exceed the previous 120s cap, got %#v", waitParams)
} }
if gateway.ArtifactExportCount() != 1 { if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export sync after run, got %d", gateway.ArtifactExportCount()) t.Fatalf("expected one OpenClaw artifact export sync after run, got %d", gateway.ArtifactExportCount())
@ -865,17 +882,20 @@ func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testi
}, },
}, },
}) })
if rpcErr == nil { if rpcErr != nil {
t.Fatalf("expected wait-timeout rpc error, got response: %#v", response) t.Fatalf("expected wait-timeout probe to keep task running, got rpc error: %#v", rpcErr)
} }
if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "openclaw wait timeout") { if got := response["status"]; got != string(TaskStateRunning) {
t.Fatalf("expected surfaced wait timeout, got %#v", rpcErr) t.Fatalf("expected wait-timeout probe to keep running status, got %#v", response)
} }
if got := gateway.ChatSendCount(); got != 1 { if got := gateway.ChatSendCount(); got != 1 {
t.Fatalf("expected no automatic repair model turn, got %d", got) t.Fatalf("expected no automatic repair model turn, got %d", got)
} }
if got := gateway.AgentWaitCount(); got != 1 { if got := gateway.AgentWaitCount(); got != 1 {
t.Fatalf("expected one failed wait, got %d", got) t.Fatalf("expected one status probe, got %d", got)
}
if got := gateway.ArtifactExportCount(); got != 0 {
t.Fatalf("expected no artifact export before terminal state, got %d", got)
} }
} }
@ -1551,19 +1571,25 @@ func TestExecuteSessionTaskGatewaySurfacesOpenClawAgentWaitError(t *testing.T) {
}, },
}, },
}) })
if rpcErr == nil { if rpcErr != nil {
t.Fatalf("expected OpenClaw agent.wait error, got response: %#v", response) t.Fatalf("expected OpenClaw agent.wait error as task result, got rpc error: %#v", rpcErr)
} }
if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "openclaw wait failed") { if got := response["status"]; got != string(TaskStateFailed) {
t.Fatalf("expected surfaced agent.wait failure, got %#v", rpcErr) t.Fatalf("expected failed task result, got %#v", response)
}
if got := response["code"]; got != "OPENCLAW_WAIT_FAILED" {
t.Fatalf("expected OpenClaw wait failure code, got %#v", response)
}
if got := shared.StringArg(response, "message", ""); !strings.Contains(got, "openclaw wait failed") {
t.Fatalf("expected surfaced agent.wait failure, got %#v", response)
} }
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait"}) { if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait"}) {
t.Fatalf("expected connect, artifact prepare, chat.send, then agent.wait, got %#v", got) t.Fatalf("expected connect, artifact prepare, chat.send, then agent.wait, got %#v", got)
} }
snapshot := server.handleSessionGet(map[string]any{ snapshot := server.handleTaskGet(context.Background(), map[string]any{
"sessionId": "session-openclaw-wait-fail", "sessionId": "session-openclaw-wait-fail",
"threadId": "thread-openclaw-wait-fail", "threadId": "thread-openclaw-wait-fail",
}) }, nil)
if got := snapshot["status"]; got != string(TaskStateFailed) { if got := snapshot["status"]; got != string(TaskStateFailed) {
t.Fatalf("expected failed session snapshot, got %#v from %#v", got, snapshot) t.Fatalf("expected failed session snapshot, got %#v from %#v", got, snapshot)
} }

View File

@ -59,8 +59,11 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
case "xworkmate.jobs.submit", "xworkmate.jobs.get", "xworkmate.jobs.list", "xworkmate.jobs.stats": case "xworkmate.jobs.submit", "xworkmate.jobs.get", "xworkmate.jobs.list", "xworkmate.jobs.stats":
return s.handleJobMethod(ctx, method, request.Params, notify) return s.handleJobMethod(ctx, method, request.Params, notify)
case "xworkmate.sessions.get": case "xworkmate.tasks.get":
return s.handleSessionGet(request.Params), nil return s.handleTaskGet(ctx, request.Params, notify), nil
case "xworkmate.tasks.cancel":
return s.handleTaskCancel(ctx, request.Params, notify), nil
case "xworkmate.tools.invoke": case "xworkmate.tools.invoke":
return s.invokeOpenClawTool(ctx, request.Params) return s.invokeOpenClawTool(ctx, request.Params)
@ -73,63 +76,167 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
} }
} }
func (s *Server) handleSessionGet(params map[string]any) map[string]any { func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {
sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")) sess := s.findTaskSession(params)
threadID := strings.TrimSpace(shared.StringArg(params, "threadId", "")) if sess == nil {
if sessionID == "" && threadID == "" { sess = s.reassociateOpenClawTask(params)
}
if sess == nil {
return map[string]any{"status": "not_found"} return map[string]any{"status": "not_found"}
} }
s.mu.RLock() return s.orchestrator.probeOpenClawTask(ctx, sess, notify)
sess := s.sessions[sessionID] }
if sess == nil && threadID != "" {
for _, candidate := range s.sessions { func (s *Server) handleTaskCancel(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {
if candidate != nil && candidate.threadID == threadID { sess := s.findTaskSession(params)
sess = candidate
break
}
}
}
s.mu.RUnlock()
if sess == nil { if sess == nil {
return map[string]any{ sess = s.reassociateOpenClawTask(params)
"status": "not_found", }
"sessionId": sessionID, if sess == nil {
"threadId": threadID, return map[string]any{"accepted": false, "status": "not_found"}
}
} }
sess.mu.Lock() sess.mu.Lock()
defer sess.mu.Unlock() gatewayProvider := sess.task.GatewayProviderID
payload := map[string]any{ runID := sess.task.RunID
"status": string(sess.task.State), sess.task.State = TaskStateCancelled
"sessionId": sess.sessionID, sess.task.UpdatedAt = time.Now()
"threadId": sess.threadID, sess.task.ProgressStage = "cancelled"
"task": map[string]any{ sess.task.ProgressMessage = "OpenClaw task cancelled"
"sessionId": sess.task.SessionID, sess.task.ProgressTerminal = true
"threadId": sess.task.ThreadID, if sess.openClaw != nil {
"turnId": sess.task.TurnID, sess.openClaw.ProgressStage = "cancelled"
"provider": sess.task.Provider, sess.openClaw.ProgressMessage = "OpenClaw task cancelled"
"target": sess.task.Target, sess.openClaw.ProgressTerminal = true
"state": string(sess.task.State),
"kind": string(sess.task.Kind),
"updatedAt": sess.task.UpdatedAt.UTC().Format(time.RFC3339Nano),
},
} }
if len(sess.lastResult) > 0 { snapshot := openClawSessionSnapshotLocked(sess)
payload["result"] = cloneMap(sess.lastResult) sess.mu.Unlock()
s.orchestrator.releaseOpenClawAdmission(sess)
if strings.TrimSpace(gatewayProvider) != "" && strings.TrimSpace(runID) != "" && s.gateway != nil {
_ = s.gateway.RequestByMode(
gatewayProvider,
"agent.cancel",
map[string]any{"runId": runID},
5*time.Second,
notify,
)
} }
if len(sess.artifacts.Artifacts) > 0 || snapshot["accepted"] = true
sess.artifacts.RemoteWorkingDirectory != "" || return snapshot
sess.artifacts.RemoteWorkspaceRefKind != "" || }
sess.artifacts.ResultSummary != "" {
payload["artifacts"] = map[string]any{ func (s *Server) findTaskSession(params map[string]any) *session {
"items": cloneMapSlice(sess.artifacts.Artifacts), sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", ""))
"remoteWorkingDirectory": sess.artifacts.RemoteWorkingDirectory, threadID := strings.TrimSpace(shared.StringArg(params, "threadId", ""))
"remoteWorkspaceRefKind": sess.artifacts.RemoteWorkspaceRefKind, turnID := strings.TrimSpace(shared.StringArg(params, "turnId", ""))
"resultSummary": sess.artifacts.ResultSummary, runID := strings.TrimSpace(shared.StringArg(params, "runId", ""))
"updatedAt": sess.artifacts.UpdatedAt.UTC().Format(time.RFC3339Nano), artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", ""))
s.mu.RLock()
defer s.mu.RUnlock()
if sessionID != "" && s.sessions[sessionID] != nil {
return s.sessions[sessionID]
}
for _, candidate := range s.sessions {
if candidate == nil {
continue
}
candidate.mu.Lock()
matches := (threadID != "" && candidate.threadID == threadID) ||
(turnID != "" && candidate.task.TurnID == turnID) ||
(runID != "" && candidate.task.RunID == runID) ||
(artifactScope != "" && candidate.task.ArtifactScope == artifactScope)
candidate.mu.Unlock()
if matches {
return candidate
} }
} }
return payload return nil
}
func (s *Server) reassociateOpenClawTask(params map[string]any) *session {
runID := strings.TrimSpace(shared.StringArg(params, "runId", ""))
artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", ""))
if runID == "" || artifactScope == "" {
return nil
}
sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", ""))
threadID := strings.TrimSpace(shared.StringArg(params, "threadId", sessionID))
if sessionID == "" {
sessionID = threadID
}
if sessionID == "" {
sessionID = strings.TrimSpace(shared.StringArg(params, "sessionKey", ""))
}
if sessionID == "" {
sessionID = "openclaw:" + runID
}
if threadID == "" {
threadID = sessionID
}
turnID := strings.TrimSpace(shared.StringArg(params, "turnId", runID))
sessionKey := strings.TrimSpace(shared.StringArg(params, "sessionKey", threadID))
gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", "openclaw"))
budget := shared.IntArg(shared.StringArg(params, "runtimeBudgetMinutes", ""), openClawComplexTaskMinutes)
now := time.Now()
prepared := &openClawPreparedArtifactScope{
ArtifactScope: artifactScope,
ArtifactDirectory: strings.TrimSpace(shared.StringArg(params, "artifactDirectory", "")),
RelativeArtifactDirectory: artifactScope,
ScopeKind: "task",
RemoteWorkingDirectory: strings.TrimSpace(shared.StringArg(params, "remoteWorkingDirectory", "")),
RemoteWorkspaceRefKind: strings.TrimSpace(shared.StringArg(params, "remoteWorkspaceRefKind", "")),
}
contract := openClawArtifactContract{
TaskLoadClass: strings.TrimSpace(shared.StringArg(params, "taskLoadClass", "")),
ExpectedArtifactExtensions: normalizeOpenClawExtensionList(shared.ListArg(params, "expectedArtifactExtensions")),
RequiredFinalExtensions: normalizeOpenClawExtensionList(shared.ListArg(params, "requiredArtifactExtensions")),
}
sess := s.getOrCreateSession(sessionID, threadID)
sess.mu.Lock()
sess.provider = gatewayProvider
sess.target = "gateway"
sess.mode = "gateway"
sess.task = QueuedTask{
SessionID: sessionID,
ThreadID: threadID,
TurnID: turnID,
RunID: runID,
SessionKey: sessionKey,
Provider: gatewayProvider,
Target: "gateway",
GatewayProviderID: gatewayProvider,
State: TaskStateRunning,
Kind: TaskKindGateway,
TaskLoadClass: contract.TaskLoadClass,
ArtifactScope: artifactScope,
ArtifactDirectory: prepared.ArtifactDirectory,
RuntimeBudgetMinutes: budget,
StartedAt: now,
DeadlineAt: now.Add(time.Duration(budget) * time.Minute),
UpdatedAt: now,
ProgressStage: "reassociated",
ProgressMessage: "OpenClaw task reassociated from task handle",
}
sess.openClaw = &OpenClawTaskRecord{
SessionID: sessionID,
ThreadID: threadID,
TurnID: turnID,
RunID: runID,
SessionKey: sessionKey,
GatewayProviderID: gatewayProvider,
TaskLoadClass: contract.TaskLoadClass,
ArtifactSinceUnixMs: 0,
RuntimeBudgetMinutes: budget,
StartedAt: now,
DeadlineAt: now.Add(time.Duration(budget) * time.Minute),
ProgressStage: "reassociated",
ProgressMessage: "OpenClaw task reassociated from task handle",
ChatParams: map[string]any{"sessionKey": sessionKey},
PreparedArtifact: prepared,
ArtifactContract: contract,
}
sess.lastResult = openClawRunningTaskResult(sess.openClaw)
sess.mu.Unlock()
return sess
} }
func (s *Server) cancelSession(ctx context.Context, sessionID string) { func (s *Server) cancelSession(ctx context.Context, sessionID string) {

View File

@ -36,14 +36,27 @@ type ControlPlaneSession struct {
} }
type QueuedTask struct { type QueuedTask struct {
SessionID string SessionID string
ThreadID string ThreadID string
TurnID string TurnID string
Provider string RunID string
Target string SessionKey string
State TaskState Provider string
Kind TaskKind Target string
UpdatedAt time.Time GatewayProviderID string
State TaskState
Kind TaskKind
TaskLoadClass string
ArtifactScope string
ArtifactDirectory string
RuntimeBudgetMinutes int
StartedAt time.Time
UpdatedAt time.Time
DeadlineAt time.Time
LastProbeAt time.Time
ProgressStage string
ProgressMessage string
ProgressTerminal bool
} }
type ArtifactRecord struct { type ArtifactRecord struct {
@ -69,6 +82,7 @@ type session struct {
task QueuedTask task QueuedTask
artifacts ArtifactRecord artifacts ArtifactRecord
lastResult map[string]any lastResult map[string]any
openClaw *OpenClawTaskRecord
} }
type Server struct { type Server struct {

View File

@ -18,6 +18,104 @@ import (
"xworkmate-bridge/internal/shared" "xworkmate-bridge/internal/shared"
) )
func sseResultEnvelope(t *testing.T, body string, id string) map[string]any {
t.Helper()
for _, rawLine := range strings.Split(body, "\n") {
line := strings.TrimSpace(rawLine)
if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" {
continue
}
var envelope map[string]any
if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &envelope); err != nil {
t.Fatalf("decode SSE envelope %q: %v", line, err)
}
if strings.TrimSpace(shared.StringArg(envelope, "id", "")) == id {
return envelope
}
}
t.Fatalf("missing SSE result envelope %q in body: %s", id, body)
return nil
}
func sseFirstResultEnvelope(t *testing.T, body string) map[string]any {
t.Helper()
for _, rawLine := range strings.Split(body, "\n") {
line := strings.TrimSpace(rawLine)
if !strings.HasPrefix(line, "data: ") || line == "data: [DONE]" {
continue
}
var envelope map[string]any
if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &envelope); err != nil {
t.Fatalf("decode SSE envelope %q: %v", line, err)
}
if _, ok := envelope["result"]; ok {
return envelope
}
}
t.Fatalf("missing SSE result envelope in body: %s", body)
return nil
}
func taskGetHTTPResult(t *testing.T, handler http.Handler, handle map[string]any) map[string]any {
t.Helper()
body := fmt.Sprintf(
`{"jsonrpc":"2.0","id":"task-get","method":"xworkmate.tasks.get","params":{"sessionId":%q,"threadId":%q,"turnId":%q,"runId":%q,"sessionKey":%q,"artifactScope":%q,"artifactDirectory":%q,"gatewayProviderId":%q,"runtimeBudgetMinutes":%q,"taskLoadClass":%q,"expectedArtifactExtensions":%s,"requiredArtifactExtensions":%s}}`,
shared.StringArg(handle, "sessionId", ""),
shared.StringArg(handle, "threadId", ""),
shared.StringArg(handle, "turnId", ""),
shared.StringArg(handle, "runId", ""),
shared.StringArg(handle, "sessionKey", ""),
shared.StringArg(handle, "artifactScope", ""),
shared.StringArg(handle, "artifactDirectory", ""),
shared.StringArg(handle, "resolvedGatewayProviderId", "openclaw"),
shared.StringArg(handle, "runtimeBudgetMinutes", ""),
shared.StringArg(handle, "taskLoadClass", ""),
jsonArrayString(t, shared.ListArg(handle, "expectedArtifactExtensions")),
jsonArrayString(t, shared.ListArg(handle, "requiredArtifactExtensions")),
)
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodPost, "http://127.0.0.1/acp/rpc", strings.NewReader(body))
request.Header.Set("Content-Type", "application/json")
request.Header.Set("Authorization", "Bearer bridge-test-token")
handler.ServeHTTP(recorder, request)
if recorder.Code != http.StatusOK {
t.Fatalf("expected task get 200, got %d: %s", recorder.Code, recorder.Body.String())
}
var decoded map[string]any
if err := json.Unmarshal(recorder.Body.Bytes(), &decoded); err != nil {
t.Fatalf("decode task get response: %v", err)
}
return shared.AsMap(decoded["result"])
}
func taskGetHTTPTerminalResult(t *testing.T, handler http.Handler, handle map[string]any) map[string]any {
t.Helper()
deadline := time.Now().Add(5 * time.Second)
for {
result := taskGetHTTPResult(t, handler, handle)
switch result["status"] {
case "completed", "failed", "cancelled":
return result
}
if time.Now().After(deadline) {
t.Fatalf("task did not reach terminal state: %#v", result)
}
time.Sleep(25 * time.Millisecond)
}
}
func jsonArrayString(t *testing.T, values []any) string {
t.Helper()
if values == nil {
return "[]"
}
encoded, err := json.Marshal(values)
if err != nil {
t.Fatalf("encode array: %v", err)
}
return string(encoded)
}
func TestHTTPHandlerRootAndPingExposeRuntimeVersionInfo(t *testing.T) { func TestHTTPHandlerRootAndPingExposeRuntimeVersionInfo(t *testing.T) {
t.Setenv("BRIDGE_AUTH_TOKEN", "") t.Setenv("BRIDGE_AUTH_TOKEN", "")
SetRuntimeVersionInfo(RuntimeVersionInfo{ SetRuntimeVersionInfo(RuntimeVersionInfo{
@ -156,19 +254,13 @@ func TestHTTPHandlerRPCSSEWritesFinalEnvelopeAndDone(t *testing.T) {
} }
} }
func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *testing.T) { func TestHTTPHandlerGatewayOpenClawReturnsRunningEnvelopeAndDone(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t) gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close() defer gateway.Close()
gateway.agentWaitDelayMs.Store(50)
t.Setenv("GATEWAY_RPC_URL", gateway.URL()) t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-test-token") t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-test-token")
t.Setenv("BRIDGE_CONFIG_PATH", filepath.Join(t.TempDir(), "missing-config.yaml")) t.Setenv("BRIDGE_CONFIG_PATH", filepath.Join(t.TempDir(), "missing-config.yaml"))
previousInterval := httpSSEKeepaliveInterval
httpSSEKeepaliveInterval = 10 * time.Millisecond
t.Cleanup(func() {
httpSSEKeepaliveInterval = previousInterval
})
server := NewServer() server := NewServer()
httpServer := httptest.NewServer(server.Handler()) httpServer := httptest.NewServer(server.Handler())
@ -202,15 +294,15 @@ func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *tes
t.Fatalf("expected event-stream content type, got %q", contentType) t.Fatalf("expected event-stream content type, got %q", contentType)
} }
events := strings.Split(strings.TrimSpace(string(body)), "\n\n") bodyText := string(body)
if len(events) < 4 { events := strings.Split(strings.TrimSpace(bodyText), "\n\n")
t.Fatalf("expected accepted, keepalive, final envelope, and done events, got %q", string(body)) if len(events) < 3 {
t.Fatalf("expected accepted, running envelope, and done events, got %q", bodyText)
} }
if events[len(events)-1] != "data: [DONE]" { if events[len(events)-1] != "data: [DONE]" {
t.Fatalf("expected done event, got %q", events[len(events)-1]) t.Fatalf("expected done event, got %q", events[len(events)-1])
} }
var sawAcceptedBeforeKeepalive bool var sawAcceptedBeforeFinal bool
var sawKeepaliveBeforeFinal bool
var sawFinal bool var sawFinal bool
for _, event := range events[:len(events)-1] { for _, event := range events[:len(events)-1] {
if !strings.HasPrefix(event, "data: ") { if !strings.HasPrefix(event, "data: ") {
@ -220,27 +312,25 @@ func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *tes
if err := json.Unmarshal([]byte(strings.TrimPrefix(event, "data: ")), &envelope); err != nil { if err := json.Unmarshal([]byte(strings.TrimPrefix(event, "data: ")), &envelope); err != nil {
t.Fatalf("decode event %q: %v", event, err) t.Fatalf("decode event %q: %v", event, err)
} }
if envelope["method"] == "xworkmate.bridge.accepted" && !sawKeepaliveBeforeFinal && !sawFinal { if envelope["method"] == "xworkmate.bridge.accepted" && !sawFinal {
sawAcceptedBeforeKeepalive = true sawAcceptedBeforeFinal = true
}
if envelope["method"] == "xworkmate.bridge.keepalive" && !sawFinal {
sawKeepaliveBeforeFinal = true
} }
if envelope["id"] == "task-keepalive" { if envelope["id"] == "task-keepalive" {
sawFinal = true sawFinal = true
if _, ok := envelope["result"].(map[string]any); !ok { result := shared.AsMap(envelope["result"])
t.Fatalf("expected result envelope, got %#v", envelope) if got := result["status"]; got != "running" {
t.Fatalf("expected running task handle, got %#v", result)
}
if got := result["runtimeBudgetMinutes"]; got != float64(openClawShortTaskMinutes) {
t.Fatalf("expected short task budget in running handle, got %#v", result)
} }
} }
} }
if !sawAcceptedBeforeKeepalive { if !sawAcceptedBeforeFinal {
t.Fatalf("expected accepted event before keepalive/final envelope, got %q", string(body)) t.Fatalf("expected accepted event before final envelope, got %q", bodyText)
}
if !sawKeepaliveBeforeFinal {
t.Fatalf("expected keepalive event before final envelope, got %q", string(body))
} }
if !sawFinal { if !sawFinal {
t.Fatalf("expected final task envelope, got %q", string(body)) t.Fatalf("expected running task envelope, got %q", bodyText)
} }
} }
@ -311,7 +401,7 @@ func TestHTTPHandlerGatewayOpenClawAdmissionQueuesExcessConcurrentSSE(t *testing
close(results) close(results)
var sawQueued bool var sawQueued bool
var finalCount int var runningHandleCount int
for item := range results { for item := range results {
if item.err != nil { if item.err != nil {
t.Fatalf("concurrent request failed: %v", item.err) t.Fatalf("concurrent request failed: %v", item.err)
@ -319,15 +409,17 @@ func TestHTTPHandlerGatewayOpenClawAdmissionQueuesExcessConcurrentSSE(t *testing
if strings.Contains(item.body, `"event":"queued"`) { if strings.Contains(item.body, `"event":"queued"`) {
sawQueued = true sawQueued = true
} }
if strings.Contains(item.body, `"result"`) && strings.Contains(item.body, `data: [DONE]`) { envelope := sseFirstResultEnvelope(t, item.body)
finalCount += 1 result := shared.AsMap(envelope["result"])
if result["status"] == "running" && strings.TrimSpace(shared.StringArg(result, "runId", "")) != "" {
runningHandleCount += 1
} }
} }
if !sawQueued { if !sawQueued {
t.Fatalf("expected one queued session.update event") t.Fatalf("expected one queued session.update event")
} }
if finalCount != 2 { if runningHandleCount != 2 {
t.Fatalf("expected both requests to return final result, got %d", finalCount) t.Fatalf("expected both requests to return running handles, got %d", runningHandleCount)
} }
if got := gateway.ChatSendCount(); got != 2 { if got := gateway.ChatSendCount(); got != 2 {
t.Fatalf("expected queued request to run after a slot releases, got %d chat.send calls", got) t.Fatalf("expected queued request to run after a slot releases, got %d chat.send calls", got)
@ -412,7 +504,7 @@ func TestHTTPHandlerGatewayOpenClawHandlesFiveConcurrentE2ECases(t *testing.T) {
wg.Wait() wg.Wait()
close(results) close(results)
var finalCount int var runningHandleCount int
var missingFinalArtifactCount int var missingFinalArtifactCount int
for item := range results { for item := range results {
if item.err != nil { if item.err != nil {
@ -431,15 +523,19 @@ func TestHTTPHandlerGatewayOpenClawHandlesFiveConcurrentE2ECases(t *testing.T) {
t.Fatalf("unexpected gateway stability error %q in body: %s", unexpected, item.body) t.Fatalf("unexpected gateway stability error %q in body: %s", unexpected, item.body)
} }
} }
if strings.Contains(item.body, "OPENCLAW_REQUIRED_ARTIFACT_MISSING") { envelope := sseFirstResultEnvelope(t, item.body)
handle := shared.AsMap(envelope["result"])
if got := handle["status"]; got != "running" {
t.Fatalf("expected running task handle, got %#v", handle)
}
runningHandleCount += 1
result := taskGetHTTPTerminalResult(t, httpServer.Config.Handler, handle)
if result["code"] == "OPENCLAW_REQUIRED_ARTIFACT_MISSING" {
missingFinalArtifactCount += 1 missingFinalArtifactCount += 1
} }
if strings.Contains(item.body, `"result"`) && strings.Contains(item.body, `data: [DONE]`) {
finalCount += 1
}
} }
if finalCount != len(prompts) { if runningHandleCount != len(prompts) {
t.Fatalf("expected all five e2e requests to return final result, got %d", finalCount) t.Fatalf("expected all five e2e requests to return running handles, got %d", runningHandleCount)
} }
if missingFinalArtifactCount != len(prompts) { if missingFinalArtifactCount != len(prompts) {
t.Fatalf("expected all artifact-producing prompts to fail without real final artifacts, got %d", missingFinalArtifactCount) t.Fatalf("expected all artifact-producing prompts to fail without real final artifacts, got %d", missingFinalArtifactCount)
@ -584,15 +680,14 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t
events := strings.Split(strings.TrimSpace(bodyText), "\n\n") events := strings.Split(strings.TrimSpace(bodyText), "\n\n")
if len(events) < 4 { if len(events) < 4 {
t.Fatalf("expected accepted, session.update, final envelope, and done events, got %q", bodyText) t.Fatalf("expected accepted, session.update, running envelope, and done events, got %q", bodyText)
} }
if events[len(events)-1] != "data: [DONE]" { if events[len(events)-1] != "data: [DONE]" {
t.Fatalf("expected done event, got %q", events[len(events)-1]) t.Fatalf("expected done event, got %q", events[len(events)-1])
} }
var sawAccepted bool var sawAccepted bool
var sawDelta bool
var sawCompletedSnapshot bool
var sawFinal bool var sawFinal bool
var handle map[string]any
for _, event := range events[:len(events)-1] { for _, event := range events[:len(events)-1] {
if !strings.HasPrefix(event, "data: ") { if !strings.HasPrefix(event, "data: ") {
t.Fatalf("expected data event, got %q", event) t.Fatalf("expected data event, got %q", event)
@ -606,8 +701,7 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t
sawAccepted = true sawAccepted = true
case "session.update": case "session.update":
params := shared.AsMap(envelope["params"]) params := shared.AsMap(envelope["params"])
if params["type"] == "delta" && params["delta"] == "streamed delta" { if params["type"] == "status" && params["event"] == "running" {
sawDelta = true
if got := params["sessionId"]; got != "session-filter" { if got := params["sessionId"]; got != "session-filter" {
t.Fatalf("expected session-filter session update, got %#v", params) t.Fatalf("expected session-filter session update, got %#v", params)
} }
@ -615,39 +709,33 @@ func TestHTTPHandlerGatewayOpenClawFiltersRawGatewayEventsAndKeepsFinalResult(t
t.Fatalf("expected thread-filter session update, got %#v", params) t.Fatalf("expected thread-filter session update, got %#v", params)
} }
} }
result := shared.AsMap(params["result"])
if params["type"] == "status" && params["event"] == "completed" && len(result) > 0 {
sawCompletedSnapshot = true
if got := result["resolvedGatewayProviderId"]; got != "openclaw" {
t.Fatalf("expected completed snapshot to carry openclaw final result, got %#v", result)
}
if !strings.Contains(bodyText, openClawArtifactDownloadPath) {
t.Fatalf("expected completed snapshot to include normalized artifact download URL, got %s", bodyText)
}
}
} }
if envelope["id"] == "task-filter" { if envelope["id"] == "task-filter" {
sawFinal = true sawFinal = true
result := shared.AsMap(envelope["result"]) handle = shared.AsMap(envelope["result"])
if got := result["resolvedGatewayProviderId"]; got != "openclaw" { if got := handle["status"]; got != "running" {
t.Fatalf("expected openclaw final result, got %#v", result) t.Fatalf("expected running task handle, got %#v", handle)
} }
if !strings.Contains(bodyText, openClawArtifactDownloadPath) { if got := handle["resolvedGatewayProviderId"]; got != "openclaw" {
t.Fatalf("expected normalized artifact download URL in final result, got %s", bodyText) t.Fatalf("expected openclaw running result, got %#v", handle)
} }
} }
} }
if !sawAccepted { if !sawAccepted {
t.Fatalf("expected accepted event, got %q", bodyText) t.Fatalf("expected accepted event, got %q", bodyText)
} }
if !sawDelta {
t.Fatalf("expected compact session.update delta, got %q", bodyText)
}
if !sawCompletedSnapshot {
t.Fatalf("expected completed session.update result snapshot, got %q", bodyText)
}
if !sawFinal { if !sawFinal {
t.Fatalf("expected final result envelope, got %q", bodyText) t.Fatalf("expected running result envelope, got %q", bodyText)
}
result := taskGetHTTPTerminalResult(t, httpServer.Config.Handler, handle)
if got := result["resolvedGatewayProviderId"]; got != "openclaw" {
t.Fatalf("expected openclaw final result, got %#v", result)
}
if got := result["status"]; got != "completed" {
t.Fatalf("expected completed task result, got %#v", result)
}
if !strings.Contains(fmt.Sprint(result), openClawArtifactDownloadPath) {
t.Fatalf("expected normalized artifact download URL in task result, got %#v", result)
} }
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) { if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected artifact workflow methods to prepare before chat.send, got %#v", got) t.Fatalf("expected artifact workflow methods to prepare before chat.send, got %#v", got)
@ -680,15 +768,27 @@ func TestHTTPHandlerGatewayOpenClawForcesGatewayRouting(t *testing.T) {
if !strings.Contains(recorder.Body.String(), `"resolvedGatewayProviderId":"openclaw"`) { if !strings.Contains(recorder.Body.String(), `"resolvedGatewayProviderId":"openclaw"`) {
t.Fatalf("expected forced OpenClaw gateway result, got %q", recorder.Body.String()) t.Fatalf("expected forced OpenClaw gateway result, got %q", recorder.Body.String())
} }
var decoded map[string]any
if err := json.Unmarshal(recorder.Body.Bytes(), &decoded); err != nil {
t.Fatalf("decode start response: %v", err)
}
handle := shared.AsMap(decoded["result"])
if got := handle["status"]; got != "running" {
t.Fatalf("expected running task handle, got %#v", handle)
}
if gateway.ChatSendCount() != 1 { if gateway.ChatSendCount() != 1 {
t.Fatalf("expected one OpenClaw chat.send, got %d", gateway.ChatSendCount()) t.Fatalf("expected one OpenClaw chat.send, got %d", gateway.ChatSendCount())
} }
result := taskGetHTTPTerminalResult(t, handler, handle)
if got := result["status"]; got != "completed" {
t.Fatalf("expected completed task result, got %#v", result)
}
if gateway.AgentWaitCount() != 1 { if gateway.AgentWaitCount() != 1 {
t.Fatalf("expected one OpenClaw agent.wait, got %d", gateway.AgentWaitCount()) t.Fatalf("expected one OpenClaw agent.wait, got %d", gateway.AgentWaitCount())
} }
} }
func TestHTTPHandlerSessionGetReturnsCompletedOpenClawResult(t *testing.T) { func TestHTTPHandlerTasksGetReturnsCompletedOpenClawResult(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t) gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close() defer gateway.Close()
@ -710,38 +810,26 @@ func TestHTTPHandlerSessionGetReturnsCompletedOpenClawResult(t *testing.T) {
if startRecorder.Code != http.StatusOK { if startRecorder.Code != http.StatusOK {
t.Fatalf("expected start 200, got %d: %s", startRecorder.Code, startRecorder.Body.String()) t.Fatalf("expected start 200, got %d: %s", startRecorder.Code, startRecorder.Body.String())
} }
getRecorder := httptest.NewRecorder()
getRequest := httptest.NewRequest(
http.MethodPost,
"http://127.0.0.1/acp/rpc",
strings.NewReader(`{"jsonrpc":"2.0","id":"get-1","method":"xworkmate.sessions.get","params":{"sessionId":"s1","threadId":"t1"}}`),
)
getRequest.Header.Set("Content-Type", "application/json")
getRequest.Header.Set("Authorization", "Bearer bridge-test-token")
handler.ServeHTTP(getRecorder, getRequest)
if getRecorder.Code != http.StatusOK {
t.Fatalf("expected get 200, got %d: %s", getRecorder.Code, getRecorder.Body.String())
}
var decoded map[string]any var decoded map[string]any
if err := json.Unmarshal(getRecorder.Body.Bytes(), &decoded); err != nil { if err := json.Unmarshal(startRecorder.Body.Bytes(), &decoded); err != nil {
t.Fatalf("decode session get response: %v", err) t.Fatalf("decode start response: %v", err)
} }
result := shared.AsMap(decoded["result"]) handle := shared.AsMap(decoded["result"])
if got := handle["status"]; got != "running" {
t.Fatalf("expected running task handle, got %#v", handle)
}
result := taskGetHTTPTerminalResult(t, handler, handle)
if got := result["status"]; got != "completed" { if got := result["status"]; got != "completed" {
t.Fatalf("expected completed status, got %#v from %#v", got, result) t.Fatalf("expected completed status, got %#v from %#v", got, result)
} }
task := shared.AsMap(result["task"]) if got := result["turnId"]; got == "" {
if got := task["turnId"]; got == "" { t.Fatalf("expected retained task turn id, got %#v", result)
t.Fatalf("expected retained task turn id, got %#v", task)
} }
snapshot := shared.AsMap(result["result"]) if got := result["resolvedGatewayProviderId"]; got != "openclaw" {
if got := snapshot["resolvedGatewayProviderId"]; got != "openclaw" { t.Fatalf("expected OpenClaw snapshot, got %#v", result)
t.Fatalf("expected OpenClaw snapshot, got %#v", snapshot)
} }
if got := snapshot["output"]; got == "" { if got := result["output"]; got == "" {
t.Fatalf("expected output in session snapshot, got %#v", snapshot) t.Fatalf("expected output in task result, got %#v", result)
} }
} }