Compare commits

...

13 Commits

Author SHA1 Message Date
4fc5e380f2
ci: add release/* branch source validation workflow (#11)
release/* 仅接受 hotfix/* 或带 cherry-pick/backport 标签的 PR。
详见 iac_modules/docs/tldr-github-branch-model.md

Co-authored-by: Haitao Pan <haitao.pan@xworkmate.ai>
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-28 12:12:16 +08:00
Haitao Pan
188ca4ba4a fix(acp): keep artifact scan hints non-blocking 2026-06-27 12:03:08 +08:00
Haitao Pan
0a50621664 fix(acp): remove orphaned S1 test (helper reverted) — keep main compiling
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 06:44:06 +08:00
Haitao Pan
81f65e3308 Merge: T10/T11/T12 observability + revert S1 2026-06-27 06:43:21 +08:00
Haitao Pan
fa9cc78add fix(acp): T10/T11/T12 observability + error semantics; revert S1 (broke main)
T10: gatewayRPCError marks OPENCLAW_GATEWAY_SOCKET_CLOSED with retryable=true,
poll=true so the client degrades to "background/reconnecting" + keeps polling
instead of hard-failing (feeds App T5).
T11: runId-tagged warn logs at the tasks.get unconfirmed-fallback and
run-deadline-interrupt sites, so a runId can be joined across App→bridge→plugin→gateway.
T12: process-level stability counters (gatewaySocketClosed, taskGetUnconfirmedFallback,
runDeadlineInterrupt) exposed via /api/ping.metrics.

Revert S1 (default expectedArtifactDirs): it set requiresExport=true / default dirs
for any artifact-inferring task, which made a gateway run that succeeds with NO
artifact hang "waiting for artifact export" (TestHTTPHandlerGatewayOpenClawHandlesFive
ConcurrentE2ECases + ...WithoutPromptHeuristic went red). The blocking is tied to
expectedArtifactDirs presence in openClawTaskGetRequiresArtifactExport; decoupling
scan-hint from block-on-export needs a careful, separately-tested change. Reverted to
keep main green; S1 to be redesigned (see docs/cases/06 §7).

Full internal/acp suite green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 06:43:21 +08:00
Haitao Pan
02808934c8 Merge: S1 default expectedArtifactDirs (stability — artifact delivery) 2026-06-27 06:31:35 +08:00
Haitao Pan
3c7de420d2 fix(acp): S1 — default expectedArtifactDirs so plugin root-fallback collects artifacts
Live verification (docs/cases/06 §7 S1) showed the session mapping recorded
expectedArtifactDirs:[] for an md-producing task. openclaw-multi-session-plugins
only scans the workspace-root deliverable dirs (reports/, artifacts/, ...) when
expectedArtifactDirs is non-empty; empty → the root fallback is inert, so an agent
that writes news.md to the workspace root (the common case) yields "no files".

openClawArtifactContractForParams now defaults expectedArtifactDirs to
reports//artifacts//exports/ when the task expects artifacts (requiresExport or
inferred requiredExts) but declared no dirs, and marks requiresExport so the export
path runs. Pure-chat turns (no artifact intent) are unaffected.

Test: orchestrator_s1_artifact_dirs_test.go (md task gets dirs+export; chat gets neither).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 06:31:34 +08:00
Haitao Pan
2333c3e5fd feat(acp): durable per-session run registry — survive gateway WS loss (T7/T8/T9)
OpenClaw gateway turns are async: chat.send returns a runId fast, the app then
polls tasks.get. Previously every tasks.get re-asked the gateway, so a WS blip /
reconnect that lost the gateway's in-memory run state turned into not_found /
socket_closed — the already-finished result was lost and the client either
hard-failed or polled forever.

Make tasks.get resilient by leaning on the per-session store (s.sessions),
whose lifetime is independent of the bridge<->gateway WebSocket:

- T8: cache a gateway-confirmed terminal result (final client-facing shape, after
  download-URL decoration + inline-content stripping) into sess.lastResult and
  serve it on subsequent polls, so a later gateway not_found cannot lose it.
- T7: when the gateway can't confirm (unavailable / socket closed / not_found) but
  the run is still within budget, synthesize a running handle so the client keeps
  polling across a transient blip — run tracking decoupled from WS lifetime.
- T9: when the run is past its DeadlineAt and the gateway still can't confirm,
  return a deterministic `interrupted` terminal (OPENCLAW_RUN_DEADLINE_EXCEEDED).

Correctness guards:
- startOpenClawGatewayTask resets State/ProgressTerminal when a session is reused
  for a new turn, so a prior turn's terminal can't be mis-served for a new runId.
- cache lookups verify the cached runId matches the requested runId (defense in depth).

Design note: T7 is handled at the tasks.get layer (re-correlate by runId via the
durable session store) rather than rewiring gatewayruntime's pending map — lower
risk, equivalent effect. A killed in-flight request surfaces as a gateway error
that the new fallback absorbs. T9 only force-terminates when the gateway is
unconfirmed, never when it explicitly reports running (avoids killing legit long
runs; the client-side deadline T3 covers that case).

Tests: internal/acp/openclaw_run_registry_test.go (terminal detection, within-budget
keep-polling, past-deadline interrupt, cache hit/replay, cross-runId isolation,
no-session not_found). go vet + full acp package green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 18:52:41 +08:00
Haitao Pan
e22d0f1cbf ci: keep bridge packaging unblocked without production token 2026-06-26 18:47:37 +08:00
311db31e03
Merge pull request #9 from ai-workspace-lab/fix/gateway-turn-stability-day1
fix(acp): session.prepare falls back on numeric gateway unknown-method codes
2026-06-26 17:56:41 +08:00
Haitao Pan
c7b2f8ee3a fix(acp): treat numeric gateway codes as unknown-method so session.prepare falls back
A gateway that doesn't implement xworkmate.session.prepare returns an
"unknown method: xworkmate.session.prepare" error. isOpenClawUnknownMethodError
gated on a string code allowlist {"", INVALID_REQUEST, METHOD_NOT_FOUND}, but real
gateways send a numeric JSON-RPC code (e.g. -32002 / -32601) which shared.StringArg
stringifies to "-32002". The matcher then returned false, so the graceful fallback
(openClawFallbackSessionPreparePayload) never fired and every turn hard-failed with
"-32002: unknown method: xworkmate.session.prepare".

Match on the unambiguous message ("unknown method" + the method name) instead of the
stringified numeric code. Add a regression test covering numeric codes and guarding
against swallowing unrelated errors / other method names.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 11:04:08 +08:00
Haitao Pan
b8f4ed6102 fix(ci): key per-job SHA256SUMS by os and arch
The runtime-release matrix builds linux+darwin × amd64+arm64, but each job
wrote its checksum to SHA256SUMS-<arch> (arch only). The linux/<arch> and
darwin/<arch> jobs therefore emitted the same filename, which clobbered each
other under the publish job's `merge-multiple: true` download. The merged
SHA256SUMS ended up with only 2 of the 4 platforms, so consumers of the
missing tarballs (notably xworkmate-bridge-linux-arm64.tar.gz) failed with
"missing checksum" — breaking the console offline arm64 package build.

Name the per-job file SHA256SUMS-<os>-<arch> so all four are unique and the
merged SHA256SUMS lists every published tarball.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-22 17:26:05 +08:00
Haitao Pan
9a8dd2dfe1 ci: add darwin build matrix for macos offline deployment 2026-06-19 19:06:50 +08:00
10 changed files with 578 additions and 27 deletions

View File

@ -56,6 +56,7 @@ jobs:
- name: Load Vault secrets
id: vault
if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository }}
continue-on-error: true
uses: hashicorp/vault-action@v2
with:
url: ${{ env.VAULT_ADDR }}
@ -67,7 +68,7 @@ jobs:
kv/data/github-actions/xworkmate-bridge AI_WORKSPACE_AUTH_TOKEN | AI_WORKSPACE_AUTH_TOKEN
- name: Export bridge auth token
if: ${{ steps.vault.outcome == 'success' }}
if: ${{ steps.vault.outcome == 'success' && steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN != '' }}
run: echo "AI_WORKSPACE_AUTH_TOKEN=${{ steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN }}" >> "$GITHUB_ENV"
- name: Probe current production bridge
@ -75,6 +76,17 @@ jobs:
env:
BRIDGE_SERVER_URL: https://xworkmate-bridge.svc.plus
run: |
if [[ -z "${AI_WORKSPACE_AUTH_TOKEN:-}" ]]; then
echo "::notice title=Production state skipped::AI_WORKSPACE_AUTH_TOKEN is unavailable from Vault; continuing without production bridge metadata."
{
echo "production_image="
echo "production_tag="
echo "production_commit="
echo "production_version="
} >> "$GITHUB_OUTPUT"
exit 0
fi
while IFS='=' read -r key value; do
echo "${key}=${value}" >> "$GITHUB_OUTPUT"
done < <(bash ./scripts/github-actions/report-production-state.sh "${BRIDGE_SERVER_URL}")

View File

@ -19,11 +19,12 @@ concurrency:
jobs:
build:
name: Build linux-${{ matrix.arch }}
name: Build ${{ matrix.os }}-${{ matrix.arch }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
os: [linux, darwin]
arch: [amd64, arm64]
steps:
- uses: actions/checkout@v4
@ -38,12 +39,13 @@ jobs:
- name: Build runtime asset
env:
TARGET_OS: ${{ matrix.os }}
TARGET_ARCH: ${{ matrix.arch }}
run: |
set -euo pipefail
root="dist/runtime/xworkmate-bridge"
mkdir -p "${root}/bin" dist/assets
CGO_ENABLED=0 GOOS=linux GOARCH="${TARGET_ARCH}" \
CGO_ENABLED=0 GOOS="${TARGET_OS}" GOARCH="${TARGET_ARCH}" \
go build -buildvcs=false -trimpath \
-ldflags "-X main.buildCommit=${GITHUB_SHA}" \
-o "${root}/bin/xworkmate-go-core" .
@ -51,21 +53,27 @@ jobs:
{
"component": "xworkmate-bridge",
"commit": "${GITHUB_SHA}",
"os": "linux",
"os": "${TARGET_OS}",
"arch": "${TARGET_ARCH}",
"binary": "bin/xworkmate-go-core"
}
JSON
tar -czf "dist/assets/xworkmate-bridge-linux-${TARGET_ARCH}.tar.gz" \
tar -czf "dist/assets/xworkmate-bridge-${TARGET_OS}-${TARGET_ARCH}.tar.gz" \
-C dist/runtime xworkmate-bridge
(
cd dist/assets
sha256sum -- ./*.tar.gz | sed 's# \./# #' > "SHA256SUMS-${TARGET_ARCH}"
# Name the per-job checksum file by OS *and* ARCH. Keying on ARCH
# alone makes the linux/darwin jobs of the same arch both emit
# SHA256SUMS-<arch>, which then clobber each other under the
# publish job's merge-multiple download — leaving SHA256SUMS with
# only 2 of the 4 platforms and breaking arm64 (and darwin-amd64)
# consumers with "missing checksum".
sha256sum -- ./*.tar.gz | sed 's# \./# #' > "SHA256SUMS-${TARGET_OS}-${TARGET_ARCH}"
)
- uses: actions/upload-artifact@v4
with:
name: xworkmate-bridge-linux-${{ matrix.arch }}
name: xworkmate-bridge-${{ matrix.os }}-${{ matrix.arch }}
path: |
dist/assets/*.tar.gz
dist/assets/SHA256SUMS-*
@ -77,7 +85,7 @@ jobs:
steps:
- uses: actions/download-artifact@v4
with:
pattern: xworkmate-bridge-linux-*
pattern: xworkmate-bridge-*
path: dist
merge-multiple: true
@ -97,5 +105,5 @@ jobs:
--repo "${GITHUB_REPOSITORY}" \
--target "${GITHUB_SHA}" \
--title "XWorkmate Bridge runtime ${GITHUB_SHA::12}" \
--notes "Prebuilt Linux bridge binaries. No target-host Go build is required."
--notes "Prebuilt bridge binaries. No target-host Go build is required."
fi

View File

@ -0,0 +1,44 @@
name: Validate Release PR
# release/* 分支的发布策略门禁:仅接受 hotfix/* 或带 cherry-pick/backport 标签的 PR。
# 详见 iac_modules/docs/tldr-github-branch-model.md
on:
pull_request_target:
types: [opened, synchronize, reopened, labeled, unlabeled]
permissions:
contents: read
pull-requests: read
jobs:
validate-release-source:
runs-on: ubuntu-latest
if: startsWith(github.base_ref, 'release/')
steps:
- name: Check PR source branch
run: |
SRC="${{ github.head_ref }}"
TGT="${{ github.base_ref }}"
LABELS="${{ join(github.event.pull_request.labels.*.name, ',') }}"
echo "🔍 Validating PR into release branch"
echo " source: $SRC"
echo " target: $TGT"
echo " labels: $LABELS"
if [[ "$SRC" =~ ^hotfix/ ]]; then
echo "✅ Allowed: hotfix/* branch"
exit 0
fi
if [[ "$LABELS" =~ (^|,)(cherry-pick|backport)(,|$) ]]; then
echo "✅ Allowed: cherry-pick/backport labeled PR"
exit 0
fi
echo "❌ Rejected."
echo "release/* 仅接受:"
echo " - 来自 hotfix/* 的 PR"
echo " - 带 cherry-pick 或 backport 标签的 PR已验证 feature 的 backport/cherry-pick"
echo "禁止从 main / develop / feature/* 直接合并到 release/*。"
exit 1

View File

@ -37,6 +37,7 @@ func (s *Server) Handler() http.Handler {
"commit": info.Commit,
"version": info.Version,
"buildDate": info.BuildDate,
"metrics": bridgeStabilityMetricsSnapshot(), // T12
}
body, _ := json.Marshal(resp)
w.Header().Set("Content-Type", "application/json")

29
internal/acp/metrics.go Normal file
View File

@ -0,0 +1,29 @@
package acp
import "sync/atomic"
// 关键稳定性指标T12docs/cases/06 §5
//
// 进程内累计计数,经 /api/ping 暴露,用于把「网关抖动 / run 超时」从靠用户截图
// 变为可监控。三个计数对应三类已知的不稳定来源:
// - gatewaySocketClosed : gatewayRPCError 命中 OPENCLAW_GATEWAY_SOCKET_CLOSED连接断
// - taskGetUnconfirmedFallback: tasks.get 走持久 run 仓兜底gateway 无法确认 runT7
// - runDeadlineInterrupt : run 超过 DeadlineAt 且 gateway 无法确认,回 interruptedT9
var bridgeStabilityMetrics struct {
gatewaySocketClosed atomic.Int64
taskGetUnconfirmedFallback atomic.Int64
runDeadlineInterrupt atomic.Int64
}
func metricGatewaySocketClosedInc() { bridgeStabilityMetrics.gatewaySocketClosed.Add(1) }
func metricTaskGetUnconfirmedFallbackInc() { bridgeStabilityMetrics.taskGetUnconfirmedFallback.Add(1) }
func metricRunDeadlineInterruptInc() { bridgeStabilityMetrics.runDeadlineInterrupt.Add(1) }
// bridgeStabilityMetricsSnapshot 返回当前计数快照,供 /api/ping 输出。
func bridgeStabilityMetricsSnapshot() map[string]any {
return map[string]any{
"gatewaySocketClosed": bridgeStabilityMetrics.gatewaySocketClosed.Load(),
"taskGetUnconfirmedFallback": bridgeStabilityMetrics.taskGetUnconfirmedFallback.Load(),
"runDeadlineInterrupt": bridgeStabilityMetrics.runDeadlineInterrupt.Load(),
}
}

View File

@ -0,0 +1,197 @@
package acp
import (
"log"
"strings"
"time"
"xworkmate-bridge/internal/shared"
)
// 持久 run 仓 / run 关联与 WS 解耦T7/T8/T9
//
// 背景OpenClaw gateway turn 采用异步模型——chat.send 快速返回 runIdbridge 把
// run 记录(sess.openClaw)、预算(sess.task.DeadlineAt)、运行句柄(sess.lastResult)
// 落在「按 sessionID 维度」的 per-session store 里s.sessions其生命周期独立于
// bridge↔gateway 的 WebSocket 连接。客户端随后轮询 tasks.get。
//
// 此前 tasks.get 每次都强依赖 gateway 应答:一旦 WS 抖动 / 重连后 run 内存态丢失,
// tasks.get 回 not_found 或 socket_closed已完成的结果就此丢失客户端要么硬失败、
// 要么(修复前)无限轮询。下列辅助把 tasks.get 改造为「优先用持久 run 仓兜底」:
//
// T8 已观察到的终态结果缓存进 sess.lastResultgateway 之后查不到也不丢;
// T7 gateway 暂时无法确认unavailable / socket closed / not_found但 run 仍在预算内时,
// 合成一个 running 句柄让客户端继续轮询,跨越瞬时抖动(与 WS 生命周期解耦);
// T9 run 超过 DeadlineAt 且 gateway 仍无法确认时,回确定性的 interrupted 终态。
// openClawTaskGetResultIsTerminal 判断一个 tasks.get 结果是否表示 run 已结束。
// 注意:仍在 artifact 同步中的结果会被 normalizeOpenClawTaskGetResult 重写为 status=running
// 因此这里只认显式终态,不会把「同步中」误判为终态。
func openClawTaskGetResultIsTerminal(payload map[string]any) bool {
switch strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) {
case string(TaskStateCompleted), string(TaskStateFailed), string(TaskStateCancelled),
"interrupted", "partially_delivered":
return true
}
return false
}
// cacheOpenClawTaskGetResultIfTerminal 把一次 gateway 确认的终态结果落进 per-session 持久 run 仓T8
func (s *Server) cacheOpenClawTaskGetResultIfTerminal(params map[string]any, payload map[string]any) {
if len(payload) == 0 || !openClawTaskGetResultIsTerminal(payload) {
return
}
sess := s.findTaskSession(params)
if sess == nil {
return
}
sess.mu.Lock()
defer sess.mu.Unlock()
switch strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) {
case string(TaskStateFailed):
sess.task.State = TaskStateFailed
case string(TaskStateCancelled):
sess.task.State = TaskStateCancelled
default:
sess.task.State = TaskStateCompleted
}
sess.task.ProgressTerminal = true
sess.task.ProgressStage = strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", "")))
sess.task.UpdatedAt = time.Now()
sess.lastResult = cloneMap(payload)
}
// cachedTerminalOpenClawResult 返回某 run 此前已观察到的终态结果若有T7/T8
func (s *Server) cachedTerminalOpenClawResult(params map[string]any) (map[string]any, bool) {
sess := s.findTaskSession(params)
if sess == nil {
return nil, false
}
sess.mu.Lock()
defer sess.mu.Unlock()
return cachedTerminalForRunLocked(sess, params)
}
// cachedTerminalForRunLocked 仅当缓存终态确实属于「本次请求的 runId」时才命中
// 防止同一 session 复用后把旧 run 的终态错配给新 run。调用方须持有 sess.mu。
func cachedTerminalForRunLocked(sess *session, params map[string]any) (map[string]any, bool) {
if !sess.task.ProgressTerminal || len(sess.lastResult) == 0 {
return nil, false
}
if !openClawTaskGetResultIsTerminal(sess.lastResult) {
return nil, false
}
requestedRun := strings.TrimSpace(shared.StringArg(params, "runId", ""))
if requestedRun == "" {
requestedRun = strings.TrimSpace(shared.StringArg(params, "taskId", ""))
}
if requestedRun != "" {
cachedRun := firstNonEmptyString(sess.lastResult, "runId", "taskId")
if cachedRun != "" && !strings.EqualFold(cachedRun, requestedRun) {
return nil, false
}
}
return cloneMap(sess.lastResult), true
}
// openClawTaskGetGatewayUnconfirmedFallback 在 gateway 无法确认 run 时,用持久 run 仓兜底T7/T9
// - 已有缓存终态 -> 直接返回;
// - run 仍在预算内 -> 合成 running 句柄,客户端继续轮询,跨越瞬时抖动;
// - run 超过 deadline -> 回确定性 interrupted 终态。
//
// 没有任何 per-session 记录时退回旧行为not_found不改变无状态查询的语义。
func (s *Server) openClawTaskGetGatewayUnconfirmedFallback(params map[string]any, code string, message string) map[string]any {
notFound := func() map[string]any {
return map[string]any{
"ok": false,
"status": "not_found",
"code": fallbackString(code, "TASK_LOOKUP_FAILED"),
"message": fallbackString(message, "openclaw native task lookup failed"),
}
}
sess := s.findTaskSession(params)
if sess == nil {
return notFound()
}
sess.mu.Lock()
defer sess.mu.Unlock()
if cached, ok := cachedTerminalForRunLocked(sess, params); ok {
return cached
}
if sess.openClaw == nil {
return notFound()
}
now := time.Now()
if !sess.task.DeadlineAt.IsZero() && now.After(sess.task.DeadlineAt) {
return s.markOpenClawRunDeadlineInterruptedLocked(sess, code, message)
}
// 仍在预算内:合成 running 句柄让客户端继续轮询,不因一次瞬时抖动硬失败。
metricTaskGetUnconfirmedFallbackInc() // T12
running := openClawRunningTaskResult(sess.openClaw)
running["transportDegraded"] = true
if strings.TrimSpace(code) != "" {
running["transportDegradedCode"] = strings.TrimSpace(code)
}
// T11带 runId 的日志,便于与 App / 插件 / 网关四层按 runId 串联。
log.Printf("level=warn component=openclaw_run_registry event=tasks_get_unconfirmed_fallback runId=%q openclawSessionKey=%q code=%q",
sess.openClaw.RunID, sess.openClaw.SessionKey, strings.TrimSpace(code))
sess.lastResult = cloneMap(running)
return running
}
// markOpenClawRunDeadlineInterruptedLocked 为「超过预算且 gateway 无法确认」的 run 生成确定性
// interrupted 终态T9。调用方须持有 sess.mu。
func (s *Server) markOpenClawRunDeadlineInterruptedLocked(sess *session, code string, message string) map[string]any {
now := time.Now()
sess.task.State = TaskStateFailed
sess.task.ProgressTerminal = true
sess.task.ProgressStage = "interrupted"
sess.task.ProgressMessage = "OpenClaw run exceeded its budget and could not be confirmed"
sess.task.UpdatedAt = now
metricRunDeadlineInterruptInc() // T12
// T11带 runId 的终态日志。
if sess.openClaw != nil {
log.Printf("level=warn component=openclaw_run_registry event=run_deadline_interrupt runId=%q openclawSessionKey=%q deadlineAt=%q code=%q",
sess.openClaw.RunID, sess.openClaw.SessionKey,
sess.openClaw.DeadlineAt.UTC().Format(time.RFC3339Nano), strings.TrimSpace(code))
}
result := map[string]any{
"ok": true,
"success": false,
"status": "interrupted",
"event": "interrupted",
"pending": false,
"code": "OPENCLAW_RUN_DEADLINE_EXCEEDED",
"artifactSyncStatus": "interrupted",
"message": "OpenClaw 任务超过预算上限且网关无法确认结果,已结束本轮等待。任务可能已在后台完成,请重新发送请求以拿回结果。",
"artifacts": []any{},
}
if strings.TrimSpace(code) != "" {
result["gatewayUnconfirmedCode"] = strings.TrimSpace(code)
}
if strings.TrimSpace(message) != "" {
result["gatewayUnconfirmedMessage"] = strings.TrimSpace(message)
}
if record := sess.openClaw; record != nil {
result["runId"] = record.RunID
result["taskId"] = record.RunID
result["turnId"] = record.TurnID
result["sessionId"] = record.SessionID
result["threadId"] = record.ThreadID
result["appThreadKey"] = record.ThreadID
result["openclawSessionKey"] = record.SessionKey
result["resolvedGatewayProviderId"] = record.GatewayProviderID
result["startedAt"] = record.StartedAt.UTC().Format(time.RFC3339Nano)
result["deadlineAt"] = record.DeadlineAt.UTC().Format(time.RFC3339Nano)
}
sess.lastResult = cloneMap(result)
return result
}
func fallbackString(value string, fallback string) string {
if strings.TrimSpace(value) == "" {
return fallback
}
return strings.TrimSpace(value)
}

View File

@ -0,0 +1,154 @@
package acp
import (
"testing"
"time"
"xworkmate-bridge/internal/shared"
)
func newRunRegistryTestServer(deadline time.Time) (*Server, map[string]any) {
sess := &session{sessionID: "s1", threadID: "t1"}
sess.task.RunID = "run-1"
sess.task.SessionKey = "sk"
sess.task.GatewayProviderID = "openclaw"
sess.task.DeadlineAt = deadline
sess.openClaw = &OpenClawTaskRecord{
SessionID: "s1",
ThreadID: "t1",
TurnID: "turn-1",
RunID: "run-1",
SessionKey: "sk",
GatewayProviderID: "openclaw",
StartedAt: time.Now().Add(-time.Minute),
DeadlineAt: deadline,
}
srv := &Server{sessions: map[string]*session{"s1": sess}}
params := map[string]any{"sessionId": "s1", "runId": "run-1"}
return srv, params
}
func TestOpenClawTaskGetResultIsTerminal(t *testing.T) {
cases := []struct {
status string
want bool
}{
{"completed", true},
{"failed", true},
{"cancelled", true},
{"interrupted", true},
{"partially_delivered", true},
{"running", false},
{"syncing-artifacts", false},
{"queued", false},
{"", false},
}
for _, tc := range cases {
if got := openClawTaskGetResultIsTerminal(map[string]any{"status": tc.status}); got != tc.want {
t.Errorf("status=%q: got %v, want %v", tc.status, got, tc.want)
}
}
}
// T7: gateway 无法确认但 run 仍在预算内 -> 合成 running 句柄续轮询。
func TestGatewayUnconfirmedFallbackWithinBudgetKeepsPolling(t *testing.T) {
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
if status := shared.StringArg(got, "status", ""); status != string(TaskStateRunning) {
t.Fatalf("status = %q, want running", status)
}
if !parseBool(got["transportDegraded"]) {
t.Fatalf("transportDegraded not set: %v", got)
}
if shared.StringArg(got, "runId", "") != "run-1" {
t.Fatalf("runId mismatch: %v", got["runId"])
}
}
// T9: run 超过 deadline 且 gateway 无法确认 -> 确定性 interrupted 终态。
func TestGatewayUnconfirmedFallbackPastDeadlineInterrupts(t *testing.T) {
srv, params := newRunRegistryTestServer(time.Now().Add(-time.Minute))
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
if status := shared.StringArg(got, "status", ""); status != "interrupted" {
t.Fatalf("status = %q, want interrupted", status)
}
if code := shared.StringArg(got, "code", ""); code != "OPENCLAW_RUN_DEADLINE_EXCEEDED" {
t.Fatalf("code = %q, want OPENCLAW_RUN_DEADLINE_EXCEEDED", code)
}
if parseBool(got["success"]) {
t.Fatalf("interrupted result must not be success")
}
sess := srv.findTaskSession(params)
if sess == nil || !sess.task.ProgressTerminal || sess.task.State != TaskStateFailed {
t.Fatalf("session terminal state not recorded: %+v", sess)
}
}
// T8: 已观察到的终态被缓存,且即使之后 gateway 不可达也优先返回缓存终态。
func TestTerminalResultCachedAndServedAfterGatewayLoss(t *testing.T) {
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
terminal := map[string]any{
"ok": true,
"success": true,
"status": "completed",
"runId": "run-1",
"message": "done",
}
srv.cacheOpenClawTaskGetResultIfTerminal(params, terminal)
cached, ok := srv.cachedTerminalOpenClawResult(params)
if !ok {
t.Fatalf("expected cached terminal result")
}
if shared.StringArg(cached, "status", "") != "completed" {
t.Fatalf("cached status = %q, want completed", cached["status"])
}
// 即使 run 已过 deadline + gateway 丢失,也应优先返回缓存终态而非 interrupted。
sess := srv.findTaskSession(params)
sess.mu.Lock()
sess.task.DeadlineAt = time.Now().Add(-time.Hour)
sess.mu.Unlock()
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
if shared.StringArg(got, "status", "") != "completed" {
t.Fatalf("expected cached completed to win over deadline interrupt, got %v", got["status"])
}
}
// 同一 session 复用后,旧 run 的终态不得错配给新 runId 的查询。
func TestCachedTerminalNotServedForDifferentRunId(t *testing.T) {
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
srv.cacheOpenClawTaskGetResultIfTerminal(params, map[string]any{
"status": "completed", "success": true, "runId": "run-1",
})
// 新一轮查询带不同 runId -> 不应命中旧缓存。
newParams := map[string]any{"sessionId": "s1", "runId": "run-2"}
if _, ok := srv.cachedTerminalOpenClawResult(newParams); ok {
t.Fatalf("stale terminal for run-1 must not be served for run-2")
}
// 原 runId 仍应命中。
if _, ok := srv.cachedTerminalOpenClawResult(params); !ok {
t.Fatalf("terminal for run-1 should still be served for run-1")
}
}
// running 结果不应被当作终态缓存。
func TestRunningResultNotCachedAsTerminal(t *testing.T) {
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
srv.cacheOpenClawTaskGetResultIfTerminal(params, map[string]any{"status": "running", "runId": "run-1"})
if _, ok := srv.cachedTerminalOpenClawResult(params); ok {
t.Fatalf("running result must not be cached as terminal")
}
}
// 无 per-session 记录时退回旧的 not_found 行为。
func TestGatewayUnconfirmedFallbackWithoutSessionReturnsNotFound(t *testing.T) {
srv := &Server{sessions: map[string]*session{}}
got := srv.openClawTaskGetGatewayUnconfirmedFallback(map[string]any{"sessionId": "missing"}, "X", "y")
if parseBool(got["ok"]) {
t.Fatalf("expected ok=false not_found, got %v", got)
}
if shared.StringArg(got, "status", "") != "not_found" {
t.Fatalf("status = %q, want not_found", got["status"])
}
}

View File

@ -417,6 +417,10 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask(
sess.task.DeadlineAt = record.DeadlineAt
sess.task.ProgressStage = "running"
sess.task.ProgressMessage = "OpenClaw task accepted"
// 新一轮 turn 复用同一 session 时,必须重置上一轮可能留下的终态标记,
// 否则持久 run 仓(T8)会把旧 runId 的终态错配给新 run。
sess.task.State = TaskStateRunning
sess.task.ProgressTerminal = false
sess.openClaw = record
running := openClawRunningTaskResult(record)
sess.lastResult = cloneMap(running)
@ -613,13 +617,19 @@ func (o *SessionOrchestrator) openClawArtifactPrepare(
func isOpenClawUnknownMethodError(errorPayload map[string]any, method string) bool {
message := strings.ToLower(strings.TrimSpace(shared.StringArg(errorPayload, "message", "")))
code := strings.ToUpper(strings.TrimSpace(shared.StringArg(errorPayload, "code", "")))
if message == "" {
return false
}
// 消息形如「unknown method: <method>」已明确指向「网关不认识该方法」,足以判定,
// 据此走 graceful fallback如 openClawFallbackSessionPreparePayload
//
// 注意:不能再用严格的 code 白名单来 gate。真实网关常以数字 JSON-RPC code
// (-32601 method not found / -32600 invalid request / -32002 等) 回传,
// 经 shared.StringArg(fmt.Sprint) 会被字符串化为 "-32601"/"-32002"
// 旧实现只接受 {"", INVALID_REQUEST, METHOD_NOT_FOUND},导致 fallback 失效、
// session.prepare 直接以 -32002 硬失败整轮任务。
return strings.Contains(message, "unknown method") &&
strings.Contains(message, strings.ToLower(strings.TrimSpace(method))) &&
(code == "" || code == "INVALID_REQUEST" || code == "METHOD_NOT_FOUND")
strings.Contains(message, strings.ToLower(strings.TrimSpace(method)))
}
func openClawFallbackSessionPreparePayload(params map[string]any) map[string]any {
@ -1677,11 +1687,16 @@ func applyOpenClawConstraintDeliveryStatus(result map[string]any) {
func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError {
if isOpenClawRetryableGatewayError(errorPayload) {
metricGatewaySocketClosedInc() // T12
// T10连接断属「可重试 / run 可能仍在后台、可续轮询」语义,而非 run 确实失败。
// 带 retryable/poll 提示,客户端据此降级为「后台续跑·重连中」(T5) 续轮询 tasks.get而非硬失败。
return &shared.RPCError{
Code: -32002,
Message: "OPENCLAW_GATEWAY_SOCKET_CLOSED: OpenClaw gateway connection closed during task execution",
Data: map[string]any{
"code": "OPENCLAW_GATEWAY_SOCKET_CLOSED",
"retryable": true,
"poll": true,
"originalCode": strings.TrimSpace(shared.StringArg(errorPayload, "code", "")),
"originalError": strings.TrimSpace(shared.StringArg(errorPayload, "message", "")),
},

View File

@ -132,6 +132,49 @@ func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceKeepsActiveRecordRunning
}
}
func TestExpectedArtifactDirectoriesDoNotBlockTerminalTaskState(t *testing.T) {
params := map[string]any{"expectedArtifactDirs": []any{"reports/", "artifacts/"}}
payload := map[string]any{
"success": true,
"status": string(TaskStateCompleted),
"artifactScope": "tasks/session/run",
"artifactDirectory": "/remote/openclaw/workspace/tasks/session/run",
"expectedArtifactDirs": []any{
"reports/",
"artifacts/",
},
}
if openClawTaskGetRequiresArtifactExport(params, payload) {
t.Fatal("expectedArtifactDirs must remain non-blocking scan hints")
}
got := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
if status := shared.StringArg(got, "status", ""); status != string(TaskStateCompleted) {
t.Fatalf("expected terminal status to remain completed, got %#v", got)
}
if parseBool(got["pending"]) {
t.Fatalf("expected terminal payload not to become pending, got %#v", got)
}
}
func TestRequiredArtifactExtensionsStillBlockUntilVerified(t *testing.T) {
params := map[string]any{"requiredArtifactExtensions": []any{"md"}}
payload := map[string]any{
"success": true,
"status": string(TaskStateCompleted),
"artifactScope": "tasks/session/run",
"artifactDirectory": "/remote/openclaw/workspace/tasks/session/run",
}
if !openClawTaskGetRequiresArtifactExport(params, payload) {
t.Fatal("requiredArtifactExtensions must remain a blocking delivery contract")
}
got := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
if status := shared.StringArg(got, "status", ""); status != string(TaskStateRunning) {
t.Fatalf("expected missing required artifact to remain syncing, got %#v", got)
}
}
func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceFailsAfterDeadlineWithoutRequiredArtifacts(t *testing.T) {
payload := map[string]any{
"success": false,
@ -282,3 +325,50 @@ func TestTaskGetArtifactExportReceivesRequiredArtifactExtensions(t *testing.T) {
t.Fatalf("expected expectedFileCountByExtension to reach export, got %#v", exportParams)
}
}
func TestIsOpenClawUnknownMethodErrorAcceptsNumericGatewayCodes(t *testing.T) {
const method = "xworkmate.session.prepare"
cases := []struct {
name string
payload map[string]any
want bool
}{
{
name: "string invalid_request code",
payload: map[string]any{"code": "INVALID_REQUEST", "message": "unknown method: xworkmate.session.prepare"},
want: true,
},
{
name: "numeric -32002 (real gateway shape that previously hard-failed)",
payload: map[string]any{"code": float64(-32002), "message": "unknown method: xworkmate.session.prepare"},
want: true,
},
{
name: "numeric -32601 method not found",
payload: map[string]any{"code": float64(-32601), "message": "Unknown method: xworkmate.session.prepare"},
want: true,
},
{
name: "empty code",
payload: map[string]any{"message": "unknown method: xworkmate.session.prepare"},
want: true,
},
{
name: "unrelated error must not be swallowed",
payload: map[string]any{"code": float64(-32002), "message": "gateway socket closed"},
want: false,
},
{
name: "unknown method for a different method name",
payload: map[string]any{"code": float64(-32601), "message": "unknown method: chat.send"},
want: false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := isOpenClawUnknownMethodError(tc.payload, method); got != tc.want {
t.Fatalf("isOpenClawUnknownMethodError(%v) = %v, want %v", tc.payload, got, tc.want)
}
})
}
}

View File

@ -132,13 +132,13 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
if gatewayProvider == "" {
gatewayProvider = "openclaw"
}
// T7/T8: 一旦观察到终态就从持久 run 仓返回,避免之后 gateway 查不到导致结果丢失。
if cached, ok := s.cachedTerminalOpenClawResult(params); ok {
return cached
}
if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil {
return map[string]any{
"ok": false,
"status": "not_found",
"code": "GATEWAY_UNAVAILABLE",
"message": rpcErr.Message,
}
// T7/T9: gateway 不可达时按持久 run 仓兜底(续轮询 / deadline 终态),而非裸 not_found。
return s.openClawTaskGetGatewayUnconfirmedFallback(params, "GATEWAY_UNAVAILABLE", rpcErr.Message)
}
result := s.gateway.RequestByMode(
gatewayProvider,
@ -162,16 +162,15 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
}
s.decorateOpenClawArtifactDownloadURLs(payload, sessionKey, runID)
stripOpenClawArtifactInlineContent(payload)
// T8: 缓存「最终客户端可见形态」(已 decorate 下载 URL + strip 内联内容),
// 这样从缓存回放时与正常路径完全一致。
s.cacheOpenClawTaskGetResultIfTerminal(params, payload)
return payload
}
// T7/T9: gateway 返回错误socket closed / not_found / lookup failed时同样走持久 run 仓兜底。
message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed"))
code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED"))
return map[string]any{
"ok": false,
"status": "not_found",
"code": code,
"message": message,
}
return s.openClawTaskGetGatewayUnconfirmedFallback(params, code, message)
}
func (s *Server) taskGetParamsWithSessionScope(params map[string]any) map[string]any {
@ -472,9 +471,11 @@ func openClawTaskGetRequiresArtifactExport(params map[string]any, payload map[st
if parseBool(params["requiresExportBeforeFinalResponse"]) || parseBool(payload["requiresExportBeforeFinalResponse"]) {
return true
}
return len(shared.ListArg(params, "expectedArtifactDirs")) > 0 ||
len(shared.ListArg(payload, "expectedArtifactDirs")) > 0 ||
len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
// expectedArtifactDirs are discovery hints for the plugin's workspace-root
// scan. They do not prove that the caller requires a file before the run can
// reach a terminal state. Treating them as a blocking contract turns a
// failed/no-output agent run into an endless "syncing-artifacts" loop.
return len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
len(shared.ListArg(payload, "requiredArtifactExtensions")) > 0
}