diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index 6dcdbf5..be784a3 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -1416,6 +1416,9 @@ func (o *SessionOrchestrator) openClawArtifactExport( if len(artifactContract.ExpectedArtifactDirs) > 0 { exportParams["expectedArtifactDirs"] = append([]string(nil), artifactContract.ExpectedArtifactDirs...) } + if len(artifactContract.RequiredArtifactExts) > 0 { + exportParams["requiredArtifactExtensions"] = append([]string(nil), artifactContract.RequiredArtifactExts...) + } payload := o.openClawArtifactExportRequest(gatewayProvider, exportParams, notify) return payload } @@ -1478,6 +1481,38 @@ func mergeOpenClawArtifactPayload(result map[string]any, source map[string]any) result[key] = merged } } + if value, ok := source["constraintSatisfied"]; ok { + result["constraintSatisfied"] = parseBool(value) + } + if _, ok := source["missingRequiredExtensions"]; ok { + result["missingRequiredExtensions"] = appendStringList(result["missingRequiredExtensions"], source["missingRequiredExtensions"]) + } +} + +func appendStringList(existing any, incoming any) []any { + seen := map[string]bool{} + merged := make([]any, 0) + add := func(value any) { + item := strings.TrimSpace(fmt.Sprint(value)) + if item == "" || seen[item] { + return + } + seen[item] = true + merged = append(merged, item) + } + for _, values := range []any{existing, incoming} { + switch typed := values.(type) { + case []any: + for _, item := range typed { + add(item) + } + case []string: + for _, item := range typed { + add(item) + } + } + } + return merged } func appendArtifactList(existing any, incoming any) []any { @@ -1501,6 +1536,22 @@ func appendArtifactList(existing any, incoming any) []any { return merged } +func applyOpenClawConstraintDeliveryStatus(result map[string]any) { + if result == nil || !parseBool(result["success"]) { + return + } + if value, ok := result["constraintSatisfied"]; !ok || parseBool(value) { + return + } + switch strings.ToLower(strings.TrimSpace(shared.StringArg(result, "status", ""))) { + case string(TaskStateRunning), string(TaskStateFailed), string(TaskStateCancelled): + return + default: + result["status"] = "partially_delivered" + result["artifactSyncStatus"] = "partial" + } +} + func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError { if isOpenClawRetryableGatewayError(errorPayload) { return &shared.RPCError{ @@ -1712,7 +1763,12 @@ func (o *SessionOrchestrator) normalizeResult(sess *session, result map[string]a delete(result, openClawArtifactExportAttemptedField) successValue, hasSuccess := result["success"] - success := !hasSuccess || parseBool(successValue) + successSource := "explicit" + success := parseBool(successValue) + if !hasSuccess { + successSource = "absent" + success = true + } output := strings.TrimSpace(shared.StringArg(result, "output", "")) if output == "" { @@ -1721,6 +1777,20 @@ func (o *SessionOrchestrator) normalizeResult(sess *session, result map[string]a if output == "" && success { output = strings.TrimSpace(shared.StringArg(result, "message", "")) } + if routing.TargetID == "gateway" && successSource == "absent" { + remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(result, "remoteWorkingDirectory", "")) + if output == "" && len(extractArtifactPayloads(result, remoteWorkingDirectory)) == 0 { + success = false + result["success"] = false + result["status"] = string(TaskStateFailed) + result["code"] = "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE" + result["error"] = "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE" + result["message"] = "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE" + } else { + result["success"] = true + result["successSource"] = "inferred" + } + } sess.mu.Lock() if output != "" { @@ -1735,7 +1805,12 @@ func (o *SessionOrchestrator) normalizeResult(sess *session, result map[string]a result["status"] = "completed" } if !hasSuccess { - result["success"] = true + if _, ok := result["success"]; !ok { + result["success"] = true + } + } + if !parseBool(result["success"]) && strings.TrimSpace(shared.StringArg(result, "status", "")) == string(TaskStateCompleted) { + result["status"] = string(TaskStateFailed) } result["resolvedExecutionTarget"] = routing.TargetID result["resolvedProviderId"] = routing.ProviderID @@ -1762,6 +1837,7 @@ func (o *SessionOrchestrator) normalizeResult(sess *session, result map[string]a sess.task.UpdatedAt = time.Now() sess.mu.Unlock() } + applyOpenClawConstraintDeliveryStatus(result) artifactRecord := buildArtifactRecord(sess, result, output) if artifactRecord.RemoteWorkingDirectory != "" { diff --git a/internal/acp/orchestrator_normalize_result_test.go b/internal/acp/orchestrator_normalize_result_test.go new file mode 100644 index 0000000..5550a55 --- /dev/null +++ b/internal/acp/orchestrator_normalize_result_test.go @@ -0,0 +1,278 @@ +package acp + +import ( + "slices" + "strings" + "testing" + "time" + + "xworkmate-bridge/internal/shared" +) + +func TestNormalizeResultGatewaySuccessEvidenceAdjudication(t *testing.T) { + cases := []struct { + name string + routingTarget string + result map[string]any + wantSuccess bool + wantStatus string + wantCode string + wantSuccessSource string + }{ + { + name: "gateway absent success without output or artifacts fails", + routingTarget: "gateway", + result: map[string]any{}, + wantSuccess: false, + wantStatus: string(TaskStateFailed), + wantCode: "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE", + }, + { + name: "gateway absent success with output is inferred", + routingTarget: "gateway", + result: map[string]any{"output": "done"}, + wantSuccess: true, + wantStatus: string(TaskStateCompleted), + wantSuccessSource: "inferred", + }, + { + name: "gateway absent success with artifacts is inferred", + routingTarget: "gateway", + result: map[string]any{ + "artifacts": []any{map[string]any{"relativePath": "reports/final.md"}}, + }, + wantSuccess: true, + wantStatus: string(TaskStateCompleted), + wantSuccessSource: "inferred", + }, + { + name: "gateway explicit false remains failed", + routingTarget: "gateway", + result: map[string]any{"success": false, "output": "failed"}, + wantSuccess: false, + wantStatus: string(TaskStateFailed), + }, + { + name: "gateway explicit true remains completed", + routingTarget: "gateway", + result: map[string]any{"success": true}, + wantSuccess: true, + wantStatus: string(TaskStateCompleted), + }, + { + name: "non gateway absent success keeps legacy inference", + routingTarget: "single-agent", + result: map[string]any{"output": "done"}, + wantSuccess: true, + wantStatus: string(TaskStateCompleted), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + server := NewServer() + orchestrator := NewSessionOrchestrator(server) + sess := server.getOrCreateSession("session-"+strings.ReplaceAll(tc.name, " ", "-"), "thread") + + got := orchestrator.normalizeResult( + sess, + tc.result, + RoutingResult{TargetID: tc.routingTarget, ProviderID: "provider", GatewayProviderID: "openclaw"}, + "turn-1", + map[string]any{}, + ) + + if parseBool(got["success"]) != tc.wantSuccess { + t.Fatalf("success = %#v, want %v in %#v", got["success"], tc.wantSuccess, got) + } + if status := shared.StringArg(got, "status", ""); status != tc.wantStatus { + t.Fatalf("status = %q, want %q in %#v", status, tc.wantStatus, got) + } + if code := shared.StringArg(got, "code", ""); code != tc.wantCode { + t.Fatalf("code = %q, want %q in %#v", code, tc.wantCode, got) + } + if source := shared.StringArg(got, "successSource", ""); source != tc.wantSuccessSource { + t.Fatalf("successSource = %q, want %q in %#v", source, tc.wantSuccessSource, got) + } + }) + } +} + +func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceKeepsActiveRecordRunning(t *testing.T) { + payload := map[string]any{ + "success": false, + "status": "unknown", + "taskStatus": "unknown", + "evidence": "artifacts_present", + "artifactCount": 1, + "artifactScope": "tasks/session/run", + "artifactDirectory": "/remote/openclaw/workspace/tasks/session/run", + "artifacts": []any{map[string]any{"relativePath": "series.config.json"}}, + } + record := &OpenClawTaskRecord{ + RunID: "run", + SessionKey: "session", + GatewayProviderID: "openclaw", + RequiresArtifactExport: true, + DeadlineAt: time.Now().Add(time.Minute), + } + + got := normalizeOpenClawTaskGetResult( + map[string]any{"requiredArtifactExtensions": []any{"pdf"}}, + payload, + "openclaw", + record, + ) + + if status := shared.StringArg(got, "status", ""); status != string(TaskStateRunning) { + t.Fatalf("expected active unknown artifact evidence to remain running, got %#v", got) + } + if evidence := shared.StringArg(got, "artifactEvidence", ""); evidence != "artifacts_present" { + t.Fatalf("expected artifact evidence audit field, got %#v", got) + } +} + +func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceFailsAfterDeadlineWithoutRequiredArtifacts(t *testing.T) { + payload := map[string]any{ + "success": false, + "status": "unknown", + "taskStatus": "unknown", + "evidence": "artifacts_present", + "artifactCount": 1, + "runId": "run", + "openclawSessionKey": "session", + "artifactScope": "tasks/session/run", + "artifactDirectory": "/remote/openclaw/workspace/tasks/session/run", + "artifacts": []any{map[string]any{"relativePath": "series.config.json"}}, + } + record := &OpenClawTaskRecord{DeadlineAt: time.Now().Add(-time.Minute)} + + got := normalizeOpenClawTaskGetResult( + map[string]any{"requiredArtifactExtensions": []any{"pdf"}}, + payload, + "openclaw", + record, + ) + + if status := shared.StringArg(got, "status", ""); status != string(TaskStateFailed) { + t.Fatalf("expected expired unknown artifact evidence to fail, got %#v", got) + } + if code := shared.StringArg(got, "code", ""); code != "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE" { + t.Fatalf("expected evidence failure code, got %#v", got) + } + if missing := shared.ListArg(got, "missingRequiredExtensions"); !slices.ContainsFunc(missing, func(value any) bool { + return strings.TrimSpace(shared.StringArg(map[string]any{"value": value}, "value", "")) == "pdf" + }) { + t.Fatalf("expected missing pdf extension, got %#v", got) + } +} + +func TestOpenClawArtifactConstraintFieldsArePropagatedAndMarkPartialDelivery(t *testing.T) { + result := map[string]any{} + mergeOpenClawArtifactPayload(result, map[string]any{ + "constraintSatisfied": false, + "missingRequiredExtensions": []any{"pdf"}, + }) + if got := result["constraintSatisfied"]; got != false { + t.Fatalf("expected constraintSatisfied=false to propagate, got %#v", result) + } + if missing := shared.ListArg(result, "missingRequiredExtensions"); len(missing) != 1 || missing[0] != "pdf" { + t.Fatalf("expected missingRequiredExtensions to propagate, got %#v", result) + } + + server := NewServer() + orchestrator := NewSessionOrchestrator(server) + sess := server.getOrCreateSession("session-partial-delivery", "thread-partial-delivery") + got := orchestrator.normalizeResult( + sess, + map[string]any{ + "success": true, + "output": "created some files", + "constraintSatisfied": false, + "missingRequiredExtensions": []any{"pdf"}, + }, + RoutingResult{TargetID: "gateway", ProviderID: "gateway", GatewayProviderID: "openclaw"}, + "turn-partial-delivery", + map[string]any{}, + ) + + if status := shared.StringArg(got, "status", ""); status != "partially_delivered" { + t.Fatalf("expected partially_delivered status, got %#v", got) + } + if !parseBool(got["success"]) { + t.Fatalf("partial delivery should preserve success=true, got %#v", got) + } +} + +func TestOpenClawArtifactsSatisfyEveryRequiredExtension(t *testing.T) { + artifacts := []map[string]any{ + {"relativePath": "exports/final.pdf"}, + } + if openClawArtifactsSatisfyRequiredExtensions(artifacts, []string{"pdf", "mp4"}) { + t.Fatalf("expected only pdf artifact to miss mp4 requirement") + } + if !openClawArtifactsSatisfyRequiredExtensions( + append(artifacts, map[string]any{"relativePath": "exports/final.MP4"}), + []string{"pdf", "mp4"}, + ) { + t.Fatalf("expected pdf and mp4 artifacts to satisfy both requirements") + } +} + +func TestTaskGetArtifactExportReceivesRequiredArtifactExtensions(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + defer gateway.Close() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + start, rpcErr := server.handleRequest(shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-export-required-exts", + "threadId": "thread-export-required-exts", + "taskPrompt": "say pong", + "workingDirectory": t.TempDir(), + "metadata": map[string]any{ + "xworkmateTaskArtifactContract": map[string]any{ + "requiresExportBeforeFinalResponse": true, + }, + }, + "routing": map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "gateway", + "preferredGatewayProviderId": "openclaw", + }, + }, + }, nil) + if rpcErr != nil { + t.Fatalf("expected running task handle, got rpc error: %#v", rpcErr) + } + response, rpcErr := server.handleRequest(shared.RPCRequest{ + Method: "xworkmate.tasks.get", + Params: map[string]any{ + "sessionId": shared.StringArg(start, "sessionId", ""), + "threadId": shared.StringArg(start, "threadId", ""), + "turnId": shared.StringArg(start, "turnId", ""), + "runId": shared.StringArg(start, "runId", ""), + "appThreadKey": shared.StringArg(start, "appThreadKey", ""), + "openclawSessionKey": shared.StringArg(start, "openclawSessionKey", ""), + "artifactScope": shared.StringArg(start, "artifactScope", ""), + "artifactDirectory": shared.StringArg(start, "artifactDirectory", ""), + "gatewayProviderId": shared.StringArg(start, "resolvedGatewayProviderId", ""), + "requiresArtifactExport": true, + "requiredArtifactExtensions": []any{"pdf"}, + }, + }, nil) + if rpcErr != nil { + t.Fatalf("expected task lookup response, got rpc error: %#v", rpcErr) + } + if status := shared.StringArg(response, "status", ""); status != string(TaskStateRunning) { + t.Fatalf("expected missing required artifact to keep syncing, got %#v", response) + } + exportParams := gateway.LastArtifactExportParams() + if got := shared.ListArg(exportParams, "requiredArtifactExtensions"); len(got) != 1 || got[0] != "pdf" { + t.Fatalf("expected requiredArtifactExtensions to reach export, got %#v", exportParams) + } +} diff --git a/internal/acp/rpc_handler.go b/internal/acp/rpc_handler.go index 60e690c..66b5c84 100644 --- a/internal/acp/rpc_handler.go +++ b/internal/acp/rpc_handler.go @@ -116,8 +116,9 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif ) if result.OK { payload := shared.AsMap(result.Payload) + activeOpenClawTask := s.activeOpenClawTaskRecord(params) s.mergeOpenClawTaskGetArtifactExport(payload, params, gatewayProvider, notify) - payload = normalizeOpenClawTaskGetResult(params, payload, gatewayProvider) + payload = normalizeOpenClawTaskGetResult(params, payload, gatewayProvider, activeOpenClawTask) sessionKey := firstNonEmptyString(payload, "openclawSessionKey", "sessionKey") if sessionKey == "" { sessionKey = strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) @@ -241,6 +242,9 @@ func (s *Server) mergeOpenClawTaskGetArtifactExport(payload map[string]any, para if expectedDirs := openClawTaskGetExpectedArtifactDirs(params, payload); len(expectedDirs) > 0 { exportParams["expectedArtifactDirs"] = expectedDirs } + if requiredExts := openClawTaskGetRequiredArtifactExtensions(params, payload); len(requiredExts) > 0 { + exportParams["requiredArtifactExtensions"] = append([]string(nil), requiredExts...) + } exportPayload := s.orchestrator.openClawArtifactExportRequest(gatewayProvider, exportParams, notify) if openClawArtifactExportPayloadAuthoritative(exportPayload) { replaceOpenClawArtifactPayload(payload, exportPayload) @@ -252,7 +256,24 @@ func (s *Server) mergeOpenClawTaskGetArtifactExport(payload map[string]any, para stripOpenClawArtifactInlineContent(payload) } -func normalizeOpenClawTaskGetResult(params map[string]any, payload map[string]any, gatewayProvider string) map[string]any { +func (s *Server) activeOpenClawTaskRecord(params map[string]any) *OpenClawTaskRecord { + sess := s.findTaskSession(params) + if sess == nil { + return nil + } + sess.mu.Lock() + defer sess.mu.Unlock() + if sess.task.State != TaskStateRunning { + return nil + } + if sess.openClaw == nil { + return nil + } + record := *sess.openClaw + return &record +} + +func normalizeOpenClawTaskGetResult(params map[string]any, payload map[string]any, gatewayProvider string, activeRecord *OpenClawTaskRecord) map[string]any { if len(payload) == 0 { return payload } @@ -270,6 +291,13 @@ func normalizeOpenClawTaskGetResult(params map[string]any, payload map[string]an remoteWorkingDirectory := strings.TrimSpace(shared.StringArg(payload, "remoteWorkingDirectory", "")) artifacts := extractArtifactPayloads(payload, remoteWorkingDirectory) requiredExts := openClawTaskGetRequiredArtifactExtensions(params, payload) + if openClawUnknownArtifactEvidence(payload, artifacts) { + return adjudicateOpenClawUnknownArtifactEvidence(params, payload, gatewayProvider, activeRecord, artifacts, requiredExts, artifactScope, artifactDirectory) + } + applyOpenClawConstraintDeliveryStatus(payload) + if strings.TrimSpace(shared.StringArg(payload, "status", "")) == "partially_delivered" { + return payload + } if len(artifacts) > 0 && openClawArtifactsSatisfyRequiredExtensions(artifacts, requiredExts) { return payload } @@ -320,6 +348,87 @@ func normalizeOpenClawTaskGetResult(params map[string]any, payload map[string]an return payload } +func openClawUnknownArtifactEvidence(payload map[string]any, artifacts []map[string]any) bool { + if strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) != "unknown" { + return false + } + evidence := strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "evidence", ""))) + if evidence == "artifacts_present" { + return true + } + return parseBool(payload["artifactsPresent"]) || + shared.IntArg(shared.StringArg(payload, "artifactCount", ""), 0) > 0 || + len(artifacts) > 0 +} + +func adjudicateOpenClawUnknownArtifactEvidence( + params map[string]any, + payload map[string]any, + gatewayProvider string, + activeRecord *OpenClawTaskRecord, + artifacts []map[string]any, + requiredExts []string, + artifactScope string, + artifactDirectory string, +) map[string]any { + if openClawTaskRecordStillActive(activeRecord) { + running := openClawRunningTaskResult(activeRecord) + running["artifactEvidence"] = "artifacts_present" + if strings.TrimSpace(shared.StringArg(running, "resolvedGatewayProviderId", "")) == "" { + running["resolvedGatewayProviderId"] = gatewayProvider + } + return running + } + runID := firstNonEmptyString(payload, "runId", "taskId") + if runID == "" { + runID = strings.TrimSpace(shared.StringArg(params, "runId", "")) + } + sessionKey := firstNonEmptyString(payload, "openclawSessionKey", "sessionKey") + if sessionKey == "" { + sessionKey = strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", "")) + } + if len(requiredExts) > 0 && openClawArtifactsSatisfyRequiredExtensions(artifacts, requiredExts) { + payload["success"] = true + payload["successSource"] = "inferred" + payload["status"] = string(TaskStateCompleted) + payload["event"] = string(TaskStateCompleted) + payload["pending"] = false + } else { + payload["success"] = false + payload["status"] = string(TaskStateFailed) + payload["event"] = string(TaskStateFailed) + payload["pending"] = false + payload["code"] = "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE" + payload["error"] = "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE" + payload["message"] = "OPENCLAW_TERMINAL_WITHOUT_EVIDENCE" + if len(requiredExts) > 0 { + payload["missingRequiredExtensions"] = openClawMissingRequiredExtensions(artifacts, requiredExts) + } + } + payload["runId"] = runID + payload["taskId"] = runID + payload["openclawSessionKey"] = sessionKey + if strings.TrimSpace(shared.StringArg(payload, "appThreadKey", "")) == "" { + payload["appThreadKey"] = strings.TrimSpace(shared.StringArg(params, "appThreadKey", "")) + } + payload["artifactScope"] = artifactScope + payload["artifactDirectory"] = artifactDirectory + if len(requiredExts) > 0 { + payload["requiredArtifactExtensions"] = append([]string(nil), requiredExts...) + } + if strings.TrimSpace(shared.StringArg(payload, "resolvedGatewayProviderId", "")) == "" { + payload["resolvedGatewayProviderId"] = gatewayProvider + } + return payload +} + +func openClawTaskRecordStillActive(record *OpenClawTaskRecord) bool { + if record == nil { + return false + } + return record.DeadlineAt.IsZero() || time.Now().Before(record.DeadlineAt) +} + func openClawTaskGetRequiresArtifactExport(params map[string]any, payload map[string]any) bool { if parseBool(params["requiresArtifactExport"]) || parseBool(payload["requiresArtifactExport"]) { return true @@ -405,16 +514,29 @@ func openClawArtifactsSatisfyRequiredExtensions(artifacts []map[string]any, requ if len(requiredExts) == 0 { return true } - for _, artifact := range artifacts { - relativePath := strings.ToLower(strings.TrimSpace(shared.StringArg(artifact, "relativePath", ""))) - for _, ext := range requiredExts { - normalized := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(ext)), ".") - if normalized != "" && strings.HasSuffix(relativePath, "."+normalized) { - return true + return len(openClawMissingRequiredExtensions(artifacts, requiredExts)) == 0 +} + +func openClawMissingRequiredExtensions(artifacts []map[string]any, requiredExts []string) []any { + missing := make([]any, 0) + for _, ext := range requiredExts { + normalized := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(ext)), ".") + if normalized == "" { + continue + } + found := false + for _, artifact := range artifacts { + relativePath := strings.ToLower(strings.TrimSpace(shared.StringArg(artifact, "relativePath", ""))) + if strings.HasSuffix(relativePath, "."+normalized) { + found = true + break } } + if !found { + missing = append(missing, normalized) + } } - return false + return missing } func (s *Server) handleTaskCancel(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {