xworkmate-app/lib/app/app_controller_desktop_thread_actions.dart
Haitao Pan 40c59269e9 fix(gateway): day-1 stability — stop infinite "running" and un-stoppable tasks
Symptom: a gateway turn shows "任务运行中..." forever and 停止 has no effect,
even though the OpenClaw gateway has already finished (ACP_HTTP_CONNECTION_CLOSED).

- T3: add a hard deadline to the running-handle poll branch so the client no
  longer polls forever when tasks.get keeps returning "running". Budget is
  derived from taskLoadClass (10/30/60min, aligned with the bridge) + grace;
  on timeout the turn lands in a recoverable `interrupted` state
  (OPENCLAW_RUN_POLL_TIMEOUT) prompting the user to resend.
- T4: make 停止 locally authoritative — capture the association, mark the turn
  aborted immediately (clears pending, exits the poll loop), then fire
  tasks.cancel best-effort so a hung/failed cancel RPC can't block termination.
- T6: applyGatewayChatFailureInternal now authoritatively clears the pending
  flag (both raw + normalized key). Previously runOpenClawGatewayQueuedTurnInternal's
  finally never cleared it, leaving "error shown but still running".

Full cross-repo analysis + remaining TODO in docs/cases/06.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 10:48:45 +08:00

1925 lines
68 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// ignore_for_file: unused_import, unnecessary_import
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:math' as math;
import 'package:flutter/material.dart';
import 'app_metadata.dart';
import 'app_capabilities.dart';
import 'app_store_policy.dart';
import 'ui_feature_manifest.dart';
import '../i18n/app_language.dart';
import '../models/app_models.dart';
import '../runtime/device_identity_store.dart';
import '../runtime/runtime_bootstrap.dart';
import '../runtime/desktop_platform_service.dart';
import '../runtime/gateway_runtime.dart';
import '../runtime/runtime_controllers.dart';
import '../runtime/runtime_models.dart';
import '../runtime/secure_config_store.dart';
import '../runtime/embedded_agent_launch_policy.dart';
import '../runtime/runtime_coordinator.dart';
import '../runtime/gateway_acp_client.dart';
import '../runtime/codex_runtime.dart';
import '../runtime/codex_config_bridge.dart';
import '../runtime/code_agent_node_orchestrator.dart';
import '../runtime/assistant_artifacts.dart';
import '../runtime/desktop_thread_artifact_service.dart';
import '../runtime/go_task_service_client.dart';
import '../runtime/mode_switcher.dart';
import '../runtime/agent_registry.dart';
import '../runtime/platform_environment.dart';
import 'app_controller_openclaw_task_queue.dart';
import 'app_controller_desktop_core.dart';
import 'app_controller_desktop_navigation.dart';
import 'app_controller_desktop_gateway.dart';
import 'app_controller_desktop_settings.dart';
import 'app_controller_desktop_external_acp_routing.dart';
import 'app_controller_desktop_thread_binding.dart';
import 'app_controller_desktop_thread_sessions.dart';
import 'app_controller_desktop_workspace_execution.dart';
import 'app_controller_desktop_settings_runtime.dart';
import 'app_controller_desktop_thread_storage.dart';
import 'app_controller_desktop_skill_permissions.dart';
import 'app_controller_desktop_runtime_helpers.dart';
// ignore_for_file: invalid_use_of_visible_for_testing_member, invalid_use_of_protected_member
extension AppControllerDesktopThreadActions on AppController {
GatewayChatMessage assistantErrorMessageInternal(String text) {
return GatewayChatMessage(
id: nextLocalMessageIdInternal(),
role: 'assistant',
text: text,
timestampMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: true,
);
}
bool assistantSessionHasPendingRun(String sessionKey) {
final normalized = normalizedAssistantSessionKeyInternal(sessionKey);
final association = taskThreadForSessionInternal(
normalized,
)?.openClawTaskAssociation;
return aiGatewayPendingSessionKeysInternal.contains(normalized) ||
(association != null && !association.isTerminal) ||
openClawGatewayQueuedTurnsBySessionInternal[normalized]?.any(
(turn) => !turn.cancelled,
) ==
true ||
false;
}
Future<void> connectSavedGateway() async {
final target = currentAssistantExecutionTarget;
await AppControllerDesktopGateway(this).connectProfileInternal(
gatewayProfileForAssistantExecutionTargetInternal(target),
profileIndex: gatewayProfileIndexForExecutionTargetInternal(target),
);
}
Future<void> clearStoredGatewayToken({int? profileIndex}) async {
await settingsControllerInternal.clearGatewaySecrets(
profileIndex: profileIndex,
token: true,
);
}
Future<void> refreshGatewayHealth() async {
if (!runtimeInternal.isConnected) {
return;
}
try {
await runtimeInternal.health();
} catch (error) {
debugPrint('Gateway health refresh failed: $error');
}
try {
await runtimeInternal.status();
} catch (error) {
debugPrint('Gateway status refresh failed: $error');
}
notifyListeners();
}
Future<void> refreshDevices({bool quiet = false}) async {
await devicesControllerInternal.refresh(quiet: quiet);
}
Future<void> approveDevicePairing(String requestId) async {
await devicesControllerInternal.approve(requestId);
await settingsControllerInternal.refreshDerivedState();
}
Future<void> rejectDevicePairing(String requestId) async {
await devicesControllerInternal.reject(requestId);
}
Future<void> removePairedDevice(String deviceId) async {
await devicesControllerInternal.remove(deviceId);
await settingsControllerInternal.refreshDerivedState();
}
Future<String?> rotateDeviceRoleToken({
required String deviceId,
required String role,
List<String> scopes = const <String>[],
}) async {
final token = await devicesControllerInternal.rotateToken(
deviceId: deviceId,
role: role,
scopes: scopes,
);
await settingsControllerInternal.refreshDerivedState();
return token;
}
Future<void> revokeDeviceRoleToken({
required String deviceId,
required String role,
}) async {
await devicesControllerInternal.revokeToken(deviceId: deviceId, role: role);
await settingsControllerInternal.refreshDerivedState();
}
Future<void> refreshAgents() async {
await agentsControllerInternal.refresh();
sessionsControllerInternal.configure(
selectedAgentId: agentsControllerInternal.selectedAgentId,
defaultAgentId: '',
);
recomputeTasksInternal();
}
Future<void> selectAgent(String? agentId) async {
agentsControllerInternal.selectAgent(agentId);
final target = currentAssistantExecutionTarget;
final nextProfile = gatewayProfileForAssistantExecutionTargetInternal(
target,
).copyWith(selectedAgentId: agentsControllerInternal.selectedAgentId);
await AppControllerDesktopSettings(this).saveSettings(
settings.copyWithGatewayProfileAt(
gatewayProfileIndexForExecutionTargetInternal(target),
nextProfile,
),
refreshAfterSave: false,
);
sessionsControllerInternal.configure(
selectedAgentId: agentsControllerInternal.selectedAgentId,
defaultAgentId: '',
);
final sessionKey = normalizedAssistantSessionKeyInternal(currentSessionKey);
if (isAppOwnedAssistantSessionKeyInternal(sessionKey)) {
await chatControllerInternal.loadSession(sessionKey);
}
await skillsControllerInternal.refresh(
agentId: agentsControllerInternal.selectedAgentId.isEmpty
? null
: agentsControllerInternal.selectedAgentId,
);
recomputeTasksInternal();
}
Future<void> refreshSessions({bool preserveCurrentSelection = false}) async {
final previousSessionKey = normalizedAssistantSessionKeyInternal(
sessionsControllerInternal.currentSessionKey,
);
sessionsControllerInternal.configure(
selectedAgentId: agentsControllerInternal.selectedAgentId,
defaultAgentId: '',
);
await sessionsControllerInternal.refresh();
if (preserveCurrentSelection &&
isAppOwnedAssistantSessionKeyInternal(previousSessionKey) &&
!isAssistantTaskArchived(previousSessionKey)) {
await setCurrentAssistantSessionKeyInternal(
previousSessionKey,
persistSelection: false,
);
recomputeTasksInternal();
return;
}
await ensureActiveAssistantThreadInternal();
final selectedSessionKey = normalizedAssistantSessionKeyInternal(
sessionsControllerInternal.currentSessionKey,
);
if (isAppOwnedAssistantSessionKeyInternal(selectedSessionKey)) {
await chatControllerInternal.loadSession(selectedSessionKey);
}
recomputeTasksInternal();
}
Future<void> switchSession(String sessionKey) async {
var nextSessionKey = normalizedAssistantSessionKeyInternal(sessionKey);
if (!isAppOwnedAssistantSessionKeyInternal(nextSessionKey)) {
nextSessionKey = createAssistantDraftSessionKeyInternal();
}
final nextTarget = assistantExecutionTargetForSession(nextSessionKey);
final nextViewMode = assistantMessageViewModeForSession(nextSessionKey);
await setCurrentAssistantSessionKeyInternal(nextSessionKey);
upsertTaskThreadInternal(
nextSessionKey,
executionTarget: nextTarget,
messageViewMode: nextViewMode,
);
await ensureDesktopTaskThreadBindingInternal(
nextSessionKey,
executionTarget: nextTarget,
);
await applyAssistantExecutionTargetInternal(
nextTarget,
sessionKey: nextSessionKey,
persistDefaultSelection: false,
preserveGatewayHistoryForSelectedThread: false,
);
if (runtimeInternal.isConnected) {
await chatControllerInternal.loadSession(nextSessionKey);
} else {
chatControllerInternal.resetSession(nextSessionKey);
}
recomputeTasksInternal();
}
Future<void> sendChatMessage(
String message, {
String? sessionKey,
String thinking = 'off',
List<GatewayChatAttachmentPayload> attachments =
const <GatewayChatAttachmentPayload>[],
List<CollaborationAttachment> localAttachments =
const <CollaborationAttachment>[],
List<String> selectedSkillLabels = const <String>[],
}) async {
var targetSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey ?? sessionsControllerInternal.currentSessionKey,
);
if (!isAppOwnedAssistantSessionKeyInternal(targetSessionKey)) {
if (sessionKey != null && sessionKey.trim().isNotEmpty) {
throw StateError(
appText(
'提交目标会话无效,请重新选择任务后提交。',
'The submit target session is invalid. Select the task again before submitting.',
),
);
}
await ensureActiveAssistantThreadInternal();
targetSessionKey = normalizedAssistantSessionKeyInternal(
sessionsControllerInternal.currentSessionKey,
);
}
final resumeSessionHint = shouldResumeGatewaySessionForNextSendInternal(
targetSessionKey,
);
await dispatchGatewayChatTurnInternal(
sessionKey: targetSessionKey,
message: message,
thinking: thinking,
attachments: attachments,
localAttachments: localAttachments,
selectedSkillLabels: selectedSkillLabels,
resumeSessionHint: resumeSessionHint,
);
}
Future<void> continueAssistantTaskInternal(String sessionKey) async {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey.trim().isEmpty
? sessionsControllerInternal.currentSessionKey
: sessionKey,
);
final thread = taskThreadForSessionInternal(normalizedSessionKey);
final lifecycleStatus = thread?.lifecycleState.status ?? '';
final lastResultCode = thread?.lifecycleState.lastResultCode ?? '';
final artifactSyncStatus = thread?.lastArtifactSyncStatus ?? '';
if (!isRecoverableAssistantTaskStateInternal(
lifecycleStatus: lifecycleStatus,
lastResultCode: lastResultCode,
artifactSyncStatus: artifactSyncStatus,
)) {
final error = StateError(
appText('当前任务状态不可继续执行。', 'The current task state cannot be continued.'),
);
appendAssistantThreadMessageInternal(
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
throw error;
}
final lastUserTurn = lastCommittedUserTurnForGatewaySessionInternal(
normalizedSessionKey,
);
final message = lastUserTurn?.text.trim() ?? '';
if (message.isEmpty) {
final error = StateError(
appText(
'当前任务没有可恢复的用户请求,请输入需求后重新提交。',
'This task has no recoverable user request. Enter a request and submit it again.',
),
);
appendAssistantThreadMessageInternal(
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
throw error;
}
await dispatchGatewayChatTurnInternal(
sessionKey: normalizedSessionKey,
message: message,
thinking: 'off',
attachments: const <GatewayChatAttachmentPayload>[],
localAttachments: const <CollaborationAttachment>[],
selectedSkillLabels: const <String>[],
resumeSessionHint:
lastResultCode.trim().toUpperCase() != 'ABORTED' &&
shouldResumeGatewaySessionForNextSendInternal(normalizedSessionKey),
appendUserTurn: false,
);
}
Future<void> dispatchGatewayChatTurnInternal({
required String sessionKey,
required String message,
required String thinking,
required List<GatewayChatAttachmentPayload> attachments,
required List<CollaborationAttachment> localAttachments,
required List<String> selectedSkillLabels,
required bool resumeSessionHint,
bool appendUserTurn = true,
}) async {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final currentTarget = assistantExecutionTargetForSession(
normalizedSessionKey,
);
var connectionState = assistantConnectionStateForSession(
normalizedSessionKey,
);
if (!connectionState.connected &&
isBridgeAcpRuntimeConfiguredInternal() &&
bridgeCapabilityRefreshNeededForAssistantTargetInternal(
currentTarget,
)) {
await refreshAcpCapabilitiesInternal(forceRefresh: true);
connectionState = assistantConnectionStateForSession(
normalizedSessionKey,
);
}
if (!connectionState.connected) {
final error = StateError(connectionState.detailLabel);
appendAssistantThreadMessageInternal(
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
throw error;
}
await ensureDesktopTaskThreadBindingInternal(
normalizedSessionKey,
executionTarget: currentTarget,
);
final workingDirectory =
assistantWorkingDirectoryForSessionInternal(
normalizedSessionKey,
)?.trim() ??
'';
final remoteWorkingDirectoryHint =
assistantRemoteWorkingDirectoryHintForSessionInternal(
normalizedSessionKey,
)?.trim() ??
'';
if (workingDirectory.isEmpty) {
final error = StateError(
appText(
'当前任务线程缺少可运行的 workingDirectory无法执行。',
'This task thread has no runnable workingDirectory yet.',
),
);
appendAssistantThreadMessageInternal(
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
throw error;
}
if (providerCatalogForExecutionTarget(currentTarget).isEmpty) {
await refreshSingleAgentCapabilitiesInternal(forceRefresh: true);
if (providerCatalogForExecutionTarget(currentTarget).isEmpty) {
upsertTaskThreadInternal(
normalizedSessionKey,
selectedProvider: SingleAgentProvider.unspecified,
selectedProviderSource: ThreadSelectionSource.inherited,
latestResolvedProviderId: '',
updatedAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
);
final error = StateError(
currentTarget.isGateway
? appText(
'Gateway ACP 未报告可用的 gateway provider当前无法发送。',
'Gateway ACP did not report a usable gateway provider, so this Gateway task cannot run yet.',
)
: appText(
'Gateway ACP 未报告可用的 agent provider当前无法发送。',
'Gateway ACP did not report a usable agent provider, so this Agent task cannot run yet.',
),
);
appendAssistantThreadMessageInternal(
normalizedSessionKey,
assistantErrorMessageInternal(error.message),
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
throw error;
}
}
final provider = assistantProviderForSession(normalizedSessionKey);
final model = currentTarget.isGateway
? ''
: assistantModelForSession(normalizedSessionKey);
final routing = buildExternalAcpRoutingForSessionInternal(
normalizedSessionKey,
);
final dispatch = await codeAgentNodeOrchestratorInternal
.buildGatewayDispatch(
buildCodeAgentNodeStateInternal(executionTarget: currentTarget),
);
final capturedSelectedSkillLabels = List<String>.unmodifiable(
selectedSkillLabels,
);
final inlineAttachmentsToUpload =
registerTaskInputAttachmentsForGatewayTurnInternal(
normalizedSessionKey,
attachments,
);
final capturedAttachments = List<GatewayChatAttachmentPayload>.unmodifiable(
inlineAttachmentsToUpload,
);
final capturedLocalAttachments = List<CollaborationAttachment>.unmodifiable(
localAttachments,
);
final executionWorkingDirectory = gatewayExecutionWorkingDirectoryInternal(
target: currentTarget,
workingDirectory: workingDirectory,
remoteWorkingDirectoryHint: remoteWorkingDirectoryHint,
);
final taskMetadata = Map<String, dynamic>.unmodifiable(
gatewayTaskMetadataWithArtifactContractInternal(
baseMetadata: dispatch.metadata,
sessionKey: normalizedSessionKey,
userPrompt: message,
localWorkingDirectory: workingDirectory,
executionWorkingDirectory: executionWorkingDirectory,
remoteWorkingDirectoryHint: remoteWorkingDirectoryHint,
),
);
if (usesOpenClawGatewayQueueInternal(currentTarget, provider)) {
await enqueueOpenClawGatewayTurnInternal(
OpenClawGatewayQueuedTurnInternal(
queueId:
'openclaw-${DateTime.now().microsecondsSinceEpoch}-$localMessageCounterInternal',
sessionKey: normalizedSessionKey,
target: currentTarget,
provider: provider,
message: message,
thinking: thinking,
selectedSkillLabels: capturedSelectedSkillLabels,
attachments: capturedAttachments,
localAttachments: capturedLocalAttachments,
workingDirectory: executionWorkingDirectory,
localWorkingDirectory: workingDirectory,
remoteWorkingDirectoryHint: remoteWorkingDirectoryHint,
model: model,
routing: routing,
agentId: dispatch.agentId ?? '',
metadata: taskMetadata,
resumeSessionHint: resumeSessionHint,
appendUserTurn: appendUserTurn,
),
);
return;
}
await enqueueThreadTurnInternal<void>(
normalizedSessionKey,
() => runGatewayChatTurnInternal(
sessionKey: normalizedSessionKey,
target: currentTarget,
provider: provider,
message: message,
thinking: thinking,
selectedSkillLabels: capturedSelectedSkillLabels,
attachments: capturedAttachments,
localAttachments: capturedLocalAttachments,
workingDirectory: workingDirectory,
localWorkingDirectory: workingDirectory,
executionWorkingDirectory: executionWorkingDirectory,
remoteWorkingDirectoryHint: remoteWorkingDirectoryHint,
model: model,
routing: routing,
agentId: dispatch.agentId ?? '',
metadata: taskMetadata,
resumeSessionHint: resumeSessionHint,
appendUserTurn: appendUserTurn,
),
);
recomputeTasksInternal();
}
List<GatewayChatAttachmentPayload>
registerTaskInputAttachmentsForGatewayTurnInternal(
String sessionKey,
List<GatewayChatAttachmentPayload> attachments,
) {
if (attachments.isEmpty) {
return const <GatewayChatAttachmentPayload>[];
}
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final existing = taskThreadForSessionInternal(normalizedSessionKey);
final existingByKey = <String, TaskInputAttachmentRecord>{
for (final item
in existing?.taskInputAttachments ??
const <TaskInputAttachmentRecord>[])
if (item.key.isNotEmpty) item.key: item,
};
final inlineAttachmentsToUpload = <GatewayChatAttachmentPayload>[];
final nextByKey = <String, TaskInputAttachmentRecord>{...existingByKey};
final uploadedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
for (final attachment in attachments) {
final key = gatewayAttachmentPayloadSha256Internal(attachment);
if (key.isEmpty) {
inlineAttachmentsToUpload.add(attachment);
continue;
}
if (!existingByKey.containsKey(key)) {
inlineAttachmentsToUpload.add(attachment);
}
nextByKey.putIfAbsent(
key,
() => TaskInputAttachmentRecord(
name: attachment.fileName.trim(),
mimeType: attachment.mimeType.trim(),
sha256: key,
type: attachment.type.trim(),
uploadedAtMs: uploadedAtMs,
sourcePath: attachment.sourcePath.trim(),
),
);
}
if (nextByKey.length != existingByKey.length) {
upsertTaskThreadInternal(
normalizedSessionKey,
taskInputAttachments: nextByKey.values.toList(growable: false),
updatedAtMs: uploadedAtMs,
);
}
return inlineAttachmentsToUpload;
}
String gatewayAttachmentPayloadSha256Internal(
GatewayChatAttachmentPayload attachment,
) => goTaskServiceAttachmentSha256(attachment);
Future<void> runGatewayChatTurnInternal({
required String sessionKey,
required AssistantExecutionTarget target,
required SingleAgentProvider provider,
required String message,
required String thinking,
required List<String> selectedSkillLabels,
required List<GatewayChatAttachmentPayload> attachments,
required List<CollaborationAttachment> localAttachments,
required String workingDirectory,
String? localWorkingDirectory,
String? executionWorkingDirectory,
required String remoteWorkingDirectoryHint,
required String model,
required ExternalCodeAgentAcpRoutingConfig routing,
required String agentId,
required Map<String, dynamic> metadata,
required bool resumeSessionHint,
bool appendUserTurn = true,
}) async {
final resumeSession =
resumeSessionHint ||
(appendUserTurn &&
shouldResumeGatewaySessionForNextSendInternal(sessionKey));
final messageWithSkills = messageWithSelectedSkillsContextInternal(
message: message,
selectedSkillLabels: selectedSkillLabels,
);
final taskPrompt = taskWorkspaceContextPromptInternal(
sessionKey: sessionKey,
userPrompt: messageWithSkills,
workingDirectory: localWorkingDirectory ?? workingDirectory,
executionWorkingDirectory: executionWorkingDirectory ?? workingDirectory,
remoteWorkingDirectoryHint: remoteWorkingDirectoryHint,
target: target,
taskInputAttachments:
taskThreadForSessionInternal(sessionKey)?.taskInputAttachments ??
const <TaskInputAttachmentRecord>[],
);
if (appendUserTurn) {
appendGatewayUserTurnInternal(sessionKey, message);
}
markGatewayChatRunInternal(sessionKey);
var handedOffToBridgeTask = false;
try {
final result = await goTaskServiceClientInternal.executeTask(
GoTaskServiceRequest(
sessionId: sessionKey,
threadId: sessionKey,
target: target,
provider: provider,
prompt: taskPrompt,
workingDirectory: executionWorkingDirectory ?? workingDirectory,
remoteWorkingDirectoryHint: remoteWorkingDirectoryHint,
model: model,
thinking: thinking,
selectedSkills: selectedSkillLabels,
inlineAttachments: attachments,
localAttachments: localAttachments,
agentId: agentId,
metadata: metadata,
routing: routing,
routingHint: 'gateway',
resumeSession: resumeSession,
),
onUpdate: (update) {
if (update.isDelta) {
appendAiGatewayStreamingTextInternal(sessionKey, update.text);
notifyIfActiveInternal();
}
},
);
if (!aiGatewayPendingSessionKeysInternal.contains(sessionKey) &&
taskThreadForSessionInternal(
sessionKey,
)?.lifecycleState.lastResultCode ==
'aborted') {
clearAiGatewayStreamingTextInternal(sessionKey);
return;
}
final association = result.openClawTaskAssociation;
if (association != null) {
handedOffToBridgeTask = true;
persistOpenClawTaskAssociationInternal(
sessionKey: sessionKey,
association: association,
);
unawaited(
pollOpenClawTaskAssociationInternal(
sessionKey: sessionKey,
target: target,
association: association,
),
);
return;
}
await applyGatewayChatResultInternal(
sessionKey: sessionKey,
target: target,
result: result,
);
} catch (error) {
if (!aiGatewayPendingSessionKeysInternal.contains(sessionKey) &&
taskThreadForSessionInternal(
sessionKey,
)?.lifecycleState.lastResultCode ==
'aborted') {
clearAiGatewayStreamingTextInternal(sessionKey);
return;
}
await applyGatewayChatFailureInternal(
sessionKey: sessionKey,
target: target,
error: error,
);
} finally {
if (!handedOffToBridgeTask) {
aiGatewayPendingSessionKeysInternal.remove(sessionKey);
clearAiGatewayStreamingTextInternal(sessionKey);
}
recomputeTasksInternal();
notifyIfActiveInternal();
}
}
void persistOpenClawTaskAssociationInternal({
required String sessionKey,
required OpenClawTaskAssociation association,
}) {
final nowMs = DateTime.now().millisecondsSinceEpoch.toDouble();
aiGatewayPendingSessionKeysInternal.add(sessionKey);
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'running',
lastResultCode: 'running',
lastRunAtMs: association.startedAtMs > 0
? association.startedAtMs
: nowMs,
lastArtifactSyncAtMs: nowMs,
lastArtifactSyncStatus: 'running',
openClawTaskAssociation: association,
updatedAtMs: nowMs,
);
recomputeTasksInternal();
notifyIfActiveInternal();
unawaited(flushAssistantThreadPersistenceInternal());
}
Future<void> pollOpenClawTaskAssociationInternal({
required String sessionKey,
required AssistantExecutionTarget target,
required OpenClawTaskAssociation association,
}) async {
var current = association;
var firstAttempt = true;
var artifactSyncAttempts = 0;
double? artifactSyncStartedAtMs;
// T3: running 轮询兜底截止的起算锚点(首次进入 running 轮询的时间)。
double? runningPollFirstAtMs;
final existingThread = taskThreadForSessionInternal(sessionKey);
if (association.status.trim().toLowerCase() == 'syncing-artifacts') {
artifactSyncStartedAtMs = existingThread?.lastArtifactSyncAtMs;
}
while (true) {
if (disposedInternal) {
return;
}
if (!aiGatewayPendingSessionKeysInternal.contains(sessionKey)) {
return;
}
if (!firstAttempt) {
await Future<void>.delayed(const Duration(seconds: 2));
}
firstAttempt = false;
try {
final result = await goTaskServiceClientInternal.getTask(
route: GoTaskServiceRoute.externalAcpSingle,
target: target,
association: current,
);
if (disposedInternal) {
return;
}
final nextAssociation =
result.openClawTaskAssociation ??
current.copyWith(
status: result.status.trim().isEmpty
? current.status
: result.status.trim(),
);
current = nextAssociation;
if (result.isOpenClawRunningTaskHandle) {
// T3: 给 running 轮询加兜底截止,避免 gateway 始终回 running 时无限轮询、永远卡「任务运行中...」。
final nowMs = DateTime.now().millisecondsSinceEpoch.toDouble();
runningPollFirstAtMs ??= nowMs;
if (openClawRunningPollDeadlineReachedInternal(
startedAtMs: current.startedAtMs,
taskLoadClass: current.taskLoadClass,
firstPollAtMs: runningPollFirstAtMs,
nowMs: nowMs,
)) {
markOpenClawRunningPollTimeoutInternal(
sessionKey: sessionKey,
association: current,
);
return;
}
persistOpenClawTaskAssociationInternal(
sessionKey: sessionKey,
association: nextAssociation,
);
continue;
}
if (aiGatewayPendingSessionKeysInternal.contains(sessionKey)) {
final hasTaskScopedOpenClawArtifacts =
current.artifactScope.trim().isNotEmpty ||
current.artifactDirectory.trim().isNotEmpty;
final hasRequiredExts = current.requiredArtifactExtensions.isNotEmpty;
final requiresArtifactExport =
current.requiresArtifactExport || hasRequiredExts;
final hasEnoughArtifacts =
result.artifacts.isNotEmpty &&
(!hasRequiredExts ||
current.requiredArtifactExtensions.every((ext) {
return result.artifacts.any(
(a) => openClawArtifactPathHasRequiredExtension(
a.relativePath,
ext,
),
);
}));
final shouldKeepPollingForArtifacts =
hasTaskScopedOpenClawArtifacts &&
requiresArtifactExport &&
!hasEnoughArtifacts;
if (shouldKeepPollingForArtifacts) {
final nowMs = DateTime.now().millisecondsSinceEpoch.toDouble();
artifactSyncAttempts += 1;
artifactSyncStartedAtMs ??= nowMs;
final missingRequired = current.requiredArtifactExtensions
.where((ext) {
return !result.artifacts.any(
(a) => openClawArtifactPathHasRequiredExtension(
a.relativePath,
ext,
),
);
})
.toList(growable: false);
if (openClawArtifactSyncLimitReachedInternal(
attemptCount: artifactSyncAttempts,
firstSyncAtMs: artifactSyncStartedAtMs,
nowMs: nowMs,
)) {
markOpenClawArtifactSyncTimeoutInternal(
sessionKey: sessionKey,
association: current,
missingRequiredExtensions: missingRequired.isEmpty
? current.requiredArtifactExtensions
: missingRequired,
remoteWorkingDirectory: result.remoteWorkingDirectory,
remoteWorkspaceRefKind: result.remoteWorkspaceRefKind,
);
return;
}
current = current.copyWith(status: 'syncing-artifacts');
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'running',
lastResultCode: 'running',
lastRemoteWorkingDirectory:
result.remoteWorkingDirectory.trim().isEmpty
? taskThreadForSessionInternal(
sessionKey,
)?.lastRemoteWorkingDirectory
: result.remoteWorkingDirectory.trim(),
lastRemoteWorkspaceRefKind:
result.remoteWorkspaceRefKind ??
taskThreadForSessionInternal(
sessionKey,
)?.lastRemoteWorkspaceRefKind,
lastArtifactSyncAtMs: nowMs,
lastArtifactSyncStatus: 'syncing',
openClawTaskAssociation: current,
updatedAtMs: nowMs,
);
recomputeTasksInternal();
notifyIfActiveInternal();
unawaited(flushAssistantThreadPersistenceInternal());
continue;
}
await applyGatewayChatResultInternal(
sessionKey: sessionKey,
target: target,
result: result,
artifactSyncAttempts: artifactSyncAttempts,
artifactSyncStartedAtMs: artifactSyncStartedAtMs,
);
}
aiGatewayPendingSessionKeysInternal.remove(sessionKey);
clearAiGatewayStreamingTextInternal(sessionKey);
recomputeTasksInternal();
notifyIfActiveInternal();
return;
} catch (error) {
if (disposedInternal) {
return;
}
if (aiGatewayPendingSessionKeysInternal.contains(sessionKey)) {
await applyGatewayChatFailureInternal(
sessionKey: sessionKey,
target: target,
error: error,
);
}
aiGatewayPendingSessionKeysInternal.remove(sessionKey);
clearAiGatewayStreamingTextInternal(sessionKey);
recomputeTasksInternal();
notifyIfActiveInternal();
return;
}
}
}
void resumeOpenClawTaskAssociationsInternal({String? onlySessionKey}) {
final normalizedOnly = onlySessionKey == null
? ''
: normalizedAssistantSessionKeyInternal(onlySessionKey);
for (final record in taskThreadRepositoryInternal.snapshot()) {
if (normalizedOnly.isNotEmpty && record.threadId != normalizedOnly) {
continue;
}
final association = record.openClawTaskAssociation;
if (association == null || association.isTerminal) {
continue;
}
if (association.status.trim().toLowerCase() == 'syncing-artifacts' &&
openClawArtifactSyncLimitReachedInternal(
attemptCount: 0,
firstSyncAtMs: record.lastArtifactSyncAtMs,
)) {
markOpenClawArtifactSyncTimeoutInternal(
sessionKey: record.threadId,
association: association,
missingRequiredExtensions: association.requiredArtifactExtensions,
remoteWorkingDirectory: record.lastRemoteWorkingDirectory,
remoteWorkspaceRefKind: record.lastRemoteWorkspaceRefKind,
);
continue;
}
aiGatewayPendingSessionKeysInternal.add(record.threadId);
unawaited(
pollOpenClawTaskAssociationInternal(
sessionKey: record.threadId,
target: assistantExecutionTargetFromExecutionMode(
record.executionBinding.executionMode,
),
association: association,
),
);
}
recomputeTasksInternal();
notifyIfActiveInternal();
}
String messageWithSelectedSkillsContextInternal({
required String message,
required List<String> selectedSkillLabels,
}) {
final labels = selectedSkillLabels
.map((item) => item.trim())
.where((item) => item.isNotEmpty)
.toList(growable: false);
if (labels.isEmpty || message.contains('Preferred skills:')) {
return message;
}
return 'Preferred skills:\n${labels.map((name) => '- $name').join('\n')}\n\n$message';
}
String taskWorkspaceContextPromptInternal({
required String sessionKey,
required String userPrompt,
required String workingDirectory,
String? executionWorkingDirectory,
required String remoteWorkingDirectoryHint,
required AssistantExecutionTarget target,
List<TaskInputAttachmentRecord> taskInputAttachments =
const <TaskInputAttachmentRecord>[],
}) {
final requestText = userPrompt.trim().isEmpty
? 'See attached.'
: userPrompt.trim();
final executionWorkspace =
executionWorkingDirectory?.trim().isNotEmpty == true
? executionWorkingDirectory!.trim()
: (remoteWorkingDirectoryHint.trim().isNotEmpty
? remoteWorkingDirectoryHint.trim()
: workingDirectory.trim());
final currentTaskWorkspace = executionWorkspace.isNotEmpty
? executionWorkspace
: (target.isGateway
? r'$XWORKMATE_ARTIFACT_DIRECTORY'
: workingDirectory.trim());
final buffer = StringBuffer()
..writeln('TaskThread workspace context:')
..writeln('- sessionKey: $sessionKey')
..writeln('- currentTaskWorkspace: $currentTaskWorkspace');
if (workingDirectory.trim().isNotEmpty) {
buffer.writeln('- localWorkspace: ${workingDirectory.trim()}');
}
if (remoteWorkingDirectoryHint.trim().isNotEmpty) {
buffer.writeln(
'- remoteWorkspaceHint: ${remoteWorkingDirectoryHint.trim()}',
);
}
final visibleTaskInputAttachments = taskInputAttachments
.where((item) => item.name.trim().isNotEmpty && item.key.isNotEmpty)
.toList(growable: false);
if (visibleTaskInputAttachments.isNotEmpty) {
buffer.writeln('- taskInputAttachments:');
for (final attachment in visibleTaskInputAttachments) {
final sourcePath = attachment.sourcePath.trim();
final pathSuffix = sourcePath.isEmpty ? '' : ', path: $sourcePath';
buffer.writeln(
' - ${attachment.name.trim()} (${attachment.mimeType.trim()}, sha256: ${attachment.key}$pathSuffix)',
);
}
}
buffer
..writeln('User request:')
..write(requestText);
return buffer.toString();
}
Map<String, dynamic> gatewayTaskMetadataWithArtifactContractInternal({
required Map<String, dynamic> baseMetadata,
required String sessionKey,
required String userPrompt,
required String localWorkingDirectory,
required String executionWorkingDirectory,
required String remoteWorkingDirectoryHint,
}) {
final localWorkspace = localWorkingDirectory.trim();
final executionWorkspace = executionWorkingDirectory.trim();
final remoteHint = remoteWorkingDirectoryHint.trim();
final caseHint = gatewayCaseHintForPromptInternal(userPrompt);
final metadata = <String, dynamic>{
...baseMetadata,
if (caseHint.caseId.isNotEmpty) 'xworkmateCaseId': caseHint.caseId,
if (caseHint.taskLoadClass.isNotEmpty)
'taskLoadClass': caseHint.taskLoadClass,
if (caseHint.requiredArtifactExtensions.isNotEmpty)
'requiredArtifactExtensions': caseHint.requiredArtifactExtensions,
if (caseHint.expectedArtifactExtensions.isNotEmpty)
'expectedArtifactExtensions': caseHint.expectedArtifactExtensions,
};
if (caseHint.expectedFileCountByExtension.isNotEmpty) {
metadata['xworkmateArtifactConstraints'] = <String, dynamic>{
'schemaVersion': 1,
'expectedFileCountByExtension': caseHint.expectedFileCountByExtension,
};
}
return <String, dynamic>{
...metadata,
'xworkmateTaskArtifactContract': <String, dynamic>{
'schemaVersion': 1,
'appThreadKey': sessionKey,
'scopeKind': 'task',
'finalDeliverableDetection': 'remote-runtime',
'requiresExportBeforeFinalResponse': true,
'rejectTextOnlyFileClaims': true,
'expectedArtifactDirs': caseHint.expectedArtifactDirs.isNotEmpty
? caseHint.expectedArtifactDirs
: const <String>[
'artifacts/',
'reports/',
'exports/',
'assets/',
'assets/images/',
'dist/',
],
if (caseHint.requiredArtifactExtensions.isNotEmpty)
'requiredArtifactExtensions': caseHint.requiredArtifactExtensions,
if (caseHint.expectedFileCountByExtension.isNotEmpty)
'expectedFileCountByExtension': caseHint.expectedFileCountByExtension,
'currentTaskWorkspace': executionWorkspace.isNotEmpty
? executionWorkspace
: (remoteHint.isNotEmpty ? remoteHint : localWorkspace),
if (localWorkspace.isNotEmpty) 'localWorkspace': localWorkspace,
if (remoteHint.isNotEmpty) 'remoteWorkspaceHint': remoteHint,
},
};
}
GatewayTaskCaseHintInternal gatewayCaseHintForPromptInternal(String prompt) {
final text = prompt.trim();
final lower = text.toLowerCase();
final hasSecurityEvolution =
text.contains('从单机权限') &&
text.contains('网络边界') &&
text.contains('Web安全') &&
text.contains('云身份') &&
text.contains('Zero Trust') &&
text.contains('AI Agent') &&
text.contains('AI模型');
final wantsAiNewsMd =
(text.contains('AI资讯') ||
text.contains('AI 资讯') ||
text.contains('AI新闻') ||
lower.contains('ai news')) &&
(text.contains('md文件') ||
text.contains('Markdown') ||
lower.contains('.md') ||
lower.contains('markdown'));
if (wantsAiNewsMd) {
return const GatewayTaskCaseHintInternal(
caseId: 'case1-ai-news-md',
taskLoadClass: 'long_task',
requiredArtifactExtensions: <String>['md'],
expectedArtifactExtensions: <String>['md'],
expectedArtifactDirs: <String>['reports/', 'artifacts/'],
);
}
final wantsVideo =
text.contains('制作视频') ||
text.contains('测试制作视频') ||
lower.contains('make video') ||
lower.contains('mp4');
final wantsImages =
text.contains('7张') ||
text.contains('七张') ||
text.contains('连续制作') ||
text.contains('系列图片') ||
text.contains('一些列图片') ||
text.contains('一系列图片');
final wantsPdf =
text.contains('输出 PDF') ||
text.contains('输出PDF') ||
text.contains('PDF文件') ||
lower.contains('final.pdf');
final wantsSocialCopy =
text.contains('微信公众号短图文') ||
text.contains('小红书风格') ||
text.contains('X文案串') ||
text.contains('头条号长文');
final wantsChapteredImages =
text.contains('拆章节') ||
text.contains('每章') ||
lower.contains('gpt images') ||
lower.contains('images2');
if (hasSecurityEvolution && wantsPdf && wantsChapteredImages) {
return const GatewayTaskCaseHintInternal(
caseId: 'case5-security-evolution-pdf',
taskLoadClass: 'complex_chain_task',
requiredArtifactExtensions: <String>['pdf', 'png', 'md'],
expectedArtifactExtensions: <String>['pdf', 'png', 'md'],
expectedArtifactDirs: <String>[
'exports/',
'assets/',
'assets/images/',
'prompts/',
'reports/',
],
expectedFileCountByExtension: <String, int>{'pdf': 1, 'png': 7},
);
}
if (hasSecurityEvolution && wantsVideo) {
return const GatewayTaskCaseHintInternal(
caseId: 'case2-security-evolution-video',
taskLoadClass: 'complex_chain_task',
requiredArtifactExtensions: <String>['mp4'],
expectedArtifactExtensions: <String>['mp4', 'png', 'md'],
expectedArtifactDirs: <String>[
'renders/',
'assets/',
'assets/images/',
'exports/',
],
);
}
if (hasSecurityEvolution && wantsImages) {
return const GatewayTaskCaseHintInternal(
caseId: 'case3-security-evolution-seven-images',
taskLoadClass: 'complex_chain_task',
requiredArtifactExtensions: <String>['png'],
expectedArtifactExtensions: <String>['png', 'md'],
expectedArtifactDirs: <String>['assets/', 'assets/images/', 'prompts/'],
expectedFileCountByExtension: <String, int>{'png': 7},
);
}
if (hasSecurityEvolution && wantsSocialCopy) {
return const GatewayTaskCaseHintInternal(
caseId: 'case4-security-evolution-social-copy',
taskLoadClass: 'long_task',
requiredArtifactExtensions: <String>['md'],
expectedArtifactExtensions: <String>['md'],
expectedArtifactDirs: <String>['reports/', 'artifacts/'],
expectedFileCountByExtension: <String, int>{'md': 5},
);
}
return const GatewayTaskCaseHintInternal();
}
bool usesOpenClawGatewayQueueInternal(
AssistantExecutionTarget target,
SingleAgentProvider provider,
) {
return target.isGateway &&
provider.providerId == kCanonicalGatewayProviderId;
}
String gatewayExecutionWorkingDirectoryInternal({
required AssistantExecutionTarget target,
required String workingDirectory,
required String remoteWorkingDirectoryHint,
}) {
final remoteHint = remoteWorkingDirectoryHint.trim();
if (target.isGateway && remoteHint.isNotEmpty) {
return remoteHint;
}
return workingDirectory.trim();
}
Future<void> enqueueOpenClawGatewayTurnInternal(
OpenClawGatewayQueuedTurnInternal turn,
) async {
if (turn.appendUserTurn) {
appendGatewayUserTurnInternal(turn.sessionKey, turn.message);
}
if (openClawGatewayActiveTurnsInternal.length >=
openClawGatewayMaxActiveTasksInternal &&
openClawGatewayQueuedTurnsInternal.length >=
openClawGatewayMaxQueuedTasksInternal) {
final error = StateError(
appText(
'OpenClaw 任务队列已满,请等待当前任务完成后重试。',
'OpenClaw task queue is full. Wait for the current tasks to finish and try again.',
),
);
await failOpenClawGatewayQueuedTurnInternal(turn.sessionKey, error);
throw error;
}
openClawGatewayQueuedTurnsInternal.add(turn);
openClawGatewayQueuedTurnsBySessionInternal
.putIfAbsent(
turn.sessionKey,
() => <OpenClawGatewayQueuedTurnInternal>[],
)
.add(turn);
markOpenClawGatewayQueuedTurnInternal(turn.sessionKey);
drainOpenClawGatewayQueueInternal();
}
void markOpenClawGatewayQueuedTurnInternal(String sessionKey) {
final queuedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'queued',
lastResultCode: 'queued',
lastArtifactSyncAtMs: queuedAtMs,
lastArtifactSyncStatus: 'queued',
lastTaskArtifactRelativePaths: const <String>[],
updatedAtMs: queuedAtMs,
);
recomputeTasksInternal();
notifyIfActiveInternal();
}
Future<void> failOpenClawGatewayQueuedTurnInternal(
String sessionKey,
StateError error,
) async {
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'ready',
lastRunAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
lastResultCode: 'OPENCLAW_GATEWAY_QUEUE_FULL',
lastRemoteWorkingDirectory: '',
lastArtifactSyncAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
lastArtifactSyncStatus: 'failed',
lastTaskArtifactRelativePaths: const <String>[],
updatedAtMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
);
appendLocalSessionMessageInternal(
sessionKey,
assistantErrorMessageInternal(error.message),
persistInThreadContext: true,
);
await flushAssistantThreadPersistenceInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
}
bool abortQueuedOpenClawGatewayTurnInternal(String sessionKey) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
if (!removeQueuedOpenClawGatewayTurnsForSessionInternal(
normalizedSessionKey,
)) {
return false;
}
markOpenClawGatewayTurnAbortedInternal(normalizedSessionKey);
drainOpenClawGatewayQueueInternal();
return true;
}
bool removeQueuedOpenClawGatewayTurnsForSessionInternal(String sessionKey) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final queuedForSession = openClawGatewayQueuedTurnsBySessionInternal.remove(
normalizedSessionKey,
);
if (queuedForSession == null || queuedForSession.isEmpty) {
return false;
}
for (final turn in queuedForSession) {
turn.cancelled = true;
openClawGatewayQueuedTurnsInternal.remove(turn);
}
return true;
}
bool removeActiveOpenClawGatewayTurnsForSessionInternal(String sessionKey) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
var removed = false;
openClawGatewayActiveTurnsInternal.removeWhere((_, turn) {
final matches =
normalizedAssistantSessionKeyInternal(turn.sessionKey) ==
normalizedSessionKey;
if (matches) {
turn.cancelled = true;
removed = true;
}
return matches;
});
return removed;
}
void markOpenClawGatewayTurnAbortedInternal(String sessionKey) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
clearAiGatewayStreamingTextInternal(normalizedSessionKey);
aiGatewayPendingSessionKeysInternal.remove(normalizedSessionKey);
final nowMs = DateTime.now().millisecondsSinceEpoch.toDouble();
upsertTaskThreadInternal(
normalizedSessionKey,
lifecycleStatus: 'ready',
lastRunAtMs: nowMs,
lastResultCode: 'aborted',
lastRemoteWorkingDirectory: '',
lastArtifactSyncAtMs: nowMs,
lastArtifactSyncStatus: 'failed',
lastTaskArtifactRelativePaths: const <String>[],
clearOpenClawTaskAssociation: true,
updatedAtMs: nowMs,
);
recomputeTasksInternal();
notifyIfActiveInternal();
}
void removeOpenClawGatewayQueuedTurnIndexInternal(
OpenClawGatewayQueuedTurnInternal turn,
) {
final queuedForSession =
openClawGatewayQueuedTurnsBySessionInternal[turn.sessionKey];
queuedForSession?.remove(turn);
if (queuedForSession != null && queuedForSession.isEmpty) {
openClawGatewayQueuedTurnsBySessionInternal.remove(turn.sessionKey);
}
}
void drainOpenClawGatewayQueueInternal() {
while (openClawGatewayActiveTurnsInternal.length <
openClawGatewayMaxActiveTasksInternal &&
openClawGatewayQueuedTurnsInternal.isNotEmpty) {
final turn = openClawGatewayQueuedTurnsInternal.removeAt(0);
removeOpenClawGatewayQueuedTurnIndexInternal(turn);
if (turn.cancelled) {
continue;
}
openClawGatewayActiveTurnsInternal[turn.queueId] = turn;
markOpenClawGatewayRunningTurnInternal(turn.sessionKey);
unawaited(runOpenClawGatewayQueuedTurnInternal(turn));
}
}
void markOpenClawGatewayRunningTurnInternal(String sessionKey) {
final startedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
aiGatewayPendingSessionKeysInternal.add(sessionKey);
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'running',
lastRunAtMs: startedAtMs,
lastResultCode: 'running',
lastArtifactSyncAtMs: startedAtMs,
lastArtifactSyncStatus: 'running',
lastTaskArtifactRelativePaths: const <String>[],
updatedAtMs: startedAtMs,
);
recomputeTasksInternal();
notifyIfActiveInternal();
}
Future<void> runOpenClawGatewayQueuedTurnInternal(
OpenClawGatewayQueuedTurnInternal turn,
) async {
try {
await enqueueThreadTurnInternal<void>(
turn.sessionKey,
() => runGatewayChatTurnInternal(
sessionKey: turn.sessionKey,
target: turn.target,
provider: turn.provider,
message: turn.message,
thinking: turn.thinking,
selectedSkillLabels: turn.selectedSkillLabels,
attachments: turn.attachments,
localAttachments: turn.localAttachments,
workingDirectory: turn.workingDirectory,
localWorkingDirectory: turn.localWorkingDirectory,
executionWorkingDirectory: turn.workingDirectory,
remoteWorkingDirectoryHint: turn.remoteWorkingDirectoryHint,
model: turn.model,
routing: turn.routing,
agentId: turn.agentId,
metadata: turn.metadata,
resumeSessionHint: turn.resumeSessionHint,
appendUserTurn: false,
),
);
} catch (error) {
if (!disposedInternal) {
await applyGatewayChatFailureInternal(
sessionKey: turn.sessionKey,
target: turn.target,
error: error,
);
}
} finally {
openClawGatewayActiveTurnsInternal.remove(turn.queueId);
if (!disposedInternal) {
drainOpenClawGatewayQueueInternal();
recomputeTasksInternal();
notifyIfActiveInternal();
}
}
}
void appendGatewayUserTurnInternal(String sessionKey, String message) {
final userText = message.trim().isEmpty ? 'See attached.' : message.trim();
appendLocalSessionMessageInternal(
sessionKey,
GatewayChatMessage(
id: nextLocalMessageIdInternal(),
role: 'user',
text: userText,
timestampMs: DateTime.now().millisecondsSinceEpoch.toDouble(),
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: false,
),
persistInThreadContext: true,
);
}
void markGatewayChatRunInternal(String sessionKey) {
final startedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
aiGatewayPendingSessionKeysInternal.add(sessionKey);
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'running',
lastRunAtMs: startedAtMs,
lastResultCode: 'running',
lastArtifactSyncAtMs: startedAtMs,
lastArtifactSyncStatus: 'running',
lastTaskArtifactRelativePaths: const <String>[],
updatedAtMs: startedAtMs,
);
recomputeTasksInternal();
notifyIfActiveInternal();
}
Future<void> applyGatewayChatResultInternal({
required String sessionKey,
required AssistantExecutionTarget target,
required GoTaskServiceResult result,
int artifactSyncAttempts = 0,
double? artifactSyncStartedAtMs,
}) async {
final completedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
final assistantText = result.message.trim();
final hasCurrentRunArtifacts = result.artifacts.isNotEmpty;
final openClawAssociation = result.openClawTaskAssociation;
final waitingForOpenClawArtifacts =
openClawAssociation != null &&
!hasCurrentRunArtifacts &&
result.success &&
(openClawAssociation.requiresArtifactExport ||
openClawAssociation.requiredArtifactExtensions.isNotEmpty) &&
(openClawAssociation.artifactScope.trim().isNotEmpty ||
openClawAssociation.artifactDirectory.trim().isNotEmpty);
if (waitingForOpenClawArtifacts) {
persistOpenClawTaskAssociationInternal(
sessionKey: sessionKey,
association: openClawAssociation.copyWith(status: 'syncing-artifacts'),
);
return;
}
final noDisplayableOutput =
result.success && assistantText.isEmpty && !hasCurrentRunArtifacts;
final terminalResultCode = noDisplayableOutput
? 'OPENCLAW_NO_DISPLAYABLE_OUTPUT'
: gatewayTerminalResultCodeInternal(result);
final remoteWorkingDirectory = result.remoteWorkingDirectory.trim();
clearAiGatewayStreamingTextInternal(sessionKey);
clearPendingToolCallsForGatewaySessionInternal(
sessionKey,
hasError: !result.success,
);
upsertTaskThreadInternal(
sessionKey,
gatewayEntryState: goTaskServiceGatewayEntryState(
requestedTarget: target,
result: result,
),
latestResolvedRuntimeModel: result.resolvedModel.trim(),
lastRemoteWorkingDirectory: remoteWorkingDirectory.isNotEmpty
? remoteWorkingDirectory
: '',
lastRemoteWorkspaceRefKind: result.remoteWorkspaceRefKind,
lifecycleStatus: 'ready',
lastRunAtMs: completedAtMs,
lastResultCode: terminalResultCode,
updatedAtMs: completedAtMs,
);
if (!result.success) {
upsertTaskThreadInternal(
sessionKey,
lastArtifactSyncAtMs: completedAtMs,
lastArtifactSyncStatus: 'failed',
lastTaskArtifactRelativePaths: const <String>[],
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
appendLocalSessionMessageInternal(
sessionKey,
assistantErrorMessageInternal(
result.errorMessage.trim().isEmpty
? appText(
'GoTaskService 执行失败。',
'GoTaskService execution failed.',
)
: gatewayExecutionErrorLabelInternal(
result.errorMessage,
target: target,
),
),
persistInThreadContext: true,
);
return;
}
if (noDisplayableOutput) {
upsertTaskThreadInternal(
sessionKey,
lastArtifactSyncAtMs: completedAtMs,
lastArtifactSyncStatus: 'failed',
lastTaskArtifactRelativePaths: const <String>[],
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
appendLocalSessionMessageInternal(
sessionKey,
assistantErrorMessageInternal(
appText(
'GoTaskService 没有返回可显示的输出。',
'GoTaskService returned no displayable output.',
),
),
persistInThreadContext: true,
);
return;
}
if (assistantText.isNotEmpty) {
appendLocalSessionMessageInternal(
sessionKey,
GatewayChatMessage(
id: nextLocalMessageIdInternal(),
role: 'assistant',
text: assistantText,
timestampMs: completedAtMs,
toolCallId: null,
toolName: null,
stopReason: null,
pending: false,
error: false,
),
persistInThreadContext: true,
);
}
recomputeTasksInternal();
notifyIfActiveInternal();
await persistGoTaskArtifactsForSessionInternal(
sessionKey,
result,
artifactSyncAttempts: artifactSyncAttempts,
artifactSyncStartedAtMs: artifactSyncStartedAtMs,
);
if (taskThreadForSessionInternal(
sessionKey,
)?.lifecycleState.lastResultCode ==
kOpenClawArtifactSyncTimeoutCode) {
return;
}
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'ready',
lastRunAtMs: completedAtMs,
lastResultCode: terminalResultCode,
clearOpenClawTaskAssociation: !hasCurrentRunArtifacts,
updatedAtMs: completedAtMs,
);
}
Future<void> applyGatewayChatFailureInternal({
required String sessionKey,
required AssistantExecutionTarget target,
required Object error,
}) async {
// T6: 终态一致性。失败处理器是本轮的终态出口,必须自身权威地清 pending
// 不能依赖各调用方的 finallyrunOpenClawGatewayQueuedTurnInternal 的 finally 此前并不清 pending
// 导致「错误已渲染但进度条仍卡在任务运行中」)。对其它已显式 remove 的调用方是幂等的。
// pending 集合在不同路径下分别用原始 / 归一化 key两种都移除以确保彻底清空。
aiGatewayPendingSessionKeysInternal.remove(sessionKey);
aiGatewayPendingSessionKeysInternal.remove(
normalizedAssistantSessionKeyInternal(sessionKey),
);
clearAiGatewayStreamingTextInternal(sessionKey);
clearPendingToolCallsForGatewaySessionInternal(sessionKey, hasError: true);
final completedAtMs = DateTime.now().millisecondsSinceEpoch.toDouble();
final recoveredArtifactPaths =
await recoverGatewayFailureArtifactPathsInternal(sessionKey, error);
upsertTaskThreadInternal(
sessionKey,
lifecycleStatus: 'ready',
lastRunAtMs: completedAtMs,
lastResultCode: gatewayFailureResultCodeInternal(error),
lastRemoteWorkingDirectory: '',
lastArtifactSyncAtMs: completedAtMs,
lastArtifactSyncStatus: recoveredArtifactPaths.isEmpty
? 'failed'
: 'interrupted',
lastTaskArtifactRelativePaths: recoveredArtifactPaths,
clearOpenClawTaskAssociation: true,
updatedAtMs: completedAtMs,
);
appendLocalSessionMessageInternal(
sessionKey,
assistantErrorMessageInternal(
gatewayExecutionErrorLabelInternal(error, target: target),
),
persistInThreadContext: true,
);
}
bool hasCommittedUserTurnForGatewaySessionInternal(String sessionKey) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final messages = <GatewayChatMessage>[
...?assistantThreadRecordsInternal[normalizedSessionKey]?.messages,
...?assistantThreadMessagesInternal[normalizedSessionKey],
...?localSessionMessagesInternal[normalizedSessionKey],
];
return messages.any((message) {
final role = message.role.trim().toLowerCase();
return role == 'user' && !message.pending;
});
}
GatewayChatMessage? lastCommittedUserTurnForGatewaySessionInternal(
String sessionKey,
) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
final messages = <GatewayChatMessage>[
...?assistantThreadRecordsInternal[normalizedSessionKey]?.messages,
...?assistantThreadMessagesInternal[normalizedSessionKey],
...?localSessionMessagesInternal[normalizedSessionKey],
];
for (final message in messages.reversed) {
final role = message.role.trim().toLowerCase();
if (role == 'user' && !message.pending) {
return message;
}
}
return null;
}
bool isRecoverableAssistantTaskStateInternal({
required String lifecycleStatus,
required String lastResultCode,
required String artifactSyncStatus,
}) {
final status = lifecycleStatus.trim().toLowerCase();
final syncStatus = artifactSyncStatus.trim().toLowerCase();
final result = lastResultCode.trim().toUpperCase();
return status == 'interrupted' ||
syncStatus == 'interrupted' ||
result == 'ABORTED' ||
result == 'ERROR' ||
result == 'ACP_HTTP_CONNECTION_CLOSED' ||
result == 'SESSION_CONTINUATION_UNAVAILABLE';
}
bool shouldResumeGatewaySessionForNextSendInternal(String sessionKey) {
final normalizedSessionKey = normalizedAssistantSessionKeyInternal(
sessionKey,
);
if (!hasCommittedUserTurnForGatewaySessionInternal(normalizedSessionKey)) {
return false;
}
final lastResultCode = taskThreadForSessionInternal(
normalizedSessionKey,
)?.lifecycleState.lastResultCode?.trim().toUpperCase();
return !gatewayResultCodeRequiresNewSessionInternal(lastResultCode ?? '');
}
String gatewayTerminalResultCodeInternal(GoTaskServiceResult result) {
if (result.success) {
return 'success';
}
final status = result.status.trim();
final code = result.code.trim();
if (status.isNotEmpty && status.toLowerCase() != 'failed') {
return status;
}
if (code.isNotEmpty) {
return code;
}
if (status.isNotEmpty) {
return status;
}
return 'error';
}
String gatewayFailureResultCodeInternal(Object error) {
final unconfirmedConnectCode = unconfirmedAcpHttpConnectCodeInternal(error);
if (unconfirmedConnectCode != null) {
return unconfirmedConnectCode;
}
final interruptedTransportCode = interruptedAcpHttpTransportCodeInternal(
error,
);
if (interruptedTransportCode != null) {
return interruptedTransportCode;
}
final primaryCode = gatewayExecutionPrimaryCodeInternal(error);
if (primaryCode != null && primaryCode.isNotEmpty) {
return primaryCode;
}
final detailCode = gatewayExecutionDetailCodeInternal(error);
if (detailCode != null && detailCode.isNotEmpty) {
return detailCode;
}
return 'error';
}
bool gatewayResultCodeRequiresNewSessionInternal(String code) {
final normalized = code.trim().toUpperCase();
if (normalized.isEmpty || normalized == 'ACP_HTTP_CONNECTION_CLOSED') {
return false;
}
return const <String>{
'RUNNING',
'QUEUED',
'ABORTED',
'BRIDGE_NOT_CONNECTED',
'ARTIFACT_MISSING',
'OPENCLAW_ARTIFACT_MISSING',
'OPENCLAW_GATEWAY_QUEUE_FULL',
'OPENCLAW_GATEWAY_SOCKET_CLOSED',
'OPENCLAW_NO_DISPLAYABLE_OUTPUT',
'OPENCLAW_NO_EXPORTED_ARTIFACTS',
'OPENCLAW_WAIT_FAILED',
'ACP_HTTP_401',
'ACP_HTTP_502',
'ACP_HTTP_CONNECT_FAILED',
'ACP_HTTP_CONNECT_TIMEOUT',
'ACP_HTTP_HANDSHAKE_INTERRUPTED',
'GATEWAY_TASK_REJECTED',
}.contains(normalized);
}
Future<void> abortRun() async {
final sessionKey = normalizedAssistantSessionKeyInternal(
sessionsControllerInternal.currentSessionKey,
);
if (abortQueuedOpenClawGatewayTurnInternal(sessionKey)) {
return;
}
if (aiGatewayPendingSessionKeysInternal.contains(sessionKey)) {
// T4: 停止本地权威化。先捕获 association再立刻把本轮标记为 aborted清 pending / 置终态 /
// 让轮询 loop 在下一次迭代退出UI 立即停止——不依赖 gateway 往返。
// 随后 best-effort 下发 tasks.cancel其失败/挂起都不得阻塞或回滚本地终止。
final association = taskThreadForSessionInternal(
sessionKey,
)?.openClawTaskAssociation;
removeQueuedOpenClawGatewayTurnsForSessionInternal(sessionKey);
removeActiveOpenClawGatewayTurnsForSessionInternal(sessionKey);
markOpenClawGatewayTurnAbortedInternal(sessionKey);
drainOpenClawGatewayQueueInternal();
unawaited(
cancelAssistantTaskForSessionInternal(
sessionKey,
association: association,
),
);
return;
}
}
Future<void> cancelAssistantTaskForSessionInternal(
String sessionKey, {
OpenClawTaskAssociation? association,
}) async {
final normalized = normalizedAssistantSessionKeyInternal(sessionKey);
// association 可能已被本地终止清空,故允许调用方预先捕获后传入。
final effectiveAssociation =
association ??
taskThreadForSessionInternal(normalized)?.openClawTaskAssociation;
// best-effortgateway 不可达 / run 已不存在时不得抛出,避免阻塞停止流程。
try {
await goTaskServiceClientInternal.cancelTask(
route: GoTaskServiceRoute.externalAcpSingle,
target: assistantExecutionTargetForSession(normalized),
sessionId: normalized,
threadId: normalized,
association: effectiveAssociation,
);
} catch (error) {
debugPrint('cancelAssistantTaskForSession best-effort failed: $error');
}
}
Future<void> prepareForExit() async {
await abortRun();
await flushAssistantThreadPersistenceInternal();
}
Map<String, dynamic> desktopStatusSnapshot() {
final connectionState = currentAssistantConnectionState;
final pausedTasks = tasksControllerInternal.scheduled
.where((item) => item.status == 'Disabled')
.length;
final timedOutTasks = tasksControllerInternal.failed
.where(looksLikeTimedOutTaskInternal)
.length;
final failedTasks = tasksControllerInternal.failed.length;
final queuedTasks = tasksControllerInternal.queue.length;
final runningTasks = tasksControllerInternal.running.length;
final scheduledTasks = tasksControllerInternal.scheduled.length;
final badgeCount = runningTasks + pausedTasks + timedOutTasks;
return <String, dynamic>{
'connectionStatus': desktopConnectionStatusValueInternal(
connectionState.status,
),
'connectionLabel': connectionState.primaryLabel,
'runningTasks': runningTasks,
'pausedTasks': pausedTasks,
'timedOutTasks': timedOutTasks,
'queuedTasks': queuedTasks,
'scheduledTasks': scheduledTasks,
'failedTasks': failedTasks,
'totalTasks': tasksControllerInternal.totalCount,
'badgeCount': badgeCount > 0 ? badgeCount : runningTasks + queuedTasks,
};
}
bool looksLikeTimedOutTaskInternal(DerivedTaskItem item) {
final haystack = '${item.status} ${item.title} ${item.summary}'
.toLowerCase();
return haystack.contains('timed out') ||
haystack.contains('timeout') ||
haystack.contains('超时');
}
String desktopConnectionStatusValueInternal(RuntimeConnectionStatus status) {
switch (status) {
case RuntimeConnectionStatus.connected:
return 'connected';
case RuntimeConnectionStatus.connecting:
return 'connecting';
case RuntimeConnectionStatus.error:
return 'error';
case RuntimeConnectionStatus.offline:
return 'disconnected';
}
}
}
class GatewayTaskCaseHintInternal {
const GatewayTaskCaseHintInternal({
this.caseId = '',
this.taskLoadClass = '',
this.requiredArtifactExtensions = const <String>[],
this.expectedArtifactExtensions = const <String>[],
this.expectedArtifactDirs = const <String>[],
this.expectedFileCountByExtension = const <String, int>{},
});
final String caseId;
final String taskLoadClass;
final List<String> requiredArtifactExtensions;
final List<String> expectedArtifactExtensions;
final List<String> expectedArtifactDirs;
final Map<String, int> expectedFileCountByExtension;
}