feat: isolate concurrent assistant task runs

This commit is contained in:
Haitao Pan 2026-05-08 15:04:42 +08:00
parent e156ecf808
commit 833cc48e33
8 changed files with 503 additions and 31 deletions

View File

@ -333,6 +333,17 @@ extension AppControllerDesktopThreadActions on AppController {
final resumeSession = hasCommittedUserTurnForGatewaySessionInternal(
sessionKey,
);
final lifecycleStatus = taskThreadForSessionInternal(
sessionKey,
)?.lifecycleState.status.trim().toLowerCase();
final lastResultCode = taskThreadForSessionInternal(
sessionKey,
)?.lifecycleState.lastResultCode?.trim().toLowerCase();
final runStatus = resumeSession && lifecycleStatus == 'interrupted'
? 'continuing'
: resumeSession && lastResultCode == 'error'
? 'retrying'
: 'running';
final userText = message.trim().isEmpty
? 'See attached.'
: message.trim();
@ -354,9 +365,9 @@ extension AppControllerDesktopThreadActions on AppController {
aiGatewayPendingSessionKeysInternal.add(sessionKey);
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'running',
lifecycleStatus: runStatus,
lastRunAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
lastResultCode: 'running',
lastResultCode: runStatus,
updatedAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
);
recomputeTasksInternal();

View File

@ -302,13 +302,12 @@ extension AppControllerDesktopThreadBinding on AppController {
),
lifecycleState:
(snapshot.record?.lifecycleState ??
const ThreadLifecycleState(
archived: false,
status: 'ready',
lastRunAtMs: null,
lastResultCode: null,
))
.copyWith(status: 'ready'),
const ThreadLifecycleState(
archived: false,
status: 'ready',
lastRunAtMs: null,
lastResultCode: null,
)),
updatedAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
);
}

View File

