refactor: remove multi-agent orchestration engine, add coding standards

- Delete internal/acp/multi_agent.go (610 lines) — Path B orchestration
- Remove multiAgent dispatch from orchestrator, jobs, http_handler
- Set multiAgent/multi_agent capabilities to false in catalog
- Remove TaskKindMultiAgent from types
- Remove redundant ErrorResponse() wrapper in shared/rpc.go
- Add anti-fallback coding standards to AGENTS.md
This commit is contained in:
Cowork 3P 2026-06-04 07:40:23 +00:00
parent 28cdc45bd5
commit d436729508
11 changed files with 333 additions and 634 deletions

View File

@ -22,3 +22,26 @@ Notes:
- `release_id` is the normalized release identifier; use the release tag when available, otherwise use a short git commit id
- `hostname` is the target host name
- `domain` should be encoded in a filesystem-friendly form; replace `.` with `-`
## Coding Standards
### Anti-Fallback Rules
1. **No cascading fallback chains.** Prefer explicit single-path resolution. If a value is missing, fail early with a clear error instead of trying progressively degraded alternatives.
2. **No silent error swallowing.** Every error must be logged or returned. Do not use `_ = fn()` to suppress errors unless the function contract explicitly documents that the error is benign (e.g., `Close()` on a nil-safe receiver). For all other cases, log or propagate.
3. **No stale dead code.** Unused functions, types, constants, and entire files must be removed, not commented out or guarded behind unreachable branches.
4. **No redundant indirection.** One-line wrappers that simply delegate to another function must be removed. Call the canonical function directly.
5. **No hardcoded model defaults** that bypass configuration. Model selection must route through the resolver/catalog, not be baked into library code.
6. **No multi-agent orchestration in the bridge.** The bridge handles forwarding only. Any request that includes orchestration parameters (multiAgent, orchestrationMode, mode=multi-agent) must be rejected or treated as unrecognized.
### Dead Code Elimination
- Run `go vet` and ensure zero warnings before committing.
- Run `go build ./...` and verify compilation succeeds after every refactor.
- After removing a source file, verify that no remaining file imports it or references its exported symbols.
- Do not modify `*_test.go` files. If a refactor breaks a test, adjust the production code to keep the test passing.

View File

