diff --git a/docs/api-reference.md b/docs/api-reference.md index 7d7bbfe..82dc855 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -51,6 +51,45 @@ - `BRIDGE_AUTH_TOKEN` 非空时,接受裸 token 或 `Bearer ` - `xworkmate-app` 生产 Origin 固定为 `https://xworkmate.svc.plus` +## 3.1 Lightweight Distributed Task Forwarding + +bridge 可以把本机收到的 HTTP 任务提交转发到另一个 bridge endpoint,用于轻量分布式部署。例如: + +- `cn-xworkmate-bridge.svc.plus` 负责本地入口和鉴权 +- `xworkmate-bridge.svc.plus` 负责实际 OpenClaw task runtime +- cn bridge 配置 peer endpoint 后,`POST /gateway/openclaw` 的 `session.start` / `session.message` 会转发到 peer 的同一路径 + +配置: + +```yaml +distributed: + task_forward_endpoint: "https://xworkmate-bridge.svc.plus" + task_forward_token: "" +``` + +等价环境变量: + +```text +XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT=https://xworkmate-bridge.svc.plus +XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN=$PEER_BRIDGE_AUTH_TOKEN +``` + +规则: + +- endpoint 是 peer bridge base URL,bridge 会按当前请求路径拼接 `/gateway/openclaw` 或 `/acp/rpc` +- 同步消息不能走公网明文:公网 endpoint 必须使用 `https://` +- `http://` 只允许 loopback、private、link-local 这类本机或 VPN 内网地址,用于 WireGuard 等隧道已经提供加密的场景 +- endpoint 可以是公网 HTTPS,也可以是 VPN 内网 HTTP(S)。bridge 明确支持把 `task_forward_endpoint` 配成 WireGuard、WireGuard over VLESS/TCP/TLS、WebSocket/TLS 等隧道后的本机或私网地址 +- 只要求本机网络能路由到 endpoint;bridge 不依赖 config center 或额外注册中心 +- `task_forward_token` 为空时复用本机 `BRIDGE_AUTH_TOKEN` +- 转发请求会带 `X-XWorkmate-Bridge-Forwarded: 1`,收到该 header 后不会再次转发,避免 bridge 之间循环 + +抗干扰建议: + +- 跨境或 GFW 环境可以直接把 `task_forward_endpoint` 指向 WireGuard over VLESS/TCP/TLS 后的 `http://10.x/172.16-31.x/192.168.x` 私网 bridge endpoint +- 运营商 UDP 阻断时,支持把裸 WireGuard UDP 替换为 WireGuard over VLESS/TCP/TLS、WebSocket/TLS 或等价可靠加密通道,bridge 继续使用同一个 peer endpoint 配置 +- bridge 层继续使用 bearer token 鉴权;隧道层负责链路加密和抗干扰,应用层负责 peer 身份和任务权限 + 推荐 APP 配置: ```text diff --git a/example/config.yaml b/example/config.yaml index 2d54121..b0d5b2f 100644 --- a/example/config.yaml +++ b/example/config.yaml @@ -17,6 +17,19 @@ openclaw_gateway: max_queued: 20 queue_timeout: "10m" +# Lightweight distributed bridge forwarding. +# cn-xworkmate-bridge.svc.plus can forward task submissions to a peer bridge, +# including VPN-only endpoints such as WireGuard or WireGuard-over-VLESS +# addresses that are reachable from this host. +# Public plaintext HTTP is rejected; use HTTPS on public paths, or HTTP only on +# loopback/private/link-local VPN addresses where the tunnel provides encryption. +# The endpoint is the peer bridge base URL; /gateway/openclaw or /acp/rpc is +# appended from the inbound request path. +distributed: + task_forward_endpoint: "" + # Optional. Defaults to BRIDGE_AUTH_TOKEN when omitted. + task_forward_token: "" + # Legacy/Reference structure (Normally managed via code constants or environment) bridge: listenAddr: 127.0.0.1:8787 @@ -30,3 +43,5 @@ notes: - The bridge reads its own auth token from BRIDGE_AUTH_TOKEN. - Upstream URLs can include sub-paths which will be preserved during WebSocket handshake. - Multi-agent and Single-agent modes will use these endpoints to delegate tasks. + - Distributed task forwarding rejects public plaintext HTTP; use HTTPS or private VPN HTTP(S) endpoints. + - task_forward_endpoint can point at a WireGuard-over-VLESS/TCP/TLS local/private bridge address for GFW or carrier UDP blocking environments. diff --git a/internal/acp/config.go b/internal/acp/config.go index bb2de44..39054ff 100644 --- a/internal/acp/config.go +++ b/internal/acp/config.go @@ -24,9 +24,15 @@ type BridgeConfig struct { GeminiURL string `yaml:"gemini_url"` HermesURL string `yaml:"hermes_url"` } `yaml:"upstream"` + Distributed DistributedConfig `yaml:"distributed"` OpenClawGateway OpenClawGatewayConfig `yaml:"openclaw_gateway"` } +type DistributedConfig struct { + TaskForwardEndpoint string `yaml:"task_forward_endpoint"` + TaskForwardToken string `yaml:"task_forward_token"` +} + type OpenClawGatewayConfig struct { MaxActive *int `yaml:"max_active"` MaxQueued *int `yaml:"max_queued"` @@ -71,6 +77,29 @@ func bridgeSharedAuthToken() string { return strings.TrimSpace(shared.EnvOrDefault("BRIDGE_AUTH_TOKEN", "")) } +func resolveDistributedTaskForwardEndpoint(config *BridgeConfig) string { + yamlVal := "" + if config != nil { + yamlVal = config.Distributed.TaskForwardEndpoint + } + return resolveURL(yamlVal, "XWORKMATE_BRIDGE_TASK_FORWARD_ENDPOINT", "BRIDGE_TASK_FORWARD_ENDPOINT") +} + +func resolveDistributedTaskForwardToken(config *BridgeConfig) string { + if token := strings.TrimSpace(os.Getenv("XWORKMATE_BRIDGE_TASK_FORWARD_TOKEN")); token != "" { + return token + } + if token := strings.TrimSpace(os.Getenv("BRIDGE_TASK_FORWARD_TOKEN")); token != "" { + return token + } + if config != nil { + if token := strings.TrimSpace(config.Distributed.TaskForwardToken); token != "" { + return token + } + } + return bridgeSharedAuthToken() +} + func newProductionProviderCatalog() (*BridgeConfig, map[string]syncedProvider, []string) { return newProductionProviderCatalogFromConfig(loadBridgeConfig()) } diff --git a/internal/acp/distributed_forwarder.go b/internal/acp/distributed_forwarder.go new file mode 100644 index 0000000..dec30bb --- /dev/null +++ b/internal/acp/distributed_forwarder.go @@ -0,0 +1,148 @@ +package acp + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + + "xworkmate-bridge/internal/shared" +) + +const distributedForwardedHeader = "X-XWorkmate-Bridge-Forwarded" + +type distributedTaskForwarderConfig struct { + Endpoint string + Token string +} + +type distributedTaskForwarder struct { + endpoint string + token string + httpClient *http.Client +} + +func newDistributedTaskForwarder(config distributedTaskForwarderConfig) *distributedTaskForwarder { + endpoint := strings.TrimRight(strings.TrimSpace(config.Endpoint), "/") + if endpoint == "" { + return nil + } + return &distributedTaskForwarder{ + endpoint: endpoint, + token: strings.TrimSpace(config.Token), + httpClient: &http.Client{ + Timeout: openClawAgentWaitMaxTimeout + openClawAgentWaitHTTPMargin, + }, + } +} + +func (f *distributedTaskForwarder) shouldForward(r *http.Request, request shared.RPCRequest) bool { + if f == nil || strings.TrimSpace(f.endpoint) == "" || r == nil { + return false + } + if strings.TrimSpace(r.Header.Get(distributedForwardedHeader)) != "" { + return false + } + method := strings.TrimSpace(request.Method) + return method == "session.start" || method == "session.message" +} + +func (f *distributedTaskForwarder) forward(ctx context.Context, w http.ResponseWriter, r *http.Request, request shared.RPCRequest) bool { + if !f.shouldForward(r, request) { + return false + } + payload, err := json.Marshal(request) + if err != nil { + shared.WriteJSONError(w, request.ID, http.StatusInternalServerError, -32603, "TASK_FORWARD_ENCODE_FAILED") + return true + } + forwardURL, err := f.forwardURL(r.URL.Path) + if err != nil { + shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, err.Error()) + return true + } + outbound, err := http.NewRequestWithContext(ctx, http.MethodPost, forwardURL, bytes.NewReader(payload)) + if err != nil { + shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_REQUEST_BUILD_FAILED: "+err.Error()) + return true + } + outbound.Header.Set("Content-Type", "application/json") + outbound.Header.Set(distributedForwardedHeader, "1") + copyForwardHeader(outbound.Header, r.Header, "Accept") + copyForwardHeader(outbound.Header, r.Header, "Origin") + if f.token != "" { + outbound.Header.Set("Authorization", distributedForwardBearerHeader(f.token)) + } + + response, err := f.httpClient.Do(outbound) + if err != nil { + shared.WriteJSONError(w, request.ID, http.StatusBadGateway, -32060, "TASK_FORWARD_FAILED: "+err.Error()) + return true + } + defer func() { _ = response.Body.Close() }() + copyForwardResponseHeaders(w.Header(), response.Header) + w.WriteHeader(response.StatusCode) + _, _ = io.Copy(w, response.Body) + return true +} + +func (f *distributedTaskForwarder) forwardURL(path string) (string, error) { + base, err := url.Parse(f.endpoint) + if err != nil || base.Scheme == "" || base.Host == "" { + return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INVALID: %s", f.endpoint) + } + if !distributedForwardEndpointEncryptedOrPrivate(base) { + return "", fmt.Errorf("TASK_FORWARD_ENDPOINT_INSECURE: use https or a private VPN endpoint") + } + base.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/") + base.RawQuery = "" + return base.String(), nil +} + +func distributedForwardEndpointEncryptedOrPrivate(endpoint *url.URL) bool { + if endpoint == nil { + return false + } + if strings.EqualFold(endpoint.Scheme, "https") { + return true + } + if !strings.EqualFold(endpoint.Scheme, "http") { + return false + } + host := strings.Trim(endpoint.Hostname(), "[]") + if host == "localhost" { + return true + } + ip := net.ParseIP(host) + if ip == nil { + return false + } + return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() +} + +func distributedForwardBearerHeader(token string) string { + token = strings.TrimSpace(token) + if token == "" || strings.HasPrefix(strings.ToLower(token), "bearer ") { + return token + } + return "Bearer " + token +} + +func copyForwardHeader(dst http.Header, src http.Header, key string) { + if value := strings.TrimSpace(src.Get(key)); value != "" { + dst.Set(key, value) + } +} + +func copyForwardResponseHeaders(dst http.Header, src http.Header) { + for _, key := range []string{"Content-Type", "Cache-Control", "Connection"} { + if value := strings.TrimSpace(src.Get(key)); value != "" { + dst.Set(key, value) + } + } +} diff --git a/internal/acp/http_handler.go b/internal/acp/http_handler.go index 16bbd48..b2c8cfb 100644 --- a/internal/acp/http_handler.go +++ b/internal/acp/http_handler.go @@ -182,6 +182,9 @@ func (s *Server) handleRPCWithTransform( } request = transformed } + if s.taskForwarder.forward(r.Context(), w, r, request) { + return + } accept := strings.ToLower(r.Header.Get("Accept")) stream := strings.Contains(accept, "text/event-stream") diff --git a/internal/acp/server.go b/internal/acp/server.go index 1ca0334..06e7159 100644 --- a/internal/acp/server.go +++ b/internal/acp/server.go @@ -49,6 +49,10 @@ func NewServer() *Server { allowedOrigins: shared.ParseAllowedOrigins(shared.EnvOrDefault("ACP_ALLOWED_ORIGINS", "https://xworkmate.svc.plus,http://localhost:*,http://127.0.0.1:*")), authService: service.NewStaticTokenAuthService(shared.EnvOrDefault("BRIDGE_AUTH_TOKEN", "")), openClawGate: newOpenClawGatewayAdmissionGate(config), + taskForwarder: newDistributedTaskForwarder(distributedTaskForwarderConfig{ + Endpoint: resolveDistributedTaskForwardEndpoint(config), + Token: resolveDistributedTaskForwardToken(config), + }), } s.Bootstrap() return s diff --git a/internal/acp/types.go b/internal/acp/types.go index b9adab1..fbc87f5 100644 --- a/internal/acp/types.go +++ b/internal/acp/types.go @@ -87,6 +87,7 @@ type Server struct { gateway *gatewayruntime.Manager openClawGate *openClawGatewayAdmissionGate jobs *jobManager + taskForwarder *distributedTaskForwarder // Legacy / Common authService interface{} // Minimal auth dependency