From 38a2d80e3c9a42efcd1208b094d564c299c5ddee Mon Sep 17 00:00:00 2001 From: Haitao Pan Date: Tue, 2 Jun 2026 07:36:05 +0800 Subject: [PATCH] fix(openclaw): keep long artifact recovery synced --- ...pp_controller_desktop_runtime_helpers.dart | 59 ++++++++++++ ...rnal_code_agent_acp_desktop_transport.dart | 21 ++++- ...troller_thread_workspace_binding_test.dart | 75 +++++++++++++++- .../runtime/gateway_acp_client_auth_test.dart | 90 +++++++++++++++++++ 4 files changed, 241 insertions(+), 4 deletions(-) diff --git a/lib/app/app_controller_desktop_runtime_helpers.dart b/lib/app/app_controller_desktop_runtime_helpers.dart index a6d5f844..7f583056 100644 --- a/lib/app/app_controller_desktop_runtime_helpers.dart +++ b/lib/app/app_controller_desktop_runtime_helpers.dart @@ -760,6 +760,24 @@ extension AppControllerDesktopRuntimeHelpers on AppController { } final artifacts = result.artifacts; if (artifacts.isEmpty) { + final root = Directory(existingThread.workspaceBinding.workspacePath); + final currentTaskArtifactRelativePaths = + isOpenClawNoExportedArtifactsGuardResultInternal(result) + ? const [] + : await _workspaceArtifactPathsModifiedSinceInternal( + root, + existingThread.lifecycleState.lastRunAtMs, + ); + if (currentTaskArtifactRelativePaths.isNotEmpty) { + upsertTaskThreadInternal( + normalizedSessionKey, + lastArtifactSyncAtMs: syncedAtMs, + lastArtifactSyncStatus: 'synced', + lastTaskArtifactRelativePaths: currentTaskArtifactRelativePaths, + updatedAtMs: syncedAtMs, + ); + return; + } upsertTaskThreadInternal( normalizedSessionKey, lastArtifactSyncAtMs: syncedAtMs, @@ -1250,6 +1268,47 @@ Future> _existingWorkspaceArtifactPathsInternal( return paths; } +Future> _workspaceArtifactPathsModifiedSinceInternal( + Directory root, + double? sinceMs, +) async { + final thresholdMs = sinceMs ?? 0; + if (thresholdMs <= 0 || !await root.exists()) { + return const []; + } + final files = await DesktopThreadArtifactService().collectFilesInternal(root); + final paths = []; + for (final file in files) { + try { + final stat = await file.stat(); + if (stat.modified.millisecondsSinceEpoch.toDouble() < thresholdMs) { + continue; + } + final resolvedRelativePath = + DesktopThreadArtifactService.relativePathInternal( + root.path, + file.path, + ); + if (resolvedRelativePath == null || resolvedRelativePath.isEmpty) { + continue; + } + if (_isWorkspaceArtifactNoisePathInternal(resolvedRelativePath)) { + continue; + } + paths.add(resolvedRelativePath); + } on FileSystemException { + continue; + } + } + paths.sort(); + return paths; +} + +bool _isWorkspaceArtifactNoisePathInternal(String relativePath) { + return DesktopThreadArtifactService.baseNameInternal(relativePath) == + '.DS_Store'; +} + String _normalizeAuthorizationHeaderInternal(String raw) { final trimmed = raw.trim(); if (trimmed.isEmpty) { diff --git a/lib/runtime/external_code_agent_acp_desktop_transport.dart b/lib/runtime/external_code_agent_acp_desktop_transport.dart index c5881a9a..e14afb6b 100644 --- a/lib/runtime/external_code_agent_acp_desktop_transport.dart +++ b/lib/runtime/external_code_agent_acp_desktop_transport.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:io'; +import 'dart:math' as math; import 'package:flutter/foundation.dart'; @@ -15,7 +16,7 @@ class ExternalCodeAgentAcpDesktopTransport required Uri? Function(AssistantExecutionTarget target) endpointResolver, Uri? Function(GoTaskServiceRequest request)? taskEndpointResolver, Duration recoveryPollDelay = const Duration(seconds: 2), - int recoveryMaxAttempts = 300, + int? recoveryMaxAttempts, }) : _client = client, _endpointResolver = endpointResolver, _taskEndpointResolver = taskEndpointResolver, @@ -26,7 +27,7 @@ class ExternalCodeAgentAcpDesktopTransport final Uri? Function(AssistantExecutionTarget target) _endpointResolver; final Uri? Function(GoTaskServiceRequest request)? _taskEndpointResolver; final Duration _recoveryPollDelay; - final int _recoveryMaxAttempts; + final int? _recoveryMaxAttempts; @visibleForTesting GatewayAcpClient get clientForTest => _client; @@ -207,7 +208,7 @@ class ExternalCodeAgentAcpDesktopTransport if (endpoint == null) { return null; } - final attempts = _recoveryMaxAttempts <= 0 ? 1 : _recoveryMaxAttempts; + final attempts = _recoveryAttemptsForRequest(request); for (var attempt = 0; attempt < attempts; attempt += 1) { if (attempt > 0) { await Future.delayed(_recoveryPollDelay); @@ -273,6 +274,20 @@ class ExternalCodeAgentAcpDesktopTransport return null; } + int _recoveryAttemptsForRequest(GoTaskServiceRequest request) { + final configured = _recoveryMaxAttempts; + if (configured != null) { + return configured <= 0 ? 1 : configured; + } + final pollMicros = math.max(1, _recoveryPollDelay.inMicroseconds); + final budgetMicros = Duration( + minutes: gatewayAcpTaskRuntimeBudgetMinutesForParams( + request.toExternalAcpParams(), + ), + ).inMicroseconds; + return math.max(1, (budgetMicros / pollMicros).ceil()); + } + Uri? _sessionSnapshotEndpoint(Uri? taskEndpoint) { final controlEndpoint = resolveAcpHttpRpcEndpoint( _endpointResolver(AssistantExecutionTarget.gateway), diff --git a/test/runtime/app_controller_thread_workspace_binding_test.dart b/test/runtime/app_controller_thread_workspace_binding_test.dart index 5c7f6b9f..0fa77d74 100644 --- a/test/runtime/app_controller_thread_workspace_binding_test.dart +++ b/test/runtime/app_controller_thread_workspace_binding_test.dart @@ -71,7 +71,10 @@ void main() { .map((thread) => thread.threadId) .toList(growable: false); expect(persistedThreadIds, [realSessionKey]); - expect((await store.loadAppUiState()).assistantLastSessionKey, realSessionKey); + expect( + (await store.loadAppUiState()).assistantLastSessionKey, + realSessionKey, + ); }, ); @@ -1407,6 +1410,76 @@ void main() { expect(snapshot.resultMessage, 'No task artifacts recorded for this run.'); }); + test( + 'records workspace files produced during an empty-artifact task run', + () async { + final controller = AppController( + environmentOverride: const {}, + ); + addTearDown(controller.dispose); + + final localWorkspace = await Directory.systemTemp.createTemp( + 'xworkmate-empty-artifact-produced-files-', + ); + addTearDown(() async { + if (await localWorkspace.exists()) { + await localWorkspace.delete(recursive: true); + } + }); + await File( + '${localWorkspace.path}/old-task-report.md', + ).writeAsString('stale task output'); + final startedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble(); + await Future.delayed(const Duration(milliseconds: 20)); + await Directory('${localWorkspace.path}/renders').create(); + await Directory('${localWorkspace.path}/prompts').create(); + await File( + '${localWorkspace.path}/renders/identity-security-evolution.mp4', + ).writeAsBytes([1, 2, 3]); + await File( + '${localWorkspace.path}/prompts/DELIVERY.md', + ).writeAsString('delivery notes'); + controller.upsertTaskThreadInternal( + 'unit-fixture-task-a', + workspaceBinding: WorkspaceBinding( + workspaceId: 'unit-fixture-task-a', + workspaceKind: WorkspaceKind.localFs, + workspacePath: localWorkspace.path, + displayPath: localWorkspace.path, + writable: true, + ), + lifecycleStatus: 'running', + lastRunAtMs: startedAtMs, + lastResultCode: 'running', + ); + + const result = GoTaskServiceResult( + success: true, + message: + 'OpenClaw final artifacts were written to the current task artifact scope: mp4, png.', + turnId: 'turn-1', + raw: {}, + errorMessage: '', + resolvedModel: '', + route: GoTaskServiceRoute.externalAcpSingle, + ); + + await controller.persistGoTaskArtifactsForSessionInternal( + 'unit-fixture-task-a', + result, + ); + + final thread = controller.requireTaskThreadForSessionInternal( + 'unit-fixture-task-a', + ); + expect(thread.lastArtifactSyncStatus, 'synced'); + expect(thread.lastTaskArtifactRelativePaths, [ + 'prompts/DELIVERY.md', + 'renders/identity-security-evolution.mp4', + ]); + }, + ); + test('skips download URL artifacts outside the bridge host', () async { final controller = AppController( environmentOverride: const { diff --git a/test/runtime/gateway_acp_client_auth_test.dart b/test/runtime/gateway_acp_client_auth_test.dart index 9096063c..0ae9b1ab 100644 --- a/test/runtime/gateway_acp_client_auth_test.dart +++ b/test/runtime/gateway_acp_client_auth_test.dart @@ -1080,6 +1080,96 @@ void main() { }, ); + test( + 'uses long task budget for default OpenClaw SSE recovery polling', + () async { + final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0); + addTearDown(() => server.close(force: true)); + var snapshotPolls = 0; + server.listen((request) async { + final body = await utf8.decoder.bind(request).join(); + final decoded = jsonDecode(body) as Map; + final method = decoded['method']?.toString() ?? ''; + final id = decoded['id']?.toString() ?? 'request-id'; + if (method == 'session.start') { + final event = jsonEncode({ + 'jsonrpc': '2.0', + 'method': 'xworkmate.bridge.accepted', + 'params': {'sessionId': 'unit-fixture-task-d'}, + }); + request.response.headers.set( + HttpHeaders.contentTypeHeader, + 'text/event-stream', + ); + request.response.write('data: $event\n\n'); + await request.response.close(); + return; + } + if (method == 'xworkmate.sessions.get') { + snapshotPolls += 1; + final completed = snapshotPolls >= 301; + request.response.headers.contentType = ContentType.json; + request.response.write( + jsonEncode({ + 'jsonrpc': '2.0', + 'id': id, + 'result': { + 'status': completed ? 'completed' : 'running', + 'sessionId': 'unit-fixture-task-d', + 'threadId': 'unit-fixture-task-d', + 'task': { + 'state': completed ? 'completed' : 'running', + 'turnId': 'turn-recovered-long', + }, + if (completed) + 'result': { + 'success': true, + 'output': 'recovered after long polling window', + 'turnId': 'turn-recovered-long', + }, + }, + }), + ); + await request.response.close(); + return; + } + request.response.statusCode = HttpStatus.badRequest; + await request.response.close(); + }); + final endpoint = Uri.parse('http://127.0.0.1:${server.port}'); + final transport = ExternalCodeAgentAcpDesktopTransport( + client: GatewayAcpClient(endpointResolver: () => endpoint), + endpointResolver: (_) => endpoint, + taskEndpointResolver: (_) => endpoint, + recoveryPollDelay: const Duration(microseconds: 1), + ); + addTearDown(transport.dispose); + + final result = await transport.executeTask( + const GoTaskServiceRequest( + sessionId: 'unit-fixture-task-d', + threadId: 'unit-fixture-task-d', + target: AssistantExecutionTarget.gateway, + provider: SingleAgentProvider.openclaw, + prompt: '生成封面图 png 和短视频 mp4', + workingDirectory: '/tmp/workspace', + model: '', + thinking: 'off', + selectedSkills: [], + inlineAttachments: [], + localAttachments: [], + agentId: '', + metadata: {}, + ), + onUpdate: (_) {}, + ); + + expect(snapshotPolls, 301); + expect(result.success, isTrue); + expect(result.message, 'recovered after long polling window'); + }, + ); + test( 'recovers terminal failed OpenClaw snapshot without displayable result', () async {