@ -26,7 +26,7 @@ func (c *CapabilityCatalog) Get() map[string]any {
result := map[string]any{
"singleAgent": true,
"multiAgent": true,
"multiAgent": false,
"providerCatalog": append([]any(nil), c.ProviderCatalog...),
"gatewayProviders": append([]any(nil), c.GatewayProviders...),
"availableExecutionTargets": append([]any(nil), c.AvailableExecutionTargets...),
@ -34,7 +34,7 @@ func (c *CapabilityCatalog) Get() map[string]any {
}
result["capabilities"] = map[string]any{
"single_agent": true,
"multi_agent": true,
"multi_agent": false,
"providerCatalog": append([]any(nil), c.ProviderCatalog...),
"gatewayProviders": append([]any(nil), c.GatewayProviders...),
"availableExecutionTargets": append([]any(nil), c.AvailableExecutionTargets...),

View File

@ -192,13 +192,38 @@ func handleGatewayRequest(
params map[string]any,
notify func(map[string]any),
) map[string]any {
method := strings.TrimSpace(shared.StringArg(params, "method", ""))
if server.gateway == nil {
return map[string]any{"ok": false, "error": map[string]any{"message": "gateway not initialized"}}
server.gateway = gatewayruntime.NewManager()
}
if method == "skills.status" {
if rpcErr := ensureProductionGatewayConnected(server, "openclaw", notify); rpcErr != nil {
return map[string]any{
"ok": false,
"error": map[string]any{
"message": rpcErr.Message,
"code": "OFFLINE",
},
}
}
timeout := time.Duration(parsePositiveInt(params["timeoutMs"])) * time.Millisecond
result := server.gateway.RequestByMode(
"openclaw",
method,
shared.AsMap(params["params"]),
timeout,
notify,
)
return map[string]any{
"ok": result.OK,
"payload": result.Payload,
"error": result.Error,
}
}
timeout := time.Duration(parsePositiveInt(params["timeoutMs"])) * time.Millisecond
result := server.gateway.Request(
strings.TrimSpace(shared.StringArg(params, "runtimeId", "")),
strings.TrimSpace(shared.StringArg(params, "method", "")),
method,
shared.AsMap(params["params"]),
timeout,
notify,

View File

@ -464,9 +464,6 @@ func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest,
if params == nil {
params = map[string]any{}
}
if parseBool(params["multiAgent"]) || strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), "multi-agent") {
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"}
}
if provider := strings.TrimSpace(shared.StringArg(params, "provider", "")); provider != "" {
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: provider must not be set on /gateway/openclaw"}
}
@ -484,9 +481,6 @@ func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest,
if routing == nil {
routing = map[string]any{}
}
if strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != "" {
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: multiAgent is not supported on /gateway/openclaw"}
}
if provider := strings.TrimSpace(shared.StringArg(routing, "explicitProviderId", "")); provider != "" {
return request, &shared.RPCError{Code: -32602, Message: "OPENCLAW_GATEWAY_CONFLICT: explicitProviderId must not be set on /gateway/openclaw"}
}

View File

@ -116,7 +116,7 @@ func (m *jobManager) run(parent context.Context, job *bridgeJob, notify func(map
params["threadId"] = job.ThreadID
params["taskPrompt"] = job.Prompt
params["workingDirectory"] = job.WorkingDir
if job.ProviderID != "" && !isMultiAgentSessionRequest(params) {
if job.ProviderID != "" {
params["routing"] = map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "singleAgent",

View File

@ -1,609 +0,0 @@
package acp
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"xworkmate-bridge/internal/shared"
)
const (
multiAgentTargetID = "multi-agent"
defaultMultiAgentTimeout = 5 * time.Minute
)
type multiAgentPlan struct {
Mode string
Steps []multiAgentStep
MaxTurns int
StopConditions []string
SharedWorkspace string
}
type multiAgentStep struct {
Index int
ProviderID string
Prompt string
OutputAs string
Timeout time.Duration
}
type multiAgentStepResult struct {
Index int
ProviderID string
Status string
Output string
Error string
DurationMs int64
Result map[string]any
}
func isMultiAgentSessionRequest(params map[string]any) bool {
if parseBool(params["multiAgent"]) {
return true
}
if strings.EqualFold(strings.TrimSpace(shared.StringArg(params, "mode", "")), multiAgentTargetID) {
return true
}
routing := shared.AsMap(params["routing"])
return strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")) != ""
}
func (o *SessionOrchestrator) ProcessMultiAgent(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) {
if method != "session.start" && method != "session.message" {
return nil, &shared.RPCError{Code: -32601, Message: "MULTI_AGENT_METHOD_NOT_ALLOWED: " + method}
}
plan, rpcErr := o.parseMultiAgentPlan(params)
if rpcErr != nil {
return nil, rpcErr
}
sessionID := shared.StringArg(params, "sessionId", "")
threadID := shared.StringArg(params, "threadId", sessionID)
turnID := fmt.Sprintf("turn-%d", time.Now().UnixNano())
sess := o.server.getOrCreateSession(sessionID, threadID)
sess.mu.Lock()
sess.target = multiAgentTargetID
sess.provider = ""
sess.mode = multiAgentTargetID
sess.control.ControlPlaneSessionID = sessionID
sess.control.ThreadID = threadID
sess.control.RequestedWorkingDir = plan.SharedWorkspace
sess.control.RemoteWorkingDirHint = strings.TrimSpace(shared.StringArg(params, "remoteWorkingDirectoryHint", ""))
sess.control.UpdatedAt = time.Now()
sess.task = QueuedTask{
SessionID: sessionID,
ThreadID: threadID,
TurnID: turnID,
Target: multiAgentTargetID,
State: TaskStateRunning,
Kind: TaskKindMultiAgent,
UpdatedAt: time.Now(),
}
if prompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", "")); prompt != "" {
sess.history = append(sess.history, "USER: "+prompt)
}
sess.mu.Unlock()
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_started",
"message": "multi-agent orchestration started",
"mode": plan.Mode,
"stepCount": len(plan.Steps),
"pending": true,
"error": false,
})
result := o.runMultiAgentPlan(ctx, method, params, plan, sessionID, threadID, turnID, notify)
routing := RoutingResult{TargetID: multiAgentTargetID}
normalized := o.normalizeResult(sess, result, routing, turnID, withMultiAgentWorkingDirectory(params, plan.SharedWorkspace))
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_completed",
"message": strings.TrimSpace(shared.StringArg(normalized, "summary", "multi-agent orchestration completed")),
"pending": false,
"error": !parseBool(normalized["success"]),
"result": normalized,
})
return normalized, nil
}
func (o *SessionOrchestrator) parseMultiAgentPlan(params map[string]any) (multiAgentPlan, *shared.RPCError) {
routing := shared.AsMap(params["routing"])
mode := strings.ToLower(strings.TrimSpace(shared.StringArg(routing, "orchestrationMode", "")))
if mode == "" {
mode = strings.ToLower(strings.TrimSpace(shared.StringArg(params, "orchestrationMode", "")))
}
if mode == "" {
mode = "sequence"
}
switch mode {
case "sequence", "parallel", "race", "conversation":
default:
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_INVALID_MODE: " + mode}
}
sharedWorkspace := strings.TrimSpace(shared.StringArg(routing, "sharedWorkspace", ""))
if sharedWorkspace == "" {
sharedWorkspace = strings.TrimSpace(shared.StringArg(params, "workingDirectory", ""))
}
if sharedWorkspace == "" {
sharedWorkspace = filepath.Join(os.TempDir(), "xworkmate-bridge", "multi-agent", safeMultiAgentPathSegment(shared.StringArg(params, "sessionId", "session")))
}
if err := os.MkdirAll(sharedWorkspace, 0o755); err != nil {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_WORKSPACE_UNAVAILABLE: " + err.Error()}
}
steps := parseMultiAgentSteps(shared.ListArg(routing, "steps"), params, defaultMultiAgentTimeout)
if len(steps) == 0 {
steps = parseMultiAgentSteps(shared.ListArg(params, "steps"), params, defaultMultiAgentTimeout)
}
if mode == "conversation" && len(steps) == 0 {
steps = parseConversationParticipants(shared.ListArg(routing, "participants"), params, defaultMultiAgentTimeout)
}
if len(steps) == 0 {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_STEPS_REQUIRED"}
}
for index := range steps {
steps[index].Index = index
if steps[index].ProviderID == "" {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_PROVIDER_REQUIRED"}
}
if steps[index].Prompt == "" {
return multiAgentPlan{}, &shared.RPCError{Code: -32602, Message: "MULTI_AGENT_PROMPT_REQUIRED"}
}
}
maxTurns := parsePositiveInt(routing["maxTurns"])
if maxTurns == 0 {
maxTurns = parsePositiveInt(params["maxTurns"])
}
if maxTurns == 0 {
maxTurns = len(steps)
}
if mode == "conversation" && maxTurns < len(steps) {
maxTurns = len(steps)
}
stopConditions := parseMultiAgentStringList(routing["stopConditions"])
if len(stopConditions) == 0 {
stopConditions = []string{"STATUS: DONE", "STATUS: CONSENSUS"}
}
return multiAgentPlan{
Mode: mode,
Steps: steps,
MaxTurns: maxTurns,
StopConditions: stopConditions,
SharedWorkspace: sharedWorkspace,
}, nil
}
func parseMultiAgentSteps(raw []any, params map[string]any, fallbackTimeout time.Duration) []multiAgentStep {
steps := make([]multiAgentStep, 0, len(raw))
defaultPrompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))
for _, item := range raw {
stepParams := shared.AsMap(item)
if len(stepParams) == 0 {
continue
}
timeout := time.Duration(parsePositiveInt(stepParams["timeoutMs"])) * time.Millisecond
if timeout <= 0 {
timeout = time.Duration(parsePositiveInt(stepParams["timeoutSeconds"])) * time.Second
}
if timeout <= 0 {
timeout = fallbackTimeout
}
prompt := strings.TrimSpace(firstNonEmptyString(stepParams, "prompt", "taskPrompt"))
if prompt == "" {
prompt = defaultPrompt
}
steps = append(steps, multiAgentStep{
ProviderID: strings.TrimSpace(firstNonEmptyString(stepParams, "providerId", "provider", "agent")),
Prompt: prompt,
OutputAs: strings.TrimSpace(shared.StringArg(stepParams, "outputAs", "")),
Timeout: timeout,
})
}
return steps
}
func parseConversationParticipants(raw []any, params map[string]any, fallbackTimeout time.Duration) []multiAgentStep {
steps := make([]multiAgentStep, 0, len(raw))
prompt := strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))
for _, item := range raw {
providerID := strings.TrimSpace(fmt.Sprint(item))
if mapped := shared.AsMap(item); len(mapped) > 0 {
providerID = strings.TrimSpace(firstNonEmptyString(mapped, "providerId", "provider", "agent"))
}
if providerID == "" || providerID == "<nil>" {
continue
}
steps = append(steps, multiAgentStep{
ProviderID: providerID,
Prompt: prompt,
Timeout: fallbackTimeout,
})
}
return steps
}
func (o *SessionOrchestrator) runMultiAgentPlan(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
switch plan.Mode {
case "parallel":
return o.runMultiAgentParallel(ctx, method, params, plan, sessionID, threadID, turnID, notify)
case "race":
return o.runMultiAgentRace(ctx, method, params, plan, sessionID, threadID, turnID, notify)
case "conversation":
return o.runMultiAgentConversation(ctx, method, params, plan, sessionID, threadID, turnID, notify)
default:
return o.runMultiAgentSequence(ctx, method, params, plan, sessionID, threadID, turnID, notify)
}
}
func (o *SessionOrchestrator) runMultiAgentSequence(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
values := map[string]string{"input": strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))}
results := make([]map[string]any, 0, len(plan.Steps))
var lastOutput string
for _, step := range plan.Steps {
step.Prompt = renderMultiAgentPrompt(step.Prompt, values)
stepResult := o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, false, notify)
results = append(results, stepResult.Map())
if stepResult.Status != "completed" {
return multiAgentResult(plan, results, "", stepResult.Error, false)
}
lastOutput = stepResult.Output
values["previousOutput"] = stepResult.Output
if step.OutputAs != "" {
values[step.OutputAs] = stepResult.Output
}
}
return multiAgentResult(plan, results, lastOutput, "", true)
}
func (o *SessionOrchestrator) runMultiAgentParallel(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
results := make([]multiAgentStepResult, len(plan.Steps))
var wg sync.WaitGroup
for _, step := range plan.Steps {
wg.Add(1)
go func(step multiAgentStep) {
defer wg.Done()
results[step.Index] = o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, false, notify)
}(step)
}
wg.Wait()
return multiAgentAggregateResult(plan, results)
}
func (o *SessionOrchestrator) runMultiAgentRace(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
raceCtx, cancel := context.WithCancel(ctx)
defer cancel()
resultsCh := make(chan multiAgentStepResult, len(plan.Steps))
for _, step := range plan.Steps {
go func(step multiAgentStep) {
resultsCh <- o.runMultiAgentProviderStep(raceCtx, method, params, plan, step, sessionID, threadID, turnID, false, notify)
}(step)
}
results := make([]multiAgentStepResult, 0, len(plan.Steps))
for range plan.Steps {
stepResult := <-resultsCh
results = append(results, stepResult)
if stepResult.Status == "completed" {
cancel()
return multiAgentResult(plan, multiAgentStepResultMaps(results), stepResult.Output, "", true)
}
}
return multiAgentResult(plan, multiAgentStepResultMaps(results), "", "all agents failed", false)
}
func (o *SessionOrchestrator) runMultiAgentConversation(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, sessionID string, threadID string, turnID string, notify func(map[string]any)) map[string]any {
results := make([]map[string]any, 0, plan.MaxTurns)
started := make(map[string]bool)
lastAgent := ""
lastOutput := ""
topic := strings.TrimSpace(shared.StringArg(params, "taskPrompt", ""))
for turn := 0; turn < plan.MaxTurns; turn++ {
step := plan.Steps[turn%len(plan.Steps)]
if lastOutput == "" {
step.Prompt = fmt.Sprintf("Topic:\n%s\n\nShared workspace: %s\n\nRespond as %s.", topic, plan.SharedWorkspace, step.ProviderID)
} else {
step.Prompt = fmt.Sprintf("[%s]: %s\n\nContinue the discussion as %s. Shared workspace: %s", lastAgent, lastOutput, step.ProviderID, plan.SharedWorkspace)
}
useSend := started[step.ProviderID]
stepResult := o.runMultiAgentProviderStep(ctx, method, params, plan, step, sessionID, threadID, turnID, useSend, notify)
started[step.ProviderID] = true
entry := stepResult.Map()
entry["conversationTurn"] = turn + 1
results = append(results, entry)
if stepResult.Status != "completed" {
return multiAgentResult(plan, results, "", stepResult.Error, false)
}
lastAgent = step.ProviderID
lastOutput = stepResult.Output
if multiAgentConversationShouldStop(lastOutput, plan.StopConditions) {
result := multiAgentResult(plan, results, lastOutput, "", true)
result["stopReason"] = "stop_condition"
return result
}
}
result := multiAgentResult(plan, results, lastOutput, "", true)
result["stopReason"] = "max_turns"
return result
}
func (o *SessionOrchestrator) runMultiAgentProviderStep(ctx context.Context, method string, params map[string]any, plan multiAgentPlan, step multiAgentStep, sessionID string, threadID string, turnID string, useSend bool, notify func(map[string]any)) multiAgentStepResult {
startedAt := time.Now()
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_step_started",
"providerId": step.ProviderID,
"stepIndex": step.Index,
"pending": true,
"error": false,
})
compat, ok := o.server.providers[step.ProviderID]
if !ok {
return failedMultiAgentStep(step, startedAt, "provider unavailable")
}
stepCtx := ctx
cancel := func() {}
if step.Timeout > 0 {
stepCtx, cancel = context.WithTimeout(ctx, step.Timeout)
}
defer cancel()
stepSessionID := fmt.Sprintf("%s/%s/%d/%s", sessionID, turnID, step.Index, step.ProviderID)
stepParams := multiAgentStepParams(params, step, plan.SharedWorkspace)
sink := func(update map[string]any) {
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "delta",
"event": "multi_agent_step_update",
"providerId": step.ProviderID,
"stepIndex": step.Index,
"providerUpdate": update,
"pending": true,
"error": false,
})
}
var result map[string]any
var err error
if useSend {
result, err = compat.SendMessage(stepCtx, stepSessionID, threadID, stepParams, sink)
if _, ok := asSessionContinuationUnavailableError(err); ok {
result, err = compat.StartSession(stepCtx, stepSessionID, threadID, stepParams, sink)
}
} else {
result, err = compat.StartSession(stepCtx, stepSessionID, threadID, stepParams, sink)
}
stepResult := multiAgentStepResult{
Index: step.Index,
ProviderID: step.ProviderID,
Status: "completed",
Output: strings.TrimSpace(multiAgentOutputFromResult(result)),
DurationMs: time.Since(startedAt).Milliseconds(),
Result: result,
}
if err != nil {
stepResult.Status = "failed"
stepResult.Error = err.Error()
} else if result != nil {
if value, ok := result["success"]; ok && !parseBool(value) {
stepResult.Status = "failed"
stepResult.Error = firstNonEmptyString(result, "error", "message", "unavailableMessage")
}
}
if stepResult.Status == "completed" && stepResult.Output == "" {
stepResult.Status = "failed"
stepResult.Error = "provider returned no displayable output"
}
if stepResult.Error == "" && stepResult.Status == "failed" {
stepResult.Error = "provider execution failed"
}
o.server.emitSessionUpdate(notify, turnID, map[string]any{
"type": "status",
"event": "multi_agent_step_completed",
"providerId": step.ProviderID,
"stepIndex": step.Index,
"status": stepResult.Status,
"message": stepResult.Output,
"pending": false,
"error": stepResult.Status != "completed",
})
return stepResult
}
func multiAgentStepParams(params map[string]any, step multiAgentStep, sharedWorkspace string) map[string]any {
next := make(map[string]any, len(params)+2)
for key, value := range params {
next[key] = value
}
delete(next, "multiAgent")
delete(next, "mode")
next["taskPrompt"] = step.Prompt
next["workingDirectory"] = sharedWorkspace
next["routing"] = map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "singleAgent",
"explicitProviderId": step.ProviderID,
}
return next
}
func multiAgentOutputFromResult(result map[string]any) string {
if result == nil {
return ""
}
return firstNonEmptyString(result, "output", "summary", "message", "text")
}
func failedMultiAgentStep(step multiAgentStep, startedAt time.Time, message string) multiAgentStepResult {
return multiAgentStepResult{
Index: step.Index,
ProviderID: step.ProviderID,
Status: "failed",
Error: message,
DurationMs: time.Since(startedAt).Milliseconds(),
}
}
func (r multiAgentStepResult) Map() map[string]any {
result := map[string]any{
"index": r.Index,
"providerId": r.ProviderID,
"status": r.Status,
"durationMs": r.DurationMs,
}
if r.Output != "" {
result["output"] = r.Output
}
if r.Error != "" {
result["error"] = r.Error
}
if r.Result != nil {
result["result"] = r.Result
}
return result
}
func multiAgentAggregateResult(plan multiAgentPlan, results []multiAgentStepResult) map[string]any {
mapped := multiAgentStepResultMaps(results)
outputs := make([]string, 0, len(results))
var errorText string
success := true
for _, result := range results {
if result.Status != "completed" {
success = false
if errorText == "" {
errorText = result.ProviderID + ": " + result.Error
}
continue
}
if result.Output != "" {
outputs = append(outputs, result.ProviderID+": "+result.Output)
}
}
return multiAgentResult(plan, mapped, strings.Join(outputs, "\n"), errorText, success)
}
func multiAgentStepResultMaps(results []multiAgentStepResult) []map[string]any {
mapped := make([]map[string]any, 0, len(results))
for _, result := range results {
mapped = append(mapped, result.Map())
}
return mapped
}
func multiAgentResult(plan multiAgentPlan, steps []map[string]any, output string, errorText string, success bool) map[string]any {
status := "completed"
if !success {
status = "failed"
}
if output == "" && errorText != "" {
output = errorText
}
result := map[string]any{
"success": success,
"status": status,
"mode": multiAgentTargetID,
"orchestrationMode": plan.Mode,
"resolvedExecutionTarget": multiAgentTargetID,
"resolvedProviderId": "",
"resolvedGatewayProviderId": "",
"steps": steps,
"sharedWorkspace": plan.SharedWorkspace,
}
if output != "" {
result["output"] = output
result["summary"] = output
result["message"] = output
}
if errorText != "" {
result["error"] = errorText
}
return result
}
func renderMultiAgentPrompt(prompt string, values map[string]string) string {
rendered := prompt
for key, value := range values {
rendered = strings.ReplaceAll(rendered, "{{"+key+"}}", value)
}
return rendered
}
func multiAgentConversationShouldStop(output string, stopConditions []string) bool {
normalizedOutput := strings.ToUpper(output)
for _, condition := range stopConditions {
condition = strings.TrimSpace(condition)
if condition == "" {
continue
}
if strings.Contains(normalizedOutput, strings.ToUpper(condition)) {
return true
}
}
return false
}
func parseMultiAgentStringList(raw any) []string {
switch values := raw.(type) {
case []string:
return append([]string(nil), values...)
case []any:
result := make([]string, 0, len(values))
for _, value := range values {
text := strings.TrimSpace(fmt.Sprint(value))
if text != "" && text != "<nil>" {
result = append(result, text)
}
}
return result
default:
return nil
}
}
func withMultiAgentWorkingDirectory(params map[string]any, workingDirectory string) map[string]any {
next := make(map[string]any, len(params)+1)
for key, value := range params {
next[key] = value
}
if workingDirectory != "" {
next["workingDirectory"] = workingDirectory
}
return next
}
func safeMultiAgentPathSegment(value string) string {
value = strings.TrimSpace(value)
if value == "" {
return "default"
}
var builder strings.Builder
for _, r := range value {
switch {
case r >= 'a' && r <= 'z':
builder.WriteRune(r)
case r >= 'A' && r <= 'Z':
builder.WriteRune(r)
case r >= '0' && r <= '9':
builder.WriteRune(r)
case r == '.', r == '_', r == '-':
builder.WriteRune(r)
default:
builder.WriteByte('-')
}
}
if builder.Len() == 0 {
return "default"
}
return builder.String()
}

