fix(openclaw): recover final task snapshots

This commit is contained in:
Haitao Pan 2026-06-02 04:24:00 +08:00
parent d51a4b0678
commit 64862bdeb9
5 changed files with 314 additions and 26 deletions

View File

@ -74,17 +74,23 @@ Future<void> refreshAcpCapabilitiesRuntimeInternal(
.trim();
}
if (persistMountTargets && !controller.disposedInternal) {
final currentConfig = controller.settings.multiAgent;
final nextConfig = await controller.multiAgentMountManagerInternal
.reconcile(
config: currentConfig,
aiGatewayUrl: controller.aiGatewayUrl,
try {
final currentConfig = controller.settings.multiAgent;
final nextConfig = await controller.multiAgentMountManagerInternal
.reconcile(
config: currentConfig,
aiGatewayUrl: controller.aiGatewayUrl,
);
if (jsonEncode(nextConfig.toJson()) !=
jsonEncode(currentConfig.toJson())) {
await controller.settingsControllerInternal.saveSnapshot(
controller.settings.copyWith(multiAgent: nextConfig),
);
if (jsonEncode(nextConfig.toJson()) != jsonEncode(currentConfig.toJson())) {
await controller.settingsControllerInternal.saveSnapshot(
controller.settings.copyWith(multiAgent: nextConfig),
);
controller.multiAgentOrchestratorInternal.updateConfig(nextConfig);
controller.multiAgentOrchestratorInternal.updateConfig(nextConfig);
}
} catch (_) {
// Mount reconciliation is an optional bridge capability. A missing or
// older remote method must not block assistant startup or task execution.
}
}
if (!controller.disposedInternal) {

View File

@ -145,19 +145,6 @@ class ExternalCodeAgentAcpDesktopTransport
completedMessage: completedMessage,
);
} on GatewayAcpException catch (error) {
if (_isRecoverableTaskStreamClosure(error) &&
completedResultSnapshot != null) {
return goTaskServiceResultFromAcpResponse(
<String, dynamic>{
'jsonrpc': '2.0',
'id': 'recovered-from-completed-session-update',
'result': completedResultSnapshot,
},
route: request.route,
streamedText: streamedText,
completedMessage: completedMessage,
);
}
if (_isRecoverableTaskStreamClosure(error)) {
final recovered = await _recoverTaskResultAfterStreamClosure(
request,
@ -166,10 +153,23 @@ class ExternalCodeAgentAcpDesktopTransport
: _taskEndpointResolver.call(request),
streamedText: streamedText,
completedMessage: completedMessage,
fallbackAvailable: completedResultSnapshot != null,
);
if (recovered != null) {
return recovered;
}
if (completedResultSnapshot != null) {
return goTaskServiceResultFromAcpResponse(
<String, dynamic>{
'jsonrpc': '2.0',
'id': 'recovered-from-completed-session-update',
'result': completedResultSnapshot,
},
route: request.route,
streamedText: streamedText,
completedMessage: completedMessage,
);
}
}
rethrow;
} on SocketException catch (error) {
@ -201,6 +201,7 @@ class ExternalCodeAgentAcpDesktopTransport
required Uri? taskEndpoint,
required String streamedText,
required String? completedMessage,
bool fallbackAvailable = false,
}) async {
final endpoint = _sessionSnapshotEndpoint(taskEndpoint);
if (endpoint == null) {
@ -222,6 +223,9 @@ class ExternalCodeAgentAcpDesktopTransport
endpointOverride: endpoint,
);
} on GatewayAcpException {
if (fallbackAvailable) {
return null;
}
continue;
} on SocketException {
continue;

View File

@ -770,10 +770,10 @@ GoTaskServiceResult goTaskServiceResultFromAcpResponse(
}();
final responseText = _extractGoTaskDisplayText(result);
final primaryText =
(completedMessage?.trim().isNotEmpty == true
? completedMessage!.trim()
: responseText.isNotEmpty
(responseText.isNotEmpty
? responseText
: completedMessage?.trim().isNotEmpty == true
? completedMessage!.trim()
: fallbackFailureText.isNotEmpty
? fallbackFailureText
: streamedText.trim().isNotEmpty

View File

@ -0,0 +1,163 @@
import 'dart:convert';
import 'dart:io';
import 'package:flutter_test/flutter_test.dart';
import 'package:xworkmate/app/app_controller.dart';
import 'package:xworkmate/runtime/multi_agent_mount_resolver.dart';
import 'package:xworkmate/runtime/multi_agent_mounts.dart';
import 'package:xworkmate/runtime/runtime_models.dart';
import 'package:xworkmate/runtime/secure_config_store.dart';
void main() {
test(
'capability refresh does not fail when remote mount reconcile is absent',
() async {
final server = await _CapabilityServer.start();
addTearDown(server.close);
final storeRoot = await Directory.systemTemp.createTemp(
'xworkmate-mount-resilience-',
);
addTearDown(() async {
if (await storeRoot.exists()) {
await storeRoot.delete(recursive: true);
}
});
final store = SecureConfigStore(
secretRootPathResolver: () async => '${storeRoot.path}/secrets',
appDataRootPathResolver: () async => '${storeRoot.path}/app-data',
supportRootPathResolver: () async => '${storeRoot.path}/support',
);
await store.initialize();
final settings = SettingsSnapshot.defaults();
final bridgeConfig = settings.acpBridgeServerModeConfig;
final selfHosted = bridgeConfig.selfHosted.copyWith(
serverUrl: server.endpoint,
username: 'admin',
);
await store.saveSecretValueByRef(selfHosted.passwordRef, 'bridge-token');
await store.saveSettingsSnapshot(
settings.copyWith(
acpBridgeServerModeConfig: bridgeConfig.copyWith(
selfHosted: selfHosted,
effective: AcpBridgeServerEffectiveConfig(
endpoint: server.endpoint,
tokenRef: selfHosted.passwordRef,
source: 'bridge',
reason: 'test bridge',
),
),
),
);
final controller = AppController(
store: store,
environmentOverride: <String, String>{'HOME': storeRoot.path},
multiAgentMountManager: MultiAgentMountManager(
resolver: _MissingRemoteMountResolver(),
),
);
addTearDown(controller.dispose);
await _waitForInitialization(controller);
expect(controller.bootstrapError, isNull);
expect(controller.bridgeCapabilitiesRefreshErrorInternal, isEmpty);
expect(
controller.gatewayProviderCatalog.map((item) => item.providerId),
contains('openclaw'),
);
},
);
}
Future<void> _waitForInitialization(AppController controller) async {
final deadline = DateTime.now().add(const Duration(seconds: 10));
while (controller.initializing && DateTime.now().isBefore(deadline)) {
await Future<void>.delayed(const Duration(milliseconds: 100));
}
if (controller.initializing) {
fail('controller did not initialize');
}
}
class _MissingRemoteMountResolver implements MultiAgentMountResolver {
@override
Future<MultiAgentConfig?> reconcile({
required MultiAgentConfig config,
required String aiGatewayUrl,
required String codexHome,
required String opencodeHome,
required ArisMountProbe arisProbe,
}) {
throw StateError('unknown method: xworkmate.mounts.reconcile');
}
@override
Future<void> dispose() async {}
}
class _CapabilityServer {
_CapabilityServer(this._server);
final HttpServer _server;
String get endpoint => 'http://${_server.address.host}:${_server.port}';
static Future<_CapabilityServer> start() async {
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
final wrapper = _CapabilityServer(server);
server.listen(wrapper._handle);
return wrapper;
}
Future<void> close() => _server.close(force: true);
Future<void> _handle(HttpRequest request) async {
final raw = await utf8.decoder.bind(request).join();
final decoded = jsonDecode(raw) as Map<String, dynamic>;
final method = decoded['method']?.toString() ?? '';
request.response.headers.contentType = ContentType.json;
if (method != 'acp.capabilities') {
request.response.write(
jsonEncode(<String, Object?>{
'jsonrpc': '2.0',
'id': decoded['id'],
'error': <String, Object?>{
'code': -32601,
'message': 'unknown method: $method',
},
}),
);
await request.response.close();
return;
}
request.response.write(
jsonEncode(<String, Object?>{
'jsonrpc': '2.0',
'id': decoded['id'],
'result': <String, Object?>{
'singleAgent': true,
'multiAgent': true,
'availableExecutionTargets': <String>['agent', 'gateway'],
'providerCatalog': <Map<String, Object?>>[
<String, Object?>{
'providerId': 'codex',
'label': 'Codex',
'targets': <String>['agent'],
},
],
'gatewayProviders': <Map<String, Object?>>[
<String, Object?>{
'providerId': 'openclaw',
'label': 'OpenClaw',
'targets': <String>['gateway'],
},
],
},
}),
);
await request.response.close();
}
}

View File

@ -780,6 +780,121 @@ void main() {
},
);
test(
'prefers final OpenClaw snapshot over early completed SSE update after connection close',
() async {
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
addTearDown(() => server.close(force: true));
final requestMethods = <String>[];
server.listen((request) async {
final body = await utf8.decoder.bind(request).join();
final decoded = jsonDecode(body) as Map<String, dynamic>;
final method = decoded['method']?.toString() ?? '';
final id = decoded['id']?.toString() ?? 'request-id';
requestMethods.add(method);
if (method == 'session.start') {
final event = jsonEncode(<String, dynamic>{
'jsonrpc': '2.0',
'method': 'session.update',
'params': <String, dynamic>{
'sessionId': 'unit-fixture-task-final',
'threadId': 'unit-fixture-task-final',
'turnId': 'turn-final',
'type': 'status',
'event': 'completed',
'message': 'early completed output without artifacts',
'success': true,
},
});
final eventBytes = utf8.encode('data: $event\n\n');
request.response.headers.set(
HttpHeaders.contentTypeHeader,
'text/event-stream',
);
request.response.contentLength = eventBytes.length + 128;
final socket = await request.response.detachSocket();
socket.add(eventBytes);
await socket.flush();
socket.destroy();
return;
}
if (method == 'xworkmate.sessions.get') {
request.response.headers.contentType = ContentType.json;
request.response.write(
jsonEncode(<String, dynamic>{
'jsonrpc': '2.0',
'id': id,
'result': <String, dynamic>{
'status': 'completed',
'sessionId': 'unit-fixture-task-final',
'threadId': 'unit-fixture-task-final',
'task': <String, dynamic>{
'state': 'completed',
'turnId': 'turn-final',
},
'result': <String, dynamic>{
'success': true,
'output': 'final snapshot output with artifacts',
'turnId': 'turn-final',
},
'artifacts': <String, dynamic>{
'items': <Map<String, dynamic>>[
<String, dynamic>{
'relativePath': 'reports/final.md',
'downloadUrl':
'https://xworkmate-bridge.svc.plus/artifacts/openclaw/download'
'?sessionKey=unit-fixture-task-final&runId=turn-final&relativePath=reports%2Ffinal.md',
'contentType': 'text/markdown',
'sizeBytes': 64,
},
],
},
},
}),
);
await request.response.close();
return;
}
request.response.statusCode = HttpStatus.badRequest;
await request.response.close();
});
final endpoint = Uri.parse('http://127.0.0.1:${server.port}');
final transport = ExternalCodeAgentAcpDesktopTransport(
client: GatewayAcpClient(endpointResolver: () => endpoint),
endpointResolver: (_) => endpoint,
taskEndpointResolver: (_) => endpoint,
);
addTearDown(transport.dispose);
final result = await transport.executeTask(
const GoTaskServiceRequest(
sessionId: 'unit-fixture-task-final',
threadId: 'unit-fixture-task-final',
target: AssistantExecutionTarget.gateway,
provider: SingleAgentProvider.openclaw,
prompt: 'create files',
workingDirectory: '/tmp/workspace',
model: '',
thinking: 'off',
selectedSkills: <String>[],
inlineAttachments: <GatewayChatAttachmentPayload>[],
localAttachments: <CollaborationAttachment>[],
agentId: '',
metadata: <String, dynamic>{},
),
onUpdate: (_) {},
);
expect(result.success, isTrue);
expect(result.message, 'final snapshot output with artifacts');
expect(result.artifacts.single.relativePath, 'reports/final.md');
expect(requestMethods, <String>[
'session.start',
'xworkmate.sessions.get',
]);
},
);
test(
'recovers OpenClaw follow-up from bridge session snapshot after SSE ends without final envelope',
() async {