diff --git a/internal/acp/openclaw_async_tasks.go b/internal/acp/openclaw_async_tasks.go index 66dba3d..c9db911 100644 --- a/internal/acp/openclaw_async_tasks.go +++ b/internal/acp/openclaw_async_tasks.go @@ -348,7 +348,43 @@ func (o *SessionOrchestrator) probeOpenClawTask(ctx context.Context, sess *sessi sess.openClaw.FirstSilentFailureAt = time.Time{} } sess.mu.Unlock() - return o.completeOpenClawTask(sess, shared.AsMap(waitResult.Payload), collector, notify) + waitPayload := shared.AsMap(waitResult.Payload) + if !openClawWaitPayloadTerminal(waitPayload) && !collector.isTerminal() { + return openClawMarkProbeRunning(sess) + } + return o.completeOpenClawTask(sess, waitPayload, collector, notify) +} + +func openClawMarkProbeRunning(sess *session) map[string]any { + sess.mu.Lock() + if sess.openClaw != nil { + sess.openClaw.ProgressStage = "running" + sess.openClaw.ProgressMessage = "OpenClaw task is still running" + sess.openClaw.ProbeInFlight = false + } + sess.task.ProgressStage = "running" + sess.task.ProgressMessage = "OpenClaw task is still running" + sess.task.UpdatedAt = time.Now() + result := openClawRunningTaskResult(sess.openClaw) + sess.lastResult = cloneMap(result) + sess.mu.Unlock() + return result +} + +func openClawWaitPayloadTerminal(payload map[string]any) bool { + if payload == nil { + return false + } + if value, ok := payload["terminal"].(bool); ok { + return value + } + for _, key := range []string{"status", "state", "phase"} { + switch strings.TrimSpace(strings.ToLower(shared.StringArg(payload, key, ""))) { + case "complete", "completed", "done", "final", "success", "succeeded", "failed", "failure", "error", "timeout", "timed_out", "cancelled", "canceled": + return true + } + } + return false } func openClawSilentFailureExceeded(config *BridgeConfig, firstFailureAt time.Time, now time.Time) bool { diff --git a/internal/acp/orchestrator.go b/internal/acp/orchestrator.go index f0b6e2c..ff3d15a 100644 --- a/internal/acp/orchestrator.go +++ b/internal/acp/orchestrator.go @@ -1606,6 +1606,7 @@ func firstNonEmptyString(values map[string]any, keys ...string) string { type openClawChatCollector struct { parts []string final string + terminal bool artifactPayloads []map[string]any } @@ -1628,6 +1629,9 @@ func (c *openClawChatCollector) observe(notification map[string]any) { if strings.TrimSpace(shared.StringArg(event, "event", "")) != "chat.run" { return } + if isTerminalGatewayPayload(payload) { + c.terminal = true + } text := firstNonEmptyString(payload, "assistantText", "text", "message", "output", "summary") if text == "" { return @@ -1649,6 +1653,10 @@ func (c *openClawChatCollector) output() string { return strings.TrimSpace(strings.Join(c.parts, "")) } +func (c *openClawChatCollector) isTerminal() bool { + return c != nil && c.terminal +} + func (c *openClawChatCollector) artifactPayload() map[string]any { if c == nil || len(c.artifactPayloads) == 0 { return nil @@ -1714,7 +1722,7 @@ func isTerminalGatewayPayload(payload map[string]any) bool { return true } switch strings.TrimSpace(strings.ToLower(shared.StringArg(payload, "state", ""))) { - case "complete", "completed", "done", "ok", "success", "failed", "error", "timeout", "timed_out", "cancelled", "canceled": + case "complete", "completed", "done", "final", "ok", "success", "failed", "error", "timeout", "timed_out", "cancelled", "canceled": return true default: return false diff --git a/internal/acp/routing_test.go b/internal/acp/routing_test.go index e74e0fa..98c5972 100644 --- a/internal/acp/routing_test.go +++ b/internal/acp/routing_test.go @@ -733,7 +733,7 @@ func TestExecuteSessionTaskGatewayNoDisplayableOutputFails(t *testing.T) { Params: map[string]any{ "sessionId": "session-openclaw-no-output", "threadId": "thread-openclaw-no-output", - "taskPrompt": "silent-turn", + "taskPrompt": "completed-empty", "workingDirectory": t.TempDir(), "routing": map[string]any{ "routingMode": "explicit", @@ -899,6 +899,55 @@ func TestExecuteSessionTaskGatewayFailsArtifactContractAfterWaitFailure(t *testi } } +func TestExecuteSessionTaskGatewayKeepsRunningOnNonTerminalWaitPayload(t *testing.T) { + gateway := newAcpFakeOpenClawGateway(t) + defer gateway.Close() + gateway.artifactWorkspaceRoot = t.TempDir() + + t.Setenv("GATEWAY_RPC_URL", gateway.URL()) + t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-token") + + server := NewServer() + response, rpcErr := server.executeSessionTask(task{ + req: shared.RPCRequest{ + Method: "session.start", + Params: map[string]any{ + "sessionId": "session-openclaw-running-wait", + "threadId": "thread-openclaw-running-wait", + "taskPrompt": "wait-running", + "workingDirectory": t.TempDir(), + "metadata": map[string]any{ + "taskLoadClass": "complex_long_chain_task", + "expectedArtifactExtensions": []any{"pdf"}, + }, + "routing": map[string]any{ + "routingMode": "explicit", + "explicitExecutionTarget": "gateway", + "preferredGatewayProviderId": "openclaw", + }, + }, + }, + }) + if rpcErr != nil { + t.Fatalf("expected running wait payload to keep task running, got rpc error: %#v", rpcErr) + } + if got := response["status"]; got != string(TaskStateRunning) { + t.Fatalf("expected running status from non-terminal wait payload, got %#v", response) + } + if got := gateway.ChatSendCount(); got != 1 { + t.Fatalf("expected no repair turn, got %d", got) + } + if got := gateway.AgentWaitCount(); got != 1 { + t.Fatalf("expected one status probe, got %d", got) + } + if got := gateway.ArtifactExportCount(); got != 0 { + t.Fatalf("expected no artifact export before terminal wait payload, got %d", got) + } + if _, ok := response["code"]; ok { + t.Fatalf("expected no terminal failure code, got %#v", response) + } +} + func TestExecuteSessionTaskGatewayArtifactContractNoFilesRequiresFinalArtifact(t *testing.T) { gateway := newAcpFakeOpenClawGateway(t) defer gateway.Close() @@ -913,7 +962,7 @@ func TestExecuteSessionTaskGatewayArtifactContractNoFilesRequiresFinalArtifact(t Params: map[string]any{ "sessionId": "session-openclaw-no-complex-output", "threadId": "thread-openclaw-no-complex-output", - "taskPrompt": "silent-turn", + "taskPrompt": "completed-empty", "workingDirectory": t.TempDir(), "metadata": map[string]any{ "taskLoadClass": "complex_long_chain_task", @@ -964,7 +1013,7 @@ func TestExecuteSessionTaskGatewaySimpleArtifactContractNoFilesRequiresFinalArti Params: map[string]any{ "sessionId": "session-openclaw-simple-md", "threadId": "thread-openclaw-simple-md", - "taskPrompt": "silent-turn", + "taskPrompt": "completed-empty", "workingDirectory": t.TempDir(), "metadata": map[string]any{ "expectedArtifactExtensions": []any{"md"}, @@ -2943,6 +2992,29 @@ func newAcpFakeOpenClawGateway(t *testing.T) *acpFakeOpenClawGateway { }, }) continue + case "wait-running": + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": map[string]any{ + "runId": runID, + "status": "running", + "terminal": false, + }, + }) + continue + case "completed-empty": + _ = conn.WriteJSON(map[string]any{ + "type": "res", + "id": id, + "ok": true, + "payload": map[string]any{ + "runId": runID, + "status": "completed", + }, + }) + continue } message := "gateway pong" if strings.Contains(fake.runMessage(runID), "hallucinate-files") { diff --git a/internal/acp/rpc_handler.go b/internal/acp/rpc_handler.go index 388e2fd..154e217 100644 --- a/internal/acp/rpc_handler.go +++ b/internal/acp/rpc_handler.go @@ -20,6 +20,20 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string case "health": return map[string]any{"status": "ok", "version": "0.7.0", "role": "acp-control-plane"}, nil + case "system.logs": + gatewayStatus := "disconnected" + if s.gateway != nil { + if s.gateway.HasConnectedSession() { + gatewayStatus = "connected" + } + } + + return map[string]any{ + "bridgeStatus": "ok", + "gatewayStatus": gatewayStatus, + "bridgeLogs": shared.GlobalLogBuffer.GetLines(), + }, nil + case "acp.capabilities": return s.catalog.Get(), nil @@ -363,4 +377,3 @@ func (s *Server) handleDesktopMethod(ctx context.Context, method string, params return nil, &shared.RPCError{Code: -32601, Message: fmt.Sprintf("unknown desktop method: %s", method)} } } - diff --git a/internal/desktop/input.go b/internal/desktop/input.go index b65a832..078b067 100644 --- a/internal/desktop/input.go +++ b/internal/desktop/input.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "os" "os/exec" "strconv" "strings" @@ -62,7 +63,7 @@ func (xi *XdotoolInjector) Start() error { // 2. Launch persistent xdotool process cmd := exec.Command("xdotool", "-") - cmd.Env = append(cmd.Env, "DISPLAY="+xi.display) + cmd.Env = desktopCommandEnv(xi.display) stdin, err := cmd.StdinPipe() if err != nil { @@ -165,7 +166,7 @@ func (xi *XdotoolInjector) Close() error { func (xi *XdotoolInjector) queryDisplayGeometry() (int, int, error) { cmd := exec.Command("xdotool", "getdisplaygeometry") - cmd.Env = append(cmd.Env, "DISPLAY="+xi.display) + cmd.Env = desktopCommandEnv(xi.display) out, err := cmd.Output() if err != nil { return 0, 0, err @@ -185,6 +186,21 @@ func (xi *XdotoolInjector) queryDisplayGeometry() (int, int, error) { return w, h, nil } +func desktopCommandEnv(display string) []string { + env := os.Environ() + if strings.TrimSpace(display) == "" { + return env + } + filtered := make([]string, 0, len(env)+1) + for _, item := range env { + if strings.HasPrefix(item, "DISPLAY=") { + continue + } + filtered = append(filtered, item) + } + return append(filtered, "DISPLAY="+display) +} + func (xi *XdotoolInjector) mapButton(btn int) int { // Standard mapping: 1=left, 2=middle, 3=right if btn <= 0 || btn > 3 { diff --git a/internal/desktop/input_test.go b/internal/desktop/input_test.go new file mode 100644 index 0000000..683184e --- /dev/null +++ b/internal/desktop/input_test.go @@ -0,0 +1,46 @@ +package desktop + +import ( + "strings" + "testing" +) + +func TestDesktopCommandEnvPreservesProcessEnvironmentAndOverridesDisplay(t *testing.T) { + t.Setenv("PATH", "/usr/local/bin:/usr/bin") + t.Setenv("HOME", "/home/ubuntu") + t.Setenv("DISPLAY", ":old") + + env := desktopCommandEnv(":0.0") + + if !envContains(env, "PATH=/usr/local/bin:/usr/bin") { + t.Fatalf("expected PATH to be preserved, got %#v", env) + } + if !envContains(env, "HOME=/home/ubuntu") { + t.Fatalf("expected HOME to be preserved, got %#v", env) + } + if !envContains(env, "DISPLAY=:0.0") { + t.Fatalf("expected DISPLAY override, got %#v", env) + } + if countEnvPrefix(env, "DISPLAY=") != 1 { + t.Fatalf("expected exactly one DISPLAY entry, got %#v", env) + } +} + +func envContains(env []string, expected string) bool { + for _, item := range env { + if item == expected { + return true + } + } + return false +} + +func countEnvPrefix(env []string, prefix string) int { + count := 0 + for _, item := range env { + if strings.HasPrefix(item, prefix) { + count++ + } + } + return count +} diff --git a/internal/gatewayruntime/runtime.go b/internal/gatewayruntime/runtime.go index 0698c79..7543ab3 100644 --- a/internal/gatewayruntime/runtime.go +++ b/internal/gatewayruntime/runtime.go @@ -219,6 +219,23 @@ func (m *Manager) lookupConnectedByMode(mode string) *session { return nil } +func (m *Manager) HasConnectedSession() bool { + m.mu.Lock() + defer m.mu.Unlock() + for _, current := range m.sessions { + if current == nil { + continue + } + current.mu.Lock() + connected := current.snapshot.Status == "connected" + current.mu.Unlock() + if connected { + return true + } + } + return false +} + type session struct { manager *Manager runtimeID string diff --git a/internal/shared/logger.go b/internal/shared/logger.go new file mode 100644 index 0000000..e77eb10 --- /dev/null +++ b/internal/shared/logger.go @@ -0,0 +1,47 @@ +package shared + +import ( + "sync" +) + +// LogRingBuffer stores the last N log lines in memory. +type LogRingBuffer struct { + lines []string + size int + mu sync.Mutex +} + +// Global log buffer for the bridge +var GlobalLogBuffer = NewLogRingBuffer(200) + +// NewLogRingBuffer creates a new ring buffer with the given size. +func NewLogRingBuffer(size int) *LogRingBuffer { + return &LogRingBuffer{ + lines: make([]string, 0, size), + size: size, + } +} + +// Write implements io.Writer to intercept logs. +func (r *LogRingBuffer) Write(p []byte) (n int, err error) { + r.mu.Lock() + defer r.mu.Unlock() + + line := string(p) + if len(r.lines) >= r.size { + // pop front + r.lines = r.lines[1:] + } + r.lines = append(r.lines, line) + return len(p), nil +} + +// GetLines returns a copy of the current log lines. +func (r *LogRingBuffer) GetLines() []string { + r.mu.Lock() + defer r.mu.Unlock() + + res := make([]string, len(r.lines)) + copy(res, r.lines) + return res +} diff --git a/main.go b/main.go index 12a6876..52f4a52 100644 --- a/main.go +++ b/main.go @@ -3,12 +3,15 @@ package main import ( "encoding/json" "fmt" + "io" + "log" "os" "xworkmate-bridge/internal/acp" "xworkmate-bridge/internal/geminiadapter" "xworkmate-bridge/internal/hermesadapter" "xworkmate-bridge/internal/opencodeadapter" + "xworkmate-bridge/internal/shared" ) var ( @@ -18,6 +21,9 @@ var ( ) func main() { + // Intercept standard logs + log.SetOutput(io.MultiWriter(os.Stdout, shared.GlobalLogBuffer)) + acp.SetRuntimeVersionInfo(acp.RuntimeVersionInfo{ Commit: buildCommit, Version: buildVersion,