From 911f5709a3f04249d22b0ba5bed8c45370930ee6 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Tue, 2 Jun 2026 12:19:27 +0800 Subject: [PATCH] feat(acp): implement distributed task router and openclaw gateway ingress --- docs/api-reference.md | 99 +++-- docs/architecture/acp-forwarding-topology.md | 31 +- example/config.yaml | 37 +- internal/acp/config.go | 108 ++--- internal/acp/distributed_config_test.go | 207 +++++++-- internal/acp/distributed_forwarder.go | 431 +++++++++++++++++-- internal/acp/http_handler.go | 64 ++- internal/acp/server.go | 6 +- internal/acp/types.go | 2 +- 9 files changed, 815 insertions(+), 170 deletions(-) diff --git a/docs/api-reference.md b/docs/api-reference.md index 5d341c4..c1e9536 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -54,53 +54,100 @@ ## 3.1 Lightweight Distributed Task Forwarding -bridge 可以把本机收到的 HTTP 任务提交转发到另一个 bridge endpoint,用于轻量分布式部署。当前实现落地的是双节点分布式拓扑: +bridge 可以把本机收到的 HTTP 任务提交转发到另一个 bridge endpoint,用于轻量分布式部署。当前实现是一个静态 task router,不做自动发现,不依赖 config center: -- `cn-xworkmate-bridge.svc.plus` 负责本地入口和鉴权 -- `xworkmate-bridge.svc.plus` 负责实际 OpenClaw task runtime -- cn bridge 配置 `task_forward_peer_id` 后,`POST /acp/rpc` 的 `session.start` / `session.message` 会转发到 peer 的同一路径 -- 主 bridge 不配置 `task_forward_peer_id`,因此不会把任务反向转发到 CN +- `nodes` 是静态 peer catalog,记录 bridge 节点身份、角色、能力和私网 endpoint +- `forwarding.rules` 决定哪些 JSON-RPC method 转发到哪个节点或哪类节点 +- `forwarding.routes` 决定显式 next-hop,用于星状或网状拓扑 +- `session.start` 选中目标后,`session.message` 通过本机 session route store 粘到同一个目标节点 +- 公网域名只作为 ingress;bridge 之间的 task forward 只走 WireGuard over VLESS 私网 endpoint -配置: +双节点简写配置: ```yaml distributed: topology: "dual-node" local_node_id: "cn-xworkmate-bridge" task_forward_peer_id: "xworkmate-bridge" - nodes: - - id: "xworkmate-bridge" - role: "primary" - public_base_url: "https://xworkmate-bridge.svc.plus" - bridge_endpoint: "http://172.29.10.1:8787" - - id: "cn-xworkmate-bridge" - role: "edge" - public_base_url: "https://cn-xworkmate-bridge.svc.plus" - bridge_endpoint: "http://172.29.10.2:8787" task_forward_token: "" ``` -单 endpoint 覆盖仍可用于临时验证: +`task_forward_peer_id: "xworkmate-bridge"` 等价于把 `session.start` / `session.message` 转发到 peer `xworkmate-bridge`。内置 peer catalog 会把它解析为 `http://172.29.10.1:8787`。主 bridge 不配置 `task_forward_peer_id`,因此不会反向转发。 -```text -XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT=https://xworkmate-bridge.svc.plus -XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN=$PEER_BRIDGE_AUTH_TOKEN +通用多节点配置: + +```yaml +distributed: + local_node_id: "edge-cn" + nodes: + - id: "edge-cn" + role: "edge" + zone: "cn" + bridge_endpoint: "http://172.29.10.2:8787" + capabilities: ["ingress"] + - id: "worker-a" + role: "executor" + zone: "global" + bridge_endpoint: "http://172.29.10.11:8787" + capabilities: ["openclaw"] + - id: "worker-b" + role: "executor" + zone: "global" + bridge_endpoint: "http://172.29.10.12:8787" + capabilities: ["openclaw"] + forwarding: + hop_limit: 3 + default_action: "execute_local" + rules: + - methods: ["session.start", "session.message"] + target: + selector: + role: "executor" + capability: "openclaw" + strategy: "round_robin" +``` + +星状或显式 next-hop 网状配置: + +```yaml +distributed: + local_node_id: "edge-cn" + nodes: + - id: "edge-cn" + role: "edge" + bridge_endpoint: "http://172.29.10.2:8787" + - id: "hub-main" + role: "hub" + bridge_endpoint: "http://172.29.10.1:8787" + - id: "worker-eu" + role: "executor" + bridge_endpoint: "http://172.29.10.30:8787" + forwarding: + hop_limit: 3 + rules: + - methods: ["session.start"] + target: + node_id: "worker-eu" + routes: + - target_node_id: "worker-eu" + next_hop_node_id: "hub-main" ``` 规则: -- `task_forward_peer_id` 指向的节点 `bridge_endpoint` 是 peer bridge base URL,bridge 会按当前请求路径拼接 `/acp/rpc` -- 同步消息不能走公网明文:公网 endpoint 必须使用 `https://` -- `http://` 只允许 loopback、private、link-local 这类本机或 VPN 内网地址,用于 WireGuard 等隧道已经提供加密的场景 -- endpoint 可以是公网 HTTPS,也可以是 VPN 内网 HTTP(S)。bridge 明确支持把 `bridge_endpoint` 配成 WireGuard、WireGuard over VLESS/TCP/TLS、WebSocket/TLS 等隧道后的本机或私网地址 +- `bridge_endpoint` 是 peer bridge base URL,bridge 会按当前请求路径拼接 `/acp/rpc` 或 `/gateway/openclaw` +- 同步消息不能走公网;`bridge_endpoint` 必须是 loopback、private、link-local 这类本机或 VPN 内网地址,用于 WireGuard over VLESS 等隧道已经提供加密的场景 - 只要求本机网络能路由到 endpoint;bridge 不依赖 config center 或额外注册中心 - `task_forward_token` 为空时复用本机 `BRIDGE_AUTH_TOKEN` -- 转发请求会带 `X-XWorkmate-Bridge-Forwarded: 1`,收到该 header 后不会再次转发,避免 bridge 之间循环 +- 转发请求会带 `X-XWorkmate-Bridge-Forwarded: 1` +- `X-XWorkmate-Forward-Source` 是源节点,`X-XWorkmate-Forward-Target` 是最终目标节点 +- `X-XWorkmate-Forward-Hop` 逐跳递增,超过 `forwarding.hop_limit` 时拒绝转发,避免循环 +- 收到已转发请求时,如果 target 是本机则本机执行;如果 target 不是本机,则按 `forwarding.routes` 查 next-hop 抗干扰建议: -- 跨境或 GFW 环境可以直接把 `task_forward_endpoint` 指向 WireGuard over VLESS/TCP/TLS 后的 `http://10.x/172.16-31.x/192.168.x` 私网 bridge endpoint -- 运营商 UDP 阻断时,支持把裸 WireGuard UDP 替换为 WireGuard over VLESS/TCP/TLS、WebSocket/TLS 或等价可靠加密通道,bridge 继续使用同一个 peer endpoint 配置 +- 跨境或 GFW 环境下,`bridge_endpoint` 应指向 WireGuard over VLESS/TCP/TLS 后的 `http://10.x/172.16-31.x/192.168.x` 私网 bridge endpoint +- 运营商 UDP 阻断时,支持把裸 WireGuard UDP 替换为 WireGuard over VLESS/TCP/TLS、WebSocket/TLS 或等价可靠加密通道,bridge 继续使用同一个 peer catalog 和 forwarding rules - bridge 层继续使用 bearer token 鉴权;隧道层负责链路加密和抗干扰,应用层负责 peer 身份和任务权限 推荐 APP 配置: diff --git a/docs/architecture/acp-forwarding-topology.md b/docs/architecture/acp-forwarding-topology.md index 9fd174f..1a9f5ef 100644 --- a/docs/architecture/acp-forwarding-topology.md +++ b/docs/architecture/acp-forwarding-topology.md @@ -1,6 +1,6 @@ # ACP Forwarding Topology -Last Updated: 2026-05-03 +Last Updated: 2026-06-02 本文档只描述当前保留的 canonical topology。 @@ -77,6 +77,35 @@ flowchart LR - provider catalog 与 gatewayProviders 由 bridge 独占生成 - bridge 只暴露 canonical ACP contract - provider / gateway 实际地址属于 bridge internal truth +- bridge-to-bridge task forward 只使用 WireGuard over VLESS 私网 endpoint,公网域名只作为 ingress + +## Distributed Task Router + +分布式转发是 bridge 内部能力,不改变 app-facing canonical surface。每个 bridge 从静态 peer catalog、forwarding rules 和 routes 得出下一跳: + +```mermaid +flowchart LR + CN["cn-xworkmate-bridge
edge ingress"] + MAIN["xworkmate-bridge
primary / hub"] + WA["worker-a
executor"] + WB["worker-b
executor"] + EU["worker-eu
executor"] + + CN -- "task_forward_peer_id or rule
http://172.29.10.1:8787" --> MAIN + MAIN -- "selector role=executor
round_robin" --> WA + MAIN -- "selector role=executor
round_robin" --> WB + CN -- "route target=worker-eu
next_hop=xworkmate-bridge" --> MAIN + MAIN -- "private next hop" --> EU +``` + +Router contract: + +- `nodes` 保存节点身份、角色、能力、zone 和私网 `bridge_endpoint` +- `forwarding.rules` 选择最终 target node +- `forwarding.routes` 选择 next-hop,用于星状或显式 mesh +- `session.start` 选中 target 后,`session.message` 使用本机 session route store 粘到同一个 target +- `X-XWorkmate-Forward-Hop` 受 `forwarding.hop_limit` 限制,避免循环 +- `https://*.svc.plus` 这类公网域名不能作为 bridge-to-bridge endpoint ## Non-Contract Facts diff --git a/example/config.yaml b/example/config.yaml index c0a82e7..28a4dea 100644 --- a/example/config.yaml +++ b/example/config.yaml @@ -17,28 +17,43 @@ openclaw_gateway: max_queued: 20 queue_timeout: "10m" -# Lightweight distributed bridge forwarding. -# A dual-node topology can keep both bridge nodes explicit while only enabling -# task forwarding from the node that sets task_forward_peer_id. -# Public plaintext HTTP is rejected; use HTTPS on public paths, or HTTP only on -# loopback/private/link-local VPN addresses where the tunnel provides encryption. -# bridge_endpoint is the peer bridge base URL; /gateway/openclaw or /acp/rpc is -# appended from the inbound request path when forwarding is enabled. +# Lightweight distributed bridge task router. +# Public domains are ingress only. Bridge-to-bridge forwarding must use HTTP +# loopback/private/link-local endpoints protected by WireGuard over VLESS or an +# equivalent private transport. distributed: topology: "dual-node" local_node_id: "cn-xworkmate-bridge" + # Dual-node shorthand. Equivalent to a forwarding rule that sends + # session.start/session.message to xworkmate-bridge. task_forward_peer_id: "xworkmate-bridge" nodes: - id: "xworkmate-bridge" role: "primary" + zone: "global" public_base_url: "https://xworkmate-bridge.svc.plus" bridge_endpoint: "http://172.29.10.1:8787" + capabilities: ["openclaw", "codex"] - id: "cn-xworkmate-bridge" role: "edge" + zone: "cn" public_base_url: "https://cn-xworkmate-bridge.svc.plus" bridge_endpoint: "http://172.29.10.2:8787" - # Optional direct override. Leave empty when using the topology above. - task_forward_endpoint: "" + capabilities: ["ingress"] + forwarding: + hop_limit: 3 + default_action: "execute_local" + # For multi-node/star/mesh mode, prefer explicit rules over task_forward_peer_id: + # rules: + # - methods: ["session.start", "session.message"] + # target: + # selector: + # role: "executor" + # capability: "openclaw" + # strategy: "round_robin" + # routes: + # - target_node_id: "worker-eu" + # next_hop_node_id: "xworkmate-bridge" # Optional. Defaults to BRIDGE_AUTH_TOKEN when omitted. task_forward_token: "" @@ -55,5 +70,5 @@ notes: - The bridge reads its own auth token from BRIDGE_AUTH_TOKEN. - Upstream URLs can include sub-paths which will be preserved during WebSocket handshake. - Multi-agent and Single-agent modes will use these endpoints to delegate tasks. - - Distributed task forwarding rejects public plaintext HTTP; use HTTPS or private VPN HTTP(S) endpoints. - - task_forward_endpoint can point at a WireGuard-over-VLESS/TCP/TLS local/private bridge address for GFW or carrier UDP blocking environments. + - Distributed task forwarding rejects public endpoints; bridge_endpoint must be private/VPN HTTP. + - task_forward_peer_id is the dual-node shorthand; forwarding.rules/routes express multi-node, star, and mesh topologies. diff --git a/internal/acp/config.go b/internal/acp/config.go index e6d3111..d20f72f 100644 --- a/internal/acp/config.go +++ b/internal/acp/config.go @@ -29,19 +29,50 @@ type BridgeConfig struct { } type DistributedConfig struct { - TaskForwardEndpoint string `yaml:"task_forward_endpoint"` - TaskForwardToken string `yaml:"task_forward_token"` - Topology string `yaml:"topology"` - LocalNodeID string `yaml:"local_node_id"` - TaskForwardPeerID string `yaml:"task_forward_peer_id"` - Nodes []DistributedNodeConfig `yaml:"nodes"` + TaskForwardToken string `yaml:"task_forward_token"` + Topology string `yaml:"topology"` + LocalNodeID string `yaml:"local_node_id"` + TaskForwardPeerID string `yaml:"task_forward_peer_id"` + Nodes []DistributedNodeConfig `yaml:"nodes"` + Forwarding DistributedForwardingConfig `yaml:"forwarding"` } type DistributedNodeConfig struct { - ID string `yaml:"id"` - Role string `yaml:"role"` - PublicBaseURL string `yaml:"public_base_url"` - BridgeEndpoint string `yaml:"bridge_endpoint"` + ID string `yaml:"id"` + Role string `yaml:"role"` + Zone string `yaml:"zone"` + PublicBaseURL string `yaml:"public_base_url"` + BridgeEndpoint string `yaml:"bridge_endpoint"` + Capabilities []string `yaml:"capabilities"` +} + +type DistributedForwardingConfig struct { + HopLimit int `yaml:"hop_limit"` + DefaultAction string `yaml:"default_action"` + Rules []DistributedForwardRuleConfig `yaml:"rules"` + Routes []DistributedRouteConfig `yaml:"routes"` +} + +type DistributedForwardRuleConfig struct { + Methods []string `yaml:"methods"` + Target DistributedForwardTargetConfig `yaml:"target"` +} + +type DistributedForwardTargetConfig struct { + NodeID string `yaml:"node_id"` + Selector DistributedForwardSelectorConfig `yaml:"selector"` + Strategy string `yaml:"strategy"` +} + +type DistributedForwardSelectorConfig struct { + Role string `yaml:"role"` + Zone string `yaml:"zone"` + Capability string `yaml:"capability"` +} + +type DistributedRouteConfig struct { + TargetNodeID string `yaml:"target_node_id"` + NextHopNodeID string `yaml:"next_hop_node_id"` } type OpenClawGatewayConfig struct { @@ -88,18 +119,6 @@ func bridgeSharedAuthToken() string { return strings.TrimSpace(shared.EnvOrDefault("BRIDGE_AUTH_TOKEN", "")) } -func resolveDistributedTaskForwardEndpoint(config *BridgeConfig) string { - yamlVal := "" - if config != nil { - yamlVal = config.Distributed.TaskForwardEndpoint - } - return resolveURL( - firstNonEmpty(yamlVal, resolveDistributedTopologyTaskForwardEndpoint(config)), - "XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT", - "BRIDGE_TASK_FORWARD_ENDPOINT", - ) -} - func resolveDistributedTaskForwardToken(config *BridgeConfig) string { if token := strings.TrimSpace(os.Getenv("XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN")); token != "" { return token @@ -115,38 +134,21 @@ func resolveDistributedTaskForwardToken(config *BridgeConfig) string { return bridgeSharedAuthToken() } -func resolveDistributedTopologyTaskForwardEndpoint(config *BridgeConfig) string { - if config == nil { - return "" +func defaultDistributedNodes() []DistributedNodeConfig { + return []DistributedNodeConfig{ + { + ID: "xworkmate-bridge", + Role: "primary", + PublicBaseURL: "https://xworkmate-bridge.svc.plus", + BridgeEndpoint: "http://172.29.10.1:8787", + }, + { + ID: "cn-xworkmate-bridge", + Role: "edge", + PublicBaseURL: "https://cn-xworkmate-bridge.svc.plus", + BridgeEndpoint: "http://172.29.10.2:8787", + }, } - distributed := config.Distributed - topology := strings.TrimSpace(distributed.Topology) - if topology == "" { - return "" - } - if !strings.EqualFold(topology, "dual-node") { - return "" - } - localNodeID := strings.TrimSpace(distributed.LocalNodeID) - peerNodeID := strings.TrimSpace(distributed.TaskForwardPeerID) - if localNodeID == "" || peerNodeID == "" || localNodeID == peerNodeID { - return "" - } - for _, node := range distributed.Nodes { - if strings.TrimSpace(node.ID) == peerNodeID { - return strings.TrimSpace(node.BridgeEndpoint) - } - } - return "" -} - -func firstNonEmpty(values ...string) string { - for _, value := range values { - if trimmed := strings.TrimSpace(value); trimmed != "" { - return trimmed - } - } - return "" } func newProductionProviderCatalog() (*BridgeConfig, map[string]syncedProvider, []string) { diff --git a/internal/acp/distributed_config_test.go b/internal/acp/distributed_config_test.go index a4033e6..b4cc07e 100644 --- a/internal/acp/distributed_config_test.go +++ b/internal/acp/distributed_config_test.go @@ -1,62 +1,205 @@ package acp -import "testing" +import ( + "net/http/httptest" + "testing" -func TestResolveDistributedTaskForwardEndpointFromDualNodeTopology(t *testing.T) { + "xworkmate-bridge/internal/shared" +) + +func TestDistributedTaskRouterFromDualNodePeerID(t *testing.T) { config := &BridgeConfig{} config.Distributed.Topology = "dual-node" config.Distributed.LocalNodeID = "cn-xworkmate-bridge" config.Distributed.TaskForwardPeerID = "xworkmate-bridge" - config.Distributed.Nodes = []DistributedNodeConfig{ - { - ID: "xworkmate-bridge", - BridgeEndpoint: "http://172.29.10.1:8787", - }, - { - ID: "cn-xworkmate-bridge", - BridgeEndpoint: "http://172.29.10.2:8787", - }, - } - if got := resolveDistributedTaskForwardEndpoint(config); got != "http://172.29.10.1:8787" { - t.Fatalf("resolveDistributedTaskForwardEndpoint() = %q, want %q", got, "http://172.29.10.1:8787") + router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}) + if router == nil { + t.Fatal("expected router") + } + decision, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/gateway/openclaw", nil), sessionStart("s1")) + if err != nil { + t.Fatalf("forwardDecision() error = %v", err) + } + if !ok { + t.Fatal("expected forward decision") + } + if decision.targetNodeID != "xworkmate-bridge" || decision.nextHopID != "xworkmate-bridge" { + t.Fatalf("decision = %#v, want target and next hop xworkmate-bridge", decision) + } + if decision.endpoint != "http://172.29.10.1:8787" { + t.Fatalf("endpoint = %q, want private main bridge endpoint", decision.endpoint) } } -func TestResolveDistributedTaskForwardEndpointDisabledWhenPeerUnset(t *testing.T) { +func TestDistributedTaskRouterDisabledWhenPeerUnset(t *testing.T) { config := &BridgeConfig{} config.Distributed.Topology = "dual-node" config.Distributed.LocalNodeID = "xworkmate-bridge" - config.Distributed.Nodes = []DistributedNodeConfig{ - { - ID: "xworkmate-bridge", - BridgeEndpoint: "http://172.29.10.1:8787", - }, - { - ID: "cn-xworkmate-bridge", - BridgeEndpoint: "http://172.29.10.2:8787", - }, - } - if got := resolveDistributedTaskForwardEndpoint(config); got != "" { - t.Fatalf("resolveDistributedTaskForwardEndpoint() = %q, want empty endpoint", got) + if router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}); router != nil { + t.Fatalf("newDistributedTaskRouter() = %#v, want nil", router) } } -func TestResolveDistributedTaskForwardEndpointKeepsExplicitEndpoint(t *testing.T) { +func TestDistributedTaskRouterPrefersConfiguredNodeOverDefault(t *testing.T) { config := &BridgeConfig{} - config.Distributed.TaskForwardEndpoint = "https://xworkmate-bridge.svc.plus" config.Distributed.Topology = "dual-node" config.Distributed.LocalNodeID = "cn-xworkmate-bridge" config.Distributed.TaskForwardPeerID = "xworkmate-bridge" config.Distributed.Nodes = []DistributedNodeConfig{ { ID: "xworkmate-bridge", - BridgeEndpoint: "http://172.29.10.1:8787", + BridgeEndpoint: "http://172.29.10.11:8787", }, } - if got := resolveDistributedTaskForwardEndpoint(config); got != "https://xworkmate-bridge.svc.plus" { - t.Fatalf("resolveDistributedTaskForwardEndpoint() = %q, want explicit endpoint", got) + router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}) + decision, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s1")) + if err != nil { + t.Fatalf("forwardDecision() error = %v", err) + } + if !ok { + t.Fatal("expected forward decision") + } + if decision.endpoint != "http://172.29.10.11:8787" { + t.Fatalf("endpoint = %q, want configured endpoint", decision.endpoint) + } +} + +func TestDistributedTaskRouterSelectorRoundRobinKeepsSessionAffinity(t *testing.T) { + config := &BridgeConfig{} + config.Distributed.LocalNodeID = "edge-cn" + config.Distributed.Nodes = []DistributedNodeConfig{ + {ID: "edge-cn", Role: "edge", Zone: "cn", BridgeEndpoint: "http://172.29.10.2:8787"}, + {ID: "worker-a", Role: "executor", Zone: "global", BridgeEndpoint: "http://172.29.10.11:8787", Capabilities: []string{"openclaw"}}, + {ID: "worker-b", Role: "executor", Zone: "global", BridgeEndpoint: "http://172.29.10.12:8787", Capabilities: []string{"openclaw"}}, + } + config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{ + { + Methods: []string{"session.start", "session.message"}, + Target: DistributedForwardTargetConfig{ + Selector: DistributedForwardSelectorConfig{Role: "executor", Capability: "openclaw"}, + Strategy: "round_robin", + }, + }, + } + + router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}) + first, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s1")) + if err != nil { + t.Fatalf("first forwardDecision() error = %v", err) + } + if !ok || first.targetNodeID != "worker-a" { + t.Fatalf("first decision = %#v, want worker-a", first) + } + secondSession, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s2")) + if err != nil { + t.Fatalf("second forwardDecision() error = %v", err) + } + if !ok || secondSession.targetNodeID != "worker-b" { + t.Fatalf("second session decision = %#v, want worker-b", secondSession) + } + followUp, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionMessage("s1")) + if err != nil { + t.Fatalf("follow-up forwardDecision() error = %v", err) + } + if !ok || followUp.targetNodeID != "worker-a" { + t.Fatalf("follow-up decision = %#v, want sticky worker-a", followUp) + } +} + +func TestDistributedTaskRouterUsesExplicitNextHopRoute(t *testing.T) { + config := &BridgeConfig{} + config.Distributed.LocalNodeID = "edge-cn" + config.Distributed.Nodes = []DistributedNodeConfig{ + {ID: "edge-cn", Role: "edge", BridgeEndpoint: "http://172.29.10.2:8787"}, + {ID: "hub-main", Role: "hub", BridgeEndpoint: "http://172.29.10.1:8787"}, + {ID: "worker-eu", Role: "executor", BridgeEndpoint: "http://172.29.10.30:8787"}, + } + config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{ + { + Methods: []string{"session.start"}, + Target: DistributedForwardTargetConfig{NodeID: "worker-eu"}, + }, + } + config.Distributed.Forwarding.Routes = []DistributedRouteConfig{ + {TargetNodeID: "worker-eu", NextHopNodeID: "hub-main"}, + } + + router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}) + decision, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s1")) + if err != nil { + t.Fatalf("forwardDecision() error = %v", err) + } + if !ok { + t.Fatal("expected forward decision") + } + if decision.targetNodeID != "worker-eu" || decision.nextHopID != "hub-main" || decision.endpoint != "http://172.29.10.1:8787" { + t.Fatalf("decision = %#v, want target worker-eu via hub-main", decision) + } +} + +func TestDistributedTaskRouterStopsWhenForwardedTargetIsLocal(t *testing.T) { + config := &BridgeConfig{} + config.Distributed.LocalNodeID = "hub-main" + config.Distributed.Nodes = []DistributedNodeConfig{ + {ID: "hub-main", Role: "hub", BridgeEndpoint: "http://172.29.10.1:8787"}, + {ID: "worker-eu", Role: "executor", BridgeEndpoint: "http://172.29.10.30:8787"}, + } + config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{ + {Methods: []string{"session.start"}, Target: DistributedForwardTargetConfig{NodeID: "worker-eu"}}, + } + + router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}) + req := httptest.NewRequest("POST", "/acp/rpc", nil) + req.Header.Set(distributedForwardedHeader, "1") + req.Header.Set(distributedTargetHeader, "hub-main") + if _, ok, err := router.forwardDecision(req, sessionStart("s1")); err != nil || ok { + t.Fatalf("forwardDecision() ok=%t err=%v, want local execution", ok, err) + } +} + +func TestDistributedTaskRouterRejectsHopLimitExceeded(t *testing.T) { + config := &BridgeConfig{} + config.Distributed.LocalNodeID = "hub-main" + config.Distributed.Forwarding.HopLimit = 1 + config.Distributed.Nodes = []DistributedNodeConfig{ + {ID: "hub-main", Role: "hub", BridgeEndpoint: "http://172.29.10.1:8787"}, + {ID: "worker-eu", Role: "executor", BridgeEndpoint: "http://172.29.10.30:8787"}, + } + config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{ + {Methods: []string{"session.start"}, Target: DistributedForwardTargetConfig{NodeID: "worker-eu"}}, + } + + router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}) + req := httptest.NewRequest("POST", "/acp/rpc", nil) + req.Header.Set(distributedForwardedHeader, "1") + req.Header.Set(distributedTargetHeader, "worker-eu") + req.Header.Set(distributedHopHeader, "1") + if _, ok, err := router.forwardDecision(req, sessionStart("s1")); err == nil || ok { + t.Fatalf("forwardDecision() ok=%t err=%v, want hop limit error", ok, err) + } +} + +func TestDistributedForwardURLRejectsPublicEndpoint(t *testing.T) { + if _, err := distributedForwardURL("https://xworkmate-bridge.svc.plus", "/acp/rpc"); err == nil { + t.Fatal("expected public endpoint rejection") + } +} + +func sessionStart(sessionID string) shared.RPCRequest { + return shared.RPCRequest{ + ID: sessionID, + Method: "session.start", + Params: map[string]any{"sessionId": sessionID, "threadId": "thread-" + sessionID}, + } +} + +func sessionMessage(sessionID string) shared.RPCRequest { + return shared.RPCRequest{ + ID: sessionID + "-message", + Method: "session.message", + Params: map[string]any{"sessionId": sessionID, "threadId": "thread-" + sessionID}, } } diff --git a/internal/acp/distributed_forwarder.go b/internal/acp/distributed_forwarder.go index dec30bb..037f844 100644 --- a/internal/acp/distributed_forwarder.go +++ b/internal/acp/distributed_forwarder.go @@ -9,51 +9,150 @@ import ( "net" "net/http" "net/url" + "sort" + "strconv" "strings" + "sync" + "time" "xworkmate-bridge/internal/shared" ) -const distributedForwardedHeader = "X-XWorkmate-Bridge-Forwarded" +const ( + distributedForwardedHeader = "X-XWorkmate-Bridge-Forwarded" + distributedSourceHeader = "X-XWorkmate-Forward-Source" + distributedTargetHeader = "X-XWorkmate-Forward-Target" + distributedTraceHeader = "X-XWorkmate-Forward-Trace" + distributedHopHeader = "X-XWorkmate-Forward-Hop" -type distributedTaskForwarderConfig struct { - Endpoint string - Token string + defaultDistributedHopLimit = 3 + defaultSessionRouteTTL = 24 * time.Hour +) + +type distributedTaskRouterConfig struct { + Config *BridgeConfig + Token string } -type distributedTaskForwarder struct { - endpoint string - token string - httpClient *http.Client +type distributedTaskRouter struct { + localNodeID string + token string + hopLimit int + nodes map[string]DistributedNodeConfig + rules []DistributedForwardRuleConfig + routes map[string]string + routeStore *distributedSessionRouteStore + roundRobin map[string]int + httpClient *http.Client + mu sync.Mutex } -func newDistributedTaskForwarder(config distributedTaskForwarderConfig) *distributedTaskForwarder { - endpoint := strings.TrimRight(strings.TrimSpace(config.Endpoint), "/") - if endpoint == "" { +type distributedForwardDecision struct { + targetNodeID string + nextHopID string + endpoint string +} + +type distributedSessionRoute struct { + TargetNodeID string + ExpiresAt time.Time +} + +type distributedSessionRouteStore struct { + mu sync.Mutex + routes map[string]distributedSessionRoute + ttl time.Duration +} + +func newDistributedTaskRouter(config distributedTaskRouterConfig) *distributedTaskRouter { + if config.Config == nil { return nil } - return &distributedTaskForwarder{ - endpoint: endpoint, - token: strings.TrimSpace(config.Token), + distributed := config.Config.Distributed + localNodeID := strings.TrimSpace(distributed.LocalNodeID) + if localNodeID == "" { + return nil + } + nodes := distributedNodeCatalog(distributed.Nodes) + if _, ok := nodes[localNodeID]; !ok { + return nil + } + rules := distributedForwardRules(distributed) + if len(rules) == 0 { + return nil + } + hopLimit := distributed.Forwarding.HopLimit + if hopLimit <= 0 { + hopLimit = defaultDistributedHopLimit + } + return &distributedTaskRouter{ + localNodeID: localNodeID, + token: strings.TrimSpace(config.Token), + hopLimit: hopLimit, + nodes: nodes, + rules: rules, + routes: distributedRouteMap(distributed.Forwarding.Routes), + routeStore: newDistributedSessionRouteStore(defaultSessionRouteTTL), + roundRobin: make(map[string]int), httpClient: &http.Client{ Timeout: openClawAgentWaitMaxTimeout + openClawAgentWaitHTTPMargin, }, } } -func (f *distributedTaskForwarder) shouldForward(r *http.Request, request shared.RPCRequest) bool { - if f == nil || strings.TrimSpace(f.endpoint) == "" || r == nil { - return false +func distributedNodeCatalog(configured []DistributedNodeConfig) map[string]DistributedNodeConfig { + nodes := make(map[string]DistributedNodeConfig) + for _, node := range defaultDistributedNodes() { + if id := strings.TrimSpace(node.ID); id != "" { + node.ID = id + nodes[id] = node + } } - if strings.TrimSpace(r.Header.Get(distributedForwardedHeader)) != "" { - return false + for _, node := range configured { + if id := strings.TrimSpace(node.ID); id != "" { + node.ID = id + nodes[id] = node + } } - method := strings.TrimSpace(request.Method) - return method == "session.start" || method == "session.message" + return nodes } -func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseWriter, r *http.Request, request shared.RPCRequest) bool { - if !f.shouldForward(r, request) { +func distributedForwardRules(distributed DistributedConfig) []DistributedForwardRuleConfig { + if len(distributed.Forwarding.Rules) > 0 { + return distributed.Forwarding.Rules + } + if peerID := strings.TrimSpace(distributed.TaskForwardPeerID); peerID != "" { + return []DistributedForwardRuleConfig{ + { + Methods: []string{"session.start", "session.message"}, + Target: DistributedForwardTargetConfig{ + NodeID: peerID, + }, + }, + } + } + return nil +} + +func distributedRouteMap(routes []DistributedRouteConfig) map[string]string { + result := make(map[string]string) + for _, route := range routes { + target := strings.TrimSpace(route.TargetNodeID) + nextHop := strings.TrimSpace(route.NextHopNodeID) + if target != "" && nextHop != "" && target != nextHop { + result[target] = nextHop + } + } + return result +} + +func (r *distributedTaskRouter) forward(ctx context.Context, w http.ResponseWriter, req *http.Request, request shared.RPCRequest) bool { + decision, ok, err := r.forwardDecision(req, request) + if err != nil { + shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, err.Error()) + return true + } + if !ok { return false } payload, err := json.Marshal(request) @@ -61,7 +160,7 @@ func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseW shared.WriteJSONError(w, request.ID, http.StatusInternalServerError, -32603, "TASK_FORWARD_ENCODE_FAILED") return true } - forwardURL, err := f.forwardURL(r.URL.Path) + forwardURL, err := distributedForwardURL(decision.endpoint, req.URL.Path) if err != nil { shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, err.Error()) return true @@ -71,15 +170,9 @@ func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseW shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_REQUEST_BUILD_FAILED: "+err.Error()) return true } - outbound.Header.Set("Content-Type", "application/json") - outbound.Header.Set(distributedForwardedHeader, "1") - copyForwardHeader(outbound.Header, r.Header, "Accept") - copyForwardHeader(outbound.Header, r.Header, "Origin") - if f.token != "" { - outbound.Header.Set("Authorization", distributedForwardBearerHeader(f.token)) - } + r.copyForwardRequestHeaders(outbound.Header, req.Header, request, decision) - response, err := f.httpClient.Do(outbound) + response, err := r.httpClient.Do(outbound) if err != nil { shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_FAILED: "+err.Error()) return true @@ -91,26 +184,243 @@ func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseW return true } -func (f *distributedTaskForwarder) forwardURL(path string) (string, error) { - base, err := url.Parse(f.endpoint) - if err != nil || base.Scheme == "" || base.Host == "" { - return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INVALID: %s", f.endpoint) +func (r *distributedTaskRouter) forwardDecision(req *http.Request, request shared.RPCRequest) (distributedForwardDecision, bool, error) { + if r == nil || req == nil { + return distributedForwardDecision{}, false, nil } - if !distributedForwardEndpointEncryptedOrPrivate(base) { - return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INSECURE: use https or a private VPN endpoint") + forwardedTarget := strings.TrimSpace(req.Header.Get(distributedTargetHeader)) + if strings.TrimSpace(req.Header.Get(distributedForwardedHeader)) != "" && forwardedTarget == "" { + return distributedForwardDecision{}, false, nil + } + if forwardedTarget != "" { + if strings.EqualFold(forwardedTarget, r.localNodeID) { + return distributedForwardDecision{}, false, nil + } + return r.decisionForTarget(forwardedTarget, req) + } + + method := strings.TrimSpace(request.Method) + if !distributedForwardableMethod(method) { + return distributedForwardDecision{}, false, nil + } + sessionKey := distributedSessionRouteKey(request) + if sessionKey != "" { + if routed, ok := r.routeStore.get(sessionKey); ok { + return r.decisionForTarget(routed, req) + } + } + targetNodeID, ok := r.targetForRequest(method) + if !ok || targetNodeID == "" || strings.EqualFold(targetNodeID, r.localNodeID) { + return distributedForwardDecision{}, false, nil + } + if sessionKey != "" { + r.routeStore.set(sessionKey, targetNodeID) + } + return r.decisionForTarget(targetNodeID, req) +} + +func distributedForwardableMethod(method string) bool { + return method == "session.start" || method == "session.message" +} + +func distributedSessionRouteKey(request shared.RPCRequest) string { + params := shared.AsMap(request.Params) + if params == nil { + return "" + } + if sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")); sessionID != "" { + return "session:" + sessionID + } + if threadID := strings.TrimSpace(shared.StringArg(params, "threadId", "")); threadID != "" { + return "thread:" + threadID + } + return "" +} + +func (r *distributedTaskRouter) targetForRequest(method string) (string, bool) { + for _, rule := range r.rules { + if !distributedRuleMatchesMethod(rule, method) { + continue + } + if target := strings.TrimSpace(rule.Target.NodeID); target != "" { + return target, true + } + target, ok := r.selectNode(rule.Target) + return target, ok + } + return "", false +} + +func distributedRuleMatchesMethod(rule DistributedForwardRuleConfig, method string) bool { + for _, candidate := range rule.Methods { + candidate = strings.TrimSpace(candidate) + if candidate == "*" || candidate == method { + return true + } + } + return false +} + +func (r *distributedTaskRouter) selectNode(target DistributedForwardTargetConfig) (string, bool) { + candidates := make([]string, 0, len(r.nodes)) + for id, node := range r.nodes { + if id == r.localNodeID { + continue + } + if !distributedNodeMatchesSelector(node, target.Selector) { + continue + } + candidates = append(candidates, id) + } + if len(candidates) == 0 { + return "", false + } + sort.Strings(candidates) + strategy := strings.TrimSpace(target.Strategy) + if strategy == "" || strings.EqualFold(strategy, "first") { + return candidates[0], true + } + if strings.EqualFold(strategy, "round_robin") { + key := distributedSelectorKey(target.Selector) + r.mu.Lock() + index := r.roundRobin[key] % len(candidates) + r.roundRobin[key]++ + r.mu.Unlock() + return candidates[index], true + } + return candidates[0], true +} + +func distributedNodeMatchesSelector(node DistributedNodeConfig, selector DistributedForwardSelectorConfig) bool { + if role := strings.TrimSpace(selector.Role); role != "" && !strings.EqualFold(strings.TrimSpace(node.Role), role) { + return false + } + if zone := strings.TrimSpace(selector.Zone); zone != "" && !strings.EqualFold(strings.TrimSpace(node.Zone), zone) { + return false + } + if capability := strings.TrimSpace(selector.Capability); capability != "" && !distributedNodeHasCapability(node, capability) { + return false + } + return true +} + +func distributedNodeHasCapability(node DistributedNodeConfig, capability string) bool { + for _, candidate := range node.Capabilities { + if strings.EqualFold(strings.TrimSpace(candidate), capability) { + return true + } + } + return false +} + +func distributedSelectorKey(selector DistributedForwardSelectorConfig) string { + return strings.Join([]string{ + strings.TrimSpace(selector.Role), + strings.TrimSpace(selector.Zone), + strings.TrimSpace(selector.Capability), + }, "|") +} + +func (r *distributedTaskRouter) decisionForTarget(targetNodeID string, req *http.Request) (distributedForwardDecision, bool, error) { + targetNodeID = strings.TrimSpace(targetNodeID) + nextHopID := r.nextHopForTarget(targetNodeID) + if nextHopID == "" || strings.EqualFold(nextHopID, r.localNodeID) { + return distributedForwardDecision{}, false, nil + } + node, ok := r.nodes[nextHopID] + if !ok { + return distributedForwardDecision{}, false, fmt.Errorf("TASK_FORWARD_PEER_UNKNOWN: %s", nextHopID) + } + if err := r.validateHopLimit(req); err != nil { + return distributedForwardDecision{}, false, err + } + endpoint := strings.TrimSpace(node.BridgeEndpoint) + if endpoint == "" { + return distributedForwardDecision{}, false, fmt.Errorf("TASK_FORWARD_ENDPOINT_MISSING: %s", nextHopID) + } + return distributedForwardDecision{ + targetNodeID: targetNodeID, + nextHopID: nextHopID, + endpoint: endpoint, + }, true, nil +} + +func (r *distributedTaskRouter) nextHopForTarget(targetNodeID string) string { + if nextHopID := strings.TrimSpace(r.routes[targetNodeID]); nextHopID != "" { + return nextHopID + } + return targetNodeID +} + +func (r *distributedTaskRouter) validateHopLimit(req *http.Request) error { + nextHop := distributedForwardHop(req) + 1 + if nextHop > r.hopLimit { + return fmt.Errorf("TASK_FORWARD_HOP_LIMIT_EXCEEDED: %d", r.hopLimit) + } + return nil +} + +func (r *distributedTaskRouter) copyForwardRequestHeaders(dst http.Header, src http.Header, request shared.RPCRequest, decision distributedForwardDecision) { + dst.Set("Content-Type", "application/json") + dst.Set(distributedForwardedHeader, "1") + dst.Set(distributedSourceHeader, firstForwardHeader(src, distributedSourceHeader, r.localNodeID)) + dst.Set(distributedTargetHeader, decision.targetNodeID) + dst.Set(distributedTraceHeader, firstForwardHeader(src, distributedTraceHeader, distributedTraceID(request))) + dst.Set(distributedHopHeader, strconv.Itoa(distributedForwardHopFromHeader(src.Get(distributedHopHeader))+1)) + copyForwardHeader(dst, src, "Accept") + copyForwardHeader(dst, src, "Origin") + if r.token != "" { + dst.Set("Authorization", distributedForwardBearerHeader(r.token)) + } +} + +func distributedTraceID(request shared.RPCRequest) string { + if request.ID != nil { + return fmt.Sprint(request.ID) + } + return strconv.FormatInt(time.Now().UnixNano(), 36) +} + +func firstForwardHeader(src http.Header, key string, fallback string) string { + if value := strings.TrimSpace(src.Get(key)); value != "" { + return value + } + return fallback +} + +func distributedForwardHop(req *http.Request) int { + if req == nil { + return 0 + } + return distributedForwardHopFromHeader(req.Header.Get(distributedHopHeader)) +} + +func distributedForwardHopFromHeader(value string) int { + hop, err := strconv.Atoi(strings.TrimSpace(value)) + if err != nil || hop < 0 { + return 0 + } + return hop +} + +func distributedForwardURL(endpoint string, path string) (string, error) { + endpoint = strings.TrimRight(strings.TrimSpace(endpoint), "/") + base, err := url.Parse(endpoint) + if err != nil || base.Scheme == "" || base.Host == "" { + return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INVALID: %s", endpoint) + } + if !distributedForwardEndpointPrivate(base) { + return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INSECURE: use a private VPN endpoint") } base.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/") base.RawQuery = "" return base.String(), nil } -func distributedForwardEndpointEncryptedOrPrivate(endpoint *url.URL) bool { +func distributedForwardEndpointPrivate(endpoint *url.URL) bool { if endpoint == nil { return false } - if strings.EqualFold(endpoint.Scheme, "https") { - return true - } if !strings.EqualFold(endpoint.Scheme, "http") { return false } @@ -146,3 +456,40 @@ func copyForwardResponseHeaders(dst http.Header, src http.Header) { } } } + +func newDistributedSessionRouteStore(ttl time.Duration) *distributedSessionRouteStore { + return &distributedSessionRouteStore{ + routes: make(map[string]distributedSessionRoute), + ttl: ttl, + } +} + +func (s *distributedSessionRouteStore) get(key string) (string, bool) { + if s == nil || strings.TrimSpace(key) == "" { + return "", false + } + now := time.Now() + s.mu.Lock() + defer s.mu.Unlock() + route, ok := s.routes[key] + if !ok { + return "", false + } + if !route.ExpiresAt.IsZero() && now.After(route.ExpiresAt) { + delete(s.routes, key) + return "", false + } + return route.TargetNodeID, true +} + +func (s *distributedSessionRouteStore) set(key string, targetNodeID string) { + if s == nil || strings.TrimSpace(key) == "" || strings.TrimSpace(targetNodeID) == "" { + return + } + s.mu.Lock() + defer s.mu.Unlock() + s.routes[key] = distributedSessionRoute{ + TargetNodeID: strings.TrimSpace(targetNodeID), + ExpiresAt: time.Now().Add(s.ttl), + } +} diff --git a/internal/acp/http_handler.go b/internal/acp/http_handler.go index 7e8cdd1..80d39e1 100644 --- a/internal/acp/http_handler.go +++ b/internal/acp/http_handler.go @@ -46,6 +46,8 @@ func (s *Server) Handler() http.Handler { s.HandleRPC(w, r) case "/acp": s.HandleWebSocket(w, r) + case "/gateway/openclaw": + s.HandleOpenClawGatewayRPC(w, r) case openClawArtifactDownloadPath: s.HandleOpenClawArtifactDownload(w, r) default: @@ -112,6 +114,10 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { s.handleRPCWithTransform(w, r, nil) } +func (s *Server) HandleOpenClawGatewayRPC(w http.ResponseWriter, r *http.Request) { + s.handleRPCWithTransform(w, r, forceOpenClawGatewayRequest) +} + func (s *Server) HandleDisabledProviderDirectPath(w http.ResponseWriter, r *http.Request) { shared.ApplyCORS(w, r, s.allowedOrigins) if r.Method == http.MethodOptions { @@ -180,7 +186,7 @@ func (s *Server) handleRPCWithTransform( } request = transformed } - if s.taskForwarder.forward(r.Context(), w, r, request) { + if s.taskRouter.forward(r.Context(), w, r, request) { return } @@ -440,6 +446,62 @@ func sseEventType(payload map[string]any) string { return "unknown" } +func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest, *shared.RPCError) { + method := strings.TrimSpace(request.Method) + switch method { + case "session.start", "session.message": + default: + return request, &shared.RPCError{Code: -32601, Message: "OPENCLAW_GATEWAY_METHOD_NOT_ALLOWED: " + method} + } + params := shared.AsMap(request.Params) + if params == nil { + params = map[string]any{} + } + if parseBool(params["multiAgent"]) || strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), "multi-agent") { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"} + } + if provider := strings.TrimSpace(shared.StringArg(params, "provider", "")); provider != "" { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: provider must not be set on /gateway/openclaw"} + } + for _, key := range []string{"executionTarget", "requestedExecutionTarget"} { + if target := strings.TrimSpace(shared.StringArg(params, key, "")); target != "" && !strings.EqualFold(target, "gateway") { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: " + key + " must be gateway"} + } + } + for _, key := range []string{"preferredGatewayProviderId", "gatewayProviderId", "gatewayProvider"} { + if provider := strings.TrimSpace(shared.StringArg(params, key, "")); provider != "" && !strings.EqualFold(provider, "openclaw") { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: gateway provider must be openclaw"} + } + } + routing := shared.AsMap(params["routing"]) + if routing == nil { + routing = map[string]any{} + } + if strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != "" { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"} + } + if provider := strings.TrimSpace(shared.StringArg(routing, "explicitProviderId", "")); provider != "" { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: explicitProviderId must not be set on /gateway/openclaw"} + } + if target := strings.TrimSpace(shared.StringArg(routing, "explicitExecutionTarget", "")); target != "" && !strings.EqualFold(target, "gateway") { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: explicitExecutionTarget must be gateway"} + } + for _, key := range []string{"preferredGatewayProviderId", "gatewayProviderId", "gatewayProvider"} { + if provider := strings.TrimSpace(shared.StringArg(routing, key, "")); provider != "" && !strings.EqualFold(provider, "openclaw") { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: gateway provider must be openclaw"} + } + } + routing["routingMode"] = "explicit" + routing["explicitExecutionTarget"] = "gateway" + routing["preferredGatewayProviderId"] = "openclaw" + delete(routing, "explicitProviderId") + params["routing"] = routing + params["requestedExecutionTarget"] = "gateway" + params["executionTarget"] = "gateway" + request.Params = params + return request, nil +} + func requestUsesOpenClawGatewaySubmit(params map[string]any) bool { if len(params) == 0 { return false diff --git a/internal/acp/server.go b/internal/acp/server.go index 8cc3c60..03dd173 100644 --- a/internal/acp/server.go +++ b/internal/acp/server.go @@ -52,9 +52,9 @@ func NewServer() *Server { shared.EnvOrDefault("BRIDGE_REVIEW_AUTH_TOKEN", ""), ), openClawGate: newOpenClawGatewayAdmissionGate(config), - taskForwarder: newDistributedTaskForwarder(distributedTaskForwarderConfig{ - Endpoint: resolveDistributedTaskForwardEndpoint(config), - Token: resolveDistributedTaskForwardToken(config), + taskRouter: newDistributedTaskRouter(distributedTaskRouterConfig{ + Config: config, + Token: resolveDistributedTaskForwardToken(config), }), } s.Bootstrap() diff --git a/internal/acp/types.go b/internal/acp/types.go index fbc87f5..f3da07e 100644 --- a/internal/acp/types.go +++ b/internal/acp/types.go @@ -87,7 +87,7 @@ type Server struct { gateway *gatewayruntime.Manager openClawGate *openClawGatewayAdmissionGate jobs *jobManager - taskForwarder *distributedTaskForwarder + taskRouter *distributedTaskRouter // Legacy / Common authService interface{} // Minimal auth dependency