fix: keep OpenClaw artifact tasks running until export

This commit is contained in:
Haitao Pan 2026-06-06 18:23:05 +08:00
parent c19631fd9c
commit 49637e87ea
4 changed files with 307 additions and 51 deletions

View File

@ -15,21 +15,22 @@ const (
)
type OpenClawTaskRecord struct {
SessionID string
ThreadID string
TurnID string
RunID string
SessionKey string
GatewayProviderID string
TaskLoadClass string
RuntimeBudgetMinutes int
StartedAt time.Time
DeadlineAt time.Time
ProgressStage string
ProgressMessage string
PreparedArtifact *openClawPreparedArtifactScope
ResolvedModel string
ResolvedSkills []string
SessionID string
ThreadID string
TurnID string
RunID string
SessionKey string
GatewayProviderID string
TaskLoadClass string
RuntimeBudgetMinutes int
StartedAt time.Time
DeadlineAt time.Time
ProgressStage string
ProgressMessage string
PreparedArtifact *openClawPreparedArtifactScope
RequiresArtifactExport bool
ResolvedModel string
ResolvedSkills []string
}
func openClawTaskRuntimePolicy(params map[string]any, chatParams map[string]any, contract openClawArtifactContract) (string, int) {
@ -84,6 +85,7 @@ func openClawRunningTaskResult(record *OpenClawTaskRecord) map[string]any {
"resolvedGatewayProviderId": record.GatewayProviderID,
"taskLoadClass": record.TaskLoadClass,
"runtimeBudgetMinutes": record.RuntimeBudgetMinutes,
"requiresArtifactExport": record.RequiresArtifactExport,
"startedAt": record.StartedAt.UTC().Format(time.RFC3339Nano),
"deadlineAt": record.DeadlineAt.UTC().Format(time.RFC3339Nano),
"progress": openClawTaskProgress(record),

View File

@ -384,21 +384,22 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask(
taskLoadClass, runtimeBudgetMinutes := openClawTaskRuntimePolicy(params, chatParams, artifactContract)
startedAt := time.Now()
record := &OpenClawTaskRecord{
SessionID: sessionID,
ThreadID: threadID,
TurnID: turnID,
RunID: runID,
SessionKey: sessionKey,
GatewayProviderID: gatewayProvider,
TaskLoadClass: taskLoadClass,
RuntimeBudgetMinutes: runtimeBudgetMinutes,
StartedAt: startedAt,
DeadlineAt: startedAt.Add(time.Duration(runtimeBudgetMinutes) * time.Minute),
ProgressStage: "running",
ProgressMessage: "OpenClaw task accepted",
PreparedArtifact: preparedArtifact,
ResolvedModel: routing.Model,
ResolvedSkills: append([]string(nil), routing.Skills...),
SessionID: sessionID,
ThreadID: threadID,
TurnID: turnID,
RunID: runID,
SessionKey: sessionKey,
GatewayProviderID: gatewayProvider,
TaskLoadClass: taskLoadClass,
RuntimeBudgetMinutes: runtimeBudgetMinutes,
StartedAt: startedAt,
DeadlineAt: startedAt.Add(time.Duration(runtimeBudgetMinutes) * time.Minute),
ProgressStage: "running",
ProgressMessage: "OpenClaw task accepted",
PreparedArtifact: preparedArtifact,
RequiresArtifactExport: artifactContract.RequiresArtifactExport,
ResolvedModel: routing.Model,
ResolvedSkills: append([]string(nil), routing.Skills...),
}
sess := o.server.getOrCreateSession(sessionID, threadID)
sess.mu.Lock()
@ -617,6 +618,9 @@ func openClawSessionPrepareParams(params map[string]any, openClawSessionKey stri
if len(artifactContract.ExpectedArtifactDirs) > 0 {
result["expectedArtifactDirs"] = append([]string(nil), artifactContract.ExpectedArtifactDirs...)
}
if artifactContract.RequiresArtifactExport {
result["requiresArtifactExport"] = true
}
if workspaceDir := openClawArtifactWorkspaceDir(params); workspaceDir != "" {
result["workspaceDir"] = workspaceDir
}
@ -770,10 +774,11 @@ func shellSingleQuote(value string) string {
}
type openClawArtifactContract struct {
TaskLoadClass string
ComplexLongChain bool
ExpectedArtifactDirs []string
SourceMessage string
TaskLoadClass string
ComplexLongChain bool
RequiresArtifactExport bool
ExpectedArtifactDirs []string
SourceMessage string
}
func openClawArtifactContractForParams(params map[string]any, chatParams map[string]any) openClawArtifactContract {
@ -786,12 +791,14 @@ func openClawArtifactContractForParams(params map[string]any, chatParams map[str
lowerMessage := strings.ToLower(message)
contract := shared.AsMap(metadata["xworkmateTaskArtifactContract"])
expectedDirs := normalizeOpenClawDirList(shared.ListArg(contract, "expectedArtifactDirs"))
requiresExport := parseBool(contract["requiresExportBeforeFinalResponse"]) || len(expectedDirs) > 0
complex := taskLoadClass == "complex_long_chain_task" || isOpenClawLongArtifactTask(lowerMessage)
return openClawArtifactContract{
TaskLoadClass: taskLoadClass,
ComplexLongChain: complex,
ExpectedArtifactDirs: expectedDirs,
SourceMessage: message,
TaskLoadClass: taskLoadClass,
ComplexLongChain: complex,
RequiresArtifactExport: requiresExport,
ExpectedArtifactDirs: expectedDirs,
SourceMessage: message,
}
}

View File

@ -598,11 +598,11 @@ func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) {
if gateway.AgentWaitCount() != 0 {
t.Fatalf("expected native task lookup to avoid Bridge-owned agent.wait, got %d", gateway.AgentWaitCount())
}
if gateway.ArtifactExportCount() != 0 {
t.Fatalf("expected native task lookup to avoid Bridge-owned artifact export, got %d", gateway.ArtifactExportCount())
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected empty terminal task lookup to fall back to artifact export, got %d", gateway.ArtifactExportCount())
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) {
t.Fatalf("expected connect, prepare, chat.send, then native task lookup, got %#v", got)
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, prepare, chat.send, native task lookup, then artifact export fallback, got %#v", got)
}
client := gateway.LastConnectClient()
if got := client["id"]; got != "openclaw-macos" {
@ -2593,7 +2593,7 @@ func TestExecuteSessionTaskGatewayCollectsOpenClawEventArtifacts(t *testing.T) {
}
}
func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing.T) {
func TestExecuteSessionTaskGatewayKeepsRunningWhenTerminalLookupExportsNoArtifacts(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
@ -2609,6 +2609,11 @@ func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing
"threadId": "thread-openclaw-artifact-missing",
"taskPrompt": "say pong",
"workingDirectory": t.TempDir(),
"metadata": map[string]any{
"xworkmateTaskArtifactContract": map[string]any{
"requiresExportBeforeFinalResponse": true,
},
},
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
@ -2618,16 +2623,74 @@ func TestExecuteSessionTaskGatewayAlwaysSyncsGatewayArtifactsAfterRun(t *testing
},
})
if rpcErr != nil {
t.Fatalf("expected gateway text response despite artifact export failure, got rpc error: %#v", rpcErr)
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
if got := response["output"]; got != "gateway pong" {
t.Fatalf("expected gateway pong output, got %#v", response)
if got := response["status"]; got != string(TaskStateRunning) {
t.Fatalf("expected empty terminal artifact export to keep task running, got %#v", response)
}
if gateway.ArtifactExportCount() != 0 {
t.Fatalf("expected native task-registry lookup without Bridge artifact export sync, got %d", gateway.ArtifactExportCount())
if got := shared.StringArg(shared.AsMap(response["progress"]), "stage", ""); got != "syncing-artifacts" {
t.Fatalf("expected syncing-artifacts progress, got %#v", response)
}
if warnings := shared.ListArg(response, "artifactWarnings"); len(warnings) != 0 {
t.Fatalf("expected no artifact warnings when gateway export succeeds empty, got %#v", warnings)
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected artifact export fallback after empty terminal lookup, got %d", gateway.ArtifactExportCount())
}
}
func TestTaskGetBackfillsArtifactScopeFromBridgeSession(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
start, rpcErr := server.handleRequest(shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-scope-backfill",
"threadId": "thread-openclaw-scope-backfill",
"taskPrompt": "say pong",
"workingDirectory": t.TempDir(),
"metadata": map[string]any{
"xworkmateTaskArtifactContract": map[string]any{
"requiresExportBeforeFinalResponse": true,
},
},
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
}, nil)
if rpcErr != nil {
t.Fatalf("expected running task handle, got rpc error: %#v", rpcErr)
}
if got := start["status"]; got != string(TaskStateRunning) {
t.Fatalf("expected running start response, got %#v", start)
}
response, rpcErr := server.handleRequest(shared.RPCRequest{
Method: "xworkmate.tasks.get",
Params: map[string]any{
"runId": shared.StringArg(start, "runId", ""),
"appThreadKey": shared.StringArg(start, "appThreadKey", ""),
"openclawSessionKey": shared.StringArg(start, "openclawSessionKey", ""),
"includeArtifacts": true,
},
}, nil)
if rpcErr != nil {
t.Fatalf("expected task lookup response, got rpc error: %#v", rpcErr)
}
if got := response["status"]; got != string(TaskStateRunning) {
t.Fatalf("expected empty terminal lookup to remain running, got %#v", response)
}
exportParams := gateway.LastArtifactExportParams()
if got := shared.StringArg(exportParams, "artifactScope", ""); got == "" {
t.Fatalf("expected Bridge to backfill artifactScope for export, got %#v", exportParams)
}
if got, want := shared.StringArg(exportParams, "artifactScope", ""), shared.StringArg(start, "artifactScope", ""); got != want {
t.Fatalf("expected export artifactScope %q, got %#v", want, exportParams)
}
}

View File

@ -91,6 +91,7 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
}
func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {
params = s.taskGetParamsWithSessionScope(params)
gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", ""))
if gatewayProvider == "" {
gatewayProvider = strings.TrimSpace(shared.StringArg(params, "resolvedGatewayProviderId", ""))
@ -114,7 +115,9 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
notify,
)
if result.OK {
return shared.AsMap(result.Payload)
payload := shared.AsMap(result.Payload)
s.mergeOpenClawTaskGetArtifactExport(payload, params, gatewayProvider, notify)
return normalizeOpenClawTaskGetResult(params, payload, gatewayProvider)
}
message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed"))
code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED"))
@ -126,6 +129,184 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
}
}
func (s *Server) taskGetParamsWithSessionScope(params map[string]any) map[string]any {
next := make(map[string]any, len(params)+8)
for key, value := range params {
next[key] = value
}
sess := s.findTaskSession(params)
if sess == nil {
return next
}
sess.mu.Lock()
defer sess.mu.Unlock()
if strings.TrimSpace(shared.StringArg(next, "runId", "")) == "" {
next["runId"] = sess.task.RunID
}
if strings.TrimSpace(shared.StringArg(next, "taskId", "")) == "" {
next["taskId"] = sess.task.RunID
}
if strings.TrimSpace(shared.StringArg(next, "gatewayProviderId", "")) == "" {
next["gatewayProviderId"] = sess.task.GatewayProviderID
}
if strings.TrimSpace(shared.StringArg(next, "artifactScope", "")) == "" {
next["artifactScope"] = sess.task.ArtifactScope
}
if strings.TrimSpace(shared.StringArg(next, "artifactDirectory", "")) == "" {
next["artifactDirectory"] = sess.task.ArtifactDirectory
}
if _, ok := next["requiresArtifactExport"]; !ok && sess.openClaw != nil && sess.openClaw.RequiresArtifactExport {
next["requiresArtifactExport"] = true
}
if strings.TrimSpace(shared.StringArg(next, "openclawSessionKey", "")) == "" {
next["openclawSessionKey"] = sess.task.SessionKey
}
if strings.TrimSpace(shared.StringArg(next, "appThreadKey", "")) == "" {
next["appThreadKey"] = sess.threadID
}
return next
}
func (s *Server) mergeOpenClawTaskGetArtifactExport(payload map[string]any, params map[string]any, gatewayProvider string, notify func(map[string]any)) {
if len(payload) == 0 || s == nil || s.orchestrator == nil {
return
}
status := strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", "")))
if status == string(TaskStateRunning) || status == string(TaskStateFailed) || status == string(TaskStateCancelled) {
return
}
if !openClawTaskGetRequiresArtifactExport(params, payload) {
return
}
success := true
if value, ok := payload["success"]; ok {
success = parseBool(value)
}
if !success {
return
}
remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(payload, "remoteWorkingDirectory", ""))
if len(extractArtifactPayloads(payload, remoteWorkingDirectory)) > 0 {
return
}
sessionKey := firstNonEmptyString(payload, "openclawSessionKey", "sessionKey")
if sessionKey == "" {
sessionKey = strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", ""))
}
runID := firstNonEmptyString(payload, "runId", "taskId")
if runID == "" {
runID = strings.TrimSpace(shared.StringArg(params, "runId", ""))
}
artifactScope := firstNonEmptyString(payload, "artifactScope")
if artifactScope == "" {
artifactScope = strings.TrimSpace(shared.StringArg(params, "artifactScope", ""))
}
artifactDirectory := firstNonEmptyString(payload, "artifactDirectory")
if artifactDirectory == "" {
artifactDirectory = strings.TrimSpace(shared.StringArg(params, "artifactDirectory", ""))
}
if sessionKey == "" || runID == "" || artifactScope == "" || artifactDirectory == "" {
return
}
prepared := &openClawPreparedArtifactScope{
RemoteWorkingDirectory: remoteWorkingDirectory,
RemoteWorkspaceRefKind: strings.TrimSpace(shared.StringArg(payload, "remoteWorkspaceRefKind", "")),
ArtifactScope: artifactScope,
ArtifactDirectory: artifactDirectory,
ScopeKind: "task",
}
exportParams := map[string]any{
"openclawSessionKey": sessionKey,
"runId": runID,
"artifactScope": artifactScope,
"sinceUnixMs": 0,
"maxFiles": 64,
"maxInlineBytes": 0,
"includeContent": false,
}
if expectedDirs := shared.ListArg(params, "expectedArtifactDirs"); len(expectedDirs) > 0 {
exportParams["expectedArtifactDirs"] = expectedDirs
}
mergeOpenClawArtifactPayload(payload, s.orchestrator.openClawArtifactExportRequest(gatewayProvider, exportParams, notify))
applyOpenClawPreparedArtifactToResult(payload, prepared)
s.decorateOpenClawArtifactDownloadURLs(payload, sessionKey, runID)
stripOpenClawArtifactInlineContent(payload)
}
func normalizeOpenClawTaskGetResult(params map[string]any, payload map[string]any, gatewayProvider string) map[string]any {
if len(payload) == 0 {
return payload
}
artifactScope := firstNonEmptyString(payload, "artifactScope")
if artifactScope == "" {
artifactScope = strings.TrimSpace(shared.StringArg(params, "artifactScope", ""))
}
artifactDirectory := firstNonEmptyString(payload, "artifactDirectory")
if artifactDirectory == "" {
artifactDirectory = strings.TrimSpace(shared.StringArg(params, "artifactDirectory", ""))
}
if artifactScope == "" && artifactDirectory == "" {
return payload
}
remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(payload, "remoteWorkingDirectory", ""))
if len(extractArtifactPayloads(payload, remoteWorkingDirectory)) > 0 {
return payload
}
status := strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", "")))
success := true
if value, ok := payload["success"]; ok {
success = parseBool(value)
}
if !success || status == string(TaskStateRunning) || status == string(TaskStateFailed) || status == string(TaskStateCancelled) {
return payload
}
if !openClawTaskGetRequiresArtifactExport(params, payload) {
return payload
}
runID := firstNonEmptyString(payload, "runId", "taskId")
if runID == "" {
runID = strings.TrimSpace(shared.StringArg(params, "runId", ""))
}
sessionKey := firstNonEmptyString(payload, "openclawSessionKey", "sessionKey")
if sessionKey == "" {
sessionKey = strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", ""))
}
payload["success"] = true
payload["status"] = string(TaskStateRunning)
payload["event"] = string(TaskStateRunning)
payload["pending"] = true
payload["artifactSyncStatus"] = "syncing"
payload["message"] = "OpenClaw task completed; waiting for artifact export."
payload["runId"] = runID
payload["taskId"] = runID
payload["openclawSessionKey"] = sessionKey
if strings.TrimSpace(shared.StringArg(payload, "appThreadKey", "")) == "" {
payload["appThreadKey"] = strings.TrimSpace(shared.StringArg(params, "appThreadKey", ""))
}
payload["artifactScope"] = artifactScope
payload["artifactDirectory"] = artifactDirectory
if strings.TrimSpace(shared.StringArg(payload, "resolvedGatewayProviderId", "")) == "" {
payload["resolvedGatewayProviderId"] = gatewayProvider
}
payload["progress"] = map[string]any{
"stage": "syncing-artifacts",
"message": "Waiting for OpenClaw artifact export.",
"terminal": false,
}
return payload
}
func openClawTaskGetRequiresArtifactExport(params map[string]any, payload map[string]any) bool {
if parseBool(params["requiresArtifactExport"]) || parseBool(payload["requiresArtifactExport"]) {
return true
}
if parseBool(params["requiresExportBeforeFinalResponse"]) || parseBool(payload["requiresExportBeforeFinalResponse"]) {
return true
}
return len(shared.ListArg(params, "expectedArtifactDirs")) > 0 ||
len(shared.ListArg(payload, "expectedArtifactDirs")) > 0
}
func (s *Server) handleTaskCancel(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {
sess := s.findTaskSession(params)
runID := strings.TrimSpace(shared.StringArg(params, "runId", ""))
@ -204,6 +385,9 @@ func openClawTaskLookupParams(params map[string]any) map[string]any {
"includeContent",
"expectedArtifactDirs",
"workspaceDir",
"artifactScope",
"artifactDirectory",
"requiresArtifactExport",
} {
if value, ok := params[key]; ok {
result[key] = value