View File

@ -483,6 +483,7 @@ func (o *SessionOrchestrator) completeOpenClawTask(
record.RunID,
record.ArtifactSinceUnixMs,
record.PreparedArtifact,
output,
notify,
)
mergeOpenClawArtifactPayload(result, artifactPayload)

View File

@ -42,10 +42,6 @@ func NewSessionOrchestrator(server *Server) *SessionOrchestrator {
}
func (o *SessionOrchestrator) Process(ctx context.Context, method string, params map[string]any, notify func(map[string]any)) (map[string]any, *shared.RPCError) {
if isMultiAgentSessionRequest(params) {
return o.ProcessMultiAgent(ctx, method, params, notify)
}
res, err := o.server.routingEngine.Resolve(ctx, params)
if err != nil {
if err.Error() == "ROUTING_REQUIRED" {
@ -1305,6 +1301,7 @@ func (o *SessionOrchestrator) openClawArtifactExport(
runID string,
sinceUnixMs int64,
preparedArtifact *openClawPreparedArtifactScope,
outputText string,
notify func(map[string]any),
) map[string]any {
sessionKey := strings.TrimSpace(shared.StringArg(chatParams, "sessionKey", ""))
@ -1322,6 +1319,36 @@ func (o *SessionOrchestrator) openClawArtifactExport(
if preparedArtifact != nil && strings.TrimSpace(preparedArtifact.ArtifactScope) != "" {
exportParams["artifactScope"] = strings.TrimSpace(preparedArtifact.ArtifactScope)
}
payload := o.openClawArtifactExportRequest(gatewayProvider, exportParams, notify)
if openClawArtifactPayloadCount(payload) > 0 {
return payload
}
fallbackScope := openClawArtifactScopeFromOutput(outputText, runID)
if fallbackScope != "" && fallbackScope != strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")) {
fallbackParams := cloneMap(exportParams)
applyOpenClawArtifactScopeFallbackParams(fallbackParams, fallbackScope)
fallbackPayload := o.openClawArtifactExportRequest(gatewayProvider, fallbackParams, notify)
if openClawArtifactPayloadCount(fallbackPayload) > 0 {
return fallbackPayload
}
}
for _, fallbackScope := range openClawArtifactScopeVariants(strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", ""))) {
fallbackParams := cloneMap(exportParams)
applyOpenClawArtifactScopeFallbackParams(fallbackParams, fallbackScope)
fallbackPayload := o.openClawArtifactExportRequest(gatewayProvider, fallbackParams, notify)
if openClawArtifactPayloadCount(fallbackPayload) > 0 {
return fallbackPayload
}
}
return payload
}
func (o *SessionOrchestrator) openClawArtifactExportRequest(
gatewayProvider string,
exportParams map[string]any,
notify func(map[string]any),
) map[string]any {
exportResult := o.openClawGatewayRequestWithRetry(
gatewayProvider,
"xworkmate.artifacts.export",
@ -1341,6 +1368,80 @@ func (o *SessionOrchestrator) openClawArtifactExport(
}
}
func openClawArtifactScopeFromOutput(outputText string, runID string) string {
runID = strings.TrimSpace(runID)
if strings.TrimSpace(outputText) == "" || runID == "" {
return ""
}
for _, token := range strings.Fields(outputText) {
scope := openClawArtifactScopeFromOutputToken(token, runID)
if scope != "" {
return scope
}
}
return ""
}
func openClawArtifactScopeFromOutputToken(token string, runID string) string {
token = strings.Trim(token, " \t\r\n`'\".,;:()[]{}<>")
token = strings.ReplaceAll(token, "\\", "/")
index := strings.Index(token, "/tasks/")
if index < 0 {
return ""
}
segments := strings.Split(token[index+1:], "/")
if len(segments) < 4 || segments[0] != "tasks" {
return ""
}
sessionSegment := strings.TrimSpace(segments[1])
runSegment := strings.TrimSpace(segments[2])
relativeFile := safeOpenClawArtifactDownloadRelativePath(strings.Join(segments[3:], "/"))
if sessionSegment == "" || runSegment != runID || relativeFile == "" {
return ""
}
return "tasks/" + sessionSegment + "/" + runSegment
}
func openClawArtifactScopeVariants(scope string) []string {
scope = strings.TrimSpace(scope)
parts := strings.Split(scope, "/")
if len(parts) != 3 || parts[0] != "tasks" {
return nil
}
sessionSegment := strings.TrimSpace(parts[1])
runSegment := strings.TrimSpace(parts[2])
if sessionSegment == "" || runSegment == "" {
return nil
}
var variants []string
if strings.HasPrefix(sessionSegment, "draft-") {
variants = append(variants, "tasks/"+strings.Replace(sessionSegment, "draft-", "draft_", 1)+"/"+runSegment)
}
if strings.HasPrefix(sessionSegment, "draft_") {
variants = append(variants, "tasks/"+strings.Replace(sessionSegment, "draft_", "draft-", 1)+"/"+runSegment)
}
return variants
}
func applyOpenClawArtifactScopeFallbackParams(params map[string]any, scope string) {
if params == nil {
return
}
scope = strings.TrimSpace(scope)
params["artifactScope"] = scope
if sessionKey := openClawSessionKeyFromArtifactScope(scope); sessionKey != "" {
params["sessionKey"] = sessionKey
}
}
func openClawSessionKeyFromArtifactScope(scope string) string {
parts := strings.Split(strings.TrimSpace(scope), "/")
if len(parts) != 3 || parts[0] != "tasks" {
return ""
}
return strings.TrimSpace(parts[1])
}
func guardOpenClawNoDisplayableResult(result map[string]any, noDisplayableOutput bool) {
if !noDisplayableOutput || result == nil || !parseBool(result["success"]) {
return
@ -1922,6 +2023,7 @@ func (o *SessionOrchestrator) completeOpenClawScopedArtifactExport(
runID,
0,
preparedArtifact,
firstNonEmptyString(result, "output", "message", "summary", "assistantText", "text"),
nil,
))
o.server.decorateOpenClawArtifactDownloadURLs(result, sessionKey, runID)
@ -2011,9 +2113,6 @@ func openClawArtifactHasBridgeDownloadRef(artifact map[string]any) bool {
}
func taskKindFromParams(params map[string]any, routing RoutingResult) TaskKind {
if parseBool(params["multiAgent"]) {
return TaskKindMultiAgent
}
if routing.TargetID == "gateway" {
return TaskKindGateway
}

View File

@ -719,6 +719,48 @@ func TestGatewayRequestForwardsOpenClawSkillsStatus(t *testing.T) {
}
}
func TestGatewayRequestSkillsStatusAutoConnectsOpenClaw(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
t.Setenv("BRIDGE_CONFIG_PATH", filepath.Join(t.TempDir(), "missing-config.yaml"))
t.Setenv("XWORKMATE_BRIDGE_OPENCLAW_IDENTITY_PATH", filepath.Join(t.TempDir(), "openclaw-device.json"))
resetBridgeGatewayIdentityForTest()
t.Cleanup(resetBridgeGatewayIdentityForTest)
server := NewServer()
response, rpcErr := server.handleRequest(shared.RPCRequest{
Method: "xworkmate.gateway.request",
Params: map[string]any{
"runtimeId": "app-runtime",
"method": "skills.status",
"params": map[string]any{
"agentId": "main",
},
"timeoutMs": float64(15000),
},
}, nil)
if rpcErr != nil {
t.Fatalf("expected skills.status gateway response, got rpc error: %#v", rpcErr)
}
if response["ok"] != true {
t.Fatalf("expected skills.status ok, got %#v", response)
}
payload := shared.AsMap(response["payload"])
skills := shared.ListArg(payload, "skills")
if len(skills) != 2 {
t.Fatalf("expected two OpenClaw skill records, got %#v", payload["skills"])
}
if got := gateway.ConnectCount(); got != 1 {
t.Fatalf("expected skills.status to auto-connect OpenClaw once, got %d", got)
}
if !slices.Contains(gateway.Methods(), "skills.status") {
t.Fatalf("expected fake OpenClaw gateway to receive skills.status, got %#v", gateway.Methods())
}
}
func TestExecuteSessionTaskGatewayNoDisplayableOutputFails(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
@ -1898,6 +1940,129 @@ func TestExecuteSessionTaskGatewayExportsWithActualOpenClawRunID(t *testing.T) {
}
}
func TestExecuteSessionTaskGatewayExportsArtifactScopeDeclaredInOutput(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.artifactWorkspaceRoot = t.TempDir()
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
const sessionKey = "draft-1780110089528457-2"
const actualRunID = "turn-1780353431540965793"
actualScope := "tasks/draft_1780110089528457-2/" + actualRunID
actualFile := filepath.Join(
gateway.artifactWorkspaceRoot,
filepath.FromSlash(actualScope),
"AI_Agent_News_June_2_2026.md",
)
if err := os.MkdirAll(filepath.Dir(actualFile), 0o755); err != nil {
t.Fatalf("create actual artifact dir: %v", err)
}
if err := os.WriteFile(actualFile, []byte("news artifact"), 0o644); err != nil {
t.Fatalf("write actual artifact: %v", err)
}
gateway.alternateRunID = actualRunID
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": sessionKey,
"threadId": sessionKey,
"taskPrompt": "declare output artifact path",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
if got := response["success"]; got != true {
t.Fatalf("expected output-declared artifact to satisfy export, got %#v", response)
}
if got := gateway.ArtifactExportCount(); got != 2 {
t.Fatalf("expected prepared scope export then output scope fallback export, got %d", got)
}
artifacts := responseArtifactMaps(t, response)
if len(artifacts) != 1 {
t.Fatalf("expected one output-declared artifact, got %#v", artifacts)
}
if got := artifacts[0]["relativePath"]; got != "AI_Agent_News_June_2_2026.md" {
t.Fatalf("expected artifact relative path from fallback scope, got %#v", artifacts[0])
}
downloadURL := strings.TrimSpace(shared.StringArg(artifacts[0], "downloadUrl", ""))
parsedDownloadURL, err := url.Parse(downloadURL)
if err != nil {
t.Fatalf("parse downloadUrl: %v", err)
}
if got := parsedDownloadURL.Query().Get("artifactScope"); got != actualScope {
t.Fatalf("expected fallback artifact scope in downloadUrl, got %q", got)
}
}
func TestExecuteSessionTaskGatewayExportsDraftScopeVariant(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.artifactWorkspaceRoot = t.TempDir()
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
const sessionKey = "draft-1780110089528457-2"
const actualRunID = "turn-1780353431540965793"
actualScope := "tasks/draft_1780110089528457-2/" + actualRunID
actualFile := filepath.Join(
gateway.artifactWorkspaceRoot,
filepath.FromSlash(actualScope),
"AI_Agent_News_June_2_2026.md",
)
if err := os.MkdirAll(filepath.Dir(actualFile), 0o755); err != nil {
t.Fatalf("create actual artifact dir: %v", err)
}
if err := os.WriteFile(actualFile, []byte("news artifact"), 0o644); err != nil {
t.Fatalf("write actual artifact: %v", err)
}
gateway.alternateRunID = actualRunID
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": sessionKey,
"threadId": sessionKey,
"taskPrompt": "plain done",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
if got := response["success"]; got != true {
t.Fatalf("expected draft scope variant artifact to satisfy export, got %#v", response)
}
if got := gateway.ArtifactExportCount(); got != 2 {
t.Fatalf("expected prepared scope export then draft variant export, got %d", got)
}
artifacts := responseArtifactMaps(t, response)
if len(artifacts) != 1 || artifacts[0]["relativePath"] != "AI_Agent_News_June_2_2026.md" {
t.Fatalf("expected artifact from draft scope variant, got %#v", artifacts)
}
}
func TestExecuteSessionMessageGatewayDoesNotRewriteClaimedArtifactsWithoutGatewayFiles(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
@ -3020,6 +3185,9 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
if strings.Contains(fake.runMessage(runID), "hallucinate-files") {
message = "文件已就绪,点击直接下载👇 三个格式一键收取:"
}
if strings.Contains(fake.runMessage(runID), "declare output artifact path") {
message = "I've saved the requested file at `/remote/openclaw/workspace/tasks/draft_1780110089528457-2/" + runID + "/AI_Agent_News_June_2_2026.md`"
}
if strings.Contains(fake.runMessage(runID), "agent failed before reply") {
message = "Agent failed before reply: No available auth profile for nvidia"
}
@ -3111,10 +3279,11 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
params := shared.AsMap(frame["params"])
fake.lastArtifactExportParams.Store(params)
runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run"))
sessionKey := strings.TrimSpace(shared.StringArg(params, "sessionKey", ""))
artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", ""))
payload := map[string]any{
"runId": runID,
"sessionKey": strings.TrimSpace(shared.StringArg(params, "sessionKey", "")),
"sessionKey": sessionKey,
"remoteWorkingDirectory": "/remote/openclaw/workspace",
"remoteWorkspaceRefKind": "remotePath",
"scopeKind": "workspace",
@ -3128,7 +3297,9 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
filesystemArtifacts := []any{}
if strings.TrimSpace(fake.artifactWorkspaceRoot) != "" && artifactScope != "" {
payload["remoteWorkingDirectory"] = strings.TrimSpace(fake.artifactWorkspaceRoot)
filesystemArtifacts = fake.exportFilesystemArtifacts(artifactScope)
if sessionKey == "" || openClawSessionKeyFromArtifactScope(artifactScope) == sessionKey {
filesystemArtifacts = fake.exportFilesystemArtifacts(artifactScope)
}
}
if strings.Contains(fake.runMessage(runID), "make artifact") {
payload["artifacts"] = []any{

View File

@ -23,7 +23,6 @@ type TaskKind string
const (
TaskKindSingleAgent TaskKind = "single-agent"
TaskKindGateway TaskKind = "gateway"
TaskKindMultiAgent TaskKind = "multi-agent"
)
type ControlPlaneSession struct {

View File

@ -93,10 +93,6 @@ func NotificationEnvelope(method string, params map[string]any) map[string]any {
}
}
func ErrorResponse(id any, code int, message string) map[string]any {
return ErrorEnvelope(id, code, message)
}
func ToolTextResult(id any, content string) map[string]any {
result := map[string]any{
"content": []map[string]any{