Compare commits
15 Commits
runtime-40
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 4fc5e380f2 | |||
|
|
188ca4ba4a | ||
|
|
0a50621664 | ||
|
|
81f65e3308 | ||
|
|
fa9cc78add | ||
|
|
02808934c8 | ||
|
|
3c7de420d2 | ||
|
|
2333c3e5fd | ||
|
|
e22d0f1cbf | ||
| 311db31e03 | |||
|
|
c7b2f8ee3a | ||
|
|
b8f4ed6102 | ||
|
|
9a8dd2dfe1 | ||
|
|
6a405a00e5 | ||
|
|
28a7eb3343 |
14
.github/workflows/pipeline.yml
vendored
14
.github/workflows/pipeline.yml
vendored
@ -56,6 +56,7 @@ jobs:
|
|||||||
- name: Load Vault secrets
|
- name: Load Vault secrets
|
||||||
id: vault
|
id: vault
|
||||||
if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository }}
|
if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository }}
|
||||||
|
continue-on-error: true
|
||||||
uses: hashicorp/vault-action@v2
|
uses: hashicorp/vault-action@v2
|
||||||
with:
|
with:
|
||||||
url: ${{ env.VAULT_ADDR }}
|
url: ${{ env.VAULT_ADDR }}
|
||||||
@ -67,7 +68,7 @@ jobs:
|
|||||||
kv/data/github-actions/xworkmate-bridge AI_WORKSPACE_AUTH_TOKEN | AI_WORKSPACE_AUTH_TOKEN
|
kv/data/github-actions/xworkmate-bridge AI_WORKSPACE_AUTH_TOKEN | AI_WORKSPACE_AUTH_TOKEN
|
||||||
|
|
||||||
- name: Export bridge auth token
|
- name: Export bridge auth token
|
||||||
if: ${{ steps.vault.outcome == 'success' }}
|
if: ${{ steps.vault.outcome == 'success' && steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN != '' }}
|
||||||
run: echo "AI_WORKSPACE_AUTH_TOKEN=${{ steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN }}" >> "$GITHUB_ENV"
|
run: echo "AI_WORKSPACE_AUTH_TOKEN=${{ steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN }}" >> "$GITHUB_ENV"
|
||||||
|
|
||||||
- name: Probe current production bridge
|
- name: Probe current production bridge
|
||||||
@ -75,6 +76,17 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
BRIDGE_SERVER_URL: https://xworkmate-bridge.svc.plus
|
BRIDGE_SERVER_URL: https://xworkmate-bridge.svc.plus
|
||||||
run: |
|
run: |
|
||||||
|
if [[ -z "${AI_WORKSPACE_AUTH_TOKEN:-}" ]]; then
|
||||||
|
echo "::notice title=Production state skipped::AI_WORKSPACE_AUTH_TOKEN is unavailable from Vault; continuing without production bridge metadata."
|
||||||
|
{
|
||||||
|
echo "production_image="
|
||||||
|
echo "production_tag="
|
||||||
|
echo "production_commit="
|
||||||
|
echo "production_version="
|
||||||
|
} >> "$GITHUB_OUTPUT"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
|
||||||
while IFS='=' read -r key value; do
|
while IFS='=' read -r key value; do
|
||||||
echo "${key}=${value}" >> "$GITHUB_OUTPUT"
|
echo "${key}=${value}" >> "$GITHUB_OUTPUT"
|
||||||
done < <(bash ./scripts/github-actions/report-production-state.sh "${BRIDGE_SERVER_URL}")
|
done < <(bash ./scripts/github-actions/report-production-state.sh "${BRIDGE_SERVER_URL}")
|
||||||
|
|||||||
24
.github/workflows/runtime-release.yml
vendored
24
.github/workflows/runtime-release.yml
vendored
@ -19,11 +19,12 @@ concurrency:
|
|||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
name: Build linux-${{ matrix.arch }}
|
name: Build ${{ matrix.os }}-${{ matrix.arch }}
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
|
os: [linux, darwin]
|
||||||
arch: [amd64, arm64]
|
arch: [amd64, arm64]
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
@ -38,12 +39,13 @@ jobs:
|
|||||||
|
|
||||||
- name: Build runtime asset
|
- name: Build runtime asset
|
||||||
env:
|
env:
|
||||||
|
TARGET_OS: ${{ matrix.os }}
|
||||||
TARGET_ARCH: ${{ matrix.arch }}
|
TARGET_ARCH: ${{ matrix.arch }}
|
||||||
run: |
|
run: |
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
root="dist/runtime/xworkmate-bridge"
|
root="dist/runtime/xworkmate-bridge"
|
||||||
mkdir -p "${root}/bin" dist/assets
|
mkdir -p "${root}/bin" dist/assets
|
||||||
CGO_ENABLED=0 GOOS=linux GOARCH="${TARGET_ARCH}" \
|
CGO_ENABLED=0 GOOS="${TARGET_OS}" GOARCH="${TARGET_ARCH}" \
|
||||||
go build -buildvcs=false -trimpath \
|
go build -buildvcs=false -trimpath \
|
||||||
-ldflags "-X main.buildCommit=${GITHUB_SHA}" \
|
-ldflags "-X main.buildCommit=${GITHUB_SHA}" \
|
||||||
-o "${root}/bin/xworkmate-go-core" .
|
-o "${root}/bin/xworkmate-go-core" .
|
||||||
@ -51,21 +53,27 @@ jobs:
|
|||||||
{
|
{
|
||||||
"component": "xworkmate-bridge",
|
"component": "xworkmate-bridge",
|
||||||
"commit": "${GITHUB_SHA}",
|
"commit": "${GITHUB_SHA}",
|
||||||
"os": "linux",
|
"os": "${TARGET_OS}",
|
||||||
"arch": "${TARGET_ARCH}",
|
"arch": "${TARGET_ARCH}",
|
||||||
"binary": "bin/xworkmate-go-core"
|
"binary": "bin/xworkmate-go-core"
|
||||||
}
|
}
|
||||||
JSON
|
JSON
|
||||||
tar -czf "dist/assets/xworkmate-bridge-linux-${TARGET_ARCH}.tar.gz" \
|
tar -czf "dist/assets/xworkmate-bridge-${TARGET_OS}-${TARGET_ARCH}.tar.gz" \
|
||||||
-C dist/runtime xworkmate-bridge
|
-C dist/runtime xworkmate-bridge
|
||||||
(
|
(
|
||||||
cd dist/assets
|
cd dist/assets
|
||||||
sha256sum -- ./*.tar.gz | sed 's# \./# #' > "SHA256SUMS-${TARGET_ARCH}"
|
# Name the per-job checksum file by OS *and* ARCH. Keying on ARCH
|
||||||
|
# alone makes the linux/darwin jobs of the same arch both emit
|
||||||
|
# SHA256SUMS-<arch>, which then clobber each other under the
|
||||||
|
# publish job's merge-multiple download — leaving SHA256SUMS with
|
||||||
|
# only 2 of the 4 platforms and breaking arm64 (and darwin-amd64)
|
||||||
|
# consumers with "missing checksum".
|
||||||
|
sha256sum -- ./*.tar.gz | sed 's# \./# #' > "SHA256SUMS-${TARGET_OS}-${TARGET_ARCH}"
|
||||||
)
|
)
|
||||||
|
|
||||||
- uses: actions/upload-artifact@v4
|
- uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: xworkmate-bridge-linux-${{ matrix.arch }}
|
name: xworkmate-bridge-${{ matrix.os }}-${{ matrix.arch }}
|
||||||
path: |
|
path: |
|
||||||
dist/assets/*.tar.gz
|
dist/assets/*.tar.gz
|
||||||
dist/assets/SHA256SUMS-*
|
dist/assets/SHA256SUMS-*
|
||||||
@ -77,7 +85,7 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- uses: actions/download-artifact@v4
|
- uses: actions/download-artifact@v4
|
||||||
with:
|
with:
|
||||||
pattern: xworkmate-bridge-linux-*
|
pattern: xworkmate-bridge-*
|
||||||
path: dist
|
path: dist
|
||||||
merge-multiple: true
|
merge-multiple: true
|
||||||
|
|
||||||
@ -97,5 +105,5 @@ jobs:
|
|||||||
--repo "${GITHUB_REPOSITORY}" \
|
--repo "${GITHUB_REPOSITORY}" \
|
||||||
--target "${GITHUB_SHA}" \
|
--target "${GITHUB_SHA}" \
|
||||||
--title "XWorkmate Bridge runtime ${GITHUB_SHA::12}" \
|
--title "XWorkmate Bridge runtime ${GITHUB_SHA::12}" \
|
||||||
--notes "Prebuilt Linux bridge binaries. No target-host Go build is required."
|
--notes "Prebuilt bridge binaries. No target-host Go build is required."
|
||||||
fi
|
fi
|
||||||
|
|||||||
44
.github/workflows/validate-release-pr.yml
vendored
Normal file
44
.github/workflows/validate-release-pr.yml
vendored
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
name: Validate Release PR
|
||||||
|
|
||||||
|
# release/* 分支的发布策略门禁:仅接受 hotfix/* 或带 cherry-pick/backport 标签的 PR。
|
||||||
|
# 详见 iac_modules/docs/tldr-github-branch-model.md
|
||||||
|
on:
|
||||||
|
pull_request_target:
|
||||||
|
types: [opened, synchronize, reopened, labeled, unlabeled]
|
||||||
|
|
||||||
|
permissions:
|
||||||
|
contents: read
|
||||||
|
pull-requests: read
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
validate-release-source:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
if: startsWith(github.base_ref, 'release/')
|
||||||
|
steps:
|
||||||
|
- name: Check PR source branch
|
||||||
|
run: |
|
||||||
|
SRC="${{ github.head_ref }}"
|
||||||
|
TGT="${{ github.base_ref }}"
|
||||||
|
LABELS="${{ join(github.event.pull_request.labels.*.name, ',') }}"
|
||||||
|
|
||||||
|
echo "🔍 Validating PR into release branch"
|
||||||
|
echo " source: $SRC"
|
||||||
|
echo " target: $TGT"
|
||||||
|
echo " labels: $LABELS"
|
||||||
|
|
||||||
|
if [[ "$SRC" =~ ^hotfix/ ]]; then
|
||||||
|
echo "✅ Allowed: hotfix/* branch"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [[ "$LABELS" =~ (^|,)(cherry-pick|backport)(,|$) ]]; then
|
||||||
|
echo "✅ Allowed: cherry-pick/backport labeled PR"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "❌ Rejected."
|
||||||
|
echo "release/* 仅接受:"
|
||||||
|
echo " - 来自 hotfix/* 的 PR"
|
||||||
|
echo " - 带 cherry-pick 或 backport 标签的 PR(已验证 feature 的 backport/cherry-pick)"
|
||||||
|
echo "禁止从 main / develop / feature/* 直接合并到 release/*。"
|
||||||
|
exit 1
|
||||||
@ -92,20 +92,7 @@ func handleGatewayConnect(
|
|||||||
server.gateway = gatewayruntime.NewManager()
|
server.gateway = gatewayruntime.NewManager()
|
||||||
}
|
}
|
||||||
|
|
||||||
result := server.gateway.Connect(request, notify)
|
result := connectOpenClawGateway(server.gateway, request, notify, usesBridgeIdentity)
|
||||||
if usesBridgeIdentity && shouldRetryOpenClawGatewayWithSharedToken(result) {
|
|
||||||
clearBridgeGatewayDeviceToken()
|
|
||||||
request.Auth.DeviceToken = ""
|
|
||||||
request.HasDeviceToken = false
|
|
||||||
request.Auth.Token = bridgeSharedAuthToken()
|
|
||||||
request.HasSharedAuth = strings.TrimSpace(request.Auth.Token) != ""
|
|
||||||
if request.HasSharedAuth {
|
|
||||||
request.ConnectAuthMode = "shared-token"
|
|
||||||
request.ConnectAuthFields = []string{"token"}
|
|
||||||
request.ConnectAuthSources = []string{"bridge:repair"}
|
|
||||||
result = server.gateway.Connect(request, notify)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if result.OK && usesBridgeIdentity {
|
if result.OK && usesBridgeIdentity {
|
||||||
saveBridgeGatewayDeviceToken(result.ReturnedDeviceToken)
|
saveBridgeGatewayDeviceToken(result.ReturnedDeviceToken)
|
||||||
}
|
}
|
||||||
@ -297,20 +284,7 @@ func ensureProductionGatewayConnected(
|
|||||||
request.Auth.DeviceToken = deviceToken
|
request.Auth.DeviceToken = deviceToken
|
||||||
request.HasDeviceToken = deviceToken != ""
|
request.HasDeviceToken = deviceToken != ""
|
||||||
request.ReportedRemoteAddress = resolveGatewayReportedRemoteAddress(server, request)
|
request.ReportedRemoteAddress = resolveGatewayReportedRemoteAddress(server, request)
|
||||||
result := server.gateway.Connect(request, notify)
|
result := connectOpenClawGateway(server.gateway, request, notify, true)
|
||||||
if shouldRetryOpenClawGatewayWithSharedToken(result) {
|
|
||||||
clearBridgeGatewayDeviceToken()
|
|
||||||
request.Auth.DeviceToken = ""
|
|
||||||
request.HasDeviceToken = false
|
|
||||||
request.Auth.Token = bridgeSharedAuthToken()
|
|
||||||
request.HasSharedAuth = strings.TrimSpace(request.Auth.Token) != ""
|
|
||||||
if request.HasSharedAuth {
|
|
||||||
request.ConnectAuthMode = "shared-token"
|
|
||||||
request.ConnectAuthFields = []string{"token"}
|
|
||||||
request.ConnectAuthSources = []string{"bridge:repair"}
|
|
||||||
result = server.gateway.Connect(request, notify)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if result.OK {
|
if result.OK {
|
||||||
saveBridgeGatewayDeviceToken(result.ReturnedDeviceToken)
|
saveBridgeGatewayDeviceToken(result.ReturnedDeviceToken)
|
||||||
return nil
|
return nil
|
||||||
@ -323,6 +297,28 @@ func ensureProductionGatewayConnected(
|
|||||||
return &shared.RPCError{Code: -32002, Message: "GATEWAY_CONNECT_FAILED: " + message}
|
return &shared.RPCError{Code: -32002, Message: "GATEWAY_CONNECT_FAILED: " + message}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func connectOpenClawGateway(
|
||||||
|
manager *gatewayruntime.Manager,
|
||||||
|
request gatewayruntime.ConnectRequest,
|
||||||
|
notify func(map[string]any),
|
||||||
|
usesBridgeIdentity bool,
|
||||||
|
) gatewayruntime.ConnectResult {
|
||||||
|
result := manager.Connect(request, notify)
|
||||||
|
if !usesBridgeIdentity || !shouldRetryOpenClawGatewayWithSharedToken(result) {
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
clearBridgeGatewayDeviceToken()
|
||||||
|
request.Auth.DeviceToken = ""
|
||||||
|
request.HasDeviceToken = false
|
||||||
|
request.Auth.Token = bridgeSharedAuthToken()
|
||||||
|
request.HasSharedAuth = true
|
||||||
|
request.ConnectAuthMode = "shared-token"
|
||||||
|
request.ConnectAuthFields = []string{"token"}
|
||||||
|
request.ConnectAuthSources = []string{"bridge:device-token-reissue"}
|
||||||
|
return manager.Connect(request, notify)
|
||||||
|
}
|
||||||
|
|
||||||
func shouldRetryOpenClawGatewayWithSharedToken(result gatewayruntime.ConnectResult) bool {
|
func shouldRetryOpenClawGatewayWithSharedToken(result gatewayruntime.ConnectResult) bool {
|
||||||
if result.OK || strings.TrimSpace(bridgeSharedAuthToken()) == "" {
|
if result.OK || strings.TrimSpace(bridgeSharedAuthToken()) == "" {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@ -37,6 +37,7 @@ func (s *Server) Handler() http.Handler {
|
|||||||
"commit": info.Commit,
|
"commit": info.Commit,
|
||||||
"version": info.Version,
|
"version": info.Version,
|
||||||
"buildDate": info.BuildDate,
|
"buildDate": info.BuildDate,
|
||||||
|
"metrics": bridgeStabilityMetricsSnapshot(), // T12
|
||||||
}
|
}
|
||||||
body, _ := json.Marshal(resp)
|
body, _ := json.Marshal(resp)
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|||||||
29
internal/acp/metrics.go
Normal file
29
internal/acp/metrics.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package acp
|
||||||
|
|
||||||
|
import "sync/atomic"
|
||||||
|
|
||||||
|
// 关键稳定性指标(T12,docs/cases/06 §5)。
|
||||||
|
//
|
||||||
|
// 进程内累计计数,经 /api/ping 暴露,用于把「网关抖动 / run 超时」从靠用户截图
|
||||||
|
// 变为可监控。三个计数对应三类已知的不稳定来源:
|
||||||
|
// - gatewaySocketClosed : gatewayRPCError 命中 OPENCLAW_GATEWAY_SOCKET_CLOSED(连接断)
|
||||||
|
// - taskGetUnconfirmedFallback: tasks.get 走持久 run 仓兜底(gateway 无法确认 run,T7)
|
||||||
|
// - runDeadlineInterrupt : run 超过 DeadlineAt 且 gateway 无法确认,回 interrupted(T9)
|
||||||
|
var bridgeStabilityMetrics struct {
|
||||||
|
gatewaySocketClosed atomic.Int64
|
||||||
|
taskGetUnconfirmedFallback atomic.Int64
|
||||||
|
runDeadlineInterrupt atomic.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricGatewaySocketClosedInc() { bridgeStabilityMetrics.gatewaySocketClosed.Add(1) }
|
||||||
|
func metricTaskGetUnconfirmedFallbackInc() { bridgeStabilityMetrics.taskGetUnconfirmedFallback.Add(1) }
|
||||||
|
func metricRunDeadlineInterruptInc() { bridgeStabilityMetrics.runDeadlineInterrupt.Add(1) }
|
||||||
|
|
||||||
|
// bridgeStabilityMetricsSnapshot 返回当前计数快照,供 /api/ping 输出。
|
||||||
|
func bridgeStabilityMetricsSnapshot() map[string]any {
|
||||||
|
return map[string]any{
|
||||||
|
"gatewaySocketClosed": bridgeStabilityMetrics.gatewaySocketClosed.Load(),
|
||||||
|
"taskGetUnconfirmedFallback": bridgeStabilityMetrics.taskGetUnconfirmedFallback.Load(),
|
||||||
|
"runDeadlineInterrupt": bridgeStabilityMetrics.runDeadlineInterrupt.Load(),
|
||||||
|
}
|
||||||
|
}
|
||||||
197
internal/acp/openclaw_run_registry.go
Normal file
197
internal/acp/openclaw_run_registry.go
Normal file
@ -0,0 +1,197 @@
|
|||||||
|
package acp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"xworkmate-bridge/internal/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
// 持久 run 仓 / run 关联与 WS 解耦(T7/T8/T9)。
|
||||||
|
//
|
||||||
|
// 背景:OpenClaw gateway turn 采用异步模型——chat.send 快速返回 runId,bridge 把
|
||||||
|
// run 记录(sess.openClaw)、预算(sess.task.DeadlineAt)、运行句柄(sess.lastResult)
|
||||||
|
// 落在「按 sessionID 维度」的 per-session store 里(s.sessions),其生命周期独立于
|
||||||
|
// bridge↔gateway 的 WebSocket 连接。客户端随后轮询 tasks.get。
|
||||||
|
//
|
||||||
|
// 此前 tasks.get 每次都强依赖 gateway 应答:一旦 WS 抖动 / 重连后 run 内存态丢失,
|
||||||
|
// tasks.get 回 not_found 或 socket_closed,已完成的结果就此丢失,客户端要么硬失败、
|
||||||
|
// 要么(修复前)无限轮询。下列辅助把 tasks.get 改造为「优先用持久 run 仓兜底」:
|
||||||
|
//
|
||||||
|
// T8 已观察到的终态结果缓存进 sess.lastResult,gateway 之后查不到也不丢;
|
||||||
|
// T7 gateway 暂时无法确认(unavailable / socket closed / not_found)但 run 仍在预算内时,
|
||||||
|
// 合成一个 running 句柄让客户端继续轮询,跨越瞬时抖动(与 WS 生命周期解耦);
|
||||||
|
// T9 run 超过 DeadlineAt 且 gateway 仍无法确认时,回确定性的 interrupted 终态。
|
||||||
|
|
||||||
|
// openClawTaskGetResultIsTerminal 判断一个 tasks.get 结果是否表示 run 已结束。
|
||||||
|
// 注意:仍在 artifact 同步中的结果会被 normalizeOpenClawTaskGetResult 重写为 status=running,
|
||||||
|
// 因此这里只认显式终态,不会把「同步中」误判为终态。
|
||||||
|
func openClawTaskGetResultIsTerminal(payload map[string]any) bool {
|
||||||
|
switch strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) {
|
||||||
|
case string(TaskStateCompleted), string(TaskStateFailed), string(TaskStateCancelled),
|
||||||
|
"interrupted", "partially_delivered":
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// cacheOpenClawTaskGetResultIfTerminal 把一次 gateway 确认的终态结果落进 per-session 持久 run 仓(T8)。
|
||||||
|
func (s *Server) cacheOpenClawTaskGetResultIfTerminal(params map[string]any, payload map[string]any) {
|
||||||
|
if len(payload) == 0 || !openClawTaskGetResultIsTerminal(payload) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sess := s.findTaskSession(params)
|
||||||
|
if sess == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sess.mu.Lock()
|
||||||
|
defer sess.mu.Unlock()
|
||||||
|
switch strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) {
|
||||||
|
case string(TaskStateFailed):
|
||||||
|
sess.task.State = TaskStateFailed
|
||||||
|
case string(TaskStateCancelled):
|
||||||
|
sess.task.State = TaskStateCancelled
|
||||||
|
default:
|
||||||
|
sess.task.State = TaskStateCompleted
|
||||||
|
}
|
||||||
|
sess.task.ProgressTerminal = true
|
||||||
|
sess.task.ProgressStage = strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", "")))
|
||||||
|
sess.task.UpdatedAt = time.Now()
|
||||||
|
sess.lastResult = cloneMap(payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// cachedTerminalOpenClawResult 返回某 run 此前已观察到的终态结果(若有)(T7/T8)。
|
||||||
|
func (s *Server) cachedTerminalOpenClawResult(params map[string]any) (map[string]any, bool) {
|
||||||
|
sess := s.findTaskSession(params)
|
||||||
|
if sess == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
sess.mu.Lock()
|
||||||
|
defer sess.mu.Unlock()
|
||||||
|
return cachedTerminalForRunLocked(sess, params)
|
||||||
|
}
|
||||||
|
|
||||||
|
// cachedTerminalForRunLocked 仅当缓存终态确实属于「本次请求的 runId」时才命中,
|
||||||
|
// 防止同一 session 复用后把旧 run 的终态错配给新 run。调用方须持有 sess.mu。
|
||||||
|
func cachedTerminalForRunLocked(sess *session, params map[string]any) (map[string]any, bool) {
|
||||||
|
if !sess.task.ProgressTerminal || len(sess.lastResult) == 0 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
if !openClawTaskGetResultIsTerminal(sess.lastResult) {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
requestedRun := strings.TrimSpace(shared.StringArg(params, "runId", ""))
|
||||||
|
if requestedRun == "" {
|
||||||
|
requestedRun = strings.TrimSpace(shared.StringArg(params, "taskId", ""))
|
||||||
|
}
|
||||||
|
if requestedRun != "" {
|
||||||
|
cachedRun := firstNonEmptyString(sess.lastResult, "runId", "taskId")
|
||||||
|
if cachedRun != "" && !strings.EqualFold(cachedRun, requestedRun) {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cloneMap(sess.lastResult), true
|
||||||
|
}
|
||||||
|
|
||||||
|
// openClawTaskGetGatewayUnconfirmedFallback 在 gateway 无法确认 run 时,用持久 run 仓兜底(T7/T9):
|
||||||
|
// - 已有缓存终态 -> 直接返回;
|
||||||
|
// - run 仍在预算内 -> 合成 running 句柄,客户端继续轮询,跨越瞬时抖动;
|
||||||
|
// - run 超过 deadline -> 回确定性 interrupted 终态。
|
||||||
|
//
|
||||||
|
// 没有任何 per-session 记录时退回旧行为(not_found),不改变无状态查询的语义。
|
||||||
|
func (s *Server) openClawTaskGetGatewayUnconfirmedFallback(params map[string]any, code string, message string) map[string]any {
|
||||||
|
notFound := func() map[string]any {
|
||||||
|
return map[string]any{
|
||||||
|
"ok": false,
|
||||||
|
"status": "not_found",
|
||||||
|
"code": fallbackString(code, "TASK_LOOKUP_FAILED"),
|
||||||
|
"message": fallbackString(message, "openclaw native task lookup failed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sess := s.findTaskSession(params)
|
||||||
|
if sess == nil {
|
||||||
|
return notFound()
|
||||||
|
}
|
||||||
|
sess.mu.Lock()
|
||||||
|
defer sess.mu.Unlock()
|
||||||
|
if cached, ok := cachedTerminalForRunLocked(sess, params); ok {
|
||||||
|
return cached
|
||||||
|
}
|
||||||
|
if sess.openClaw == nil {
|
||||||
|
return notFound()
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
if !sess.task.DeadlineAt.IsZero() && now.After(sess.task.DeadlineAt) {
|
||||||
|
return s.markOpenClawRunDeadlineInterruptedLocked(sess, code, message)
|
||||||
|
}
|
||||||
|
// 仍在预算内:合成 running 句柄让客户端继续轮询,不因一次瞬时抖动硬失败。
|
||||||
|
metricTaskGetUnconfirmedFallbackInc() // T12
|
||||||
|
running := openClawRunningTaskResult(sess.openClaw)
|
||||||
|
running["transportDegraded"] = true
|
||||||
|
if strings.TrimSpace(code) != "" {
|
||||||
|
running["transportDegradedCode"] = strings.TrimSpace(code)
|
||||||
|
}
|
||||||
|
// T11:带 runId 的日志,便于与 App / 插件 / 网关四层按 runId 串联。
|
||||||
|
log.Printf("level=warn component=openclaw_run_registry event=tasks_get_unconfirmed_fallback runId=%q openclawSessionKey=%q code=%q",
|
||||||
|
sess.openClaw.RunID, sess.openClaw.SessionKey, strings.TrimSpace(code))
|
||||||
|
sess.lastResult = cloneMap(running)
|
||||||
|
return running
|
||||||
|
}
|
||||||
|
|
||||||
|
// markOpenClawRunDeadlineInterruptedLocked 为「超过预算且 gateway 无法确认」的 run 生成确定性
|
||||||
|
// interrupted 终态(T9)。调用方须持有 sess.mu。
|
||||||
|
func (s *Server) markOpenClawRunDeadlineInterruptedLocked(sess *session, code string, message string) map[string]any {
|
||||||
|
now := time.Now()
|
||||||
|
sess.task.State = TaskStateFailed
|
||||||
|
sess.task.ProgressTerminal = true
|
||||||
|
sess.task.ProgressStage = "interrupted"
|
||||||
|
sess.task.ProgressMessage = "OpenClaw run exceeded its budget and could not be confirmed"
|
||||||
|
sess.task.UpdatedAt = now
|
||||||
|
metricRunDeadlineInterruptInc() // T12
|
||||||
|
// T11:带 runId 的终态日志。
|
||||||
|
if sess.openClaw != nil {
|
||||||
|
log.Printf("level=warn component=openclaw_run_registry event=run_deadline_interrupt runId=%q openclawSessionKey=%q deadlineAt=%q code=%q",
|
||||||
|
sess.openClaw.RunID, sess.openClaw.SessionKey,
|
||||||
|
sess.openClaw.DeadlineAt.UTC().Format(time.RFC3339Nano), strings.TrimSpace(code))
|
||||||
|
}
|
||||||
|
|
||||||
|
result := map[string]any{
|
||||||
|
"ok": true,
|
||||||
|
"success": false,
|
||||||
|
"status": "interrupted",
|
||||||
|
"event": "interrupted",
|
||||||
|
"pending": false,
|
||||||
|
"code": "OPENCLAW_RUN_DEADLINE_EXCEEDED",
|
||||||
|
"artifactSyncStatus": "interrupted",
|
||||||
|
"message": "OpenClaw 任务超过预算上限且网关无法确认结果,已结束本轮等待。任务可能已在后台完成,请重新发送请求以拿回结果。",
|
||||||
|
"artifacts": []any{},
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(code) != "" {
|
||||||
|
result["gatewayUnconfirmedCode"] = strings.TrimSpace(code)
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(message) != "" {
|
||||||
|
result["gatewayUnconfirmedMessage"] = strings.TrimSpace(message)
|
||||||
|
}
|
||||||
|
if record := sess.openClaw; record != nil {
|
||||||
|
result["runId"] = record.RunID
|
||||||
|
result["taskId"] = record.RunID
|
||||||
|
result["turnId"] = record.TurnID
|
||||||
|
result["sessionId"] = record.SessionID
|
||||||
|
result["threadId"] = record.ThreadID
|
||||||
|
result["appThreadKey"] = record.ThreadID
|
||||||
|
result["openclawSessionKey"] = record.SessionKey
|
||||||
|
result["resolvedGatewayProviderId"] = record.GatewayProviderID
|
||||||
|
result["startedAt"] = record.StartedAt.UTC().Format(time.RFC3339Nano)
|
||||||
|
result["deadlineAt"] = record.DeadlineAt.UTC().Format(time.RFC3339Nano)
|
||||||
|
}
|
||||||
|
sess.lastResult = cloneMap(result)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func fallbackString(value string, fallback string) string {
|
||||||
|
if strings.TrimSpace(value) == "" {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
return strings.TrimSpace(value)
|
||||||
|
}
|
||||||
154
internal/acp/openclaw_run_registry_test.go
Normal file
154
internal/acp/openclaw_run_registry_test.go
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
package acp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"xworkmate-bridge/internal/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newRunRegistryTestServer(deadline time.Time) (*Server, map[string]any) {
|
||||||
|
sess := &session{sessionID: "s1", threadID: "t1"}
|
||||||
|
sess.task.RunID = "run-1"
|
||||||
|
sess.task.SessionKey = "sk"
|
||||||
|
sess.task.GatewayProviderID = "openclaw"
|
||||||
|
sess.task.DeadlineAt = deadline
|
||||||
|
sess.openClaw = &OpenClawTaskRecord{
|
||||||
|
SessionID: "s1",
|
||||||
|
ThreadID: "t1",
|
||||||
|
TurnID: "turn-1",
|
||||||
|
RunID: "run-1",
|
||||||
|
SessionKey: "sk",
|
||||||
|
GatewayProviderID: "openclaw",
|
||||||
|
StartedAt: time.Now().Add(-time.Minute),
|
||||||
|
DeadlineAt: deadline,
|
||||||
|
}
|
||||||
|
srv := &Server{sessions: map[string]*session{"s1": sess}}
|
||||||
|
params := map[string]any{"sessionId": "s1", "runId": "run-1"}
|
||||||
|
return srv, params
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenClawTaskGetResultIsTerminal(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
status string
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{"completed", true},
|
||||||
|
{"failed", true},
|
||||||
|
{"cancelled", true},
|
||||||
|
{"interrupted", true},
|
||||||
|
{"partially_delivered", true},
|
||||||
|
{"running", false},
|
||||||
|
{"syncing-artifacts", false},
|
||||||
|
{"queued", false},
|
||||||
|
{"", false},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
if got := openClawTaskGetResultIsTerminal(map[string]any{"status": tc.status}); got != tc.want {
|
||||||
|
t.Errorf("status=%q: got %v, want %v", tc.status, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// T7: gateway 无法确认但 run 仍在预算内 -> 合成 running 句柄续轮询。
|
||||||
|
func TestGatewayUnconfirmedFallbackWithinBudgetKeepsPolling(t *testing.T) {
|
||||||
|
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||||
|
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
|
||||||
|
if status := shared.StringArg(got, "status", ""); status != string(TaskStateRunning) {
|
||||||
|
t.Fatalf("status = %q, want running", status)
|
||||||
|
}
|
||||||
|
if !parseBool(got["transportDegraded"]) {
|
||||||
|
t.Fatalf("transportDegraded not set: %v", got)
|
||||||
|
}
|
||||||
|
if shared.StringArg(got, "runId", "") != "run-1" {
|
||||||
|
t.Fatalf("runId mismatch: %v", got["runId"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// T9: run 超过 deadline 且 gateway 无法确认 -> 确定性 interrupted 终态。
|
||||||
|
func TestGatewayUnconfirmedFallbackPastDeadlineInterrupts(t *testing.T) {
|
||||||
|
srv, params := newRunRegistryTestServer(time.Now().Add(-time.Minute))
|
||||||
|
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
|
||||||
|
if status := shared.StringArg(got, "status", ""); status != "interrupted" {
|
||||||
|
t.Fatalf("status = %q, want interrupted", status)
|
||||||
|
}
|
||||||
|
if code := shared.StringArg(got, "code", ""); code != "OPENCLAW_RUN_DEADLINE_EXCEEDED" {
|
||||||
|
t.Fatalf("code = %q, want OPENCLAW_RUN_DEADLINE_EXCEEDED", code)
|
||||||
|
}
|
||||||
|
if parseBool(got["success"]) {
|
||||||
|
t.Fatalf("interrupted result must not be success")
|
||||||
|
}
|
||||||
|
sess := srv.findTaskSession(params)
|
||||||
|
if sess == nil || !sess.task.ProgressTerminal || sess.task.State != TaskStateFailed {
|
||||||
|
t.Fatalf("session terminal state not recorded: %+v", sess)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// T8: 已观察到的终态被缓存,且即使之后 gateway 不可达也优先返回缓存终态。
|
||||||
|
func TestTerminalResultCachedAndServedAfterGatewayLoss(t *testing.T) {
|
||||||
|
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||||
|
terminal := map[string]any{
|
||||||
|
"ok": true,
|
||||||
|
"success": true,
|
||||||
|
"status": "completed",
|
||||||
|
"runId": "run-1",
|
||||||
|
"message": "done",
|
||||||
|
}
|
||||||
|
srv.cacheOpenClawTaskGetResultIfTerminal(params, terminal)
|
||||||
|
|
||||||
|
cached, ok := srv.cachedTerminalOpenClawResult(params)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected cached terminal result")
|
||||||
|
}
|
||||||
|
if shared.StringArg(cached, "status", "") != "completed" {
|
||||||
|
t.Fatalf("cached status = %q, want completed", cached["status"])
|
||||||
|
}
|
||||||
|
|
||||||
|
// 即使 run 已过 deadline + gateway 丢失,也应优先返回缓存终态而非 interrupted。
|
||||||
|
sess := srv.findTaskSession(params)
|
||||||
|
sess.mu.Lock()
|
||||||
|
sess.task.DeadlineAt = time.Now().Add(-time.Hour)
|
||||||
|
sess.mu.Unlock()
|
||||||
|
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
|
||||||
|
if shared.StringArg(got, "status", "") != "completed" {
|
||||||
|
t.Fatalf("expected cached completed to win over deadline interrupt, got %v", got["status"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 同一 session 复用后,旧 run 的终态不得错配给新 runId 的查询。
|
||||||
|
func TestCachedTerminalNotServedForDifferentRunId(t *testing.T) {
|
||||||
|
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||||
|
srv.cacheOpenClawTaskGetResultIfTerminal(params, map[string]any{
|
||||||
|
"status": "completed", "success": true, "runId": "run-1",
|
||||||
|
})
|
||||||
|
// 新一轮查询带不同 runId -> 不应命中旧缓存。
|
||||||
|
newParams := map[string]any{"sessionId": "s1", "runId": "run-2"}
|
||||||
|
if _, ok := srv.cachedTerminalOpenClawResult(newParams); ok {
|
||||||
|
t.Fatalf("stale terminal for run-1 must not be served for run-2")
|
||||||
|
}
|
||||||
|
// 原 runId 仍应命中。
|
||||||
|
if _, ok := srv.cachedTerminalOpenClawResult(params); !ok {
|
||||||
|
t.Fatalf("terminal for run-1 should still be served for run-1")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// running 结果不应被当作终态缓存。
|
||||||
|
func TestRunningResultNotCachedAsTerminal(t *testing.T) {
|
||||||
|
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||||
|
srv.cacheOpenClawTaskGetResultIfTerminal(params, map[string]any{"status": "running", "runId": "run-1"})
|
||||||
|
if _, ok := srv.cachedTerminalOpenClawResult(params); ok {
|
||||||
|
t.Fatalf("running result must not be cached as terminal")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 无 per-session 记录时退回旧的 not_found 行为。
|
||||||
|
func TestGatewayUnconfirmedFallbackWithoutSessionReturnsNotFound(t *testing.T) {
|
||||||
|
srv := &Server{sessions: map[string]*session{}}
|
||||||
|
got := srv.openClawTaskGetGatewayUnconfirmedFallback(map[string]any{"sessionId": "missing"}, "X", "y")
|
||||||
|
if parseBool(got["ok"]) {
|
||||||
|
t.Fatalf("expected ok=false not_found, got %v", got)
|
||||||
|
}
|
||||||
|
if shared.StringArg(got, "status", "") != "not_found" {
|
||||||
|
t.Fatalf("status = %q, want not_found", got["status"])
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -417,6 +417,10 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask(
|
|||||||
sess.task.DeadlineAt = record.DeadlineAt
|
sess.task.DeadlineAt = record.DeadlineAt
|
||||||
sess.task.ProgressStage = "running"
|
sess.task.ProgressStage = "running"
|
||||||
sess.task.ProgressMessage = "OpenClaw task accepted"
|
sess.task.ProgressMessage = "OpenClaw task accepted"
|
||||||
|
// 新一轮 turn 复用同一 session 时,必须重置上一轮可能留下的终态标记,
|
||||||
|
// 否则持久 run 仓(T8)会把旧 runId 的终态错配给新 run。
|
||||||
|
sess.task.State = TaskStateRunning
|
||||||
|
sess.task.ProgressTerminal = false
|
||||||
sess.openClaw = record
|
sess.openClaw = record
|
||||||
running := openClawRunningTaskResult(record)
|
running := openClawRunningTaskResult(record)
|
||||||
sess.lastResult = cloneMap(running)
|
sess.lastResult = cloneMap(running)
|
||||||
@ -599,6 +603,9 @@ func (o *SessionOrchestrator) openClawArtifactPrepare(
|
|||||||
notify,
|
notify,
|
||||||
)
|
)
|
||||||
if !prepareResult.OK {
|
if !prepareResult.OK {
|
||||||
|
if isOpenClawUnknownMethodError(prepareResult.Error, "xworkmate.session.prepare") {
|
||||||
|
return openClawPreparedArtifactScopeFromPayload(openClawFallbackSessionPreparePayload(prepareParams)), nil
|
||||||
|
}
|
||||||
return nil, gatewayRPCError(prepareResult.Error, "openclaw artifact prepare failed")
|
return nil, gatewayRPCError(prepareResult.Error, "openclaw artifact prepare failed")
|
||||||
}
|
}
|
||||||
prepared := openClawPreparedArtifactScopeFromPayload(shared.AsMap(prepareResult.Payload))
|
prepared := openClawPreparedArtifactScopeFromPayload(shared.AsMap(prepareResult.Payload))
|
||||||
@ -608,6 +615,60 @@ func (o *SessionOrchestrator) openClawArtifactPrepare(
|
|||||||
return prepared, nil
|
return prepared, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isOpenClawUnknownMethodError(errorPayload map[string]any, method string) bool {
|
||||||
|
message := strings.ToLower(strings.TrimSpace(shared.StringArg(errorPayload, "message", "")))
|
||||||
|
if message == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// 消息形如「unknown method: <method>」已明确指向「网关不认识该方法」,足以判定,
|
||||||
|
// 据此走 graceful fallback(如 openClawFallbackSessionPreparePayload)。
|
||||||
|
//
|
||||||
|
// 注意:不能再用严格的 code 白名单来 gate。真实网关常以数字 JSON-RPC code
|
||||||
|
// (-32601 method not found / -32600 invalid request / -32002 等) 回传,
|
||||||
|
// 经 shared.StringArg(fmt.Sprint) 会被字符串化为 "-32601"/"-32002",
|
||||||
|
// 旧实现只接受 {"", INVALID_REQUEST, METHOD_NOT_FOUND},导致 fallback 失效、
|
||||||
|
// session.prepare 直接以 -32002 硬失败整轮任务。
|
||||||
|
return strings.Contains(message, "unknown method") &&
|
||||||
|
strings.Contains(message, strings.ToLower(strings.TrimSpace(method)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func openClawFallbackSessionPreparePayload(params map[string]any) map[string]any {
|
||||||
|
sessionKey := strings.TrimSpace(shared.StringArg(params, "openclawSessionKey", ""))
|
||||||
|
if sessionKey == "" {
|
||||||
|
sessionKey = strings.TrimSpace(shared.StringArg(params, "sessionKey", ""))
|
||||||
|
}
|
||||||
|
if sessionKey == "" {
|
||||||
|
sessionKey = "main"
|
||||||
|
}
|
||||||
|
runID := strings.TrimSpace(shared.StringArg(params, "runId", ""))
|
||||||
|
if runID == "" {
|
||||||
|
runID = strings.TrimSpace(shared.StringArg(params, "taskId", ""))
|
||||||
|
}
|
||||||
|
if runID == "" {
|
||||||
|
runID = strings.TrimSpace(shared.StringArg(params, "requestId", ""))
|
||||||
|
}
|
||||||
|
if runID == "" {
|
||||||
|
runID = "default"
|
||||||
|
}
|
||||||
|
relativeArtifactDirectory := filepath.Join("tasks", sessionKey, runID)
|
||||||
|
workspaceDir := openClawArtifactWorkspaceDir(params)
|
||||||
|
artifactDirectory := filepath.Join(workspaceDir, relativeArtifactDirectory)
|
||||||
|
return map[string]any{
|
||||||
|
"ok": true,
|
||||||
|
"fallback": true,
|
||||||
|
"compatibilityMode": "local-session-prepare",
|
||||||
|
"runId": runID,
|
||||||
|
"sessionKey": sessionKey,
|
||||||
|
"openclawSessionKey": sessionKey,
|
||||||
|
"remoteWorkingDirectory": workspaceDir,
|
||||||
|
"remoteWorkspaceRefKind": "path",
|
||||||
|
"artifactScope": relativeArtifactDirectory,
|
||||||
|
"artifactDirectory": artifactDirectory,
|
||||||
|
"relativeArtifactDirectory": relativeArtifactDirectory,
|
||||||
|
"scopeKind": "task",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func openClawSessionPrepareParams(params map[string]any, openClawSessionKey string, runID string, artifactContract openClawArtifactContract) map[string]any {
|
func openClawSessionPrepareParams(params map[string]any, openClawSessionKey string, runID string, artifactContract openClawArtifactContract) map[string]any {
|
||||||
appThreadKey := openClawAppThreadKey(params)
|
appThreadKey := openClawAppThreadKey(params)
|
||||||
result := map[string]any{
|
result := map[string]any{
|
||||||
@ -1626,11 +1687,16 @@ func applyOpenClawConstraintDeliveryStatus(result map[string]any) {
|
|||||||
|
|
||||||
func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError {
|
func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError {
|
||||||
if isOpenClawRetryableGatewayError(errorPayload) {
|
if isOpenClawRetryableGatewayError(errorPayload) {
|
||||||
|
metricGatewaySocketClosedInc() // T12
|
||||||
|
// T10:连接断属「可重试 / run 可能仍在后台、可续轮询」语义,而非 run 确实失败。
|
||||||
|
// 带 retryable/poll 提示,客户端据此降级为「后台续跑·重连中」(T5) 续轮询 tasks.get,而非硬失败。
|
||||||
return &shared.RPCError{
|
return &shared.RPCError{
|
||||||
Code: -32002,
|
Code: -32002,
|
||||||
Message: "OPENCLAW_GATEWAY_SOCKET_CLOSED: OpenClaw gateway connection closed during task execution",
|
Message: "OPENCLAW_GATEWAY_SOCKET_CLOSED: OpenClaw gateway connection closed during task execution",
|
||||||
Data: map[string]any{
|
Data: map[string]any{
|
||||||
"code": "OPENCLAW_GATEWAY_SOCKET_CLOSED",
|
"code": "OPENCLAW_GATEWAY_SOCKET_CLOSED",
|
||||||
|
"retryable": true,
|
||||||
|
"poll": true,
|
||||||
"originalCode": strings.TrimSpace(shared.StringArg(errorPayload, "code", "")),
|
"originalCode": strings.TrimSpace(shared.StringArg(errorPayload, "code", "")),
|
||||||
"originalError": strings.TrimSpace(shared.StringArg(errorPayload, "message", "")),
|
"originalError": strings.TrimSpace(shared.StringArg(errorPayload, "message", "")),
|
||||||
},
|
},
|
||||||
|
|||||||
@ -132,6 +132,49 @@ func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceKeepsActiveRecordRunning
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExpectedArtifactDirectoriesDoNotBlockTerminalTaskState(t *testing.T) {
|
||||||
|
params := map[string]any{"expectedArtifactDirs": []any{"reports/", "artifacts/"}}
|
||||||
|
payload := map[string]any{
|
||||||
|
"success": true,
|
||||||
|
"status": string(TaskStateCompleted),
|
||||||
|
"artifactScope": "tasks/session/run",
|
||||||
|
"artifactDirectory": "/remote/openclaw/workspace/tasks/session/run",
|
||||||
|
"expectedArtifactDirs": []any{
|
||||||
|
"reports/",
|
||||||
|
"artifacts/",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if openClawTaskGetRequiresArtifactExport(params, payload) {
|
||||||
|
t.Fatal("expectedArtifactDirs must remain non-blocking scan hints")
|
||||||
|
}
|
||||||
|
got := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
|
||||||
|
if status := shared.StringArg(got, "status", ""); status != string(TaskStateCompleted) {
|
||||||
|
t.Fatalf("expected terminal status to remain completed, got %#v", got)
|
||||||
|
}
|
||||||
|
if parseBool(got["pending"]) {
|
||||||
|
t.Fatalf("expected terminal payload not to become pending, got %#v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRequiredArtifactExtensionsStillBlockUntilVerified(t *testing.T) {
|
||||||
|
params := map[string]any{"requiredArtifactExtensions": []any{"md"}}
|
||||||
|
payload := map[string]any{
|
||||||
|
"success": true,
|
||||||
|
"status": string(TaskStateCompleted),
|
||||||
|
"artifactScope": "tasks/session/run",
|
||||||
|
"artifactDirectory": "/remote/openclaw/workspace/tasks/session/run",
|
||||||
|
}
|
||||||
|
|
||||||
|
if !openClawTaskGetRequiresArtifactExport(params, payload) {
|
||||||
|
t.Fatal("requiredArtifactExtensions must remain a blocking delivery contract")
|
||||||
|
}
|
||||||
|
got := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
|
||||||
|
if status := shared.StringArg(got, "status", ""); status != string(TaskStateRunning) {
|
||||||
|
t.Fatalf("expected missing required artifact to remain syncing, got %#v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceFailsAfterDeadlineWithoutRequiredArtifacts(t *testing.T) {
|
func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceFailsAfterDeadlineWithoutRequiredArtifacts(t *testing.T) {
|
||||||
payload := map[string]any{
|
payload := map[string]any{
|
||||||
"success": false,
|
"success": false,
|
||||||
@ -282,3 +325,50 @@ func TestTaskGetArtifactExportReceivesRequiredArtifactExtensions(t *testing.T) {
|
|||||||
t.Fatalf("expected expectedFileCountByExtension to reach export, got %#v", exportParams)
|
t.Fatalf("expected expectedFileCountByExtension to reach export, got %#v", exportParams)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIsOpenClawUnknownMethodErrorAcceptsNumericGatewayCodes(t *testing.T) {
|
||||||
|
const method = "xworkmate.session.prepare"
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
payload map[string]any
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "string invalid_request code",
|
||||||
|
payload: map[string]any{"code": "INVALID_REQUEST", "message": "unknown method: xworkmate.session.prepare"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "numeric -32002 (real gateway shape that previously hard-failed)",
|
||||||
|
payload: map[string]any{"code": float64(-32002), "message": "unknown method: xworkmate.session.prepare"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "numeric -32601 method not found",
|
||||||
|
payload: map[string]any{"code": float64(-32601), "message": "Unknown method: xworkmate.session.prepare"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty code",
|
||||||
|
payload: map[string]any{"message": "unknown method: xworkmate.session.prepare"},
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unrelated error must not be swallowed",
|
||||||
|
payload: map[string]any{"code": float64(-32002), "message": "gateway socket closed"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unknown method for a different method name",
|
||||||
|
payload: map[string]any{"code": float64(-32601), "message": "unknown method: chat.send"},
|
||||||
|
want: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
if got := isOpenClawUnknownMethodError(tc.payload, method); got != tc.want {
|
||||||
|
t.Fatalf("isOpenClawUnknownMethodError(%v) = %v, want %v", tc.payload, got, tc.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -833,7 +833,7 @@ func TestGatewayRequestSkillsStatusAutoConnectsOpenClaw(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExecuteSessionTaskGatewayFailsWhenPrepareUnsupported(t *testing.T) {
|
func TestExecuteSessionTaskGatewayFallsBackWhenPrepareUnsupported(t *testing.T) {
|
||||||
gateway := newAcpFakeOpenClawGateway(t)
|
gateway := newAcpFakeOpenClawGateway(t)
|
||||||
gateway.unsupportedSessionPrepare.Store(true)
|
gateway.unsupportedSessionPrepare.Store(true)
|
||||||
defer gateway.Close()
|
defer gateway.Close()
|
||||||
@ -858,15 +858,48 @@ func TestExecuteSessionTaskGatewayFailsWhenPrepareUnsupported(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if rpcErr == nil {
|
if rpcErr != nil {
|
||||||
t.Fatalf("expected prepare error without legacy fallback, got response: %#v", response)
|
t.Fatalf("expected prepare compatibility fallback, got error: %#v", rpcErr)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "unknown method: xworkmate.session.prepare") {
|
if response["success"] != true {
|
||||||
t.Fatalf("expected surfaced prepare unsupported error, got %#v", rpcErr)
|
t.Fatalf("expected successful gateway task with prepare fallback, got %#v", response)
|
||||||
|
}
|
||||||
|
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare", "chat.send", "xworkmate.tasks.get"}) {
|
||||||
|
t.Fatalf("expected bridge to continue to chat.send when prepare is unsupported, got %#v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandleSessionPrepareFallsBackWhenGatewayMethodUnsupported(t *testing.T) {
|
||||||
|
gateway := newAcpFakeOpenClawGateway(t)
|
||||||
|
gateway.unsupportedSessionPrepare.Store(true)
|
||||||
|
defer gateway.Close()
|
||||||
|
|
||||||
|
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
|
||||||
|
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
|
||||||
|
|
||||||
|
server := NewServer()
|
||||||
|
response, rpcErr := server.handleRequest(
|
||||||
|
shared.RPCRequest{
|
||||||
|
Method: "xworkmate.session.prepare",
|
||||||
|
Params: map[string]any{
|
||||||
|
"openclawSessionKey": "thread-prepare",
|
||||||
|
"runId": "run-prepare",
|
||||||
|
"workspaceDir": "/remote/openclaw/workspace",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
func(map[string]any) {},
|
||||||
|
)
|
||||||
|
if rpcErr != nil {
|
||||||
|
t.Fatalf("expected fallback prepare response, got error: %#v", rpcErr)
|
||||||
|
}
|
||||||
|
if response["fallback"] != true {
|
||||||
|
t.Fatalf("expected fallback marker, got %#v", response)
|
||||||
|
}
|
||||||
|
if response["artifactScope"] != "tasks/thread-prepare/run-prepare" {
|
||||||
|
t.Fatalf("expected fallback task artifact scope, got %#v", response)
|
||||||
}
|
}
|
||||||
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare"}) {
|
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.session.prepare"}) {
|
||||||
t.Fatalf("expected bridge to stop before chat.send when prepare is unsupported, got %#v", got)
|
t.Fatalf("expected bridge to try gateway prepare before fallback, got %#v", got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -76,6 +76,9 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
|
|||||||
case "xworkmate.tasks.get":
|
case "xworkmate.tasks.get":
|
||||||
return s.handleTaskGet(ctx, request.Params, notify), nil
|
return s.handleTaskGet(ctx, request.Params, notify), nil
|
||||||
|
|
||||||
|
case "xworkmate.session.prepare":
|
||||||
|
return s.handleSessionPrepare(ctx, request.Params, notify)
|
||||||
|
|
||||||
case "xworkmate.tasks.cancel":
|
case "xworkmate.tasks.cancel":
|
||||||
return s.handleTaskCancel(ctx, request.Params, notify), nil
|
return s.handleTaskCancel(ctx, request.Params, notify), nil
|
||||||
|
|
||||||
@ -90,6 +93,36 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleSessionPrepare(ctx context.Context, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) {
|
||||||
|
gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", ""))
|
||||||
|
if gatewayProvider == "" {
|
||||||
|
gatewayProvider = strings.TrimSpace(shared.StringArg(params, "resolvedGatewayProviderId", ""))
|
||||||
|
}
|
||||||
|
if gatewayProvider == "" {
|
||||||
|
gatewayProvider = "openclaw"
|
||||||
|
}
|
||||||
|
if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil {
|
||||||
|
return openClawFallbackSessionPreparePayload(params), nil
|
||||||
|
}
|
||||||
|
result := s.gateway.RequestByMode(
|
||||||
|
gatewayProvider,
|
||||||
|
"xworkmate.session.prepare",
|
||||||
|
params,
|
||||||
|
30*time.Second,
|
||||||
|
notify,
|
||||||
|
)
|
||||||
|
if result.OK {
|
||||||
|
payload := shared.AsMap(result.Payload)
|
||||||
|
if openClawPreparedArtifactScopeFromPayload(payload) != nil {
|
||||||
|
return payload, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !result.OK && !isOpenClawUnknownMethodError(result.Error, "xworkmate.session.prepare") {
|
||||||
|
return nil, gatewayRPCError(result.Error, "openclaw artifact prepare failed")
|
||||||
|
}
|
||||||
|
return openClawFallbackSessionPreparePayload(params), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {
|
func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {
|
||||||
params = s.taskGetParamsWithSessionScope(params)
|
params = s.taskGetParamsWithSessionScope(params)
|
||||||
gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", ""))
|
gatewayProvider := strings.TrimSpace(shared.StringArg(params, "gatewayProviderId", ""))
|
||||||
@ -99,13 +132,13 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
|
|||||||
if gatewayProvider == "" {
|
if gatewayProvider == "" {
|
||||||
gatewayProvider = "openclaw"
|
gatewayProvider = "openclaw"
|
||||||
}
|
}
|
||||||
|
// T7/T8: 一旦观察到终态就从持久 run 仓返回,避免之后 gateway 查不到导致结果丢失。
|
||||||
|
if cached, ok := s.cachedTerminalOpenClawResult(params); ok {
|
||||||
|
return cached
|
||||||
|
}
|
||||||
if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil {
|
if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil {
|
||||||
return map[string]any{
|
// T7/T9: gateway 不可达时按持久 run 仓兜底(续轮询 / deadline 终态),而非裸 not_found。
|
||||||
"ok": false,
|
return s.openClawTaskGetGatewayUnconfirmedFallback(params, "GATEWAY_UNAVAILABLE", rpcErr.Message)
|
||||||
"status": "not_found",
|
|
||||||
"code": "GATEWAY_UNAVAILABLE",
|
|
||||||
"message": rpcErr.Message,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
result := s.gateway.RequestByMode(
|
result := s.gateway.RequestByMode(
|
||||||
gatewayProvider,
|
gatewayProvider,
|
||||||
@ -129,16 +162,15 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
|
|||||||
}
|
}
|
||||||
s.decorateOpenClawArtifactDownloadURLs(payload, sessionKey, runID)
|
s.decorateOpenClawArtifactDownloadURLs(payload, sessionKey, runID)
|
||||||
stripOpenClawArtifactInlineContent(payload)
|
stripOpenClawArtifactInlineContent(payload)
|
||||||
|
// T8: 缓存「最终客户端可见形态」(已 decorate 下载 URL + strip 内联内容),
|
||||||
|
// 这样从缓存回放时与正常路径完全一致。
|
||||||
|
s.cacheOpenClawTaskGetResultIfTerminal(params, payload)
|
||||||
return payload
|
return payload
|
||||||
}
|
}
|
||||||
|
// T7/T9: gateway 返回错误(socket closed / not_found / lookup failed)时同样走持久 run 仓兜底。
|
||||||
message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed"))
|
message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed"))
|
||||||
code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED"))
|
code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED"))
|
||||||
return map[string]any{
|
return s.openClawTaskGetGatewayUnconfirmedFallback(params, code, message)
|
||||||
"ok": false,
|
|
||||||
"status": "not_found",
|
|
||||||
"code": code,
|
|
||||||
"message": message,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) taskGetParamsWithSessionScope(params map[string]any) map[string]any {
|
func (s *Server) taskGetParamsWithSessionScope(params map[string]any) map[string]any {
|
||||||
@ -439,9 +471,11 @@ func openClawTaskGetRequiresArtifactExport(params map[string]any, payload map[st
|
|||||||
if parseBool(params["requiresExportBeforeFinalResponse"]) || parseBool(payload["requiresExportBeforeFinalResponse"]) {
|
if parseBool(params["requiresExportBeforeFinalResponse"]) || parseBool(payload["requiresExportBeforeFinalResponse"]) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return len(shared.ListArg(params, "expectedArtifactDirs")) > 0 ||
|
// expectedArtifactDirs are discovery hints for the plugin's workspace-root
|
||||||
len(shared.ListArg(payload, "expectedArtifactDirs")) > 0 ||
|
// scan. They do not prove that the caller requires a file before the run can
|
||||||
len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
|
// reach a terminal state. Treating them as a blocking contract turns a
|
||||||
|
// failed/no-output agent run into an endless "syncing-artifacts" loop.
|
||||||
|
return len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
|
||||||
len(shared.ListArg(payload, "requiredArtifactExtensions")) > 0
|
len(shared.ListArg(payload, "requiredArtifactExtensions")) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -360,7 +360,11 @@ func sameConnectTarget(current ConnectRequest, next ConnectRequest) bool {
|
|||||||
strings.TrimSpace(current.Endpoint.Host) == strings.TrimSpace(next.Endpoint.Host) &&
|
strings.TrimSpace(current.Endpoint.Host) == strings.TrimSpace(next.Endpoint.Host) &&
|
||||||
current.Endpoint.Port == next.Endpoint.Port &&
|
current.Endpoint.Port == next.Endpoint.Port &&
|
||||||
current.Endpoint.TLS == next.Endpoint.TLS &&
|
current.Endpoint.TLS == next.Endpoint.TLS &&
|
||||||
normalizeEndpointPath(current.Endpoint.Path) == normalizeEndpointPath(next.Endpoint.Path)
|
normalizeEndpointPath(current.Endpoint.Path) == normalizeEndpointPath(next.Endpoint.Path) &&
|
||||||
|
strings.TrimSpace(current.Identity.DeviceID) == strings.TrimSpace(next.Identity.DeviceID) &&
|
||||||
|
strings.TrimSpace(current.Auth.Token) == strings.TrimSpace(next.Auth.Token) &&
|
||||||
|
strings.TrimSpace(current.Auth.DeviceToken) == strings.TrimSpace(next.Auth.DeviceToken) &&
|
||||||
|
strings.TrimSpace(current.Auth.Password) == strings.TrimSpace(next.Auth.Password)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) connectAttempt() (ConnectResult, *GatewayError) {
|
func (s *session) connectAttempt() (ConnectResult, *GatewayError) {
|
||||||
@ -412,11 +416,6 @@ func (s *session) connectAttempt() (ConnectResult, *GatewayError) {
|
|||||||
snapshotPayload := asMap(payload["snapshot"])
|
snapshotPayload := asMap(payload["snapshot"])
|
||||||
sessionDefaults := asMap(snapshotPayload["sessionDefaults"])
|
sessionDefaults := asMap(snapshotPayload["sessionDefaults"])
|
||||||
returnedDeviceToken := strings.TrimSpace(stringValue(auth["deviceToken"]))
|
returnedDeviceToken := strings.TrimSpace(stringValue(auth["deviceToken"]))
|
||||||
if returnedDeviceToken != "" {
|
|
||||||
s.mu.Lock()
|
|
||||||
s.config.Auth.DeviceToken = returnedDeviceToken
|
|
||||||
s.mu.Unlock()
|
|
||||||
}
|
|
||||||
negotiatedScopes := stringSlice(auth["scopes"])
|
negotiatedScopes := stringSlice(auth["scopes"])
|
||||||
negotiatedRole := strings.TrimSpace(stringValue(auth["role"]))
|
negotiatedRole := strings.TrimSpace(stringValue(auth["role"]))
|
||||||
if negotiatedRole == "" {
|
if negotiatedRole == "" {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user