fix: emit OpenClaw SSE accepted event

This commit is contained in:
Haitao Pan 2026-05-08 18:58:51 +08:00
parent c08d5d6472
commit ed04d91dad
3 changed files with 125 additions and 11 deletions

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
@ -196,7 +197,13 @@ func (s *Server) handleRPCWithTransform(
w.Header().Set("Connection", "keep-alive")
}
streamWriter := newSafeSSEStream(r.Context(), w)
streamWriter := newSafeSSEStream(r.Context(), w, safeSSEStreamMeta{
Path: r.URL.Path,
Method: request.Method,
SessionID: shared.StringArg(request.Params, "sessionId", ""),
ThreadID: shared.StringArg(request.Params, "threadId", ""),
RequestID: fmt.Sprint(request.ID),
})
stopKeepalive := func() {}
writeNotification := func(message map[string]any) {
if !stream {
@ -205,6 +212,19 @@ func (s *Server) handleRPCWithTransform(
streamWriter.write(message)
}
if stream {
if r.URL.Path == "/gateway/openclaw" {
streamWriter.write(map[string]any{
"jsonrpc": "2.0",
"method": "xworkmate.bridge.accepted",
"params": map[string]any{
"sessionId": shared.StringArg(request.Params, "sessionId", ""),
"threadId": shared.StringArg(request.Params, "threadId", ""),
"method": request.Method,
"path": r.URL.Path,
"acceptedAt": time.Now().UTC().Format(time.RFC3339Nano),
},
})
}
stopKeepalive = streamWriter.startKeepalive(httpSSEKeepaliveInterval)
}
defer stopKeepalive()
@ -246,23 +266,32 @@ type safeSSEStream struct {
ctx context.Context
w http.ResponseWriter
flusher http.Flusher
meta safeSSEStreamMeta
closed atomic.Bool
mu sync.Mutex
}
func newSafeSSEStream(ctx context.Context, w http.ResponseWriter) *safeSSEStream {
type safeSSEStreamMeta struct {
Path string
Method string
RequestID string
SessionID string
ThreadID string
}
func newSafeSSEStream(ctx context.Context, w http.ResponseWriter, meta safeSSEStreamMeta) *safeSSEStream {
flusher, _ := w.(http.Flusher)
return &safeSSEStream{ctx: ctx, w: w, flusher: flusher}
return &safeSSEStream{ctx: ctx, w: w, flusher: flusher, meta: meta}
}
func (s *safeSSEStream) write(payload map[string]any) bool {
return s.writeRaw(func() error {
return s.writeRaw(sseEventType(payload), func() error {
return shared.WriteSSE(s.w, payload)
})
}
func (s *safeSSEStream) done() bool {
return s.writeRaw(func() error {
return s.writeRaw("done", func() error {
_, err := s.w.Write([]byte("data: [DONE]\n\n"))
return err
})
@ -305,13 +334,14 @@ func (s *safeSSEStream) close() {
s.closed.Store(true)
}
func (s *safeSSEStream) writeRaw(write func() error) (ok bool) {
func (s *safeSSEStream) writeRaw(eventType string, write func() error) (ok bool) {
if s == nil || s.closed.Load() {
return false
}
select {
case <-s.ctx.Done():
s.closed.Store(true)
s.logWriteFailure(eventType, "context_done", s.ctx.Err())
return false
default:
}
@ -321,13 +351,15 @@ func (s *safeSSEStream) writeRaw(write func() error) (ok bool) {
return false
}
defer func() {
if recover() != nil {
if recovered := recover(); recovered != nil {
s.closed.Store(true)
s.logWriteFailure(eventType, "panic", fmt.Errorf("%v", recovered))
ok = false
}
}()
if err := write(); err != nil {
s.closed.Store(true)
s.logWriteFailure(eventType, "write_failed", err)
return false
}
if s.flusher != nil {
@ -336,6 +368,43 @@ func (s *safeSSEStream) writeRaw(write func() error) (ok bool) {
return true
}
func (s *safeSSEStream) logWriteFailure(eventType string, reason string, err error) {
if s == nil {
return
}
errText := ""
if err != nil {
errText = err.Error()
}
log.Printf(
"level=warn component=acp_sse event=stream_write path=%q rpcMethod=%q requestId=%q sessionId=%q threadId=%q sseEvent=%q reason=%q error=%q",
s.meta.Path,
s.meta.Method,
s.meta.RequestID,
s.meta.SessionID,
s.meta.ThreadID,
eventType,
reason,
errText,
)
}
func sseEventType(payload map[string]any) string {
if payload == nil {
return "unknown"
}
if method, _ := payload["method"].(string); strings.TrimSpace(method) != "" {
return strings.TrimSpace(method)
}
if payload["result"] != nil {
return "result"
}
if payload["error"] != nil {
return "error"
}
return "unknown"
}
func forceOpenClawGatewayRequest(request shared.RPCRequest) (shared.RPCRequest, *shared.RPCError) {
method := strings.TrimSpace(request.Method)
switch method {

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"log"
"net/url"
"os"
"path"
@ -208,6 +209,7 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
return nil, rpcErr
}
artifactSinceUnixMs := time.Now().Add(-1 * time.Second).UnixMilli()
sendStarted := time.Now()
sendResult := o.openClawGatewayRequestWithRetry(
gatewayProvider,
"chat.send",
@ -215,11 +217,20 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
2*time.Minute,
notifyWithCollection,
)
logOpenClawGatewayTiming(
gatewayProvider,
"chat.send",
sessionKey,
turnID,
time.Since(sendStarted),
sendResult.OK,
)
if !sendResult.OK {
return nil, gatewayRPCError(sendResult.Error, "openclaw chat.send failed")
}
sendPayload := shared.AsMap(sendResult.Payload)
runID := strings.TrimSpace(shared.StringArg(sendPayload, "runId", turnID))
waitStarted := time.Now()
waitResult := o.openClawGatewayRequestWithRetry(
gatewayProvider,
"agent.wait",
@ -230,6 +241,14 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
openClawAgentWaitTimeout,
notifyWithCollection,
)
logOpenClawGatewayTiming(
gatewayProvider,
"agent.wait",
sessionKey,
runID,
time.Since(waitStarted),
waitResult.OK,
)
if !waitResult.OK {
return nil, gatewayRPCError(waitResult.Error, "openclaw agent.wait failed")
}
@ -278,6 +297,25 @@ func (o *SessionOrchestrator) runOpenClawGatewayChat(
return result, nil
}
func logOpenClawGatewayTiming(
gatewayProvider string,
method string,
sessionKey string,
runID string,
duration time.Duration,
ok bool,
) {
log.Printf(
"level=info component=openclaw_gateway event=request_timing provider=%q method=%q sessionId=%q runId=%q durationMs=%d ok=%t",
gatewayProvider,
method,
sessionKey,
runID,
duration.Milliseconds(),
ok,
)
}
func (o *SessionOrchestrator) openClawArtifactExportForDelivery(
gatewayProvider string,
chatParams map[string]any,

View File

@ -234,12 +234,13 @@ func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *tes
}
events := strings.Split(strings.TrimSpace(string(body)), "\n\n")
if len(events) < 3 {
t.Fatalf("expected keepalive, final envelope, and done events, got %q", string(body))
if len(events) < 4 {
t.Fatalf("expected accepted, keepalive, final envelope, and done events, got %q", string(body))
}
if events[len(events)-1] != "data: [DONE]" {
t.Fatalf("expected done event, got %q", events[len(events)-1])
}
var sawAcceptedBeforeKeepalive bool
var sawKeepaliveBeforeFinal bool
var sawFinal bool
for _, event := range events[:len(events)-1] {
@ -250,6 +251,9 @@ func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *tes
if err := json.Unmarshal([]byte(strings.TrimPrefix(event, "data: ")), &envelope); err != nil {
t.Fatalf("decode event %q: %v", event, err)
}
if envelope["method"] == "xworkmate.bridge.accepted" && !sawKeepaliveBeforeFinal && !sawFinal {
sawAcceptedBeforeKeepalive = true
}
if envelope["method"] == "xworkmate.bridge.keepalive" && !sawFinal {
sawKeepaliveBeforeFinal = true
}
@ -260,6 +264,9 @@ func TestHTTPHandlerGatewayOpenClawSSEKeepaliveBeforeFinalEnvelopeAndDone(t *tes
}
}
}
if !sawAcceptedBeforeKeepalive {
t.Fatalf("expected accepted event before keepalive/final envelope, got %q", string(body))
}
if !sawKeepaliveBeforeFinal {
t.Fatalf("expected keepalive event before final envelope, got %q", string(body))
}
@ -409,7 +416,7 @@ func TestHTTPHandlerGatewayOpenClawForcesGatewayRouting(t *testing.T) {
func TestSafeSSEStreamDropsLateNotificationsAfterClose(t *testing.T) {
writer := &panicSSEWriter{header: http.Header{}}
stream := newSafeSSEStream(context.Background(), writer)
stream := newSafeSSEStream(context.Background(), writer, safeSSEStreamMeta{})
stream.close()
if stream.write(map[string]any{"method": "xworkmate.gateway.push"}) {
@ -419,7 +426,7 @@ func TestSafeSSEStreamDropsLateNotificationsAfterClose(t *testing.T) {
t.Fatalf("expected no write after close, got %d", writer.writes)
}
openStream := newSafeSSEStream(context.Background(), writer)
openStream := newSafeSSEStream(context.Background(), writer, safeSSEStreamMeta{})
writer.panicOnWrite = true
if openStream.write(map[string]any{"method": "xworkmate.gateway.push"}) {
t.Fatal("expected panic writer to be marked closed")