Fix assistant continue task requeue

This commit is contained in:
Haitao Pan 2026-05-25 10:35:30 +08:00
parent ae3a8c02cc
commit 0f2b29e0ad
6 changed files with 432 additions and 20 deletions

View File

@ -249,11 +249,100 @@ extension AppControllerDesktopThreadActions on AppController {
sessionsControllerInternal.currentSessionKey,
);
}
final currentTarget = assistantExecutionTargetForSession(sessionKey);
final resumeSessionHint = shouldResumeGatewaySessionForNextSendInternal(
sessionKey,
);
var connectionState = assistantConnectionStateForSession(sessionKey);
await dispatchGatewayChatTurnInternal(
sessionKey: sessionKey,
message: message,
thinking: thinking,
attachments: attachments,
localAttachments: localAttachments,
selectedSkillLabels: selectedSkillLabels,
resumeSessionHint: resumeSessionHint,
);
}
Future<void> continueAssistantTaskInternal(String sessionKey) async {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey.trim().isEmpty
? sessionsControllerInternal.currentSessionKey
: sessionKey,
);
final thread = taskThreadForSessionInternal(normalizedSessionKey);
final lifecycleStatus = thread?.lifecycleState.status ?? '';
final lastResultCode = thread?.lifecycleState.lastResultCode ?? '';
final artifactSyncStatus = thread?.lastArtifactSyncStatus ?? '';
if (!isRecoverableAssistantTaskStateInternal(
lifecycleStatus: lifecycleStatus,
lastResultCode: lastResultCode,
artifactSyncStatus: artifactSyncStatus,
)) {
final error = StateError(
appText('当前任务状态不可继续执行。', 'The current task state cannot be continued.'),
);
appendAssistantThreadMessageInternal(
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
throw error;
}
final lastUserTurn = lastCommittedUserTurnForGatewaySessionInternal(
normalizedSessionKey,
);
final message = lastUserTurn?.text.trim() ?? '';
if (message.isEmpty) {
final error = StateError(
appText(
'当前任务没有可恢复的用户请求,请输入需求后重新提交。',
'This task has no recoverable user request. Enter a request and submit it again.',
),
);
appendAssistantThreadMessageInternal(
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
throw error;
}
await dispatchGatewayChatTurnInternal(
sessionKey: normalizedSessionKey,
message: message,
thinking: 'off',
attachments: const <GatewayChatAttachmentPayload>[],
localAttachments: const <CollaborationAttachment>[],
selectedSkillLabels: const <String>[],
resumeSessionHint:
lastResultCode.trim().toUpperCase() != 'ABORTED' &&
shouldResumeGatewaySessionForNextSendInternal(normalizedSessionKey),
appendUserTurn: false,
);
}
Future<void> dispatchGatewayChatTurnInternal({
required String sessionKey,
required String message,
required String thinking,
required List<GatewayChatAttachmentPayload> attachments,
required List<CollaborationAttachment> localAttachments,
required List<String> selectedSkillLabels,
required bool resumeSessionHint,
bool appendUserTurn = true,
}) async {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final currentTarget = assistantExecutionTargetForSession(
normalizedSessionKey,
);
var connectionState = assistantConnectionStateForSession(
normalizedSessionKey,
);
if (!connectionState.connected &&
isBridgeAcpRuntimeConfiguredInternal() &&
bridgeCapabilityRefreshNeededForAssistantTargetInternal(
@ -261,7 +350,9 @@ extension AppControllerDesktopThreadActions on AppController {
)) {
try {
await refreshAcpCapabilitiesInternal(forceRefresh: true);
connectionState = assistantConnectionStateForSession(sessionKey);
connectionState = assistantConnectionStateForSession(
normalizedSessionKey,
);
} catch (_) {
// Fallback to existing connection state if refresh fails.
}
@ -269,7 +360,7 @@ extension AppControllerDesktopThreadActions on AppController {
if (!connectionState.connected) {
final error = StateError(connectionState.detailLabel);
appendAssistantThreadMessageInternal(
sessionKey,
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
@ -278,14 +369,17 @@ extension AppControllerDesktopThreadActions on AppController {
throw error;
}
await ensureDesktopTaskThreadBindingInternal(
sessionKey,
normalizedSessionKey,
executionTarget: currentTarget,
);
final workingDirectory =
assistantWorkingDirectoryForSessionInternal(sessionKey)?.trim() ?? '';
assistantWorkingDirectoryForSessionInternal(
normalizedSessionKey,
)?.trim() ??
'';
final remoteWorkingDirectoryHint =
assistantRemoteWorkingDirectoryHintForSessionInternal(
sessionKey,
normalizedSessionKey,
)?.trim() ??
'';
if (workingDirectory.isEmpty) {
@ -296,7 +390,7 @@ extension AppControllerDesktopThreadActions on AppController {
),
);
appendAssistantThreadMessageInternal(
sessionKey,
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
@ -311,7 +405,7 @@ extension AppControllerDesktopThreadActions on AppController {
}
if (providerCatalogForExecutionTarget(currentTarget).isEmpty) {
upsertTaskThreadInternal(
sessionKey,
normalizedSessionKey,
selectedProvider: SingleAgentProvider.unspecified,
selectedProviderSource: ThreadSelectionSource.inherited,
latestResolvedProviderId: '',
@ -329,7 +423,7 @@ extension AppControllerDesktopThreadActions on AppController {
),
);
appendAssistantThreadMessageInternal(
sessionKey,
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
@ -338,11 +432,13 @@ extension AppControllerDesktopThreadActions on AppController {
throw error;
}
}
final provider = assistantProviderForSession(sessionKey);
final provider = assistantProviderForSession(normalizedSessionKey);
final model = currentTarget.isGateway
? ''
: assistantModelForSession(sessionKey);
final routing = buildExternalAcpRoutingForSessionInternal(sessionKey);
: assistantModelForSession(normalizedSessionKey);
final routing = buildExternalAcpRoutingForSessionInternal(
normalizedSessionKey,
);
final dispatch = await codeAgentNodeOrchestratorInternal
.buildGatewayDispatch(
buildCodeAgentNodeStateInternal(executionTarget: currentTarget),
@ -361,7 +457,7 @@ extension AppControllerDesktopThreadActions on AppController {
OpenClawGatewayQueuedTurnInternal(
queueId:
'openclaw-${DateTime.now().microsecondsSinceEpoch}-$localMessageCounterInternal',
sessionKey: sessionKey,
sessionKey: normalizedSessionKey,
target: currentTarget,
provider: provider,
message: message,
@ -376,14 +472,15 @@ extension AppControllerDesktopThreadActions on AppController {
agentId: dispatch.agentId ?? '',
metadata: Map<String, dynamic>.unmodifiable(dispatch.metadata),
resumeSessionHint: resumeSessionHint,
appendUserTurn: appendUserTurn,
),
);
return;
}
await enqueueThreadTurnInternal<void>(
sessionKey,
normalizedSessionKey,
() => runGatewayChatTurnInternal(
sessionKey: sessionKey,
sessionKey: normalizedSessionKey,
target: currentTarget,
provider: provider,
message: message,
@ -398,6 +495,7 @@ extension AppControllerDesktopThreadActions on AppController {
agentId: dispatch.agentId ?? '',
metadata: Map<String, dynamic>.unmodifiable(dispatch.metadata),
resumeSessionHint: resumeSessionHint,
appendUserTurn: appendUserTurn,
),
);
recomputeTasksInternal();
@ -570,7 +668,9 @@ extension AppControllerDesktopThreadActions on AppController {
Future<void> enqueueOpenClawGatewayTurnInternal(
OpenClawGatewayQueuedTurnInternal turn,
) async {
appendGatewayUserTurnInternal(turn.sessionKey, turn.message);
if (turn.appendUserTurn) {
appendGatewayUserTurnInternal(turn.sessionKey, turn.message);
}
if (openClawGatewayActiveTasksInternal >=
openClawGatewayMaxActiveTasksInternal &&
openClawGatewayQueuedTurnsInternal.length >=
@ -974,6 +1074,42 @@ extension AppControllerDesktopThreadActions on AppController {
});
}
GatewayChatMessage? lastCommittedUserTurnForGatewaySessionInternal(
String sessionKey,
) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final messages = <GatewayChatMessage>[
...?assistantThreadRecordsInternal[normalizedSessionKey]?.messages,
...?assistantThreadMessagesInternal[normalizedSessionKey],
...?localSessionMessagesInternal[normalizedSessionKey],
];
for (final message in messages.reversed) {
final role = message.role.trim().toLowerCase();
if (role == 'user' && !message.pending) {
return message;
}
}
return null;
}
bool isRecoverableAssistantTaskStateInternal({
required String lifecycleStatus,
required String lastResultCode,
required String artifactSyncStatus,
}) {
final status = lifecycleStatus.trim().toLowerCase();
final syncStatus = artifactSyncStatus.trim().toLowerCase();
final result = lastResultCode.trim().toUpperCase();
return status == 'interrupted' ||
syncStatus == 'interrupted' ||
result == 'ABORTED' ||
result == 'ERROR' ||
result == 'ACP_HTTP_CONNECTION_CLOSED' ||
result == 'SESSION_CONTINUATION_UNAVAILABLE';
}
bool shouldResumeGatewaySessionForNextSendInternal(String sessionKey) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,

View File

@ -22,6 +22,7 @@ class OpenClawGatewayQueuedTurnInternal {
required this.agentId,
required this.metadata,
required this.resumeSessionHint,
this.appendUserTurn = true,
});
final String queueId;
@ -40,6 +41,7 @@ class OpenClawGatewayQueuedTurnInternal {
final String agentId;
final Map<String, dynamic> metadata;
final bool resumeSessionHint;
final bool appendUserTurn;
bool cancelled = false;
}

View File

@ -355,6 +355,14 @@ extension AssistantPageStateActionsInternal on AssistantPageStateInternal {
widget.controller.openSettings(tab: SettingsTab.gateway);
}
Future<void> continueCurrentTaskInternal(String sessionKey) async {
try {
await widget.controller.continueAssistantTaskInternal(sessionKey);
} catch (_) {
focusComposerInternal();
}
}
void focusComposerInternal() {
if (!mounted) {
return;

View File

@ -178,9 +178,13 @@ extension AssistantPageStateClosureInternal on AssistantPageStateInternal {
}
: null,
onContinue: progressState.recoverable
? AssistantPageStateActionsInternal(
this,
).focusComposerInternal
? () {
unawaited(
AssistantPageStateActionsInternal(
this,
).continueCurrentTaskInternal(activeSessionKey),
);
}
: null,
),
ColoredBox(

View File

@ -129,6 +129,30 @@ void main() {
expect(indicator.value, 0.48);
});
testWidgets('invokes continue action for a recoverable interrupted task', (
tester,
) async {
var continued = false;
await tester.pumpWidget(
_buildTestApp(
assistantTaskProgressState(
pending: false,
lifecycleStatus: 'interrupted',
lastResultCode: 'ACP_HTTP_CONNECTION_CLOSED',
artifactSyncStatus: 'interrupted',
),
onContinue: () {
continued = true;
},
),
);
await tester.tap(
find.byKey(const Key('assistant-task-progress-continue-button')),
);
expect(continued, isTrue);
});
testWidgets('shows continue action for a stopped task', (tester) async {
var continued = false;
await tester.pumpWidget(

View File

@ -2790,6 +2790,142 @@ void main() {
},
);
test(
'continueAssistantTaskInternal requeues a stopped OpenClaw task without clearing queued work',
() async {
final fakeGoTaskService = _BlockingGoTaskServiceClient();
final controller = _connectedGatewayController(fakeGoTaskService);
addTearDown(() {
fakeGoTaskService.completeAll();
controller.dispose();
});
await _selectGatewaySession(controller, 'continue-active-openclaw');
await controller.sendChatMessage('active task');
await fakeGoTaskService.waitForRequestCount(1);
await _selectGatewaySession(controller, 'continue-queued-openclaw');
await controller.sendChatMessage('queued before continue');
await _waitForThreadLifecycleStatus(
controller,
'continue-queued-openclaw',
'queued',
);
await _selectGatewaySession(controller, 'continue-stopped-openclaw');
controller.appendLocalSessionMessageInternal(
'continue-stopped-openclaw',
GatewayChatMessage(
id: 'user-continue-stopped-openclaw',
role: 'user',
text: 'resume stopped openclaw task',
timestampMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: false,
),
persistInThreadContext: true,
);
controller.upsertTaskThreadInternal(
'continue-stopped-openclaw',
executionTarget: AssistantExecutionTarget.gateway,
selectedProvider: SingleAgentProvider.openclaw,
selectedProviderSource: ThreadSelectionSource.explicit,
lifecycleStatus: 'ready',
lastResultCode: 'aborted',
);
await controller.continueAssistantTaskInternal(
'continue-stopped-openclaw',
);
await _waitForThreadLifecycleStatus(
controller,
'continue-stopped-openclaw',
'queued',
);
expect(fakeGoTaskService.requests, hasLength(1));
expect(
controller.assistantSessionHasPendingRun('continue-queued-openclaw'),
isTrue,
);
expect(
controller.assistantSessionHasPendingRun('continue-stopped-openclaw'),
isTrue,
);
expect(
controller.localSessionMessagesInternal['continue-stopped-openclaw']!
.where(
(message) =>
message.role == 'user' &&
message.text == 'resume stopped openclaw task',
)
.length,
1,
);
fakeGoTaskService.complete(
'continue-active-openclaw',
const GoTaskServiceResult(
success: true,
message: 'active done',
turnId: 'turn-active',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await fakeGoTaskService.waitForRequestCount(2);
expect(
fakeGoTaskService.requests.last.sessionId,
'continue-queued-openclaw',
);
fakeGoTaskService.complete(
'continue-queued-openclaw',
const GoTaskServiceResult(
success: true,
message: 'queued done',
turnId: 'turn-queued',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await fakeGoTaskService.waitForRequestCount(3);
final continuedRequest = fakeGoTaskService.requests.last;
expect(continuedRequest.sessionId, 'continue-stopped-openclaw');
expect(continuedRequest.resumeSession, isFalse);
expect(
continuedRequest.prompt,
contains('User request:\nresume stopped openclaw task'),
);
fakeGoTaskService.complete(
'continue-stopped-openclaw',
const GoTaskServiceResult(
success: true,
message: 'continued stopped',
turnId: 'turn-continued-stopped',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await _waitForThreadLifecycleStatus(
controller,
'continue-stopped-openclaw',
'ready',
);
},
);
test(
'stale queued lifecycle without a real queue entry is not pending',
() {
@ -3045,6 +3181,108 @@ void main() {
},
);
test(
'continueAssistantTaskInternal resumes interrupted task without duplicating user turn',
() async {
final fakeGoTaskService = _BlockingGoTaskServiceClient();
final controller = _connectedController(fakeGoTaskService);
addTearDown(() {
fakeGoTaskService.completeAll();
controller.dispose();
});
await controller.switchSession('continue-interrupted-task');
controller.appendLocalSessionMessageInternal(
'continue-interrupted-task',
GatewayChatMessage(
id: 'user-continue-interrupted',
role: 'user',
text: 'previous interrupted request',
timestampMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: false,
),
persistInThreadContext: true,
);
controller.upsertTaskThreadInternal(
'continue-interrupted-task',
lifecycleStatus: 'interrupted',
lastResultCode: 'ACP_HTTP_CONNECTION_CLOSED',
lastArtifactSyncStatus: 'interrupted',
);
final continueFuture = controller.continueAssistantTaskInternal(
'continue-interrupted-task',
);
await fakeGoTaskService.waitForRequestCount(1);
final request = fakeGoTaskService.requests.single;
expect(request.sessionId, 'continue-interrupted-task');
expect(request.resumeSession, isTrue);
expect(
request.prompt,
contains('User request:\nprevious interrupted request'),
);
expect(
controller.localSessionMessagesInternal['continue-interrupted-task']!
.where(
(message) =>
message.role == 'user' &&
message.text == 'previous interrupted request',
)
.length,
1,
);
fakeGoTaskService.complete(
'continue-interrupted-task',
const GoTaskServiceResult(
success: true,
message: 'continued interrupted',
turnId: 'turn-continued-interrupted',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await continueFuture;
},
);
test(
'continueAssistantTaskInternal fails locally without a committed user turn',
() async {
final fakeGoTaskService = _RecordingGoTaskServiceClient();
final controller = _connectedController(fakeGoTaskService);
addTearDown(controller.dispose);
await controller.switchSession('continue-empty-task');
controller.upsertTaskThreadInternal(
'continue-empty-task',
lifecycleStatus: 'ready',
lastResultCode: 'aborted',
);
await expectLater(
controller.continueAssistantTaskInternal('continue-empty-task'),
throwsA(isA<StateError>()),
);
expect(fakeGoTaskService.requests, isEmpty);
expect(
controller
.assistantThreadMessagesInternal['continue-empty-task']!
.last
.text,
contains('没有可恢复的用户请求'),
);
},
);
test('sendChatMessage resumes after confirmed session activity', () async {
final fakeGoTaskService = _RecordingGoTaskServiceClient()
..outcomes.add(