fix: expose session result recovery snapshot

This commit is contained in:
Haitao Pan 2026-05-18 18:33:09 +08:00
parent 0552677080
commit c134cb9ab4
5 changed files with 157 additions and 11 deletions

View File

@ -573,6 +573,28 @@ func asMap(value any) map[string]any {
return nil
}
func cloneMap(source map[string]any) map[string]any {
if source == nil {
return nil
}
result := make(map[string]any, len(source))
for key, value := range source {
result[key] = value
}
return result
}
func cloneMapSlice(source []map[string]any) []map[string]any {
if source == nil {
return nil
}
result := make([]map[string]any, 0, len(source))
for _, item := range source {
result = append(result, cloneMap(item))
}
return result
}
func parseSkillsCandidates(raw []any) []skills.Candidate {
result := make([]skills.Candidate, 0, len(raw))
for _, item := range raw {

View File

@ -1123,6 +1123,10 @@ func (o *SessionOrchestrator) normalizeResult(sess *session, result map[string]a
})
}
sess.mu.Lock()
sess.lastResult = cloneMap(result)
sess.mu.Unlock()
return result
}

View File

@ -59,6 +59,9 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
case "xworkmate.jobs.submit", "xworkmate.jobs.get", "xworkmate.jobs.list", "xworkmate.jobs.stats":
return s.handleJobMethod(ctx, method, request.Params, notify)
case "xworkmate.sessions.get":
return s.handleSessionGet(request.Params), nil
case "xworkmate.tools.invoke":
return s.invokeOpenClawTool(ctx, request.Params)
@ -70,6 +73,65 @@ func (s *Server) handleRequest(request shared.RPCRequest, notify func(map[string
}
}
func (s *Server) handleSessionGet(params map[string]any) map[string]any {
sessionID := strings.TrimSpace(shared.StringArg(params, "sessionId", ""))
threadID := strings.TrimSpace(shared.StringArg(params, "threadId", ""))
if sessionID == "" && threadID == "" {
return map[string]any{"status": "not_found"}
}
s.mu.RLock()
sess := s.sessions[sessionID]
if sess == nil && threadID != "" {
for _, candidate := range s.sessions {
if candidate != nil && candidate.threadID == threadID {
sess = candidate
break
}
}
}
s.mu.RUnlock()
if sess == nil {
return map[string]any{
"status": "not_found",
"sessionId": sessionID,
"threadId": threadID,
}
}
sess.mu.Lock()
defer sess.mu.Unlock()
payload := map[string]any{
"status": string(sess.task.State),
"sessionId": sess.sessionID,
"threadId": sess.threadID,
"task": map[string]any{
"sessionId": sess.task.SessionID,
"threadId": sess.task.ThreadID,
"turnId": sess.task.TurnID,
"provider": sess.task.Provider,
"target": sess.task.Target,
"state": string(sess.task.State),
"kind": string(sess.task.Kind),
"updatedAt": sess.task.UpdatedAt.UTC().Format(time.RFC3339Nano),
},
}
if len(sess.lastResult) > 0 {
payload["result"] = cloneMap(sess.lastResult)
}
if len(sess.artifacts.Artifacts) > 0 ||
sess.artifacts.RemoteWorkingDirectory != "" ||
sess.artifacts.RemoteWorkspaceRefKind != "" ||
sess.artifacts.ResultSummary != "" {
payload["artifacts"] = map[string]any{
"items": cloneMapSlice(sess.artifacts.Artifacts),
"remoteWorkingDirectory": sess.artifacts.RemoteWorkingDirectory,
"remoteWorkspaceRefKind": sess.artifacts.RemoteWorkspaceRefKind,
"resultSummary": sess.artifacts.ResultSummary,
"updatedAt": sess.artifacts.UpdatedAt.UTC().Format(time.RFC3339Nano),
}
}
return payload
}
func (s *Server) cancelSession(ctx context.Context, sessionID string) {
s.mu.RLock()
sess, ok := s.sessions[sessionID]

View File

@ -57,17 +57,18 @@ type ArtifactRecord struct {
}
type session struct {
sessionID string
threadID string
mode string
provider string // The Provider ID
target string // The Execution Target ID
compat ProviderCompat
mu sync.Mutex
history []string
control ControlPlaneSession
task QueuedTask
artifacts ArtifactRecord
sessionID string
threadID string
mode string
provider string // The Provider ID
target string // The Execution Target ID
compat ProviderCompat
mu sync.Mutex
history []string
control ControlPlaneSession
task QueuedTask
artifacts ArtifactRecord
lastResult map[string]any
}
type Server struct {

View File

@ -707,6 +707,63 @@ func TestHTTPHandlerGatewayOpenClawForcesGatewayRouting(t *testing.T) {
}
}
func TestHTTPHandlerSessionGetReturnsCompletedOpenClawResult(t *testing.T) {
gateway := newAcpFakeOpenClawGateway(t)
defer gateway.Close()
t.Setenv("GATEWAY_RPC_URL", gateway.URL())
t.Setenv("BRIDGE_AUTH_TOKEN", "bridge-test-token")
t.Setenv("BRIDGE_CONFIG_PATH", filepath.Join(t.TempDir(), "missing-config.yaml"))
server := NewServer()
handler := server.Handler()
startRecorder := httptest.NewRecorder()
startRequest := httptest.NewRequest(
http.MethodPost,
"http://127.0.0.1/gateway/openclaw",
strings.NewReader(`{"jsonrpc":"2.0","id":"task-1","method":"session.start","params":{"sessionId":"s1","threadId":"t1","taskPrompt":"Reply pong","workingDirectory":"`+t.TempDir()+`"}}`),
)
startRequest.Header.Set("Content-Type", "application/json")
startRequest.Header.Set("Authorization", "Bearer bridge-test-token")
handler.ServeHTTP(startRecorder, startRequest)
if startRecorder.Code != http.StatusOK {
t.Fatalf("expected start 200, got %d: %s", startRecorder.Code, startRecorder.Body.String())
}
getRecorder := httptest.NewRecorder()
getRequest := httptest.NewRequest(
http.MethodPost,
"http://127.0.0.1/acp/rpc",
strings.NewReader(`{"jsonrpc":"2.0","id":"get-1","method":"xworkmate.sessions.get","params":{"sessionId":"s1","threadId":"t1"}}`),
)
getRequest.Header.Set("Content-Type", "application/json")
getRequest.Header.Set("Authorization", "Bearer bridge-test-token")
handler.ServeHTTP(getRecorder, getRequest)
if getRecorder.Code != http.StatusOK {
t.Fatalf("expected get 200, got %d: %s", getRecorder.Code, getRecorder.Body.String())
}
var decoded map[string]any
if err := json.Unmarshal(getRecorder.Body.Bytes(), &decoded); err != nil {
t.Fatalf("decode session get response: %v", err)
}
result := shared.AsMap(decoded["result"])
if got := result["status"]; got != "completed" {
t.Fatalf("expected completed status, got %#v from %#v", got, result)
}
task := shared.AsMap(result["task"])
if got := task["turnId"]; got == "" {
t.Fatalf("expected retained task turn id, got %#v", task)
}
snapshot := shared.AsMap(result["result"])
if got := snapshot["resolvedGatewayProviderId"]; got != "openclaw" {
t.Fatalf("expected OpenClaw snapshot, got %#v", snapshot)
}
if got := snapshot["output"]; got == "" {
t.Fatalf("expected output in session snapshot, got %#v", snapshot)
}
}
func TestSafeSSEStreamDropsLateNotificationsAfterClose(t *testing.T) {
writer := &panicSSEWriter{header: http.Header{}}
stream := newSafeSSEStream(context.Background(), writer, safeSSEStreamMeta{})