From 760a8c0f3b09df43a4f1c59deeaf29cf39b0ad16 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Mon, 18 May 2026 16:55:40 +0800 Subject: [PATCH] feat: add internal multi-agent jobs and tools orchestration --- docs/api-reference.md | 69 +++- internal/acp/bootstrap.go | 1 + internal/acp/catalog.go | 10 +- internal/acp/helpers.go | 48 +++ internal/acp/http_handler.go | 5 +- internal/acp/jobs.go | 354 ++++++++++++++++ internal/acp/multi_agent.go | 612 ++++++++++++++++++++++++++++ internal/acp/orchestrator.go | 4 + internal/acp/provider_compat.go | 36 ++ internal/acp/providers_sync_test.go | 11 +- internal/acp/routing_test.go | 383 +++++++++++++++++ internal/acp/rpc_handler.go | 6 + internal/acp/types.go | 1 + internal/acp/web_contract_test.go | 2 + 14 files changed, 1531 insertions(+), 11 deletions(-) create mode 100644 internal/acp/jobs.go create mode 100644 internal/acp/multi_agent.go diff --git a/docs/api-reference.md b/docs/api-reference.md index e58c635..75ef5e1 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -137,10 +137,15 @@ bridge 对 app 的稳定 method family 只有: - `xworkmate.gateway.connect` - `xworkmate.gateway.request` - `xworkmate.gateway.disconnect` +- `xworkmate.jobs.submit` +- `xworkmate.jobs.get` +- `xworkmate.jobs.list` +- `xworkmate.jobs.stats` +- `xworkmate.tools.invoke` 路径约束: -- `/acp/rpc` 是 capabilities、routing、agent、multi-agent、cancel、close 的 canonical HTTP RPC 入口。 +- `/acp/rpc` 是 capabilities、routing、agent、multi-agent、jobs、tools proxy、cancel、close 的 canonical HTTP RPC 入口。 - `/gateway/openclaw` 只允许 OpenClaw `session.start` 和 follow-up `session.message`。 - `/gateway/openclaw` 拒绝 `acp.capabilities`、`xworkmate.routing.resolve`、`xworkmate.gateway.*`、`session.cancel` 和 `session.close`。 @@ -153,7 +158,7 @@ bridge 对 app 的稳定 method family 只有: ```json { "singleAgent": true, - "multiAgent": false, + "multiAgent": true, "availableExecutionTargets": ["agent", "gateway"], "providerCatalog": [ { "providerId": "codex", "label": "Codex", "targets": 
["agent"], "category": "native" }, @@ -249,6 +254,15 @@ unavailable 示例: - `workingDirectory` - `routing` +multi-agent 输入仍使用同一个 `session.start` / `session.message` method,不新增 HTTP path: + +- `multiAgent: true` 或 `mode: "multi-agent"` +- `routing.orchestrationMode`: `sequence`、`parallel`、`race`、`conversation` +- `routing.steps`: `{ "providerId": "codex", "prompt": "...", "outputAs": "...", "timeoutMs": 300000 }[]` +- `routing.participants`、`routing.maxTurns`、`routing.stopConditions` 用于 `conversation` + +multi-agent 只允许通过 `/acp` 或 `/acp/rpc` 进入。`/gateway/openclaw` 仍是 OpenClaw task submit 专用入口,并继续拒绝 `multiAgent=true`。 + 统一结果字段: - `success` @@ -267,7 +281,9 @@ bridge 保证: - provider-specific 差异被 compat layer 吸收 - bridge core 不暴露 stdio/runtime 细节 - 中间通知统一通过 `session.update` +- Native ACP structured event 会透传到 `structuredEvent`,类型包括 `thinking`、`tool_call`、`text`、`status` - `session.message` 是续写合同;provider compat 缺少原会话状态时返回结构化 JSON-RPC error,不静默降级为新的 `session.start` +- provider 发起 `session/request_permission` 时 bridge 自动返回 allow/approved,避免 ACP provider 卡住等待人工确认 续写失败错误: @@ -313,7 +329,54 @@ gateway method family 保留为 control-plane contract: - `openclaw` 是 bridge-owned gateway provider,不是 app-facing direct route - gateway control-plane method 仍走 `/acp` 或 `/acp/rpc`,不走 `/gateway/openclaw` -## 11. 非 Contract 内容 +## 11. 
Internal Async Jobs + +`xworkmate.jobs.*` 只作为 `/acp` / `/acp/rpc` 内部 JSON-RPC method 暴露,不新增 `/jobs` HTTP path。 + +- `xworkmate.jobs.submit`:提交后台任务,立即返回 `jobId` +- `xworkmate.jobs.get`:按 `jobId` 查询状态和结果 +- `xworkmate.jobs.list`:返回 job 列表和 summary +- `xworkmate.jobs.stats`:返回状态统计 + +`submit` 输入重点: + +- `providerId` +- `sessionId` +- `threadId` +- `taskPrompt` +- `workingDirectory` +- `timeoutMs`,默认 10 分钟 +- `callbackUrl` / `webhookUrl` +- `target`、`channel`、`accountId`,用于通过 OpenClaw message tool 推送 Markdown card + +语义: + +- job 复用已有 provider compat/session 映射,不引入第二套 process pool +- 超过 `timeoutMs` 或默认 10 分钟仍未结束时标记 `failed` +- callback webhook 最多重试 3 次 +- `target` 非空时内部调用 `xworkmate.tools.invoke`,等价于 acp-bridge 的 OpenClaw `/tools/invoke` message send 能力 + +## 12. Internal Tools Proxy + +`xworkmate.tools.invoke` 是 `/tools/invoke` 的 JSON-RPC 内部等价物。 + +输入: + +```json +{ + "tool": "message", + "action": "send", + "args": { + "channel": "discord", + "target": "channel:123", + "message": "..." + } +} +``` + +优先使用 `OPENCLAW_TOOLS_INVOKE_URL` / `OPENCLAW_TOOLS_TOKEN` 直连 OpenClaw tools HTTP endpoint;未配置时复用已连接的 OpenClaw gateway runtime 调用 `tools.invoke`。 + +## 13. 
非 Contract 内容 以下内容不是 app contract: diff --git a/internal/acp/bootstrap.go b/internal/acp/bootstrap.go index b14926a..ec49617 100644 --- a/internal/acp/bootstrap.go +++ b/internal/acp/bootstrap.go @@ -26,6 +26,7 @@ func (s *Server) Bootstrap() { s.routingEngine = &DefaultRoutingEngine{server: s} s.orchestrator = NewSessionOrchestrator(s) + s.jobs = newJobManager(s) s.providers = make(map[string]ProviderCompat) s.sessions = make(map[string]*session) diff --git a/internal/acp/catalog.go b/internal/acp/catalog.go index fb4537d..7aca6a8 100644 --- a/internal/acp/catalog.go +++ b/internal/acp/catalog.go @@ -7,10 +7,10 @@ import ( type CapabilityCatalog struct { mu sync.RWMutex - ProviderCatalog []any `json:"providerCatalog"` - GatewayProviders []any `json:"gatewayProviders"` + ProviderCatalog []any `json:"providerCatalog"` + GatewayProviders []any `json:"gatewayProviders"` AvailableExecutionTargets []any `json:"availableExecutionTargets"` - ProviderProbeSummary []any `json:"providerProbeSummary"` + ProviderProbeSummary []any `json:"providerProbeSummary"` } func (c *CapabilityCatalog) Update(providers []any, targets []any) { @@ -26,7 +26,7 @@ func (c *CapabilityCatalog) Get() map[string]any { result := map[string]any{ "singleAgent": true, - "multiAgent": false, + "multiAgent": true, "providerCatalog": append([]any(nil), c.ProviderCatalog...), "gatewayProviders": append([]any(nil), c.GatewayProviders...), "availableExecutionTargets": append([]any(nil), c.AvailableExecutionTargets...), @@ -34,7 +34,7 @@ func (c *CapabilityCatalog) Get() map[string]any { } result["capabilities"] = map[string]any{ "single_agent": true, - "multi_agent": false, + "multi_agent": true, "providerCatalog": append([]any(nil), c.ProviderCatalog...), "gatewayProviders": append([]any(nil), c.GatewayProviders...), "availableExecutionTargets": append([]any(nil), c.AvailableExecutionTargets...), diff --git a/internal/acp/helpers.go b/internal/acp/helpers.go index 7125726..460e095 100644 --- 
a/internal/acp/helpers.go +++ b/internal/acp/helpers.go @@ -321,6 +321,54 @@ func extractExternalACPAssistantTextValue(value any) string { return normalizeExternalACPAssistantText(extractExternalACPTextValue(value)) } +func structuredExternalACPEvent(notification map[string]any) map[string]any { + if notification == nil { + return nil + } + method := strings.TrimSpace(stringValue(notification["method"])) + payload := asMap(notification["params"]) + if len(payload) == 0 { + payload = notification + } + update := asMap(payload["update"]) + if len(update) == 0 { + update = payload + } + item := asMap(payload["item"]) + source := update + if len(item) > 0 { + source = item + } + eventType := "status" + if strings.Contains(method, "thinking") || strings.TrimSpace(stringValue(source["thinking"])) != "" { + eventType = "thinking" + } else if strings.Contains(method, "tool") || len(asMap(source["toolCall"])) > 0 || len(asMap(source["tool_call"])) > 0 { + eventType = "tool_call" + } else if text := extractExternalACPAssistantTextValue(source); text != "" { + eventType = "text" + _ = text + } + result := map[string]any{ + "type": eventType, + "method": method, + } + if text := extractExternalACPAssistantTextValue(source); text != "" { + result["text"] = text + } + if status := strings.TrimSpace(firstNonEmptyString(source, "status", "sessionUpdate", "session_update")); status != "" { + result["status"] = status + } + if tool := asMap(source["toolCall"]); len(tool) > 0 { + result["toolCall"] = tool + } else if tool := asMap(source["tool_call"]); len(tool) > 0 { + result["toolCall"] = tool + } + if result["text"] == nil && result["status"] == nil && result["toolCall"] == nil && eventType == "status" { + return nil + } + return result +} + func normalizeExternalACPAssistantText(text string) string { normalized := strings.TrimSpace(text) if normalized == "" { diff --git a/internal/acp/http_handler.go b/internal/acp/http_handler.go index 0bc43f4..21f0d90 100644 --- 
a/internal/acp/http_handler.go +++ b/internal/acp/http_handler.go @@ -455,7 +455,7 @@ func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest, if params == nil { params = map[string]any{} } - if parseBool(params["multiAgent"]) { + if parseBool(params["multiAgent"]) || strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), "multi-agent") { return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"} } if provider := strings.TrimSpace(shared.StringArg(params, "provider", "")); provider != "" { @@ -475,6 +475,9 @@ func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest, if routing == nil { routing = map[string]any{} } + if strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != "" { + return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"} + } if provider := strings.TrimSpace(shared.StringArg(routing, "explicitProviderId", "")); provider != "" { return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: explicitProviderId must not be set on /gateway/openclaw"} } diff --git a/internal/acp/jobs.go b/internal/acp/jobs.go new file mode 100644 index 0000000..26c0aba --- /dev/null +++ b/internal/acp/jobs.go @@ -0,0 +1,354 @@ +package acp + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "sync" + "time" + + "xworkmate-bridge/internal/shared" +) + +const ( + defaultJobTimeout = 10 * time.Minute + defaultWebhookMaxTries = 3 + defaultWebhookRetryWait = 2 * time.Second +) + +type jobManager struct { + server *Server + mu sync.Mutex + jobs map[string]*bridgeJob +} + +type bridgeJob struct { + JobID string + SessionID string + ThreadID string + ProviderID string + Prompt string + WorkingDir string + Status string + Result map[string]any + Error string + CreatedAt time.Time + 
StartedAt time.Time + CompletedAt time.Time + Timeout time.Duration + CallbackURL string + WebhookTries int + WebhookSent bool + Target string + Channel string + AccountID string + Params map[string]any +} + +func newJobManager(server *Server) *jobManager { + return &jobManager{server: server, jobs: make(map[string]*bridgeJob)} +} + +func (s *Server) handleJobMethod(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) { + if s.jobs == nil { + s.jobs = newJobManager(s) + } + switch method { + case "xworkmate.jobs.submit": + return s.jobs.submit(ctx, params, notify), nil + case "xworkmate.jobs.get": + return s.jobs.get(params), nil + case "xworkmate.jobs.list": + return s.jobs.list(), nil + case "xworkmate.jobs.stats": + return s.jobs.stats(), nil + default: + return nil, &shared.RPCError{Code: -32601, Message: "unknown jobs method: " + method} + } +} + +func (m *jobManager) submit(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any { + m.failStuck() + jobID := fmt.Sprintf("job-%d", time.Now().UnixNano()) + providerID := strings.TrimSpace(firstNonEmptyString(params, "providerId", "agent", "provider")) + routing := shared.AsMap(params["routing"]) + if providerID == "" { + providerID = strings.TrimSpace(shared.StringArg(routing, "explicitProviderId", "")) + } + timeout := time.Duration(parsePositiveInt(params["timeoutMs"])) * time.Millisecond + if timeout <= 0 { + timeout = defaultJobTimeout + } + job := &bridgeJob{ + JobID: jobID, + SessionID: strings.TrimSpace(shared.StringArg(params, "sessionId", jobID)), + ThreadID: strings.TrimSpace(shared.StringArg(params, "threadId", jobID)), + ProviderID: providerID, + Prompt: strings.TrimSpace(shared.StringArg(params, "taskPrompt", shared.StringArg(params, "prompt", ""))), + WorkingDir: strings.TrimSpace(shared.StringArg(params, "workingDirectory", shared.StringArg(params, "cwd", ""))), + Status: "pending", + CreatedAt: 
time.Now(), + Timeout: timeout, + CallbackURL: strings.TrimSpace(shared.StringArg(params, "callbackUrl", shared.StringArg(params, "webhookUrl", ""))), + Target: strings.TrimSpace(shared.StringArg(params, "target", shared.StringArg(params, "discordTarget", ""))), + Channel: strings.TrimSpace(shared.StringArg(params, "channel", "discord")), + AccountID: strings.TrimSpace(shared.StringArg(params, "accountId", "")), + Params: cloneJobParams(params), + } + m.mu.Lock() + m.jobs[job.JobID] = job + m.mu.Unlock() + go m.run(context.Background(), job, notify) // detached on purpose: the job must outlive the submitting RPC, whose ctx may be canceled once the response is sent; run still bounds it with job.Timeout + return map[string]any{"jobId": job.JobID, "status": "pending", "sessionId": job.SessionID, "providerId": job.ProviderID} // literal status: run may already be updating job.Status under m.mu +} + +func (m *jobManager) run(parent context.Context, job *bridgeJob, notify func(map[string]any)) { + m.setRunning(job) + ctx, cancel := context.WithTimeout(parent, job.Timeout) + defer cancel() + params := cloneJobParams(job.Params) + params["sessionId"] = job.SessionID + params["threadId"] = job.ThreadID + params["taskPrompt"] = job.Prompt + params["workingDirectory"] = job.WorkingDir + if job.ProviderID != "" && !isMultiAgentSessionRequest(params) { + params["routing"] = map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "singleAgent", + "explicitProviderId": job.ProviderID, + } + } + result, rpcErr := m.server.orchestrator.Process(ctx, "session.start", params, notify) + m.mu.Lock() + defer m.mu.Unlock() + job.CompletedAt = time.Now() + if rpcErr != nil { + job.Status = "failed" + job.Error = rpcErr.Message + } else { + job.Result = result + if parseBool(result["success"]) { + job.Status = "completed" + } else { + job.Status = "failed" + job.Error = firstNonEmptyString(result, "error", "message", "unavailableMessage") + } + } + go m.sendCallbacks(job) +} + +func (m *jobManager) setRunning(job *bridgeJob) { + m.mu.Lock() + defer m.mu.Unlock() + job.Status = "running" + job.StartedAt = time.Now() +} + +func (m *jobManager) get(params map[string]any) map[string]any { + m.failStuck() + jobID 
:= strings.TrimSpace(shared.StringArg(params, "jobId", "")) + m.mu.Lock() + defer m.mu.Unlock() + if job := m.jobs[jobID]; job != nil { + return job.mapPayload() + } + return map[string]any{"status": "not_found", "jobId": jobID} +} + +func (m *jobManager) list() map[string]any { + m.failStuck() + m.mu.Lock() + defer m.mu.Unlock() + jobs := make([]map[string]any, 0, len(m.jobs)) + counts := map[string]int{"pending": 0, "running": 0, "completed": 0, "failed": 0} + for _, job := range m.jobs { + jobs = append(jobs, job.mapPayload()) + counts[job.Status]++ + } + summary := make(map[string]any, len(counts)) + for key, value := range counts { + summary[key] = value + } + return map[string]any{"jobs": jobs, "summary": summary} +} + +func (m *jobManager) stats() map[string]any { + summary := m.list()["summary"] + return map[string]any{"summary": summary} +} + +func (m *jobManager) failStuck() { + now := time.Now() + m.mu.Lock() + defer m.mu.Unlock() + for _, job := range m.jobs { + if job.Status != "running" && job.Status != "pending" { + continue + } + timeout := job.Timeout + if timeout <= 0 { + timeout = defaultJobTimeout + } + if now.Sub(job.CreatedAt) > timeout { + job.Status = "failed" + job.Error = "job exceeded timeout" + job.CompletedAt = now + go m.sendCallbacks(job) + } + } +} + +func (m *jobManager) sendCallbacks(job *bridgeJob) { + if job == nil { + return + } + if job.CallbackURL != "" { + m.sendWebhook(job) + } + if job.Target != "" { + _, _ = m.server.invokeOpenClawTool(context.Background(), map[string]any{ + "tool": "message", + "action": "send", + "channel": job.Channel, + "accountId": job.AccountID, + "args": map[string]any{ + "channel": job.Channel, + "target": job.Target, + "message": job.markdownCard(), + }, + }) + } +} + +func (m *jobManager) sendWebhook(job *bridgeJob) { + payload, _ := json.Marshal(job.mapPayload()) + for attempt := 1; attempt <= defaultWebhookMaxTries; attempt++ { + req, err := http.NewRequest(http.MethodPost, job.CallbackURL, 
bytes.NewReader(payload)) + if err != nil { + break + } + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + job.WebhookTries = attempt + if err == nil && resp != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 { + job.WebhookSent = true + if resp.Body != nil { + _ = resp.Body.Close() + } + return + } + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024)) + _ = resp.Body.Close() + } + time.Sleep(defaultWebhookRetryWait) + } +} + +func (j *bridgeJob) mapPayload() map[string]any { + payload := map[string]any{ + "jobId": j.JobID, + "status": j.Status, + "sessionId": j.SessionID, + "threadId": j.ThreadID, + "providerId": j.ProviderID, + "createdAt": j.CreatedAt.UTC().Format(time.RFC3339Nano), + "webhookSent": j.WebhookSent, + "webhookTries": j.WebhookTries, + } + if !j.StartedAt.IsZero() { + payload["startedAt"] = j.StartedAt.UTC().Format(time.RFC3339Nano) + payload["elapsedMs"] = time.Since(j.StartedAt).Milliseconds() + } + if !j.CompletedAt.IsZero() { + payload["completedAt"] = j.CompletedAt.UTC().Format(time.RFC3339Nano) + payload["durationMs"] = j.CompletedAt.Sub(j.CreatedAt).Milliseconds() + } + if j.Result != nil { + payload["result"] = j.Result + } + if j.Error != "" { + payload["error"] = j.Error + } + return payload +} + +func (j *bridgeJob) markdownCard() string { + title := fmt.Sprintf("### XWorkmate job %s: %s", j.JobID, j.Status) + if j.Result != nil { + if summary := firstNonEmptyString(j.Result, "summary", "output", "message"); summary != "" { + return title + "\n\n" + summary + } + } + if j.Error != "" { + return title + "\n\n" + j.Error + } + return title +} + +func (s *Server) invokeOpenClawTool(ctx context.Context, params map[string]any) (map[string]any, *shared.RPCError) { + payload := map[string]any{ + "tool": strings.TrimSpace(shared.StringArg(params, "tool", "")), + "action": strings.TrimSpace(shared.StringArg(params, "action", "")), + "args": 
shared.AsMap(params["args"]), + } + if payload["tool"] == "" { + return nil, &shared.RPCError{Code: -32602, Message: "TOOL_REQUIRED"} + } + toolURL := strings.TrimSpace(os.Getenv("OPENCLAW_TOOLS_INVOKE_URL")) + if toolURL != "" { + body, _ := json.Marshal(payload) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, toolURL, bytes.NewReader(body)) + if err != nil { + return nil, &shared.RPCError{Code: -32002, Message: err.Error()} + } + req.Header.Set("Content-Type", "application/json") + if token := strings.TrimSpace(os.Getenv("OPENCLAW_TOOLS_TOKEN")); token != "" { + req.Header.Set("Authorization", bearerHeader(token)) + } + response, err := http.DefaultClient.Do(req) + if err != nil { + return nil, &shared.RPCError{Code: -32002, Message: err.Error()} + } + defer func() { _ = response.Body.Close() }() + var decoded map[string]any + if err := json.NewDecoder(response.Body).Decode(&decoded); err != nil { + return nil, &shared.RPCError{Code: -32002, Message: err.Error()} + } + if response.StatusCode < 200 || response.StatusCode >= 300 { + return decoded, &shared.RPCError{Code: -32002, Message: "tools invoke failed"} + } + return decoded, nil + } + if s.gateway == nil { + return nil, &shared.RPCError{Code: -32001, Message: "GATEWAY_NOT_INITIALIZED"} + } + if rpcErr := ensureProductionGatewayConnected(s, "openclaw", nil); rpcErr != nil { + return nil, rpcErr + } + result := s.gateway.RequestByMode("openclaw", "tools.invoke", payload, 30*time.Second, nil) + if !result.OK { + return nil, gatewayRPCError(result.Error, "tools invoke failed") + } + return shared.AsMap(result.Payload), nil +} + +func bearerHeader(token string) string { + if strings.HasPrefix(strings.ToLower(token), "bearer ") { + return token + } + return "Bearer " + token +} + +func cloneJobParams(params map[string]any) map[string]any { + next := make(map[string]any, len(params)) + for key, value := range params { + next[key] = value + } + return next +} diff --git a/internal/acp/multi_agent.go 
b/internal/acp/multi_agent.go new file mode 100644 index 0000000..301ce8a --- /dev/null +++ b/internal/acp/multi_agent.go @@ -0,0 +1,612 @@ +package acp + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "xworkmate-bridge/internal/shared" +) + +const ( + multiAgentTargetID = "multi-agent" + defaultMultiAgentTimeout = 5 * time.Minute +) + +type multiAgentPlan struct { + Mode string + Steps []multiAgentStep + MaxTurns int + StopConditions []string + SharedWorkspace string +} + +type multiAgentStep struct { + Index int + ProviderID string + Prompt string + OutputAs string + Timeout time.Duration +} + +type multiAgentStepResult struct { + Index int + ProviderID string + Status string + Output string + Error string + DurationMs int64 + Result map[string]any +} + +func isMultiAgentSessionRequest(params map[string]any) bool { + if parseBool(params["multiAgent"]) { + return true + } + if strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), multiAgentTargetID) { + return true + } + routing := shared.AsMap(params["routing"]) + if strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != "" { + return true + } + return false +} + +func (o *SessionOrchestrator) ProcessMultiAgent(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) { + if method != "session.start" && method != "session.message" { + return nil, &shared.RPCError{Code: -32601, Message: "MULTI_AGENT_METHOD_NOT_ALLOWED: " + method} + } + plan, rpcErr := o.parseMultiAgentPlan(params) + if rpcErr != nil { + return nil, rpcErr + } + + sessionID := shared.StringArg(params, "sessionId", "") + threadID := shared.StringArg(params, "threadId", sessionID) + turnID := fmt.Sprintf("turn-%d", time.Now().UnixNano()) + sess := o.server.getOrCreateSession(sessionID, threadID) + sess.mu.Lock() + sess.target = multiAgentTargetID + sess.provider = "" + sess.mode = multiAgentTargetID + 
sess.control.ControlPlaneSessionID = sessionID + sess.control.ThreadID = threadID + sess.control.RequestedWorkingDir = plan.SharedWorkspace + sess.control.RemoteWorkingDirHint = strings.TrimSpace(shared.StringArg(params, "remoteWorkingDirectoryHint", "")) + sess.control.UpdatedAt = time.Now() + sess.task = QueuedTask{ + SessionID: sessionID, + ThreadID: threadID, + TurnID: turnID, + Target: multiAgentTargetID, + State: TaskStateRunning, + Kind: TaskKindMultiAgent, + UpdatedAt: time.Now(), + } + if prompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", "")); prompt != "" { + sess.history = append(sess.history, "USER: "+prompt) + } + sess.mu.Unlock() + + o.server.emitSessionUpdate(notify, turnID, map[string]any{ + "type": "status", + "event": "multi_agent_started", + "message": "multi-agent orchestration started", + "mode": plan.Mode, + "stepCount": len(plan.Steps), + "pending": true, + "error": false, + }) + + result := o.runMultiAgentPlan(ctx, method, params, plan, sessionID, threadID, turnID, notify) + routing := RoutingResult{TargetID: multiAgentTargetID} + normalized := o.normalizeResult(sess, result, routing, turnID, withMultiAgentWorkingDirectory(params, plan.SharedWorkspace)) + o.server.emitSessionUpdate(notify, turnID, map[string]any{ + "type": "status", + "event": "multi_agent_completed", + "message": strings.TrimSpace(shared.StringArg(normalized, "summary", "multi-agent orchestration completed")), + "pending": false, + "error": !parseBool(normalized["success"]), + "result": normalized, + }) + return normalized, nil +} + +func (o *SessionOrchestrator) parseMultiAgentPlan(params map[string]any) (multiAgentPlan, *shared.RPCError) { + routing := shared.AsMap(params["routing"]) + mode := strings.ToLower(strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", ""))) + if mode == "" { + mode = strings.ToLower(strings.TrimSpace(shared.StringArg(params, "orchestrationMode", ""))) + } + if mode == "" { + mode = "sequence" + } + switch mode { + 
case "sequence", "parallel", "race", "conversation": + default: + return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_INVALID_MODE: " + mode} + } + + sharedWorkspace := strings.TrimSpace(shared.StringArg(routing, "sharedWorkspace", "")) + if sharedWorkspace == "" { + sharedWorkspace = strings.TrimSpace(shared.StringArg(params, "workingDirectory", "")) + } + if sharedWorkspace == "" { + sharedWorkspace = filepath.Join(os.TempDir(), "xworkmate-bridge", "multi-agent", safeMultiAgentPathSegment(shared.StringArg(params, "sessionId", "session"))) + } + if err := os.MkdirAll(sharedWorkspace, 0o755); err != nil { + return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_WORKSPACE_UNAVAILABLE: " + err.Error()} + } + + steps := parseMultiAgentSteps(shared.ListArg(routing, "steps"), params, defaultMultiAgentTimeout) + if len(steps) == 0 { + steps = parseMultiAgentSteps(shared.ListArg(params, "steps"), params, defaultMultiAgentTimeout) + } + if mode == "conversation" && len(steps) == 0 { + steps = parseConversationParticipants(shared.ListArg(routing, "participants"), params, defaultMultiAgentTimeout) + } + if len(steps) == 0 { + return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_STEPS_REQUIRED"} + } + for index := range steps { + steps[index].Index = index + if steps[index].ProviderID == "" { + return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_PROVIDER_REQUIRED"} + } + if steps[index].Prompt == "" { + return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_PROMPT_REQUIRED"} + } + } + + maxTurns := parsePositiveInt(routing["maxTurns"]) + if maxTurns == 0 { + maxTurns = parsePositiveInt(params["maxTurns"]) + } + if maxTurns == 0 { + maxTurns = len(steps) + } + if mode == "conversation" && maxTurns < len(steps) { + maxTurns = len(steps) + } + + stopConditions := parseMultiAgentStringList(routing["stopConditions"]) + if len(stopConditions) == 0 { + 
stopConditions = []string{"STATUS: DONE", "STATUS: CONSENSUS"} + } + + return multiAgentPlan{ + Mode: mode, + Steps: steps, + MaxTurns: maxTurns, + StopConditions: stopConditions, + SharedWorkspace: sharedWorkspace, + }, nil +} + +func parseMultiAgentSteps(raw []any, params map[string]any, fallbackTimeout time.Duration) []multiAgentStep { + steps := make([]multiAgentStep, 0, len(raw)) + defaultPrompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", "")) + for _, item := range raw { + stepParams := shared.AsMap(item) + if len(stepParams) == 0 { + continue + } + timeout := time.Duration(parsePositiveInt(stepParams["timeoutMs"])) * time.Millisecond + if timeout <= 0 { + timeout = time.Duration(parsePositiveInt(stepParams["timeoutSeconds"])) * time.Second + } + if timeout <= 0 { + timeout = fallbackTimeout + } + prompt := strings.TrimSpace(firstNonEmptyString(stepParams, "prompt", "taskPrompt")) + if prompt == "" { + prompt = defaultPrompt + } + steps = append(steps, multiAgentStep{ + ProviderID: strings.TrimSpace(firstNonEmptyString(stepParams, "providerId", "provider", "agent")), + Prompt: prompt, + OutputAs: strings.TrimSpace(shared.StringArg(stepParams, "outputAs", "")), + Timeout: timeout, + }) + } + return steps +} + +func parseConversationParticipants(raw []any, params map[string]any, fallbackTimeout time.Duration) []multiAgentStep { + steps := make([]multiAgentStep, 0, len(raw)) + prompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", "")) + for _, item := range raw { + providerID := strings.TrimSpace(fmt.Sprint(item)) + if mapped := shared.AsMap(item); len(mapped) > 0 { + providerID = strings.TrimSpace(firstNonEmptyString(mapped, "providerId", "provider", "agent")) + } + if providerID == "" || providerID == "<nil>" { // fmt.Sprint renders a nil participant as "<nil>"; skip it like an empty id + continue + } + steps = append(steps, multiAgentStep{ + ProviderID: providerID, + Prompt: prompt, + Timeout: fallbackTimeout, + }) + } + return steps +} + +func (o *SessionOrchestrator) runMultiAgentPlan(ctx context.Context, 
method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any { + switch plan.Mode { + case "parallel": + return o.runMultiAgentParallel(ctx, method, params, plan, sessionID, threadID, turnID, notify) + case "race": + return o.runMultiAgentRace(ctx, method, params, plan, sessionID, threadID, turnID, notify) + case "conversation": + return o.runMultiAgentConversation(ctx, method, params, plan, sessionID, threadID, turnID, notify) + default: + return o.runMultiAgentSequence(ctx, method, params, plan, sessionID, threadID, turnID, notify) + } +} + +func (o *SessionOrchestrator) runMultiAgentSequence(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any { + values := map[string]string{"input": strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))} + results := make([]map[string]any, 0, len(plan.Steps)) + var lastOutput string + for _, step := range plan.Steps { + step.Prompt = renderMultiAgentPrompt(step.Prompt, values) + stepResult := o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, false, notify) + results = append(results, stepResult.Map()) + if stepResult.Status != "completed" { + return multiAgentResult(plan, results, "", stepResult.Error, false) + } + lastOutput = stepResult.Output + values["previousOutput"] = stepResult.Output + if step.OutputAs != "" { + values[step.OutputAs] = stepResult.Output + } + } + return multiAgentResult(plan, results, lastOutput, "", true) +} + +func (o *SessionOrchestrator) runMultiAgentParallel(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any { + results := make([]multiAgentStepResult, len(plan.Steps)) + var wg sync.WaitGroup + for _, step := range 
plan.Steps { + wg.Add(1) + go func(step multiAgentStep) { + defer wg.Done() + results[step.Index] = o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, false, notify) + }(step) + } + wg.Wait() + return multiAgentAggregateResult(plan, results) +} + +func (o *SessionOrchestrator) runMultiAgentRace(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any { + raceCtx, cancel := context.WithCancel(ctx) + defer cancel() + resultsCh := make(chan multiAgentStepResult, len(plan.Steps)) + for _, step := range plan.Steps { + go func(step multiAgentStep) { + resultsCh <- o.runMultiAgentProviderStep(raceCtx, method, params, plan, step, sessionID, threadID, turnID, false, notify) + }(step) + } + results := make([]multiAgentStepResult, 0, len(plan.Steps)) + for range plan.Steps { + stepResult := <-resultsCh + results = append(results, stepResult) + if stepResult.Status == "completed" { + cancel() + return multiAgentResult(plan, multiAgentStepResultMaps(results), stepResult.Output, "", true) + } + } + return multiAgentResult(plan, multiAgentStepResultMaps(results), "", "all agents failed", false) +} + +func (o *SessionOrchestrator) runMultiAgentConversation(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any { + results := make([]map[string]any, 0, plan.MaxTurns) + started := make(map[string]bool) + lastAgent := "" + lastOutput := "" + topic := strings.TrimSpace(shared.StringArg(params, "taskPrompt", "")) + for turn := 0; turn < plan.MaxTurns; turn++ { + step := plan.Steps[turn%len(plan.Steps)] + if lastOutput == "" { + step.Prompt = fmt.Sprintf("Topic:\n%s\n\nShared workspace: %s\n\nRespond as %s.", topic, plan.SharedWorkspace, step.ProviderID) + } else { + step.Prompt = fmt.Sprintf("[%s]: %s\n\nContinue the 
discussion as %s. Shared workspace: %s", lastAgent, lastOutput, step.ProviderID, plan.SharedWorkspace) + } + useSend := started[step.ProviderID] + stepResult := o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, useSend, notify) + started[step.ProviderID] = true + entry := stepResult.Map() + entry["conversationTurn"] = turn + 1 + results = append(results, entry) + if stepResult.Status != "completed" { + return multiAgentResult(plan, results, "", stepResult.Error, false) + } + lastAgent = step.ProviderID + lastOutput = stepResult.Output + if multiAgentConversationShouldStop(lastOutput, plan.StopConditions) { + result := multiAgentResult(plan, results, lastOutput, "", true) + result["stopReason"] = "stop_condition" + return result + } + } + result := multiAgentResult(plan, results, lastOutput, "", true) + result["stopReason"] = "max_turns" + return result +} + +func (o *SessionOrchestrator) runMultiAgentProviderStep(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, step multiAgentStep, sessionID string, threadID string, turnID string, useSend bool, notify func(map[string]any)) multiAgentStepResult { + startedAt := time.Now() + o.server.emitSessionUpdate(notify, turnID, map[string]any{ + "type": "status", + "event": "multi_agent_step_started", + "providerId": step.ProviderID, + "stepIndex": step.Index, + "pending": true, + "error": false, + }) + compat, ok := o.server.providers[step.ProviderID] + if !ok { + return failedMultiAgentStep(step, startedAt, "provider unavailable") + } + + stepCtx := ctx + cancel := func() {} + if step.Timeout > 0 { + stepCtx, cancel = context.WithTimeout(ctx, step.Timeout) + } + defer cancel() + + stepSessionID := fmt.Sprintf("%s/%s/%d/%s", sessionID, turnID, step.Index, step.ProviderID) + stepParams := multiAgentStepParams(params, step, plan.SharedWorkspace) + sink := func(update map[string]any) { + o.server.emitSessionUpdate(notify, turnID, map[string]any{ + "type": 
"delta", + "event": "multi_agent_step_update", + "providerId": step.ProviderID, + "stepIndex": step.Index, + "providerUpdate": update, + "pending": true, + "error": false, + }) + } + + var result map[string]any + var err error + if useSend { + result, err = compat.SendMessage(stepCtx, stepSessionID, threadID, stepParams, sink) + if _, ok := asSessionContinuationUnavailableError(err); ok { + result, err = compat.StartSession(stepCtx, stepSessionID, threadID, stepParams, sink) + } + } else { + result, err = compat.StartSession(stepCtx, stepSessionID, threadID, stepParams, sink) + } + stepResult := multiAgentStepResult{ + Index: step.Index, + ProviderID: step.ProviderID, + Status: "completed", + Output: strings.TrimSpace(multiAgentOutputFromResult(result)), + DurationMs: time.Since(startedAt).Milliseconds(), + Result: result, + } + if err != nil { + stepResult.Status = "failed" + stepResult.Error = err.Error() + } else if result != nil { + if value, ok := result["success"]; ok && !parseBool(value) { + stepResult.Status = "failed" + stepResult.Error = firstNonEmptyString(result, "error", "message", "unavailableMessage") + } + } + if stepResult.Status == "completed" && stepResult.Output == "" { + stepResult.Status = "failed" + stepResult.Error = "provider returned no displayable output" + } + if stepResult.Error == "" && stepResult.Status == "failed" { + stepResult.Error = "provider execution failed" + } + o.server.emitSessionUpdate(notify, turnID, map[string]any{ + "type": "status", + "event": "multi_agent_step_completed", + "providerId": step.ProviderID, + "stepIndex": step.Index, + "status": stepResult.Status, + "message": stepResult.Output, + "pending": false, + "error": stepResult.Status != "completed", + }) + return stepResult +} + +func multiAgentStepParams(params map[string]any, step multiAgentStep, sharedWorkspace string) map[string]any { + next := make(map[string]any, len(params)+2) + for key, value := range params { + next[key] = value + } + delete(next, 
"multiAgent") + delete(next, "mode") + next["taskPrompt"] = step.Prompt + next["workingDirectory"] = sharedWorkspace + next["routing"] = map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "singleAgent", + "explicitProviderId": step.ProviderID, + } + return next +} + +func multiAgentOutputFromResult(result map[string]any) string { + if result == nil { + return "" + } + return firstNonEmptyString(result, "output", "summary", "message", "text") +} + +func failedMultiAgentStep(step multiAgentStep, startedAt time.Time, message string) multiAgentStepResult { + return multiAgentStepResult{ + Index: step.Index, + ProviderID: step.ProviderID, + Status: "failed", + Error: message, + DurationMs: time.Since(startedAt).Milliseconds(), + } +} + +func (r multiAgentStepResult) Map() map[string]any { + result := map[string]any{ + "index": r.Index, + "providerId": r.ProviderID, + "status": r.Status, + "durationMs": r.DurationMs, + } + if r.Output != "" { + result["output"] = r.Output + } + if r.Error != "" { + result["error"] = r.Error + } + if r.Result != nil { + result["result"] = r.Result + } + return result +} + +func multiAgentAggregateResult(plan multiAgentPlan, results []multiAgentStepResult) map[string]any { + mapped := multiAgentStepResultMaps(results) + outputs := make([]string, 0, len(results)) + var errorText string + success := true + for _, result := range results { + if result.Status != "completed" { + success = false + if errorText == "" { + errorText = result.ProviderID + ": " + result.Error + } + continue + } + if result.Output != "" { + outputs = append(outputs, result.ProviderID+": "+result.Output) + } + } + return multiAgentResult(plan, mapped, strings.Join(outputs, "\n"), errorText, success) +} + +func multiAgentStepResultMaps(results []multiAgentStepResult) []map[string]any { + mapped := make([]map[string]any, 0, len(results)) + for _, result := range results { + mapped = append(mapped, result.Map()) + } + return mapped +} + +func 
multiAgentResult(plan multiAgentPlan, steps []map[string]any, output string, errorText string, success bool) map[string]any { + status := "completed" + if !success { + status = "failed" + } + if output == "" && errorText != "" { + output = errorText + } + result := map[string]any{ + "success": success, + "status": status, + "mode": multiAgentTargetID, + "orchestrationMode": plan.Mode, + "resolvedExecutionTarget": multiAgentTargetID, + "resolvedProviderId": "", + "resolvedGatewayProviderId": "", + "steps": steps, + "sharedWorkspace": plan.SharedWorkspace, + } + if output != "" { + result["output"] = output + result["summary"] = output + result["message"] = output + } + if errorText != "" { + result["error"] = errorText + } + return result +} + +func renderMultiAgentPrompt(prompt string, values map[string]string) string { + rendered := prompt + for key, value := range values { + rendered = strings.ReplaceAll(rendered, "{{"+key+"}}", value) + } + return rendered +} + +func multiAgentConversationShouldStop(output string, stopConditions []string) bool { + normalizedOutput := strings.ToUpper(output) + for _, condition := range stopConditions { + condition = strings.TrimSpace(condition) + if condition == "" { + continue + } + if strings.Contains(normalizedOutput, strings.ToUpper(condition)) { + return true + } + } + return false +} + +func parseMultiAgentStringList(raw any) []string { + switch values := raw.(type) { + case []string: + return append([]string(nil), values...) 
+ case []any: + result := make([]string, 0, len(values)) + for _, value := range values { + text := strings.TrimSpace(fmt.Sprint(value)) + if text != "" && text != "<nil>" { + result = append(result, text) + } + } + return result + default: + return nil + } +} + +func withMultiAgentWorkingDirectory(params map[string]any, workingDirectory string) map[string]any { + next := make(map[string]any, len(params)+1) + for key, value := range params { + next[key] = value + } + if workingDirectory != "" { + next["workingDirectory"] = workingDirectory + } + return next +} + +func safeMultiAgentPathSegment(value string) string { + value = strings.TrimSpace(value) + if value == "" { + return "default" + } + var builder strings.Builder + for _, r := range value { + switch { + case r >= 'a' && r <= 'z': + builder.WriteRune(r) + case r >= 'A' && r <= 'Z': + builder.WriteRune(r) + case r >= '0' && r <= '9': + builder.WriteRune(r) + case r == '.', r == '_', r == '-': + builder.WriteRune(r) + default: + builder.WriteByte('-') + } + } + if builder.Len() == 0 { + return "default" + } + return builder.String() +} diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index 2d2e6cf..ebdb975 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -32,6 +32,10 @@ func NewSessionOrchestrator(server *Server) *SessionOrchestrator { } func (o *SessionOrchestrator) Process(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) { + if isMultiAgentSessionRequest(params) { + return o.ProcessMultiAgent(ctx, method, params, notify) + } + res, err := o.server.routingEngine.Resolve(ctx, params) if err != nil { if err.Error() == "ROUTING_REQUIRED" { diff --git a/internal/acp/provider_compat.go b/internal/acp/provider_compat.go index 1970e7d..d19dae8 100644 --- a/internal/acp/provider_compat.go +++ b/internal/acp/provider_compat.go @@ -365,10 +365,18 @@ func (c *codexCompat) writeAndReadWSRPC(ctx 
context.Context, conn *websocket.Con methodName := strings.TrimSpace(shared.StringArg(decoded, "method", "")) if methodName != "" { + if isExternalPermissionRequest(methodName) { + _ = writeExternalPermissionApproval(conn, decoded) + continue + } collector.observe(decoded) if isExternalSessionUpdateMethod(methodName) && sink != nil { update := shared.AsMap(decoded["params"]) if len(update) > 0 { + if structured := structuredExternalACPEvent(decoded); len(structured) > 0 { + update["structuredEvent"] = structured + update["eventType"] = structured["type"] + } sink(update) } } @@ -626,10 +634,18 @@ func (c *externalACPCompat) callWSRPC(ctx context.Context, method string, params methodName := strings.TrimSpace(shared.StringArg(decoded, "method", "")) if methodName != "" { + if isExternalPermissionRequest(methodName) { + _ = writeExternalPermissionApproval(conn, decoded) + continue + } collector.observe(decoded) if isExternalSessionUpdateMethod(methodName) && sink != nil { update := shared.AsMap(decoded["params"]) if len(update) > 0 { + if structured := structuredExternalACPEvent(decoded); len(structured) > 0 { + update["structuredEvent"] = structured + update["eventType"] = structured["type"] + } sink(update) } } @@ -657,6 +673,26 @@ func isExternalSessionUpdateMethod(method string) bool { } } +func isExternalPermissionRequest(method string) bool { + normalized := strings.TrimSpace(method) + return normalized == "session/request_permission" || normalized == "session.request_permission" || normalized == "request_permission" +} + +func writeExternalPermissionApproval(conn *websocket.Conn, request map[string]any) error { + if conn == nil || request == nil || request["id"] == nil { + return nil + } + return conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": request["id"], + "result": map[string]any{ + "approved": true, + "decision": "approved", + "behavior": "allow", + }, + }) +} + func parseExternalRPCResult(decoded map[string]any) (map[string]any, error) { if 
decoded == nil { return map[string]any{}, nil diff --git a/internal/acp/providers_sync_test.go b/internal/acp/providers_sync_test.go index 7388c44..06dd706 100644 --- a/internal/acp/providers_sync_test.go +++ b/internal/acp/providers_sync_test.go @@ -14,7 +14,7 @@ func setTestBridgeProvider(server *Server, provider syncedProvider) { server.providers = make(map[string]ProviderCompat) } server.providers[provider.ProviderID] = newProviderCompat(provider) - + if server.catalog != nil { server.catalog.ProviderCatalog = append(server.catalog.ProviderCatalog, map[string]any{ "providerId": provider.ProviderID, @@ -37,6 +37,13 @@ func TestCapabilitiesExposeBuiltInProductionProviderCatalog(t *testing.T) { } capabilities := response + if capabilities["multiAgent"] != true { + t.Fatalf("expected multiAgent capability to be enabled, got %#v", capabilities["multiAgent"]) + } + nestedCapabilities := shared.AsMap(capabilities["capabilities"]) + if nestedCapabilities["multi_agent"] != true { + t.Fatalf("expected nested multi_agent capability to be enabled, got %#v", nestedCapabilities["multi_agent"]) + } targets, ok := capabilities["availableExecutionTargets"].([]any) if !ok { // Try fallback decoding if it was serialized @@ -45,7 +52,7 @@ func TestCapabilitiesExposeBuiltInProductionProviderCatalog(t *testing.T) { t.Fatalf("expected availableExecutionTargets array, got %T", capabilities["availableExecutionTargets"]) } } - + foundAgent := false for _, target := range targets { if target == "agent" { diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index 0fdf02c..96e3376 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -622,6 +622,360 @@ func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) { } } +func TestExecuteSessionTaskMultiAgentModes(t *testing.T) { + cases := []struct { + name string + mode string + maxTurns int + wantSteps int + }{ + {name: "sequence", mode: "sequence", wantSteps: 2}, + {name: 
"parallel", mode: "parallel", wantSteps: 2}, + {name: "race", mode: "race", wantSteps: 1}, + {name: "conversation", mode: "conversation", maxTurns: 2, wantSteps: 2}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + server := NewServer() + opencodeProvider := newExternalSingleAgentProvider(t, "opencode", "opencode-output") + defer opencodeProvider.Close() + geminiProvider := newExternalSingleAgentProvider(t, "gemini", "gemini-output") + defer geminiProvider.Close() + setTestBridgeProvider(server, syncedProvider{ + ProviderID: "opencode", + Label: "OpenCode", + Endpoint: opencodeProvider.URL, + Enabled: true, + }) + setTestBridgeProvider(server, syncedProvider{ + ProviderID: "gemini", + Label: "Gemini", + Endpoint: geminiProvider.URL, + Enabled: true, + }) + + routing := map[string]any{ + "orchestrationMode": tc.mode, + "steps": []any{ + map[string]any{"providerId": "opencode", "prompt": "first"}, + map[string]any{"providerId": "gemini", "prompt": "second sees {{previousOutput}}"}, + }, + } + if tc.mode == "conversation" { + routing["maxTurns"] = tc.maxTurns + routing["participants"] = []any{"opencode", "gemini"} + } + response, rpcErr := server.executeSessionTask(task{ + req: shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-multi-" + tc.name, + "threadId": "thread-multi-" + tc.name, + "taskPrompt": "coordinate two agents", + "workingDirectory": t.TempDir(), + "multiAgent": true, + "routing": routing, + }, + }, + }) + if rpcErr != nil { + t.Fatalf("expected multi-agent response, got rpc error: %#v", rpcErr) + } + if !parseBool(response["success"]) { + t.Fatalf("expected successful multi-agent response, got %#v", response) + } + if got := response["resolvedExecutionTarget"]; got != "multi-agent" { + t.Fatalf("expected multi-agent execution target, got %#v", response) + } + if got := response["orchestrationMode"]; got != tc.mode { + t.Fatalf("expected orchestration mode %q, got %#v", tc.mode, 
response) + } + steps := mustStepMaps(t, response["steps"]) + if len(steps) != tc.wantSteps { + t.Fatalf("expected %d step results, got %#v", tc.wantSteps, steps) + } + if output := strings.TrimSpace(shared.StringArg(response, "output", "")); output == "" { + t.Fatalf("expected displayable multi-agent output, got %#v", response) + } + }) + } +} + +func TestExecuteSessionTaskMultiAgentProviderUnavailableIsResultFailure(t *testing.T) { + server := NewServer() + providerServer := newExternalSingleAgentProvider(t, "opencode", "opencode-output") + defer providerServer.Close() + setTestBridgeProvider(server, syncedProvider{ + ProviderID: "opencode", + Label: "OpenCode", + Endpoint: providerServer.URL, + Enabled: true, + }) + + response, rpcErr := server.executeSessionTask(task{ + req: shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-multi-missing", + "threadId": "thread-multi-missing", + "taskPrompt": "coordinate agents", + "workingDirectory": t.TempDir(), + "multiAgent": true, + "routing": map[string]any{ + "orchestrationMode": "parallel", + "steps": []any{ + map[string]any{"providerId": "opencode", "prompt": "first"}, + map[string]any{"providerId": "missing", "prompt": "second"}, + }, + }, + }, + }, + }) + if rpcErr != nil { + t.Fatalf("expected normalized failure result, got rpc error: %#v", rpcErr) + } + if parseBool(response["success"]) || response["status"] != "failed" { + t.Fatalf("expected failed multi-agent result, got %#v", response) + } + if !strings.Contains(strings.TrimSpace(shared.StringArg(response, "error", "")), "missing: provider unavailable") { + t.Fatalf("expected provider unavailable error, got %#v", response) + } +} + +func TestInternalJobsSubmitCompletesAndReportsStats(t *testing.T) { + server := NewServer() + providerServer := newExternalSingleAgentProvider(t, "opencode", "job-output") + defer providerServer.Close() + setTestBridgeProvider(server, syncedProvider{ + ProviderID: "opencode", + Label: 
"OpenCode", + Endpoint: providerServer.URL, + Enabled: true, + }) + + submitted, rpcErr := server.handleRequest(shared.RPCRequest{ + Method: "xworkmate.jobs.submit", + Params: map[string]any{ + "providerId": "opencode", + "sessionId": "job-session", + "threadId": "job-thread", + "taskPrompt": "run async job", + "workingDirectory": t.TempDir(), + "timeoutMs": 30_000, + "orchestrationMode": "sequence", + }, + }, nil) + if rpcErr != nil { + t.Fatalf("expected job submission, got %#v", rpcErr) + } + jobID := strings.TrimSpace(shared.StringArg(submitted, "jobId", "")) + if jobID == "" { + t.Fatalf("expected job id, got %#v", submitted) + } + + var job map[string]any + waitForCondition(t, func() bool { + job, _ = server.handleJobMethod(context.Background(), "xworkmate.jobs.get", map[string]any{"jobId": jobID}, nil) + return shared.StringArg(job, "status", "") == "completed" + }) + result := shared.AsMap(job["result"]) + if got := strings.TrimSpace(shared.StringArg(result, "output", "")); got != "job-output" { + t.Fatalf("expected job output, got %#v", job) + } + stats, rpcErr := server.handleJobMethod(context.Background(), "xworkmate.jobs.stats", nil, nil) + if rpcErr != nil { + t.Fatalf("expected stats, got %#v", rpcErr) + } + summary := shared.AsMap(stats["summary"]) + if got := fmt.Sprint(summary["completed"]); got != "1" { + t.Fatalf("expected completed job stats, got %#v", stats) + } +} + +func TestInternalJobWebhookRetriesUntilSuccess(t *testing.T) { + server := NewServer() + providerServer := newExternalSingleAgentProvider(t, "opencode", "job-output") + defer providerServer.Close() + setTestBridgeProvider(server, syncedProvider{ + ProviderID: "opencode", + Label: "OpenCode", + Endpoint: providerServer.URL, + Enabled: true, + }) + var callbackCount atomic.Int32 + callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count := callbackCount.Add(1) + if count == 1 { + http.Error(w, "retry", 
http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + })) + defer callbackServer.Close() + + submitted, rpcErr := server.handleRequest(shared.RPCRequest{ + Method: "xworkmate.jobs.submit", + Params: map[string]any{ + "providerId": "opencode", + "sessionId": "job-webhook-session", + "threadId": "job-webhook-thread", + "taskPrompt": "run async job", + "workingDirectory": t.TempDir(), + "callbackUrl": callbackServer.URL, + "timeoutMs": 30_000, + }, + }, nil) + if rpcErr != nil { + t.Fatalf("expected job submission, got %#v", rpcErr) + } + jobID := strings.TrimSpace(shared.StringArg(submitted, "jobId", "")) + waitForCondition(t, func() bool { + job, _ := server.handleJobMethod(context.Background(), "xworkmate.jobs.get", map[string]any{"jobId": jobID}, nil) + return parseBool(job["webhookSent"]) && callbackCount.Load() >= 2 + }) +} + +func TestInternalToolsInvokeUsesOpenClawHTTPProxy(t *testing.T) { + var received map[string]any + toolsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if got := r.Header.Get("Authorization"); got != "Bearer tool-token" { + t.Fatalf("expected bearer token, got %q", got) + } + if err := json.NewDecoder(r.Body).Decode(&received); err != nil { + t.Fatalf("decode tool request: %v", err) + } + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) + })) + defer toolsServer.Close() + t.Setenv("OPENCLAW_TOOLS_INVOKE_URL", toolsServer.URL) + t.Setenv("OPENCLAW_TOOLS_TOKEN", "tool-token") + + server := NewServer() + response, rpcErr := server.handleRequest(shared.RPCRequest{ + Method: "xworkmate.tools.invoke", + Params: map[string]any{ + "tool": "message", + "action": "send", + "args": map[string]any{ + "target": "channel:1", + "message": "hello", + }, + }, + }, nil) + if rpcErr != nil { + t.Fatalf("expected tools response, got %#v", rpcErr) + } + if !parseBool(response["ok"]) { + t.Fatalf("expected ok tools response, got %#v", response) + } + if got := 
shared.StringArg(received, "tool", ""); got != "message" { + t.Fatalf("expected message tool payload, got %#v", received) + } +} + +func TestExternalACPWebSocketAutoApprovesPermissionRequests(t *testing.T) { + var sawApproval atomic.Bool + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + wsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade: %v", err) + } + defer func() { _ = conn.Close() }() + var request map[string]any + if err := conn.ReadJSON(&request); err != nil { + t.Fatalf("read request: %v", err) + } + requestID := request["id"] + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": "perm-1", + "method": "session/request_permission", + "params": map[string]any{ + "reason": "test permission", + }, + }); err != nil { + t.Fatalf("write permission request: %v", err) + } + var approval map[string]any + if err := conn.ReadJSON(&approval); err != nil { + t.Fatalf("read approval: %v", err) + } + if approval["id"] == "perm-1" && parseBool(shared.AsMap(approval["result"])["approved"]) { + sawApproval.Store(true) + } + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": requestID, + "result": map[string]any{ + "success": true, + "output": "approved output", + }, + }) + })) + defer wsServer.Close() + + compat := newProviderCompat(syncedProvider{ + ProviderID: "hermes", + Label: "Hermes", + Endpoint: "ws" + strings.TrimPrefix(wsServer.URL, "http") + "/acp", + Enabled: true, + }) + result, err := compat.StartSession(context.Background(), "permission-session", "permission-thread", map[string]any{ + "taskPrompt": "needs permission", + }, nil) + if err != nil { + t.Fatalf("expected permission auto approval, got %v", err) + } + if !sawApproval.Load() { + t.Fatalf("expected permission approval response") + } + if got := shared.StringArg(result, "output", ""); got != "approved output" { + 
t.Fatalf("expected approved output, got %#v", result) + } +} + +func TestStructuredExternalACPEventClassifiesNativeStreams(t *testing.T) { + cases := []struct { + name string + raw map[string]any + want string + }{ + { + name: "thinking", + raw: map[string]any{ + "method": "session.update", + "params": map[string]any{"update": map[string]any{"thinking": "planning"}}, + }, + want: "thinking", + }, + { + name: "tool_call", + raw: map[string]any{ + "method": "item/tool_call", + "params": map[string]any{"item": map[string]any{"toolCall": map[string]any{"name": "read"}}}, + }, + want: "tool_call", + }, + { + name: "text", + raw: map[string]any{ + "method": "session.update", + "params": map[string]any{"update": map[string]any{"text": "hello"}}, + }, + want: "text", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + event := structuredExternalACPEvent(tc.raw) + if got := shared.StringArg(event, "type", ""); got != tc.want { + t.Fatalf("expected event type %q, got %#v", tc.want, event) + } + }) + } +} + func TestExecuteSessionTaskGatewaySurfacesOpenClawChatSendError(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() @@ -2227,6 +2581,35 @@ func sameMethods(got []string, want []string) bool { return true } +func mustStepMaps(t *testing.T, value any) []map[string]any { + t.Helper() + switch typed := value.(type) { + case []map[string]any: + return typed + case []any: + steps := make([]map[string]any, 0, len(typed)) + for _, item := range typed { + steps = append(steps, shared.AsMap(item)) + } + return steps + default: + t.Fatalf("expected step map list, got %#v", value) + return nil + } +} + +func waitForCondition(t *testing.T, condition func() bool) { + t.Helper() + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + if condition() { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("timed out waiting for condition") +} + func 
TestExecuteSessionTaskAutoRoutingUsesBridgeProductionProviderOrder(t *testing.T) { workspaceDir := filepath.Join(t.TempDir(), "workspace") if err := os.MkdirAll(workspaceDir, 0o755); err != nil { diff --git a/internal/acp/rpc_handler.go b/internal/acp/rpc_handler.go index 5b0a4f7..1562480 100644 --- a/internal/acp/rpc_handler.go +++ b/internal/acp/rpc_handler.go @@ -56,6 +56,12 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string // Gateway 语义由专门的 Gateway 组件通过 Adapter 处理 return s.handleGatewayMethod(ctx, method, request.Params, notify) + case "xworkmate.jobs.submit", "xworkmate.jobs.get", "xworkmate.jobs.list", "xworkmate.jobs.stats": + return s.handleJobMethod(ctx, method, request.Params, notify) + + case "xworkmate.tools.invoke": + return s.invokeOpenClawTool(ctx, request.Params) + default: return nil, &shared.RPCError{ Code: -32601, diff --git a/internal/acp/types.go b/internal/acp/types.go index d264ef2..e5b9e8f 100644 --- a/internal/acp/types.go +++ b/internal/acp/types.go @@ -85,6 +85,7 @@ type Server struct { providerOrder []string gateway *gatewayruntime.Manager openClawGate *openClawGatewayAdmissionGate + jobs *jobManager // Legacy / Common authService interface{} // Minimal auth dependency diff --git a/internal/acp/web_contract_test.go b/internal/acp/web_contract_test.go index d63d196..56e9206 100644 --- a/internal/acp/web_contract_test.go +++ b/internal/acp/web_contract_test.go @@ -600,9 +600,11 @@ func TestHTTPHandlerGatewayOpenClawRejectsConflictingRouting(t *testing.T) { for _, payload := range []string{ `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"multiAgent":true}}`, + `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"mode":"multi-agent"}}`, `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"provider":"codex"}}`, `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"executionTarget":"agent"}}`, 
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"gatewayProviderId":"other"}}`, + `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"orchestrationMode":"sequence"}}}`, `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"explicitProviderId":"codex"}}}`, `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"explicitExecutionTarget":"agent"}}}`, `{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"preferredGatewayProviderId":"other"}}}`,