Add distributed task forwarding

This commit is contained in:
Haitao Pan 2026-05-26 14:11:01 +08:00
parent 85fe904352
commit 110cb20e52
7 changed files with 239 additions and 0 deletions

View File

@ -51,6 +51,45 @@
- `BRIDGE_AUTH_TOKEN` 非空时,接受裸 token 或 `Bearer <token>`
- `xworkmate-app` 生产 Origin 固定为 `https://xworkmate.svc.plus`
## 3.1 Lightweight Distributed Task Forwarding
bridge 可以把本机收到的 HTTP 任务提交转发到另一个 bridge endpoint用于轻量分布式部署。例如
- `cn-xworkmate-bridge.svc.plus` 负责本地入口和鉴权
- `xworkmate-bridge.svc.plus` 负责实际 OpenClaw task runtime
- cn bridge 配置 peer endpoint 后,`POST /gateway/openclaw` 的 `session.start` / `session.message` 会转发到 peer 的同一路径
配置:
```yaml
distributed:
task_forward_endpoint: "https://xworkmate-bridge.svc.plus"
task_forward_token: ""
```
等价环境变量:
```text
XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT=https://xworkmate-bridge.svc.plus
XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN=$PEER_BRIDGE_AUTH_TOKEN
```
规则:
- endpoint 是 peer bridge base URLbridge 会按当前请求路径拼接 `/gateway/openclaw``/acp/rpc`
- 同步消息不能走公网明文:公网 endpoint 必须使用 `https://`
- `http://` 只允许 loopback、private、link-local 这类本机或 VPN 内网地址,用于 WireGuard 等隧道已经提供加密的场景
- endpoint 可以是公网 HTTPS也可以是 VPN 内网 HTTP(S)。bridge 明确支持把 `task_forward_endpoint` 配成 WireGuard、WireGuard over VLESS/TCP/TLS、WebSocket/TLS 等隧道后的本机或私网地址
- 只要求本机网络能路由到 endpointbridge 不依赖 config center 或额外注册中心
- `task_forward_token` 为空时复用本机 `BRIDGE_AUTH_TOKEN`
- 转发请求会带 `X-XWorkmate-Bridge-Forwarded: 1`,收到该 header 后不会再次转发,避免 bridge 之间循环
抗干扰建议:
- 跨境或 GFW 环境可以直接把 `task_forward_endpoint` 指向 WireGuard over VLESS/TCP/TLS 后的 `http://10.x/172.16-31.x/192.168.x` 私网 bridge endpoint
- 运营商 UDP 阻断时,支持把裸 WireGuard UDP 替换为 WireGuard over VLESS/TCP/TLS、WebSocket/TLS 或等价可靠加密通道bridge 继续使用同一个 peer endpoint 配置
- bridge 层继续使用 bearer token 鉴权;隧道层负责链路加密和抗干扰,应用层负责 peer 身份和任务权限
推荐 APP 配置:
```text

View File

@ -17,6 +17,19 @@ openclaw_gateway:
max_queued: 20
queue_timeout: "10m"
# Lightweight distributed bridge forwarding.
# cn-xworkmate-bridge.svc.plus can forward task submissions to a peer bridge,
# including VPN-only endpoints such as WireGuard or WireGuard-over-VLESS
# addresses that are reachable from this host.
# Public plaintext HTTP is rejected; use HTTPS on public paths, or HTTP only on
# loopback/private/link-local VPN addresses where the tunnel provides encryption.
# The endpoint is the peer bridge base URL; /gateway/openclaw or /acp/rpc is
# appended from the inbound request path.
distributed:
task_forward_endpoint: ""
# Optional. Defaults to BRIDGE_AUTH_TOKEN when omitted.
task_forward_token: ""
# Legacy/Reference structure (Normally managed via code constants or environment)
bridge:
listenAddr: 127.0.0.1:8787
@ -30,3 +43,5 @@ notes:
- The bridge reads its own auth token from BRIDGE_AUTH_TOKEN.
- Upstream URLs can include sub-paths which will be preserved during WebSocket handshake.
- Multi-agent and Single-agent modes will use these endpoints to delegate tasks.
- Distributed task forwarding rejects public plaintext HTTP; use HTTPS or private VPN HTTP(S) endpoints.
- task_forward_endpoint can point at a WireGuard-over-VLESS/TCP/TLS local/private bridge address for GFW or carrier UDP blocking environments.

View File

@ -24,9 +24,15 @@ type BridgeConfig struct {
GeminiURL string `yaml:"gemini_url"`
HermesURL string `yaml:"hermes_url"`
} `yaml:"upstream"`
Distributed DistributedConfig `yaml:"distributed"`
OpenClawGateway OpenClawGatewayConfig `yaml:"openclaw_gateway"`
}
type DistributedConfig struct {
TaskForwardEndpoint string `yaml:"task_forward_endpoint"`
TaskForwardToken string `yaml:"task_forward_token"`
}
type OpenClawGatewayConfig struct {
MaxActive *int `yaml:"max_active"`
MaxQueued *int `yaml:"max_queued"`
@ -71,6 +77,29 @@ func bridgeSharedAuthToken() string {
return strings.TrimSpace(shared.EnvOrDefault("BRIDGE_AUTH_TOKEN", ""))
}
func resolveDistributedTaskForwardEndpoint(config *BridgeConfig) string {
yamlVal := ""
if config != nil {
yamlVal = config.Distributed.TaskForwardEndpoint
}
return resolveURL(yamlVal, "XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT", "BRIDGE_TASK_FORWARD_ENDPOINT")
}
func resolveDistributedTaskForwardToken(config *BridgeConfig) string {
if token := strings.TrimSpace(os.Getenv("XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN")); token != "" {
return token
}
if token := strings.TrimSpace(os.Getenv("BRIDGE_TASK_FORWARD_TOKEN")); token != "" {
return token
}
if config != nil {
if token := strings.TrimSpace(config.Distributed.TaskForwardToken); token != "" {
return token
}
}
return bridgeSharedAuthToken()
}
func newProductionProviderCatalog() (*BridgeConfig, map[string]syncedProvider, []string) {
return newProductionProviderCatalogFromConfig(loadBridgeConfig())
}

View File

@ -0,0 +1,148 @@
package acp
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"xworkmate-bridge/internal/shared"
)
const distributedForwardedHeader = "X-XWorkmate-Bridge-Forwarded"
type distributedTaskForwarderConfig struct {
Endpoint string
Token string
}
type distributedTaskForwarder struct {
endpoint string
token string
httpClient *http.Client
}
func newDistributedTaskForwarder(config distributedTaskForwarderConfig) *distributedTaskForwarder {
endpoint := strings.TrimRight(strings.TrimSpace(config.Endpoint), "/")
if endpoint == "" {
return nil
}
return &distributedTaskForwarder{
endpoint: endpoint,
token: strings.TrimSpace(config.Token),
httpClient: &http.Client{
Timeout: openClawAgentWaitMaxTimeout + openClawAgentWaitHTTPMargin,
},
}
}
func (f *distributedTaskForwarder) shouldForward(r *http.Request, request shared.RPCRequest) bool {
if f == nil || strings.TrimSpace(f.endpoint) == "" || r == nil {
return false
}
if strings.TrimSpace(r.Header.Get(distributedForwardedHeader)) != "" {
return false
}
method := strings.TrimSpace(request.Method)
return method == "session.start" || method == "session.message"
}
func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseWriter, r *http.Request, request shared.RPCRequest) bool {
if !f.shouldForward(r, request) {
return false
}
payload, err := json.Marshal(request)
if err != nil {
shared.WriteJSONError(w, request.ID, http.StatusInternalServerError, -32603, "TASK_FORWARD_ENCODE_FAILED")
return true
}
forwardURL, err := f.forwardURL(r.URL.Path)
if err != nil {
shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, err.Error())
return true
}
outbound, err := http.NewRequestWithContext(ctx, http.MethodPost, forwardURL, bytes.NewReader(payload))
if err != nil {
shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_REQUEST_BUILD_FAILED: "+err.Error())
return true
}
outbound.Header.Set("Content-Type", "application/json")
outbound.Header.Set(distributedForwardedHeader, "1")
copyForwardHeader(outbound.Header, r.Header, "Accept")
copyForwardHeader(outbound.Header, r.Header, "Origin")
if f.token != "" {
outbound.Header.Set("Authorization", distributedForwardBearerHeader(f.token))
}
response, err := f.httpClient.Do(outbound)
if err != nil {
shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_FAILED: "+err.Error())
return true
}
defer func() { _ = response.Body.Close() }()
copyForwardResponseHeaders(w.Header(), response.Header)
w.WriteHeader(response.StatusCode)
_, _ = io.Copy(w, response.Body)
return true
}
func (f *distributedTaskForwarder) forwardURL(path string) (string, error) {
base, err := url.Parse(f.endpoint)
if err != nil || base.Scheme == "" || base.Host == "" {
return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INVALID: %s", f.endpoint)
}
if !distributedForwardEndpointEncryptedOrPrivate(base) {
return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INSECURE: use https or a private VPN endpoint")
}
base.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/")
base.RawQuery = ""
return base.String(), nil
}
func distributedForwardEndpointEncryptedOrPrivate(endpoint *url.URL) bool {
if endpoint == nil {
return false
}
if strings.EqualFold(endpoint.Scheme, "https") {
return true
}
if !strings.EqualFold(endpoint.Scheme, "http") {
return false
}
host := strings.Trim(endpoint.Hostname(), "[]")
if host == "localhost" {
return true
}
ip := net.ParseIP(host)
if ip == nil {
return false
}
return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast()
}
func distributedForwardBearerHeader(token string) string {
token = strings.TrimSpace(token)
if token == "" || strings.HasPrefix(strings.ToLower(token), "bearer ") {
return token
}
return "Bearer " + token
}
func copyForwardHeader(dst http.Header, src http.Header, key string) {
if value := strings.TrimSpace(src.Get(key)); value != "" {
dst.Set(key, value)
}
}
func copyForwardResponseHeaders(dst http.Header, src http.Header) {
for _, key := range []string{"Content-Type", "Cache-Control", "Connection"} {
if value := strings.TrimSpace(src.Get(key)); value != "" {
dst.Set(key, value)
}
}
}

View File

@ -182,6 +182,9 @@ func (s *Server) handleRPCWithTransform(
}
request = transformed
}
if s.taskForwarder.forward(r.Context(), w, r, request) {
return
}
accept := strings.ToLower(r.Header.Get("Accept"))
stream := strings.Contains(accept, "text/event-stream")

View File

@ -49,6 +49,10 @@ func NewServer() *Server {
allowedOrigins: shared.ParseAllowedOrigins(shared.EnvOrDefault("ACP_ALLOWED_ORIGINS", "https://xworkmate.svc.plus,http://localhost:*,http://127.0.0.1:*")),
authService: service.NewStaticTokenAuthService(shared.EnvOrDefault("BRIDGE_AUTH_TOKEN", "")),
openClawGate: newOpenClawGatewayAdmissionGate(config),
taskForwarder: newDistributedTaskForwarder(distributedTaskForwarderConfig{
Endpoint: resolveDistributedTaskForwardEndpoint(config),
Token: resolveDistributedTaskForwardToken(config),
}),
}
s.Bootstrap()
return s

View File

@ -87,6 +87,7 @@ type Server struct {
gateway *gatewayruntime.Manager
openClawGate *openClawGatewayAdmissionGate
jobs *jobManager
taskForwarder *distributedTaskForwarder
// Legacy / Common
authService interface{} // Minimal auth dependency