merge: fix openclaw terminal snapshot sync

This commit is contained in:
Haitao Pan 2026-06-05 17:13:08 +08:00
commit dd02ad4337
9 changed files with 304 additions and 272 deletions

View File

@ -16,6 +16,12 @@ Repo chain: openclaw-multi-session-plugins ↔ xworkmate-bridge ↔ xworkmate-ap
sync: save to ~/.xworkmate/threads/<session>/ (xworkmate-app)
```
App terminal rule:
- `completed`, `failed`, `cancelled`, and `canceled` snapshots end task
execution immediately.
- Artifact presence controls only `lastArtifactSyncStatus`; it is not a reason
to keep `lifecycleStatus=running`.
## State 1: Prepare
```
@ -47,7 +53,7 @@ Output:
artifactDirectory: "<workspace>/tasks/<safeSessionKey>/<safeRunId>/"
Fragile:
- workspace resolution chain has 5 fallback levels
- workspace resolution chain has 5 ordered sources
- session key format must match across bridge and plugin
- no cleanup of old scope directories
```
@ -212,6 +218,9 @@ Fragile:
- If export returns empty manifest, snapshot has no artifacts
- Artifact download URLs expire after 24h
- Snapshot stored only in memory (lost on bridge restart)
- App execution state must still transition to ready for any terminal
snapshot. Empty or incomplete artifact manifests update only artifact sync
status; they must not keep the task lifecycle running.
```
## State 6: Download (Bridge Proxy)

View File

@ -15,7 +15,7 @@ App starts
├─ thread has pendingTurnId?
│ ├─ Yes → pollBridgeTaskSnapshot(turnId)
│ │ └─ xworkmate.tasks.get({ sessionId, threadId, turnId })
│ │ ├─ Terminal snapshot found → apply result
│ │ ├─ Terminal snapshot found → apply result and mark ready
│ │ ├─ Session not found → mark failed
│ │ └─ No response → mark unrecovered
│ └─ No → mark ready (no pending turn)
@ -56,7 +56,7 @@ SSE stream interrupted (network flap)
└─ App: Transport detects stream close without terminal
└─ Enter polling mode
└─ Every N seconds: xworkmate.tasks.get({ sessionId, threadId, turnId })
├─ Terminal snapshot → apply result, stop polling
├─ Terminal snapshot → apply result, mark ready, stop polling
├─ Still running → continue polling
├─ Session not found → mark failed
└─ Max poll attempts reached → mark unrecovered
@ -128,7 +128,7 @@ stateDiagram-v2
Polling --> Session_Not_Found: bridge restarted
Polling --> Max_Retries: exceeded
Recovered --> Ready: result applied
Recovered --> Ready: result applied; artifact sync status records missing outputs
Session_Not_Found --> Failed: ACP_BRIDGE_RESTART
Max_Retries --> Failed: ACP_UNRECOVERABLE
@ -180,7 +180,7 @@ resolveGatewayThreadConnectionState(thread)
│ ├─ thread.lastTurnId exists?
│ │ ├─ Yes → transport.pollBridgeTaskSnapshot(turnId)
│ │ │ └─ xworkmate.tasks.get:
│ │ │ ├─ completed/failed → applyGatewayChatResult()
│ │ │ ├─ completed/failed → applyGatewayChatResult() and mark ready
│ │ │ ├─ running → leave as running, continue SSE
│ │ │ └─ not found / error:
│ │ │ ├─ isBridgeAvailable()

View File

