fix: synthesize external ACP streamed output

This commit is contained in:
Haitao Pan 2026-04-10 10:46:46 +08:00
parent 51d8ed6946
commit 50d59b0d00
2 changed files with 126 additions and 2 deletions

View File

@ -153,14 +153,29 @@ func (s *Server) runSingleAgentViaExternalProvider(
return nil, fmt.Errorf("external provider endpoint is missing")
}
forwardParams := sanitizeExternalACPParams(method, params)
return requestExternalACP(
collector := &externalACPNotificationCollector{}
combinedNotify := func(message map[string]any) {
collector.observe(message)
if notify != nil {
notify(message)
}
}
response, err := requestExternalACP(
ctx,
endpoint,
provider.AuthorizationHeader,
method,
forwardParams,
notify,
combinedNotify,
)
if err != nil {
return nil, err
}
result := asMap(response["result"])
if len(result) == 0 {
result = response
}
return collector.apply(result), nil
}
func resolveSingleAgentForwardEndpoint(provider syncedProvider) string {
@ -306,6 +321,80 @@ func normalizeAuthorizationHeader(raw string) string {
return "Bearer " + normalized
}
type externalACPNotificationCollector struct {
deltas strings.Builder
lastMessage string
turnID string
workingDirectory string
}
func (c *externalACPNotificationCollector) observe(notification map[string]any) {
method := strings.TrimSpace(shared.StringArg(notification, "method", ""))
if method != "session.update" && method != "acp.session.update" {
return
}
params := asMap(notification["params"])
if len(params) == 0 {
return
}
if turnID := strings.TrimSpace(shared.StringArg(params, "turnId", "")); turnID != "" {
c.turnID = turnID
}
for _, key := range []string{"resolvedWorkingDirectory", "effectiveWorkingDirectory", "workingDirectory"} {
if workingDirectory := strings.TrimSpace(shared.StringArg(params, key, "")); workingDirectory != "" {
c.workingDirectory = workingDirectory
break
}
}
if delta := strings.TrimSpace(shared.StringArg(params, "delta", "")); delta != "" {
if c.deltas.Len() > 0 {
c.deltas.WriteString("\n")
}
c.deltas.WriteString(delta)
}
message := strings.TrimSpace(shared.StringArg(params, "message", ""))
if message == "" {
message = strings.TrimSpace(shared.StringArg(asMap(params["message"]), "content", ""))
}
if message != "" && message != "session started" && message != "single-agent completed" {
c.lastMessage = message
}
}
func (c *externalACPNotificationCollector) apply(result map[string]any) map[string]any {
if result == nil {
result = map[string]any{}
}
text := strings.TrimSpace(shared.StringArg(result, "output", ""))
if text == "" {
text = strings.TrimSpace(shared.StringArg(result, "summary", ""))
}
if text == "" {
text = strings.TrimSpace(shared.StringArg(result, "message", ""))
}
if text == "" {
text = strings.TrimSpace(c.deltas.String())
}
if text == "" {
text = strings.TrimSpace(c.lastMessage)
}
if text != "" {
if _, exists := result["output"]; !exists {
result["output"] = text
}
if _, exists := result["summary"]; !exists {
result["summary"] = text
}
}
if _, exists := result["turnId"]; !exists && strings.TrimSpace(c.turnID) != "" {
result["turnId"] = strings.TrimSpace(c.turnID)
}
if _, exists := result["resolvedWorkingDirectory"]; !exists && strings.TrimSpace(c.workingDirectory) != "" {
result["resolvedWorkingDirectory"] = strings.TrimSpace(c.workingDirectory)
}
return result
}
func requestExternalACPWebSocket(
ctx context.Context,
endpoint *urlSpec,

View File

@ -296,3 +296,38 @@ func TestHandleRPCForwardsInboundBearerToExternalProvider(t *testing.T) {
t.Fatalf("expected forwarded provider response, got %q", recorder.Body.String())
}
}
func TestExternalACPNotificationCollectorSynthesizesOutputAndWorkspace(t *testing.T) {
collector := &externalACPNotificationCollector{}
collector.observe(map[string]any{
"jsonrpc": "2.0",
"method": "session.update",
"params": map[string]any{
"sessionId": "session-streamed",
"threadId": "thread-streamed",
"turnId": "turn-streamed",
"type": "delta",
"delta": "streamed external output",
"resolvedWorkingDirectory": "/tmp/thread-streamed",
"pending": false,
"error": false,
},
})
result := collector.apply(map[string]any{
"success": true,
})
if got := result["output"]; got != "streamed external output" {
t.Fatalf("expected synthesized output from notifications, got %#v", result)
}
if got := result["summary"]; got != "streamed external output" {
t.Fatalf("expected synthesized summary from notifications, got %#v", result)
}
if got := result["turnId"]; got != "turn-streamed" {
t.Fatalf("expected synthesized turnId, got %#v", result)
}
if got := result["resolvedWorkingDirectory"]; got != "/tmp/thread-streamed" {
t.Fatalf("expected synthesized working directory, got %#v", result)
}
}