feat(acp): implement distributed task router and openclaw gateway ingress
parent d2d32f554d
commit 911f5709a3
@ -54,53 +54,100 @@
|
||||
|
||||
## 3.1 Lightweight Distributed Task Forwarding
|
||||
|
||||
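The bridge can forward HTTP task submissions received locally to another bridge endpoint for lightweight distributed deployments. The current implementation ships a dual-node distributed topology:

The bridge can forward HTTP task submissions received locally to another bridge endpoint for lightweight distributed deployments. The current implementation is a static task router: it performs no automatic discovery and does not depend on a config center:

- `cn-xworkmate-bridge.svc.plus` handles local ingress and authentication
- `xworkmate-bridge.svc.plus` runs the actual OpenClaw task runtime
- Once the CN bridge sets `task_forward_peer_id`, `session.start` / `session.message` calls on `POST /acp/rpc` are forwarded to the same path on the peer
- The main bridge does not set `task_forward_peer_id`, so it never forwards tasks back to CN
- `nodes` is the static peer catalog; it records each bridge node's identity, role, capabilities, and private-network endpoint
- `forwarding.rules` decides which JSON-RPC methods are forwarded to which node or class of nodes
- `forwarding.routes` sets explicit next hops for star or mesh topologies
- After `session.start` selects a target, `session.message` sticks to the same target node through the local session route store
- Public domains serve only as ingress; bridge-to-bridge task forwarding uses only WireGuard over VLESS private endpoints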
|
||||
|
||||
Configuration:
|
||||
Dual-node shorthand configuration:
|
||||
|
||||
```yaml
|
||||
distributed:
|
||||
topology: "dual-node"
|
||||
local_node_id: "cn-xworkmate-bridge"
|
||||
task_forward_peer_id: "xworkmate-bridge"
|
||||
nodes:
|
||||
- id: "xworkmate-bridge"
|
||||
role: "primary"
|
||||
public_base_url: "https://xworkmate-bridge.svc.plus"
|
||||
bridge_endpoint: "http://172.29.10.1:8787"
|
||||
- id: "cn-xworkmate-bridge"
|
||||
role: "edge"
|
||||
public_base_url: "https://cn-xworkmate-bridge.svc.plus"
|
||||
bridge_endpoint: "http://172.29.10.2:8787"
|
||||
task_forward_token: ""
|
||||
```
|
||||
|
||||
A single-endpoint override is still available for temporary verification:
|
||||
`task_forward_peer_id: "xworkmate-bridge"` is equivalent to forwarding `session.start` / `session.message` to the peer `xworkmate-bridge`. The built-in peer catalog resolves it to `http://172.29.10.1:8787`. The main bridge does not set `task_forward_peer_id`, so it never forwards in the reverse direction.
|
||||
|
||||
```text
|
||||
XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT=https://xworkmate-bridge.svc.plus
|
||||
XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN=$PEER_BRIDGE_AUTH_TOKEN
|
||||
General multi-node configuration:
|
||||
|
||||
```yaml
|
||||
distributed:
|
||||
local_node_id: "edge-cn"
|
||||
nodes:
|
||||
- id: "edge-cn"
|
||||
role: "edge"
|
||||
zone: "cn"
|
||||
bridge_endpoint: "http://172.29.10.2:8787"
|
||||
capabilities: ["ingress"]
|
||||
- id: "worker-a"
|
||||
role: "executor"
|
||||
zone: "global"
|
||||
bridge_endpoint: "http://172.29.10.11:8787"
|
||||
capabilities: ["openclaw"]
|
||||
- id: "worker-b"
|
||||
role: "executor"
|
||||
zone: "global"
|
||||
bridge_endpoint: "http://172.29.10.12:8787"
|
||||
capabilities: ["openclaw"]
|
||||
forwarding:
|
||||
hop_limit: 3
|
||||
default_action: "execute_local"
|
||||
rules:
|
||||
- methods: ["session.start", "session.message"]
|
||||
target:
|
||||
selector:
|
||||
role: "executor"
|
||||
capability: "openclaw"
|
||||
strategy: "round_robin"
|
||||
```
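The selector rule in the configuration above (`role: "executor"`, `capability: "openclaw"`, `strategy: "round_robin"`) can be sketched as a filter followed by a rotating cursor. This is an illustration under assumptions: `node`, `roundRobin`, and `pick` are hypothetical names, and sorting candidates by ID is a choice made here so that the rotation is deterministic. It matches the order the commit's test expects (`worker-a`, then `worker-b`), but the commit's own selection code is not shown.

```go
package main

import (
	"fmt"
	"sort"
)

// node mirrors only the catalog fields the selector needs (hypothetical type).
type node struct {
	ID           string
	Role         string
	Capabilities []string
}

func hasCapability(n node, capability string) bool {
	for _, c := range n.Capabilities {
		if c == capability {
			return true
		}
	}
	return false
}

// roundRobin keeps one cursor per selector. It is not safe for concurrent
// use; a real router would guard it with a mutex.
type roundRobin struct {
	next map[string]int
}

// pick filters nodes by role and capability, then rotates through the matches.
func (rr *roundRobin) pick(nodes []node, role, capability string) (string, bool) {
	var ids []string
	for _, n := range nodes {
		if n.Role == role && hasCapability(n, capability) {
			ids = append(ids, n.ID)
		}
	}
	if len(ids) == 0 {
		return "", false
	}
	sort.Strings(ids)
	key := role + "/" + capability
	id := ids[rr.next[key]%len(ids)]
	rr.next[key]++
	return id, true
}

func main() {
	rr := &roundRobin{next: make(map[string]int)}
	nodes := []node{
		{ID: "edge-cn", Role: "edge", Capabilities: []string{"ingress"}},
		{ID: "worker-a", Role: "executor", Capabilities: []string{"openclaw"}},
		{ID: "worker-b", Role: "executor", Capabilities: []string{"openclaw"}},
	}
	for i := 0; i < 3; i++ {
		id, _ := rr.pick(nodes, "executor", "openclaw")
		fmt.Println(id)
	}
}
```

Round robin applies only when a new session is placed. Follow-up `session.message` calls are expected to bypass the cursor and reuse the node recorded for that session.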
|
||||
|
||||
Star or explicit next-hop mesh configuration:
|
||||
|
||||
```yaml
|
||||
distributed:
|
||||
local_node_id: "edge-cn"
|
||||
nodes:
|
||||
- id: "edge-cn"
|
||||
role: "edge"
|
||||
bridge_endpoint: "http://172.29.10.2:8787"
|
||||
- id: "hub-main"
|
||||
role: "hub"
|
||||
bridge_endpoint: "http://172.29.10.1:8787"
|
||||
- id: "worker-eu"
|
||||
role: "executor"
|
||||
bridge_endpoint: "http://172.29.10.30:8787"
|
||||
forwarding:
|
||||
hop_limit: 3
|
||||
rules:
|
||||
- methods: ["session.start"]
|
||||
target:
|
||||
node_id: "worker-eu"
|
||||
routes:
|
||||
- target_node_id: "worker-eu"
|
||||
next_hop_node_id: "hub-main"
|
||||
```
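The relationship between a rule's final target and the `routes` entry in the configuration above can be sketched as a two-step lookup: find the next hop for the target, then resolve that hop's `bridge_endpoint`. This is a sketch under assumptions: `decision` and `resolveNextHop` are hypothetical names, and falling back to the target itself when no route exists is inferred from the dual-node test in this commit, where target and next hop are the same node.

```go
package main

import (
	"errors"
	"fmt"
)

// decision is a hypothetical stand-in for the router's forward decision.
type decision struct {
	targetNodeID string
	nextHopID    string
	endpoint     string
}

// resolveNextHop picks the explicit next hop for target when one is
// configured, otherwise contacts the target directly.
func resolveNextHop(endpoints, routes map[string]string, target string) (decision, error) {
	hop := target
	if via, ok := routes[target]; ok && via != "" {
		hop = via
	}
	endpoint, ok := endpoints[hop]
	if !ok {
		return decision{}, errors.New("next hop " + hop + " is not in the node catalog")
	}
	return decision{targetNodeID: target, nextHopID: hop, endpoint: endpoint}, nil
}

func main() {
	endpoints := map[string]string{
		"edge-cn":   "http://172.29.10.2:8787",
		"hub-main":  "http://172.29.10.1:8787",
		"worker-eu": "http://172.29.10.30:8787",
	}
	routes := map[string]string{"worker-eu": "hub-main"}

	d, err := resolveNextHop(endpoints, routes, "worker-eu")
	fmt.Println(d.targetNodeID, d.nextHopID, d.endpoint, err)
}
```

With this shape, `edge-cn` only needs a private path to `hub-main`; the hub applies its own routes to reach `worker-eu`.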
|
||||
|
||||
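Rules:

- The `bridge_endpoint` of the node named by `task_forward_peer_id` is the peer bridge base URL; the bridge appends `/acp/rpc` based on the current request path
- Synchronous messages must not cross the public network in plaintext: public endpoints must use `https://`
- `http://` is allowed only for loopback, private, and link-local addresses, that is, local or VPN-internal addresses where a tunnel such as WireGuard already provides encryption
- The endpoint may be public HTTPS or VPN-internal HTTP(S). The bridge explicitly supports setting `bridge_endpoint` to a local or private address behind a WireGuard, WireGuard over VLESS/TCP/TLS, or WebSocket/TLS tunnel
- `bridge_endpoint` is the peer bridge base URL; the bridge appends `/acp/rpc` or `/gateway/openclaw` based on the current request path
- Synchronous messages must not cross the public network; `bridge_endpoint` must be a loopback, private, or link-local address, that is, a local or VPN-internal address where a tunnel such as WireGuard over VLESS already provides encryption
- The local network only needs a route to the endpoint; the bridge does not depend on a config center or any additional registry
- When `task_forward_token` is empty, the local `BRIDGE_AUTH_TOKEN` is reused
- Forwarded requests carry `X-XWorkmate-Bridge-Forwarded: 1`; a bridge that receives this header does not forward again, which prevents loops between bridges
- Forwarded requests carry `X-XWorkmate-Bridge-Forwarded: 1`
- `X-XWorkmate-Forward-Source` is the source node and `X-XWorkmate-Forward-Target` is the final target node
- `X-XWorkmate-Forward-Hop` increases by one per hop; forwarding is refused once it exceeds `forwarding.hop_limit`, which prevents loops
- On a forwarded request, the bridge executes locally if the target is the local node; otherwise it looks up the next hop in `forwarding.routes`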
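The private-endpoint requirement in the rules above can be approximated with the standard `net/netip` address classifiers. This is a minimal sketch, not the commit's `distributedForwardURL`: `isPrivateBridgeEndpoint` is a hypothetical helper, and it assumes endpoints are written with literal IP addresses, so hostnames are rejected outright rather than resolved through DNS.

```go
package main

import (
	"fmt"
	"net/netip"
	"net/url"
)

// isPrivateBridgeEndpoint reports whether endpoint is an http(s) URL whose
// host is a loopback, private, or link-local IP literal.
func isPrivateBridgeEndpoint(endpoint string) bool {
	u, err := url.Parse(endpoint)
	if err != nil || u.Host == "" {
		return false
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return false
	}
	addr, err := netip.ParseAddr(u.Hostname())
	if err != nil {
		// Not an IP literal. This sketch does not attempt a DNS lookup.
		return false
	}
	return addr.IsLoopback() || addr.IsPrivate() || addr.IsLinkLocalUnicast()
}

func main() {
	for _, endpoint := range []string{
		"http://172.29.10.1:8787",
		"http://127.0.0.1:8787",
		"https://xworkmate-bridge.svc.plus",
	} {
		fmt.Println(endpoint, isPrivateBridgeEndpoint(endpoint))
	}
}
```

Rejecting hostnames keeps the check independent of resolver state, at the cost of disallowing names such as `localhost`. Whether the bridge makes the same trade-off is not visible in this diff.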
|
||||
|
||||
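Interference-resistance recommendations:

- In cross-border or GFW environments, `task_forward_endpoint` can point directly at an `http://10.x/172.16-31.x/192.168.x` private bridge endpoint behind WireGuard over VLESS/TCP/TLS
- When a carrier blocks UDP, bare WireGuard UDP can be replaced with WireGuard over VLESS/TCP/TLS, WebSocket/TLS, or an equivalent reliable encrypted channel; the bridge keeps using the same peer endpoint configuration
- In cross-border or GFW environments, `bridge_endpoint` should point at an `http://10.x/172.16-31.x/192.168.x` private bridge endpoint behind WireGuard over VLESS/TCP/TLS
- When a carrier blocks UDP, bare WireGuard UDP can be replaced with WireGuard over VLESS/TCP/TLS, WebSocket/TLS, or an equivalent reliable encrypted channel; the bridge keeps using the same peer catalog and forwarding rules
- The bridge layer keeps using bearer-token authentication; the tunnel layer handles link encryption and interference resistance, while the application layer handles peer identity and task permissions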
|
||||
|
||||
Recommended app configuration:
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
# ACP Forwarding Topology
|
||||
|
||||
Last Updated: 2026-05-03
|
||||
Last Updated: 2026-06-02
|
||||
|
||||
This document describes only the canonical topology that is currently retained.
|
||||
|
||||
@ -77,6 +77,35 @@ flowchart LR
|
||||
- The provider catalog and gatewayProviders are generated exclusively by the bridge
- The bridge exposes only the canonical ACP contract
- Actual provider / gateway addresses are bridge internal truth
- Bridge-to-bridge task forwarding uses only WireGuard over VLESS private endpoints; public domains serve only as ingress
|
||||
|
||||
## Distributed Task Router
|
||||
|
||||
Distributed forwarding is an internal bridge capability and does not change the app-facing canonical surface. Each bridge derives the next hop from the static peer catalog, forwarding rules, and routes:
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
CN["cn-xworkmate-bridge<br/>edge ingress"]
|
||||
MAIN["xworkmate-bridge<br/>primary / hub"]
|
||||
WA["worker-a<br/>executor"]
|
||||
WB["worker-b<br/>executor"]
|
||||
EU["worker-eu<br/>executor"]
|
||||
|
||||
CN -- "task_forward_peer_id or rule<br/>http://172.29.10.1:8787" --> MAIN
|
||||
MAIN -- "selector role=executor<br/>round_robin" --> WA
|
||||
MAIN -- "selector role=executor<br/>round_robin" --> WB
|
||||
CN -- "route target=worker-eu<br/>next_hop=xworkmate-bridge" --> MAIN
|
||||
MAIN -- "private next hop" --> EU
|
||||
```
|
||||
|
||||
Router contract:
|
||||
|
||||
- `nodes` stores each node's identity, role, capabilities, zone, and private `bridge_endpoint`
- `forwarding.rules` selects the final target node
- `forwarding.routes` selects the next hop for star or explicit mesh topologies
- After `session.start` selects a target, `session.message` sticks to the same target through the local session route store
- `X-XWorkmate-Forward-Hop` is bounded by `forwarding.hop_limit` to prevent loops
- Public domains such as `https://*.svc.plus` cannot be used as bridge-to-bridge endpoints
|
||||
|
||||
## Non-Contract Facts
|
||||
|
||||
|
||||
@ -17,28 +17,43 @@ openclaw_gateway:
|
||||
max_queued: 20
|
||||
queue_timeout: "10m"
|
||||
|
||||
# Lightweight distributed bridge forwarding.
|
||||
# A dual-node topology can keep both bridge nodes explicit while only enabling
|
||||
# task forwarding from the node that sets task_forward_peer_id.
|
||||
# Public plaintext HTTP is rejected; use HTTPS on public paths, or HTTP only on
|
||||
# loopback/private/link-local VPN addresses where the tunnel provides encryption.
|
||||
# bridge_endpoint is the peer bridge base URL; /gateway/openclaw or /acp/rpc is
|
||||
# appended from the inbound request path when forwarding is enabled.
|
||||
# Lightweight distributed bridge task router.
|
||||
# Public domains are ingress only. Bridge-to-bridge forwarding must use HTTP
|
||||
# loopback/private/link-local endpoints protected by WireGuard over VLESS or an
|
||||
# equivalent private transport.
|
||||
distributed:
|
||||
topology: "dual-node"
|
||||
local_node_id: "cn-xworkmate-bridge"
|
||||
# Dual-node shorthand. Equivalent to a forwarding rule that sends
|
||||
# session.start/session.message to xworkmate-bridge.
|
||||
task_forward_peer_id: "xworkmate-bridge"
|
||||
nodes:
|
||||
- id: "xworkmate-bridge"
|
||||
role: "primary"
|
||||
zone: "global"
|
||||
public_base_url: "https://xworkmate-bridge.svc.plus"
|
||||
bridge_endpoint: "http://172.29.10.1:8787"
|
||||
capabilities: ["openclaw", "codex"]
|
||||
- id: "cn-xworkmate-bridge"
|
||||
role: "edge"
|
||||
zone: "cn"
|
||||
public_base_url: "https://cn-xworkmate-bridge.svc.plus"
|
||||
bridge_endpoint: "http://172.29.10.2:8787"
|
||||
# Optional direct override. Leave empty when using the topology above.
|
||||
task_forward_endpoint: ""
|
||||
capabilities: ["ingress"]
|
||||
forwarding:
|
||||
hop_limit: 3
|
||||
default_action: "execute_local"
|
||||
# For multi-node/star/mesh mode, prefer explicit rules over task_forward_peer_id:
|
||||
# rules:
|
||||
# - methods: ["session.start", "session.message"]
|
||||
# target:
|
||||
# selector:
|
||||
# role: "executor"
|
||||
# capability: "openclaw"
|
||||
# strategy: "round_robin"
|
||||
# routes:
|
||||
# - target_node_id: "worker-eu"
|
||||
# next_hop_node_id: "xworkmate-bridge"
|
||||
# Optional. Defaults to BRIDGE_AUTH_TOKEN when omitted.
|
||||
task_forward_token: ""
|
||||
|
||||
@ -55,5 +70,5 @@ notes:
|
||||
- The bridge reads its own auth token from BRIDGE_AUTH_TOKEN.
|
||||
- Upstream URLs can include sub-paths which will be preserved during WebSocket handshake.
|
||||
- Multi-agent and Single-agent modes will use these endpoints to delegate tasks.
|
||||
- Distributed task forwarding rejects public plaintext HTTP; use HTTPS or private VPN HTTP(S) endpoints.
|
||||
- task_forward_endpoint can point at a WireGuard-over-VLESS/TCP/TLS local/private bridge address for GFW or carrier UDP blocking environments.
|
||||
- Distributed task forwarding rejects public endpoints; bridge_endpoint must be private/VPN HTTP.
|
||||
- task_forward_peer_id is the dual-node shorthand; forwarding.rules/routes express multi-node, star, and mesh topologies.
|
||||
|
||||
@ -29,19 +29,50 @@ type BridgeConfig struct {
|
||||
}
|
||||
|
||||
type DistributedConfig struct {
|
||||
TaskForwardEndpoint string `yaml:"task_forward_endpoint"`
|
||||
TaskForwardToken string `yaml:"task_forward_token"`
|
||||
Topology string `yaml:"topology"`
|
||||
LocalNodeID string `yaml:"local_node_id"`
|
||||
TaskForwardPeerID string `yaml:"task_forward_peer_id"`
|
||||
Nodes []DistributedNodeConfig `yaml:"nodes"`
|
||||
TaskForwardToken string `yaml:"task_forward_token"`
|
||||
Topology string `yaml:"topology"`
|
||||
LocalNodeID string `yaml:"local_node_id"`
|
||||
TaskForwardPeerID string `yaml:"task_forward_peer_id"`
|
||||
Nodes []DistributedNodeConfig `yaml:"nodes"`
|
||||
Forwarding DistributedForwardingConfig `yaml:"forwarding"`
|
||||
}
|
||||
|
||||
type DistributedNodeConfig struct {
|
||||
ID string `yaml:"id"`
|
||||
Role string `yaml:"role"`
|
||||
PublicBaseURL string `yaml:"public_base_url"`
|
||||
BridgeEndpoint string `yaml:"bridge_endpoint"`
|
||||
ID string `yaml:"id"`
|
||||
Role string `yaml:"role"`
|
||||
Zone string `yaml:"zone"`
|
||||
PublicBaseURL string `yaml:"public_base_url"`
|
||||
BridgeEndpoint string `yaml:"bridge_endpoint"`
|
||||
Capabilities []string `yaml:"capabilities"`
|
||||
}
|
||||
|
||||
type DistributedForwardingConfig struct {
|
||||
HopLimit int `yaml:"hop_limit"`
|
||||
DefaultAction string `yaml:"default_action"`
|
||||
Rules []DistributedForwardRuleConfig `yaml:"rules"`
|
||||
Routes []DistributedRouteConfig `yaml:"routes"`
|
||||
}
|
||||
|
||||
type DistributedForwardRuleConfig struct {
|
||||
Methods []string `yaml:"methods"`
|
||||
Target DistributedForwardTargetConfig `yaml:"target"`
|
||||
}
|
||||
|
||||
type DistributedForwardTargetConfig struct {
|
||||
NodeID string `yaml:"node_id"`
|
||||
Selector DistributedForwardSelectorConfig `yaml:"selector"`
|
||||
Strategy string `yaml:"strategy"`
|
||||
}
|
||||
|
||||
type DistributedForwardSelectorConfig struct {
|
||||
Role string `yaml:"role"`
|
||||
Zone string `yaml:"zone"`
|
||||
Capability string `yaml:"capability"`
|
||||
}
|
||||
|
||||
type DistributedRouteConfig struct {
|
||||
TargetNodeID string `yaml:"target_node_id"`
|
||||
NextHopNodeID string `yaml:"next_hop_node_id"`
|
||||
}
|
||||
|
||||
type OpenClawGatewayConfig struct {
|
||||
@ -88,18 +119,6 @@ func bridgeSharedAuthToken() string {
|
||||
return strings.TrimSpace(shared.EnvOrDefault("BRIDGE_AUTH_TOKEN", ""))
|
||||
}
|
||||
|
||||
func resolveDistributedTaskForwardEndpoint(config *BridgeConfig) string {
|
||||
yamlVal := ""
|
||||
if config != nil {
|
||||
yamlVal = config.Distributed.TaskForwardEndpoint
|
||||
}
|
||||
return resolveURL(
|
||||
firstNonEmpty(yamlVal, resolveDistributedTopologyTaskForwardEndpoint(config)),
|
||||
"XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT",
|
||||
"BRIDGE_TASK_FORWARD_ENDPOINT",
|
||||
)
|
||||
}
|
||||
|
||||
func resolveDistributedTaskForwardToken(config *BridgeConfig) string {
|
||||
if token := strings.TrimSpace(os.Getenv("XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN")); token != "" {
|
||||
return token
|
||||
@ -115,38 +134,21 @@ func resolveDistributedTaskForwardToken(config *BridgeConfig) string {
|
||||
return bridgeSharedAuthToken()
|
||||
}
|
||||
|
||||
func resolveDistributedTopologyTaskForwardEndpoint(config *BridgeConfig) string {
|
||||
if config == nil {
|
||||
return ""
|
||||
func defaultDistributedNodes() []DistributedNodeConfig {
|
||||
return []DistributedNodeConfig{
|
||||
{
|
||||
ID: "xworkmate-bridge",
|
||||
Role: "primary",
|
||||
PublicBaseURL: "https://xworkmate-bridge.svc.plus",
|
||||
BridgeEndpoint: "http://172.29.10.1:8787",
|
||||
},
|
||||
{
|
||||
ID: "cn-xworkmate-bridge",
|
||||
Role: "edge",
|
||||
PublicBaseURL: "https://cn-xworkmate-bridge.svc.plus",
|
||||
BridgeEndpoint: "http://172.29.10.2:8787",
|
||||
},
|
||||
}
|
||||
distributed := config.Distributed
|
||||
topology := strings.TrimSpace(distributed.Topology)
|
||||
if topology == "" {
|
||||
return ""
|
||||
}
|
||||
if !strings.EqualFold(topology, "dual-node") {
|
||||
return ""
|
||||
}
|
||||
localNodeID := strings.TrimSpace(distributed.LocalNodeID)
|
||||
peerNodeID := strings.TrimSpace(distributed.TaskForwardPeerID)
|
||||
if localNodeID == "" || peerNodeID == "" || localNodeID == peerNodeID {
|
||||
return ""
|
||||
}
|
||||
for _, node := range distributed.Nodes {
|
||||
if strings.TrimSpace(node.ID) == peerNodeID {
|
||||
return strings.TrimSpace(node.BridgeEndpoint)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func firstNonEmpty(values ...string) string {
|
||||
for _, value := range values {
|
||||
if trimmed := strings.TrimSpace(value); trimmed != "" {
|
||||
return trimmed
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func newProductionProviderCatalog() (*BridgeConfig, map[string]syncedProvider, []string) {
|
||||
|
||||
@ -1,62 +1,205 @@
|
||||
package acp
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
func TestResolveDistributedTaskForwardEndpointFromDualNodeTopology(t *testing.T) {
|
||||
"xworkmate-bridge/internal/shared"
|
||||
)
|
||||
|
||||
func TestDistributedTaskRouterFromDualNodePeerID(t *testing.T) {
|
||||
config := &BridgeConfig{}
|
||||
config.Distributed.Topology = "dual-node"
|
||||
config.Distributed.LocalNodeID = "cn-xworkmate-bridge"
|
||||
config.Distributed.TaskForwardPeerID = "xworkmate-bridge"
|
||||
config.Distributed.Nodes = []DistributedNodeConfig{
|
||||
{
|
||||
ID: "xworkmate-bridge",
|
||||
BridgeEndpoint: "http://172.29.10.1:8787",
|
||||
},
|
||||
{
|
||||
ID: "cn-xworkmate-bridge",
|
||||
BridgeEndpoint: "http://172.29.10.2:8787",
|
||||
},
|
||||
}
|
||||
|
||||
if got := resolveDistributedTaskForwardEndpoint(config); got != "http://172.29.10.1:8787" {
|
||||
t.Fatalf("resolveDistributedTaskForwardEndpoint() = %q, want %q", got, "http://172.29.10.1:8787")
|
||||
router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"})
|
||||
if router == nil {
|
||||
t.Fatal("expected router")
|
||||
}
|
||||
decision, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/gateway/openclaw", nil), sessionStart("s1"))
|
||||
if err != nil {
|
||||
t.Fatalf("forwardDecision() error = %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("expected forward decision")
|
||||
}
|
||||
if decision.targetNodeID != "xworkmate-bridge" || decision.nextHopID != "xworkmate-bridge" {
|
||||
t.Fatalf("decision = %#v, want target and next hop xworkmate-bridge", decision)
|
||||
}
|
||||
if decision.endpoint != "http://172.29.10.1:8787" {
|
||||
t.Fatalf("endpoint = %q, want private main bridge endpoint", decision.endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveDistributedTaskForwardEndpointDisabledWhenPeerUnset(t *testing.T) {
|
||||
func TestDistributedTaskRouterDisabledWhenPeerUnset(t *testing.T) {
|
||||
config := &BridgeConfig{}
|
||||
config.Distributed.Topology = "dual-node"
|
||||
config.Distributed.LocalNodeID = "xworkmate-bridge"
|
||||
config.Distributed.Nodes = []DistributedNodeConfig{
|
||||
{
|
||||
ID: "xworkmate-bridge",
|
||||
BridgeEndpoint: "http://172.29.10.1:8787",
|
||||
},
|
||||
{
|
||||
ID: "cn-xworkmate-bridge",
|
||||
BridgeEndpoint: "http://172.29.10.2:8787",
|
||||
},
|
||||
}
|
||||
|
||||
if got := resolveDistributedTaskForwardEndpoint(config); got != "" {
|
||||
t.Fatalf("resolveDistributedTaskForwardEndpoint() = %q, want empty endpoint", got)
|
||||
if router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"}); router != nil {
|
||||
t.Fatalf("newDistributedTaskRouter() = %#v, want nil", router)
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveDistributedTaskForwardEndpointKeepsExplicitEndpoint(t *testing.T) {
|
||||
func TestDistributedTaskRouterPrefersConfiguredNodeOverDefault(t *testing.T) {
|
||||
config := &BridgeConfig{}
|
||||
config.Distributed.TaskForwardEndpoint = "https://xworkmate-bridge.svc.plus"
|
||||
config.Distributed.Topology = "dual-node"
|
||||
config.Distributed.LocalNodeID = "cn-xworkmate-bridge"
|
||||
config.Distributed.TaskForwardPeerID = "xworkmate-bridge"
|
||||
config.Distributed.Nodes = []DistributedNodeConfig{
|
||||
{
|
||||
ID: "xworkmate-bridge",
|
||||
BridgeEndpoint: "http://172.29.10.1:8787",
|
||||
BridgeEndpoint: "http://172.29.10.11:8787",
|
||||
},
|
||||
}
|
||||
|
||||
if got := resolveDistributedTaskForwardEndpoint(config); got != "https://xworkmate-bridge.svc.plus" {
|
||||
t.Fatalf("resolveDistributedTaskForwardEndpoint() = %q, want explicit endpoint", got)
|
||||
router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"})
|
||||
decision, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s1"))
|
||||
if err != nil {
|
||||
t.Fatalf("forwardDecision() error = %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("expected forward decision")
|
||||
}
|
||||
if decision.endpoint != "http://172.29.10.11:8787" {
|
||||
t.Fatalf("endpoint = %q, want configured endpoint", decision.endpoint)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDistributedTaskRouterSelectorRoundRobinKeepsSessionAffinity(t *testing.T) {
|
||||
config := &BridgeConfig{}
|
||||
config.Distributed.LocalNodeID = "edge-cn"
|
||||
config.Distributed.Nodes = []DistributedNodeConfig{
|
||||
{ID: "edge-cn", Role: "edge", Zone: "cn", BridgeEndpoint: "http://172.29.10.2:8787"},
|
||||
{ID: "worker-a", Role: "executor", Zone: "global", BridgeEndpoint: "http://172.29.10.11:8787", Capabilities: []string{"openclaw"}},
|
||||
{ID: "worker-b", Role: "executor", Zone: "global", BridgeEndpoint: "http://172.29.10.12:8787", Capabilities: []string{"openclaw"}},
|
||||
}
|
||||
config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{
|
||||
{
|
||||
Methods: []string{"session.start", "session.message"},
|
||||
Target: DistributedForwardTargetConfig{
|
||||
Selector: DistributedForwardSelectorConfig{Role: "executor", Capability: "openclaw"},
|
||||
Strategy: "round_robin",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"})
|
||||
first, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s1"))
|
||||
if err != nil {
|
||||
t.Fatalf("first forwardDecision() error = %v", err)
|
||||
}
|
||||
if !ok || first.targetNodeID != "worker-a" {
|
||||
t.Fatalf("first decision = %#v, want worker-a", first)
|
||||
}
|
||||
secondSession, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s2"))
|
||||
if err != nil {
|
||||
t.Fatalf("second forwardDecision() error = %v", err)
|
||||
}
|
||||
if !ok || secondSession.targetNodeID != "worker-b" {
|
||||
t.Fatalf("second session decision = %#v, want worker-b", secondSession)
|
||||
}
|
||||
followUp, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionMessage("s1"))
|
||||
if err != nil {
|
||||
t.Fatalf("follow-up forwardDecision() error = %v", err)
|
||||
}
|
||||
if !ok || followUp.targetNodeID != "worker-a" {
|
||||
t.Fatalf("follow-up decision = %#v, want sticky worker-a", followUp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDistributedTaskRouterUsesExplicitNextHopRoute(t *testing.T) {
|
||||
config := &BridgeConfig{}
|
||||
config.Distributed.LocalNodeID = "edge-cn"
|
||||
config.Distributed.Nodes = []DistributedNodeConfig{
|
||||
{ID: "edge-cn", Role: "edge", BridgeEndpoint: "http://172.29.10.2:8787"},
|
||||
{ID: "hub-main", Role: "hub", BridgeEndpoint: "http://172.29.10.1:8787"},
|
||||
{ID: "worker-eu", Role: "executor", BridgeEndpoint: "http://172.29.10.30:8787"},
|
||||
}
|
||||
config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{
|
||||
{
|
||||
Methods: []string{"session.start"},
|
||||
Target: DistributedForwardTargetConfig{NodeID: "worker-eu"},
|
||||
},
|
||||
}
|
||||
config.Distributed.Forwarding.Routes = []DistributedRouteConfig{
|
||||
{TargetNodeID: "worker-eu", NextHopNodeID: "hub-main"},
|
||||
}
|
||||
|
||||
router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"})
|
||||
decision, ok, err := router.forwardDecision(httptest.NewRequest("POST", "/acp/rpc", nil), sessionStart("s1"))
|
||||
if err != nil {
|
||||
t.Fatalf("forwardDecision() error = %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("expected forward decision")
|
||||
}
|
||||
if decision.targetNodeID != "worker-eu" || decision.nextHopID != "hub-main" || decision.endpoint != "http://172.29.10.1:8787" {
|
||||
t.Fatalf("decision = %#v, want target worker-eu via hub-main", decision)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDistributedTaskRouterStopsWhenForwardedTargetIsLocal(t *testing.T) {
|
||||
config := &BridgeConfig{}
|
||||
config.Distributed.LocalNodeID = "hub-main"
|
||||
config.Distributed.Nodes = []DistributedNodeConfig{
|
||||
{ID: "hub-main", Role: "hub", BridgeEndpoint: "http://172.29.10.1:8787"},
|
||||
{ID: "worker-eu", Role: "executor", BridgeEndpoint: "http://172.29.10.30:8787"},
|
||||
}
|
||||
config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{
|
||||
{Methods: []string{"session.start"}, Target: DistributedForwardTargetConfig{NodeID: "worker-eu"}},
|
||||
}
|
||||
|
||||
router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"})
|
||||
req := httptest.NewRequest("POST", "/acp/rpc", nil)
|
||||
req.Header.Set(distributedForwardedHeader, "1")
|
||||
req.Header.Set(distributedTargetHeader, "hub-main")
|
||||
if _, ok, err := router.forwardDecision(req, sessionStart("s1")); err != nil || ok {
|
||||
t.Fatalf("forwardDecision() ok=%t err=%v, want local execution", ok, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDistributedTaskRouterRejectsHopLimitExceeded(t *testing.T) {
|
||||
config := &BridgeConfig{}
|
||||
config.Distributed.LocalNodeID = "hub-main"
|
||||
config.Distributed.Forwarding.HopLimit = 1
|
||||
config.Distributed.Nodes = []DistributedNodeConfig{
|
||||
{ID: "hub-main", Role: "hub", BridgeEndpoint: "http://172.29.10.1:8787"},
|
||||
{ID: "worker-eu", Role: "executor", BridgeEndpoint: "http://172.29.10.30:8787"},
|
||||
}
|
||||
config.Distributed.Forwarding.Rules = []DistributedForwardRuleConfig{
|
||||
{Methods: []string{"session.start"}, Target: DistributedForwardTargetConfig{NodeID: "worker-eu"}},
|
||||
}
|
||||
|
||||
router := newDistributedTaskRouter(distributedTaskRouterConfig{Config: config, Token: "token"})
|
||||
req := httptest.NewRequest("POST", "/acp/rpc", nil)
|
||||
req.Header.Set(distributedForwardedHeader, "1")
|
||||
req.Header.Set(distributedTargetHeader, "worker-eu")
|
||||
req.Header.Set(distributedHopHeader, "1")
|
||||
if _, ok, err := router.forwardDecision(req, sessionStart("s1")); err == nil || ok {
|
||||
t.Fatalf("forwardDecision() ok=%t err=%v, want hop limit error", ok, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDistributedForwardURLRejectsPublicEndpoint(t *testing.T) {
|
||||
if _, err := distributedForwardURL("https://xworkmate-bridge.svc.plus", "/acp/rpc"); err == nil {
|
||||
t.Fatal("expected public endpoint rejection")
|
||||
}
|
||||
}
|
||||
|
||||
func sessionStart(sessionID string) shared.RPCRequest {
|
||||
return shared.RPCRequest{
|
||||
ID: sessionID,
|
||||
Method: "session.start",
|
||||
Params: map[string]any{"sessionId": sessionID, "threadId": "thread-" + sessionID},
|
||||
}
|
||||
}
|
||||
|
||||
func sessionMessage(sessionID string) shared.RPCRequest {
|
||||
return shared.RPCRequest{
|
||||
ID: sessionID + "-message",
|
||||
Method: "session.message",
|
||||
Params: map[string]any{"sessionId": sessionID, "threadId": "thread-" + sessionID},
|
||||
}
|
||||
}
|
||||
|
||||
@ -9,51 +9,150 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"xworkmate-bridge/internal/shared"
|
||||
)
|
||||
|
||||
const distributedForwardedHeader = "X-XWorkmate-Bridge-Forwarded"
|
||||
const (
|
||||
distributedForwardedHeader = "X-XWorkmate-Bridge-Forwarded"
|
||||
distributedSourceHeader = "X-XWorkmate-Forward-Source"
|
||||
distributedTargetHeader = "X-XWorkmate-Forward-Target"
|
||||
distributedTraceHeader = "X-XWorkmate-Forward-Trace"
|
||||
distributedHopHeader = "X-XWorkmate-Forward-Hop"
|
||||
|
||||
type distributedTaskForwarderConfig struct {
|
||||
Endpoint string
|
||||
Token string
|
||||
defaultDistributedHopLimit = 3
|
||||
defaultSessionRouteTTL = 24 * time.Hour
|
||||
)
|
||||
|
||||
type distributedTaskRouterConfig struct {
|
||||
Config *BridgeConfig
|
||||
Token string
|
||||
}
|
||||
|
||||
type distributedTaskForwarder struct {
|
||||
endpoint string
|
||||
token string
|
||||
httpClient *http.Client
|
||||
type distributedTaskRouter struct {
|
||||
localNodeID string
|
||||
token string
|
||||
hopLimit int
|
||||
nodes map[string]DistributedNodeConfig
|
||||
rules []DistributedForwardRuleConfig
|
||||
routes map[string]string
|
||||
routeStore *distributedSessionRouteStore
|
||||
roundRobin map[string]int
|
||||
httpClient *http.Client
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newDistributedTaskForwarder(config distributedTaskForwarderConfig) *distributedTaskForwarder {
|
||||
endpoint := strings.TrimRight(strings.TrimSpace(config.Endpoint), "/")
|
||||
if endpoint == "" {
|
||||
type distributedForwardDecision struct {
|
||||
targetNodeID string
|
||||
nextHopID string
|
||||
endpoint string
|
||||
}
|
||||
|
||||
type distributedSessionRoute struct {
|
||||
TargetNodeID string
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
type distributedSessionRouteStore struct {
|
||||
mu sync.Mutex
|
||||
routes map[string]distributedSessionRoute
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
func newDistributedTaskRouter(config distributedTaskRouterConfig) *distributedTaskRouter {
|
||||
if config.Config == nil {
|
||||
return nil
|
||||
}
|
||||
return &distributedTaskForwarder{
|
||||
endpoint: endpoint,
|
||||
token: strings.TrimSpace(config.Token),
|
||||
distributed := config.Config.Distributed
|
||||
localNodeID := strings.TrimSpace(distributed.LocalNodeID)
|
||||
if localNodeID == "" {
|
||||
return nil
|
||||
}
|
||||
nodes := distributedNodeCatalog(distributed.Nodes)
|
||||
if _, ok := nodes[localNodeID]; !ok {
|
||||
return nil
|
||||
}
|
||||
rules := distributedForwardRules(distributed)
|
||||
if len(rules) == 0 {
|
||||
return nil
|
||||
}
|
||||
hopLimit := distributed.Forwarding.HopLimit
|
||||
if hopLimit <= 0 {
|
||||
hopLimit = defaultDistributedHopLimit
|
||||
}
|
||||
return &distributedTaskRouter{
|
||||
localNodeID: localNodeID,
|
||||
token: strings.TrimSpace(config.Token),
|
||||
hopLimit: hopLimit,
|
||||
nodes: nodes,
|
||||
rules: rules,
|
||||
routes: distributedRouteMap(distributed.Forwarding.Routes),
|
||||
routeStore: newDistributedSessionRouteStore(defaultSessionRouteTTL),
|
||||
roundRobin: make(map[string]int),
|
||||
httpClient: &http.Client{
|
||||
Timeout: openClawAgentWaitMaxTimeout + openClawAgentWaitHTTPMargin,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *distributedTaskForwarder) shouldForward(r *http.Request, request shared.RPCRequest) bool {
|
||||
if f == nil || strings.TrimSpace(f.endpoint) == "" || r == nil {
|
||||
return false
|
||||
func distributedNodeCatalog(configured []DistributedNodeConfig) map[string]DistributedNodeConfig {
|
||||
nodes := make(map[string]DistributedNodeConfig)
|
||||
for _, node := range defaultDistributedNodes() {
|
||||
if id := strings.TrimSpace(node.ID); id != "" {
|
||||
node.ID = id
|
||||
nodes[id] = node
|
||||
}
|
||||
}
|
||||
if strings.TrimSpace(r.Header.Get(distributedForwardedHeader)) != "" {
|
||||
return false
|
||||
for _, node := range configured {
|
||||
if id := strings.TrimSpace(node.ID); id != "" {
|
||||
node.ID = id
|
||||
nodes[id] = node
|
||||
}
|
||||
}
|
||||
method := strings.TrimSpace(request.Method)
|
||||
return method == "session.start" || method == "session.message"
|
||||
return nodes
|
||||
}
|
||||
|
||||
func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseWriter, r *http.Request, request shared.RPCRequest) bool {
|
||||
if !f.shouldForward(r, request) {
|
||||
func distributedForwardRules(distributed DistributedConfig) []DistributedForwardRuleConfig {
|
||||
if len(distributed.Forwarding.Rules) > 0 {
|
||||
return distributed.Forwarding.Rules
|
||||
}
|
||||
if peerID := strings.TrimSpace(distributed.TaskForwardPeerID); peerID != "" {
|
||||
return []DistributedForwardRuleConfig{
|
||||
{
|
||||
Methods: []string{"session.start", "session.message"},
|
||||
Target: DistributedForwardTargetConfig{
|
||||
NodeID: peerID,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func distributedRouteMap(routes []DistributedRouteConfig) map[string]string {
|
||||
result := make(map[string]string)
|
||||
for _, route := range routes {
|
||||
target := strings.TrimSpace(route.TargetNodeID)
|
||||
nextHop := strings.TrimSpace(route.NextHopNodeID)
|
||||
if target != "" && nextHop != "" && target != nextHop {
|
||||
result[target] = nextHop
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *distributedTaskRouter) forward(ctx context.Context, w http.ResponseWriter, req *http.Request, request shared.RPCRequest) bool {
|
||||
decision, ok, err := r.forwardDecision(req, request)
|
||||
if err != nil {
|
||||
shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, err.Error())
|
||||
return true
|
||||
}
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
payload, err := json.Marshal(request)
|
||||
@ -61,7 +160,7 @@ func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseW
|
||||
shared.WriteJSONError(w, request.ID, http.StatusInternalServerError, -32603, "TASK_FORWARD_ENCODE_FAILED")
|
||||
return true
|
||||
}
|
||||
forwardURL, err := f.forwardURL(r.URL.Path)
|
||||
forwardURL, err := distributedForwardURL(decision.endpoint, req.URL.Path)
|
||||
if err != nil {
|
||||
shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, err.Error())
|
||||
return true
|
||||
@ -71,15 +170,9 @@ func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseW
|
||||
shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_REQUEST_BUILD_FAILED: "+err.Error())
|
||||
return true
|
||||
}
|
||||
outbound.Header.Set("Content-Type", "application/json")
|
||||
outbound.Header.Set(distributedForwardedHeader, "1")
|
||||
copyForwardHeader(outbound.Header, r.Header, "Accept")
|
||||
copyForwardHeader(outbound.Header, r.Header, "Origin")
|
||||
if f.token != "" {
|
||||
outbound.Header.Set("Authorization", distributedForwardBearerHeader(f.token))
|
||||
}
|
||||
r.copyForwardRequestHeaders(outbound.Header, req.Header, request, decision)
|
||||
|
||||
response, err := f.httpClient.Do(outbound)
|
||||
response, err := r.httpClient.Do(outbound)
|
||||
if err != nil {
|
||||
shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_FAILED: "+err.Error())
|
||||
return true
|
||||
@ -91,26 +184,243 @@ func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseW
|
||||
return true
|
||||
}
|
||||
|
||||
func (f *distributedTaskForwarder) forwardURL(path string) (string, error) {
|
||||
base, err := url.Parse(f.endpoint)
|
||||
if err != nil || base.Scheme == "" || base.Host == "" {
|
||||
return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INVALID: %s", f.endpoint)
|
||||
func (r *distributedTaskRouter) forwardDecision(req *http.Request, request shared.RPCRequest) (distributedForwardDecision, bool, error) {
|
||||
if r == nil || req == nil {
|
||||
return distributedForwardDecision{}, false, nil
|
||||
}
|
||||
if !distributedForwardEndpointEncryptedOrPrivate(base) {
|
||||
return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INSECURE: use https or a private VPN endpoint")
|
||||
forwardedTarget := strings.TrimSpace(req.Header.Get(distributedTargetHeader))
|
||||
if strings.TrimSpace(req.Header.Get(distributedForwardedHeader)) != "" && forwardedTarget == "" {
|
||||
return distributedForwardDecision{}, false, nil
|
||||
}
|
||||
if forwardedTarget != "" {
|
||||
if strings.EqualFold(forwardedTarget, r.localNodeID) {
|
||||
return distributedForwardDecision{}, false, nil
|
||||
}
|
||||
return r.decisionForTarget(forwardedTarget, req)
|
||||
}
|
||||
|
||||
method := strings.TrimSpace(request.Method)
|
||||
if !distributedForwardableMethod(method) {
|
||||
return distributedForwardDecision{}, false, nil
|
||||
}
|
||||
sessionKey := distributedSessionRouteKey(request)
|
||||
if sessionKey != "" {
|
||||
if routed, ok := r.routeStore.get(sessionKey); ok {
|
||||
return r.decisionForTarget(routed, req)
|
||||
}
|
||||
}
|
||||
targetNodeID, ok := r.targetForRequest(method)
|
||||
if !ok || targetNodeID == "" || strings.EqualFold(targetNodeID, r.localNodeID) {
|
||||
return distributedForwardDecision{}, false, nil
|
||||
}
|
||||
if sessionKey != "" {
|
||||
r.routeStore.set(sessionKey, targetNodeID)
|
||||
}
|
||||
return r.decisionForTarget(targetNodeID, req)
|
||||
}
|
||||
|
||||
// distributedForwardableMethod reports whether a JSON-RPC method is eligible
// for forwarding. Only the two session methods are.
func distributedForwardableMethod(method string) bool {
	return method == "session.start" || method == "session.message"
}

// distributedSessionRouteKey derives the sticky-routing key for a request.
// sessionId takes precedence over threadId; an empty result means the request
// carries neither and cannot be pinned to a node.
func distributedSessionRouteKey(request shared.RPCRequest) string {
	params := shared.AsMap(request.Params)
	if params == nil {
		return ""
	}
	if sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", "")); sessionID != "" {
		return "session:" + sessionID
	}
	if threadID := strings.TrimSpace(shared.StringArg(params, "threadId", "")); threadID != "" {
		return "thread:" + threadID
	}
	return ""
}
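As a rough illustration of the key derivation above, the sketch below works on a plain `map[string]any` instead of `shared.RPCRequest`. The `stringArg` helper is a simplified assumption standing in for `shared.StringArg`, whose exact behavior is not shown in this diff.

```go
package main

import (
	"fmt"
	"strings"
)

// stringArg is a hypothetical, simplified replacement for shared.StringArg:
// it returns the trimmed string value for key, or "" when absent or not a string.
func stringArg(params map[string]any, key string) string {
	value, _ := params[key].(string)
	return strings.TrimSpace(value)
}

// sessionRouteKey prefers sessionId over threadId and namespaces the result so
// that a session and a thread with the same ID cannot collide.
func sessionRouteKey(params map[string]any) string {
	if id := stringArg(params, "sessionId"); id != "" {
		return "session:" + id
	}
	if id := stringArg(params, "threadId"); id != "" {
		return "thread:" + id
	}
	return ""
}

func main() {
	fmt.Println(sessionRouteKey(map[string]any{"sessionId": " s1 ", "threadId": "t1"}))
	fmt.Println(sessionRouteKey(map[string]any{"threadId": "t1"}))
	fmt.Println(sessionRouteKey(nil) == "")
}
```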
|
||||
|
||||
// targetForRequest returns the target node for the first rule whose method
// list matches. A rule with an explicit node ID wins outright; otherwise the
// rule's selector chooses a node. Later rules are never consulted once one
// rule has matched.
func (r *distributedTaskRouter) targetForRequest(method string) (string, bool) {
	for _, rule := range r.rules {
		if !distributedRuleMatchesMethod(rule, method) {
			continue
		}
		if target := strings.TrimSpace(rule.Target.NodeID); target != "" {
			return target, true
		}
		target, ok := r.selectNode(rule.Target)
		return target, ok
	}
	return "", false
}

// distributedRuleMatchesMethod reports whether the rule lists the method
// exactly or contains the "*" wildcard.
func distributedRuleMatchesMethod(rule DistributedForwardRuleConfig, method string) bool {
	for _, candidate := range rule.Methods {
		candidate = strings.TrimSpace(candidate)
		if candidate == "*" || candidate == method {
			return true
		}
	}
	return false
}
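The first-match-wins behavior of rule evaluation can be sketched as follows. This is a simplified illustration, not the router itself: the `rule` type is hypothetical, and selector-based targets are left out, so a matching rule without a node ID simply reports no target.

```go
package main

import (
	"fmt"
	"strings"
)

// rule is a hypothetical, reduced form of DistributedForwardRuleConfig.
type rule struct {
	Methods []string
	NodeID  string
}

// matches reports whether the rule lists method exactly or uses the "*" wildcard.
func matches(r rule, method string) bool {
	for _, candidate := range r.Methods {
		candidate = strings.TrimSpace(candidate)
		if candidate == "*" || candidate == method {
			return true
		}
	}
	return false
}

// targetFor evaluates rules in order and stops at the first match.
func targetFor(rules []rule, method string) (string, bool) {
	for _, r := range rules {
		if matches(r, method) {
			return r.NodeID, r.NodeID != ""
		}
	}
	return "", false
}

func main() {
	rules := []rule{
		{Methods: []string{"session.start"}, NodeID: "node-a"},
		{Methods: []string{"*"}, NodeID: "node-b"},
	}
	fmt.Println(targetFor(rules, "session.start"))
	fmt.Println(targetFor(rules, "session.message"))
}
```

Because evaluation stops at the first match, a wildcard rule is best placed last.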
|
||||
|
||||
// selectNode picks a remote node that matches the rule's selector. Candidates
// are sorted by ID so that "first" and "round_robin" behave deterministically
// despite Go's randomized map iteration order. An empty or unrecognized
// strategy falls back to the first candidate.
func (r *distributedTaskRouter) selectNode(target DistributedForwardTargetConfig) (string, bool) {
	candidates := make([]string, 0, len(r.nodes))
	for id, node := range r.nodes {
		if id == r.localNodeID {
			continue
		}
		if !distributedNodeMatchesSelector(node, target.Selector) {
			continue
		}
		candidates = append(candidates, id)
	}
	if len(candidates) == 0 {
		return "", false
	}
	sort.Strings(candidates)
	strategy := strings.TrimSpace(target.Strategy)
	if strategy == "" || strings.EqualFold(strategy, "first") {
		return candidates[0], true
	}
	if strings.EqualFold(strategy, "round_robin") {
		key := distributedSelectorKey(target.Selector)
		r.mu.Lock()
		index := r.roundRobin[key] % len(candidates)
		r.roundRobin[key]++
		r.mu.Unlock()
		return candidates[index], true
	}
	return candidates[0], true
}
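The round-robin branch above keeps one counter per selector key and indexes into the sorted candidate list. A minimal sketch of that idea, assuming a hypothetical `picker` type and made-up node IDs, might look like this:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// picker is a hypothetical stand-in for the router's round-robin state.
type picker struct {
	mu       sync.Mutex
	counters map[string]int
}

func newPicker() *picker {
	return &picker{counters: make(map[string]int)}
}

// pick returns the next candidate for key. Sorting a copy keeps the rotation
// order stable no matter how the caller ordered the candidates.
func (p *picker) pick(key string, candidates []string) (string, bool) {
	if len(candidates) == 0 {
		return "", false
	}
	sorted := append([]string(nil), candidates...)
	sort.Strings(sorted)
	p.mu.Lock()
	defer p.mu.Unlock()
	index := p.counters[key] % len(sorted)
	p.counters[key]++
	return sorted[index], true
}

func main() {
	p := newPicker()
	nodes := []string{"node-b", "node-a"}
	for i := 0; i < 3; i++ {
		id, _ := p.pick("edge||", nodes)
		fmt.Println(id)
	}
}
```

Note that the counter lives in process memory, so each bridge instance rotates independently and the rotation restarts after a process restart.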
|
||||
|
||||
// distributedNodeMatchesSelector reports whether the node satisfies every
// non-empty selector field. Comparisons are case-insensitive, and an empty
// selector matches every node.
func distributedNodeMatchesSelector(node DistributedNodeConfig, selector DistributedForwardSelectorConfig) bool {
	if role := strings.TrimSpace(selector.Role); role != "" && !strings.EqualFold(strings.TrimSpace(node.Role), role) {
		return false
	}
	if zone := strings.TrimSpace(selector.Zone); zone != "" && !strings.EqualFold(strings.TrimSpace(node.Zone), zone) {
		return false
	}
	if capability := strings.TrimSpace(selector.Capability); capability != "" && !distributedNodeHasCapability(node, capability) {
		return false
	}
	return true
}

// distributedNodeHasCapability reports whether the node advertises the
// capability, ignoring case and surrounding whitespace.
func distributedNodeHasCapability(node DistributedNodeConfig, capability string) bool {
	for _, candidate := range node.Capabilities {
		if strings.EqualFold(strings.TrimSpace(candidate), capability) {
			return true
		}
	}
	return false
}

// distributedSelectorKey builds the round-robin counter key for a selector.
func distributedSelectorKey(selector DistributedForwardSelectorConfig) string {
	return strings.Join([]string{
		strings.TrimSpace(selector.Role),
		strings.TrimSpace(selector.Zone),
		strings.TrimSpace(selector.Capability),
	}, "|")
}
|
||||
|
||||
// decisionForTarget resolves the next hop for targetNodeID and returns its
// bridge endpoint. It reports ok=false without an error when the next hop is
// empty or is the local node, so that the request is handled locally.
func (r *distributedTaskRouter) decisionForTarget(targetNodeID string, req *http.Request) (distributedForwardDecision, bool, error) {
	targetNodeID = strings.TrimSpace(targetNodeID)
	nextHopID := r.nextHopForTarget(targetNodeID)
	if nextHopID == "" || strings.EqualFold(nextHopID, r.localNodeID) {
		return distributedForwardDecision{}, false, nil
	}
	node, ok := r.nodes[nextHopID]
	if !ok {
		return distributedForwardDecision{}, false, fmt.Errorf("TASK_FORWARD_PEER_UNKNOWN: %s", nextHopID)
	}
	if err := r.validateHopLimit(req); err != nil {
		return distributedForwardDecision{}, false, err
	}
	endpoint := strings.TrimSpace(node.BridgeEndpoint)
	if endpoint == "" {
		return distributedForwardDecision{}, false, fmt.Errorf("TASK_FORWARD_ENDPOINT_MISSING: %s", nextHopID)
	}
	return distributedForwardDecision{
		targetNodeID: targetNodeID,
		nextHopID:    nextHopID,
		endpoint:     endpoint,
	}, true, nil
}

// nextHopForTarget returns the explicitly routed next hop for the target, or
// the target itself when no route is configured.
func (r *distributedTaskRouter) nextHopForTarget(targetNodeID string) string {
	if nextHopID := strings.TrimSpace(r.routes[targetNodeID]); nextHopID != "" {
		return nextHopID
	}
	return targetNodeID
}

// validateHopLimit rejects a forward that would push the hop count past the
// configured limit, which guards against routing loops.
func (r *distributedTaskRouter) validateHopLimit(req *http.Request) error {
	nextHop := distributedForwardHop(req) + 1
	if nextHop > r.hopLimit {
		return fmt.Errorf("TASK_FORWARD_HOP_LIMIT_EXCEEDED: %d", r.hopLimit)
	}
	return nil
}
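To make the hop accounting concrete, here is a small sketch of the check, assuming the incoming hop count arrives as a raw header value. The function names are illustrative, and the limit of 2 is an arbitrary example value, not a default taken from this commit.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// hopFromHeader parses the incoming hop count. Missing, malformed, or negative
// values are treated as 0, meaning the request has not been forwarded yet.
func hopFromHeader(value string) int {
	hop, err := strconv.Atoi(strings.TrimSpace(value))
	if err != nil || hop < 0 {
		return 0
	}
	return hop
}

// checkHop fails when forwarding once more would exceed limit.
func checkHop(headerValue string, limit int) error {
	next := hopFromHeader(headerValue) + 1
	if next > limit {
		return fmt.Errorf("TASK_FORWARD_HOP_LIMIT_EXCEEDED: %d", limit)
	}
	return nil
}

func main() {
	const limit = 2 // example value only
	for _, incoming := range []string{"", "1", "2", "abc"} {
		fmt.Printf("%q -> %v\n", incoming, checkHop(incoming, limit))
	}
}
```

One consequence of treating malformed values as 0 is that the limit only protects against loops between bridges that set the header faithfully; it is not a defense against a client that forges it.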
|
||||
|
||||
// copyForwardRequestHeaders populates the outbound headers for a forwarded
// request. The source node and trace ID set by an earlier hop are preserved
// when present, the hop counter is incremented, and the bearer token is
// attached only when one is configured.
func (r *distributedTaskRouter) copyForwardRequestHeaders(dst http.Header, src http.Header, request shared.RPCRequest, decision distributedForwardDecision) {
	dst.Set("Content-Type", "application/json")
	dst.Set(distributedForwardedHeader, "1")
	dst.Set(distributedSourceHeader, firstForwardHeader(src, distributedSourceHeader, r.localNodeID))
	dst.Set(distributedTargetHeader, decision.targetNodeID)
	dst.Set(distributedTraceHeader, firstForwardHeader(src, distributedTraceHeader, distributedTraceID(request)))
	dst.Set(distributedHopHeader, strconv.Itoa(distributedForwardHopFromHeader(src.Get(distributedHopHeader))+1))
	copyForwardHeader(dst, src, "Accept")
	copyForwardHeader(dst, src, "Origin")
	if r.token != "" {
		dst.Set("Authorization", distributedForwardBearerHeader(r.token))
	}
}

// distributedTraceID uses the JSON-RPC request ID as the trace ID and falls
// back to a base-36 nanosecond timestamp when the request has no ID.
func distributedTraceID(request shared.RPCRequest) string {
	if request.ID != nil {
		return fmt.Sprint(request.ID)
	}
	return strconv.FormatInt(time.Now().UnixNano(), 36)
}

// firstForwardHeader returns the trimmed header value, or fallback when the
// header is absent or blank.
func firstForwardHeader(src http.Header, key string, fallback string) string {
	if value := strings.TrimSpace(src.Get(key)); value != "" {
		return value
	}
	return fallback
}

// distributedForwardHop returns the hop count carried by the request, or 0
// for a nil request.
func distributedForwardHop(req *http.Request) int {
	if req == nil {
		return 0
	}
	return distributedForwardHopFromHeader(req.Header.Get(distributedHopHeader))
}

// distributedForwardHopFromHeader parses a hop header value. Malformed or
// negative values count as 0.
func distributedForwardHopFromHeader(value string) int {
	hop, err := strconv.Atoi(strings.TrimSpace(value))
	if err != nil || hop < 0 {
		return 0
	}
	return hop
}
|
||||
|
||||
// distributedForwardURL joins the peer endpoint with the incoming request
// path. The endpoint must be an absolute URL on a private network; any query
// string on the endpoint is dropped.
func distributedForwardURL(endpoint string, path string) (string, error) {
	endpoint = strings.TrimRight(strings.TrimSpace(endpoint), "/")
	base, err := url.Parse(endpoint)
	if err != nil || base.Scheme == "" || base.Host == "" {
		return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INVALID: %s", endpoint)
	}
	if !distributedForwardEndpointPrivate(base) {
		return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INSECURE: use a private VPN endpoint")
	}
	base.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/")
	base.RawQuery = ""
	return base.String(), nil
}
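The path-joining part of `distributedForwardURL` can be tried in isolation. The sketch below deliberately omits the private-network check, because the body of `distributedForwardEndpointPrivate` is only partially visible in this diff; the `/base` prefix in the second call is a made-up example.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// forwardURL joins endpoint and path with exactly one slash between them and
// drops any query string on the endpoint. The privacy check from the real
// function is intentionally left out of this sketch.
func forwardURL(endpoint string, path string) (string, error) {
	endpoint = strings.TrimRight(strings.TrimSpace(endpoint), "/")
	base, err := url.Parse(endpoint)
	if err != nil || base.Scheme == "" || base.Host == "" {
		return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INVALID: %s", endpoint)
	}
	base.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/")
	base.RawQuery = ""
	return base.String(), nil
}

func main() {
	fmt.Println(forwardURL("http://172.29.10.1:8787/", "/acp/rpc"))
	fmt.Println(forwardURL("http://172.29.10.1:8787/base?x=1", "acp/rpc"))
	fmt.Println(forwardURL("", "/acp/rpc"))
}
```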
|
||||
|
||||
func distributedForwardEndpointEncryptedOrPrivate(endpoint *url.URL) bool {
|
||||
func distributedForwardEndpointPrivate(endpoint *url.URL) bool {
|
||||
if endpoint == nil {
|
||||
return false
|
||||
}
|
||||
if strings.EqualFold(endpoint.Scheme, "https") {
|
||||
return true
|
||||
}
|
||||
if !strings.EqualFold(endpoint.Scheme, "http") {
|
||||
return false
|
||||
}
|
||||
@ -146,3 +456,40 @@ func copyForwardResponseHeaders(dst http.Header, src http.Header) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// newDistributedSessionRouteStore creates an in-memory sticky-route store
// whose entries expire ttl after they are written.
func newDistributedSessionRouteStore(ttl time.Duration) *distributedSessionRouteStore {
	return &distributedSessionRouteStore{
		routes: make(map[string]distributedSessionRoute),
		ttl:    ttl,
	}
}

// get returns the node pinned to key. Expired entries are deleted lazily on
// lookup; there is no background sweep.
func (s *distributedSessionRouteStore) get(key string) (string, bool) {
	if s == nil || strings.TrimSpace(key) == "" {
		return "", false
	}
	now := time.Now()
	s.mu.Lock()
	defer s.mu.Unlock()
	route, ok := s.routes[key]
	if !ok {
		return "", false
	}
	if !route.ExpiresAt.IsZero() && now.After(route.ExpiresAt) {
		delete(s.routes, key)
		return "", false
	}
	return route.TargetNodeID, true
}

// set pins key to targetNodeID and restarts the TTL. Blank keys and blank
// targets are ignored.
func (s *distributedSessionRouteStore) set(key string, targetNodeID string) {
	if s == nil || strings.TrimSpace(key) == "" || strings.TrimSpace(targetNodeID) == "" {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.routes[key] = distributedSessionRoute{
		TargetNodeID: strings.TrimSpace(targetNodeID),
		ExpiresAt:    time.Now().Add(s.ttl),
	}
}
|
||||
|
||||
@ -46,6 +46,8 @@ func (s *Server) Handler() http.Handler {
|
||||
s.HandleRPC(w, r)
|
||||
case "/acp":
|
||||
s.HandleWebSocket(w, r)
|
||||
case "/gateway/openclaw":
|
||||
s.HandleOpenClawGatewayRPC(w, r)
|
||||
case openClawArtifactDownloadPath:
|
||||
s.HandleOpenClawArtifactDownload(w, r)
|
||||
default:
|
||||
@ -112,6 +114,10 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
||||
s.handleRPCWithTransform(w, r, nil)
|
||||
}
|
||||
|
||||
func (s *Server) HandleOpenClawGatewayRPC(w http.ResponseWriter, r *http.Request) {
|
||||
s.handleRPCWithTransform(w, r, forceOpenClawGatewayRequest)
|
||||
}
|
||||
|
||||
func (s *Server) HandleDisabledProviderDirectPath(w http.ResponseWriter, r *http.Request) {
|
||||
shared.ApplyCORS(w, r, s.allowedOrigins)
|
||||
if r.Method == http.MethodOptions {
|
||||
@ -180,7 +186,7 @@ func (s *Server) handleRPCWithTransform(
|
||||
}
|
||||
request = transformed
|
||||
}
|
||||
if s.taskForwarder.forward(r.Context(), w, r, request) {
|
||||
if s.taskRouter.forward(r.Context(), w, r, request) {
|
||||
return
|
||||
}
|
||||
|
||||
@ -440,6 +446,62 @@ func sseEventType(payload map[string]any) string {
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest, *shared.RPCError) {
|
||||
method := strings.TrimSpace(request.Method)
|
||||
switch method {
|
||||
case "session.start", "session.message":
|
||||
default:
|
||||
return request, &shared.RPCError{Code: -32601, Message: "OPENCLAW_GATEWAY_METHOD_NOT_ALLOWED: " + method}
|
||||
}
|
||||
params := shared.AsMap(request.Params)
|
||||
if params == nil {
|
||||
params = map[string]any{}
|
||||
}
|
||||
if parseBool(params["multiAgent"]) || strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), "multi-agent") {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"}
|
||||
}
|
||||
if provider := strings.TrimSpace(shared.StringArg(params, "provider", "")); provider != "" {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: provider must not be set on /gateway/openclaw"}
|
||||
}
|
||||
for _, key := range []string{"executionTarget", "requestedExecutionTarget"} {
|
||||
if target := strings.TrimSpace(shared.StringArg(params, key, "")); target != "" && !strings.EqualFold(target, "gateway") {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: " + key + " must be gateway"}
|
||||
}
|
||||
}
|
||||
for _, key := range []string{"preferredGatewayProviderId", "gatewayProviderId", "gatewayProvider"} {
|
||||
if provider := strings.TrimSpace(shared.StringArg(params, key, "")); provider != "" && !strings.EqualFold(provider, "openclaw") {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: gateway provider must be openclaw"}
|
||||
}
|
||||
}
|
||||
routing := shared.AsMap(params["routing"])
|
||||
if routing == nil {
|
||||
routing = map[string]any{}
|
||||
}
|
||||
if strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != "" {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"}
|
||||
}
|
||||
if provider := strings.TrimSpace(shared.StringArg(routing, "explicitProviderId", "")); provider != "" {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: explicitProviderId must not be set on /gateway/openclaw"}
|
||||
}
|
||||
if target := strings.TrimSpace(shared.StringArg(routing, "explicitExecutionTarget", "")); target != "" && !strings.EqualFold(target, "gateway") {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: explicitExecutionTarget must be gateway"}
|
||||
}
|
||||
for _, key := range []string{"preferredGatewayProviderId", "gatewayProviderId", "gatewayProvider"} {
|
||||
if provider := strings.TrimSpace(shared.StringArg(routing, key, "")); provider != "" && !strings.EqualFold(provider, "openclaw") {
|
||||
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: gateway provider must be openclaw"}
|
||||
}
|
||||
}
|
||||
routing["routingMode"] = "explicit"
|
||||
routing["explicitExecutionTarget"] = "gateway"
|
||||
routing["preferredGatewayProviderId"] = "openclaw"
|
||||
delete(routing, "explicitProviderId")
|
||||
params["routing"] = routing
|
||||
params["requestedExecutionTarget"] = "gateway"
|
||||
params["executionTarget"] = "gateway"
|
||||
request.Params = params
|
||||
return request, nil
|
||||
}
|
||||
|
||||
func requestUsesOpenClawGatewaySubmit(params map[string]any) bool {
|
||||
if len(params) == 0 {
|
||||
return false
|
||||
|
||||
@ -52,9 +52,9 @@ func NewServer() *Server {
|
||||
shared.EnvOrDefault("BRIDGE_REVIEW_AUTH_TOKEN", ""),
|
||||
),
|
||||
openClawGate: newOpenClawGatewayAdmissionGate(config),
|
||||
taskForwarder: newDistributedTaskForwarder(distributedTaskForwarderConfig{
|
||||
Endpoint: resolveDistributedTaskForwardEndpoint(config),
|
||||
Token: resolveDistributedTaskForwardToken(config),
|
||||
taskRouter: newDistributedTaskRouter(distributedTaskRouterConfig{
|
||||
Config: config,
|
||||
Token: resolveDistributedTaskForwardToken(config),
|
||||
}),
|
||||
}
|
||||
s.Bootstrap()
|
||||
|
||||
@ -87,7 +87,7 @@ type Server struct {
|
||||
gateway *gatewayruntime.Manager
|
||||
openClawGate *openClawGatewayAdmissionGate
|
||||
jobs *jobManager
|
||||
taskForwarder *distributedTaskForwarder
|
||||
taskRouter *distributedTaskRouter
|
||||
|
||||
// Legacy / Common
|
||||
authService interface{} // Minimal auth dependency
|
||||
|
||||