Fix ACP SSE no-result recovery

This commit is contained in:
Haitao Pan 2026-05-25 12:52:31 +08:00
parent c49badb55e
commit 25e8b17dd8
2 changed files with 103 additions and 4 deletions

View File

@ -145,7 +145,7 @@ class ExternalCodeAgentAcpDesktopTransport
completedMessage: completedMessage,
);
} on GatewayAcpException catch (error) {
if (error.code == 'ACP_HTTP_CONNECTION_CLOSED' &&
if (_isRecoverableTaskStreamClosure(error) &&
completedResultSnapshot != null) {
return goTaskServiceResultFromAcpResponse(
<String, dynamic>{
@ -158,8 +158,8 @@ class ExternalCodeAgentAcpDesktopTransport
completedMessage: completedMessage,
);
}
if (error.code == 'ACP_HTTP_CONNECTION_CLOSED') {
final recovered = await _recoverTaskResultAfterConnectionClosed(
if (_isRecoverableTaskStreamClosure(error)) {
final recovered = await _recoverTaskResultAfterStreamClosure(
request,
taskEndpoint: _taskEndpointResolver == null
? _endpointResolver(request.target)
@ -191,7 +191,12 @@ class ExternalCodeAgentAcpDesktopTransport
}
}
Future<GoTaskServiceResult?> _recoverTaskResultAfterConnectionClosed(
bool _isRecoverableTaskStreamClosure(GatewayAcpException error) {
return error.code == 'ACP_HTTP_CONNECTION_CLOSED' ||
error.code == 'ACP_SSE_NO_RESULT';
}
Future<GoTaskServiceResult?> _recoverTaskResultAfterStreamClosure(
GoTaskServiceRequest request, {
required Uri? taskEndpoint,
required String streamedText,

View File

@ -784,6 +784,100 @@ void main() {
},
);
test(
'recovers OpenClaw follow-up from bridge session snapshot after SSE ends without final envelope',
() async {
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
addTearDown(() => server.close(force: true));
final requestPaths = <String>[];
server.listen((request) async {
final body = await utf8.decoder.bind(request).join();
requestPaths.add(request.uri.path);
final decoded = jsonDecode(body) as Map<String, dynamic>;
final method = decoded['method']?.toString() ?? '';
final id = decoded['id']?.toString() ?? 'request-id';
if (method == 'session.message') {
final event = jsonEncode(<String, dynamic>{
'jsonrpc': '2.0',
'method': 'xworkmate.bridge.accepted',
'params': <String, dynamic>{'sessionId': 'unit-fixture-task-sse'},
});
request.response.headers.set(
HttpHeaders.contentTypeHeader,
'text/event-stream',
);
request.response.write('data: $event\n\n');
await request.response.close();
return;
}
if (method == 'xworkmate.sessions.get') {
request.response.headers.contentType = ContentType.json;
request.response.write(
jsonEncode(<String, dynamic>{
'jsonrpc': '2.0',
'id': id,
'result': <String, dynamic>{
'status': 'completed',
'sessionId': 'unit-fixture-task-sse',
'threadId': 'unit-fixture-task-sse',
'task': <String, dynamic>{
'state': 'completed',
'turnId': 'turn-recovered-sse',
},
'result': <String, dynamic>{
'success': true,
'output': 'recovered after SSE no result',
'turnId': 'turn-recovered-sse',
},
},
}),
);
await request.response.close();
return;
}
request.response.statusCode = HttpStatus.badRequest;
await request.response.close();
});
final endpoint = Uri.parse('http://127.0.0.1:${server.port}');
final transport = ExternalCodeAgentAcpDesktopTransport(
client: GatewayAcpClient(endpointResolver: () => endpoint),
endpointResolver: (_) => endpoint,
taskEndpointResolver: (_) =>
endpoint.replace(path: '/gateway/openclaw'),
recoveryPollDelay: Duration.zero,
recoveryMaxAttempts: 1,
);
addTearDown(transport.dispose);
final result = await transport.executeTask(
const GoTaskServiceRequest(
sessionId: 'unit-fixture-task-sse',
threadId: 'unit-fixture-task-sse',
target: AssistantExecutionTarget.gateway,
provider: SingleAgentProvider.openclaw,
prompt: 'recover from graceful SSE close',
workingDirectory: '/tmp/workspace',
model: '',
thinking: 'off',
resumeSession: true,
selectedSkills: <String>[],
inlineAttachments: <GatewayChatAttachmentPayload>[],
localAttachments: <CollaborationAttachment>[],
agentId: '',
metadata: <String, dynamic>{},
),
onUpdate: (_) {},
);
expect(result.success, isTrue);
expect(result.message, 'recovered after SSE no result');
expect(
requestPaths,
containsAll(<String>['/gateway/openclaw', '/acp/rpc']),
);
},
);
test(
'keeps polling running OpenClaw snapshot after SSE connection close',
() async {