From 64862bdeb9938e90fe2aa067bb809ad3cd5e10f6 Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Tue, 2 Jun 2026 04:24:00 +0800 Subject: [PATCH] fix(openclaw): recover final task snapshots --- ...ler_desktop_runtime_coordination_impl.dart | 26 +-- ...rnal_code_agent_acp_desktop_transport.dart | 30 ++-- lib/runtime/go_task_service_client.dart | 6 +- ..._controller_acp_mount_resilience_test.dart | 163 ++++++++++++++++++ .../runtime/gateway_acp_client_auth_test.dart | 115 ++++++++++++ 5 files changed, 314 insertions(+), 26 deletions(-) create mode 100644 test/runtime/app_controller_acp_mount_resilience_test.dart diff --git a/lib/app/app_controller_desktop_runtime_coordination_impl.dart b/lib/app/app_controller_desktop_runtime_coordination_impl.dart index 3cf6efae..8b177b5d 100644 --- a/lib/app/app_controller_desktop_runtime_coordination_impl.dart +++ b/lib/app/app_controller_desktop_runtime_coordination_impl.dart @@ -74,17 +74,23 @@ Future refreshAcpCapabilitiesRuntimeInternal( .trim(); } if (persistMountTargets && !controller.disposedInternal) { - final currentConfig = controller.settings.multiAgent; - final nextConfig = await controller.multiAgentMountManagerInternal - .reconcile( - config: currentConfig, - aiGatewayUrl: controller.aiGatewayUrl, + try { + final currentConfig = controller.settings.multiAgent; + final nextConfig = await controller.multiAgentMountManagerInternal + .reconcile( + config: currentConfig, + aiGatewayUrl: controller.aiGatewayUrl, + ); + if (jsonEncode(nextConfig.toJson()) != + jsonEncode(currentConfig.toJson())) { + await controller.settingsControllerInternal.saveSnapshot( + controller.settings.copyWith(multiAgent: nextConfig), ); - if (jsonEncode(nextConfig.toJson()) != jsonEncode(currentConfig.toJson())) { - await controller.settingsControllerInternal.saveSnapshot( - controller.settings.copyWith(multiAgent: nextConfig), - ); - controller.multiAgentOrchestratorInternal.updateConfig(nextConfig); + controller.multiAgentOrchestratorInternal.updateConfig(nextConfig); + } + } catch (_) { + // Mount reconciliation is an optional bridge capability. A missing or + // older remote method must not block assistant startup or task execution. } } if (!controller.disposedInternal) { diff --git a/lib/runtime/external_code_agent_acp_desktop_transport.dart b/lib/runtime/external_code_agent_acp_desktop_transport.dart index 01af6485..c5881a9a 100644 --- a/lib/runtime/external_code_agent_acp_desktop_transport.dart +++ b/lib/runtime/external_code_agent_acp_desktop_transport.dart @@ -145,19 +145,6 @@ class ExternalCodeAgentAcpDesktopTransport completedMessage: completedMessage, ); } on GatewayAcpException catch (error) { - if (_isRecoverableTaskStreamClosure(error) && - completedResultSnapshot != null) { - return goTaskServiceResultFromAcpResponse( - { - 'jsonrpc': '2.0', - 'id': 'recovered-from-completed-session-update', - 'result': completedResultSnapshot, - }, - route: request.route, - streamedText: streamedText, - completedMessage: completedMessage, - ); - } if (_isRecoverableTaskStreamClosure(error)) { final recovered = await _recoverTaskResultAfterStreamClosure( request, @@ -166,10 +153,23 @@ class ExternalCodeAgentAcpDesktopTransport : _taskEndpointResolver.call(request), streamedText: streamedText, completedMessage: completedMessage, + fallbackAvailable: completedResultSnapshot != null, ); if (recovered != null) { return recovered; } + if (completedResultSnapshot != null) { + return goTaskServiceResultFromAcpResponse( + { + 'jsonrpc': '2.0', + 'id': 'recovered-from-completed-session-update', + 'result': completedResultSnapshot, + }, + route: request.route, + streamedText: streamedText, + completedMessage: completedMessage, + ); + } } rethrow; } on SocketException catch (error) { @@ -201,6 +201,7 @@ class ExternalCodeAgentAcpDesktopTransport required Uri? taskEndpoint, required String streamedText, required String? completedMessage, + bool fallbackAvailable = false, }) async { final endpoint = _sessionSnapshotEndpoint(taskEndpoint); if (endpoint == null) { @@ -222,6 +223,9 @@ class ExternalCodeAgentAcpDesktopTransport endpointOverride: endpoint, ); } on GatewayAcpException { + if (fallbackAvailable) { + return null; + } continue; } on SocketException { continue; diff --git a/lib/runtime/go_task_service_client.dart b/lib/runtime/go_task_service_client.dart index ab6791fc..d19cad45 100644 --- a/lib/runtime/go_task_service_client.dart +++ b/lib/runtime/go_task_service_client.dart @@ -770,10 +770,10 @@ GoTaskServiceResult goTaskServiceResultFromAcpResponse( }(); final responseText = _extractGoTaskDisplayText(result); final primaryText = - (completedMessage?.trim().isNotEmpty == true - ? completedMessage!.trim() - : responseText.isNotEmpty + (responseText.isNotEmpty ? responseText + : completedMessage?.trim().isNotEmpty == true + ? completedMessage!.trim() : fallbackFailureText.isNotEmpty ? fallbackFailureText : streamedText.trim().isNotEmpty diff --git a/test/runtime/app_controller_acp_mount_resilience_test.dart b/test/runtime/app_controller_acp_mount_resilience_test.dart new file mode 100644 index 00000000..207936fc --- /dev/null +++ b/test/runtime/app_controller_acp_mount_resilience_test.dart @@ -0,0 +1,163 @@ +import 'dart:convert'; +import 'dart:io'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:xworkmate/app/app_controller.dart'; +import 'package:xworkmate/runtime/multi_agent_mount_resolver.dart'; +import 'package:xworkmate/runtime/multi_agent_mounts.dart'; +import 'package:xworkmate/runtime/runtime_models.dart'; +import 'package:xworkmate/runtime/secure_config_store.dart'; + +void main() { + test( + 'capability refresh does not fail when remote mount reconcile is absent', + () async { + final server = await _CapabilityServer.start(); + addTearDown(server.close); + + final storeRoot = await Directory.systemTemp.createTemp( + 'xworkmate-mount-resilience-', + ); + addTearDown(() async { + if (await storeRoot.exists()) { + await storeRoot.delete(recursive: true); + } + }); + + final store = SecureConfigStore( + secretRootPathResolver: () async => '${storeRoot.path}/secrets', + appDataRootPathResolver: () async => '${storeRoot.path}/app-data', + supportRootPathResolver: () async => '${storeRoot.path}/support', + ); + await store.initialize(); + final settings = SettingsSnapshot.defaults(); + final bridgeConfig = settings.acpBridgeServerModeConfig; + final selfHosted = bridgeConfig.selfHosted.copyWith( + serverUrl: server.endpoint, + username: 'admin', + ); + await store.saveSecretValueByRef(selfHosted.passwordRef, 'bridge-token'); + await store.saveSettingsSnapshot( + settings.copyWith( + acpBridgeServerModeConfig: bridgeConfig.copyWith( + selfHosted: selfHosted, + effective: AcpBridgeServerEffectiveConfig( + endpoint: server.endpoint, + tokenRef: selfHosted.passwordRef, + source: 'bridge', + reason: 'test bridge', + ), + ), + ), + ); + + final controller = AppController( + store: store, + environmentOverride: {'HOME': storeRoot.path}, + multiAgentMountManager: MultiAgentMountManager( + resolver: _MissingRemoteMountResolver(), + ), + ); + addTearDown(controller.dispose); + + await _waitForInitialization(controller); + + expect(controller.bootstrapError, isNull); + expect(controller.bridgeCapabilitiesRefreshErrorInternal, isEmpty); + expect( + controller.gatewayProviderCatalog.map((item) => item.providerId), + contains('openclaw'), + ); + }, + ); +} + +Future _waitForInitialization(AppController controller) async { + final deadline = DateTime.now().add(const Duration(seconds: 10)); + while (controller.initializing && DateTime.now().isBefore(deadline)) { + await Future.delayed(const Duration(milliseconds: 100)); + } + if (controller.initializing) { + fail('controller did not initialize'); + } +} + +class _MissingRemoteMountResolver implements MultiAgentMountResolver { + @override + Future reconcile({ + required MultiAgentConfig config, + required String aiGatewayUrl, + required String codexHome, + required String opencodeHome, + required ArisMountProbe arisProbe, + }) { + throw StateError('unknown method: xworkmate.mounts.reconcile'); + } + + @override + Future dispose() async {} +} + +class _CapabilityServer { + _CapabilityServer(this._server); + + final HttpServer _server; + + String get endpoint => 'http://${_server.address.host}:${_server.port}'; + + static Future<_CapabilityServer> start() async { + final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0); + final wrapper = _CapabilityServer(server); + server.listen(wrapper._handle); + return wrapper; + } + + Future close() => _server.close(force: true); + + Future _handle(HttpRequest request) async { + final raw = await utf8.decoder.bind(request).join(); + final decoded = jsonDecode(raw) as Map; + final method = decoded['method']?.toString() ?? ''; + request.response.headers.contentType = ContentType.json; + if (method != 'acp.capabilities') { + request.response.write( + jsonEncode({ + 'jsonrpc': '2.0', + 'id': decoded['id'], + 'error': { + 'code': -32601, + 'message': 'unknown method: $method', + }, + }), + ); + await request.response.close(); + return; + } + request.response.write( + jsonEncode({ + 'jsonrpc': '2.0', + 'id': decoded['id'], + 'result': { + 'singleAgent': true, + 'multiAgent': true, + 'availableExecutionTargets': ['agent', 'gateway'], + 'providerCatalog': >[ + { + 'providerId': 'codex', + 'label': 'Codex', + 'targets': ['agent'], + }, + ], + 'gatewayProviders': >[ + { + 'providerId': 'openclaw', + 'label': 'OpenClaw', + 'targets': ['gateway'], + }, + ], + }, + }), + ); + await request.response.close(); + } +} diff --git a/test/runtime/gateway_acp_client_auth_test.dart b/test/runtime/gateway_acp_client_auth_test.dart index f1b04615..9096063c 100644 --- a/test/runtime/gateway_acp_client_auth_test.dart +++ b/test/runtime/gateway_acp_client_auth_test.dart @@ -780,6 +780,121 @@ void main() { }, ); + test( + 'prefers final OpenClaw snapshot over early completed SSE update after connection close', + () async { + final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0); + addTearDown(() => server.close(force: true)); + final requestMethods = []; + server.listen((request) async { + final body = await utf8.decoder.bind(request).join(); + final decoded = jsonDecode(body) as Map; + final method = decoded['method']?.toString() ?? ''; + final id = decoded['id']?.toString() ?? 'request-id'; + requestMethods.add(method); + if (method == 'session.start') { + final event = jsonEncode({ + 'jsonrpc': '2.0', + 'method': 'session.update', + 'params': { + 'sessionId': 'unit-fixture-task-final', + 'threadId': 'unit-fixture-task-final', + 'turnId': 'turn-final', + 'type': 'status', + 'event': 'completed', + 'message': 'early completed output without artifacts', + 'success': true, + }, + }); + final eventBytes = utf8.encode('data: $event\n\n'); + request.response.headers.set( + HttpHeaders.contentTypeHeader, + 'text/event-stream', + ); + request.response.contentLength = eventBytes.length + 128; + final socket = await request.response.detachSocket(); + socket.add(eventBytes); + await socket.flush(); + socket.destroy(); + return; + } + if (method == 'xworkmate.sessions.get') { + request.response.headers.contentType = ContentType.json; + request.response.write( + jsonEncode({ + 'jsonrpc': '2.0', + 'id': id, + 'result': { + 'status': 'completed', + 'sessionId': 'unit-fixture-task-final', + 'threadId': 'unit-fixture-task-final', + 'task': { + 'state': 'completed', + 'turnId': 'turn-final', + }, + 'result': { + 'success': true, + 'output': 'final snapshot output with artifacts', + 'turnId': 'turn-final', + }, + 'artifacts': { + 'items': >[ + { + 'relativePath': 'reports/final.md', + 'downloadUrl': + 'https://xworkmate-bridge.svc.plus/artifacts/openclaw/download' + '?sessionKey=unit-fixture-task-final&runId=turn-final&relativePath=reports%2Ffinal.md', + 'contentType': 'text/markdown', + 'sizeBytes': 64, + }, + ], + }, + }, + }), + ); + await request.response.close(); + return; + } + request.response.statusCode = HttpStatus.badRequest; + await request.response.close(); + }); + final endpoint = Uri.parse('http://127.0.0.1:${server.port}'); + final transport = ExternalCodeAgentAcpDesktopTransport( + client: GatewayAcpClient(endpointResolver: () => endpoint), + endpointResolver: (_) => endpoint, + taskEndpointResolver: (_) => endpoint, + ); + addTearDown(transport.dispose); + + final result = await transport.executeTask( + const GoTaskServiceRequest( + sessionId: 'unit-fixture-task-final', + threadId: 'unit-fixture-task-final', + target: AssistantExecutionTarget.gateway, + provider: SingleAgentProvider.openclaw, + prompt: 'create files', + workingDirectory: '/tmp/workspace', + model: '', + thinking: 'off', + selectedSkills: [], + inlineAttachments: [], + localAttachments: [], + agentId: '', + metadata: {}, + ), + onUpdate: (_) {}, + ); + + expect(result.success, isTrue); + expect(result.message, 'final snapshot output with artifacts'); + expect(result.artifacts.single.relativePath, 'reports/final.md'); + expect(requestMethods, [ + 'session.start', + 'xworkmate.sessions.get', + ]); + }, + ); + test( 'recovers OpenClaw follow-up from bridge session snapshot after SSE ends without final envelope', () async {