Compare commits

..

No commits in common. "main" and "runtime-b8f4ed6102ca" have entirely different histories.

9 changed files with 19 additions and 562 deletions

View File

@ -56,7 +56,6 @@ jobs:
- name: Load Vault secrets
id: vault
if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository }}
continue-on-error: true
uses: hashicorp/vault-action@v2
with:
url: ${{ env.VAULT_ADDR }}
@ -68,7 +67,7 @@ jobs:
kv/data/github-actions/xworkmate-bridge AI_WORKSPACE_AUTH_TOKEN | AI_WORKSPACE_AUTH_TOKEN
- name: Export bridge auth token
if: ${{ steps.vault.outcome == 'success' && steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN != '' }}
if: ${{ steps.vault.outcome == 'success' }}
run: echo "AI_WORKSPACE_AUTH_TOKEN=${{ steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN }}" >> "$GITHUB_ENV"
- name: Probe current production bridge
@ -76,17 +75,6 @@ jobs:
env:
BRIDGE_SERVER_URL: https://xworkmate-bridge.svc.plus
run: |
if [[ -z "${AI_WORKSPACE_AUTH_TOKEN:-}" ]]; then
echo "::notice title=Production state skipped::AI_WORKSPACE_AUTH_TOKEN is unavailable from Vault; continuing without production bridge metadata."
{
echo "production_image="
echo "production_tag="
echo "production_commit="
echo "production_version="
} >> "$GITHUB_OUTPUT"
exit 0
fi
while IFS='=' read -r key value; do
echo "${key}=${value}" >> "$GITHUB_OUTPUT"
done < <(bash ./scripts/github-actions/report-production-state.sh "${BRIDGE_SERVER_URL}")

View File

@ -1,44 +0,0 @@
name: Validate Release PR
# Release-policy gate for release/* branches: only PRs coming from hotfix/* or
# PRs labeled cherry-pick/backport are accepted.
# See iac_modules/docs/tldr-github-branch-model.md
on:
  pull_request_target:
    types: [opened, synchronize, reopened, labeled, unlabeled]
permissions:
  contents: read
  pull-requests: read
jobs:
  validate-release-source:
    runs-on: ubuntu-latest
    if: startsWith(github.base_ref, 'release/')
    steps:
      - name: Check PR source branch
        # The head branch name and label names are chosen by the PR author, so
        # they must never be expanded into the script text itself (script
        # injection). They are handed over as environment variables and only
        # ever read as quoted shell variables below.
        env:
          SRC: ${{ github.head_ref }}
          TGT: ${{ github.base_ref }}
          LABELS: ${{ join(github.event.pull_request.labels.*.name, ',') }}
        run: |
          echo "🔍 Validating PR into release branch"
          echo " source: $SRC"
          echo " target: $TGT"
          echo " labels: $LABELS"
          # Rule 1: hotfix/* branches may always target release/*.
          if [[ "$SRC" =~ ^hotfix/ ]]; then
            echo "✅ Allowed: hotfix/* branch"
            exit 0
          fi
          # Rule 2: any other branch needs an exact cherry-pick or backport label.
          if [[ "$LABELS" =~ (^|,)(cherry-pick|backport)(,|$) ]]; then
            echo "✅ Allowed: cherry-pick/backport labeled PR"
            exit 0
          fi
          echo "❌ Rejected."
          echo "release/* 仅接受:"
          echo " - 来自 hotfix/* 的 PR"
          echo " - 带 cherry-pick 或 backport 标签的 PR已验证 feature 的 backport/cherry-pick"
          echo "禁止从 main / develop / feature/* 直接合并到 release/*。"
          exit 1

View File

@ -37,7 +37,6 @@ func (s *Server) Handler() http.Handler {
"commit": info.Commit,
"version": info.Version,
"buildDate": info.BuildDate,
"metrics": bridgeStabilityMetricsSnapshot(), // T12
}
body, _ := json.Marshal(resp)
w.Header().Set("Content-Type", "application/json")

View File

@ -1,29 +0,0 @@
package acp
import "sync/atomic"
// Key stability metrics (T12, docs/cases/06 §5).
//
// Process-wide cumulative counters, exposed through /api/ping, so that
// "gateway flapping / run timeout" can be monitored instead of being diagnosed
// from user screenshots. Each counter tracks one known source of instability:
//   - gatewaySocketClosed:        gatewayRPCError hit OPENCLAW_GATEWAY_SOCKET_CLOSED (connection dropped)
//   - taskGetUnconfirmedFallback: tasks.get fell back to the persistent run registry because the gateway could not confirm the run (T7)
//   - runDeadlineInterrupt:       a run passed DeadlineAt while the gateway could not confirm it, so interrupted was returned (T9)
var bridgeStabilityMetrics struct {
	gatewaySocketClosed        atomic.Int64
	taskGetUnconfirmedFallback atomic.Int64
	runDeadlineInterrupt       atomic.Int64
}

// metricGatewaySocketClosedInc adds one to the gatewaySocketClosed counter.
func metricGatewaySocketClosedInc() {
	bridgeStabilityMetrics.gatewaySocketClosed.Add(1)
}

// metricTaskGetUnconfirmedFallbackInc adds one to the taskGetUnconfirmedFallback counter.
func metricTaskGetUnconfirmedFallbackInc() {
	bridgeStabilityMetrics.taskGetUnconfirmedFallback.Add(1)
}

// metricRunDeadlineInterruptInc adds one to the runDeadlineInterrupt counter.
func metricRunDeadlineInterruptInc() {
	bridgeStabilityMetrics.runDeadlineInterrupt.Add(1)
}

// bridgeStabilityMetricsSnapshot returns the current counter values as a fresh
// map (int64 values keyed by counter name) for the /api/ping response.
func bridgeStabilityMetricsSnapshot() map[string]any {
	counters := &bridgeStabilityMetrics
	snapshot := make(map[string]any, 3)
	snapshot["gatewaySocketClosed"] = counters.gatewaySocketClosed.Load()
	snapshot["taskGetUnconfirmedFallback"] = counters.taskGetUnconfirmedFallback.Load()
	snapshot["runDeadlineInterrupt"] = counters.runDeadlineInterrupt.Load()
	return snapshot
}

View File

@ -1,197 +0,0 @@
package acp
import (
"log"
"strings"
"time"
"xworkmate-bridge/internal/shared"
)
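// Persistent run registry: run association decoupled from the WebSocket (T7/T8/T9).
//
// Background: an OpenClaw gateway turn is asynchronous. chat.send returns a runId
// quickly, and the bridge keeps the run record (sess.openClaw), the budget
// (sess.task.DeadlineAt) and the run handle (sess.lastResult) in the per-session
// store keyed by sessionID (s.sessions). That store's lifetime is independent of
// the bridge<->gateway WebSocket connection. The client then polls tasks.get.
//
// Previously every tasks.get call depended on a gateway reply: once the WS flapped,
// or the in-memory run state was lost after a reconnect, tasks.get returned
// not_found or socket_closed and an already-completed result was lost. The client
// then either hard-failed or (before the fix) polled forever. The helpers below
// make tasks.get fall back to the persistent run registry first:
//
// T8: a terminal result that has been observed is cached in sess.lastResult, so it
//     is not lost if the gateway can no longer find the run later.
// T7: when the gateway temporarily cannot confirm the run (unavailable / socket
//     closed / not_found) but the run is still within budget, a synthetic running
//     handle is returned so the client keeps polling across the transient glitch
//     (decoupled from the WS lifecycle).
// T9: when the run is past DeadlineAt and the gateway still cannot confirm it, a
//     deterministic interrupted terminal state is returned.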
// openClawTaskGetResultIsTerminal reports whether a tasks.get result describes a
// run that has ended. The "status" field is compared after trimming and
// lower-casing; only the explicit terminal statuses count.
//
// Note (carried over from the original comment): results that are still syncing
// artifacts are rewritten to status=running by normalizeOpenClawTaskGetResult,
// so they are not mistaken for terminal results here.
func openClawTaskGetResultIsTerminal(payload map[string]any) bool {
	status := strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", "")))
	return status == string(TaskStateCompleted) ||
		status == string(TaskStateFailed) ||
		status == string(TaskStateCancelled) ||
		status == "interrupted" ||
		status == "partially_delivered"
}
// cacheOpenClawTaskGetResultIfTerminal stores a terminal tasks.get result in the
// per-session persistent run registry (T8).
//
// Nothing happens when payload is empty, is not terminal, or no session matches
// params. Otherwise, under sess.mu, the task is marked terminal: State becomes
// failed or cancelled when the status says so and completed for every other
// terminal status, ProgressStage is set to the normalised status, UpdatedAt to
// the current time, and sess.lastResult to a clone of payload.
func (s *Server) cacheOpenClawTaskGetResultIfTerminal(params map[string]any, payload map[string]any) {
	if len(payload) == 0 {
		return
	}
	if !openClawTaskGetResultIsTerminal(payload) {
		return
	}
	sess := s.findTaskSession(params)
	if sess == nil {
		return
	}

	sess.mu.Lock()
	defer sess.mu.Unlock()

	// Normalise once; the same value drives both State and ProgressStage.
	status := strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", "")))
	if status == string(TaskStateFailed) {
		sess.task.State = TaskStateFailed
	} else if status == string(TaskStateCancelled) {
		sess.task.State = TaskStateCancelled
	} else {
		sess.task.State = TaskStateCompleted
	}
	sess.task.ProgressTerminal = true
	sess.task.ProgressStage = status
	sess.task.UpdatedAt = time.Now()
	sess.lastResult = cloneMap(payload)
}
// cachedTerminalOpenClawResult returns the terminal result previously observed
// for the requested run, if any (T7/T8). It looks up the session for params and
// delegates to cachedTerminalForRunLocked while holding sess.mu; it returns
// (nil, false) when no session is found.
func (s *Server) cachedTerminalOpenClawResult(params map[string]any) (map[string]any, bool) {
	if sess := s.findTaskSession(params); sess != nil {
		sess.mu.Lock()
		defer sess.mu.Unlock()
		return cachedTerminalForRunLocked(sess, params)
	}
	return nil, false
}
// cachedTerminalForRunLocked returns a clone of the cached terminal result only
// when it belongs to the run named in the request, so that a reused session does
// not hand an old run's terminal state to a new run. The caller must hold sess.mu.
//
// The requested run is params["runId"], falling back to params["taskId"]. A
// mismatch is detected only when both the requested id and the cached id
// (runId/taskId of sess.lastResult) are non-empty; they are compared
// case-insensitively.
func cachedTerminalForRunLocked(sess *session, params map[string]any) (map[string]any, bool) {
	hasTerminal := sess.task.ProgressTerminal &&
		len(sess.lastResult) > 0 &&
		openClawTaskGetResultIsTerminal(sess.lastResult)
	if !hasTerminal {
		return nil, false
	}

	wanted := strings.TrimSpace(shared.StringArg(params, "runId", ""))
	if wanted == "" {
		wanted = strings.TrimSpace(shared.StringArg(params, "taskId", ""))
	}
	if wanted == "" {
		// The request names no run: serve whatever terminal result is cached.
		return cloneMap(sess.lastResult), true
	}

	if have := firstNonEmptyString(sess.lastResult, "runId", "taskId"); have != "" && !strings.EqualFold(have, wanted) {
		return nil, false
	}
	return cloneMap(sess.lastResult), true
}
// openClawTaskGetGatewayUnconfirmedFallback answers tasks.get from the persistent
// run registry when the gateway cannot confirm the run (T7/T9):
//   - a cached terminal result for the requested run is returned as is;
//   - a run still within budget yields a synthetic running handle flagged
//     transportDegraded, so the client keeps polling across a transient glitch;
//   - a run past its deadline yields the deterministic interrupted terminal state.
//
// Without a matching session, or without an OpenClaw record on it, the old
// not_found payload is returned; code and message fall back to
// TASK_LOOKUP_FAILED and a generic lookup-failure text when blank.
func (s *Server) openClawTaskGetGatewayUnconfirmedFallback(params map[string]any, code string, message string) map[string]any {
	lookupFailed := func() map[string]any {
		payload := make(map[string]any, 4)
		payload["ok"] = false
		payload["status"] = "not_found"
		payload["code"] = fallbackString(code, "TASK_LOOKUP_FAILED")
		payload["message"] = fallbackString(message, "openclaw native task lookup failed")
		return payload
	}

	sess := s.findTaskSession(params)
	if sess == nil {
		return lookupFailed()
	}

	sess.mu.Lock()
	defer sess.mu.Unlock()

	if terminal, hit := cachedTerminalForRunLocked(sess, params); hit {
		return terminal
	}

	switch {
	case sess.openClaw == nil:
		return lookupFailed()
	case !sess.task.DeadlineAt.IsZero() && time.Now().After(sess.task.DeadlineAt):
		return s.markOpenClawRunDeadlineInterruptedLocked(sess, code, message)
	}

	// Still within budget: keep the client polling instead of hard-failing on
	// a single transient glitch.
	metricTaskGetUnconfirmedFallbackInc() // T12
	gatewayCode := strings.TrimSpace(code)
	degraded := openClawRunningTaskResult(sess.openClaw)
	degraded["transportDegraded"] = true
	if gatewayCode != "" {
		degraded["transportDegradedCode"] = gatewayCode
	}

	// T11: the log line carries the runId so that the app, plugin, bridge and
	// gateway layers can be correlated by run.
	log.Printf("level=warn component=openclaw_run_registry event=tasks_get_unconfirmed_fallback runId=%q openclawSessionKey=%q code=%q",
		sess.openClaw.RunID, sess.openClaw.SessionKey, gatewayCode)

	sess.lastResult = cloneMap(degraded)
	return degraded
}
// markOpenClawRunDeadlineInterruptedLocked produces the deterministic
// interrupted terminal state for a run that is over budget and cannot be
// confirmed by the gateway (T9). The caller must hold sess.mu.
//
// It marks sess.task as failed and terminal at stage "interrupted", bumps the
// runDeadlineInterrupt metric, logs the event when a run record exists, and
// returns the result map, a clone of which is stored in sess.lastResult. The
// trimmed gateway code and message are attached when non-empty, and the run
// identifiers and timestamps are copied from sess.openClaw when it is set.
func (s *Server) markOpenClawRunDeadlineInterruptedLocked(sess *session, code string, message string) map[string]any {
	gatewayCode := strings.TrimSpace(code)
	gatewayMessage := strings.TrimSpace(message)
	record := sess.openClaw

	sess.task.State = TaskStateFailed
	sess.task.ProgressTerminal = true
	sess.task.ProgressStage = "interrupted"
	sess.task.ProgressMessage = "OpenClaw run exceeded its budget and could not be confirmed"
	sess.task.UpdatedAt = time.Now()

	metricRunDeadlineInterruptInc() // T12

	// T11: terminal-state log line keyed by runId.
	if record != nil {
		log.Printf("level=warn component=openclaw_run_registry event=run_deadline_interrupt runId=%q openclawSessionKey=%q deadlineAt=%q code=%q",
			record.RunID, record.SessionKey,
			record.DeadlineAt.UTC().Format(time.RFC3339Nano), gatewayCode)
	}

	result := map[string]any{
		"ok":                 true,
		"success":            false,
		"status":             "interrupted",
		"event":              "interrupted",
		"pending":            false,
		"code":               "OPENCLAW_RUN_DEADLINE_EXCEEDED",
		"artifactSyncStatus": "interrupted",
		"message":            "OpenClaw 任务超过预算上限且网关无法确认结果,已结束本轮等待。任务可能已在后台完成,请重新发送请求以拿回结果。",
		"artifacts":          []any{},
	}
	if gatewayCode != "" {
		result["gatewayUnconfirmedCode"] = gatewayCode
	}
	if gatewayMessage != "" {
		result["gatewayUnconfirmedMessage"] = gatewayMessage
	}
	if record != nil {
		// runId/taskId and threadId/appThreadKey each carry the same value.
		result["runId"] = record.RunID
		result["taskId"] = record.RunID
		result["turnId"] = record.TurnID
		result["sessionId"] = record.SessionID
		result["threadId"] = record.ThreadID
		result["appThreadKey"] = record.ThreadID
		result["openclawSessionKey"] = record.SessionKey
		result["resolvedGatewayProviderId"] = record.GatewayProviderID
		result["startedAt"] = record.StartedAt.UTC().Format(time.RFC3339Nano)
		result["deadlineAt"] = record.DeadlineAt.UTC().Format(time.RFC3339Nano)
	}

	sess.lastResult = cloneMap(result)
	return result
}
// fallbackString returns value with surrounding whitespace trimmed, or fallback
// (returned untouched) when value is empty or whitespace only.
func fallbackString(value string, fallback string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return fallback
}

View File

@ -1,154 +0,0 @@
package acp
import (
"testing"
"time"
"xworkmate-bridge/internal/shared"
)
// newRunRegistryTestServer builds a Server holding a single session "s1" with
// an active OpenClaw run "run-1" whose task and record share the given deadline,
// and returns it together with tasks.get params addressing that run.
func newRunRegistryTestServer(deadline time.Time) (*Server, map[string]any) {
	record := &OpenClawTaskRecord{
		SessionID:         "s1",
		ThreadID:          "t1",
		TurnID:            "turn-1",
		RunID:             "run-1",
		SessionKey:        "sk",
		GatewayProviderID: "openclaw",
		StartedAt:         time.Now().Add(-time.Minute),
		DeadlineAt:        deadline,
	}

	sess := &session{sessionID: "s1", threadID: "t1"}
	sess.task.RunID = "run-1"
	sess.task.SessionKey = "sk"
	sess.task.GatewayProviderID = "openclaw"
	sess.task.DeadlineAt = deadline
	sess.openClaw = record

	server := &Server{sessions: map[string]*session{"s1": sess}}
	return server, map[string]any{"sessionId": "s1", "runId": "run-1"}
}
// TestOpenClawTaskGetResultIsTerminal checks which status strings are
// classified as terminal.
func TestOpenClawTaskGetResultIsTerminal(t *testing.T) {
	type expectation struct {
		status string
		want   bool
	}
	table := []expectation{
		{status: "completed", want: true},
		{status: "failed", want: true},
		{status: "cancelled", want: true},
		{status: "interrupted", want: true},
		{status: "partially_delivered", want: true},
		{status: "running", want: false},
		{status: "syncing-artifacts", want: false},
		{status: "queued", want: false},
		{status: "", want: false},
	}
	for i := range table {
		entry := table[i]
		got := openClawTaskGetResultIsTerminal(map[string]any{"status": entry.status})
		if got == entry.want {
			continue
		}
		t.Errorf("status=%q: got %v, want %v", entry.status, got, entry.want)
	}
}
// T7: the gateway cannot confirm the run but the run is still within budget, so
// a synthetic running handle is returned and polling continues.
func TestGatewayUnconfirmedFallbackWithinBudgetKeepsPolling(t *testing.T) {
	deadline := time.Now().Add(30 * time.Minute)
	srv, params := newRunRegistryTestServer(deadline)

	result := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")

	status := shared.StringArg(result, "status", "")
	if status != string(TaskStateRunning) {
		t.Fatalf("status = %q, want running", status)
	}
	if degraded := parseBool(result["transportDegraded"]); !degraded {
		t.Fatalf("transportDegraded not set: %v", result)
	}
	if runID := shared.StringArg(result, "runId", ""); runID != "run-1" {
		t.Fatalf("runId mismatch: %v", result["runId"])
	}
}
// T9: the run is past its deadline and the gateway cannot confirm it, so the
// deterministic interrupted terminal state is returned and recorded.
func TestGatewayUnconfirmedFallbackPastDeadlineInterrupts(t *testing.T) {
	expired := time.Now().Add(-time.Minute)
	srv, params := newRunRegistryTestServer(expired)

	result := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")

	status := shared.StringArg(result, "status", "")
	if status != "interrupted" {
		t.Fatalf("status = %q, want interrupted", status)
	}
	code := shared.StringArg(result, "code", "")
	if code != "OPENCLAW_RUN_DEADLINE_EXCEEDED" {
		t.Fatalf("code = %q, want OPENCLAW_RUN_DEADLINE_EXCEEDED", code)
	}
	if parseBool(result["success"]) {
		t.Fatalf("interrupted result must not be success")
	}

	// The session itself must also carry the terminal failed state.
	sess := srv.findTaskSession(params)
	recorded := sess != nil && sess.task.ProgressTerminal && sess.task.State == TaskStateFailed
	if !recorded {
		t.Fatalf("session terminal state not recorded: %+v", sess)
	}
}
// T8: an observed terminal result is cached, and it is served in preference to
// anything else even when the gateway becomes unreachable afterwards.
func TestTerminalResultCachedAndServedAfterGatewayLoss(t *testing.T) {
	srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))

	completed := map[string]any{
		"ok":      true,
		"success": true,
		"status":  "completed",
		"runId":   "run-1",
		"message": "done",
	}
	srv.cacheOpenClawTaskGetResultIfTerminal(params, completed)

	cached, hit := srv.cachedTerminalOpenClawResult(params)
	if !hit {
		t.Fatalf("expected cached terminal result")
	}
	if shared.StringArg(cached, "status", "") != "completed" {
		t.Fatalf("cached status = %q, want completed", cached["status"])
	}

	// Even with the run past its deadline and the gateway lost, the cached
	// terminal result must win over the interrupted fallback.
	sess := srv.findTaskSession(params)
	sess.mu.Lock()
	sess.task.DeadlineAt = time.Now().Add(-time.Hour)
	sess.mu.Unlock()

	fallback := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
	if shared.StringArg(fallback, "status", "") != "completed" {
		t.Fatalf("expected cached completed to win over deadline interrupt, got %v", fallback["status"])
	}
}
// After a session is reused, the terminal state of the old run must not be
// served for a query that names a different runId.
func TestCachedTerminalNotServedForDifferentRunId(t *testing.T) {
	srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))

	oldRunResult := map[string]any{
		"status": "completed", "success": true, "runId": "run-1",
	}
	srv.cacheOpenClawTaskGetResultIfTerminal(params, oldRunResult)

	// A query for another runId must miss the old cache entry.
	otherRun := map[string]any{"sessionId": "s1", "runId": "run-2"}
	_, servedForOther := srv.cachedTerminalOpenClawResult(otherRun)
	if servedForOther {
		t.Fatalf("stale terminal for run-1 must not be served for run-2")
	}

	// The original runId must still hit.
	_, servedForOriginal := srv.cachedTerminalOpenClawResult(params)
	if !servedForOriginal {
		t.Fatalf("terminal for run-1 should still be served for run-1")
	}
}
// A running result must not be cached as a terminal result.
func TestRunningResultNotCachedAsTerminal(t *testing.T) {
	srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))

	stillRunning := map[string]any{"status": "running", "runId": "run-1"}
	srv.cacheOpenClawTaskGetResultIfTerminal(params, stillRunning)

	_, cached := srv.cachedTerminalOpenClawResult(params)
	if cached {
		t.Fatalf("running result must not be cached as terminal")
	}
}
// Without any per-session record the fallback keeps the old not_found behavior.
func TestGatewayUnconfirmedFallbackWithoutSessionReturnsNotFound(t *testing.T) {
	srv := &Server{sessions: map[string]*session{}}
	params := map[string]any{"sessionId": "missing"}

	result := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "X", "y")

	if parseBool(result["ok"]) {
		t.Fatalf("expected ok=false not_found, got %v", result)
	}
	if status := shared.StringArg(result, "status", ""); status != "not_found" {
		t.Fatalf("status = %q, want not_found", result["status"])
	}
}

View File

@ -417,10 +417,6 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask(
sess.task.DeadlineAt = record.DeadlineAt
sess.task.ProgressStage = "running"
sess.task.ProgressMessage = "OpenClaw task accepted"
// 新一轮 turn 复用同一 session 时,必须重置上一轮可能留下的终态标记,
// 否则持久 run 仓(T8)会把旧 runId 的终态错配给新 run。
sess.task.State = TaskStateRunning
sess.task.ProgressTerminal = false
sess.openClaw = record
running := openClawRunningTaskResult(record)
sess.lastResult = cloneMap(running)
@ -617,19 +613,13 @@ func (o *SessionOrchestrator) openClawArtifactPrepare(
func isOpenClawUnknownMethodError(errorPayload map[string]any, method string) bool {
message := strings.ToLower(strings.TrimSpace(shared.StringArg(errorPayload, "message", "")))
code := strings.ToUpper(strings.TrimSpace(shared.StringArg(errorPayload, "code", "")))
if message == "" {
return false
}
// 消息形如「unknown method: <method>」已明确指向「网关不认识该方法」,足以判定,
// 据此走 graceful fallback如 openClawFallbackSessionPreparePayload
//
// 注意:不能再用严格的 code 白名单来 gate。真实网关常以数字 JSON-RPC code
// (-32601 method not found / -32600 invalid request / -32002 等) 回传,
// 经 shared.StringArg(fmt.Sprint) 会被字符串化为 "-32601"/"-32002"
// 旧实现只接受 {"", INVALID_REQUEST, METHOD_NOT_FOUND},导致 fallback 失效、
// session.prepare 直接以 -32002 硬失败整轮任务。
return strings.Contains(message, "unknown method") &&
strings.Contains(message, strings.ToLower(strings.TrimSpace(method)))
strings.Contains(message, strings.ToLower(strings.TrimSpace(method))) &&
(code == "" || code == "INVALID_REQUEST" || code == "METHOD_NOT_FOUND")
}
func openClawFallbackSessionPreparePayload(params map[string]any) map[string]any {
@ -1687,16 +1677,11 @@ func applyOpenClawConstraintDeliveryStatus(result map[string]any) {
func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError {
if isOpenClawRetryableGatewayError(errorPayload) {
metricGatewaySocketClosedInc() // T12
// T10连接断属「可重试 / run 可能仍在后台、可续轮询」语义,而非 run 确实失败。
// 带 retryable/poll 提示,客户端据此降级为「后台续跑·重连中」(T5) 续轮询 tasks.get而非硬失败。
return &shared.RPCError{
Code: -32002,
Message: "OPENCLAW_GATEWAY_SOCKET_CLOSED: OpenClaw gateway connection closed during task execution",
Data: map[string]any{
"code": "OPENCLAW_GATEWAY_SOCKET_CLOSED",
"retryable": true,
"poll": true,
"originalCode": strings.TrimSpace(shared.StringArg(errorPayload, "code", "")),
"originalError": strings.TrimSpace(shared.StringArg(errorPayload, "message", "")),
},

View File

@ -132,49 +132,6 @@ func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceKeepsActiveRecordRunning
}
}
// TestExpectedArtifactDirectoriesDoNotBlockTerminalTaskState checks that
// expectedArtifactDirs alone neither require an artifact export nor stop a
// completed payload from staying terminal after normalization.
func TestExpectedArtifactDirectoriesDoNotBlockTerminalTaskState(t *testing.T) {
	params := map[string]any{
		"expectedArtifactDirs": []any{"reports/", "artifacts/"},
	}
	payload := map[string]any{
		"success":              true,
		"status":               string(TaskStateCompleted),
		"artifactScope":        "tasks/session/run",
		"artifactDirectory":    "/remote/openclaw/workspace/tasks/session/run",
		"expectedArtifactDirs": []any{"reports/", "artifacts/"},
	}

	if blocking := openClawTaskGetRequiresArtifactExport(params, payload); blocking {
		t.Fatal("expectedArtifactDirs must remain non-blocking scan hints")
	}

	normalized := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
	status := shared.StringArg(normalized, "status", "")
	if status != string(TaskStateCompleted) {
		t.Fatalf("expected terminal status to remain completed, got %#v", normalized)
	}
	if parseBool(normalized["pending"]) {
		t.Fatalf("expected terminal payload not to become pending, got %#v", normalized)
	}
}
// TestRequiredArtifactExtensionsStillBlockUntilVerified checks that
// requiredArtifactExtensions demand an artifact export and keep a completed
// payload reported as running after normalization.
func TestRequiredArtifactExtensionsStillBlockUntilVerified(t *testing.T) {
	params := map[string]any{
		"requiredArtifactExtensions": []any{"md"},
	}
	payload := map[string]any{
		"success":           true,
		"status":            string(TaskStateCompleted),
		"artifactScope":     "tasks/session/run",
		"artifactDirectory": "/remote/openclaw/workspace/tasks/session/run",
	}

	blocking := openClawTaskGetRequiresArtifactExport(params, payload)
	if !blocking {
		t.Fatal("requiredArtifactExtensions must remain a blocking delivery contract")
	}

	normalized := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
	status := shared.StringArg(normalized, "status", "")
	if status != string(TaskStateRunning) {
		t.Fatalf("expected missing required artifact to remain syncing, got %#v", normalized)
	}
}
func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceFailsAfterDeadlineWithoutRequiredArtifacts(t *testing.T) {
payload := map[string]any{
"success": false,
@ -325,50 +282,3 @@ func TestTaskGetArtifactExportReceivesRequiredArtifactExtensions(t *testing.T) {
t.Fatalf("expected expectedFileCountByExtension to reach export, got %#v", exportParams)
}
}
// TestIsOpenClawUnknownMethodErrorAcceptsNumericGatewayCodes checks that an
// "unknown method: <method>" message is recognised whether the error code is a
// string, a numeric JSON-RPC code or absent, and that unrelated messages or
// other method names are rejected.
func TestIsOpenClawUnknownMethodErrorAcceptsNumericGatewayCodes(t *testing.T) {
	const method = "xworkmate.session.prepare"

	type scenario struct {
		name    string
		payload map[string]any
		want    bool
	}
	scenarios := []scenario{
		{
			name: "string invalid_request code",
			payload: map[string]any{
				"code":    "INVALID_REQUEST",
				"message": "unknown method: xworkmate.session.prepare",
			},
			want: true,
		},
		{
			name: "numeric -32002 (real gateway shape that previously hard-failed)",
			payload: map[string]any{
				"code":    float64(-32002),
				"message": "unknown method: xworkmate.session.prepare",
			},
			want: true,
		},
		{
			name: "numeric -32601 method not found",
			payload: map[string]any{
				"code":    float64(-32601),
				"message": "Unknown method: xworkmate.session.prepare",
			},
			want: true,
		},
		{
			name: "empty code",
			payload: map[string]any{
				"message": "unknown method: xworkmate.session.prepare",
			},
			want: true,
		},
		{
			name: "unrelated error must not be swallowed",
			payload: map[string]any{
				"code":    float64(-32002),
				"message": "gateway socket closed",
			},
			want: false,
		},
		{
			name: "unknown method for a different method name",
			payload: map[string]any{
				"code":    float64(-32601),
				"message": "unknown method: chat.send",
			},
			want: false,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			got := isOpenClawUnknownMethodError(sc.payload, method)
			if got != sc.want {
				t.Fatalf("isOpenClawUnknownMethodError(%v) = %v, want %v", sc.payload, got, sc.want)
			}
		})
	}
}

View File

@ -132,13 +132,13 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
if gatewayProvider == "" {
gatewayProvider = "openclaw"
}
// T7/T8: 一旦观察到终态就从持久 run 仓返回,避免之后 gateway 查不到导致结果丢失。
if cached, ok := s.cachedTerminalOpenClawResult(params); ok {
return cached
}
if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil {
// T7/T9: gateway 不可达时按持久 run 仓兜底(续轮询 / deadline 终态),而非裸 not_found。
return s.openClawTaskGetGatewayUnconfirmedFallback(params, "GATEWAY_UNAVAILABLE", rpcErr.Message)
return map[string]any{
"ok": false,
"status": "not_found",
"code": "GATEWAY_UNAVAILABLE",
"message": rpcErr.Message,
}
}
result := s.gateway.RequestByMode(
gatewayProvider,
@ -162,15 +162,16 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
}
s.decorateOpenClawArtifactDownloadURLs(payload, sessionKey, runID)
stripOpenClawArtifactInlineContent(payload)
// T8: 缓存「最终客户端可见形态」(已 decorate 下载 URL + strip 内联内容),
// 这样从缓存回放时与正常路径完全一致。
s.cacheOpenClawTaskGetResultIfTerminal(params, payload)
return payload
}
// T7/T9: gateway 返回错误socket closed / not_found / lookup failed时同样走持久 run 仓兜底。
message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed"))
code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED"))
return s.openClawTaskGetGatewayUnconfirmedFallback(params, code, message)
return map[string]any{
"ok": false,
"status": "not_found",
"code": code,
"message": message,
}
}
func (s *Server) taskGetParamsWithSessionScope(params map[string]any) map[string]any {
@ -471,11 +472,9 @@ func openClawTaskGetRequiresArtifactExport(params map[string]any, payload map[st
if parseBool(params["requiresExportBeforeFinalResponse"]) || parseBool(payload["requiresExportBeforeFinalResponse"]) {
return true
}
// expectedArtifactDirs are discovery hints for the plugin's workspace-root
// scan. They do not prove that the caller requires a file before the run can
// reach a terminal state. Treating them as a blocking contract turns a
// failed/no-output agent run into an endless "syncing-artifacts" loop.
return len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
return len(shared.ListArg(params, "expectedArtifactDirs")) > 0 ||
len(shared.ListArg(payload, "expectedArtifactDirs")) > 0 ||
len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
len(shared.ListArg(payload, "requiredArtifactExtensions")) > 0
}