@ -62,7 +62,7 @@ xworkmate-bridge
├─ Routing engine: Resolve(params, prompt, memory)
│ ├─ Heuristic: looksLocal() / looksOnline()
│ ├─ Memory preferences
│ └─ LLM classifier fallback
│ └─ LLM classifier path
├─ openClawGatewayAdmissionGate.acquire()
│ ├─ maxActive: 5, maxQueued: 20
@ -169,6 +169,7 @@ now copied into tasks/<session>/<run>/artifacts/ before export.
ExternalCodeAgentAcpDesktopTransport
├─ Receive terminal snapshot via SSE or xworkmate.tasks.get
├─ applyGatewayChatResult()
│ ├─ Terminal snapshot → lifecycleStatus=ready
│ ├─ success=true && artifacts present → syncArtifactsFromBridge()
│ ├─ success=false → lastResultCode=failed
│ └─ no-exported-artifacts → lastArtifactSyncStatus=no-exported-artifacts
@ -176,6 +177,11 @@ now copied into tasks/<session>/<run>/artifacts/ before export.
└─ syncArtifactsFromBridge()
└─ Download each artifact via /artifacts/openclaw/download
→ Save to ~/.xworkmate/threads/<session>/
Rule: the app must not keep an OpenClaw task in `running` after a bridge
terminal snapshot. Missing or incomplete artifacts are represented only through
`lastArtifactSyncStatus` (`no-exported-artifacts`, `partial`, `download-failed`,
etc.), not by extending the task execution lifecycle.
```
## Key Files by Repo
@ -217,4 +223,4 @@ now copied into tasks/<session>/<run>/artifacts/ before export.
4. **F4: Admission gate rejection** — Queue full → OPENCLAW_GATEWAY_BUSY → app must handle
5. **F5: Bridge restart** — In-memory sessions lost → app must detect and recover
6. **F6: Artifact ref key rotation** — Secret change invalidates all signed refs
7. **F7: SSE stream interruption**Polling fallback must have correct timeout/retry
7. **F7: SSE stream interruption**Recovery polling must align with bridge task deadlines and must apply terminal snapshots immediately

View File

@ -70,13 +70,9 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
appUiState.assistantLastSessionKey == normalizedSessionKey) {
return;
}
try {
await saveAppUiStateInternal(
appUiState.copyWith(assistantLastSessionKey: normalizedSessionKey),
);
} catch (_) {
// Best effort only during teardown-sensitive transitions.
}
await saveAppUiStateInternal(
appUiState.copyWith(assistantLastSessionKey: normalizedSessionKey),
);
}
void setAiGatewayStreamingTextInternal(String sessionKey, String text) {
@ -381,25 +377,22 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
return const <String>[];
}
final root = Directory(thread.workspaceBinding.workspacePath);
try {
final policy = await _loadArtifactSyncPolicyInternal(
root,
thread.selectedSkillKeys,
);
return _workspaceArtifactPathsModifiedSinceInternal(
root,
thread.lifecycleState.lastRunAtMs,
policy,
);
} catch (_) {
return const <String>[];
}
final policy = await _loadArtifactSyncPolicyInternal(
root,
thread.selectedSkillKeys,
);
return _workspaceArtifactPathsModifiedSinceInternal(
root,
thread.lifecycleState.lastRunAtMs,
policy,
);
}
String jsonLikeTextForDiagnosticsInternal(Object? value) {
try {
return jsonEncode(value);
} catch (_) {
} catch (error) {
debugPrint('JSON diagnostic encoding failed: $error');
return value.toString();
}
}
@ -781,6 +774,9 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
);
final artifacts = result.artifacts;
if (artifacts.isEmpty) {
final requiredExts =
existingThread.openClawTaskAssociation?.requiredArtifactExtensions ??
const <String>[];
final currentTaskArtifactRelativePaths =
isOpenClawNoExportedArtifactsGuardResultInternal(result)
? const <String>[]
@ -803,7 +799,8 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
normalizedSessionKey,
lastArtifactSyncAtMs: syncedAtMs,
lastArtifactSyncStatus:
isOpenClawNoExportedArtifactsGuardResultInternal(result)
isOpenClawNoExportedArtifactsGuardResultInternal(result) ||
requiredExts.isNotEmpty
? 'no-exported-artifacts'
: 'no-artifacts',
updatedAtMs: syncedAtMs,
@ -874,18 +871,21 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
}
final thread = taskThreadForSessionInternal(normalizedSessionKey);
final requiredExts = thread?.openClawTaskAssociation
?.requiredArtifactExtensions ?? const <String>[];
final missingRequired = requiredExts.where((ext) {
return !currentTaskArtifactPaths.any(
(p) => p.toLowerCase().endsWith(ext.toLowerCase()),
);
}).toList(growable: false);
final requiredExts =
thread?.openClawTaskAssociation?.requiredArtifactExtensions ??
const <String>[];
final missingRequired = requiredExts
.where((ext) {
return !currentTaskArtifactPaths.any(
(p) => p.toLowerCase().endsWith(ext.toLowerCase()),
);
})
.toList(growable: false);
final syncStatus = wroteArtifact
? (failedArtifact || skippedArtifact || missingRequired.isNotEmpty
? 'partial'
: 'synced')
? 'partial'
: 'synced')
: failedArtifact
? 'download-failed'
: rejectedArtifact
@ -928,11 +928,12 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
final bridgeEndpoint = resolveBridgeAcpEndpointInternal();
final bridgeHost = bridgeEndpoint?.host.trim().toLowerCase() ?? '';
final downloadHost = uri.host.trim().toLowerCase();
final isLoopback = downloadHost == '127.0.0.1' ||
final isLoopback =
downloadHost == '127.0.0.1' ||
downloadHost == 'localhost' ||
downloadHost == '::1';
final sameBridgeHost = bridgeEndpoint != null &&
(downloadHost == bridgeHost || isLoopback);
final sameBridgeHost =
bridgeEndpoint != null && (downloadHost == bridgeHost || isLoopback);
if (!sameBridgeHost) {
return const _ArtifactBytesResult.skipped();
}
@ -1066,7 +1067,8 @@ extension AppControllerDesktopRuntimeHelpers on AppController {
}
await temp.rename(target.path);
return true;
} catch (_) {
} catch (error) {
debugPrint('Artifact write failed for ${target.path}: $error');
if (await temp.exists()) {
await temp.delete();
}

View File

@ -359,15 +359,10 @@ extension AppControllerDesktopThreadActions on AppController {
bridgeCapabilityRefreshNeededForAssistantTargetInternal(
currentTarget,
)) {
try {
await refreshAcpCapabilitiesInternal(forceRefresh: true);
connectionState = assistantConnectionStateForSession(
normalizedSessionKey,
);
} catch (error) {
debugPrint('Gateway capability refresh fallback: $error');
// Fallback to existing connection state if refresh fails.
}
await refreshAcpCapabilitiesInternal(forceRefresh: true);
connectionState = assistantConnectionStateForSession(
normalizedSessionKey,
);
}
if (!connectionState.connected) {
final error = StateError(connectionState.detailLabel);
@ -410,12 +405,7 @@ extension AppControllerDesktopThreadActions on AppController {
throw error;
}
if (providerCatalogForExecutionTarget(currentTarget).isEmpty) {
try {
await refreshSingleAgentCapabilitiesInternal(forceRefresh: true);
} catch (error) {
debugPrint('Gateway provider catalog refresh fallback: $error');
// Keep the local guard focused on the post-refresh catalog state.
}
await refreshSingleAgentCapabilitiesInternal(forceRefresh: true);
if (providerCatalogForExecutionTarget(currentTarget).isEmpty) {
upsertTaskThreadInternal(
normalizedSessionKey,
@ -784,16 +774,6 @@ extension AppControllerDesktopThreadActions on AppController {
continue;
}
if (aiGatewayPendingSessionKeysInternal.contains(sessionKey)) {
final hasRequiredExts = current.requiredArtifactExtensions.isNotEmpty;
final hasEnoughArtifacts = !hasRequiredExts ||
current.requiredArtifactExtensions.every((ext) {
return result.artifacts.any(
(a) => a.relativePath.toLowerCase().endsWith(ext.toLowerCase()),
);
});
if (!hasEnoughArtifacts && attempt < maxAttempts - 1) {
continue;
}
await applyGatewayChatResultInternal(
sessionKey: sessionKey,
target: target,
@ -1296,8 +1276,6 @@ extension AppControllerDesktopThreadActions on AppController {
notifyIfActiveInternal();
}
Future<void> applyGatewayChatResultInternal({
required String sessionKey,
required AssistantExecutionTarget target,
@ -1327,11 +1305,15 @@ extension AppControllerDesktopThreadActions on AppController {
lifecycleStatus: 'ready',
lastRunAtMs: completedAtMs,
lastResultCode: terminalResultCode,
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
if (isOpenClawNoExportedArtifactsGuardResultInternal(result)) {
await persistGoTaskArtifactsForSessionInternal(sessionKey, result);
upsertTaskThreadInternal(
sessionKey,
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
return;
}
if (!result.success) {
@ -1340,6 +1322,7 @@ extension AppControllerDesktopThreadActions on AppController {
lastArtifactSyncAtMs: completedAtMs,
lastArtifactSyncStatus: 'failed',
lastTaskArtifactRelativePaths: const <String>[],
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
appendLocalSessionMessageInternal(
@ -1365,6 +1348,7 @@ extension AppControllerDesktopThreadActions on AppController {
lastArtifactSyncAtMs: completedAtMs,
lastArtifactSyncStatus: 'failed',
lastTaskArtifactRelativePaths: const <String>[],
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
appendLocalSessionMessageInternal(
@ -1404,6 +1388,7 @@ extension AppControllerDesktopThreadActions on AppController {
lifecycleStatus: 'ready',
lastRunAtMs: completedAtMs,
lastResultCode: terminalResultCode,
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
}
@ -1546,23 +1531,28 @@ extension AppControllerDesktopThreadActions on AppController {
bool gatewayResultCodeRequiresNewSessionInternal(String code) {
final normalized = code.trim().toUpperCase();
if (normalized.isEmpty ||
normalized == 'ACP_HTTP_CONNECTION_CLOSED') {
if (normalized.isEmpty || normalized == 'ACP_HTTP_CONNECTION_CLOSED') {
return false;
}
if (normalized == 'RUNNING' ||
normalized == 'QUEUED' ||
normalized == 'ABORTED' ||
normalized == 'BRIDGE_NOT_CONNECTED' ||
normalized == 'ARTIFACT_MISSING') {
return true;
}
if (normalized.startsWith('OPENCLAW_') ||
normalized.startsWith('ACP_HTTP_') ||
normalized.startsWith('GATEWAY_')) {
return true; // Conservative fallback for unrecognized infrastructure/gateway errors
}
return false;
return const <String>{
'RUNNING',
'QUEUED',
'ABORTED',
'BRIDGE_NOT_CONNECTED',
'ARTIFACT_MISSING',
'OPENCLAW_ARTIFACT_MISSING',
'OPENCLAW_GATEWAY_QUEUE_FULL',
'OPENCLAW_GATEWAY_SOCKET_CLOSED',
'OPENCLAW_NO_DISPLAYABLE_OUTPUT',
'OPENCLAW_NO_EXPORTED_ARTIFACTS',
'OPENCLAW_WAIT_FAILED',
'ACP_HTTP_401',
'ACP_HTTP_502',
'ACP_HTTP_CONNECT_FAILED',
'ACP_HTTP_CONNECT_TIMEOUT',
'ACP_HTTP_HANDSHAKE_INTERRUPTED',
'GATEWAY_TASK_REJECTED',
}.contains(normalized);
}
Future<void> abortRun() async {
@ -1587,27 +1577,17 @@ extension AppControllerDesktopThreadActions on AppController {
final association = taskThreadForSessionInternal(
normalized,
)?.openClawTaskAssociation;
try {
await goTaskServiceClientInternal.cancelTask(
route: GoTaskServiceRoute.externalAcpSingle,
target: assistantExecutionTargetForSession(normalized),
sessionId: normalized,
threadId: normalized,
association: association,
);
} catch (error) {
debugPrint('OpenClaw cancellation fallback: $error');
// Best effort cancellation only. Local state must still leave pending.
}
await goTaskServiceClientInternal.cancelTask(
route: GoTaskServiceRoute.externalAcpSingle,
target: assistantExecutionTargetForSession(normalized),
sessionId: normalized,
threadId: normalized,
association: association,
);
}
Future<void> prepareForExit() async {
try {
await abortRun();
} catch (error) {
debugPrint('Prepare for exit abort fallback: $error');
// Best effort only. Native termination still proceeds.
}
await abortRun();
await flushAssistantThreadPersistenceInternal();
}

View File

@ -6,8 +6,7 @@ import 'gateway_acp_client.dart';
import 'go_task_service_client.dart';
import 'runtime_models.dart';
class ExternalCodeAgentAcpDesktopTransport
implements GoTaskServiceClient {
class ExternalCodeAgentAcpDesktopTransport implements GoTaskServiceClient {
ExternalCodeAgentAcpDesktopTransport({
required GatewayAcpClient client,
required Uri? Function(AssistantExecutionTarget target) endpointResolver,
@ -26,8 +25,6 @@ class ExternalCodeAgentAcpDesktopTransport
final Duration _recoveryPollDelay;
final int? _recoveryMaxAttempts;
@override
Future<GoTaskServiceResult> executeTask(
GoTaskServiceRequest request, {
@ -35,7 +32,6 @@ class ExternalCodeAgentAcpDesktopTransport
}) async {
var streamedText = '';
String? completedMessage;
Map<String, dynamic>? completedResultSnapshot;
Map<String, dynamic>? runningTaskSnapshot;
try {
final endpointOverride = _taskEndpointResolver == null
@ -65,9 +61,6 @@ class ExternalCodeAgentAcpDesktopTransport
}
if (update.isDone && update.message.trim().isNotEmpty) {
completedMessage = update.message.trim();
completedResultSnapshot = _completedResultSnapshotFromUpdate(
update,
);
}
if (update.payload['status']?.toString().trim().toLowerCase() ==
'running' &&
@ -92,24 +85,11 @@ class ExternalCodeAgentAcpDesktopTransport
: _taskEndpointResolver.call(request),
streamedText: streamedText,
completedMessage: completedMessage,
fallbackAvailable: completedResultSnapshot != null,
runningTaskSnapshot: runningTaskSnapshot,
);
if (recovered != null) {
return recovered;
}
if (completedResultSnapshot != null) {
return goTaskServiceResultFromAcpResponse(
<String, dynamic>{
'jsonrpc': '2.0',
'id': 'recovered-from-completed-session-update',
'result': completedResultSnapshot,
},
route: request.route,
streamedText: streamedText,
completedMessage: completedMessage,
);
}
}
rethrow;
} on SocketException catch (error) {
@ -141,7 +121,6 @@ class ExternalCodeAgentAcpDesktopTransport
required Uri? taskEndpoint,
required String streamedText,
required String? completedMessage,
bool fallbackAvailable = false,
Map<String, dynamic>? runningTaskSnapshot,
}) async {
final endpoint = _sessionSnapshotEndpoint(taskEndpoint);
@ -160,7 +139,8 @@ class ExternalCodeAgentAcpDesktopTransport
try {
response = await _client.request(
method: 'xworkmate.tasks.get',
params: association?.toTaskGetParams() ??
params:
association?.toTaskGetParams() ??
<String, dynamic>{
'sessionId': request.sessionId,
'threadId': request.threadId,
@ -168,9 +148,6 @@ class ExternalCodeAgentAcpDesktopTransport
endpointOverride: endpoint,
);
} on GatewayAcpException {
if (fallbackAvailable) {
return null;
}
continue;
} on SocketException {
continue;
@ -198,14 +175,6 @@ class ExternalCodeAgentAcpDesktopTransport
);
}
final result = _recoveredResultFromTaskSnapshot(snapshot);
final resultArtifacts = _castMap(result['artifacts']);
final artifactItems = resultArtifacts['items'] ?? resultArtifacts;
final hasArtifacts = result.isNotEmpty &&
(artifactItems is List && artifactItems.isNotEmpty ||
result['artifacts'] is List && (result['artifacts'] as List).isNotEmpty);
if (!hasArtifacts && status == 'completed' && attempt < attempts - 1) {
continue;
}
if (result.isNotEmpty) {
return goTaskServiceResultFromAcpResponse(
<String, dynamic>{
@ -287,43 +256,8 @@ class ExternalCodeAgentAcpDesktopTransport
);
}
@override
Future<void> dispose() => _client.dispose();
Map<String, dynamic>? _completedResultSnapshotFromUpdate(
GoTaskServiceUpdate update,
) {
if (!update.isDone) {
return null;
}
final payload = update.payload;
final embeddedResult = _castMap(payload['result']);
final snapshot = <String, dynamic>{...embeddedResult, ...payload};
snapshot.remove('sessionId');
snapshot.remove('threadId');
snapshot.remove('type');
snapshot.remove('event');
snapshot.remove('pending');
snapshot.remove('result');
snapshot['turnId'] = update.turnId;
snapshot['success'] = !update.error;
final text = _firstNonEmptyDisplayText(snapshot, const <String>[
'output',
'message',
'summary',
'text',
'delta',
]);
if (text.isNotEmpty) {
snapshot['output'] = text;
snapshot['message'] = text;
snapshot['summary'] = text;
}
return snapshot;
}
Map<String, dynamic> _recoveredResultFromTaskSnapshot(
Map<String, dynamic> snapshot,
) {

View File

@ -496,28 +496,22 @@ class GoTaskServiceResult {
? raw['resultSummary'].toString().trim()
: raw['summary']?.toString().trim() ?? '';
String get status => _firstNestedGoTaskString(
raw,
const <List<String>>[
<String>['status'],
<String>['error', 'status'],
<String>['details', 'status'],
<String>['payload', 'status'],
<String>['result', 'status'],
],
);
String get status => _firstNestedGoTaskString(raw, const <List<String>>[
<String>['status'],
<String>['error', 'status'],
<String>['details', 'status'],
<String>['payload', 'status'],
<String>['result', 'status'],
]);
String get code => _firstNestedGoTaskString(
raw,
const <List<String>>[
<String>['code'],
<String>['error', 'code'],
<String>['error', 'details', 'code'],
<String>['details', 'code'],
<String>['payload', 'code'],
<String>['result', 'code'],
],
);
String get code => _firstNestedGoTaskString(raw, const <List<String>>[
<String>['code'],
<String>['error', 'code'],
<String>['error', 'details', 'code'],
<String>['details', 'code'],
<String>['payload', 'code'],
<String>['result', 'code'],
]);
bool get isOpenClawRunningTaskHandle {
final normalizedStatus = status.trim().toLowerCase();
@ -755,9 +749,8 @@ GoTaskServiceResult goTaskServiceResultFromAcpResponse(
.map((item) => item['id']?.toString().trim() ?? '')
.where((item) => item.isNotEmpty)
.toList(growable: false);
final success =
_boolValue(result['success']) ?? _inferGoTaskSuccess(result);
final fallbackFailureText = () {
final success = _boolValue(result['success']) ?? _inferGoTaskSuccess(result);
final structuredFailureText = () {
if (success) {
return '';
}
@ -778,8 +771,8 @@ GoTaskServiceResult goTaskServiceResultFromAcpResponse(
? responseText
: completedMessage?.trim().isNotEmpty == true
? completedMessage!.trim()
: fallbackFailureText.isNotEmpty
? fallbackFailureText
: structuredFailureText.isNotEmpty
? structuredFailureText
: streamedText.trim().isNotEmpty
? streamedText.trim()
: '')
@ -787,8 +780,8 @@ GoTaskServiceResult goTaskServiceResultFromAcpResponse(
final directErrorMessage = _extractGoTaskDisplayText(result['error']);
final effectiveErrorMessage = success
? directErrorMessage
: fallbackFailureText.isNotEmpty
? fallbackFailureText
: structuredFailureText.isNotEmpty
? structuredFailureText
: primaryText.isNotEmpty
? primaryText
: directErrorMessage;
@ -810,15 +803,12 @@ bool _inferGoTaskSuccess(Map<String, dynamic> result) {
if (result.containsKey('error')) {
return false;
}
final status = _firstNestedGoTaskString(
result,
const <List<String>>[
<String>['status'],
<String>['details', 'status'],
<String>['payload', 'status'],
<String>['result', 'status'],
],
).toLowerCase();
final status = _firstNestedGoTaskString(result, const <List<String>>[
<String>['status'],
<String>['details', 'status'],
<String>['payload', 'status'],
<String>['result', 'status'],
]).toLowerCase();
if (status == 'failed' ||
status == 'error' ||
status == 'artifact_missing' ||
@ -826,15 +816,12 @@ bool _inferGoTaskSuccess(Map<String, dynamic> result) {
status == 'canceled') {
return false;
}
final code = _firstNestedGoTaskString(
result,
const <List<String>>[
<String>['code'],
<String>['details', 'code'],
<String>['payload', 'code'],
<String>['result', 'code'],
],
).toUpperCase();
final code = _firstNestedGoTaskString(result, const <List<String>>[
<String>['code'],
<String>['details', 'code'],
<String>['payload', 'code'],
<String>['result', 'code'],
]).toUpperCase();
if (code == 'OPENCLAW_ARTIFACT_MISSING' ||
code == 'OPENCLAW_NO_EXPORTED_ARTIFACTS' ||
code == 'ARTIFACT_MISSING') {

View File

@ -899,8 +899,7 @@ void main() {
try {
await storeRoot.delete(recursive: true);
} on FileSystemException {
// Temp cleanup is best effort here. The controller may still be
// releasing files when teardown starts.
// The controller may still be releasing files when teardown starts.
}
}
});
@ -980,7 +979,7 @@ void main() {
try {
await storeRoot.delete(recursive: true);
} on FileSystemException {
// Temp cleanup is best effort here.
// Ignore temp cleanup failure during teardown.
}
}
});
@ -1047,8 +1046,7 @@ void main() {
try {
await storeRoot.delete(recursive: true);
} on FileSystemException {
// Temp cleanup is best effort here. The controller may still be
// releasing files when teardown starts.
// The controller may still be releasing files when teardown starts.
}
}
});
@ -1962,8 +1960,6 @@ void main() {
},
);
test(
'sendChatMessage hides OpenClaw artifact guard text from failed results and streaming',
() async {
@ -3274,7 +3270,10 @@ void main() {
await fakeGoTaskService.waitForRequestCount(prompts.length);
expect(fakeGoTaskService.requests, hasLength(prompts.length));
expect(controller.openClawGatewayActiveTurnsInternal.length, prompts.length);
expect(
controller.openClawGatewayActiveTurnsInternal.length,
prompts.length,
);
expect(controller.openClawGatewayQueuedTurnsInternal, isEmpty);
for (var index = 0; index < prompts.length; index += 1) {
final sessionKey = 'openclaw-e2e-$index';
@ -4114,6 +4113,92 @@ void main() {
);
});
test(
'OpenClaw terminal snapshot without required artifacts does not stay running',
() async {
final fakeGoTaskService = _RecordingGoTaskServiceClient()
..outcomes.add(
const GoTaskServiceResult(
success: true,
message: '',
turnId: 'turn-openclaw-missing-screenshot',
raw: <String, dynamic>{
'success': true,
'status': 'running',
'sessionId': 'openclaw-missing-screenshot',
'threadId': 'openclaw-missing-screenshot',
'turnId': 'turn-openclaw-missing-screenshot',
'runId': 'run-openclaw-missing-screenshot',
'artifactScope':
'tasks/openclaw-missing-screenshot/run-openclaw-missing-screenshot',
'artifactDirectory':
'/tmp/tasks/openclaw-missing-screenshot/run-openclaw-missing-screenshot',
'gatewayProviderId': 'openclaw',
'runtimeBudgetMinutes': 1,
'requiredArtifactExtensions': <String>['.png'],
},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
)
..taskOutcomes.add(
const GoTaskServiceResult(
success: true,
message: 'gateway completed the screenshot task',
turnId: 'turn-openclaw-missing-screenshot',
raw: <String, dynamic>{
'success': true,
'status': 'completed',
'turnId': 'turn-openclaw-missing-screenshot',
'runId': 'run-openclaw-missing-screenshot',
'output': 'gateway completed the screenshot task',
},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
final controller = _connectedGatewayController(fakeGoTaskService);
addTearDown(controller.dispose);
await _selectGatewaySession(controller, 'openclaw-missing-screenshot');
await expectLater(
controller
.sendChatMessage('执行截图并导出 PNG')
.timeout(const Duration(milliseconds: 500)),
completes,
);
await _waitForThreadLifecycleStatusWithin(
controller,
'openclaw-missing-screenshot',
'ready',
const Duration(milliseconds: 500),
);
await _waitForThreadArtifactSyncStatusWithin(
controller,
'openclaw-missing-screenshot',
'no-exported-artifacts',
const Duration(milliseconds: 500),
);
final thread = controller.requireTaskThreadForSessionInternal(
'openclaw-missing-screenshot',
);
expect(thread.lifecycleState.status, 'ready');
expect(thread.lifecycleState.lastResultCode, 'success');
expect(thread.lastArtifactSyncStatus, 'no-exported-artifacts');
expect(thread.openClawTaskAssociation, isNull);
expect(
controller.assistantSessionHasPendingRun(
'openclaw-missing-screenshot',
),
isFalse,
);
},
);
test(
'sendChatMessage resumes existing interrupted and error states',
() async {
@ -4687,7 +4772,21 @@ Future<void> _waitForThreadLifecycleStatus(
String sessionKey,
String status,
) async {
final deadline = DateTime.now().add(const Duration(seconds: 15));
await _waitForThreadLifecycleStatusWithin(
controller,
sessionKey,
status,
const Duration(seconds: 15),
);
}
Future<void> _waitForThreadLifecycleStatusWithin(
AppController controller,
String sessionKey,
String status,
Duration timeout,
) async {
final deadline = DateTime.now().add(timeout);
while (DateTime.now().isBefore(deadline)) {
final currentStatus = controller
.taskThreadForSessionInternal(sessionKey)
@ -4707,6 +4806,30 @@ Future<void> _waitForThreadLifecycleStatus(
);
}
Future<void> _waitForThreadArtifactSyncStatusWithin(
AppController controller,
String sessionKey,
String status,
Duration timeout,
) async {
final deadline = DateTime.now().add(timeout);
while (DateTime.now().isBefore(deadline)) {
final currentStatus = controller
.taskThreadForSessionInternal(sessionKey)
?.lastArtifactSyncStatus;
if (currentStatus == status) {
return;
}
await Future<void>.delayed(const Duration(milliseconds: 10));
}
final currentStatus = controller
.taskThreadForSessionInternal(sessionKey)
?.lastArtifactSyncStatus;
throw StateError(
'Timed out waiting for $sessionKey artifact sync status $status. Current status: $currentStatus.',
);
}
Future<void> _waitForThreadLastResultCode(
AppController controller,
String sessionKey,
@ -4741,8 +4864,6 @@ class _RecordingGoTaskServiceClient implements GoTaskServiceClient {
final List<Object> taskOutcomes = <Object>[];
Future<void> Function(GoTaskServiceRequest request)? onExecuteTask;
@override
Future<GoTaskServiceResult> executeTask(
GoTaskServiceRequest request, {
@ -4814,8 +4935,6 @@ class _RecordingGoTaskServiceClient implements GoTaskServiceClient {
OpenClawTaskAssociation? association,
}) async {}
@override
Future<void> dispose() async {}
}
@ -4831,8 +4950,6 @@ class _BlockingGoTaskServiceClient implements GoTaskServiceClient {
final Map<String, void Function(GoTaskServiceUpdate)> _updates =
<String, void Function(GoTaskServiceUpdate)>{};
@override
Future<GoTaskServiceResult> executeTask(
GoTaskServiceRequest request, {
@ -4937,8 +5054,6 @@ class _BlockingGoTaskServiceClient implements GoTaskServiceClient {
cancelledSessionIds.add(sessionId);
}
@override
Future<void> dispose() async {}
}

View File

@ -67,7 +67,7 @@ void main() {
expect(result.message, 'content list response');
});
test('uses bridge failure text instead of empty output fallback', () {
test('uses bridge failure text when output is empty', () {
final result = goTaskServiceResultFromAcpResponse(<String, dynamic>{
'jsonrpc': '2.0',
'id': 'request-id',
@ -251,7 +251,6 @@ void main() {
'https://xworkmate-bridge.svc.plus/artifacts/summary.pdf',
);
});
});
group('GatewayAcpClient authorization', () {
@ -559,7 +558,7 @@ void main() {
);
test(
'recovers OpenClaw task result from completed session update when final SSE envelope is lost',
'does not synthesize OpenClaw result from completed session update when final SSE envelope is lost',
() async {
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
addTearDown(() => server.close(force: true));
@ -614,29 +613,33 @@ void main() {
);
addTearDown(transport.dispose);
final result = await transport.executeTask(
const GoTaskServiceRequest(
sessionId: 'unit-fixture-task-a',
threadId: 'unit-fixture-task-a',
target: AssistantExecutionTarget.gateway,
provider: SingleAgentProvider.openclaw,
prompt: 'create files',
workingDirectory: '/tmp/workspace',
model: '',
thinking: 'off',
selectedSkills: <String>[],
inlineAttachments: <GatewayChatAttachmentPayload>[],
localAttachments: <CollaborationAttachment>[],
agentId: '',
metadata: <String, dynamic>{},
await expectLater(
transport.executeTask(
const GoTaskServiceRequest(
sessionId: 'unit-fixture-task-a',
threadId: 'unit-fixture-task-a',
target: AssistantExecutionTarget.gateway,
provider: SingleAgentProvider.openclaw,
prompt: 'create files',
workingDirectory: '/tmp/workspace',
model: '',
thinking: 'off',
selectedSkills: <String>[],
inlineAttachments: <GatewayChatAttachmentPayload>[],
localAttachments: <CollaborationAttachment>[],
agentId: '',
metadata: <String, dynamic>{},
),
onUpdate: (_) {},
),
throwsA(
isA<GatewayAcpException>().having(
(error) => error.code,
'code',
'ACP_HTTP_CONNECTION_CLOSED',
),
),
onUpdate: (_) {},
);
expect(result.success, isTrue);
expect(result.message, 'stable completed output');
expect(result.artifacts, hasLength(1));
expect(result.artifacts.single.relativePath, 'exports/final.md');
},
);
@ -1010,7 +1013,8 @@ void main() {
'items': <Map<String, dynamic>>[
<String, dynamic>{
'relativePath': 'exports/snapshot.md',
'downloadUrl': 'https://xworkmate-bridge.svc.plus/artifacts/openclaw/download?sessionKey=unit-fixture-task-b&runId=turn-recovered-running&relativePath=exports%2Fsnapshot.md',
'downloadUrl':
'https://xworkmate-bridge.svc.plus/artifacts/openclaw/download?sessionKey=unit-fixture-task-b&runId=turn-recovered-running&relativePath=exports%2Fsnapshot.md',
'contentType': 'text/markdown',
'sizeBytes': 64,
},
@ -1084,8 +1088,7 @@ void main() {
'event': 'running',
'status': 'running',
'runId': 'run-running',
'artifactScope':
'tasks/unit-fixture-task-handle/run-running',
'artifactScope': 'tasks/unit-fixture-task-handle/run-running',
'artifactDirectory':
'/home/ubuntu/.openclaw/workspace/tasks/unit-fixture-task-handle/run-running',
'gatewayProviderId': 'openclaw',
@ -1461,8 +1464,7 @@ void main() {
try {
await storeRoot.delete(recursive: true);
} on FileSystemException {
// Temp cleanup is best effort here. The controller does not own
// the lifecycle of the OS temp directory.
// The controller does not own the OS temp directory lifecycle.
}
}
});
@ -1518,8 +1520,7 @@ void main() {
try {
await storeRoot.delete(recursive: true);
} on FileSystemException {
// Temp cleanup is best effort here. The client may still be
// releasing files when teardown starts.
// The client may still be releasing files when teardown starts.
}
}
});
@ -1561,8 +1562,7 @@ void main() {
try {
await storeRoot.delete(recursive: true);
} on FileSystemException {
// Temp cleanup is best effort here. The controller may still be
// releasing files when teardown starts.
// The controller may still be releasing files when teardown starts.
}
}
});
@ -1617,18 +1617,17 @@ void main() {
);
test(
'desktop bridge auth resolver does not fallback to the remote gateway token for bridge ACP',
'desktop bridge auth resolver rejects the remote gateway token for bridge ACP',
() async {
final storeRoot = await Directory.systemTemp.createTemp(
'xworkmate-acp-auth-bridge-fallback-',
'xworkmate-acp-auth-bridge-reject-',
);
addTearDown(() async {
if (await storeRoot.exists()) {
try {
await storeRoot.delete(recursive: true);
} on FileSystemException {
// Temp cleanup is best effort here. The controller may still be
// releasing files when teardown starts.
// The controller may still be releasing files when teardown starts.
}
}
});