feat: add internal multi-agent jobs and tools orchestration

This commit is contained in:
Haitao Pan 2026-05-18 16:55:40 +08:00
parent 087dd4f354
commit 760a8c0f3b
14 changed files with 1531 additions and 11 deletions

View File

@ -137,10 +137,15 @@ bridge 对 app 的稳定 method family 只有:
- `xworkmate.gateway.connect`
- `xworkmate.gateway.request`
- `xworkmate.gateway.disconnect`
- `xworkmate.jobs.submit`
- `xworkmate.jobs.get`
- `xworkmate.jobs.list`
- `xworkmate.jobs.stats`
- `xworkmate.tools.invoke`
路径约束:
- `/acp/rpc` 是 capabilities、routing、agent、multi-agent、cancel、close 的 canonical HTTP RPC 入口。
- `/acp/rpc` 是 capabilities、routing、agent、multi-agent、jobs、tools proxy、cancel、close 的 canonical HTTP RPC 入口。
- `/gateway/openclaw` 只允许 OpenClaw `session.start` 和 follow-up `session.message`
- `/gateway/openclaw` 拒绝 `acp.capabilities`、`xworkmate.routing.resolve`、`xworkmate.gateway.*`、`session.cancel` 和 `session.close`
@ -153,7 +158,7 @@ bridge 对 app 的稳定 method family 只有:
```json
{
"singleAgent": true,
"multiAgent": false,
"multiAgent": true,
"availableExecutionTargets": ["agent", "gateway"],
"providerCatalog": [
{ "providerId": "codex", "label": "Codex", "targets": ["agent"], "category": "native" },
@ -249,6 +254,15 @@ unavailable 示例:
- `workingDirectory`
- `routing`
multi-agent 输入仍使用同一个 `session.start` / `session.message` method不新增 HTTP path
- `multiAgent: true``mode: "multi-agent"`
- `routing.orchestrationMode`: `sequence`、`parallel`、`race`、`conversation`
- `routing.steps`: `{ "providerId": "codex", "prompt": "...", "outputAs": "...", "timeoutMs": 300000 }[]`
- `routing.participants`、`routing.maxTurns`、`routing.stopConditions` 用于 `conversation`
multi-agent 只允许通过 `/acp``/acp/rpc` 进入。`/gateway/openclaw` 仍是 OpenClaw task submit 专用入口,并继续拒绝 `multiAgent=true`
统一结果字段:
- `success`
@ -267,7 +281,9 @@ bridge 保证:
- provider-specific 差异被 compat layer 吸收
- bridge core 不暴露 stdio/runtime 细节
- 中间通知统一通过 `session.update`
- Native ACP structured event 会透传到 `structuredEvent`,类型包括 `thinking`、`tool_call`、`text`、`status`
- `session.message` 是续写合同provider compat 缺少原会话状态时返回结构化 JSON-RPC error不静默降级为新的 `session.start`
- provider 发起 `session/request_permission` 时 bridge 自动返回 allow/approved避免 ACP provider 卡住等待人工确认
续写失败错误:
@ -313,7 +329,54 @@ gateway method family 保留为 control-plane contract
- `openclaw` 是 bridge-owned gateway provider不是 app-facing direct route
- gateway control-plane method 仍走 `/acp``/acp/rpc`,不走 `/gateway/openclaw`
## 11. 非 Contract 内容
## 11. Internal Async Jobs
`xworkmate.jobs.*` 只作为 `/acp` / `/acp/rpc` 内部 JSON-RPC method 暴露,不新增 `/jobs` HTTP path。
- `xworkmate.jobs.submit`:提交后台任务,立即返回 `jobId`
- `xworkmate.jobs.get`:按 `jobId` 查询状态和结果
- `xworkmate.jobs.list`:返回 job 列表和 summary
- `xworkmate.jobs.stats`:返回状态统计
`submit` 输入重点:
- `providerId`
- `sessionId`
- `threadId`
- `taskPrompt`
- `workingDirectory`
- `timeoutMs`,默认 10 分钟
- `callbackUrl` / `webhookUrl`
- `target`、`channel`、`accountId`,用于通过 OpenClaw message tool 推送 Markdown card
语义:
- job 复用已有 provider compat/session 映射,不引入第二套 process pool
- 超过 `timeoutMs` 或默认 10 分钟仍未结束时标记 `failed`
- callback webhook 最多重试 3 次
- `target` 非空时内部调用 `xworkmate.tools.invoke`,等价于 acp-bridge 的 OpenClaw `/tools/invoke` message send 能力
## 12. Internal Tools Proxy
`xworkmate.tools.invoke``/tools/invoke` 的 JSON-RPC 内部等价物。
输入:
```json
{
"tool": "message",
"action": "send",
"args": {
"channel": "discord",
"target": "channel:123",
"message": "..."
}
}
```
优先使用 `OPENCLAW_TOOLS_INVOKE_URL` / `OPENCLAW_TOOLS_TOKEN` 直连 OpenClaw tools HTTP endpoint未配置时复用已连接的 OpenClaw gateway runtime 调用 `tools.invoke`
## 13. 非 Contract 内容
以下内容不是 app contract

View File

@ -26,6 +26,7 @@ func (s *Server) Bootstrap() {
s.routingEngine = &DefaultRoutingEngine{server: s}
s.orchestrator = NewSessionOrchestrator(s)
s.jobs = newJobManager(s)
s.providers = make(map[string]ProviderCompat)
s.sessions = make(map[string]*session)

View File

@ -7,10 +7,10 @@ import (
type CapabilityCatalog struct {
mu sync.RWMutex
ProviderCatalog []any `json:"providerCatalog"`
GatewayProviders []any `json:"gatewayProviders"`
ProviderCatalog []any `json:"providerCatalog"`
GatewayProviders []any `json:"gatewayProviders"`
AvailableExecutionTargets []any `json:"availableExecutionTargets"`
ProviderProbeSummary []any `json:"providerProbeSummary"`
ProviderProbeSummary []any `json:"providerProbeSummary"`
}
func (c *CapabilityCatalog) Update(providers []any, targets []any) {
@ -26,7 +26,7 @@ func (c *CapabilityCatalog) Get() map[string]any {
result := map[string]any{
"singleAgent": true,
"multiAgent": false,
"multiAgent": true,
"providerCatalog": append([]any(nil), c.ProviderCatalog...),
"gatewayProviders": append([]any(nil), c.GatewayProviders...),
"availableExecutionTargets": append([]any(nil), c.AvailableExecutionTargets...),
@ -34,7 +34,7 @@ func (c *CapabilityCatalog) Get() map[string]any {
}
result["capabilities"] = map[string]any{
"single_agent": true,
"multi_agent": false,
"multi_agent": true,
"providerCatalog": append([]any(nil), c.ProviderCatalog...),
"gatewayProviders": append([]any(nil), c.GatewayProviders...),
"availableExecutionTargets": append([]any(nil), c.AvailableExecutionTargets...),

View File

@ -321,6 +321,54 @@ func extractExternalACPAssistantTextValue(value any) string {
return normalizeExternalACPAssistantText(extractExternalACPTextValue(value))
}
func structuredExternalACPEvent(notification map[string]any) map[string]any {
if notification == nil {
return nil
}
method := strings.TrimSpace(stringValue(notification["method"]))
payload := asMap(notification["params"])
if len(payload) == 0 {
payload = notification
}
update := asMap(payload["update"])
if len(update) == 0 {
update = payload
}
item := asMap(payload["item"])
source := update
if len(item) > 0 {
source = item
}
eventType := "status"
if strings.Contains(method, "thinking") || strings.TrimSpace(stringValue(source["thinking"])) != "" {
eventType = "thinking"
} else if strings.Contains(method, "tool") || len(asMap(source["toolCall"])) > 0 || len(asMap(source["tool_call"])) > 0 {
eventType = "tool_call"
} else if text := extractExternalACPAssistantTextValue(source); text != "" {
eventType = "text"
_ = text
}
result := map[string]any{
"type": eventType,
"method": method,
}
if text := extractExternalACPAssistantTextValue(source); text != "" {
result["text"] = text
}
if status := strings.TrimSpace(firstNonEmptyString(source, "status", "sessionUpdate", "session_update")); status != "" {
result["status"] = status
}
if tool := asMap(source["toolCall"]); len(tool) > 0 {
result["toolCall"] = tool
} else if tool := asMap(source["tool_call"]); len(tool) > 0 {
result["toolCall"] = tool
}
if result["text"] == nil && result["status"] == nil && result["toolCall"] == nil && eventType == "status" {
return nil
}
return result
}
func normalizeExternalACPAssistantText(text string) string {
normalized := strings.TrimSpace(text)
if normalized == "" {

View File

@ -455,7 +455,7 @@ func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest,
if params == nil {
params = map[string]any{}
}
if parseBool(params["multiAgent"]) {
if parseBool(params["multiAgent"]) || strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), "multi-agent") {
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"}
}
if provider := strings.TrimSpace(shared.StringArg(params, "provider", "")); provider != "" {
@ -475,6 +475,9 @@ func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest,
if routing == nil {
routing = map[string]any{}
}
if strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != "" {
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"}
}
if provider := strings.TrimSpace(shared.StringArg(routing, "explicitProviderId", "")); provider != "" {
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: explicitProviderId must not be set on /gateway/openclaw"}
}

354
internal/acp/jobs.go Normal file
View File

@ -0,0 +1,354 @@
package acp
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
"xworkmate-bridge/internal/shared"
)
const (
defaultJobTimeout = 10 * time.Minute
defaultWebhookMaxTries = 3
defaultWebhookRetryWait = 2 * time.Second
)
type jobManager struct {
server *Server
mu sync.Mutex
jobs map[string]*bridgeJob
}
type bridgeJob struct {
JobID string
SessionID string
ThreadID string
ProviderID string
Prompt string
WorkingDir string
Status string
Result map[string]any
Error string
CreatedAt time.Time
StartedAt time.Time
CompletedAt time.Time
Timeout time.Duration
CallbackURL string
WebhookTries int
WebhookSent bool
Target string
Channel string
AccountID string
Params map[string]any
}
func newJobManager(server *Server) *jobManager {
return &jobManager{server: server, jobs: make(map[string]*bridgeJob)}
}
func (s *Server) handleJobMethod(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) {
if s.jobs == nil {
s.jobs = newJobManager(s)
}
switch method {
case "xworkmate.jobs.submit":
return s.jobs.submit(ctx, params, notify), nil
case "xworkmate.jobs.get":
return s.jobs.get(params), nil
case "xworkmate.jobs.list":
return s.jobs.list(), nil
case "xworkmate.jobs.stats":
return s.jobs.stats(), nil
default:
return nil, &shared.RPCError{Code: -32601, Message: "unknown jobs method: " + method}
}
}
func (m *jobManager) submit(ctx context.Context, params map[string]any, notify func(map[string]any)) map[string]any {
m.failStuck()
jobID := fmt.Sprintf("job-%d", time.Now().UnixNano())
providerID := strings.TrimSpace(firstNonEmptyString(params, "providerId", "agent", "provider"))
routing := shared.AsMap(params["routing"])
if providerID == "" {
providerID = strings.TrimSpace(shared.StringArg(routing, "explicitProviderId", ""))
}
timeout := time.Duration(parsePositiveInt(params["timeoutMs"])) * time.Millisecond
if timeout <= 0 {
timeout = defaultJobTimeout
}
job := &bridgeJob{
JobID: jobID,
SessionID: strings.TrimSpace(shared.StringArg(params, "sessionId", jobID)),
ThreadID: strings.TrimSpace(shared.StringArg(params, "threadId", jobID)),
ProviderID: providerID,
Prompt: strings.TrimSpace(shared.StringArg(params, "taskPrompt", shared.StringArg(params, "prompt", ""))),
WorkingDir: strings.TrimSpace(shared.StringArg(params, "workingDirectory", shared.StringArg(params, "cwd", ""))),
Status: "pending",
CreatedAt: time.Now(),
Timeout: timeout,
CallbackURL: strings.TrimSpace(shared.StringArg(params, "callbackUrl", shared.StringArg(params, "webhookUrl", ""))),
Target: strings.TrimSpace(shared.StringArg(params, "target", shared.StringArg(params, "discordTarget", ""))),
Channel: strings.TrimSpace(shared.StringArg(params, "channel", "discord")),
AccountID: strings.TrimSpace(shared.StringArg(params, "accountId", "")),
Params: cloneJobParams(params),
}
m.mu.Lock()
m.jobs[job.JobID] = job
m.mu.Unlock()
go m.run(ctx, job, notify)
return map[string]any{"jobId": job.JobID, "status": job.Status, "sessionId": job.SessionID, "providerId": job.ProviderID}
}
func (m *jobManager) run(parent context.Context, job *bridgeJob, notify func(map[string]any)) {
m.setRunning(job)
ctx, cancel := context.WithTimeout(parent, job.Timeout)
defer cancel()
params := cloneJobParams(job.Params)
params["sessionId"] = job.SessionID
params["threadId"] = job.ThreadID
params["taskPrompt"] = job.Prompt
params["workingDirectory"] = job.WorkingDir
if job.ProviderID != "" && !isMultiAgentSessionRequest(params) {
params["routing"] = map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "singleAgent",
"explicitProviderId": job.ProviderID,
}
}
result, rpcErr := m.server.orchestrator.Process(ctx, "session.start", params, notify)
m.mu.Lock()
defer m.mu.Unlock()
job.CompletedAt = time.Now()
if rpcErr != nil {
job.Status = "failed"
job.Error = rpcErr.Message
} else {
job.Result = result
if parseBool(result["success"]) {
job.Status = "completed"
} else {
job.Status = "failed"
job.Error = firstNonEmptyString(result, "error", "message", "unavailableMessage")
}
}
go m.sendCallbacks(job)
}
func (m *jobManager) setRunning(job *bridgeJob) {
m.mu.Lock()
defer m.mu.Unlock()
job.Status = "running"
job.StartedAt = time.Now()
}
func (m *jobManager) get(params map[string]any) map[string]any {
m.failStuck()
jobID := strings.TrimSpace(shared.StringArg(params, "jobId", ""))
m.mu.Lock()
defer m.mu.Unlock()
if job := m.jobs[jobID]; job != nil {
return job.mapPayload()
}
return map[string]any{"status": "not_found", "jobId": jobID}
}
func (m *jobManager) list() map[string]any {
m.failStuck()
m.mu.Lock()
defer m.mu.Unlock()
jobs := make([]map[string]any, 0, len(m.jobs))
counts := map[string]int{"pending": 0, "running": 0, "completed": 0, "failed": 0}
for _, job := range m.jobs {
jobs = append(jobs, job.mapPayload())
counts[job.Status]++
}
summary := make(map[string]any, len(counts))
for key, value := range counts {
summary[key] = value
}
return map[string]any{"jobs": jobs, "summary": summary}
}
func (m *jobManager) stats() map[string]any {
summary := m.list()["summary"]
return map[string]any{"summary": summary}
}
func (m *jobManager) failStuck() {
now := time.Now()
m.mu.Lock()
defer m.mu.Unlock()
for _, job := range m.jobs {
if job.Status != "running" && job.Status != "pending" {
continue
}
timeout := job.Timeout
if timeout <= 0 {
timeout = defaultJobTimeout
}
if now.Sub(job.CreatedAt) > timeout {
job.Status = "failed"
job.Error = "job exceeded timeout"
job.CompletedAt = now
go m.sendCallbacks(job)
}
}
}
func (m *jobManager) sendCallbacks(job *bridgeJob) {
if job == nil {
return
}
if job.CallbackURL != "" {
m.sendWebhook(job)
}
if job.Target != "" {
_, _ = m.server.invokeOpenClawTool(context.Background(), map[string]any{
"tool": "message",
"action": "send",
"channel": job.Channel,
"accountId": job.AccountID,
"args": map[string]any{
"channel": job.Channel,
"target": job.Target,
"message": job.markdownCard(),
},
})
}
}
func (m *jobManager) sendWebhook(job *bridgeJob) {
payload, _ := json.Marshal(job.mapPayload())
for attempt := 1; attempt <= defaultWebhookMaxTries; attempt++ {
req, err := http.NewRequest(http.MethodPost, job.CallbackURL, bytes.NewReader(payload))
if err != nil {
break
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
job.WebhookTries = attempt
if err == nil && resp != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
job.WebhookSent = true
if resp.Body != nil {
_ = resp.Body.Close()
}
return
}
if resp != nil && resp.Body != nil {
_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 1024))
_ = resp.Body.Close()
}
time.Sleep(defaultWebhookRetryWait)
}
}
func (j *bridgeJob) mapPayload() map[string]any {
payload := map[string]any{
"jobId": j.JobID,
"status": j.Status,
"sessionId": j.SessionID,
"threadId": j.ThreadID,
"providerId": j.ProviderID,
"createdAt": j.CreatedAt.UTC().Format(time.RFC3339Nano),
"webhookSent": j.WebhookSent,
"webhookTries": j.WebhookTries,
}
if !j.StartedAt.IsZero() {
payload["startedAt"] = j.StartedAt.UTC().Format(time.RFC3339Nano)
payload["elapsedMs"] = time.Since(j.StartedAt).Milliseconds()
}
if !j.CompletedAt.IsZero() {
payload["completedAt"] = j.CompletedAt.UTC().Format(time.RFC3339Nano)
payload["durationMs"] = j.CompletedAt.Sub(j.CreatedAt).Milliseconds()
}
if j.Result != nil {
payload["result"] = j.Result
}
if j.Error != "" {
payload["error"] = j.Error
}
return payload
}
func (j *bridgeJob) markdownCard() string {
title := fmt.Sprintf("### XWorkmate job %s: %s", j.JobID, j.Status)
if j.Result != nil {
if summary := firstNonEmptyString(j.Result, "summary", "output", "message"); summary != "" {
return title + "\n\n" + summary
}
}
if j.Error != "" {
return title + "\n\n" + j.Error
}
return title
}
func (s *Server) invokeOpenClawTool(ctx context.Context, params map[string]any) (map[string]any, *shared.RPCError) {
payload := map[string]any{
"tool": strings.TrimSpace(shared.StringArg(params, "tool", "")),
"action": strings.TrimSpace(shared.StringArg(params, "action", "")),
"args": shared.AsMap(params["args"]),
}
if payload["tool"] == "" {
return nil, &shared.RPCError{Code: -32602, Message: "TOOL_REQUIRED"}
}
toolURL := strings.TrimSpace(os.Getenv("OPENCLAW_TOOLS_INVOKE_URL"))
if toolURL != "" {
body, _ := json.Marshal(payload)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, toolURL, bytes.NewReader(body))
if err != nil {
return nil, &shared.RPCError{Code: -32002, Message: err.Error()}
}
req.Header.Set("Content-Type", "application/json")
if token := strings.TrimSpace(os.Getenv("OPENCLAW_TOOLS_TOKEN")); token != "" {
req.Header.Set("Authorization", bearerHeader(token))
}
response, err := http.DefaultClient.Do(req)
if err != nil {
return nil, &shared.RPCError{Code: -32002, Message: err.Error()}
}
defer func() { _ = response.Body.Close() }()
var decoded map[string]any
if err := json.NewDecoder(response.Body).Decode(&decoded); err != nil {
return nil, &shared.RPCError{Code: -32002, Message: err.Error()}
}
if response.StatusCode < 200 || response.StatusCode >= 300 {
return decoded, &shared.RPCError{Code: -32002, Message: "tools invoke failed"}
}
return decoded, nil
}
if s.gateway == nil {
return nil, &shared.RPCError{Code: -32001, Message: "GATEWAY_NOT_INITIALIZED"}
}
if rpcErr := ensureProductionGatewayConnected(s, "openclaw", nil); rpcErr != nil {
return nil, rpcErr
}
result := s.gateway.RequestByMode("openclaw", "tools.invoke", payload, 30*time.Second, nil)
if !result.OK {
return nil, gatewayRPCError(result.Error, "tools invoke failed")
}
return shared.AsMap(result.Payload), nil
}
func bearerHeader(token string) string {
if strings.HasPrefix(strings.ToLower(token), "bearer ") {
return token
}
return "Bearer " + token
}
func cloneJobParams(params map[string]any) map[string]any {
next := make(map[string]any, len(params))
for key, value := range params {
next[key] = value
}
return next
}

612
internal/acp/multi_agent.go Normal file
View File

@ -0,0 +1,612 @@
package acp
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"xworkmate-bridge/internal/shared"
)
const (
multiAgentTargetID = "multi-agent"
defaultMultiAgentTimeout = 5 * time.Minute
)
type multiAgentPlan struct {
Mode string
Steps []multiAgentStep
MaxTurns int
StopConditions []string
SharedWorkspace string
}
type multiAgentStep struct {
Index int
ProviderID string
Prompt string
OutputAs string
Timeout time.Duration
}
type multiAgentStepResult struct {
Index int
ProviderID string
Status string
Output string
Error string
DurationMs int64
Result map[string]any
}
func isMultiAgentSessionRequest(params map[string]any) bool {
if parseBool(params["multiAgent"]) {
return true
}
if strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), multiAgentTargetID) {
return true
}
routing := shared.AsMap(params["routing"])
if strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != "" {
return true
}
return false
}
func (o *SessionOrchestrator) ProcessMultiAgent(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) {
if method != "session.start" && method != "session.message" {
return nil, &shared.RPCError{Code: -32601, Message: "MULTI_AGENT_METHOD_NOT_ALLOWED: " + method}
}
plan, rpcErr := o.parseMultiAgentPlan(params)
if rpcErr != nil {
return nil, rpcErr
}
sessionID := shared.StringArg(params, "sessionId", "")
threadID := shared.StringArg(params, "threadId", sessionID)
turnID := fmt.Sprintf("turn-%d", time.Now().UnixNano())
sess := o.server.getOrCreateSession(sessionID, threadID)
sess.mu.Lock()
sess.target = multiAgentTargetID
sess.provider = ""
sess.mode = multiAgentTargetID
sess.control.ControlPlaneSessionID = sessionID
sess.control.ThreadID = threadID
sess.control.RequestedWorkingDir = plan.SharedWorkspace
sess.control.RemoteWorkingDirHint = strings.TrimSpace(shared.StringArg(params, "remoteWorkingDirectoryHint", ""))
sess.control.UpdatedAt = time.Now()
sess.task = QueuedTask{
SessionID: sessionID,
ThreadID: threadID,
TurnID: turnID,
Target: multiAgentTargetID,
State: TaskStateRunning,
Kind: TaskKindMultiAgent,
UpdatedAt: time.Now(),
}
if prompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", "")); prompt != "" {
sess.history = append(sess.history, "USER: "+prompt)
}
sess.mu.Unlock()
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_started",
"message": "multi-agent orchestration started",
"mode": plan.Mode,
"stepCount": len(plan.Steps),
"pending": true,
"error": false,
})
result := o.runMultiAgentPlan(ctx, method, params, plan, sessionID, threadID, turnID, notify)
routing := RoutingResult{TargetID: multiAgentTargetID}
normalized := o.normalizeResult(sess, result, routing, turnID, withMultiAgentWorkingDirectory(params, plan.SharedWorkspace))
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_completed",
"message": strings.TrimSpace(shared.StringArg(normalized, "summary", "multi-agent orchestration completed")),
"pending": false,
"error": !parseBool(normalized["success"]),
"result": normalized,
})
return normalized, nil
}
func (o *SessionOrchestrator) parseMultiAgentPlan(params map[string]any) (multiAgentPlan, *shared.RPCError) {
routing := shared.AsMap(params["routing"])
mode := strings.ToLower(strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")))
if mode == "" {
mode = strings.ToLower(strings.TrimSpace(shared.StringArg(params, "orchestrationMode", "")))
}
if mode == "" {
mode = "sequence"
}
switch mode {
case "sequence", "parallel", "race", "conversation":
default:
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_INVALID_MODE: " + mode}
}
sharedWorkspace := strings.TrimSpace(shared.StringArg(routing, "sharedWorkspace", ""))
if sharedWorkspace == "" {
sharedWorkspace = strings.TrimSpace(shared.StringArg(params, "workingDirectory", ""))
}
if sharedWorkspace == "" {
sharedWorkspace = filepath.Join(os.TempDir(), "xworkmate-bridge", "multi-agent", safeMultiAgentPathSegment(shared.StringArg(params, "sessionId", "session")))
}
if err := os.MkdirAll(sharedWorkspace, 0o755); err != nil {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_WORKSPACE_UNAVAILABLE: " + err.Error()}
}
steps := parseMultiAgentSteps(shared.ListArg(routing, "steps"), params, defaultMultiAgentTimeout)
if len(steps) == 0 {
steps = parseMultiAgentSteps(shared.ListArg(params, "steps"), params, defaultMultiAgentTimeout)
}
if mode == "conversation" && len(steps) == 0 {
steps = parseConversationParticipants(shared.ListArg(routing, "participants"), params, defaultMultiAgentTimeout)
}
if len(steps) == 0 {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_STEPS_REQUIRED"}
}
for index := range steps {
steps[index].Index = index
if steps[index].ProviderID == "" {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_PROVIDER_REQUIRED"}
}
if steps[index].Prompt == "" {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_PROMPT_REQUIRED"}
}
}
maxTurns := parsePositiveInt(routing["maxTurns"])
if maxTurns == 0 {
maxTurns = parsePositiveInt(params["maxTurns"])
}
if maxTurns == 0 {
maxTurns = len(steps)
}
if mode == "conversation" && maxTurns < len(steps) {
maxTurns = len(steps)
}
stopConditions := parseMultiAgentStringList(routing["stopConditions"])
if len(stopConditions) == 0 {
stopConditions = []string{"STATUS: DONE", "STATUS: CONSENSUS"}
}
return multiAgentPlan{
Mode: mode,
Steps: steps,
MaxTurns: maxTurns,
StopConditions: stopConditions,
SharedWorkspace: sharedWorkspace,
}, nil
}
func parseMultiAgentSteps(raw []any, params map[string]any, fallbackTimeout time.Duration) []multiAgentStep {
steps := make([]multiAgentStep, 0, len(raw))
defaultPrompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))
for _, item := range raw {
stepParams := shared.AsMap(item)
if len(stepParams) == 0 {
continue
}
timeout := time.Duration(parsePositiveInt(stepParams["timeoutMs"])) * time.Millisecond
if timeout <= 0 {
timeout = time.Duration(parsePositiveInt(stepParams["timeoutSeconds"])) * time.Second
}
if timeout <= 0 {
timeout = fallbackTimeout
}
prompt := strings.TrimSpace(firstNonEmptyString(stepParams, "prompt", "taskPrompt"))
if prompt == "" {
prompt = defaultPrompt
}
steps = append(steps, multiAgentStep{
ProviderID: strings.TrimSpace(firstNonEmptyString(stepParams, "providerId", "provider", "agent")),
Prompt: prompt,
OutputAs: strings.TrimSpace(shared.StringArg(stepParams, "outputAs", "")),
Timeout: timeout,
})
}
return steps
}
func parseConversationParticipants(raw []any, params map[string]any, fallbackTimeout time.Duration) []multiAgentStep {
steps := make([]multiAgentStep, 0, len(raw))
prompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))
for _, item := range raw {
providerID := strings.TrimSpace(fmt.Sprint(item))
if mapped := shared.AsMap(item); len(mapped) > 0 {
providerID = strings.TrimSpace(firstNonEmptyString(mapped, "providerId", "provider", "agent"))
}
if providerID == "" || providerID == "<nil>" {
continue
}
steps = append(steps, multiAgentStep{
ProviderID: providerID,
Prompt: prompt,
Timeout: fallbackTimeout,
})
}
return steps
}
func (o *SessionOrchestrator) runMultiAgentPlan(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
switch plan.Mode {
case "parallel":
return o.runMultiAgentParallel(ctx, method, params, plan, sessionID, threadID, turnID, notify)
case "race":
return o.runMultiAgentRace(ctx, method, params, plan, sessionID, threadID, turnID, notify)
case "conversation":
return o.runMultiAgentConversation(ctx, method, params, plan, sessionID, threadID, turnID, notify)
default:
return o.runMultiAgentSequence(ctx, method, params, plan, sessionID, threadID, turnID, notify)
}
}
func (o *SessionOrchestrator) runMultiAgentSequence(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
values := map[string]string{"input": strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))}
results := make([]map[string]any, 0, len(plan.Steps))
var lastOutput string
for _, step := range plan.Steps {
step.Prompt = renderMultiAgentPrompt(step.Prompt, values)
stepResult := o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, false, notify)
results = append(results, stepResult.Map())
if stepResult.Status != "completed" {
return multiAgentResult(plan, results, "", stepResult.Error, false)
}
lastOutput = stepResult.Output
values["previousOutput"] = stepResult.Output
if step.OutputAs != "" {
values[step.OutputAs] = stepResult.Output
}
}
return multiAgentResult(plan, results, lastOutput, "", true)
}
func (o *SessionOrchestrator) runMultiAgentParallel(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
results := make([]multiAgentStepResult, len(plan.Steps))
var wg sync.WaitGroup
for _, step := range plan.Steps {
wg.Add(1)
go func(step multiAgentStep) {
defer wg.Done()
results[step.Index] = o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, false, notify)
}(step)
}
wg.Wait()
return multiAgentAggregateResult(plan, results)
}
func (o *SessionOrchestrator) runMultiAgentRace(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
raceCtx, cancel := context.WithCancel(ctx)
defer cancel()
resultsCh := make(chan multiAgentStepResult, len(plan.Steps))
for _, step := range plan.Steps {
go func(step multiAgentStep) {
resultsCh <- o.runMultiAgentProviderStep(raceCtx, method, params, plan, step, sessionID, threadID, turnID, false, notify)
}(step)
}
results := make([]multiAgentStepResult, 0, len(plan.Steps))
for range plan.Steps {
stepResult := <-resultsCh
results = append(results, stepResult)
if stepResult.Status == "completed" {
cancel()
return multiAgentResult(plan, multiAgentStepResultMaps(results), stepResult.Output, "", true)
}
}
return multiAgentResult(plan, multiAgentStepResultMaps(results), "", "all agents failed", false)
}
func (o *SessionOrchestrator) runMultiAgentConversation(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
results := make([]map[string]any, 0, plan.MaxTurns)
started := make(map[string]bool)
lastAgent := ""
lastOutput := ""
topic := strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))
for turn := 0; turn < plan.MaxTurns; turn++ {
step := plan.Steps[turn%len(plan.Steps)]
if lastOutput == "" {
step.Prompt = fmt.Sprintf("Topic:\n%s\n\nShared workspace: %s\n\nRespond as %s.", topic, plan.SharedWorkspace, step.ProviderID)
} else {
step.Prompt = fmt.Sprintf("[%s]: %s\n\nContinue the discussion as %s. Shared workspace: %s", lastAgent, lastOutput, step.ProviderID, plan.SharedWorkspace)
}
useSend := started[step.ProviderID]
stepResult := o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, useSend, notify)
started[step.ProviderID] = true
entry := stepResult.Map()
entry["conversationTurn"] = turn + 1
results = append(results, entry)
if stepResult.Status != "completed" {
return multiAgentResult(plan, results, "", stepResult.Error, false)
}
lastAgent = step.ProviderID
lastOutput = stepResult.Output
if multiAgentConversationShouldStop(lastOutput, plan.StopConditions) {
result := multiAgentResult(plan, results, lastOutput, "", true)
result["stopReason"] = "stop_condition"
return result
}
}
result := multiAgentResult(plan, results, lastOutput, "", true)
result["stopReason"] = "max_turns"
return result
}
func (o *SessionOrchestrator) runMultiAgentProviderStep(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, step multiAgentStep, sessionID string, threadID string, turnID string, useSend bool, notify func(map[string]any)) multiAgentStepResult {
startedAt := time.Now()
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_step_started",
"providerId": step.ProviderID,
"stepIndex": step.Index,
"pending": true,
"error": false,
})
compat, ok := o.server.providers[step.ProviderID]
if !ok {
return failedMultiAgentStep(step, startedAt, "provider unavailable")
}
stepCtx := ctx
cancel := func() {}
if step.Timeout > 0 {
stepCtx, cancel = context.WithTimeout(ctx, step.Timeout)
}
defer cancel()
stepSessionID := fmt.Sprintf("%s/%s/%d/%s", sessionID, turnID, step.Index, step.ProviderID)
stepParams := multiAgentStepParams(params, step, plan.SharedWorkspace)
sink := func(update map[string]any) {
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "delta",
"event": "multi_agent_step_update",
"providerId": step.ProviderID,
"stepIndex": step.Index,
"providerUpdate": update,
"pending": true,
"error": false,
})
}
var result map[string]any
var err error
if useSend {
result, err = compat.SendMessage(stepCtx, stepSessionID, threadID, stepParams, sink)
if _, ok := asSessionContinuationUnavailableError(err); ok {
result, err = compat.StartSession(stepCtx, stepSessionID, threadID, stepParams, sink)
}
} else {
result, err = compat.StartSession(stepCtx, stepSessionID, threadID, stepParams, sink)
}
stepResult := multiAgentStepResult{
Index: step.Index,
ProviderID: step.ProviderID,
Status: "completed",
Output: strings.TrimSpace(multiAgentOutputFromResult(result)),
DurationMs: time.Since(startedAt).Milliseconds(),
Result: result,
}
if err != nil {
stepResult.Status = "failed"
stepResult.Error = err.Error()
} else if result != nil {
if value, ok := result["success"]; ok && !parseBool(value) {
stepResult.Status = "failed"
stepResult.Error = firstNonEmptyString(result, "error", "message", "unavailableMessage")
}
}
if stepResult.Status == "completed" && stepResult.Output == "" {
stepResult.Status = "failed"
stepResult.Error = "provider returned no displayable output"
}
if stepResult.Error == "" && stepResult.Status == "failed" {
stepResult.Error = "provider execution failed"
}
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_step_completed",
"providerId": step.ProviderID,
"stepIndex": step.Index,
"status": stepResult.Status,
"message": stepResult.Output,
"pending": false,
"error": stepResult.Status != "completed",
})
return stepResult
}
func multiAgentStepParams(params map[string]any, step multiAgentStep, sharedWorkspace string) map[string]any {
next := make(map[string]any, len(params)+2)
for key, value := range params {
next[key] = value
}
delete(next, "multiAgent")
delete(next, "mode")
next["taskPrompt"] = step.Prompt
next["workingDirectory"] = sharedWorkspace
next["routing"] = map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "singleAgent",
"explicitProviderId": step.ProviderID,
}
return next
}
func multiAgentOutputFromResult(result map[string]any) string {
if result == nil {
return ""
}
return firstNonEmptyString(result, "output", "summary", "message", "text")
}
func failedMultiAgentStep(step multiAgentStep, startedAt time.Time, message string) multiAgentStepResult {
return multiAgentStepResult{
Index: step.Index,
ProviderID: step.ProviderID,
Status: "failed",
Error: message,
DurationMs: time.Since(startedAt).Milliseconds(),
}
}
func (r multiAgentStepResult) Map() map[string]any {
result := map[string]any{
"index": r.Index,
"providerId": r.ProviderID,
"status": r.Status,
"durationMs": r.DurationMs,
}
if r.Output != "" {
result["output"] = r.Output
}
if r.Error != "" {
result["error"] = r.Error
}
if r.Result != nil {
result["result"] = r.Result
}
return result
}
func multiAgentAggregateResult(plan multiAgentPlan, results []multiAgentStepResult) map[string]any {
mapped := multiAgentStepResultMaps(results)
outputs := make([]string, 0, len(results))
var errorText string
success := true
for _, result := range results {
if result.Status != "completed" {
success = false
if errorText == "" {
errorText = result.ProviderID + ": " + result.Error
}
continue
}
if result.Output != "" {
outputs = append(outputs, result.ProviderID+": "+result.Output)
}
}
return multiAgentResult(plan, mapped, strings.Join(outputs, "\n"), errorText, success)
}
func multiAgentStepResultMaps(results []multiAgentStepResult) []map[string]any {
mapped := make([]map[string]any, 0, len(results))
for _, result := range results {
mapped = append(mapped, result.Map())
}
return mapped
}
func multiAgentResult(plan multiAgentPlan, steps []map[string]any, output string, errorText string, success bool) map[string]any {
status := "completed"
if !success {
status = "failed"
}
if output == "" && errorText != "" {
output = errorText
}
result := map[string]any{
"success": success,
"status": status,
"mode": multiAgentTargetID,
"orchestrationMode": plan.Mode,
"resolvedExecutionTarget": multiAgentTargetID,
"resolvedProviderId": "",
"resolvedGatewayProviderId": "",
"steps": steps,
"sharedWorkspace": plan.SharedWorkspace,
}
if output != "" {
result["output"] = output
result["summary"] = output
result["message"] = output
}
if errorText != "" {
result["error"] = errorText
}
return result
}
func renderMultiAgentPrompt(prompt string, values map[string]string) string {
rendered := prompt
for key, value := range values {
rendered = strings.ReplaceAll(rendered, "{{"+key+"}}", value)
}
return rendered
}
func multiAgentConversationShouldStop(output string, stopConditions []string) bool {
normalizedOutput := strings.ToUpper(output)
for _, condition := range stopConditions {
condition = strings.TrimSpace(condition)
if condition == "" {
continue
}
if strings.Contains(normalizedOutput, strings.ToUpper(condition)) {
return true
}
}
return false
}
func parseMultiAgentStringList(raw any) []string {
switch values := raw.(type) {
case []string:
return append([]string(nil), values...)
case []any:
result := make([]string, 0, len(values))
for _, value := range values {
text := strings.TrimSpace(fmt.Sprint(value))
if text != "" && text != "<nil>" {
result = append(result, text)
}
}
return result
default:
return nil
}
}
func withMultiAgentWorkingDirectory(params map[string]any, workingDirectory string) map[string]any {
next := make(map[string]any, len(params)+1)
for key, value := range params {
next[key] = value
}
if workingDirectory != "" {
next["workingDirectory"] = workingDirectory
}
return next
}
func safeMultiAgentPathSegment(value string) string {
value = strings.TrimSpace(value)
if value == "" {
return "default"
}
var builder strings.Builder
for _, r := range value {
switch {
case r >= 'a' && r <= 'z':
builder.WriteRune(r)
case r >= 'A' && r <= 'Z':
builder.WriteRune(r)
case r >= '0' && r <= '9':
builder.WriteRune(r)
case r == '.', r == '_', r == '-':
builder.WriteRune(r)
default:
builder.WriteByte('-')
}
}
if builder.Len() == 0 {
return "default"
}
return builder.String()
}

View File

@ -32,6 +32,10 @@ func NewSessionOrchestrator(server *Server) *SessionOrchestrator {
}
func (o *SessionOrchestrator) Process(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) {
if isMultiAgentSessionRequest(params) {
return o.ProcessMultiAgent(ctx, method, params, notify)
}
res, err := o.server.routingEngine.Resolve(ctx, params)
if err != nil {
if err.Error() == "ROUTING_REQUIRED" {

View File

@ -365,10 +365,18 @@ func (c *codexCompat) writeAndReadWSRPC(ctx context.Context, conn *websocket.Con
methodName := strings.TrimSpace(shared.StringArg(decoded, "method", ""))
if methodName != "" {
if isExternalPermissionRequest(methodName) {
_ = writeExternalPermissionApproval(conn, decoded)
continue
}
collector.observe(decoded)
if isExternalSessionUpdateMethod(methodName) && sink != nil {
update := shared.AsMap(decoded["params"])
if len(update) > 0 {
if structured := structuredExternalACPEvent(decoded); len(structured) > 0 {
update["structuredEvent"] = structured
update["eventType"] = structured["type"]
}
sink(update)
}
}
@ -626,10 +634,18 @@ func (c *externalACPCompat) callWSRPC(ctx context.Context, method string, params
methodName := strings.TrimSpace(shared.StringArg(decoded, "method", ""))
if methodName != "" {
if isExternalPermissionRequest(methodName) {
_ = writeExternalPermissionApproval(conn, decoded)
continue
}
collector.observe(decoded)
if isExternalSessionUpdateMethod(methodName) && sink != nil {
update := shared.AsMap(decoded["params"])
if len(update) > 0 {
if structured := structuredExternalACPEvent(decoded); len(structured) > 0 {
update["structuredEvent"] = structured
update["eventType"] = structured["type"]
}
sink(update)
}
}
@ -657,6 +673,26 @@ func isExternalSessionUpdateMethod(method string) bool {
}
}
func isExternalPermissionRequest(method string) bool {
normalized := strings.TrimSpace(method)
return normalized == "session/request_permission" || normalized == "session.request_permission" || normalized == "request_permission"
}
func writeExternalPermissionApproval(conn *websocket.Conn, request map[string]any) error {
if conn == nil || request == nil || request["id"] == nil {
return nil
}
return conn.WriteJSON(map[string]any{
"jsonrpc": "2.0",
"id": request["id"],
"result": map[string]any{
"approved": true,
"decision": "approved",
"behavior": "allow",
},
})
}
func parseExternalRPCResult(decoded map[string]any) (map[string]any, error) {
if decoded == nil {
return map[string]any{}, nil

View File

@ -14,7 +14,7 @@ func setTestBridgeProvider(server *Server, provider syncedProvider) {
server.providers = make(map[string]ProviderCompat)
}
server.providers[provider.ProviderID] = newProviderCompat(provider)
if server.catalog != nil {
server.catalog.ProviderCatalog = append(server.catalog.ProviderCatalog, map[string]any{
"providerId": provider.ProviderID,
@ -37,6 +37,13 @@ func TestCapabilitiesExposeBuiltInProductionProviderCatalog(t *testing.T) {
}
capabilities := response
if capabilities["multiAgent"] != true {
t.Fatalf("expected multiAgent capability to be enabled, got %#v", capabilities["multiAgent"])
}
nestedCapabilities := shared.AsMap(capabilities["capabilities"])
if nestedCapabilities["multi_agent"] != true {
t.Fatalf("expected nested multi_agent capability to be enabled, got %#v", nestedCapabilities["multi_agent"])
}
targets, ok := capabilities["availableExecutionTargets"].([]any)
if !ok {
// Try fallback decoding if it was serialized
@ -45,7 +52,7 @@ func TestCapabilitiesExposeBuiltInProductionProviderCatalog(t *testing.T) {
t.Fatalf("expected availableExecutionTargets array, got %T", capabilities["availableExecutionTargets"])
}
}
foundAgent := false
for _, target := range targets {
if target == "agent" {

View File

@ -622,6 +622,360 @@ func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) {
}
}
func TestExecuteSessionTaskMultiAgentModes(t *testing.T) {
cases := []struct {
name string
mode string
maxTurns int
wantSteps int
}{
{name: "sequence", mode: "sequence", wantSteps: 2},
{name: "parallel", mode: "parallel", wantSteps: 2},
{name: "race", mode: "race", wantSteps: 1},
{name: "conversation", mode: "conversation", maxTurns: 2, wantSteps: 2},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
server := NewServer()
opencodeProvider := newExternalSingleAgentProvider(t, "opencode", "opencode-output")
defer opencodeProvider.Close()
geminiProvider := newExternalSingleAgentProvider(t, "gemini", "gemini-output")
defer geminiProvider.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "opencode",
Label: "OpenCode",
Endpoint: opencodeProvider.URL,
Enabled: true,
})
setTestBridgeProvider(server, syncedProvider{
ProviderID: "gemini",
Label: "Gemini",
Endpoint: geminiProvider.URL,
Enabled: true,
})
routing := map[string]any{
"orchestrationMode": tc.mode,
"steps": []any{
map[string]any{"providerId": "opencode", "prompt": "first"},
map[string]any{"providerId": "gemini", "prompt": "second sees {{previousOutput}}"},
},
}
if tc.mode == "conversation" {
routing["maxTurns"] = tc.maxTurns
routing["participants"] = []any{"opencode", "gemini"}
}
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-multi-" + tc.name,
"threadId": "thread-multi-" + tc.name,
"taskPrompt": "coordinate two agents",
"workingDirectory": t.TempDir(),
"multiAgent": true,
"routing": routing,
},
},
})
if rpcErr != nil {
t.Fatalf("expected multi-agent response, got rpc error: %#v", rpcErr)
}
if !parseBool(response["success"]) {
t.Fatalf("expected successful multi-agent response, got %#v", response)
}
if got := response["resolvedExecutionTarget"]; got != "multi-agent" {
t.Fatalf("expected multi-agent execution target, got %#v", response)
}
if got := response["orchestrationMode"]; got != tc.mode {
t.Fatalf("expected orchestration mode %q, got %#v", tc.mode, response)
}
steps := mustStepMaps(t, response["steps"])
if len(steps) != tc.wantSteps {
t.Fatalf("expected %d step results, got %#v", tc.wantSteps, steps)
}
if output := strings.TrimSpace(shared.StringArg(response, "output", "")); output == "" {
t.Fatalf("expected displayable multi-agent output, got %#v", response)
}
})
}
}
func TestExecuteSessionTaskMultiAgentProviderUnavailableIsResultFailure(t *testing.T) {
server := NewServer()
providerServer := newExternalSingleAgentProvider(t, "opencode", "opencode-output")
defer providerServer.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "opencode",
Label: "OpenCode",
Endpoint: providerServer.URL,
Enabled: true,
})
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-multi-missing",
"threadId": "thread-multi-missing",
"taskPrompt": "coordinate agents",
"workingDirectory": t.TempDir(),
"multiAgent": true,
"routing": map[string]any{
"orchestrationMode": "parallel",
"steps": []any{
map[string]any{"providerId": "opencode", "prompt": "first"},
map[string]any{"providerId": "missing", "prompt": "second"},
},
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected normalized failure result, got rpc error: %#v", rpcErr)
}
if parseBool(response["success"]) || response["status"] != "failed" {
t.Fatalf("expected failed multi-agent result, got %#v", response)
}
if !strings.Contains(strings.TrimSpace(shared.StringArg(response, "error", "")), "missing: provider unavailable") {
t.Fatalf("expected provider unavailable error, got %#v", response)
}
}
func TestInternalJobsSubmitCompletesAndReportsStats(t *testing.T) {
server := NewServer()
providerServer := newExternalSingleAgentProvider(t, "opencode", "job-output")
defer providerServer.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "opencode",
Label: "OpenCode",
Endpoint: providerServer.URL,
Enabled: true,
})
submitted, rpcErr := server.handleRequest(shared.RPCRequest{
Method: "xworkmate.jobs.submit",
Params: map[string]any{
"providerId": "opencode",
"sessionId": "job-session",
"threadId": "job-thread",
"taskPrompt": "run async job",
"workingDirectory": t.TempDir(),
"timeoutMs": 30_000,
"orchestrationMode": "sequence",
},
}, nil)
if rpcErr != nil {
t.Fatalf("expected job submission, got %#v", rpcErr)
}
jobID := strings.TrimSpace(shared.StringArg(submitted, "jobId", ""))
if jobID == "" {
t.Fatalf("expected job id, got %#v", submitted)
}
var job map[string]any
waitForCondition(t, func() bool {
job, _ = server.handleJobMethod(context.Background(), "xworkmate.jobs.get", map[string]any{"jobId": jobID}, nil)
return shared.StringArg(job, "status", "") == "completed"
})
result := shared.AsMap(job["result"])
if got := strings.TrimSpace(shared.StringArg(result, "output", "")); got != "job-output" {
t.Fatalf("expected job output, got %#v", job)
}
stats, rpcErr := server.handleJobMethod(context.Background(), "xworkmate.jobs.stats", nil, nil)
if rpcErr != nil {
t.Fatalf("expected stats, got %#v", rpcErr)
}
summary := shared.AsMap(stats["summary"])
if got := fmt.Sprint(summary["completed"]); got != "1" {
t.Fatalf("expected completed job stats, got %#v", stats)
}
}
func TestInternalJobWebhookRetriesUntilSuccess(t *testing.T) {
server := NewServer()
providerServer := newExternalSingleAgentProvider(t, "opencode", "job-output")
defer providerServer.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "opencode",
Label: "OpenCode",
Endpoint: providerServer.URL,
Enabled: true,
})
var callbackCount atomic.Int32
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count := callbackCount.Add(1)
if count == 1 {
http.Error(w, "retry", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}))
defer callbackServer.Close()
submitted, rpcErr := server.handleRequest(shared.RPCRequest{
Method: "xworkmate.jobs.submit",
Params: map[string]any{
"providerId": "opencode",
"sessionId": "job-webhook-session",
"threadId": "job-webhook-thread",
"taskPrompt": "run async job",
"workingDirectory": t.TempDir(),
"callbackUrl": callbackServer.URL,
"timeoutMs": 30_000,
},
}, nil)
if rpcErr != nil {
t.Fatalf("expected job submission, got %#v", rpcErr)
}
jobID := strings.TrimSpace(shared.StringArg(submitted, "jobId", ""))
waitForCondition(t, func() bool {
job, _ := server.handleJobMethod(context.Background(), "xworkmate.jobs.get", map[string]any{"jobId": jobID}, nil)
return parseBool(job["webhookSent"]) && callbackCount.Load() >= 2
})
}
func TestInternalToolsInvokeUsesOpenClawHTTPProxy(t *testing.T) {
var received map[string]any
toolsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if got := r.Header.Get("Authorization"); got != "Bearer tool-token" {
t.Fatalf("expected bearer token, got %q", got)
}
if err := json.NewDecoder(r.Body).Decode(&received); err != nil {
t.Fatalf("decode tool request: %v", err)
}
_ = json.NewEncoder(w).Encode(map[string]any{"ok": true})
}))
defer toolsServer.Close()
t.Setenv("OPENCLAW_TOOLS_INVOKE_URL", toolsServer.URL)
t.Setenv("OPENCLAW_TOOLS_TOKEN", "tool-token")
server := NewServer()
response, rpcErr := server.handleRequest(shared.RPCRequest{
Method: "xworkmate.tools.invoke",
Params: map[string]any{
"tool": "message",
"action": "send",
"args": map[string]any{
"target": "channel:1",
"message": "hello",
},
},
}, nil)
if rpcErr != nil {
t.Fatalf("expected tools response, got %#v", rpcErr)
}
if !parseBool(response["ok"]) {
t.Fatalf("expected ok tools response, got %#v", response)
}
if got := shared.StringArg(received, "tool", ""); got != "message" {
t.Fatalf("expected message tool payload, got %#v", received)
}
}
func TestExternalACPWebSocketAutoApprovesPermissionRequests(t *testing.T) {
var sawApproval atomic.Bool
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
wsServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.Fatalf("upgrade: %v", err)
}
defer func() { _ = conn.Close() }()
var request map[string]any
if err := conn.ReadJSON(&request); err != nil {
t.Fatalf("read request: %v", err)
}
requestID := request["id"]
if err := conn.WriteJSON(map[string]any{
"jsonrpc": "2.0",
"id": "perm-1",
"method": "session/request_permission",
"params": map[string]any{
"reason": "test permission",
},
}); err != nil {
t.Fatalf("write permission request: %v", err)
}
var approval map[string]any
if err := conn.ReadJSON(&approval); err != nil {
t.Fatalf("read approval: %v", err)
}
if approval["id"] == "perm-1" && parseBool(shared.AsMap(approval["result"])["approved"]) {
sawApproval.Store(true)
}
_ = conn.WriteJSON(map[string]any{
"jsonrpc": "2.0",
"id": requestID,
"result": map[string]any{
"success": true,
"output": "approved output",
},
})
}))
defer wsServer.Close()
compat := newProviderCompat(syncedProvider{
ProviderID: "hermes",
Label: "Hermes",
Endpoint: "ws" + strings.TrimPrefix(wsServer.URL, "http") + "/acp",
Enabled: true,
})
result, err := compat.StartSession(context.Background(), "permission-session", "permission-thread", map[string]any{
"taskPrompt": "needs permission",
}, nil)
if err != nil {
t.Fatalf("expected permission auto approval, got %v", err)
}
if !sawApproval.Load() {
t.Fatalf("expected permission approval response")
}
if got := shared.StringArg(result, "output", ""); got != "approved output" {
t.Fatalf("expected approved output, got %#v", result)
}
}
func TestStructuredExternalACPEventClassifiesNativeStreams(t *testing.T) {
cases := []struct {
name string
raw map[string]any
want string
}{
{
name: "thinking",
raw: map[string]any{
"method": "session.update",
"params": map[string]any{"update": map[string]any{"thinking": "planning"}},
},
want: "thinking",
},
{
name: "tool_call",
raw: map[string]any{
"method": "item/tool_call",
"params": map[string]any{"item": map[string]any{"toolCall": map[string]any{"name": "read"}}},
},
want: "tool_call",
},
{
name: "text",
raw: map[string]any{
"method": "session.update",
"params": map[string]any{"update": map[string]any{"text": "hello"}},
},
want: "text",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
event := structuredExternalACPEvent(tc.raw)
if got := shared.StringArg(event, "type", ""); got != tc.want {
t.Fatalf("expected event type %q, got %#v", tc.want, event)
}
})
}
}
func TestExecuteSessionTaskGatewaySurfacesOpenClawChatSendError(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
@ -2227,6 +2581,35 @@ func sameMethods(got []string, want []string) bool {
return true
}
func mustStepMaps(t *testing.T, value any) []map[string]any {
t.Helper()
switch typed := value.(type) {
case []map[string]any:
return typed
case []any:
steps := make([]map[string]any, 0, len(typed))
for _, item := range typed {
steps = append(steps, shared.AsMap(item))
}
return steps
default:
t.Fatalf("expected step map list, got %#v", value)
return nil
}
}
func waitForCondition(t *testing.T, condition func() bool) {
t.Helper()
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
if condition() {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("timed out waiting for condition")
}
func TestExecuteSessionTaskAutoRoutingUsesBridgeProductionProviderOrder(t *testing.T) {
workspaceDir := filepath.Join(t.TempDir(), "workspace")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {

View File

@ -56,6 +56,12 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
// Gateway 语义由专门的 Gateway 组件通过 Adapter 处理
return s.handleGatewayMethod(ctx, method, request.Params, notify)
case "xworkmate.jobs.submit", "xworkmate.jobs.get", "xworkmate.jobs.list", "xworkmate.jobs.stats":
return s.handleJobMethod(ctx, method, request.Params, notify)
case "xworkmate.tools.invoke":
return s.invokeOpenClawTool(ctx, request.Params)
default:
return nil, &shared.RPCError{
Code: -32601,

View File

@ -85,6 +85,7 @@ type Server struct {
providerOrder []string
gateway *gatewayruntime.Manager
openClawGate *openClawGatewayAdmissionGate
jobs *jobManager
// Legacy / Common
authService interface{} // Minimal auth dependency

View File

@ -600,9 +600,11 @@ func TestHTTPHandlerGatewayOpenClawRejectsConflictingRouting(t *testing.T) {
for _, payload := range []string{
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"multiAgent":true}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"mode":"multi-agent"}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"provider":"codex"}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"executionTarget":"agent"}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"gatewayProviderId":"other"}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"orchestrationMode":"sequence"}}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"explicitProviderId":"codex"}}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"explicitExecutionTarget":"agent"}}}`,
`{"jsonrpc":"2.0","id":1,"method":"session.start","params":{"routing":{"preferredGatewayProviderId":"other"}}}`,