Compare commits
12 Commits
runtime-9a
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 4fc5e380f2 | |||
|
|
188ca4ba4a | ||
|
|
0a50621664 | ||
|
|
81f65e3308 | ||
|
|
fa9cc78add | ||
|
|
02808934c8 | ||
|
|
3c7de420d2 | ||
|
|
2333c3e5fd | ||
|
|
e22d0f1cbf | ||
| 311db31e03 | |||
|
|
c7b2f8ee3a | ||
|
|
b8f4ed6102 |
14
.github/workflows/pipeline.yml
vendored
14
.github/workflows/pipeline.yml
vendored
@ -56,6 +56,7 @@ jobs:
|
||||
- name: Load Vault secrets
|
||||
id: vault
|
||||
if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository }}
|
||||
continue-on-error: true
|
||||
uses: hashicorp/vault-action@v2
|
||||
with:
|
||||
url: ${{ env.VAULT_ADDR }}
|
||||
@ -67,7 +68,7 @@ jobs:
|
||||
kv/data/github-actions/xworkmate-bridge AI_WORKSPACE_AUTH_TOKEN | AI_WORKSPACE_AUTH_TOKEN
|
||||
|
||||
- name: Export bridge auth token
|
||||
if: ${{ steps.vault.outcome == 'success' }}
|
||||
if: ${{ steps.vault.outcome == 'success' && steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN != '' }}
|
||||
run: echo "AI_WORKSPACE_AUTH_TOKEN=${{ steps.vault.outputs.AI_WORKSPACE_AUTH_TOKEN }}" >> "$GITHUB_ENV"
|
||||
|
||||
- name: Probe current production bridge
|
||||
@ -75,6 +76,17 @@ jobs:
|
||||
env:
|
||||
BRIDGE_SERVER_URL: https://xworkmate-bridge.svc.plus
|
||||
run: |
|
||||
if [[ -z "${AI_WORKSPACE_AUTH_TOKEN:-}" ]]; then
|
||||
echo "::notice title=Production state skipped::AI_WORKSPACE_AUTH_TOKEN is unavailable from Vault; continuing without production bridge metadata."
|
||||
{
|
||||
echo "production_image="
|
||||
echo "production_tag="
|
||||
echo "production_commit="
|
||||
echo "production_version="
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
while IFS='=' read -r key value; do
|
||||
echo "${key}=${value}" >> "$GITHUB_OUTPUT"
|
||||
done < <(bash ./scripts/github-actions/report-production-state.sh "${BRIDGE_SERVER_URL}")
|
||||
|
||||
8
.github/workflows/runtime-release.yml
vendored
8
.github/workflows/runtime-release.yml
vendored
@ -62,7 +62,13 @@ jobs:
|
||||
-C dist/runtime xworkmate-bridge
|
||||
(
|
||||
cd dist/assets
|
||||
sha256sum -- ./*.tar.gz | sed 's# \./# #' > "SHA256SUMS-${TARGET_ARCH}"
|
||||
# Name the per-job checksum file by OS *and* ARCH. Keying on ARCH
|
||||
# alone makes the linux/darwin jobs of the same arch both emit
|
||||
# SHA256SUMS-<arch>, which then clobber each other under the
|
||||
# publish job's merge-multiple download — leaving SHA256SUMS with
|
||||
# only 2 of the 4 platforms and breaking arm64 (and darwin-amd64)
|
||||
# consumers with "missing checksum".
|
||||
sha256sum -- ./*.tar.gz | sed 's# \./# #' > "SHA256SUMS-${TARGET_OS}-${TARGET_ARCH}"
|
||||
)
|
||||
|
||||
- uses: actions/upload-artifact@v4
|
||||
|
||||
44
.github/workflows/validate-release-pr.yml
vendored
Normal file
44
.github/workflows/validate-release-pr.yml
vendored
Normal file
@ -0,0 +1,44 @@
|
||||
name: Validate Release PR
|
||||
|
||||
# release/* 分支的发布策略门禁:仅接受 hotfix/* 或带 cherry-pick/backport 标签的 PR。
|
||||
# 详见 iac_modules/docs/tldr-github-branch-model.md
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [opened, synchronize, reopened, labeled, unlabeled]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: read
|
||||
|
||||
jobs:
|
||||
validate-release-source:
|
||||
runs-on: ubuntu-latest
|
||||
if: startsWith(github.base_ref, 'release/')
|
||||
steps:
|
||||
- name: Check PR source branch
|
||||
run: |
|
||||
SRC="${{ github.head_ref }}"
|
||||
TGT="${{ github.base_ref }}"
|
||||
LABELS="${{ join(github.event.pull_request.labels.*.name, ',') }}"
|
||||
|
||||
echo "🔍 Validating PR into release branch"
|
||||
echo " source: $SRC"
|
||||
echo " target: $TGT"
|
||||
echo " labels: $LABELS"
|
||||
|
||||
if [[ "$SRC" =~ ^hotfix/ ]]; then
|
||||
echo "✅ Allowed: hotfix/* branch"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [[ "$LABELS" =~ (^|,)(cherry-pick|backport)(,|$) ]]; then
|
||||
echo "✅ Allowed: cherry-pick/backport labeled PR"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo "❌ Rejected."
|
||||
echo "release/* 仅接受:"
|
||||
echo " - 来自 hotfix/* 的 PR"
|
||||
echo " - 带 cherry-pick 或 backport 标签的 PR(已验证 feature 的 backport/cherry-pick)"
|
||||
echo "禁止从 main / develop / feature/* 直接合并到 release/*。"
|
||||
exit 1
|
||||
@ -37,6 +37,7 @@ func (s *Server) Handler() http.Handler {
|
||||
"commit": info.Commit,
|
||||
"version": info.Version,
|
||||
"buildDate": info.BuildDate,
|
||||
"metrics": bridgeStabilityMetricsSnapshot(), // T12
|
||||
}
|
||||
body, _ := json.Marshal(resp)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
29
internal/acp/metrics.go
Normal file
29
internal/acp/metrics.go
Normal file
@ -0,0 +1,29 @@
|
||||
package acp
|
||||
|
||||
import "sync/atomic"
|
||||
|
||||
// 关键稳定性指标(T12,docs/cases/06 §5)。
|
||||
//
|
||||
// 进程内累计计数,经 /api/ping 暴露,用于把「网关抖动 / run 超时」从靠用户截图
|
||||
// 变为可监控。三个计数对应三类已知的不稳定来源:
|
||||
// - gatewaySocketClosed : gatewayRPCError 命中 OPENCLAW_GATEWAY_SOCKET_CLOSED(连接断)
|
||||
// - taskGetUnconfirmedFallback: tasks.get 走持久 run 仓兜底(gateway 无法确认 run,T7)
|
||||
// - runDeadlineInterrupt : run 超过 DeadlineAt 且 gateway 无法确认,回 interrupted(T9)
|
||||
var bridgeStabilityMetrics struct {
|
||||
gatewaySocketClosed atomic.Int64
|
||||
taskGetUnconfirmedFallback atomic.Int64
|
||||
runDeadlineInterrupt atomic.Int64
|
||||
}
|
||||
|
||||
func metricGatewaySocketClosedInc() { bridgeStabilityMetrics.gatewaySocketClosed.Add(1) }
|
||||
func metricTaskGetUnconfirmedFallbackInc() { bridgeStabilityMetrics.taskGetUnconfirmedFallback.Add(1) }
|
||||
func metricRunDeadlineInterruptInc() { bridgeStabilityMetrics.runDeadlineInterrupt.Add(1) }
|
||||
|
||||
// bridgeStabilityMetricsSnapshot 返回当前计数快照,供 /api/ping 输出。
|
||||
func bridgeStabilityMetricsSnapshot() map[string]any {
|
||||
return map[string]any{
|
||||
"gatewaySocketClosed": bridgeStabilityMetrics.gatewaySocketClosed.Load(),
|
||||
"taskGetUnconfirmedFallback": bridgeStabilityMetrics.taskGetUnconfirmedFallback.Load(),
|
||||
"runDeadlineInterrupt": bridgeStabilityMetrics.runDeadlineInterrupt.Load(),
|
||||
}
|
||||
}
|
||||
197
internal/acp/openclaw_run_registry.go
Normal file
197
internal/acp/openclaw_run_registry.go
Normal file
@ -0,0 +1,197 @@
|
||||
package acp
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"xworkmate-bridge/internal/shared"
|
||||
)
|
||||
|
||||
// 持久 run 仓 / run 关联与 WS 解耦(T7/T8/T9)。
|
||||
//
|
||||
// 背景:OpenClaw gateway turn 采用异步模型——chat.send 快速返回 runId,bridge 把
|
||||
// run 记录(sess.openClaw)、预算(sess.task.DeadlineAt)、运行句柄(sess.lastResult)
|
||||
// 落在「按 sessionID 维度」的 per-session store 里(s.sessions),其生命周期独立于
|
||||
// bridge↔gateway 的 WebSocket 连接。客户端随后轮询 tasks.get。
|
||||
//
|
||||
// 此前 tasks.get 每次都强依赖 gateway 应答:一旦 WS 抖动 / 重连后 run 内存态丢失,
|
||||
// tasks.get 回 not_found 或 socket_closed,已完成的结果就此丢失,客户端要么硬失败、
|
||||
// 要么(修复前)无限轮询。下列辅助把 tasks.get 改造为「优先用持久 run 仓兜底」:
|
||||
//
|
||||
// T8 已观察到的终态结果缓存进 sess.lastResult,gateway 之后查不到也不丢;
|
||||
// T7 gateway 暂时无法确认(unavailable / socket closed / not_found)但 run 仍在预算内时,
|
||||
// 合成一个 running 句柄让客户端继续轮询,跨越瞬时抖动(与 WS 生命周期解耦);
|
||||
// T9 run 超过 DeadlineAt 且 gateway 仍无法确认时,回确定性的 interrupted 终态。
|
||||
|
||||
// openClawTaskGetResultIsTerminal 判断一个 tasks.get 结果是否表示 run 已结束。
|
||||
// 注意:仍在 artifact 同步中的结果会被 normalizeOpenClawTaskGetResult 重写为 status=running,
|
||||
// 因此这里只认显式终态,不会把「同步中」误判为终态。
|
||||
func openClawTaskGetResultIsTerminal(payload map[string]any) bool {
|
||||
switch strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) {
|
||||
case string(TaskStateCompleted), string(TaskStateFailed), string(TaskStateCancelled),
|
||||
"interrupted", "partially_delivered":
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// cacheOpenClawTaskGetResultIfTerminal 把一次 gateway 确认的终态结果落进 per-session 持久 run 仓(T8)。
|
||||
func (s *Server) cacheOpenClawTaskGetResultIfTerminal(params map[string]any, payload map[string]any) {
|
||||
if len(payload) == 0 || !openClawTaskGetResultIsTerminal(payload) {
|
||||
return
|
||||
}
|
||||
sess := s.findTaskSession(params)
|
||||
if sess == nil {
|
||||
return
|
||||
}
|
||||
sess.mu.Lock()
|
||||
defer sess.mu.Unlock()
|
||||
switch strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", ""))) {
|
||||
case string(TaskStateFailed):
|
||||
sess.task.State = TaskStateFailed
|
||||
case string(TaskStateCancelled):
|
||||
sess.task.State = TaskStateCancelled
|
||||
default:
|
||||
sess.task.State = TaskStateCompleted
|
||||
}
|
||||
sess.task.ProgressTerminal = true
|
||||
sess.task.ProgressStage = strings.ToLower(strings.TrimSpace(shared.StringArg(payload, "status", "")))
|
||||
sess.task.UpdatedAt = time.Now()
|
||||
sess.lastResult = cloneMap(payload)
|
||||
}
|
||||
|
||||
// cachedTerminalOpenClawResult 返回某 run 此前已观察到的终态结果(若有)(T7/T8)。
|
||||
func (s *Server) cachedTerminalOpenClawResult(params map[string]any) (map[string]any, bool) {
|
||||
sess := s.findTaskSession(params)
|
||||
if sess == nil {
|
||||
return nil, false
|
||||
}
|
||||
sess.mu.Lock()
|
||||
defer sess.mu.Unlock()
|
||||
return cachedTerminalForRunLocked(sess, params)
|
||||
}
|
||||
|
||||
// cachedTerminalForRunLocked 仅当缓存终态确实属于「本次请求的 runId」时才命中,
|
||||
// 防止同一 session 复用后把旧 run 的终态错配给新 run。调用方须持有 sess.mu。
|
||||
func cachedTerminalForRunLocked(sess *session, params map[string]any) (map[string]any, bool) {
|
||||
if !sess.task.ProgressTerminal || len(sess.lastResult) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
if !openClawTaskGetResultIsTerminal(sess.lastResult) {
|
||||
return nil, false
|
||||
}
|
||||
requestedRun := strings.TrimSpace(shared.StringArg(params, "runId", ""))
|
||||
if requestedRun == "" {
|
||||
requestedRun = strings.TrimSpace(shared.StringArg(params, "taskId", ""))
|
||||
}
|
||||
if requestedRun != "" {
|
||||
cachedRun := firstNonEmptyString(sess.lastResult, "runId", "taskId")
|
||||
if cachedRun != "" && !strings.EqualFold(cachedRun, requestedRun) {
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
return cloneMap(sess.lastResult), true
|
||||
}
|
||||
|
||||
// openClawTaskGetGatewayUnconfirmedFallback 在 gateway 无法确认 run 时,用持久 run 仓兜底(T7/T9):
|
||||
// - 已有缓存终态 -> 直接返回;
|
||||
// - run 仍在预算内 -> 合成 running 句柄,客户端继续轮询,跨越瞬时抖动;
|
||||
// - run 超过 deadline -> 回确定性 interrupted 终态。
|
||||
//
|
||||
// 没有任何 per-session 记录时退回旧行为(not_found),不改变无状态查询的语义。
|
||||
func (s *Server) openClawTaskGetGatewayUnconfirmedFallback(params map[string]any, code string, message string) map[string]any {
|
||||
notFound := func() map[string]any {
|
||||
return map[string]any{
|
||||
"ok": false,
|
||||
"status": "not_found",
|
||||
"code": fallbackString(code, "TASK_LOOKUP_FAILED"),
|
||||
"message": fallbackString(message, "openclaw native task lookup failed"),
|
||||
}
|
||||
}
|
||||
sess := s.findTaskSession(params)
|
||||
if sess == nil {
|
||||
return notFound()
|
||||
}
|
||||
sess.mu.Lock()
|
||||
defer sess.mu.Unlock()
|
||||
if cached, ok := cachedTerminalForRunLocked(sess, params); ok {
|
||||
return cached
|
||||
}
|
||||
if sess.openClaw == nil {
|
||||
return notFound()
|
||||
}
|
||||
now := time.Now()
|
||||
if !sess.task.DeadlineAt.IsZero() && now.After(sess.task.DeadlineAt) {
|
||||
return s.markOpenClawRunDeadlineInterruptedLocked(sess, code, message)
|
||||
}
|
||||
// 仍在预算内:合成 running 句柄让客户端继续轮询,不因一次瞬时抖动硬失败。
|
||||
metricTaskGetUnconfirmedFallbackInc() // T12
|
||||
running := openClawRunningTaskResult(sess.openClaw)
|
||||
running["transportDegraded"] = true
|
||||
if strings.TrimSpace(code) != "" {
|
||||
running["transportDegradedCode"] = strings.TrimSpace(code)
|
||||
}
|
||||
// T11:带 runId 的日志,便于与 App / 插件 / 网关四层按 runId 串联。
|
||||
log.Printf("level=warn component=openclaw_run_registry event=tasks_get_unconfirmed_fallback runId=%q openclawSessionKey=%q code=%q",
|
||||
sess.openClaw.RunID, sess.openClaw.SessionKey, strings.TrimSpace(code))
|
||||
sess.lastResult = cloneMap(running)
|
||||
return running
|
||||
}
|
||||
|
||||
// markOpenClawRunDeadlineInterruptedLocked 为「超过预算且 gateway 无法确认」的 run 生成确定性
|
||||
// interrupted 终态(T9)。调用方须持有 sess.mu。
|
||||
func (s *Server) markOpenClawRunDeadlineInterruptedLocked(sess *session, code string, message string) map[string]any {
|
||||
now := time.Now()
|
||||
sess.task.State = TaskStateFailed
|
||||
sess.task.ProgressTerminal = true
|
||||
sess.task.ProgressStage = "interrupted"
|
||||
sess.task.ProgressMessage = "OpenClaw run exceeded its budget and could not be confirmed"
|
||||
sess.task.UpdatedAt = now
|
||||
metricRunDeadlineInterruptInc() // T12
|
||||
// T11:带 runId 的终态日志。
|
||||
if sess.openClaw != nil {
|
||||
log.Printf("level=warn component=openclaw_run_registry event=run_deadline_interrupt runId=%q openclawSessionKey=%q deadlineAt=%q code=%q",
|
||||
sess.openClaw.RunID, sess.openClaw.SessionKey,
|
||||
sess.openClaw.DeadlineAt.UTC().Format(time.RFC3339Nano), strings.TrimSpace(code))
|
||||
}
|
||||
|
||||
result := map[string]any{
|
||||
"ok": true,
|
||||
"success": false,
|
||||
"status": "interrupted",
|
||||
"event": "interrupted",
|
||||
"pending": false,
|
||||
"code": "OPENCLAW_RUN_DEADLINE_EXCEEDED",
|
||||
"artifactSyncStatus": "interrupted",
|
||||
"message": "OpenClaw 任务超过预算上限且网关无法确认结果,已结束本轮等待。任务可能已在后台完成,请重新发送请求以拿回结果。",
|
||||
"artifacts": []any{},
|
||||
}
|
||||
if strings.TrimSpace(code) != "" {
|
||||
result["gatewayUnconfirmedCode"] = strings.TrimSpace(code)
|
||||
}
|
||||
if strings.TrimSpace(message) != "" {
|
||||
result["gatewayUnconfirmedMessage"] = strings.TrimSpace(message)
|
||||
}
|
||||
if record := sess.openClaw; record != nil {
|
||||
result["runId"] = record.RunID
|
||||
result["taskId"] = record.RunID
|
||||
result["turnId"] = record.TurnID
|
||||
result["sessionId"] = record.SessionID
|
||||
result["threadId"] = record.ThreadID
|
||||
result["appThreadKey"] = record.ThreadID
|
||||
result["openclawSessionKey"] = record.SessionKey
|
||||
result["resolvedGatewayProviderId"] = record.GatewayProviderID
|
||||
result["startedAt"] = record.StartedAt.UTC().Format(time.RFC3339Nano)
|
||||
result["deadlineAt"] = record.DeadlineAt.UTC().Format(time.RFC3339Nano)
|
||||
}
|
||||
sess.lastResult = cloneMap(result)
|
||||
return result
|
||||
}
|
||||
|
||||
func fallbackString(value string, fallback string) string {
|
||||
if strings.TrimSpace(value) == "" {
|
||||
return fallback
|
||||
}
|
||||
return strings.TrimSpace(value)
|
||||
}
|
||||
154
internal/acp/openclaw_run_registry_test.go
Normal file
154
internal/acp/openclaw_run_registry_test.go
Normal file
@ -0,0 +1,154 @@
|
||||
package acp
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"xworkmate-bridge/internal/shared"
|
||||
)
|
||||
|
||||
func newRunRegistryTestServer(deadline time.Time) (*Server, map[string]any) {
|
||||
sess := &session{sessionID: "s1", threadID: "t1"}
|
||||
sess.task.RunID = "run-1"
|
||||
sess.task.SessionKey = "sk"
|
||||
sess.task.GatewayProviderID = "openclaw"
|
||||
sess.task.DeadlineAt = deadline
|
||||
sess.openClaw = &OpenClawTaskRecord{
|
||||
SessionID: "s1",
|
||||
ThreadID: "t1",
|
||||
TurnID: "turn-1",
|
||||
RunID: "run-1",
|
||||
SessionKey: "sk",
|
||||
GatewayProviderID: "openclaw",
|
||||
StartedAt: time.Now().Add(-time.Minute),
|
||||
DeadlineAt: deadline,
|
||||
}
|
||||
srv := &Server{sessions: map[string]*session{"s1": sess}}
|
||||
params := map[string]any{"sessionId": "s1", "runId": "run-1"}
|
||||
return srv, params
|
||||
}
|
||||
|
||||
func TestOpenClawTaskGetResultIsTerminal(t *testing.T) {
|
||||
cases := []struct {
|
||||
status string
|
||||
want bool
|
||||
}{
|
||||
{"completed", true},
|
||||
{"failed", true},
|
||||
{"cancelled", true},
|
||||
{"interrupted", true},
|
||||
{"partially_delivered", true},
|
||||
{"running", false},
|
||||
{"syncing-artifacts", false},
|
||||
{"queued", false},
|
||||
{"", false},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := openClawTaskGetResultIsTerminal(map[string]any{"status": tc.status}); got != tc.want {
|
||||
t.Errorf("status=%q: got %v, want %v", tc.status, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// T7: gateway 无法确认但 run 仍在预算内 -> 合成 running 句柄续轮询。
|
||||
func TestGatewayUnconfirmedFallbackWithinBudgetKeepsPolling(t *testing.T) {
|
||||
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
|
||||
if status := shared.StringArg(got, "status", ""); status != string(TaskStateRunning) {
|
||||
t.Fatalf("status = %q, want running", status)
|
||||
}
|
||||
if !parseBool(got["transportDegraded"]) {
|
||||
t.Fatalf("transportDegraded not set: %v", got)
|
||||
}
|
||||
if shared.StringArg(got, "runId", "") != "run-1" {
|
||||
t.Fatalf("runId mismatch: %v", got["runId"])
|
||||
}
|
||||
}
|
||||
|
||||
// T9: run 超过 deadline 且 gateway 无法确认 -> 确定性 interrupted 终态。
|
||||
func TestGatewayUnconfirmedFallbackPastDeadlineInterrupts(t *testing.T) {
|
||||
srv, params := newRunRegistryTestServer(time.Now().Add(-time.Minute))
|
||||
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
|
||||
if status := shared.StringArg(got, "status", ""); status != "interrupted" {
|
||||
t.Fatalf("status = %q, want interrupted", status)
|
||||
}
|
||||
if code := shared.StringArg(got, "code", ""); code != "OPENCLAW_RUN_DEADLINE_EXCEEDED" {
|
||||
t.Fatalf("code = %q, want OPENCLAW_RUN_DEADLINE_EXCEEDED", code)
|
||||
}
|
||||
if parseBool(got["success"]) {
|
||||
t.Fatalf("interrupted result must not be success")
|
||||
}
|
||||
sess := srv.findTaskSession(params)
|
||||
if sess == nil || !sess.task.ProgressTerminal || sess.task.State != TaskStateFailed {
|
||||
t.Fatalf("session terminal state not recorded: %+v", sess)
|
||||
}
|
||||
}
|
||||
|
||||
// T8: 已观察到的终态被缓存,且即使之后 gateway 不可达也优先返回缓存终态。
|
||||
func TestTerminalResultCachedAndServedAfterGatewayLoss(t *testing.T) {
|
||||
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||
terminal := map[string]any{
|
||||
"ok": true,
|
||||
"success": true,
|
||||
"status": "completed",
|
||||
"runId": "run-1",
|
||||
"message": "done",
|
||||
}
|
||||
srv.cacheOpenClawTaskGetResultIfTerminal(params, terminal)
|
||||
|
||||
cached, ok := srv.cachedTerminalOpenClawResult(params)
|
||||
if !ok {
|
||||
t.Fatalf("expected cached terminal result")
|
||||
}
|
||||
if shared.StringArg(cached, "status", "") != "completed" {
|
||||
t.Fatalf("cached status = %q, want completed", cached["status"])
|
||||
}
|
||||
|
||||
// 即使 run 已过 deadline + gateway 丢失,也应优先返回缓存终态而非 interrupted。
|
||||
sess := srv.findTaskSession(params)
|
||||
sess.mu.Lock()
|
||||
sess.task.DeadlineAt = time.Now().Add(-time.Hour)
|
||||
sess.mu.Unlock()
|
||||
got := srv.openClawTaskGetGatewayUnconfirmedFallback(params, "SOCKET_CLOSED", "socket closed")
|
||||
if shared.StringArg(got, "status", "") != "completed" {
|
||||
t.Fatalf("expected cached completed to win over deadline interrupt, got %v", got["status"])
|
||||
}
|
||||
}
|
||||
|
||||
// 同一 session 复用后,旧 run 的终态不得错配给新 runId 的查询。
|
||||
func TestCachedTerminalNotServedForDifferentRunId(t *testing.T) {
|
||||
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||
srv.cacheOpenClawTaskGetResultIfTerminal(params, map[string]any{
|
||||
"status": "completed", "success": true, "runId": "run-1",
|
||||
})
|
||||
// 新一轮查询带不同 runId -> 不应命中旧缓存。
|
||||
newParams := map[string]any{"sessionId": "s1", "runId": "run-2"}
|
||||
if _, ok := srv.cachedTerminalOpenClawResult(newParams); ok {
|
||||
t.Fatalf("stale terminal for run-1 must not be served for run-2")
|
||||
}
|
||||
// 原 runId 仍应命中。
|
||||
if _, ok := srv.cachedTerminalOpenClawResult(params); !ok {
|
||||
t.Fatalf("terminal for run-1 should still be served for run-1")
|
||||
}
|
||||
}
|
||||
|
||||
// running 结果不应被当作终态缓存。
|
||||
func TestRunningResultNotCachedAsTerminal(t *testing.T) {
|
||||
srv, params := newRunRegistryTestServer(time.Now().Add(30 * time.Minute))
|
||||
srv.cacheOpenClawTaskGetResultIfTerminal(params, map[string]any{"status": "running", "runId": "run-1"})
|
||||
if _, ok := srv.cachedTerminalOpenClawResult(params); ok {
|
||||
t.Fatalf("running result must not be cached as terminal")
|
||||
}
|
||||
}
|
||||
|
||||
// 无 per-session 记录时退回旧的 not_found 行为。
|
||||
func TestGatewayUnconfirmedFallbackWithoutSessionReturnsNotFound(t *testing.T) {
|
||||
srv := &Server{sessions: map[string]*session{}}
|
||||
got := srv.openClawTaskGetGatewayUnconfirmedFallback(map[string]any{"sessionId": "missing"}, "X", "y")
|
||||
if parseBool(got["ok"]) {
|
||||
t.Fatalf("expected ok=false not_found, got %v", got)
|
||||
}
|
||||
if shared.StringArg(got, "status", "") != "not_found" {
|
||||
t.Fatalf("status = %q, want not_found", got["status"])
|
||||
}
|
||||
}
|
||||
@ -417,6 +417,10 @@ func (o *SessionOrchestrator) startOpenClawGatewayTask(
|
||||
sess.task.DeadlineAt = record.DeadlineAt
|
||||
sess.task.ProgressStage = "running"
|
||||
sess.task.ProgressMessage = "OpenClaw task accepted"
|
||||
// 新一轮 turn 复用同一 session 时,必须重置上一轮可能留下的终态标记,
|
||||
// 否则持久 run 仓(T8)会把旧 runId 的终态错配给新 run。
|
||||
sess.task.State = TaskStateRunning
|
||||
sess.task.ProgressTerminal = false
|
||||
sess.openClaw = record
|
||||
running := openClawRunningTaskResult(record)
|
||||
sess.lastResult = cloneMap(running)
|
||||
@ -613,13 +617,19 @@ func (o *SessionOrchestrator) openClawArtifactPrepare(
|
||||
|
||||
func isOpenClawUnknownMethodError(errorPayload map[string]any, method string) bool {
|
||||
message := strings.ToLower(strings.TrimSpace(shared.StringArg(errorPayload, "message", "")))
|
||||
code := strings.ToUpper(strings.TrimSpace(shared.StringArg(errorPayload, "code", "")))
|
||||
if message == "" {
|
||||
return false
|
||||
}
|
||||
// 消息形如「unknown method: <method>」已明确指向「网关不认识该方法」,足以判定,
|
||||
// 据此走 graceful fallback(如 openClawFallbackSessionPreparePayload)。
|
||||
//
|
||||
// 注意:不能再用严格的 code 白名单来 gate。真实网关常以数字 JSON-RPC code
|
||||
// (-32601 method not found / -32600 invalid request / -32002 等) 回传,
|
||||
// 经 shared.StringArg(fmt.Sprint) 会被字符串化为 "-32601"/"-32002",
|
||||
// 旧实现只接受 {"", INVALID_REQUEST, METHOD_NOT_FOUND},导致 fallback 失效、
|
||||
// session.prepare 直接以 -32002 硬失败整轮任务。
|
||||
return strings.Contains(message, "unknown method") &&
|
||||
strings.Contains(message, strings.ToLower(strings.TrimSpace(method))) &&
|
||||
(code == "" || code == "INVALID_REQUEST" || code == "METHOD_NOT_FOUND")
|
||||
strings.Contains(message, strings.ToLower(strings.TrimSpace(method)))
|
||||
}
|
||||
|
||||
func openClawFallbackSessionPreparePayload(params map[string]any) map[string]any {
|
||||
@ -1677,11 +1687,16 @@ func applyOpenClawConstraintDeliveryStatus(result map[string]any) {
|
||||
|
||||
func gatewayRPCError(errorPayload map[string]any, fallback string) *shared.RPCError {
|
||||
if isOpenClawRetryableGatewayError(errorPayload) {
|
||||
metricGatewaySocketClosedInc() // T12
|
||||
// T10:连接断属「可重试 / run 可能仍在后台、可续轮询」语义,而非 run 确实失败。
|
||||
// 带 retryable/poll 提示,客户端据此降级为「后台续跑·重连中」(T5) 续轮询 tasks.get,而非硬失败。
|
||||
return &shared.RPCError{
|
||||
Code: -32002,
|
||||
Message: "OPENCLAW_GATEWAY_SOCKET_CLOSED: OpenClaw gateway connection closed during task execution",
|
||||
Data: map[string]any{
|
||||
"code": "OPENCLAW_GATEWAY_SOCKET_CLOSED",
|
||||
"retryable": true,
|
||||
"poll": true,
|
||||
"originalCode": strings.TrimSpace(shared.StringArg(errorPayload, "code", "")),
|
||||
"originalError": strings.TrimSpace(shared.StringArg(errorPayload, "message", "")),
|
||||
},
|
||||
|
||||
@ -132,6 +132,49 @@ func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceKeepsActiveRecordRunning
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpectedArtifactDirectoriesDoNotBlockTerminalTaskState(t *testing.T) {
|
||||
params := map[string]any{"expectedArtifactDirs": []any{"reports/", "artifacts/"}}
|
||||
payload := map[string]any{
|
||||
"success": true,
|
||||
"status": string(TaskStateCompleted),
|
||||
"artifactScope": "tasks/session/run",
|
||||
"artifactDirectory": "/remote/openclaw/workspace/tasks/session/run",
|
||||
"expectedArtifactDirs": []any{
|
||||
"reports/",
|
||||
"artifacts/",
|
||||
},
|
||||
}
|
||||
|
||||
if openClawTaskGetRequiresArtifactExport(params, payload) {
|
||||
t.Fatal("expectedArtifactDirs must remain non-blocking scan hints")
|
||||
}
|
||||
got := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
|
||||
if status := shared.StringArg(got, "status", ""); status != string(TaskStateCompleted) {
|
||||
t.Fatalf("expected terminal status to remain completed, got %#v", got)
|
||||
}
|
||||
if parseBool(got["pending"]) {
|
||||
t.Fatalf("expected terminal payload not to become pending, got %#v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequiredArtifactExtensionsStillBlockUntilVerified(t *testing.T) {
|
||||
params := map[string]any{"requiredArtifactExtensions": []any{"md"}}
|
||||
payload := map[string]any{
|
||||
"success": true,
|
||||
"status": string(TaskStateCompleted),
|
||||
"artifactScope": "tasks/session/run",
|
||||
"artifactDirectory": "/remote/openclaw/workspace/tasks/session/run",
|
||||
}
|
||||
|
||||
if !openClawTaskGetRequiresArtifactExport(params, payload) {
|
||||
t.Fatal("requiredArtifactExtensions must remain a blocking delivery contract")
|
||||
}
|
||||
got := normalizeOpenClawTaskGetResult(params, payload, "openclaw", nil)
|
||||
if status := shared.StringArg(got, "status", ""); status != string(TaskStateRunning) {
|
||||
t.Fatalf("expected missing required artifact to remain syncing, got %#v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeOpenClawTaskGetUnknownArtifactEvidenceFailsAfterDeadlineWithoutRequiredArtifacts(t *testing.T) {
|
||||
payload := map[string]any{
|
||||
"success": false,
|
||||
@ -282,3 +325,50 @@ func TestTaskGetArtifactExportReceivesRequiredArtifactExtensions(t *testing.T) {
|
||||
t.Fatalf("expected expectedFileCountByExtension to reach export, got %#v", exportParams)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsOpenClawUnknownMethodErrorAcceptsNumericGatewayCodes(t *testing.T) {
|
||||
const method = "xworkmate.session.prepare"
|
||||
cases := []struct {
|
||||
name string
|
||||
payload map[string]any
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "string invalid_request code",
|
||||
payload: map[string]any{"code": "INVALID_REQUEST", "message": "unknown method: xworkmate.session.prepare"},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "numeric -32002 (real gateway shape that previously hard-failed)",
|
||||
payload: map[string]any{"code": float64(-32002), "message": "unknown method: xworkmate.session.prepare"},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "numeric -32601 method not found",
|
||||
payload: map[string]any{"code": float64(-32601), "message": "Unknown method: xworkmate.session.prepare"},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "empty code",
|
||||
payload: map[string]any{"message": "unknown method: xworkmate.session.prepare"},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "unrelated error must not be swallowed",
|
||||
payload: map[string]any{"code": float64(-32002), "message": "gateway socket closed"},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "unknown method for a different method name",
|
||||
payload: map[string]any{"code": float64(-32601), "message": "unknown method: chat.send"},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := isOpenClawUnknownMethodError(tc.payload, method); got != tc.want {
|
||||
t.Fatalf("isOpenClawUnknownMethodError(%v) = %v, want %v", tc.payload, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,13 +132,13 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
|
||||
if gatewayProvider == "" {
|
||||
gatewayProvider = "openclaw"
|
||||
}
|
||||
// T7/T8: 一旦观察到终态就从持久 run 仓返回,避免之后 gateway 查不到导致结果丢失。
|
||||
if cached, ok := s.cachedTerminalOpenClawResult(params); ok {
|
||||
return cached
|
||||
}
|
||||
if rpcErr := ensureProductionGatewayConnected(s, gatewayProvider, notify); rpcErr != nil {
|
||||
return map[string]any{
|
||||
"ok": false,
|
||||
"status": "not_found",
|
||||
"code": "GATEWAY_UNAVAILABLE",
|
||||
"message": rpcErr.Message,
|
||||
}
|
||||
// T7/T9: gateway 不可达时按持久 run 仓兜底(续轮询 / deadline 终态),而非裸 not_found。
|
||||
return s.openClawTaskGetGatewayUnconfirmedFallback(params, "GATEWAY_UNAVAILABLE", rpcErr.Message)
|
||||
}
|
||||
result := s.gateway.RequestByMode(
|
||||
gatewayProvider,
|
||||
@ -162,16 +162,15 @@ func (s *Server) handleTaskGet(ctx context.Context, params map[string]any, notif
|
||||
}
|
||||
s.decorateOpenClawArtifactDownloadURLs(payload, sessionKey, runID)
|
||||
stripOpenClawArtifactInlineContent(payload)
|
||||
// T8: 缓存「最终客户端可见形态」(已 decorate 下载 URL + strip 内联内容),
|
||||
// 这样从缓存回放时与正常路径完全一致。
|
||||
s.cacheOpenClawTaskGetResultIfTerminal(params, payload)
|
||||
return payload
|
||||
}
|
||||
// T7/T9: gateway 返回错误(socket closed / not_found / lookup failed)时同样走持久 run 仓兜底。
|
||||
message := strings.TrimSpace(shared.StringArg(result.Error, "message", "openclaw native task lookup failed"))
|
||||
code := strings.TrimSpace(shared.StringArg(result.Error, "code", "TASK_LOOKUP_FAILED"))
|
||||
return map[string]any{
|
||||
"ok": false,
|
||||
"status": "not_found",
|
||||
"code": code,
|
||||
"message": message,
|
||||
}
|
||||
return s.openClawTaskGetGatewayUnconfirmedFallback(params, code, message)
|
||||
}
|
||||
|
||||
func (s *Server) taskGetParamsWithSessionScope(params map[string]any) map[string]any {
|
||||
@ -472,9 +471,11 @@ func openClawTaskGetRequiresArtifactExport(params map[string]any, payload map[st
|
||||
if parseBool(params["requiresExportBeforeFinalResponse"]) || parseBool(payload["requiresExportBeforeFinalResponse"]) {
|
||||
return true
|
||||
}
|
||||
return len(shared.ListArg(params, "expectedArtifactDirs")) > 0 ||
|
||||
len(shared.ListArg(payload, "expectedArtifactDirs")) > 0 ||
|
||||
len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
|
||||
// expectedArtifactDirs are discovery hints for the plugin's workspace-root
|
||||
// scan. They do not prove that the caller requires a file before the run can
|
||||
// reach a terminal state. Treating them as a blocking contract turns a
|
||||
// failed/no-output agent run into an endless "syncing-artifacts" loop.
|
||||
return len(shared.ListArg(params, "requiredArtifactExtensions")) > 0 ||
|
||||
len(shared.ListArg(payload, "requiredArtifactExtensions")) > 0
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user