@ -85,9 +85,12 @@ class AssistantTaskRailStateInternal extends State<AssistantTaskRailInternal> {
AssistantExecutionTarget.values,
),
);
final runningCount = tasks
.where((task) => normalizedTaskStatusInternal(task.status) == 'running')
.length;
final activeCount = tasks.where((task) {
final status = normalizedTaskStatusInternal(task.status);
return status == 'running' ||
status == 'continuing' ||
status == 'retrying';
}).length;
final openCount = tasks
.where((task) => normalizedTaskStatusInternal(task.status) == 'open')
.length;
@ -162,7 +165,7 @@ class AssistantTaskRailStateInternal extends State<AssistantTaskRailInternal> {
runSpacing: 6,
children: [
MetaPillInternal(
label: '${appText('运行中', 'Running')} $runningCount',
label: '${appText('运行中', 'Running')} $activeCount',
icon: Icons.play_circle_outline_rounded,
),
MetaPillInternal(
@ -363,8 +366,11 @@ class AssistantTaskTileInternal extends StatelessWidget {
child: Icon(
entry.draft
? Icons.edit_note_rounded
: normalizedTaskStatusInternal(entry.status) == 'running'
: _taskIsActiveInternal(entry.status)
? Icons.play_arrow_rounded
: normalizedTaskStatusInternal(entry.status) ==
'interrupted'
? Icons.pause_circle_outline_rounded
: Icons.task_alt_rounded,
size: 15,
color: statusStyle.foregroundColor,
@ -385,11 +391,35 @@ class AssistantTaskTileInternal extends StatelessWidget {
),
),
const SizedBox(width: 8),
Text(
entry.updatedAtLabel,
style: theme.textTheme.bodySmall?.copyWith(
color: palette.textMuted,
),
Column(
mainAxisSize: MainAxisSize.min,
crossAxisAlignment: CrossAxisAlignment.end,
children: [
Text(
entry.updatedAtLabel,
style: theme.textTheme.bodySmall?.copyWith(
color: palette.textMuted,
),
),
if (_taskShowsProgressInternal(entry.status)) ...[
const SizedBox(height: 4),
SizedBox(
key: ValueKey<String>(
'assistant-task-progress-${entry.sessionKey}',
),
width: 56,
child: LinearProgressIndicator(
value: _taskProgressValueInternal(entry.status),
minHeight: 3,
color: statusStyle.foregroundColor,
backgroundColor: statusStyle.foregroundColor.withValues(
alpha: 0.16,
),
borderRadius: BorderRadius.circular(999),
),
),
],
],
),
const SizedBox(width: 2),
IconButton(
@ -414,6 +444,30 @@ class AssistantTaskTileInternal extends StatelessWidget {
}
}
bool _taskIsActiveInternal(String status) {
final normalized = normalizedTaskStatusInternal(status);
return normalized == 'running' ||
normalized == 'continuing' ||
normalized == 'retrying';
}
bool _taskShowsProgressInternal(String status) {
final normalized = normalizedTaskStatusInternal(status);
return normalized == 'running' ||
normalized == 'continuing' ||
normalized == 'retrying' ||
normalized == 'interrupted';
}
double? _taskProgressValueInternal(String status) {
return switch (normalizedTaskStatusInternal(status)) {
'continuing' => 0.62,
'retrying' => 0.38,
'interrupted' => 0.48,
_ => null,
};
}
class AssistantTaskGroupHeaderInternal extends StatelessWidget {
const AssistantTaskGroupHeaderInternal({
super.key,

View File

@ -86,6 +86,9 @@ extension AssistantPageStateActionsInternal on AssistantPageStateInternal {
if (rawPrompt.isEmpty) {
return;
}
if (controller.hasAssistantPendingRun) {
await createNewThreadInternal();
}
final autoAgent = pickAutoAgentInternal(controller, rawPrompt);
if (autoAgent != null) {
@ -267,15 +270,14 @@ extension AssistantPageStateActionsInternal on AssistantPageStateInternal {
List<String> selectedSkillKeysForInternal(AppController controller) {
final selected =
controller.taskThreadForSessionInternal(controller.currentSessionKey)
controller
.taskThreadForSessionInternal(controller.currentSessionKey)
?.selectedSkillKeys ??
const <String>[];
final availableKeys = availableSkillOptionsInternal(controller)
.map((option) => option.key)
.toSet();
return selected
.where(availableKeys.contains)
.toList(growable: false);
final availableKeys = availableSkillOptionsInternal(
controller,
).map((option) => option.key).toSet();
return selected.where(availableKeys.contains).toList(growable: false);
}
List<String> resolveSelectedSkillLabelsInternal(AppController controller) {
@ -541,6 +543,12 @@ extension AssistantPageStateActionsInternal on AssistantPageStateInternal {
status: sessionStatusInternal(
session,
sessionPending: controller.assistantSessionHasPendingRun(session.key),
lifecycleStatus:
controller
.taskThreadForSessionInternal(session.key)
?.lifecycleState
.status ??
'',
),
updatedAtMs:
session.updatedAtMs ??
@ -846,8 +854,21 @@ extension AssistantPageStateActionsInternal on AssistantPageStateInternal {
List<GatewayChatMessage> messages,
AppController controller,
) {
final thread = controller.taskThreadForSessionInternal(
controller.currentSessionKey,
);
final lifecycleStatus = normalizedTaskStatusInternal(
thread?.lifecycleState.status ?? '',
);
if (controller.hasAssistantPendingRun) {
return 'running';
return switch (lifecycleStatus) {
'continuing' => 'continuing',
'retrying' => 'retrying',
_ => 'running',
};
}
if (lifecycleStatus == 'interrupted') {
return 'interrupted';
}
if (messages.isEmpty) {
return null;

View File

@ -264,11 +264,19 @@ PillStyleInternal pillStyleForStatusInternal(
backgroundColor: context.palette.accentMuted,
foregroundColor: theme.colorScheme.primary,
),
'continuing' => PillStyleInternal(
backgroundColor: context.palette.accentMuted,
foregroundColor: theme.colorScheme.primary,
),
'retrying' => PillStyleInternal(
backgroundColor: context.palette.surfaceSecondary,
foregroundColor: theme.colorScheme.tertiary,
),
'queued' => PillStyleInternal(
backgroundColor: context.palette.surfaceSecondary,
foregroundColor: context.palette.textSecondary,
),
'failed' || 'error' => PillStyleInternal(
'interrupted' || 'failed' || 'error' => PillStyleInternal(
backgroundColor: context.palette.surfacePrimary,
foregroundColor: theme.colorScheme.error,
),
@ -283,6 +291,9 @@ String normalizedTaskStatusInternal(String status) {
final value = status.trim().toLowerCase();
return switch (value) {
'running' => 'running',
'continuing' => 'continuing',
'retrying' => 'retrying',
'interrupted' => 'interrupted',
'queued' => 'queued',
'failed' => 'failed',
'error' => 'error',
@ -294,6 +305,9 @@ String normalizedTaskStatusInternal(String status) {
String toolCallStatusLabelInternal(String status) =>
switch (normalizedTaskStatusInternal(status)) {
'running' => appText('运行中', 'Running'),
'continuing' => appText('继续中', 'Continuing'),
'retrying' => appText('重试中', 'Retrying'),
'interrupted' => appText('已中断', 'Interrupted'),
'failed' || 'error' => appText('错误', 'Error'),
_ => appText('已完成', 'Completed'),
};
@ -347,12 +361,21 @@ String? sessionPreviewInternal(GatewaySessionSummary session) {
String sessionStatusInternal(
GatewaySessionSummary session, {
required bool sessionPending,
String lifecycleStatus = '',
}) {
final normalizedLifecycle = normalizedTaskStatusInternal(lifecycleStatus);
if (session.abortedLastRun == true) {
return 'failed';
}
if (sessionPending) {
return 'running';
return switch (normalizedLifecycle) {
'continuing' => 'continuing',
'retrying' => 'retrying',
_ => 'running',
};
}
if (normalizedLifecycle == 'interrupted') {
return 'interrupted';
}
if ((session.lastMessagePreview ?? '').trim().isEmpty) {
return 'queued';

View File

@ -2,7 +2,14 @@ import 'package:flutter/material.dart';
import '../i18n/app_language.dart';
enum AssistantTaskProgressPhase { idle, running, syncingArtifacts, interrupted }
enum AssistantTaskProgressPhase {
idle,
running,
retrying,
continuing,
syncingArtifacts,
interrupted,
}
class AssistantTaskProgressState {
const AssistantTaskProgressState({
@ -37,6 +44,8 @@ class AssistantTaskProgressBar extends StatelessWidget {
final theme = Theme.of(context);
final color = state.interrupted
? theme.colorScheme.error
: state.phase == AssistantTaskProgressPhase.retrying
? theme.colorScheme.tertiary
: theme.colorScheme.primary;
return Container(
key: const Key('assistant-task-progress-bar'),
@ -87,6 +96,7 @@ AssistantTaskProgressState assistantTaskProgressState({
required String artifactSyncStatus,
}) {
final syncStatus = artifactSyncStatus.trim().toLowerCase();
final status = lifecycleStatus.trim().toLowerCase();
if (pending && syncStatus == 'syncing') {
return AssistantTaskProgressState(
phase: AssistantTaskProgressPhase.syncingArtifacts,
@ -94,13 +104,26 @@ AssistantTaskProgressState assistantTaskProgressState({
value: 0.82,
);
}
if (pending && status == 'continuing') {
return AssistantTaskProgressState(
phase: AssistantTaskProgressPhase.continuing,
label: appText('任务继续中...', 'Continuing task...'),
value: 0.62,
);
}
if (pending && status == 'retrying') {
return AssistantTaskProgressState(
phase: AssistantTaskProgressPhase.retrying,
label: appText('任务重试中...', 'Retrying task...'),
value: 0.38,
);
}
if (pending) {
return AssistantTaskProgressState(
phase: AssistantTaskProgressPhase.running,
label: appText('任务运行中...', 'Task running...'),
);
}
final status = lifecycleStatus.trim().toLowerCase();
final result = lastResultCode.trim().toUpperCase();
if (status == 'interrupted' ||
syncStatus == 'interrupted' ||

View File

@ -1,5 +1,8 @@
import 'package:flutter/material.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:xworkmate/features/assistant/assistant_page_components.dart';
import 'package:xworkmate/features/assistant/assistant_page_task_models.dart';
import 'package:xworkmate/runtime/runtime_models.dart';
import 'package:xworkmate/theme/app_theme.dart';
import 'package:xworkmate/widgets/assistant_task_progress_bar.dart';
@ -48,6 +51,48 @@ void main() {
expect(indicator.value, 0.82);
});
testWidgets('shows continuing progress when an interrupted task resumes', (
tester,
) async {
await tester.pumpWidget(
_buildTestApp(
assistantTaskProgressState(
pending: true,
lifecycleStatus: 'continuing',
lastResultCode: 'continuing',
artifactSyncStatus: '',
),
),
);
expect(find.text('任务继续中...'), findsOneWidget);
final indicator = tester.widget<LinearProgressIndicator>(
find.byKey(const Key('assistant-task-progress-indicator')),
);
expect(indicator.value, 0.62);
});
testWidgets('shows retry progress when a failed task is submitted again', (
tester,
) async {
await tester.pumpWidget(
_buildTestApp(
assistantTaskProgressState(
pending: true,
lifecycleStatus: 'retrying',
lastResultCode: 'retrying',
artifactSyncStatus: '',
),
),
);
expect(find.text('任务重试中...'), findsOneWidget);
final indicator = tester.widget<LinearProgressIndicator>(
find.byKey(const Key('assistant-task-progress-indicator')),
);
expect(indicator.value, 0.38);
});
testWidgets('shows interrupted state after ACP connection closes', (
tester,
) async {
@ -76,6 +121,50 @@ void main() {
expect(find.byKey(const Key('assistant-task-progress-bar')), findsNothing);
});
testWidgets('task rail item shows per-task progress for active status', (
tester,
) async {
await tester.pumpWidget(
MaterialApp(
theme: AppTheme.light(),
home: Material(
child: SizedBox(
width: 320,
child: AssistantTaskTileInternal(
entry: AssistantTaskEntryInternal(
sessionKey: 'task-a',
title: 'Task A',
preview: 'preview',
status: 'continuing',
updatedAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
owner: 'XWorkmate',
surface: 'Assistant',
executionTarget: AssistantExecutionTarget.gateway,
isCurrent: false,
),
archiveEnabled: true,
onTap: () {},
onRename: () {},
onArchive: () {},
),
),
),
),
);
final progress = find.byKey(
const ValueKey<String>('assistant-task-progress-task-a'),
);
expect(progress, findsOneWidget);
final indicator = tester.widget<LinearProgressIndicator>(
find.descendant(
of: progress,
matching: find.byType(LinearProgressIndicator),
),
);
expect(indicator.value, 0.62);
});
}
Widget _buildTestApp(AssistantTaskProgressState state) {

View File

@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@ -790,6 +791,189 @@ void main() {
);
},
);
test('sendChatMessage runs independent sessions concurrently', () async {
final fakeGoTaskService = _BlockingGoTaskServiceClient();
final controller = _connectedController(fakeGoTaskService);
addTearDown(controller.dispose);
await controller.sessionsController.switchSession('task-a');
final taskAFuture = controller.sendChatMessage('task A');
await fakeGoTaskService.waitForRequestCount(1);
expect(fakeGoTaskService.requests.single.sessionId, 'task-a');
expect(
controller
.taskThreadForSessionInternal('task-a')
?.lifecycleState
.status,
'running',
);
await controller.switchSession('task-b');
final taskBFuture = controller.sendChatMessage('task B');
await fakeGoTaskService.waitForRequestCount(2);
expect(
fakeGoTaskService.requests.map((request) => request.sessionId),
<String>['task-a', 'task-b'],
);
expect(controller.assistantSessionHasPendingRun('task-a'), isTrue);
expect(controller.assistantSessionHasPendingRun('task-b'), isTrue);
fakeGoTaskService.complete(
'task-b',
const GoTaskServiceResult(
success: true,
message: 'result B',
turnId: 'turn-b',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await taskBFuture;
expect(controller.assistantSessionHasPendingRun('task-a'), isTrue);
expect(controller.assistantSessionHasPendingRun('task-b'), isFalse);
expect(
controller.localSessionMessagesInternal['task-b']!.map(
(message) => message.text,
),
contains('result B'),
);
expect(
controller.localSessionMessagesInternal['task-a']!.map(
(message) => message.text,
),
isNot(contains('result B')),
);
fakeGoTaskService.complete(
'task-a',
const GoTaskServiceResult(
success: true,
message: 'result A',
turnId: 'turn-a',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await taskAFuture;
expect(controller.assistantSessionHasPendingRun('task-a'), isFalse);
expect(
controller.localSessionMessagesInternal['task-a']!.map(
(message) => message.text,
),
contains('result A'),
);
});
test(
'sendChatMessage exposes continuing and retrying lifecycle states',
() async {
final fakeGoTaskService = _BlockingGoTaskServiceClient();
final controller = _connectedController(fakeGoTaskService);
addTearDown(controller.dispose);
await controller.switchSession('interrupted-task');
controller.appendLocalSessionMessageInternal(
'interrupted-task',
GatewayChatMessage(
id: 'user-interrupted',
role: 'user',
text: 'previous turn',
timestampMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: false,
),
persistInThreadContext: true,
);
controller.upsertTaskThreadInternal(
'interrupted-task',
lifecycleStatus: 'interrupted',
lastResultCode: 'ACP_HTTP_CONNECTION_CLOSED',
);
expect(
controller.hasCommittedUserTurnForGatewaySessionInternal(
'interrupted-task',
),
isTrue,
);
final continuingFuture = controller.sendChatMessage('continue');
await fakeGoTaskService.waitForRequestCount(1);
expect(
controller
.taskThreadForSessionInternal('interrupted-task')
?.lifecycleState
.status,
'continuing',
);
fakeGoTaskService.complete(
'interrupted-task',
const GoTaskServiceResult(
success: true,
message: 'continued',
turnId: 'turn-continued',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await continuingFuture;
await controller.switchSession('retry-task');
controller.appendLocalSessionMessageInternal(
'retry-task',
GatewayChatMessage(
id: 'user-retry',
role: 'user',
text: 'previous failed turn',
timestampMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: false,
),
persistInThreadContext: true,
);
controller.upsertTaskThreadInternal(
'retry-task',
lifecycleStatus: 'ready',
lastResultCode: 'error',
);
final retryFuture = controller.sendChatMessage('retry');
await fakeGoTaskService.waitForRequestCount(2);
expect(
controller
.taskThreadForSessionInternal('retry-task')
?.lifecycleState
.status,
'retrying',
);
fakeGoTaskService.complete(
'retry-task',
const GoTaskServiceResult(
success: true,
message: 'retried',
turnId: 'turn-retried',
raw: <String, dynamic>{},
errorMessage: '',
resolvedModel: '',
route: GoTaskServiceRoute.externalAcpSingle,
),
);
await retryFuture;
},
);
});
}
@ -999,3 +1183,71 @@ class _RecordingGoTaskServiceClient implements GoTaskServiceClient {
@override
Future<void> dispose() async {}
}
class _BlockingGoTaskServiceClient implements GoTaskServiceClient {
final List<GoTaskServiceRequest> requests = <GoTaskServiceRequest>[];
final Map<String, Completer<GoTaskServiceResult>> _pending =
<String, Completer<GoTaskServiceResult>>{};
@override
Future<ExternalCodeAgentAcpCapabilities> loadExternalAcpCapabilities({
required AssistantExecutionTarget target,
bool forceRefresh = false,
}) async => const ExternalCodeAgentAcpCapabilities.empty();
@override
Future<ExternalCodeAgentAcpRoutingResolution> resolveExternalAcpRouting({
required String taskPrompt,
required String workingDirectory,
required ExternalCodeAgentAcpRoutingConfig routing,
}) async =>
const ExternalCodeAgentAcpRoutingResolution(raw: <String, dynamic>{});
@override
Future<GoTaskServiceResult> executeTask(
GoTaskServiceRequest request, {
required void Function(GoTaskServiceUpdate update) onUpdate,
}) {
requests.add(request);
final completer = Completer<GoTaskServiceResult>();
_pending[request.sessionId] = completer;
return completer.future;
}
Future<void> waitForRequestCount(int count) async {
final deadline = DateTime.now().add(const Duration(seconds: 2));
while (requests.length < count && DateTime.now().isBefore(deadline)) {
await Future<void>.delayed(const Duration(milliseconds: 10));
}
if (requests.length < count) {
throw StateError('Timed out waiting for $count requests.');
}
}
void complete(String sessionId, GoTaskServiceResult result) {
final completer = _pending.remove(sessionId);
if (completer == null) {
throw StateError('No pending task for $sessionId.');
}
completer.complete(result);
}
@override
Future<void> cancelTask({
required GoTaskServiceRoute route,
required AssistantExecutionTarget target,
required String sessionId,
required String threadId,
}) async {}
@override
Future<void> closeTask({
required GoTaskServiceRoute route,
required AssistantExecutionTarget target,
required String sessionId,
required String threadId,
}) async {}
@override
Future<void> dispose() async {}
}