xworkmate-bridge/internal/acp/routing_test.go

1977 lines
65 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package acp
import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/gorilla/websocket"
"xworkmate-bridge/internal/shared"
)
func handleRoutingResolve(params map[string]any) map[string]any {
server := NewServer()
res, _ := server.routingEngine.Resolve(context.Background(), params)
// Convert to map for tests
m := map[string]any{
"resolvedExecutionTarget": res.TargetID,
"resolvedProviderId": res.ProviderID,
"resolvedGatewayProviderId": res.GatewayProviderID,
"resolvedModel": res.Model,
"resolvedSkills": res.Skills,
"status": res.Status,
"unavailable": res.Status == "unavailable",
"unavailableCode": res.UnavailableCode,
"unavailableMessage": res.UnavailableMsg,
"skillResolutionSource": res.SkillResolutionSource,
"needsSkillInstall": res.NeedsSkillInstall,
"skillInstallRequestId": res.SkillInstallRequestID,
}
return m
}
type task struct {
req shared.RPCRequest
notify func(map[string]any)
}
func (s *Server) executeSessionTask(t task) (map[string]any, *shared.RPCError) {
if t.req.Method == "" {
t.req.Method = "session.start"
}
return s.handleRequest(t.req, t.notify)
}
func newExternalSingleAgentProvider(
t *testing.T,
providerID string,
output string,
) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/acp/rpc" {
http.NotFound(w, r)
return
}
defer func() {
_ = r.Body.Close()
}()
var request map[string]any
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
t.Fatalf("decode request: %v", err)
}
method := strings.TrimSpace(shared.StringArg(request, "method", ""))
result := map[string]any{
"success": true,
"output": output,
"turnId": "turn-" + providerID,
"provider": providerID,
"mode": "single-agent",
}
switch method {
case "thread/start", "thread/resume":
result = map[string]any{"id": "provider-thread-" + providerID}
case "turn/start":
result["summary"] = output
}
_ = json.NewEncoder(w).Encode(map[string]any{
"jsonrpc": "2.0",
"id": request["id"],
"result": result,
})
}))
}
func TestHandleRoutingResolveCoversNineScenarioBuckets(t *testing.T) {
localAvailableSkills := []map[string]any{
{"id": "pptx", "label": "PPTX", "description": "slides", "installed": true},
{"id": "docx", "label": "DOCX", "description": "docs", "installed": true},
{"id": "xlsx", "label": "XLSX", "description": "sheets", "installed": true},
{"id": "pdf", "label": "PDF", "description": "pdf", "installed": true},
{"id": "image-resizer", "label": "image-resizer", "description": "image resize", "installed": true},
{"id": "browser-automation", "label": "Browser Automation", "description": "browser", "installed": true},
}
cases := []struct {
name string
prompt string
expectedExecutionTarget string
expectedGatewayProviderID string
expectedSkillSource string
expectedResolvedSkill string
expectedNeedsSkillInstall bool
}{
{
name: "powerpoint-pptx",
prompt: "create a powerpoint deck for this launch",
expectedExecutionTarget: "single-agent",
expectedSkillSource: "local_match",
expectedResolvedSkill: "PPTX",
},
{
name: "word-docx",
prompt: "draft a word document memo",
expectedExecutionTarget: "single-agent",
expectedSkillSource: "local_match",
expectedResolvedSkill: "DOCX",
},
{
name: "excel-xlsx",
prompt: "build an excel workbook with formulas",
expectedExecutionTarget: "single-agent",
expectedSkillSource: "local_match",
expectedResolvedSkill: "XLSX",
},
{
name: "pdf",
prompt: "merge and fill this pdf form",
expectedExecutionTarget: "single-agent",
expectedSkillSource: "local_match",
expectedResolvedSkill: "PDF",
},
{
name: "image-resizer",
prompt: "batch resize image assets",
expectedExecutionTarget: "single-agent",
expectedSkillSource: "local_match",
expectedResolvedSkill: "image-resizer",
},
{
name: "image-cog",
prompt: "use image-cog to generate consistent characters",
expectedExecutionTarget: "gateway",
expectedGatewayProviderID: "openclaw",
expectedSkillSource: "find_skills",
expectedNeedsSkillInstall: true,
},
{
name: "image-video-generation-editting",
prompt: "wan 图生视频并做视频编辑",
expectedExecutionTarget: "gateway",
expectedGatewayProviderID: "openclaw",
expectedSkillSource: "find_skills",
expectedNeedsSkillInstall: true,
},
{
name: "video-translator",
prompt: "translate video subtitles and dub the clip",
expectedExecutionTarget: "gateway",
expectedGatewayProviderID: "openclaw",
expectedSkillSource: "find_skills",
expectedNeedsSkillInstall: true,
},
{
name: "browser-search-news",
prompt: "跨浏览器执行并搜索最新资讯采集结果",
expectedExecutionTarget: "gateway",
expectedGatewayProviderID: "openclaw",
expectedSkillSource: "local_match",
expectedResolvedSkill: "Browser Automation",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result := handleRoutingResolve(map[string]any{
"taskPrompt": tc.prompt,
"workingDirectory": "/tmp/workspace",
"routing": map[string]any{
"routingMode": "auto",
"preferredGatewayProviderId": "openclaw",
"allowSkillInstall": false,
"availableSkills": func() []any {
values := make([]any, 0, len(localAvailableSkills))
for _, item := range localAvailableSkills {
values = append(values, item)
}
return values
}(),
},
})
if got := result["resolvedExecutionTarget"]; got != tc.expectedExecutionTarget {
t.Fatalf("expected execution target %q, got %#v", tc.expectedExecutionTarget, got)
}
if tc.expectedGatewayProviderID != "" {
if got := result["resolvedGatewayProviderId"]; got != tc.expectedGatewayProviderID {
t.Fatalf("expected gateway provider %q, got %#v", tc.expectedGatewayProviderID, got)
}
}
if _, exists := result["resolvedEndpointTarget"]; exists {
t.Fatalf("expected resolvedEndpointTarget compatibility field to be removed, got %#v", result)
}
if got := result["skillResolutionSource"]; got != tc.expectedSkillSource {
t.Fatalf("expected skill source %q, got %#v", tc.expectedSkillSource, got)
}
if tc.expectedResolvedSkill != "" {
resolvedSkills, _ := result["resolvedSkills"].([]string)
if len(resolvedSkills) == 0 || resolvedSkills[0] != tc.expectedResolvedSkill {
t.Fatalf("expected resolved skill %q, got %#v", tc.expectedResolvedSkill, result["resolvedSkills"])
}
}
if got := result["needsSkillInstall"]; got != tc.expectedNeedsSkillInstall {
t.Fatalf("expected needsSkillInstall=%v, got %#v", tc.expectedNeedsSkillInstall, got)
}
})
}
}
func TestHandleRoutingResolveAcceptsTopLevelGatewayContract(t *testing.T) {
server := NewServer()
server.mu.Lock()
server.providerOrder = []string{"codex"}
server.providers = map[string]ProviderCompat{
"codex": newProviderCompat(syncedProvider{
ProviderID: "codex",
Label: "Codex",
Endpoint: "ws://127.0.0.1:9001/acp",
Enabled: true,
}),
}
server.mu.Unlock()
res, err := server.routingEngine.Resolve(context.Background(), map[string]any{
"taskPrompt": "openclaw gateway task",
"executionTarget": "gateway",
"gatewayProviderId": "openclaw",
})
if err != nil {
t.Fatalf("resolve routing: %v", err)
}
if got := res.TargetID; got != "gateway" {
t.Fatalf("expected gateway execution target, got %#v", got)
}
if got := res.GatewayProviderID; got != "openclaw" {
t.Fatalf("expected openclaw gateway provider, got %#v", got)
}
if got := res.ProviderID; got != "" {
t.Fatalf("expected no single-agent provider for gateway, got %#v", got)
}
}
func TestExecuteSessionTaskAutoRoutingRecordsProjectMemory(t *testing.T) {
homeDir := t.TempDir()
workspaceDir := filepath.Join(t.TempDir(), "workspace")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {
t.Fatalf("create workspace: %v", err)
}
t.Setenv("HOME", homeDir)
server := NewServer()
providerServer := newExternalSingleAgentProvider(t, "codex", "done")
defer providerServer.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "codex",
Label: "Codex",
Endpoint: providerServer.URL,
Enabled: true,
})
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Params: map[string]any{
"sessionId": "session-auto",
"threadId": "thread-auto",
"provider": "codex",
"taskPrompt": "create a powerpoint deck for launch",
"workingDirectory": workspaceDir,
"routing": map[string]any{
"routingMode": "auto",
"availableSkills": []any{
map[string]any{
"id": "pptx",
"label": "PPTX",
"description": "slides",
"installed": true,
},
},
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected success, got rpc error: %v", rpcErr)
}
if success, _ := response["success"].(bool); !success {
t.Fatalf("expected success response, got %#v", response)
}
projectLocalMemory := filepath.Join(workspaceDir, ".xworkmate", "memory.md")
content, err := os.ReadFile(projectLocalMemory)
if err != nil {
t.Fatalf("expected memory file %s: %v", projectLocalMemory, err)
}
text := string(content)
if !strings.Contains(text, "preferred-route: single-agent") {
t.Fatalf("expected preferred route in %s, got %q", projectLocalMemory, text)
}
if !strings.Contains(text, "preferred-skills: PPTX") {
t.Fatalf("expected preferred skills in %s, got %q", projectLocalMemory, text)
}
projectHomeMemory := filepath.Join(
homeDir,
"self-improving",
"projects",
filepath.Base(workspaceDir)+".md",
)
if _, err := os.Stat(projectHomeMemory); !os.IsNotExist(err) {
t.Fatalf("expected auto memory write to stay project-local only, got stat err=%v", err)
}
}
func TestExecuteSessionTaskExplicitRoutingDoesNotRecordProjectMemory(t *testing.T) {
homeDir := t.TempDir()
workspaceDir := filepath.Join(t.TempDir(), "workspace")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {
t.Fatalf("create workspace: %v", err)
}
t.Setenv("HOME", homeDir)
server := NewServer()
providerServer := newExternalSingleAgentProvider(t, "codex", "done")
defer providerServer.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "codex",
Label: "Codex",
Endpoint: providerServer.URL,
Enabled: true,
})
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Params: map[string]any{
"sessionId": "session-explicit",
"threadId": "thread-explicit",
"provider": "codex",
"taskPrompt": "create a powerpoint deck for launch",
"workingDirectory": workspaceDir,
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "singleAgent",
"explicitProviderId": "codex",
"availableSkills": []any{
map[string]any{
"id": "pptx",
"label": "PPTX",
"description": "slides",
"installed": true,
},
},
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected success, got rpc error: %v", rpcErr)
}
if success, _ := response["success"].(bool); !success {
t.Fatalf("expected success response, got %#v", response)
}
projectHomeMemory := filepath.Join(
homeDir,
"self-improving",
"projects",
filepath.Base(workspaceDir)+".md",
)
projectLocalMemory := filepath.Join(workspaceDir, ".xworkmate", "memory.md")
for _, target := range []string{projectHomeMemory, projectLocalMemory} {
if _, err := os.Stat(target); !os.IsNotExist(err) {
t.Fatalf("expected no memory write for explicit routing at %s, err=%v", target, err)
}
}
}
func TestExecuteSessionTaskExplicitProviderRequiresAdvertisedBridgeProvider(t *testing.T) {
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-explicit-provider",
"threadId": "thread-explicit-provider",
"taskPrompt": "create a powerpoint deck for launch",
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "singleAgent",
"explicitProviderId": "claude",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected structured response, got rpc error: %v", rpcErr)
}
if success, _ := response["success"].(bool); success {
t.Fatalf("expected unavailable response, got %#v", response)
}
if got := response["unavailableCode"]; got != "PROVIDER_UNAVAILABLE" {
t.Fatalf("expected PROVIDER_UNAVAILABLE, got %#v", response)
}
if got := response["unavailableMessage"]; got != "explicit provider is unavailable" {
t.Fatalf("expected explicit provider unavailable message, got %#v", response)
}
}
func TestExecuteSessionTaskExplicitGatewayUsesResolvedGatewayProvider(t *testing.T) {
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-explicit-gateway",
"threadId": "thread-explicit-gateway",
"taskPrompt": "search latest news",
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"explicitProviderId": "claude",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr == nil {
t.Fatalf("expected gateway connectivity rpc error, got response: %v", response)
}
if rpcErr.Message == "GATEWAY_PROVIDER_REQUIRED" {
t.Fatalf("expected resolved gateway provider to be reused, got %q", rpcErr.Message)
}
}
func TestExecuteSessionTaskGatewayAutoConnectsLocalOpenClaw(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw",
"threadId": "thread-openclaw",
"taskPrompt": "say pong",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
if got := response["output"]; got != "gateway pong" {
t.Fatalf("expected gateway pong output, got %#v", response)
}
if got := response["resolvedGatewayProviderId"]; got != "openclaw" {
t.Fatalf("expected openclaw gateway provider, got %#v", response)
}
if gateway.ConnectCount() != 1 {
t.Fatalf("expected one automatic gateway connect, got %d", gateway.ConnectCount())
}
if gateway.ChatSendCount() != 1 {
t.Fatalf("expected one OpenClaw chat.send request, got %d", gateway.ChatSendCount())
}
if gateway.AgentWaitCount() != 1 {
t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount())
}
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got)
}
client := gateway.LastConnectClient()
if got := client["id"]; got != "openclaw-macos" {
t.Fatalf("expected OpenClaw-compatible client id, got %#v", client)
}
if got := strings.TrimSpace(shared.StringArg(client, "modelIdentifier", "")); got == "" {
t.Fatalf("expected non-empty modelIdentifier, got %#v", client)
}
}
func TestExecuteSessionMessageGatewayUsesOpenClawChatSend(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.message",
Params: map[string]any{
"sessionId": "session-openclaw",
"threadId": "thread-openclaw",
"taskPrompt": "continue",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
if got := response["output"]; got != "gateway pong" {
t.Fatalf("expected gateway pong output, got %#v", response)
}
if gateway.ChatSendCount() != 1 {
t.Fatalf("expected one OpenClaw chat.send request, got %d", gateway.ChatSendCount())
}
if gateway.AgentWaitCount() != 1 {
t.Fatalf("expected one OpenClaw agent.wait request, got %d", gateway.AgentWaitCount())
}
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, chat.send, agent.wait, then artifact export, got %#v", got)
}
}
func TestExecuteSessionTaskGatewaySurfacesOpenClawChatSendError(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-fail",
"threadId": "thread-openclaw-fail",
"taskPrompt": "fail",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr == nil {
t.Fatalf("expected OpenClaw chat.send error, got response: %#v", response)
}
if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "openclaw chat failed") {
t.Fatalf("expected surfaced chat.send failure, got %#v", rpcErr)
}
if got := gateway.Methods(); len(got) != 2 || got[0] != "connect" || got[1] != "chat.send" {
t.Fatalf("expected connect then chat.send, got %#v", got)
}
}
func TestExecuteSessionTaskGatewaySurfacesOpenClawAgentWaitError(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-wait-fail",
"threadId": "thread-openclaw-wait-fail",
"taskPrompt": "wait-error",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr == nil {
t.Fatalf("expected OpenClaw agent.wait error, got response: %#v", response)
}
if rpcErr.Code != -32002 || !strings.Contains(rpcErr.Message, "openclaw wait failed") {
t.Fatalf("expected surfaced agent.wait failure, got %#v", rpcErr)
}
if got := gateway.Methods(); len(got) != 3 || got[0] != "connect" || got[1] != "chat.send" || got[2] != "agent.wait" {
t.Fatalf("expected connect, chat.send, then agent.wait, got %#v", got)
}
}
func TestExecuteSessionTaskGatewayExportsOpenClawArtifacts(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-artifact",
"threadId": "thread-openclaw-artifact",
"taskPrompt": "make artifact",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
if got := response["remoteWorkingDirectory"]; got != "/remote/openclaw/workspace" {
t.Fatalf("expected remote working directory from manifest, got %#v", response)
}
artifacts, ok := response["artifacts"].([]map[string]any)
if !ok {
raw, ok := response["artifacts"].([]any)
if !ok {
t.Fatalf("expected artifacts payload, got %#v", response["artifacts"])
}
artifacts = make([]map[string]any, 0, len(raw))
for _, item := range raw {
artifacts = append(artifacts, shared.AsMap(item))
}
}
if len(artifacts) != 1 {
t.Fatalf("expected one artifact from manifest, got %#v", artifacts)
}
if got := artifacts[0]["relativePath"]; got != "reports/final.md" {
t.Fatalf("expected manifest artifact relative path, got %#v", artifacts[0])
}
if got := artifacts[0]["encoding"]; got != "base64" {
t.Fatalf("expected inline base64 artifact, got %#v", artifacts[0])
}
downloadURL := strings.TrimSpace(shared.StringArg(artifacts[0], "downloadUrl", ""))
if downloadURL == "" {
t.Fatalf("expected bridge downloadUrl on artifact, got %#v", artifacts[0])
}
parsedDownloadURL, err := url.Parse(downloadURL)
if err != nil {
t.Fatalf("parse downloadUrl: %v", err)
}
if got := parsedDownloadURL.Path; got != openClawArtifactDownloadPath {
t.Fatalf("expected bridge artifact download path, got %q from %q", got, downloadURL)
}
if got := parsedDownloadURL.Query().Get("sessionKey"); got != "thread-openclaw-artifact" {
t.Fatalf("expected thread sessionKey in downloadUrl, got %q", got)
}
if got := parsedDownloadURL.Query().Get("relativePath"); got != "reports/final.md" {
t.Fatalf("expected artifact relativePath in downloadUrl, got %q", got)
}
artifactScope := parsedDownloadURL.Query().Get("artifactScope")
if !strings.HasPrefix(artifactScope, ".xworkmate/artifacts/tasks/") {
t.Fatalf("expected artifact scope in downloadUrl, got %q", artifactScope)
}
if parsedDownloadURL.Query().Get("sig") == "" {
t.Fatalf("expected signed downloadUrl, got %q", downloadURL)
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got)
}
}
func TestExecuteSessionTaskGatewayExportsLatestWorkspaceArtifactsWhenScopedDirectoryEmpty(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.artifactMode = "workspace-latest"
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-latest-artifact",
"threadId": "thread-openclaw-latest-artifact",
"taskPrompt": "检查 workspace 已有真实制品,输出 artifacts files download。不要生成新文件只简短说明。",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected latest workspace artifact response, got rpc error: %#v", rpcErr)
}
if got := response["success"]; got != true {
t.Fatalf("expected successful artifact response, got %#v", response)
}
if got := response["status"]; got == "artifact_missing" {
t.Fatalf("expected latest workspace artifact fallback, got %#v", response)
}
artifacts, ok := response["artifacts"].([]map[string]any)
if !ok {
raw, ok := response["artifacts"].([]any)
if !ok {
t.Fatalf("expected artifacts payload, got %#v", response["artifacts"])
}
artifacts = make([]map[string]any, 0, len(raw))
for _, item := range raw {
artifacts = append(artifacts, shared.AsMap(item))
}
}
if len(artifacts) != 1 {
t.Fatalf("expected one latest workspace artifact, got %#v", artifacts)
}
if got := artifacts[0]["relativePath"]; got != "existing/report.pdf" {
t.Fatalf("expected latest workspace artifact relative path, got %#v", artifacts[0])
}
if got := artifacts[0]["scopeKind"]; got != "workspace-latest" {
t.Fatalf("expected workspace-latest artifact scope kind, got %#v", artifacts[0])
}
if got := strings.TrimSpace(shared.StringArg(artifacts[0], "downloadUrl", "")); got == "" {
t.Fatalf("expected bridge downloadUrl on latest workspace artifact, got %#v", artifacts[0])
}
exportParams := gateway.LastArtifactExportParams()
if got := strings.TrimSpace(shared.StringArg(exportParams, "artifactScope", "")); !strings.HasPrefix(got, ".xworkmate/artifacts/tasks/thread-openclaw-latest-artifact/") {
t.Fatalf("expected scoped artifact export params, got %#v", exportParams)
}
if got := shared.BoolArg(shared.StringArg(exportParams, "latestIfEmpty", ""), false); !got {
t.Fatalf("expected latestIfEmpty export param, got %#v", exportParams)
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.prepare", "chat.send", "agent.wait", "xworkmate.artifacts.export"}) {
t.Fatalf("expected connect, artifact prepare, chat.send, agent.wait, then artifact export, got %#v", got)
}
}
func TestHTTPHandlerOpenClawArtifactDownloadReadsViaGateway(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
downloadURL := server.openClawArtifactDownloadURL(
"thread-openclaw-artifact",
"run-1",
".xworkmate/artifacts/tasks/thread-openclaw-artifact/run-1",
"reports/final.md",
time.Now(),
)
if downloadURL == "" {
t.Fatal("expected signed download URL")
}
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, downloadURL, nil)
request.Header.Set("Authorization", "Bearer bridge-token")
server.Handler().ServeHTTP(recorder, request)
if recorder.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%q", recorder.Code, recorder.Body.String())
}
if got := recorder.Body.String(); got != "final report" {
t.Fatalf("expected artifact content from OpenClaw read, got %q", got)
}
if got := recorder.Header().Get("Content-Type"); got != "text/markdown" {
t.Fatalf("expected content type from artifact metadata, got %q", got)
}
if gateway.ArtifactReadCount() != 1 {
t.Fatalf("expected one OpenClaw artifact read request, got %d", gateway.ArtifactReadCount())
}
if got := gateway.Methods(); !sameMethods(got, []string{"connect", "xworkmate.artifacts.read"}) {
t.Fatalf("expected connect, then artifact read, got %#v", got)
}
}
func TestHTTPHandlerOpenClawArtifactDownloadReturnsArtifactMissing(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
downloadURL := server.openClawArtifactDownloadURL(
"thread-openclaw-artifact",
"run-1",
".xworkmate/artifacts/tasks/thread-openclaw-artifact/run-1",
"missing.txt",
time.Now(),
)
if downloadURL == "" {
t.Fatal("expected signed download URL")
}
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, downloadURL, nil)
request.Header.Set("Authorization", "Bearer bridge-token")
server.Handler().ServeHTTP(recorder, request)
if recorder.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d body=%q", recorder.Code, recorder.Body.String())
}
if !strings.Contains(recorder.Body.String(), "artifact_missing") {
t.Fatalf("expected artifact_missing response, got %q", recorder.Body.String())
}
if gateway.ArtifactReadCount() != 1 {
t.Fatalf("expected one OpenClaw artifact read request, got %d", gateway.ArtifactReadCount())
}
}
func TestHTTPHandlerOpenClawArtifactDownloadRequiresBearer(t *testing.T) {
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
downloadURL := server.openClawArtifactDownloadURL(
"thread-openclaw-artifact",
"run-1",
".xworkmate/artifacts/tasks/thread-openclaw-artifact/run-1",
"reports/final.md",
time.Now(),
)
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, downloadURL, nil)
server.Handler().ServeHTTP(recorder, request)
if recorder.Code != http.StatusUnauthorized {
t.Fatalf("expected 401, got %d body=%q", recorder.Code, recorder.Body.String())
}
}
func TestHTTPHandlerOpenClawArtifactDownloadRejectsInvalidSignature(t *testing.T) {
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
downloadURL := server.openClawArtifactDownloadURL(
"thread-openclaw-artifact",
"run-1",
".xworkmate/artifacts/tasks/thread-openclaw-artifact/run-1",
"reports/final.md",
time.Now(),
)
parsed, err := url.Parse(downloadURL)
if err != nil {
t.Fatalf("parse downloadUrl: %v", err)
}
query := parsed.Query()
query.Set("sig", "bad")
parsed.RawQuery = query.Encode()
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, parsed.String(), nil)
request.Header.Set("Authorization", "Bearer bridge-token")
server.Handler().ServeHTTP(recorder, request)
if recorder.Code != http.StatusForbidden {
t.Fatalf("expected 403, got %d body=%q", recorder.Code, recorder.Body.String())
}
}
func TestHTTPHandlerOpenClawArtifactDownloadRejectsExpiredSignature(t *testing.T) {
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
values := url.Values{}
expires := fmt.Sprintf("%d", time.Now().Add(-time.Minute).Unix())
values.Set("sessionKey", "thread-openclaw-artifact")
values.Set("runId", "run-1")
values.Set("artifactScope", ".xworkmate/artifacts/tasks/thread-openclaw-artifact/run-1")
values.Set("relativePath", "reports/final.md")
values.Set("expires", expires)
values.Set("sig", signOpenClawArtifactDownload(
"thread-openclaw-artifact",
"run-1",
".xworkmate/artifacts/tasks/thread-openclaw-artifact/run-1",
"reports/final.md",
expires,
))
server := NewServer()
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, openClawArtifactDownloadPath+"?"+values.Encode(), nil)
request.Header.Set("Authorization", "Bearer bridge-token")
server.Handler().ServeHTTP(recorder, request)
if recorder.Code != http.StatusGone {
t.Fatalf("expected 410, got %d body=%q", recorder.Code, recorder.Body.String())
}
}
func TestHTTPHandlerOpenClawArtifactDownloadRejectsTraversalPath(t *testing.T) {
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
values := url.Values{}
values.Set("sessionKey", "thread-openclaw-artifact")
values.Set("runId", "run-1")
values.Set("relativePath", "../secret.txt")
values.Set("expires", fmt.Sprintf("%d", time.Now().Add(time.Hour).Unix()))
values.Set("sig", "irrelevant")
server := NewServer()
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, openClawArtifactDownloadPath+"?"+values.Encode(), nil)
request.Header.Set("Authorization", "Bearer bridge-token")
server.Handler().ServeHTTP(recorder, request)
if recorder.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d body=%q", recorder.Code, recorder.Body.String())
}
}
func TestHTTPHandlerOpenClawArtifactDownloadRejectsInvalidArtifactScope(t *testing.T) {
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
values := url.Values{}
values.Set("sessionKey", "thread-openclaw-artifact")
values.Set("runId", "run-1")
values.Set("artifactScope", "../outside")
values.Set("relativePath", "reports/final.md")
values.Set("expires", fmt.Sprintf("%d", time.Now().Add(time.Hour).Unix()))
values.Set("sig", "irrelevant")
server := NewServer()
recorder := httptest.NewRecorder()
request := httptest.NewRequest(http.MethodGet, openClawArtifactDownloadPath+"?"+values.Encode(), nil)
request.Header.Set("Authorization", "Bearer bridge-token")
server.Handler().ServeHTTP(recorder, request)
if recorder.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d body=%q", recorder.Code, recorder.Body.String())
}
}
func TestOpenClawChatSendParamsAddsArtifactDeliveryInstructions(t *testing.T) {
for _, prompt := range []string{
"输出 PPT PDF docx 文件",
"生成一张图片并返回制品",
"render a video artifact for download",
"write a csv dataset file",
} {
t.Run(prompt, func(t *testing.T) {
chatParams, rpcErr := openClawChatSendParams(map[string]any{
"threadId": "thread-artifact-instructions",
"taskPrompt": prompt,
}, "turn-artifact-instructions", &openClawPreparedArtifactScope{
ArtifactScope: ".xworkmate/artifacts/tasks/thread-artifact-instructions/turn-artifact-instructions",
ArtifactDirectory: "/remote/openclaw/workspace/.xworkmate/artifacts/tasks/thread-artifact-instructions/turn-artifact-instructions",
ScopeKind: "task",
})
if rpcErr != nil {
t.Fatalf("expected chat params, got rpc error: %#v", rpcErr)
}
message := strings.TrimSpace(shared.StringArg(chatParams, "message", ""))
if !strings.Contains(message, prompt) {
t.Fatalf("expected original prompt to be preserved, got %q", message)
}
if !strings.Contains(message, "Create the requested files as real files") {
t.Fatalf("expected artifact delivery instructions, got %q", message)
}
if !strings.Contains(message, "/remote/openclaw/workspace/.xworkmate/artifacts/tasks/thread-artifact-instructions/turn-artifact-instructions") {
t.Fatalf("expected scoped artifact directory instruction, got %q", message)
}
if !strings.Contains(message, "Do not claim that files are ready") {
t.Fatalf("expected anti-hallucination download instruction, got %q", message)
}
})
}
}
func TestExecuteSessionTaskGatewayCollectsOpenClawEventArtifacts(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.artifactMode = "unknown"
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-event-artifact",
"threadId": "thread-openclaw-event-artifact",
"taskPrompt": "event artifact",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway response, got rpc error: %#v", rpcErr)
}
artifacts, ok := response["artifacts"].([]map[string]any)
if !ok {
raw, ok := response["artifacts"].([]any)
if !ok {
t.Fatalf("expected artifacts payload, got %#v", response["artifacts"])
}
artifacts = make([]map[string]any, 0, len(raw))
for _, item := range raw {
artifacts = append(artifacts, shared.AsMap(item))
}
}
if len(artifacts) != 1 {
t.Fatalf("expected one event artifact, got %#v", artifacts)
}
if got := artifacts[0]["relativePath"]; got != "events/live.txt" {
t.Fatalf("expected event artifact relative path, got %#v", artifacts[0])
}
if got := response["remoteWorkingDirectory"]; got != "/remote/openclaw/events" {
t.Fatalf("expected remote working directory from event, got %#v", response)
}
}
func TestExecuteSessionTaskGatewayKeepsTextWhenArtifactExportUnavailable(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
gateway.artifactMode = "unknown"
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-artifact-missing",
"threadId": "thread-openclaw-artifact-missing",
"taskPrompt": "say pong",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected gateway text response despite artifact export failure, got rpc error: %#v", rpcErr)
}
if got := response["output"]; got != "gateway pong" {
t.Fatalf("expected gateway pong output, got %#v", response)
}
if gateway.ArtifactExportCount() != 1 {
t.Fatalf("expected one OpenClaw artifact export request, got %d", gateway.ArtifactExportCount())
}
warnings := response["artifactWarnings"].([]any)
if len(warnings) != 1 || !strings.Contains(fmt.Sprint(warnings[0]), "unknown method") {
t.Fatalf("expected artifact warning for unknown method, got %#v", response["artifactWarnings"])
}
}
func TestExecuteSessionTaskGatewayRejectsMissingOpenClawFilesForDeliveryRequest(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token")
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-openclaw-missing-files",
"threadId": "thread-openclaw-missing-files",
"taskPrompt": "输出 PPT PDF docx 文件 hallucinate-files",
"workingDirectory": t.TempDir(),
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
"preferredGatewayProviderId": "openclaw",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected bridge response, got rpc error: %#v", rpcErr)
}
if success, _ := response["success"].(bool); success {
t.Fatalf("expected missing artifact delivery to be marked unsuccessful, got %#v", response)
}
output := strings.TrimSpace(shared.StringArg(response, "output", ""))
if strings.Contains(output, "点击直接下载") || strings.Contains(output, "文件已就绪") {
t.Fatalf("expected hallucinated download text to be replaced, got %q", output)
}
if !strings.Contains(output, "未检测到 OpenClaw 本轮导出的实际文件") {
t.Fatalf("expected explicit missing artifact message, got %q", output)
}
if _, ok := response["artifacts"]; ok {
t.Fatalf("expected no artifacts when export returned none, got %#v", response["artifacts"])
}
warnings := response["artifactWarnings"].([]any)
if len(warnings) != 1 || !strings.Contains(fmt.Sprint(warnings[0]), "returned no files") {
t.Fatalf("expected missing artifact warning, got %#v", response["artifactWarnings"])
}
}
func TestExtractArtifactPayloadsDoesNotScanRemoteDirectoryFallback(t *testing.T) {
root := t.TempDir()
if err := os.WriteFile(filepath.Join(root, "stale.txt"), []byte("stale"), 0o644); err != nil {
t.Fatalf("write stale file: %v", err)
}
artifacts := extractArtifactPayloads(map[string]any{
"remoteWorkingDirectory": root,
"artifacts": []any{},
}, root)
if len(artifacts) != 0 {
t.Fatalf("expected no directory fallback artifacts, got %#v", artifacts)
}
}
func TestExecuteSessionTaskDefaultsExplicitGatewayToOpenClaw(t *testing.T) {
server := NewServer()
_, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-gateway-missing-provider",
"threadId": "thread-gateway-missing-provider",
"taskPrompt": "search latest news",
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "gateway",
},
},
},
})
if rpcErr == nil {
t.Fatal("expected gateway connectivity error")
}
if rpcErr.Message == "GATEWAY_PROVIDER_REQUIRED" {
t.Fatalf("expected openclaw default from routing result, got %#v", rpcErr)
}
}
func TestExtractArtifactPayloadsPreservesDownloadURLOnlyArtifacts(t *testing.T) {
artifacts := extractArtifactPayloads(map[string]any{
"artifacts": []any{
map[string]any{
"name": "reports/final.txt",
"downloadURL": "https://xworkmate-bridge.svc.plus/artifacts/final.txt",
},
map[string]any{
"download_url": "https://xworkmate-bridge.svc.plus/artifacts/from-url.md",
},
},
}, "")
if len(artifacts) != 2 {
t.Fatalf("expected two artifacts, got %#v", artifacts)
}
if got := artifacts[0]["relativePath"]; got != "reports/final.txt" {
t.Fatalf("expected name-derived path, got %#v", got)
}
if got := artifacts[0]["downloadUrl"]; got != "https://xworkmate-bridge.svc.plus/artifacts/final.txt" {
t.Fatalf("expected normalized downloadUrl, got %#v", got)
}
if _, ok := artifacts[0]["downloadURL"]; ok {
t.Fatalf("expected downloadURL alias to be removed: %#v", artifacts[0])
}
if got := artifacts[1]["relativePath"]; got != "from-url.md" {
t.Fatalf("expected URL basename path, got %#v", got)
}
if got := artifacts[1]["contentType"]; got != "text/plain" {
t.Fatalf("expected markdown content type, got %#v", got)
}
}
func TestExtractArtifactPayloadsRejectsUnsafeDownloadURLArtifactNames(t *testing.T) {
artifacts := extractArtifactPayloads(map[string]any{
"artifacts": []any{
map[string]any{
"name": "../secrets.txt",
"downloadUrl": "https://xworkmate-bridge.svc.plus/artifacts/secrets.txt",
},
},
}, "")
if len(artifacts) != 0 {
t.Fatalf("expected unsafe artifact to be dropped, got %#v", artifacts)
}
}
type acpFakeOpenClawGateway struct {
server *http.Server
listener net.Listener
connectCount atomic.Int32
chatSendCount atomic.Int32
agentWaitCount atomic.Int32
artifactCount atomic.Int32
artifactReadCount atomic.Int32
lastConnectClient atomic.Value
lastArtifactExportParams atomic.Value
mu sync.Mutex
methods []string
runMessages map[string]string
artifactMode string
}
func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway {
t.Helper()
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen fake openclaw gateway: %v", err)
}
fake := &acpFakeOpenClawGateway{listener: listener, runMessages: map[string]string{}}
upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer func() {
_ = conn.Close()
}()
_ = conn.WriteJSON(map[string]any{
"type": "event",
"event": "connect.challenge",
"payload": map[string]any{
"nonce": "nonce-1",
},
})
for {
_, payload, err := conn.ReadMessage()
if err != nil {
return
}
var frame map[string]any
if err := json.Unmarshal(payload, &frame); err != nil {
continue
}
if strings.TrimSpace(shared.StringArg(frame, "type", "")) != "req" {
continue
}
id := frame["id"]
method := strings.TrimSpace(shared.StringArg(frame, "method", ""))
fake.recordMethod(method)
switch method {
case "connect":
fake.connectCount.Add(1)
params := shared.AsMap(frame["params"])
device := shared.AsMap(params["device"])
publicKey, err := base64.RawURLEncoding.DecodeString(
strings.TrimSpace(shared.StringArg(device, "publicKey", "")),
)
sum := sha256.Sum256(publicKey)
if err != nil || shared.StringArg(device, "id", "") != hex.EncodeToString(sum[:]) {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "INVALID_REQUEST",
"message": "device identity mismatch",
"details": map[string]any{
"code": "DEVICE_AUTH_DEVICE_ID_MISMATCH",
},
},
})
return
}
if got, want := shared.StringArg(shared.AsMap(params["auth"]), "token", ""), os.Getenv("BRIDGE_AUTH_TOKEN"); got != want {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "INVALID_REQUEST",
"message": "unauthorized: gateway token mismatch",
},
})
return
}
fake.lastConnectClient.Store(shared.AsMap(params["client"]))
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"server": map[string]any{"host": "127.0.0.1"},
"snapshot": map[string]any{
"sessionDefaults": map[string]any{"mainSessionKey": "main"},
},
"auth": map[string]any{
"role": "operator",
"scopes": []string{"operator.read", "operator.write"},
"deviceToken": "device-token-1",
},
},
})
case "chat.send":
fake.chatSendCount.Add(1)
params := shared.AsMap(frame["params"])
if strings.TrimSpace(shared.StringArg(params, "message", "")) == "fail" {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "OPENCLAW_CHAT_FAILED",
"message": "openclaw chat failed",
},
})
continue
}
runID := strings.TrimSpace(shared.StringArg(params, "idempotencyKey", "fake-run"))
fake.recordRunMessage(runID, strings.TrimSpace(shared.StringArg(params, "message", "")))
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"runId": runID,
"status": "started",
},
})
case "xworkmate.artifacts.prepare":
params := shared.AsMap(frame["params"])
runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run"))
sessionKey := strings.TrimSpace(shared.StringArg(params, "sessionKey", "main"))
artifactScope := ".xworkmate/artifacts/tasks/" + sessionKey + "/" + runID
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"runId": runID,
"sessionKey": sessionKey,
"remoteWorkingDirectory": "/remote/openclaw/workspace",
"remoteWorkspaceRefKind": "remotePath",
"artifactScope": artifactScope,
"scopeKind": "task",
"artifactDirectory": "/remote/openclaw/workspace/" + artifactScope,
"relativeArtifactDirectory": artifactScope,
"warnings": []any{},
},
})
case "agent.wait":
fake.agentWaitCount.Add(1)
params := shared.AsMap(frame["params"])
runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run"))
switch fake.runMessage(runID) {
case "wait-error":
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "OPENCLAW_WAIT_FAILED",
"message": "openclaw wait failed",
},
})
continue
case "wait-timeout":
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "TIMEOUT",
"message": "openclaw wait timeout",
},
})
continue
}
message := "gateway pong"
if strings.Contains(fake.runMessage(runID), "hallucinate-files") {
message = "文件已就绪,点击直接下载👇 三个格式一键收取:"
}
_ = conn.WriteJSON(map[string]any{
"type": "event",
"event": "chat",
"seq": 1,
"payload": map[string]any{
"runId": runID,
"state": "final",
"message": map[string]any{
"role": "assistant",
"content": message,
},
},
})
if strings.Contains(fake.runMessage(runID), "event artifact") {
_ = conn.WriteJSON(map[string]any{
"type": "event",
"event": "chat",
"seq": 2,
"payload": map[string]any{
"runId": runID,
"state": "final",
"remoteWorkingDirectory": "/remote/openclaw/events",
"remoteWorkspaceRefKind": "remotePath",
"artifacts": []any{
map[string]any{
"relativePath": "events/live.txt",
"contentType": "text/plain",
"encoding": "base64",
"content": "bGl2ZSBldmVudA==",
},
},
},
})
}
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"runId": runID,
"status": "ok",
},
})
case "xworkmate.artifacts.export":
fake.artifactCount.Add(1)
if fake.artifactMode == "unknown" {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "UNKNOWN_METHOD",
"message": "unknown method: xworkmate.artifacts.export",
},
})
continue
}
params := shared.AsMap(frame["params"])
fake.lastArtifactExportParams.Store(params)
runID := strings.TrimSpace(shared.StringArg(params, "runId", "fake-run"))
artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", ""))
payload := map[string]any{
"runId": runID,
"sessionKey": strings.TrimSpace(shared.StringArg(params, "sessionKey", "")),
"remoteWorkingDirectory": "/remote/openclaw/workspace",
"remoteWorkspaceRefKind": "remotePath",
"scopeKind": "workspace",
"artifacts": []any{},
"warnings": []any{},
}
if artifactScope != "" {
payload["artifactScope"] = artifactScope
payload["scopeKind"] = "task"
}
if strings.Contains(fake.runMessage(runID), "make artifact") {
payload["artifacts"] = []any{
map[string]any{
"relativePath": "reports/final.md",
"label": "final.md",
"contentType": "text/markdown",
"sizeBytes": 12,
"sha256": "fake-sha256",
"artifactScope": artifactScope,
"scopeKind": "task",
"encoding": "base64",
"content": "ZmluYWwgcmVwb3J0",
},
}
}
if fake.artifactMode == "workspace-latest" &&
shared.BoolArg(shared.StringArg(params, "latestIfEmpty", ""), false) &&
artifactScope != "" &&
len(payload["artifacts"].([]any)) == 0 {
payload["scopeKind"] = "workspace-latest"
payload["artifacts"] = []any{
map[string]any{
"relativePath": "existing/report.pdf",
"label": "report.pdf",
"contentType": "application/pdf",
"sizeBytes": 3,
"sha256": "latest-sha256",
"scopeKind": "workspace-latest",
"encoding": "base64",
"content": "cGRm",
},
}
}
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": payload,
})
case "xworkmate.artifacts.read":
fake.artifactReadCount.Add(1)
params := shared.AsMap(frame["params"])
relativePath := strings.TrimSpace(shared.StringArg(params, "relativePath", ""))
artifactScope := strings.TrimSpace(shared.StringArg(params, "artifactScope", ""))
if relativePath != "reports/final.md" {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "ARTIFACT_NOT_FOUND",
"message": "artifact not found",
},
})
continue
}
content := []byte("final report")
sum := sha256.Sum256(content)
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"runId": strings.TrimSpace(shared.StringArg(params, "runId", "")),
"sessionKey": strings.TrimSpace(shared.StringArg(params, "sessionKey", "")),
"remoteWorkingDirectory": "/remote/openclaw/workspace",
"remoteWorkspaceRefKind": "remotePath",
"artifactScope": artifactScope,
"scopeKind": "task",
"artifacts": []any{
map[string]any{
"relativePath": "reports/final.md",
"label": "final.md",
"contentType": "text/markdown",
"sizeBytes": len(content),
"sha256": hex.EncodeToString(sum[:]),
"artifactScope": artifactScope,
"scopeKind": "task",
"encoding": "base64",
"content": base64.StdEncoding.EncodeToString(content),
},
},
},
})
case "chat.run":
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "UNKNOWN_METHOD",
"message": "unknown method: chat.run",
},
})
case "session.start":
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": "UNKNOWN_METHOD",
"message": "unknown method: session.start",
},
})
default:
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{},
})
}
}
})
fake.server = &http.Server{Handler: mux, ReadHeaderTimeout: 2 * time.Second}
go func() {
_ = fake.server.Serve(listener)
}()
return fake
}
func (f *acpFakeOpenClawGateway) URL() string {
return "ws://" + f.listener.Addr().String() + "/"
}
func (f *acpFakeOpenClawGateway) recordMethod(method string) {
f.mu.Lock()
defer f.mu.Unlock()
f.methods = append(f.methods, method)
}
func (f *acpFakeOpenClawGateway) Methods() []string {
f.mu.Lock()
defer f.mu.Unlock()
return append([]string(nil), f.methods...)
}
func (f *acpFakeOpenClawGateway) recordRunMessage(runID, message string) {
f.mu.Lock()
defer f.mu.Unlock()
f.runMessages[runID] = message
}
func (f *acpFakeOpenClawGateway) runMessage(runID string) string {
f.mu.Lock()
defer f.mu.Unlock()
return f.runMessages[runID]
}
func (f *acpFakeOpenClawGateway) ConnectCount() int {
return int(f.connectCount.Load())
}
func (f *acpFakeOpenClawGateway) ChatSendCount() int {
return int(f.chatSendCount.Load())
}
func (f *acpFakeOpenClawGateway) AgentWaitCount() int {
return int(f.agentWaitCount.Load())
}
func (f *acpFakeOpenClawGateway) ArtifactExportCount() int {
return int(f.artifactCount.Load())
}
func (f *acpFakeOpenClawGateway) ArtifactReadCount() int {
return int(f.artifactReadCount.Load())
}
func (f *acpFakeOpenClawGateway) LastArtifactExportParams() map[string]any {
value := f.lastArtifactExportParams.Load()
if value == nil {
return nil
}
return shared.AsMap(value)
}
func (f *acpFakeOpenClawGateway) LastConnectClient() map[string]any {
value := f.lastConnectClient.Load()
if value == nil {
return nil
}
return value.(map[string]any)
}
func (f *acpFakeOpenClawGateway) Close() {
_ = f.server.Close()
}
func sameMethods(got []string, want []string) bool {
if len(got) != len(want) {
return false
}
for index := range got {
if got[index] != want[index] {
return false
}
}
return true
}
func TestExecuteSessionTaskAutoRoutingUsesBridgeProductionProviderOrder(t *testing.T) {
workspaceDir := filepath.Join(t.TempDir(), "workspace")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {
t.Fatalf("create workspace: %v", err)
}
server := NewServer()
geminiProvider := newExternalSingleAgentProvider(t, "gemini", "gemini-output")
defer geminiProvider.Close()
codexProvider := newExternalSingleAgentProvider(t, "codex", "codex-output")
defer codexProvider.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "gemini",
Label: "Gemini",
Endpoint: geminiProvider.URL,
Enabled: true,
})
setTestBridgeProvider(server, syncedProvider{
ProviderID: "codex",
Label: "Codex",
Endpoint: codexProvider.URL,
Enabled: true,
})
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-auto-order",
"threadId": "thread-auto-order",
"taskPrompt": "create a powerpoint deck for launch",
"workingDirectory": workspaceDir,
"routing": map[string]any{
"routingMode": "auto",
"availableSkills": []any{
map[string]any{
"id": "pptx",
"label": "PPTX",
"description": "slides",
"installed": true,
},
},
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected success, got rpc error: %v", rpcErr)
}
if got := response["resolvedProviderId"]; got != "codex" {
t.Fatalf("expected resolved provider codex from built-in bridge order, got %#v", response)
}
}
func TestExecuteSessionTaskKeepsRemoteWorkspaceHintOutOfLocalCWD(t *testing.T) {
workspaceDir := filepath.Join(t.TempDir(), "workspace")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {
t.Fatalf("create workspace: %v", err)
}
server := NewServer()
providerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/acp/rpc" {
http.NotFound(w, r)
return
}
defer func() { _ = r.Body.Close() }()
var request map[string]any
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
t.Fatalf("decode request: %v", err)
}
method := strings.TrimSpace(shared.StringArg(request, "method", ""))
result := map[string]any{
"success": true,
"output": "hello",
"summary": "hello",
"message": "hello",
"remoteWorkingDirectory": "/owners/local/user/demo/threads/main",
"remoteWorkspaceRefKind": "remotePath",
"artifacts": []map[string]any{
{
"relativePath": "notes/hello.txt",
"content": "hello artifact",
"contentType": "text/plain",
},
},
}
if method == "thread/start" {
result = map[string]any{"id": "codex-thread-1"}
}
_ = json.NewEncoder(w).Encode(map[string]any{
"jsonrpc": "2.0",
"id": request["id"],
"result": result,
})
}))
defer providerServer.Close()
setTestBridgeProvider(server, syncedProvider{
ProviderID: "codex",
Label: "Codex",
Endpoint: providerServer.URL,
Enabled: true,
})
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Method: "session.start",
Params: map[string]any{
"sessionId": "session-remote-hint",
"threadId": "thread-remote-hint",
"taskPrompt": "say hello",
"workingDirectory": workspaceDir,
"remoteWorkingDirectoryHint": "/owners/local/user/demo/threads/main",
"routing": map[string]any{
"routingMode": "explicit",
"explicitExecutionTarget": "singleAgent",
"explicitProviderId": "codex",
},
},
},
})
if rpcErr != nil {
t.Fatalf("expected success, got rpc error: %v", rpcErr)
}
if got := response["remoteWorkingDirectory"]; got != "/owners/local/user/demo/threads/main" {
t.Fatalf("expected remote working directory in response, got %#v", response)
}
if got := response["remoteWorkspaceRefKind"]; got != "remotePath" {
t.Fatalf("expected remote workspace kind in response, got %#v", response)
}
if _, ok := response["artifacts"].([]map[string]any); !ok {
if _, ok := response["artifacts"].([]any); !ok {
t.Fatalf("expected artifacts payload, got %#v", response["artifacts"])
}
}
sess := server.sessions["session-remote-hint"]
if sess == nil {
t.Fatal("expected session state to be retained")
}
if sess.control.RequestedWorkingDir != workspaceDir {
t.Fatalf("expected local requested cwd %q, got %q", workspaceDir, sess.control.RequestedWorkingDir)
}
if sess.control.RemoteWorkingDirHint != "/owners/local/user/demo/threads/main" {
t.Fatalf("expected remote hint retained, got %#v", sess.control)
}
if sess.task.Kind != TaskKindSingleAgent || sess.task.State != TaskStateCompleted {
t.Fatalf("expected completed single-agent task, got %#v", sess.task)
}
if sess.artifacts.RemoteWorkingDirectory != "/owners/local/user/demo/threads/main" {
t.Fatalf("expected artifact record to keep remote directory, got %#v", sess.artifacts)
}
}
func TestExecuteSessionTaskRequiresRouting(t *testing.T) {
server := NewServer()
_, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
ID: "request-1",
Method: "session.start",
Params: map[string]any{
"sessionId": "session-missing-routing",
"threadId": "thread-missing-routing",
"taskPrompt": "hello",
},
},
})
if rpcErr == nil {
t.Fatalf("expected routing-required error")
}
if rpcErr.Message != "ROUTING_REQUIRED" {
t.Fatalf("expected ROUTING_REQUIRED, got %#v", rpcErr)
}
}
func TestExecuteSessionTaskComplexRequestNoLongerPromotesToMultiAgent(t *testing.T) {
workspaceDir := filepath.Join(t.TempDir(), "workspace")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {
t.Fatalf("create workspace: %v", err)
}
server := NewServer()
response, rpcErr := server.executeSessionTask(task{
req: shared.RPCRequest{
Params: map[string]any{
"sessionId": "session-complex",
"threadId": "thread-complex",
"taskPrompt": "collect latest news and summarize it into a report for review",
"workingDirectory": workspaceDir,
"routing": map[string]any{
"routingMode": "auto",
},
},
},
})
if rpcErr == nil {
t.Fatalf("expected gateway-not-connected error, got response %#v", response)
}
if strings.Contains(rpcErr.Message, "multi-agent") {
t.Fatalf("expected no multi-agent path, got rpc error: %v", rpcErr)
}
}
func TestHandleRoutingResolveAllowsSkillInstallRetry(t *testing.T) {
tempDir := t.TempDir()
finder := filepath.Join(tempDir, "find-skills.sh")
installer := filepath.Join(tempDir, "install-skills.sh")
if err := os.WriteFile(
finder,
[]byte("#!/bin/sh\nprintf '%s' '{\"candidates\":[{\"id\":\"video-translator\",\"label\":\"video-translator\",\"description\":\"translate video\",\"installed\":false}]}'\n"),
0o755,
); err != nil {
t.Fatalf("write finder: %v", err)
}
if err := os.WriteFile(
installer,
[]byte("#!/bin/sh\nprintf '%s' '{\"candidates\":[{\"id\":\"video-translator\",\"label\":\"video-translator\",\"description\":\"translate video\",\"installed\":true}]}'\n"),
0o755,
); err != nil {
t.Fatalf("write installer: %v", err)
}
t.Setenv("ACP_FIND_SKILLS_BIN", finder)
t.Setenv("ACP_INSTALL_SKILL_BIN", installer)
result := handleRoutingResolve(map[string]any{
"taskPrompt": "translate and dub this video with subtitles",
"workingDirectory": "/tmp/workspace",
"routing": map[string]any{
"routingMode": "auto",
"allowSkillInstall": true,
"availableSkills": []any{
map[string]any{
"id": "docx",
"label": "docx",
"description": "docs",
"installed": true,
},
},
},
})
if got := result["skillResolutionSource"]; got != "find_skills" {
t.Fatalf("expected find_skills source, got %#v", got)
}
if got := result["needsSkillInstall"]; got != true {
t.Fatalf("expected first pass to request install approval, got %#v", got)
}
requestID, _ := result["skillInstallRequestId"].(string)
if strings.TrimSpace(requestID) == "" {
t.Fatalf("expected install request id, got %#v", result)
}
retried := handleRoutingResolve(map[string]any{
"taskPrompt": "translate and dub this video with subtitles",
"workingDirectory": "/tmp/workspace",
"routing": map[string]any{
"routingMode": "auto",
"allowSkillInstall": true,
"installApproval": map[string]any{
"requestId": requestID,
"approvedSkillKeys": []any{"video-translator"},
},
"availableSkills": []any{
map[string]any{
"id": "docx",
"label": "docx",
"description": "docs",
"installed": true,
},
},
},
})
if got := retried["needsSkillInstall"]; got != false {
t.Fatalf("expected install retry to clear needsSkillInstall, got %#v", got)
}
resolvedSkills, _ := retried["resolvedSkills"].([]string)
if len(resolvedSkills) != 1 || resolvedSkills[0] != "video-translator" {
t.Fatalf("expected installed skill to resolve, got %#v", retried["resolvedSkills"])
}
}