fix(app): bound OpenClaw artifact sync polling

This commit is contained in:
Haitao Pan 2026-06-12 14:08:23 +08:00
parent 03385e1891
commit 017216e812
3 changed files with 478 additions and 6 deletions

View File

@ -46,6 +46,21 @@ import 'app_controller_desktop_skill_permissions.dart';
import 'app_controller_desktop_runtime_coordination_impl.dart';
import 'app_controller_desktop_runtime_exceptions.dart';
const int kOpenClawArtifactSyncMaxAttempts = 45;
const Duration kOpenClawArtifactSyncMaxDuration = Duration(seconds: 90);
const String kOpenClawArtifactSyncTimeoutCode =
'OPENCLAW_ARTIFACT_SYNC_TIMEOUT';
bool openClawArtifactPathHasRequiredExtension(String path, String extension) {
final normalizedPath = path.trim().toLowerCase();
final normalizedExtension = extension.trim().toLowerCase().replaceFirst(
RegExp(r'^\.+'),
'',
);
return normalizedExtension.isNotEmpty &&
normalizedPath.endsWith('.$normalizedExtension');
}
// ignore_for_file: invalid_use_of_visible_for_testing_member, invalid_use_of_protected_member
extension AppControllerDesktopRuntimeHelpers on AppController {
Future<void> saveAppUiStateInternal(
@ -713,8 +728,10 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
Future<void> persistGoTaskArtifactsForSessionInternal(
String sessionKey,
GoTaskServiceResult result,
) async {
GoTaskServiceResult result, {
int artifactSyncAttempts = 0,
double? artifactSyncStartedAtMs,
}) async {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
@ -757,6 +774,22 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
association.artifactDirectory.trim().isNotEmpty) &&
result.success;
if (waitingForOpenClawArtifacts) {
final firstSyncAtMs =
artifactSyncStartedAtMs ?? existingThread.lastArtifactSyncAtMs;
if (openClawArtifactSyncLimitReachedInternal(
attemptCount: artifactSyncAttempts + 1,
firstSyncAtMs: firstSyncAtMs,
nowMs: syncedAtMs,
)) {
markOpenClawArtifactSyncTimeoutInternal(
sessionKey: normalizedSessionKey,
association: association,
missingRequiredExtensions: association.requiredArtifactExtensions,
remoteWorkingDirectory: result.remoteWorkingDirectory,
remoteWorkspaceRefKind: result.remoteWorkspaceRefKind,
);
return;
}
upsertTaskThreadInternal(
normalizedSessionKey,
lifecycleStatus: 'running',
@ -877,7 +910,7 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
final missingRequired = requiredExts
.where((ext) {
return !currentTaskArtifactPaths.any(
(p) => p.toLowerCase().endsWith(ext.toLowerCase()),
(p) => openClawArtifactPathHasRequiredExtension(p, ext),
);
})
.toList(growable: false);
@ -892,6 +925,24 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
(association.artifactScope.trim().isNotEmpty ||
association.artifactDirectory.trim().isNotEmpty);
if (shouldKeepPollingAfterDownloadFailure) {
final firstSyncAtMs =
artifactSyncStartedAtMs ?? existingThread.lastArtifactSyncAtMs;
if (openClawArtifactSyncLimitReachedInternal(
attemptCount: artifactSyncAttempts + 1,
firstSyncAtMs: firstSyncAtMs,
nowMs: syncedAtMs,
)) {
markOpenClawArtifactSyncTimeoutInternal(
sessionKey: normalizedSessionKey,
association: association,
missingRequiredExtensions: missingRequired.isEmpty
? association.requiredArtifactExtensions
: missingRequired,
remoteWorkingDirectory: result.remoteWorkingDirectory,
remoteWorkspaceRefKind: result.remoteWorkspaceRefKind,
);
return;
}
upsertTaskThreadInternal(
normalizedSessionKey,
lifecycleStatus: 'running',
@ -926,6 +977,87 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
);
}
bool openClawArtifactSyncLimitReachedInternal({
required int attemptCount,
required double? firstSyncAtMs,
double? nowMs,
}) {
if (attemptCount >= kOpenClawArtifactSyncMaxAttempts) {
return true;
}
final startedAtMs = firstSyncAtMs;
if (startedAtMs == null || startedAtMs <= 0) {
return false;
}
final currentMs = nowMs ?? DateTime.now().millisecondsSinceEpoch.toDouble();
return currentMs - startedAtMs >=
kOpenClawArtifactSyncMaxDuration.inMilliseconds;
}
void markOpenClawArtifactSyncTimeoutInternal({
required String sessionKey,
required OpenClawTaskAssociation association,
Iterable<String> missingRequiredExtensions = const <String>[],
String? remoteWorkingDirectory,
WorkspaceRefKind? remoteWorkspaceRefKind,
}) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final nowMs = DateTime.now().millisecondsSinceEpoch.toDouble();
final missing =
missingRequiredExtensions
.map((ext) => ext.trim())
.where((ext) => ext.isNotEmpty)
.toSet()
.toList(growable: false)
..sort();
final missingLabel = missing.isEmpty ? '' : missing.join(', ');
final messageText = missing.isEmpty
? appText(
'OpenClaw artifact 同步已超时,已按部分结果结束本轮任务。',
'OpenClaw artifact sync timed out; this task was finished with partial results.',
)
: appText(
'OpenClaw artifact 同步已超时,缺少必需文件类型:$missingLabel。已按部分结果结束本轮任务。',
'OpenClaw artifact sync timed out before required artifact types arrived: $missingLabel. This task was finished with partial results.',
);
aiGatewayPendingSessionKeysInternal.remove(normalizedSessionKey);
clearAiGatewayStreamingTextInternal(normalizedSessionKey);
upsertTaskThreadInternal(
normalizedSessionKey,
lifecycleStatus: 'ready',
lastResultCode: kOpenClawArtifactSyncTimeoutCode,
lastRemoteWorkingDirectory:
remoteWorkingDirectory?.trim().isNotEmpty == true
? remoteWorkingDirectory!.trim()
: null,
lastRemoteWorkspaceRefKind: remoteWorkspaceRefKind,
lastArtifactSyncAtMs: nowMs,
lastArtifactSyncStatus: 'partial',
openClawTaskAssociation: association.copyWith(status: 'completed'),
updatedAtMs: nowMs,
);
appendLocalSessionMessageInternal(
normalizedSessionKey,
GatewayChatMessage(
id: nextLocalMessageIdInternal(),
role: 'assistant',
text: messageText,
timestampMs: nowMs,
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: false,
),
persistInThreadContext: true,
);
recomputeTasksInternal();
notifyIfActiveInternal();
unawaited(flushAssistantThreadPersistenceInternal());
}
Future<List<int>?> artifactBytesInternal(
GoTaskServiceArtifact artifact,
) async {

View File

@ -749,6 +749,12 @@ extension AppControllerDesktopThreadActions on AppController {
}) async {
var current = association;
var firstAttempt = true;
var artifactSyncAttempts = 0;
double? artifactSyncStartedAtMs;
final existingThread = taskThreadForSessionInternal(sessionKey);
if (association.status.trim().toLowerCase() == 'syncing-artifacts') {
artifactSyncStartedAtMs = existingThread?.lastArtifactSyncAtMs;
}
while (true) {
if (disposedInternal) {
return;
@ -796,8 +802,9 @@ extension AppControllerDesktopThreadActions on AppController {
(!hasRequiredExts ||
current.requiredArtifactExtensions.every((ext) {
return result.artifacts.any(
(a) => a.relativePath.toLowerCase().endsWith(
ext.toLowerCase(),
(a) => openClawArtifactPathHasRequiredExtension(
a.relativePath,
ext,
),
);
}));
@ -807,6 +814,34 @@ extension AppControllerDesktopThreadActions on AppController {
!hasEnoughArtifacts;
if (shouldKeepPollingForArtifacts) {
final nowMs = DateTime.now().millisecondsSinceEpoch.toDouble();
artifactSyncAttempts += 1;
artifactSyncStartedAtMs ??= nowMs;
final missingRequired = current.requiredArtifactExtensions
.where((ext) {
return !result.artifacts.any(
(a) => openClawArtifactPathHasRequiredExtension(
a.relativePath,
ext,
),
);
})
.toList(growable: false);
if (openClawArtifactSyncLimitReachedInternal(
attemptCount: artifactSyncAttempts,
firstSyncAtMs: artifactSyncStartedAtMs,
nowMs: nowMs,
)) {
markOpenClawArtifactSyncTimeoutInternal(
sessionKey: sessionKey,
association: current,
missingRequiredExtensions: missingRequired.isEmpty
? current.requiredArtifactExtensions
: missingRequired,
remoteWorkingDirectory: result.remoteWorkingDirectory,
remoteWorkspaceRefKind: result.remoteWorkspaceRefKind,
);
return;
}
current = current.copyWith(status: 'syncing-artifacts');
upsertTaskThreadInternal(
sessionKey,
@ -837,6 +872,8 @@ extension AppControllerDesktopThreadActions on AppController {
sessionKey: sessionKey,
target: target,
result: result,
artifactSyncAttempts: artifactSyncAttempts,
artifactSyncStartedAtMs: artifactSyncStartedAtMs,
);
}
aiGatewayPendingSessionKeysInternal.remove(sessionKey);
@ -876,6 +913,20 @@ extension AppControllerDesktopThreadActions on AppController {
if (association == null || association.isTerminal) {
continue;
}
if (association.status.trim().toLowerCase() == 'syncing-artifacts' &&
openClawArtifactSyncLimitReachedInternal(
attemptCount: 0,
firstSyncAtMs: record.lastArtifactSyncAtMs,
)) {
markOpenClawArtifactSyncTimeoutInternal(
sessionKey: record.threadId,
association: association,
missingRequiredExtensions: association.requiredArtifactExtensions,
remoteWorkingDirectory: record.lastRemoteWorkingDirectory,
remoteWorkspaceRefKind: record.lastRemoteWorkspaceRefKind,
);
continue;
}
aiGatewayPendingSessionKeysInternal.add(record.threadId);
unawaited(
pollOpenClawTaskAssociationInternal(
@ -1285,6 +1336,8 @@ extension AppControllerDesktopThreadActions on AppController {
required String sessionKey,
required AssistantExecutionTarget target,
required GoTaskServiceResult result,
int artifactSyncAttempts = 0,
double? artifactSyncStartedAtMs,
}) async {
final completedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
final assistantText = result.message.trim();
@ -1398,7 +1451,18 @@ extension AppControllerDesktopThreadActions on AppController {
}
recomputeTasksInternal();
notifyIfActiveInternal();
await persistGoTaskArtifactsForSessionInternal(sessionKey, result);
await persistGoTaskArtifactsForSessionInternal(
sessionKey,
result,
artifactSyncAttempts: artifactSyncAttempts,
artifactSyncStartedAtMs: artifactSyncStartedAtMs,
);
if (taskThreadForSessionInternal(
sessionKey,
)?.lifecycleState.lastResultCode ==
kOpenClawArtifactSyncTimeoutCode) {
return;
}
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'ready',

View File

@ -5,6 +5,7 @@ import 'package:crypto/crypto.dart' as crypto;
import 'package:flutter_test/flutter_test.dart';
import 'package:xworkmate/app/app_controller.dart';
import 'package:xworkmate/app/app_controller_desktop_runtime_coordination_impl.dart';
import 'package:xworkmate/app/app_controller_desktop_runtime_helpers.dart';
import 'package:xworkmate/app/app_controller_desktop_thread_binding.dart';
import 'package:xworkmate/runtime/assistant_artifacts.dart';
import 'package:xworkmate/runtime/go_task_service_client.dart';
@ -2182,6 +2183,245 @@ void main() {
'no-artifacts',
);
});
test(
'OpenClaw artifact polling times out when required artifacts stay missing',
() async {
var getTaskCount = 0;
final staleSyncAtMs = DateTime.now()
.subtract(
kOpenClawArtifactSyncMaxDuration + const Duration(seconds: 1),
)
.millisecondsSinceEpoch
.toDouble();
const sessionKey = 'draft-openclaw-required-timeout';
const runId = 'turn-required-timeout';
const openClawSessionKey = 'agent:main:draft:openclaw-required-timeout';
final association = OpenClawTaskAssociation(
sessionId: sessionKey,
threadId: sessionKey,
turnId: runId,
runId: runId,
artifactScope: 'tasks/$openClawSessionKey/$runId',
artifactDirectory:
'/home/ubuntu/.openclaw/workspace/tasks/$openClawSessionKey/$runId',
gatewayProviderId: 'openclaw',
startedAtMs: staleSyncAtMs,
status: 'syncing-artifacts',
appThreadKey: 'draft:openclaw-required-timeout',
openclawSessionKey: openClawSessionKey,
requiredArtifactExtensions: const <String>['pdf'],
requiresArtifactExport: true,
);
final goTaskClient = _PollingGoTaskServiceClient(
onGetTask: (observedAssociation) {
getTaskCount += 1;
expect(observedAssociation.runId, runId);
return GoTaskServiceResult(
success: true,
message: 'done but still missing pdf',
turnId: runId,
raw: <String, dynamic>{
'success': true,
'status': 'completed',
'sessionId': sessionKey,
'threadId': sessionKey,
'turnId': runId,
'runId': runId,
'artifactScope': association.artifactScope,
'artifactDirectory': association.artifactDirectory,
'gatewayProviderId': 'openclaw',
'appThreadKey': association.appThreadKey,
'openclawSessionKey': openClawSessionKey,
'requiredArtifactExtensions': <String>['pdf'],
'requiresArtifactExport': true,
'artifacts': <Map<String, dynamic>>[
<String, dynamic>{
'relativePath': 'exports/final.md',
'content': '# not the final pdf\n',
'contentType': 'text/markdown',
},
],
},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
);
},
);
final controller = AppController(
environmentOverride: const <String, String>{},
goTaskServiceClient: goTaskClient,
);
addTearDown(controller.dispose);
final taskWorkspace = await Directory.systemTemp.createTemp(
'xworkmate-required-timeout-',
);
addTearDown(() async {
if (await taskWorkspace.exists()) {
await taskWorkspace.delete(recursive: true);
}
});
controller.upsertTaskThreadInternal(
sessionKey,
workspaceBinding: WorkspaceBinding(
workspaceId: sessionKey,
workspaceKind: WorkspaceKind.localFs,
workspacePath: taskWorkspace.path,
displayPath: taskWorkspace.path,
writable: true,
),
lifecycleStatus: 'running',
lastResultCode: 'running',
lastArtifactSyncAtMs: staleSyncAtMs,
lastArtifactSyncStatus: 'syncing',
openClawTaskAssociation: association,
);
controller.aiGatewayPendingSessionKeysInternal.add(sessionKey);
await controller
.pollOpenClawTaskAssociationInternal(
sessionKey: sessionKey,
target: AssistantExecutionTarget.gateway,
association: association,
)
.timeout(const Duration(seconds: 1));
final thread = controller.requireTaskThreadForSessionInternal(sessionKey);
expect(getTaskCount, 1);
expect(thread.lifecycleState.status, 'ready');
expect(
thread.lifecycleState.lastResultCode,
kOpenClawArtifactSyncTimeoutCode,
);
expect(thread.lastArtifactSyncStatus, 'partial');
expect(thread.openClawTaskAssociation?.status, 'completed');
expect(controller.assistantSessionHasPendingRun(sessionKey), isFalse);
expect(
controller.localSessionMessagesInternal[sessionKey]?.last.text,
contains('pdf'),
);
},
);
test(
'OpenClaw artifact sync times out when artifact downloads keep failing',
() async {
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
addTearDown(() => server.close(force: true));
server.listen((request) async {
request.response
..statusCode = HttpStatus.ok
..headers.contentType = ContentType.binary
..add(<int>[1, 2, 3]);
await request.response.close();
});
final staleSyncAtMs = DateTime.now()
.subtract(
kOpenClawArtifactSyncMaxDuration + const Duration(seconds: 1),
)
.millisecondsSinceEpoch
.toDouble();
const sessionKey = 'draft-openclaw-download-timeout';
const runId = 'turn-download-timeout';
const openClawSessionKey = 'agent:main:draft:openclaw-download-timeout';
final association = OpenClawTaskAssociation(
sessionId: sessionKey,
threadId: sessionKey,
turnId: runId,
runId: runId,
artifactScope: 'tasks/$openClawSessionKey/$runId',
artifactDirectory:
'/home/ubuntu/.openclaw/workspace/tasks/$openClawSessionKey/$runId',
gatewayProviderId: 'openclaw',
startedAtMs: staleSyncAtMs,
status: 'syncing-artifacts',
appThreadKey: 'draft:openclaw-download-timeout',
openclawSessionKey: openClawSessionKey,
requiredArtifactExtensions: const <String>['pdf'],
requiresArtifactExport: true,
);
final controller = AppController(
environmentOverride: const <String, String>{
'BRIDGE_AUTH_TOKEN': 'bridge-token',
},
);
addTearDown(controller.dispose);
final taskWorkspace = await Directory.systemTemp.createTemp(
'xworkmate-download-timeout-',
);
addTearDown(() async {
if (await taskWorkspace.exists()) {
await taskWorkspace.delete(recursive: true);
}
});
controller.upsertTaskThreadInternal(
sessionKey,
workspaceBinding: WorkspaceBinding(
workspaceId: sessionKey,
workspaceKind: WorkspaceKind.localFs,
workspacePath: taskWorkspace.path,
displayPath: taskWorkspace.path,
writable: true,
),
lifecycleStatus: 'running',
lastResultCode: 'running',
lastArtifactSyncAtMs: staleSyncAtMs,
lastArtifactSyncStatus: 'syncing',
openClawTaskAssociation: association,
);
final result = GoTaskServiceResult(
success: true,
message: 'pdf exported',
turnId: runId,
raw: <String, dynamic>{
'success': true,
'status': 'completed',
'artifacts': <Map<String, dynamic>>[
<String, dynamic>{
'relativePath': 'exports/final.pdf',
'downloadUrl':
'http://xworkmate-bridge.svc.plus:${server.port}/final.pdf',
'contentType': 'application/pdf',
'sizeBytes': 99,
},
],
},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
);
final clientFactory = _proxiedClientFactory(server.port);
await HttpOverrides.runZoned(() async {
await controller.persistGoTaskArtifactsForSessionInternal(
sessionKey,
result,
artifactSyncStartedAtMs: staleSyncAtMs,
);
}, createHttpClient: clientFactory);
final thread = controller.requireTaskThreadForSessionInternal(sessionKey);
expect(
await File('${taskWorkspace.path}/exports/final.pdf').exists(),
isFalse,
);
expect(thread.lifecycleState.status, 'ready');
expect(
thread.lifecycleState.lastResultCode,
kOpenClawArtifactSyncTimeoutCode,
);
expect(thread.lastArtifactSyncStatus, 'partial');
expect(thread.openClawTaskAssociation?.status, 'completed');
expect(
controller.localSessionMessagesInternal[sessionKey]?.last.text,
contains('pdf'),
);
},
);
}
HttpClient Function(SecurityContext?) _proxiedClientFactory(int port) {
@ -2305,6 +2545,42 @@ class _ArtifactBackfillGoTaskServiceClient implements GoTaskServiceClient {
Future<void> dispose() async {}
}
class _PollingGoTaskServiceClient implements GoTaskServiceClient {
_PollingGoTaskServiceClient({required this.onGetTask});
final GoTaskServiceResult Function(OpenClawTaskAssociation association)
onGetTask;
@override
Future<GoTaskServiceResult> executeTask(
GoTaskServiceRequest request, {
required void Function(GoTaskServiceUpdate update) onUpdate,
}) async {
throw UnimplementedError('executeTask is not used by this test');
}
@override
Future<GoTaskServiceResult> getTask({
required AssistantExecutionTarget target,
required OpenClawTaskAssociation association,
required GoTaskServiceRoute route,
}) async {
return onGetTask(association);
}
@override
Future<void> cancelTask({
required GoTaskServiceRoute route,
required AssistantExecutionTarget target,
required String sessionId,
required String threadId,
OpenClawTaskAssociation? association,
}) async {}
@override
Future<void> dispose() async {}
}
Future<void> _waitForControllerInitialization(AppController controller) async {
final deadline = DateTime.now().add(const Duration(seconds: 5));
while (controller.initializing && DateTime.now().isBefore(deadline)) {