fix(openclaw): keep long artifact recovery synced
This commit is contained in:
parent
464da385b6
commit
38a2d80e3c
@ -760,6 +760,24 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
|
||||
}
|
||||
final artifacts = result.artifacts;
|
||||
if (artifacts.isEmpty) {
|
||||
final root = Directory(existingThread.workspaceBinding.workspacePath);
|
||||
final currentTaskArtifactRelativePaths =
|
||||
isOpenClawNoExportedArtifactsGuardResultInternal(result)
|
||||
? const <String>[]
|
||||
: await _workspaceArtifactPathsModifiedSinceInternal(
|
||||
root,
|
||||
existingThread.lifecycleState.lastRunAtMs,
|
||||
);
|
||||
if (currentTaskArtifactRelativePaths.isNotEmpty) {
|
||||
upsertTaskThreadInternal(
|
||||
normalizedSessionKey,
|
||||
lastArtifactSyncAtMs: syncedAtMs,
|
||||
lastArtifactSyncStatus: 'synced',
|
||||
lastTaskArtifactRelativePaths: currentTaskArtifactRelativePaths,
|
||||
updatedAtMs: syncedAtMs,
|
||||
);
|
||||
return;
|
||||
}
|
||||
upsertTaskThreadInternal(
|
||||
normalizedSessionKey,
|
||||
lastArtifactSyncAtMs: syncedAtMs,
|
||||
@ -1250,6 +1268,47 @@ Future<List<String>> _existingWorkspaceArtifactPathsInternal(
|
||||
return paths;
|
||||
}
|
||||
|
||||
Future<List<String>> _workspaceArtifactPathsModifiedSinceInternal(
|
||||
Directory root,
|
||||
double? sinceMs,
|
||||
) async {
|
||||
final thresholdMs = sinceMs ?? 0;
|
||||
if (thresholdMs <= 0 || !await root.exists()) {
|
||||
return const <String>[];
|
||||
}
|
||||
final files = await DesktopThreadArtifactService().collectFilesInternal(root);
|
||||
final paths = <String>[];
|
||||
for (final file in files) {
|
||||
try {
|
||||
final stat = await file.stat();
|
||||
if (stat.modified.millisecondsSinceEpoch.toDouble() < thresholdMs) {
|
||||
continue;
|
||||
}
|
||||
final resolvedRelativePath =
|
||||
DesktopThreadArtifactService.relativePathInternal(
|
||||
root.path,
|
||||
file.path,
|
||||
);
|
||||
if (resolvedRelativePath == null || resolvedRelativePath.isEmpty) {
|
||||
continue;
|
||||
}
|
||||
if (_isWorkspaceArtifactNoisePathInternal(resolvedRelativePath)) {
|
||||
continue;
|
||||
}
|
||||
paths.add(resolvedRelativePath);
|
||||
} on FileSystemException {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
paths.sort();
|
||||
return paths;
|
||||
}
|
||||
|
||||
bool _isWorkspaceArtifactNoisePathInternal(String relativePath) {
|
||||
return DesktopThreadArtifactService.baseNameInternal(relativePath) ==
|
||||
'.DS_Store';
|
||||
}
|
||||
|
||||
String _normalizeAuthorizationHeaderInternal(String raw) {
|
||||
final trimmed = raw.trim();
|
||||
if (trimmed.isEmpty) {
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import 'dart:async';
|
||||
import 'dart:io';
|
||||
import 'dart:math' as math;
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
|
||||
@ -15,7 +16,7 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
required Uri? Function(AssistantExecutionTarget target) endpointResolver,
|
||||
Uri? Function(GoTaskServiceRequest request)? taskEndpointResolver,
|
||||
Duration recoveryPollDelay = const Duration(seconds: 2),
|
||||
int recoveryMaxAttempts = 300,
|
||||
int? recoveryMaxAttempts,
|
||||
}) : _client = client,
|
||||
_endpointResolver = endpointResolver,
|
||||
_taskEndpointResolver = taskEndpointResolver,
|
||||
@ -26,7 +27,7 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
final Uri? Function(AssistantExecutionTarget target) _endpointResolver;
|
||||
final Uri? Function(GoTaskServiceRequest request)? _taskEndpointResolver;
|
||||
final Duration _recoveryPollDelay;
|
||||
final int _recoveryMaxAttempts;
|
||||
final int? _recoveryMaxAttempts;
|
||||
|
||||
@visibleForTesting
|
||||
GatewayAcpClient get clientForTest => _client;
|
||||
@ -207,7 +208,7 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
if (endpoint == null) {
|
||||
return null;
|
||||
}
|
||||
final attempts = _recoveryMaxAttempts <= 0 ? 1 : _recoveryMaxAttempts;
|
||||
final attempts = _recoveryAttemptsForRequest(request);
|
||||
for (var attempt = 0; attempt < attempts; attempt += 1) {
|
||||
if (attempt > 0) {
|
||||
await Future<void>.delayed(_recoveryPollDelay);
|
||||
@ -273,6 +274,20 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
return null;
|
||||
}
|
||||
|
||||
int _recoveryAttemptsForRequest(GoTaskServiceRequest request) {
|
||||
final configured = _recoveryMaxAttempts;
|
||||
if (configured != null) {
|
||||
return configured <= 0 ? 1 : configured;
|
||||
}
|
||||
final pollMicros = math.max(1, _recoveryPollDelay.inMicroseconds);
|
||||
final budgetMicros = Duration(
|
||||
minutes: gatewayAcpTaskRuntimeBudgetMinutesForParams(
|
||||
request.toExternalAcpParams(),
|
||||
),
|
||||
).inMicroseconds;
|
||||
return math.max(1, (budgetMicros / pollMicros).ceil());
|
||||
}
|
||||
|
||||
Uri? _sessionSnapshotEndpoint(Uri? taskEndpoint) {
|
||||
final controlEndpoint = resolveAcpHttpRpcEndpoint(
|
||||
_endpointResolver(AssistantExecutionTarget.gateway),
|
||||
|
||||
@ -71,7 +71,10 @@ void main() {
|
||||
.map((thread) => thread.threadId)
|
||||
.toList(growable: false);
|
||||
expect(persistedThreadIds, <String>[realSessionKey]);
|
||||
expect((await store.loadAppUiState()).assistantLastSessionKey, realSessionKey);
|
||||
expect(
|
||||
(await store.loadAppUiState()).assistantLastSessionKey,
|
||||
realSessionKey,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
@ -1407,6 +1410,76 @@ void main() {
|
||||
expect(snapshot.resultMessage, 'No task artifacts recorded for this run.');
|
||||
});
|
||||
|
||||
test(
|
||||
'records workspace files produced during an empty-artifact task run',
|
||||
() async {
|
||||
final controller = AppController(
|
||||
environmentOverride: const <String, String>{},
|
||||
);
|
||||
addTearDown(controller.dispose);
|
||||
|
||||
final localWorkspace = await Directory.systemTemp.createTemp(
|
||||
'xworkmate-empty-artifact-produced-files-',
|
||||
);
|
||||
addTearDown(() async {
|
||||
if (await localWorkspace.exists()) {
|
||||
await localWorkspace.delete(recursive: true);
|
||||
}
|
||||
});
|
||||
await File(
|
||||
'${localWorkspace.path}/old-task-report.md',
|
||||
).writeAsString('stale task output');
|
||||
final startedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
|
||||
await Future<void>.delayed(const Duration(milliseconds: 20));
|
||||
await Directory('${localWorkspace.path}/renders').create();
|
||||
await Directory('${localWorkspace.path}/prompts').create();
|
||||
await File(
|
||||
'${localWorkspace.path}/renders/identity-security-evolution.mp4',
|
||||
).writeAsBytes(<int>[1, 2, 3]);
|
||||
await File(
|
||||
'${localWorkspace.path}/prompts/DELIVERY.md',
|
||||
).writeAsString('delivery notes');
|
||||
controller.upsertTaskThreadInternal(
|
||||
'unit-fixture-task-a',
|
||||
workspaceBinding: WorkspaceBinding(
|
||||
workspaceId: 'unit-fixture-task-a',
|
||||
workspaceKind: WorkspaceKind.localFs,
|
||||
workspacePath: localWorkspace.path,
|
||||
displayPath: localWorkspace.path,
|
||||
writable: true,
|
||||
),
|
||||
lifecycleStatus: 'running',
|
||||
lastRunAtMs: startedAtMs,
|
||||
lastResultCode: 'running',
|
||||
);
|
||||
|
||||
const result = GoTaskServiceResult(
|
||||
success: true,
|
||||
message:
|
||||
'OpenClaw final artifacts were written to the current task artifact scope: mp4, png.',
|
||||
turnId: 'turn-1',
|
||||
raw: <String, dynamic>{},
|
||||
errorMessage: '',
|
||||
resolvedModel: '',
|
||||
route: GoTaskServiceRoute.externalAcpSingle,
|
||||
);
|
||||
|
||||
await controller.persistGoTaskArtifactsForSessionInternal(
|
||||
'unit-fixture-task-a',
|
||||
result,
|
||||
);
|
||||
|
||||
final thread = controller.requireTaskThreadForSessionInternal(
|
||||
'unit-fixture-task-a',
|
||||
);
|
||||
expect(thread.lastArtifactSyncStatus, 'synced');
|
||||
expect(thread.lastTaskArtifactRelativePaths, <String>[
|
||||
'prompts/DELIVERY.md',
|
||||
'renders/identity-security-evolution.mp4',
|
||||
]);
|
||||
},
|
||||
);
|
||||
|
||||
test('skips download URL artifacts outside the bridge host', () async {
|
||||
final controller = AppController(
|
||||
environmentOverride: const <String, String>{
|
||||
|
||||
@ -1080,6 +1080,96 @@ void main() {
|
||||
},
|
||||
);
|
||||
|
||||
test(
|
||||
'uses long task budget for default OpenClaw SSE recovery polling',
|
||||
() async {
|
||||
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
|
||||
addTearDown(() => server.close(force: true));
|
||||
var snapshotPolls = 0;
|
||||
server.listen((request) async {
|
||||
final body = await utf8.decoder.bind(request).join();
|
||||
final decoded = jsonDecode(body) as Map<String, dynamic>;
|
||||
final method = decoded['method']?.toString() ?? '';
|
||||
final id = decoded['id']?.toString() ?? 'request-id';
|
||||
if (method == 'session.start') {
|
||||
final event = jsonEncode(<String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'method': 'xworkmate.bridge.accepted',
|
||||
'params': <String, dynamic>{'sessionId': 'unit-fixture-task-d'},
|
||||
});
|
||||
request.response.headers.set(
|
||||
HttpHeaders.contentTypeHeader,
|
||||
'text/event-stream',
|
||||
);
|
||||
request.response.write('data: $event\n\n');
|
||||
await request.response.close();
|
||||
return;
|
||||
}
|
||||
if (method == 'xworkmate.sessions.get') {
|
||||
snapshotPolls += 1;
|
||||
final completed = snapshotPolls >= 301;
|
||||
request.response.headers.contentType = ContentType.json;
|
||||
request.response.write(
|
||||
jsonEncode(<String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'id': id,
|
||||
'result': <String, dynamic>{
|
||||
'status': completed ? 'completed' : 'running',
|
||||
'sessionId': 'unit-fixture-task-d',
|
||||
'threadId': 'unit-fixture-task-d',
|
||||
'task': <String, dynamic>{
|
||||
'state': completed ? 'completed' : 'running',
|
||||
'turnId': 'turn-recovered-long',
|
||||
},
|
||||
if (completed)
|
||||
'result': <String, dynamic>{
|
||||
'success': true,
|
||||
'output': 'recovered after long polling window',
|
||||
'turnId': 'turn-recovered-long',
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
await request.response.close();
|
||||
return;
|
||||
}
|
||||
request.response.statusCode = HttpStatus.badRequest;
|
||||
await request.response.close();
|
||||
});
|
||||
final endpoint = Uri.parse('http://127.0.0.1:${server.port}');
|
||||
final transport = ExternalCodeAgentAcpDesktopTransport(
|
||||
client: GatewayAcpClient(endpointResolver: () => endpoint),
|
||||
endpointResolver: (_) => endpoint,
|
||||
taskEndpointResolver: (_) => endpoint,
|
||||
recoveryPollDelay: const Duration(microseconds: 1),
|
||||
);
|
||||
addTearDown(transport.dispose);
|
||||
|
||||
final result = await transport.executeTask(
|
||||
const GoTaskServiceRequest(
|
||||
sessionId: 'unit-fixture-task-d',
|
||||
threadId: 'unit-fixture-task-d',
|
||||
target: AssistantExecutionTarget.gateway,
|
||||
provider: SingleAgentProvider.openclaw,
|
||||
prompt: '生成封面图 png 和短视频 mp4',
|
||||
workingDirectory: '/tmp/workspace',
|
||||
model: '',
|
||||
thinking: 'off',
|
||||
selectedSkills: <String>[],
|
||||
inlineAttachments: <GatewayChatAttachmentPayload>[],
|
||||
localAttachments: <CollaborationAttachment>[],
|
||||
agentId: '',
|
||||
metadata: <String, dynamic>{},
|
||||
),
|
||||
onUpdate: (_) {},
|
||||
);
|
||||
|
||||
expect(snapshotPolls, 301);
|
||||
expect(result.success, isTrue);
|
||||
expect(result.message, 'recovered after long polling window');
|
||||
},
|
||||
);
|
||||
|
||||
test(
|
||||
'recovers terminal failed OpenClaw snapshot without displayable result',
|
||||
() async {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user