cleanup: switch desktop ACP paths to direct Go stdio bridge
This commit is contained in:
parent
d9caf4a016
commit
6b15741b99
95
go/go_core/internal/acp/stdio.go
Normal file
95
go/go_core/internal/acp/stdio.go
Normal file
@ -0,0 +1,95 @@
|
||||
package acp
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"xworkmate/go_core/internal/shared"
|
||||
)
|
||||
|
||||
func RunStdio(input io.Reader, output io.Writer) {
|
||||
server := NewServer()
|
||||
reader := bufio.NewReader(input)
|
||||
var writeMu sync.Mutex
|
||||
|
||||
writeMessage := func(message map[string]any) {
|
||||
payload, _ := jsonMarshal(message)
|
||||
writeMu.Lock()
|
||||
defer writeMu.Unlock()
|
||||
_, _ = output.Write(append(payload, '\n'))
|
||||
}
|
||||
|
||||
for {
|
||||
payload, err := readStdioMessage(reader)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
return
|
||||
}
|
||||
writeMessage(shared.ErrorEnvelope(nil, -32700, err.Error()))
|
||||
continue
|
||||
}
|
||||
if len(strings.TrimSpace(string(payload))) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
request, err := shared.DecodeRPCRequest(payload)
|
||||
if err != nil {
|
||||
writeMessage(shared.ErrorEnvelope(nil, -32700, err.Error()))
|
||||
continue
|
||||
}
|
||||
response, rpcErr := server.handleRequest(request, writeMessage)
|
||||
if request.ID == nil {
|
||||
continue
|
||||
}
|
||||
if rpcErr != nil {
|
||||
writeMessage(
|
||||
shared.ErrorEnvelope(request.ID, rpcErr.Code, rpcErr.Message),
|
||||
)
|
||||
continue
|
||||
}
|
||||
writeMessage(shared.ResultEnvelope(request.ID, response))
|
||||
}
|
||||
}
|
||||
|
||||
func readStdioMessage(reader *bufio.Reader) ([]byte, error) {
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
return nil, nil
|
||||
}
|
||||
if strings.HasPrefix(strings.ToLower(line), "content-length:") {
|
||||
var contentLength int
|
||||
if _, err := fmt.Sscanf(line, "Content-Length: %d", &contentLength); err != nil {
|
||||
if _, err2 := fmt.Sscanf(line, "content-length: %d", &contentLength); err2 != nil {
|
||||
return nil, fmt.Errorf("invalid content-length header")
|
||||
}
|
||||
}
|
||||
for {
|
||||
headerLine, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(headerLine) == "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
body := make([]byte, contentLength)
|
||||
if _, err := io.ReadFull(reader, body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
return []byte(line), nil
|
||||
}
|
||||
|
||||
func jsonMarshal(message map[string]any) ([]byte, error) {
|
||||
return json.Marshal(message)
|
||||
}
|
||||
@ -16,6 +16,10 @@ func main() {
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(os.Args) > 1 && os.Args[1] == "acp-stdio" {
|
||||
acp.RunStdio(os.Stdin, os.Stdout)
|
||||
return
|
||||
}
|
||||
|
||||
toolbridge.Run(os.Stdin, os.Stdout)
|
||||
}
|
||||
|
||||
@ -207,20 +207,13 @@ class AppController extends ChangeNotifier {
|
||||
arisBundleRepository ?? ArisBundleRepository();
|
||||
goCoreLocatorInternal = GoCoreLocator();
|
||||
runtimeCoordinatorInternal.attachDispatchResolver(
|
||||
GoRuntimeDispatchDesktopClient(
|
||||
acpClient: gatewayAcpClientInternal,
|
||||
goCoreLocator: goCoreLocatorInternal,
|
||||
),
|
||||
GoRuntimeDispatchDesktopClient(),
|
||||
);
|
||||
goTaskServiceClientInternal =
|
||||
goTaskServiceClient ??
|
||||
DesktopGoTaskService(
|
||||
gateway: runtimeCoordinatorInternal.gateway,
|
||||
acpTransport: ExternalCodeAgentAcpDesktopTransport(
|
||||
acpClient: gatewayAcpClientInternal,
|
||||
endpointResolver: resolveExternalAcpEndpointForTargetInternal,
|
||||
goCoreLocator: goCoreLocatorInternal,
|
||||
),
|
||||
acpTransport: ExternalCodeAgentAcpDesktopTransport(),
|
||||
);
|
||||
multiAgentOrchestratorInternal = MultiAgentOrchestrator(
|
||||
config: resolveMultiAgentConfigInternal(
|
||||
@ -234,10 +227,7 @@ class AppController extends ChangeNotifier {
|
||||
MultiAgentMountManager(
|
||||
arisBundleRepository: arisBundleRepositoryInternal,
|
||||
goCoreLocator: goCoreLocatorInternal,
|
||||
resolver: GoMultiAgentMountDesktopClient(
|
||||
acpClient: gatewayAcpClientInternal,
|
||||
goCoreLocator: goCoreLocatorInternal,
|
||||
),
|
||||
resolver: GoMultiAgentMountDesktopClient(),
|
||||
);
|
||||
|
||||
attachChildListenersInternal();
|
||||
|
||||
@ -271,7 +271,7 @@ extension AppControllerWebSessions on AppController {
|
||||
assistantExecutionTargetForSession(currentSessionKeyInternal) ==
|
||||
AssistantExecutionTarget.singleAgent &&
|
||||
!availableSingleAgentProviders.any(
|
||||
webAcpClientInternal.capabilities.providers.contains,
|
||||
acpCapabilitiesInternal.providers.contains,
|
||||
);
|
||||
|
||||
List<SecretReferenceEntry> get secretReferences {
|
||||
|
||||
@ -1,49 +1,16 @@
|
||||
import 'dart:async';
|
||||
import 'dart:io';
|
||||
|
||||
import 'embedded_agent_launch_policy.dart';
|
||||
import 'gateway_acp_client.dart';
|
||||
import 'go_core.dart';
|
||||
import 'go_acp_stdio_bridge.dart';
|
||||
import 'go_task_service_client.dart';
|
||||
import 'runtime_models.dart';
|
||||
|
||||
typedef ExternalCodeAgentAcpProcessStarter =
|
||||
Future<Process> Function(
|
||||
String executable,
|
||||
List<String> arguments, {
|
||||
Map<String, String>? environment,
|
||||
String? workingDirectory,
|
||||
});
|
||||
|
||||
class ExternalCodeAgentAcpDesktopTransport
|
||||
implements ExternalCodeAgentAcpTransport {
|
||||
ExternalCodeAgentAcpDesktopTransport({
|
||||
required GatewayAcpClient acpClient,
|
||||
required Uri? Function(AssistantExecutionTarget target) endpointResolver,
|
||||
GoCoreLocator? goCoreLocator,
|
||||
ExternalCodeAgentAcpProcessStarter? processStarter,
|
||||
}) : _acpClient = acpClient,
|
||||
_endpointResolver = endpointResolver,
|
||||
_goCoreLocator = goCoreLocator ?? GoCoreLocator(),
|
||||
_processStarter =
|
||||
processStarter ??
|
||||
((executable, arguments, {environment, workingDirectory}) {
|
||||
return Process.start(
|
||||
executable,
|
||||
arguments,
|
||||
environment: environment,
|
||||
workingDirectory: workingDirectory,
|
||||
);
|
||||
});
|
||||
ExternalCodeAgentAcpDesktopTransport({GoAcpStdioBridge? bridge})
|
||||
: _bridge = bridge ?? GoAcpStdioBridge();
|
||||
|
||||
final GatewayAcpClient _acpClient;
|
||||
final Uri? Function(AssistantExecutionTarget target) _endpointResolver;
|
||||
final GoCoreLocator _goCoreLocator;
|
||||
final ExternalCodeAgentAcpProcessStarter _processStarter;
|
||||
|
||||
Process? _localProcess;
|
||||
Uri? _localEndpoint;
|
||||
Future<Uri?>? _localEndpointFuture;
|
||||
final GoAcpStdioBridge _bridge;
|
||||
List<ExternalCodeAgentAcpSyncedProvider> _syncedProviders =
|
||||
const <ExternalCodeAgentAcpSyncedProvider>[];
|
||||
|
||||
@ -54,11 +21,7 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
_syncedProviders = List<ExternalCodeAgentAcpSyncedProvider>.unmodifiable(
|
||||
providers,
|
||||
);
|
||||
final endpoint = await _ensureLocalEndpoint();
|
||||
if (endpoint == null) {
|
||||
return;
|
||||
}
|
||||
await _syncProvidersToEndpoint(endpoint, _syncedProviders);
|
||||
await _syncProviders();
|
||||
}
|
||||
|
||||
@override
|
||||
@ -66,22 +29,39 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
required AssistantExecutionTarget target,
|
||||
bool forceRefresh = false,
|
||||
}) async {
|
||||
final endpoint = await _resolveEndpoint(target);
|
||||
if (endpoint == null) {
|
||||
return const ExternalCodeAgentAcpCapabilities.empty();
|
||||
}
|
||||
if (target == AssistantExecutionTarget.singleAgent) {
|
||||
await _syncProvidersToEndpoint(endpoint, _syncedProviders);
|
||||
}
|
||||
final capabilities = await _acpClient.loadCapabilities(
|
||||
forceRefresh: forceRefresh,
|
||||
endpointOverride: endpoint,
|
||||
await _syncProviders();
|
||||
final response = await _bridge.request(
|
||||
method: 'acp.capabilities',
|
||||
params: const <String, dynamic>{},
|
||||
);
|
||||
final result = _castMap(response['result']);
|
||||
final caps = _castMap(result['capabilities']);
|
||||
final providers = <SingleAgentProvider>{};
|
||||
for (final raw in <Object?>[
|
||||
..._asList(result['providers']),
|
||||
..._asList(caps['providers']),
|
||||
]) {
|
||||
if (raw == null) {
|
||||
continue;
|
||||
}
|
||||
final provider = SingleAgentProviderCopy.fromJsonValue(
|
||||
raw.toString().trim().toLowerCase(),
|
||||
);
|
||||
if (provider != SingleAgentProvider.auto) {
|
||||
providers.add(provider);
|
||||
}
|
||||
}
|
||||
return ExternalCodeAgentAcpCapabilities(
|
||||
singleAgent: capabilities.singleAgent,
|
||||
multiAgent: capabilities.multiAgent,
|
||||
providers: capabilities.providers,
|
||||
raw: capabilities.raw,
|
||||
singleAgent:
|
||||
_boolValue(result['singleAgent']) ??
|
||||
_boolValue(caps['single_agent']) ??
|
||||
providers.isNotEmpty,
|
||||
multiAgent:
|
||||
_boolValue(result['multiAgent']) ??
|
||||
_boolValue(caps['multi_agent']) ??
|
||||
true,
|
||||
providers: providers,
|
||||
raw: result,
|
||||
);
|
||||
}
|
||||
|
||||
@ -90,42 +70,46 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
GoTaskServiceRequest request, {
|
||||
required void Function(GoTaskServiceUpdate update) onUpdate,
|
||||
}) async {
|
||||
final endpoint = await _resolveEndpoint(request.target);
|
||||
if (endpoint == null) {
|
||||
throw const GatewayAcpException(
|
||||
'Missing external ACP endpoint',
|
||||
code: 'EXTERNAL_ACP_ENDPOINT_MISSING',
|
||||
);
|
||||
}
|
||||
if (request.target == AssistantExecutionTarget.singleAgent) {
|
||||
await _syncProvidersToEndpoint(endpoint, _syncedProviders);
|
||||
}
|
||||
await _syncProviders();
|
||||
late final StreamSubscription<Map<String, dynamic>> subscription;
|
||||
var streamedText = '';
|
||||
String? completedMessage;
|
||||
final response = await _acpClient.request(
|
||||
method: request.resumeSession ? 'session.message' : 'session.start',
|
||||
params: request.toExternalAcpParams(),
|
||||
endpointOverride: endpoint,
|
||||
onNotification: (notification) {
|
||||
final update = goTaskServiceUpdateFromAcpNotification(notification);
|
||||
if (update == null) {
|
||||
return;
|
||||
}
|
||||
if (update.isDelta) {
|
||||
streamedText += update.text;
|
||||
}
|
||||
if (update.isDone && update.message.trim().isNotEmpty) {
|
||||
completedMessage = update.message.trim();
|
||||
}
|
||||
onUpdate(update);
|
||||
},
|
||||
);
|
||||
return goTaskServiceResultFromAcpResponse(
|
||||
response,
|
||||
route: request.route,
|
||||
streamedText: streamedText,
|
||||
completedMessage: completedMessage,
|
||||
);
|
||||
subscription = _bridge.notifications.listen((notification) {
|
||||
final update = goTaskServiceUpdateFromAcpNotification(notification);
|
||||
if (update == null) {
|
||||
return;
|
||||
}
|
||||
if (update.sessionId != request.sessionId ||
|
||||
update.threadId != request.threadId) {
|
||||
return;
|
||||
}
|
||||
if (update.isDelta) {
|
||||
streamedText += update.text;
|
||||
}
|
||||
if (update.isDone && update.message.trim().isNotEmpty) {
|
||||
completedMessage = update.message.trim();
|
||||
}
|
||||
onUpdate(update);
|
||||
});
|
||||
try {
|
||||
final response = await _bridge.request(
|
||||
method: request.resumeSession ? 'session.message' : 'session.start',
|
||||
params: request.toExternalAcpParams(),
|
||||
);
|
||||
return goTaskServiceResultFromAcpResponse(
|
||||
response,
|
||||
route: request.route,
|
||||
streamedText: streamedText,
|
||||
completedMessage: completedMessage,
|
||||
);
|
||||
} catch (error) {
|
||||
throw GatewayAcpException(
|
||||
error.toString(),
|
||||
code: 'EXTERNAL_ACP_STDIO_ERROR',
|
||||
);
|
||||
} finally {
|
||||
await subscription.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
@ -134,14 +118,9 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
required String sessionId,
|
||||
required String threadId,
|
||||
}) async {
|
||||
final endpoint = await _resolveEndpoint(target);
|
||||
if (endpoint == null) {
|
||||
return;
|
||||
}
|
||||
await _acpClient.cancelSession(
|
||||
sessionId: sessionId,
|
||||
threadId: threadId,
|
||||
endpointOverride: endpoint,
|
||||
await _bridge.request(
|
||||
method: 'session.cancel',
|
||||
params: <String, dynamic>{'sessionId': sessionId, 'threadId': threadId},
|
||||
);
|
||||
}
|
||||
|
||||
@ -151,127 +130,71 @@ class ExternalCodeAgentAcpDesktopTransport
|
||||
required String sessionId,
|
||||
required String threadId,
|
||||
}) async {
|
||||
final endpoint = await _resolveEndpoint(target);
|
||||
if (endpoint == null) {
|
||||
return;
|
||||
}
|
||||
await _acpClient.closeSession(
|
||||
sessionId: sessionId,
|
||||
threadId: threadId,
|
||||
endpointOverride: endpoint,
|
||||
await _bridge.request(
|
||||
method: 'session.close',
|
||||
params: <String, dynamic>{'sessionId': sessionId, 'threadId': threadId},
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> dispose() async {
|
||||
final process = _localProcess;
|
||||
_localProcess = null;
|
||||
_localEndpoint = null;
|
||||
_localEndpointFuture = null;
|
||||
if (process != null) {
|
||||
try {
|
||||
process.kill();
|
||||
} catch (_) {
|
||||
// Best effort only.
|
||||
}
|
||||
}
|
||||
}
|
||||
Future<void> dispose() => _bridge.dispose();
|
||||
|
||||
Future<Uri?> _resolveEndpoint(AssistantExecutionTarget target) async {
|
||||
if (target == AssistantExecutionTarget.singleAgent) {
|
||||
return _ensureLocalEndpoint();
|
||||
}
|
||||
return _endpointResolver(target);
|
||||
}
|
||||
|
||||
Future<Uri?> _ensureLocalEndpoint() async {
|
||||
if (_localEndpoint != null) {
|
||||
return _localEndpoint;
|
||||
}
|
||||
final inFlight = _localEndpointFuture;
|
||||
if (inFlight != null) {
|
||||
return inFlight;
|
||||
}
|
||||
final next = _startLocalProcess();
|
||||
_localEndpointFuture = next;
|
||||
try {
|
||||
_localEndpoint = await next;
|
||||
return _localEndpoint;
|
||||
} finally {
|
||||
_localEndpointFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
Future<Uri?> _startLocalProcess() async {
|
||||
final launch = await _goCoreLocator.locate();
|
||||
if (launch == null) {
|
||||
return null;
|
||||
}
|
||||
if (shouldBlockGoCoreLaunch(
|
||||
launch,
|
||||
isAppleHost: Platform.isIOS || Platform.isMacOS,
|
||||
)) {
|
||||
return null;
|
||||
}
|
||||
final reservedSocket = await ServerSocket.bind(
|
||||
InternetAddress.loopbackIPv4,
|
||||
0,
|
||||
);
|
||||
final port = reservedSocket.port;
|
||||
await reservedSocket.close();
|
||||
final listenAddress = '127.0.0.1:$port';
|
||||
final process = await _processStarter(
|
||||
launch.executable,
|
||||
<String>[...launch.arguments, 'serve', '--listen', listenAddress],
|
||||
environment: Platform.environment,
|
||||
workingDirectory: launch.workingDirectory,
|
||||
);
|
||||
_localProcess = process;
|
||||
unawaited(process.stdout.drain<void>());
|
||||
unawaited(process.stderr.drain<void>());
|
||||
final endpoint = Uri(scheme: 'http', host: '127.0.0.1', port: port);
|
||||
final deadline = DateTime.now().add(const Duration(seconds: 8));
|
||||
while (DateTime.now().isBefore(deadline)) {
|
||||
if (_localProcess != process) {
|
||||
break;
|
||||
}
|
||||
final exitCode = await process.exitCode.timeout(
|
||||
const Duration(milliseconds: 20),
|
||||
onTimeout: () => -1,
|
||||
);
|
||||
if (exitCode != -1) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
await _acpClient.request(
|
||||
method: 'acp.capabilities',
|
||||
params: const <String, dynamic>{},
|
||||
endpointOverride: endpoint,
|
||||
);
|
||||
return endpoint;
|
||||
} catch (_) {
|
||||
await Future<void>.delayed(const Duration(milliseconds: 120));
|
||||
}
|
||||
}
|
||||
await dispose();
|
||||
return null;
|
||||
}
|
||||
|
||||
Future<void> _syncProvidersToEndpoint(
|
||||
Uri endpoint,
|
||||
List<ExternalCodeAgentAcpSyncedProvider> providers,
|
||||
) async {
|
||||
if (providers.isEmpty) {
|
||||
return;
|
||||
}
|
||||
await _acpClient.request(
|
||||
Future<void> _syncProviders() async {
|
||||
await _bridge.request(
|
||||
method: 'xworkmate.providers.sync',
|
||||
params: <String, dynamic>{
|
||||
'providers': providers
|
||||
.map((item) => item.toJson())
|
||||
'providers': _syncedProviders
|
||||
.map(
|
||||
(item) => <String, dynamic>{
|
||||
'providerId': item.providerId,
|
||||
'endpoint': item.endpoint,
|
||||
'label': item.label,
|
||||
'authorizationHeader': item.authorizationHeader,
|
||||
'enabled': item.enabled,
|
||||
},
|
||||
)
|
||||
.toList(growable: false),
|
||||
},
|
||||
endpointOverride: endpoint,
|
||||
);
|
||||
}
|
||||
|
||||
Map<String, dynamic> _castMap(Object? value) {
|
||||
if (value is Map<String, dynamic>) {
|
||||
return value;
|
||||
}
|
||||
if (value is Map) {
|
||||
return value.cast<String, dynamic>();
|
||||
}
|
||||
return const <String, dynamic>{};
|
||||
}
|
||||
|
||||
List<Object?> _asList(Object? raw) {
|
||||
if (raw is List<Object?>) {
|
||||
return raw;
|
||||
}
|
||||
if (raw is List) {
|
||||
return raw.cast<Object?>();
|
||||
}
|
||||
return const <Object?>[];
|
||||
}
|
||||
|
||||
bool? _boolValue(Object? raw) {
|
||||
if (raw is bool) {
|
||||
return raw;
|
||||
}
|
||||
if (raw is num) {
|
||||
return raw != 0;
|
||||
}
|
||||
final text = raw?.toString().trim().toLowerCase();
|
||||
if (text == null || text.isEmpty) {
|
||||
return null;
|
||||
}
|
||||
if (text == 'true' || text == '1' || text == 'yes') {
|
||||
return true;
|
||||
}
|
||||
if (text == 'false' || text == '0' || text == 'no') {
|
||||
return false;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
238
lib/runtime/go_acp_stdio_bridge.dart
Normal file
238
lib/runtime/go_acp_stdio_bridge.dart
Normal file
@ -0,0 +1,238 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
|
||||
import 'embedded_agent_launch_policy.dart';
|
||||
import 'go_core.dart';
|
||||
|
||||
typedef GoAcpStdioProcessStarter =
|
||||
Future<Process> Function(
|
||||
String executable,
|
||||
List<String> arguments, {
|
||||
Map<String, String>? environment,
|
||||
String? workingDirectory,
|
||||
});
|
||||
|
||||
class GoAcpStdioBridge {
|
||||
GoAcpStdioBridge({
|
||||
GoCoreLocator? goCoreLocator,
|
||||
GoAcpStdioProcessStarter? processStarter,
|
||||
}) : _goCoreLocator = goCoreLocator ?? GoCoreLocator(),
|
||||
_processStarter =
|
||||
processStarter ??
|
||||
((executable, arguments, {environment, workingDirectory}) {
|
||||
return Process.start(
|
||||
executable,
|
||||
arguments,
|
||||
environment: environment,
|
||||
workingDirectory: workingDirectory,
|
||||
);
|
||||
});
|
||||
|
||||
final GoCoreLocator _goCoreLocator;
|
||||
final GoAcpStdioProcessStarter _processStarter;
|
||||
|
||||
final StreamController<Map<String, dynamic>> _notificationsController =
|
||||
StreamController<Map<String, dynamic>>.broadcast();
|
||||
final Map<String, Completer<Map<String, dynamic>>> _pending =
|
||||
<String, Completer<Map<String, dynamic>>>{};
|
||||
|
||||
Process? _process;
|
||||
StreamSubscription<String>? _stdoutSubscription;
|
||||
StreamSubscription<String>? _stderrSubscription;
|
||||
Future<void>? _startupFuture;
|
||||
int _requestCounter = 0;
|
||||
|
||||
Stream<Map<String, dynamic>> get notifications =>
|
||||
_notificationsController.stream;
|
||||
|
||||
Future<Map<String, dynamic>> request({
|
||||
required String method,
|
||||
required Map<String, dynamic> params,
|
||||
Duration timeout = const Duration(seconds: 120),
|
||||
}) async {
|
||||
await _ensureStarted();
|
||||
final process = _process;
|
||||
if (process == null) {
|
||||
throw StateError('Missing Go ACP stdio process.');
|
||||
}
|
||||
final id =
|
||||
'${DateTime.now().microsecondsSinceEpoch}-$method-${_requestCounter++}';
|
||||
final completer = Completer<Map<String, dynamic>>();
|
||||
_pending[id] = completer;
|
||||
process.stdin.writeln(
|
||||
jsonEncode(<String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'id': id,
|
||||
'method': method,
|
||||
'params': params,
|
||||
}),
|
||||
);
|
||||
try {
|
||||
return await completer.future.timeout(
|
||||
timeout,
|
||||
onTimeout: () => throw TimeoutException(
|
||||
'Go ACP stdio request timed out: $method',
|
||||
timeout,
|
||||
),
|
||||
);
|
||||
} finally {
|
||||
_pending.remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> dispose() async {
|
||||
final process = _process;
|
||||
_process = null;
|
||||
_startupFuture = null;
|
||||
for (final completer in _pending.values) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.completeError(
|
||||
StateError('Go ACP stdio bridge disposed before response.'),
|
||||
);
|
||||
}
|
||||
}
|
||||
_pending.clear();
|
||||
await _stdoutSubscription?.cancel();
|
||||
await _stderrSubscription?.cancel();
|
||||
_stdoutSubscription = null;
|
||||
_stderrSubscription = null;
|
||||
if (process != null) {
|
||||
try {
|
||||
await process.stdin.close();
|
||||
} catch (_) {
|
||||
// Ignore broken pipes during disposal.
|
||||
}
|
||||
try {
|
||||
process.kill();
|
||||
} catch (_) {
|
||||
// Best effort only.
|
||||
}
|
||||
}
|
||||
await _notificationsController.close();
|
||||
}
|
||||
|
||||
Future<void> _ensureStarted() async {
|
||||
if (_process != null) {
|
||||
return;
|
||||
}
|
||||
final inFlight = _startupFuture;
|
||||
if (inFlight != null) {
|
||||
return inFlight;
|
||||
}
|
||||
final next = _start();
|
||||
_startupFuture = next;
|
||||
try {
|
||||
await next;
|
||||
} finally {
|
||||
_startupFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _start() async {
|
||||
final launch = await _goCoreLocator.locate();
|
||||
if (launch == null) {
|
||||
throw StateError('Go core is unavailable.');
|
||||
}
|
||||
if (shouldBlockGoCoreLaunch(
|
||||
launch,
|
||||
isAppleHost: Platform.isIOS || Platform.isMacOS,
|
||||
)) {
|
||||
throw UnsupportedError(
|
||||
'App Store builds only allow the bundled Go core helper inside the app bundle.',
|
||||
);
|
||||
}
|
||||
final process = await _processStarter(
|
||||
launch.executable,
|
||||
<String>[...launch.arguments, 'acp-stdio'],
|
||||
environment: Platform.environment,
|
||||
workingDirectory: launch.workingDirectory,
|
||||
);
|
||||
_process = process;
|
||||
_stdoutSubscription = process.stdout
|
||||
.transform(utf8.decoder)
|
||||
.transform(const LineSplitter())
|
||||
.listen(_handleStdoutLine, onError: _handleProcessError);
|
||||
_stderrSubscription = process.stderr
|
||||
.transform(utf8.decoder)
|
||||
.transform(const LineSplitter())
|
||||
.listen((_) {}, onError: _handleProcessError);
|
||||
unawaited(
|
||||
process.exitCode.then((exitCode) {
|
||||
if (_process != process) {
|
||||
return;
|
||||
}
|
||||
_process = null;
|
||||
_failPending(
|
||||
StateError('Go ACP stdio process exited with code $exitCode'),
|
||||
);
|
||||
}),
|
||||
);
|
||||
await request(method: 'acp.capabilities', params: const <String, dynamic>{});
|
||||
}
|
||||
|
||||
void _handleStdoutLine(String line) {
|
||||
final trimmed = line.trim();
|
||||
if (trimmed.isEmpty || !trimmed.startsWith('{')) {
|
||||
return;
|
||||
}
|
||||
final json = _decodeMap(trimmed);
|
||||
final id = json['id']?.toString().trim();
|
||||
if (id != null && id.isNotEmpty) {
|
||||
final completer = _pending[id];
|
||||
if (completer == null || completer.isCompleted) {
|
||||
return;
|
||||
}
|
||||
final error = _castMap(json['error']);
|
||||
if (error.isNotEmpty) {
|
||||
completer.completeError(
|
||||
StateError(
|
||||
error['message']?.toString() ?? 'Go ACP stdio request failed',
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
completer.complete(json);
|
||||
return;
|
||||
}
|
||||
if ((json['method']?.toString().trim() ?? '').isNotEmpty &&
|
||||
!_notificationsController.isClosed) {
|
||||
_notificationsController.add(json);
|
||||
}
|
||||
}
|
||||
|
||||
void _handleProcessError(Object error) {
|
||||
_failPending(error);
|
||||
}
|
||||
|
||||
void _failPending(Object error) {
|
||||
final pending = Map<String, Completer<Map<String, dynamic>>>.from(_pending);
|
||||
_pending.clear();
|
||||
for (final completer in pending.values) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.completeError(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, dynamic> _decodeMap(String raw) {
|
||||
final decoded = jsonDecode(raw);
|
||||
if (decoded is Map<String, dynamic>) {
|
||||
return decoded;
|
||||
}
|
||||
if (decoded is Map) {
|
||||
return decoded.cast<String, dynamic>();
|
||||
}
|
||||
return const <String, dynamic>{};
|
||||
}
|
||||
|
||||
Map<String, dynamic> _castMap(Object? value) {
|
||||
if (value is Map<String, dynamic>) {
|
||||
return value;
|
||||
}
|
||||
if (value is Map) {
|
||||
return value.cast<String, dynamic>();
|
||||
}
|
||||
return const <String, dynamic>{};
|
||||
}
|
||||
}
|
||||
@ -1,52 +1,24 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
|
||||
import 'embedded_agent_launch_policy.dart';
|
||||
import 'gateway_runtime_errors.dart';
|
||||
import 'gateway_runtime_helpers.dart';
|
||||
import 'gateway_runtime_session_client.dart';
|
||||
import 'go_core.dart';
|
||||
|
||||
typedef GoGatewayRuntimeProcessStarter =
|
||||
Future<Process> Function(
|
||||
String executable,
|
||||
List<String> arguments, {
|
||||
Map<String, String>? environment,
|
||||
String? workingDirectory,
|
||||
});
|
||||
import 'go_acp_stdio_bridge.dart';
|
||||
|
||||
class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
|
||||
GoGatewayRuntimeDesktopClient({
|
||||
GoCoreLocator? goCoreLocator,
|
||||
GoGatewayRuntimeProcessStarter? processStarter,
|
||||
}) : _goCoreLocator = goCoreLocator ?? GoCoreLocator(),
|
||||
_processStarter =
|
||||
processStarter ??
|
||||
((executable, arguments, {environment, workingDirectory}) {
|
||||
return Process.start(
|
||||
executable,
|
||||
arguments,
|
||||
environment: environment,
|
||||
workingDirectory: workingDirectory,
|
||||
);
|
||||
});
|
||||
|
||||
final GoCoreLocator _goCoreLocator;
|
||||
final GoGatewayRuntimeProcessStarter _processStarter;
|
||||
GoGatewayRuntimeDesktopClient({GoAcpStdioBridge? bridge})
|
||||
: _bridge = bridge ?? GoAcpStdioBridge() {
|
||||
_notificationsSubscription = _bridge.notifications.listen(
|
||||
_handleNotification,
|
||||
onError: (Object error, StackTrace stackTrace) {
|
||||
_updatesController.addError(error, stackTrace);
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
final GoAcpStdioBridge _bridge;
|
||||
late final StreamSubscription<Map<String, dynamic>> _notificationsSubscription;
|
||||
final StreamController<GatewayRuntimeSessionUpdate> _updatesController =
|
||||
StreamController<GatewayRuntimeSessionUpdate>.broadcast();
|
||||
final Map<String, Completer<Map<String, dynamic>>> _pending =
|
||||
<String, Completer<Map<String, dynamic>>>{};
|
||||
|
||||
Process? _localProcess;
|
||||
Uri? _localEndpoint;
|
||||
Future<Uri?>? _localEndpointFuture;
|
||||
WebSocket? _socket;
|
||||
StreamSubscription<dynamic>? _socketSubscription;
|
||||
Future<void>? _socketReadyFuture;
|
||||
int _requestCounter = 0;
|
||||
|
||||
@override
|
||||
Stream<GatewayRuntimeSessionUpdate> get updates => _updatesController.stream;
|
||||
@ -59,7 +31,7 @@ class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
|
||||
method: 'xworkmate.gateway.connect',
|
||||
params: request.toJson(),
|
||||
);
|
||||
if (boolValue(result['ok']) != true) {
|
||||
if (_boolValue(result['ok']) != true) {
|
||||
throw _gatewayErrorFromResult(
|
||||
result,
|
||||
fallbackMessage: 'Gateway connect failed',
|
||||
@ -84,7 +56,7 @@ class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
|
||||
'timeoutMs': timeout.inMilliseconds,
|
||||
},
|
||||
);
|
||||
if (boolValue(result['ok']) != true) {
|
||||
if (_boolValue(result['ok']) != true) {
|
||||
throw _gatewayErrorFromResult(
|
||||
result,
|
||||
fallbackMessage: '$method request failed',
|
||||
@ -103,37 +75,8 @@ class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
|
||||
|
||||
@override
|
||||
Future<void> dispose() async {
|
||||
for (final completer in _pending.values) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.completeError(
|
||||
GatewayRuntimeException(
|
||||
'Go gateway runtime transport disposed',
|
||||
code: 'GO_GATEWAY_RUNTIME_TRANSPORT_DISPOSED',
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
_pending.clear();
|
||||
await _socketSubscription?.cancel();
|
||||
_socketSubscription = null;
|
||||
try {
|
||||
await _socket?.close();
|
||||
} catch (_) {
|
||||
// Best effort only.
|
||||
}
|
||||
_socket = null;
|
||||
_socketReadyFuture = null;
|
||||
final process = _localProcess;
|
||||
_localProcess = null;
|
||||
_localEndpoint = null;
|
||||
_localEndpointFuture = null;
|
||||
if (process != null) {
|
||||
try {
|
||||
process.kill();
|
||||
} catch (_) {
|
||||
// Best effort only.
|
||||
}
|
||||
}
|
||||
await _notificationsSubscription.cancel();
|
||||
await _bridge.dispose();
|
||||
await _updatesController.close();
|
||||
}
|
||||
|
||||
@ -141,208 +84,24 @@ class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
|
||||
required String method,
|
||||
required Map<String, dynamic> params,
|
||||
}) async {
|
||||
await _ensureSocketReady();
|
||||
final socket = _socket;
|
||||
if (socket == null) {
|
||||
throw GatewayRuntimeException(
|
||||
'Missing Go gateway runtime transport',
|
||||
code: 'GO_GATEWAY_RUNTIME_TRANSPORT_UNAVAILABLE',
|
||||
);
|
||||
}
|
||||
final requestId =
|
||||
'${DateTime.now().microsecondsSinceEpoch}-$method-${_requestCounter++}';
|
||||
final completer = Completer<Map<String, dynamic>>();
|
||||
_pending[requestId] = completer;
|
||||
socket.add(
|
||||
jsonEncode(<String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'id': requestId,
|
||||
'method': method,
|
||||
'params': params,
|
||||
}),
|
||||
);
|
||||
try {
|
||||
return await completer.future.timeout(const Duration(seconds: 120));
|
||||
} finally {
|
||||
_pending.remove(requestId);
|
||||
}
|
||||
final response = await _bridge.request(method: method, params: params);
|
||||
return _castMap(response['result']);
|
||||
}
|
||||
|
||||
Future<void> _ensureSocketReady() async {
|
||||
final inFlight = _socketReadyFuture;
|
||||
if (inFlight != null) {
|
||||
return inFlight;
|
||||
}
|
||||
final next = _openSocket();
|
||||
_socketReadyFuture = next;
|
||||
try {
|
||||
await next;
|
||||
} finally {
|
||||
_socketReadyFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _openSocket() async {
|
||||
if (_socket != null) {
|
||||
return;
|
||||
}
|
||||
final endpoint = await _ensureLocalEndpoint();
|
||||
if (endpoint == null) {
|
||||
throw GatewayRuntimeException(
|
||||
'Missing Go gateway runtime endpoint',
|
||||
code: 'GO_GATEWAY_RUNTIME_ENDPOINT_MISSING',
|
||||
);
|
||||
}
|
||||
final wsEndpoint = endpoint.replace(
|
||||
scheme: endpoint.scheme == 'https' ? 'wss' : 'ws',
|
||||
path: '/acp',
|
||||
);
|
||||
final socket = await WebSocket.connect(wsEndpoint.toString()).timeout(
|
||||
const Duration(seconds: 6),
|
||||
onTimeout: () => throw GatewayRuntimeException(
|
||||
'Go gateway runtime websocket connect timeout',
|
||||
code: 'GO_GATEWAY_RUNTIME_WS_CONNECT_TIMEOUT',
|
||||
),
|
||||
);
|
||||
_socket = socket;
|
||||
_socketSubscription = socket.listen(
|
||||
_handleSocketMessage,
|
||||
onError: (Object error, StackTrace stackTrace) {
|
||||
_failPending(
|
||||
GatewayRuntimeException(
|
||||
error.toString(),
|
||||
code: 'GO_GATEWAY_RUNTIME_WS_ERROR',
|
||||
),
|
||||
);
|
||||
},
|
||||
onDone: () {
|
||||
_socket = null;
|
||||
_socketSubscription = null;
|
||||
_failPending(
|
||||
GatewayRuntimeException(
|
||||
'Go gateway runtime websocket closed',
|
||||
code: 'GO_GATEWAY_RUNTIME_WS_CLOSED',
|
||||
),
|
||||
);
|
||||
},
|
||||
cancelOnError: true,
|
||||
);
|
||||
}
|
||||
|
||||
void _handleSocketMessage(dynamic raw) {
|
||||
final json = _decodeMap(raw);
|
||||
final id = json['id']?.toString().trim();
|
||||
if (id != null && id.isNotEmpty) {
|
||||
final completer = _pending[id];
|
||||
if (completer != null && !completer.isCompleted) {
|
||||
final error = _castMap(json['error']);
|
||||
if (error.isNotEmpty) {
|
||||
completer.completeError(
|
||||
GatewayRuntimeException(
|
||||
error['message']?.toString() ??
|
||||
'Go gateway runtime request failed',
|
||||
code: error['code']?.toString(),
|
||||
),
|
||||
);
|
||||
} else {
|
||||
completer.complete(_castMap(json['result']));
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
final method = json['method']?.toString().trim() ?? '';
|
||||
void _handleNotification(Map<String, dynamic> notification) {
|
||||
final method = notification['method']?.toString().trim() ?? '';
|
||||
if (method.isEmpty) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
_updatesController.add(
|
||||
GatewayRuntimeSessionUpdate.fromNotification(json),
|
||||
GatewayRuntimeSessionUpdate.fromNotification(notification),
|
||||
);
|
||||
} catch (_) {
|
||||
// Ignore unrelated ACP notifications.
|
||||
}
|
||||
}
|
||||
|
||||
void _failPending(GatewayRuntimeException error) {
|
||||
for (final completer in _pending.values) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.completeError(error);
|
||||
}
|
||||
}
|
||||
_pending.clear();
|
||||
}
|
||||
|
||||
Future<Uri?> _ensureLocalEndpoint() async {
|
||||
if (_localEndpoint != null) {
|
||||
return _localEndpoint;
|
||||
}
|
||||
final inFlight = _localEndpointFuture;
|
||||
if (inFlight != null) {
|
||||
return inFlight;
|
||||
}
|
||||
final next = _startLocalProcess();
|
||||
_localEndpointFuture = next;
|
||||
try {
|
||||
_localEndpoint = await next;
|
||||
return _localEndpoint;
|
||||
} finally {
|
||||
_localEndpointFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
Future<Uri?> _startLocalProcess() async {
|
||||
final launch = await _goCoreLocator.locate();
|
||||
if (launch == null) {
|
||||
return null;
|
||||
}
|
||||
if (shouldBlockGoCoreLaunch(
|
||||
launch,
|
||||
isAppleHost: Platform.isIOS || Platform.isMacOS,
|
||||
)) {
|
||||
return null;
|
||||
}
|
||||
final reservedSocket = await ServerSocket.bind(
|
||||
InternetAddress.loopbackIPv4,
|
||||
0,
|
||||
);
|
||||
final port = reservedSocket.port;
|
||||
await reservedSocket.close();
|
||||
final listenAddress = '127.0.0.1:$port';
|
||||
final process = await _processStarter(
|
||||
launch.executable,
|
||||
<String>[...launch.arguments, 'serve', '--listen', listenAddress],
|
||||
environment: Platform.environment,
|
||||
workingDirectory: launch.workingDirectory,
|
||||
);
|
||||
_localProcess = process;
|
||||
unawaited(process.stdout.drain<void>());
|
||||
unawaited(process.stderr.drain<void>());
|
||||
final endpoint = Uri(scheme: 'http', host: '127.0.0.1', port: port);
|
||||
final deadline = DateTime.now().add(const Duration(seconds: 8));
|
||||
while (DateTime.now().isBefore(deadline)) {
|
||||
if (_localProcess != process) {
|
||||
break;
|
||||
}
|
||||
final exitCode = await process.exitCode.timeout(
|
||||
const Duration(milliseconds: 20),
|
||||
onTimeout: () => -1,
|
||||
);
|
||||
if (exitCode != -1) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
final probe = await WebSocket.connect(
|
||||
endpoint.replace(scheme: 'ws', path: '/acp').toString(),
|
||||
).timeout(const Duration(milliseconds: 300));
|
||||
await probe.close();
|
||||
return endpoint;
|
||||
} catch (_) {
|
||||
await Future<void>.delayed(const Duration(milliseconds: 120));
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
GatewayRuntimeException _gatewayErrorFromResult(
|
||||
Map<String, dynamic> result, {
|
||||
required String fallbackMessage,
|
||||
@ -355,22 +114,6 @@ class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
|
||||
);
|
||||
}
|
||||
|
||||
Map<String, dynamic> _decodeMap(dynamic raw) {
|
||||
if (raw is Map<String, dynamic>) {
|
||||
return raw;
|
||||
}
|
||||
if (raw is Map) {
|
||||
return raw.cast<String, dynamic>();
|
||||
}
|
||||
if (raw is String) {
|
||||
return _castMap(jsonDecode(raw));
|
||||
}
|
||||
if (raw is List<int>) {
|
||||
return _castMap(jsonDecode(utf8.decode(raw)));
|
||||
}
|
||||
return const <String, dynamic>{};
|
||||
}
|
||||
|
||||
Map<String, dynamic> _castMap(Object? value) {
|
||||
if (value is Map<String, dynamic>) {
|
||||
return value;
|
||||
@ -380,4 +123,24 @@ class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
|
||||
}
|
||||
return const <String, dynamic>{};
|
||||
}
|
||||
|
||||
bool? _boolValue(Object? raw) {
|
||||
if (raw is bool) {
|
||||
return raw;
|
||||
}
|
||||
if (raw is num) {
|
||||
return raw != 0;
|
||||
}
|
||||
final text = raw?.toString().trim().toLowerCase();
|
||||
if (text == null || text.isEmpty) {
|
||||
return null;
|
||||
}
|
||||
if (text == 'true' || text == '1' || text == 'yes') {
|
||||
return true;
|
||||
}
|
||||
if (text == 'false' || text == '0' || text == 'no') {
|
||||
return false;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,45 +1,12 @@
|
||||
import 'dart:async';
|
||||
import 'dart:io';
|
||||
|
||||
import 'embedded_agent_launch_policy.dart';
|
||||
import 'gateway_acp_client.dart';
|
||||
import 'go_core.dart';
|
||||
import 'go_acp_stdio_bridge.dart';
|
||||
import 'multi_agent_mount_resolver.dart';
|
||||
import 'runtime_models.dart';
|
||||
|
||||
typedef GoMultiAgentMountProcessStarter =
|
||||
Future<Process> Function(
|
||||
String executable,
|
||||
List<String> arguments, {
|
||||
Map<String, String>? environment,
|
||||
String? workingDirectory,
|
||||
});
|
||||
|
||||
class GoMultiAgentMountDesktopClient implements MultiAgentMountResolver {
|
||||
GoMultiAgentMountDesktopClient({
|
||||
GatewayAcpClient? acpClient,
|
||||
GoCoreLocator? goCoreLocator,
|
||||
GoMultiAgentMountProcessStarter? processStarter,
|
||||
}) : _acpClient = acpClient ?? GatewayAcpClient(endpointResolver: () => null),
|
||||
_goCoreLocator = goCoreLocator ?? GoCoreLocator(),
|
||||
_processStarter =
|
||||
processStarter ??
|
||||
((executable, arguments, {environment, workingDirectory}) {
|
||||
return Process.start(
|
||||
executable,
|
||||
arguments,
|
||||
environment: environment,
|
||||
workingDirectory: workingDirectory,
|
||||
);
|
||||
});
|
||||
GoMultiAgentMountDesktopClient({GoAcpStdioBridge? bridge})
|
||||
: _bridge = bridge ?? GoAcpStdioBridge();
|
||||
|
||||
final GatewayAcpClient _acpClient;
|
||||
final GoCoreLocator _goCoreLocator;
|
||||
final GoMultiAgentMountProcessStarter _processStarter;
|
||||
|
||||
Process? _localProcess;
|
||||
Uri? _localEndpoint;
|
||||
Future<Uri?>? _localEndpointFuture;
|
||||
final GoAcpStdioBridge _bridge;
|
||||
|
||||
@override
|
||||
Future<MultiAgentConfig?> reconcile({
|
||||
@ -50,11 +17,7 @@ class GoMultiAgentMountDesktopClient implements MultiAgentMountResolver {
|
||||
required String opencodeHome,
|
||||
required ArisMountProbe arisProbe,
|
||||
}) async {
|
||||
final endpoint = await _ensureLocalEndpoint();
|
||||
if (endpoint == null) {
|
||||
return null;
|
||||
}
|
||||
final response = await _acpClient.request(
|
||||
final response = await _bridge.request(
|
||||
method: 'xworkmate.mounts.reconcile',
|
||||
params: <String, dynamic>{
|
||||
'config': <String, dynamic>{
|
||||
@ -70,7 +33,6 @@ class GoMultiAgentMountDesktopClient implements MultiAgentMountResolver {
|
||||
'opencodeHome': opencodeHome.trim(),
|
||||
'aris': arisProbe.toJson(),
|
||||
},
|
||||
endpointOverride: endpoint,
|
||||
);
|
||||
final result = _castMap(response['result']);
|
||||
final rawTargets = result['mountTargets'];
|
||||
@ -98,93 +60,7 @@ class GoMultiAgentMountDesktopClient implements MultiAgentMountResolver {
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> dispose() async {
|
||||
final process = _localProcess;
|
||||
_localProcess = null;
|
||||
_localEndpoint = null;
|
||||
_localEndpointFuture = null;
|
||||
if (process != null) {
|
||||
try {
|
||||
process.kill();
|
||||
} catch (_) {
|
||||
// Best effort only.
|
||||
}
|
||||
}
|
||||
await _acpClient.dispose();
|
||||
}
|
||||
|
||||
Future<Uri?> _ensureLocalEndpoint() async {
|
||||
if (_localEndpoint != null) {
|
||||
return _localEndpoint;
|
||||
}
|
||||
final inFlight = _localEndpointFuture;
|
||||
if (inFlight != null) {
|
||||
return inFlight;
|
||||
}
|
||||
final next = _startLocalProcess();
|
||||
_localEndpointFuture = next;
|
||||
try {
|
||||
_localEndpoint = await next;
|
||||
return _localEndpoint;
|
||||
} finally {
|
||||
_localEndpointFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
Future<Uri?> _startLocalProcess() async {
|
||||
final launch = await _goCoreLocator.locate();
|
||||
if (launch == null) {
|
||||
return null;
|
||||
}
|
||||
if (shouldBlockGoCoreLaunch(
|
||||
launch,
|
||||
isAppleHost: Platform.isIOS || Platform.isMacOS,
|
||||
)) {
|
||||
return null;
|
||||
}
|
||||
final reservedSocket = await ServerSocket.bind(
|
||||
InternetAddress.loopbackIPv4,
|
||||
0,
|
||||
);
|
||||
final port = reservedSocket.port;
|
||||
await reservedSocket.close();
|
||||
final listenAddress = '127.0.0.1:$port';
|
||||
final process = await _processStarter(
|
||||
launch.executable,
|
||||
<String>[...launch.arguments, 'serve', '--listen', listenAddress],
|
||||
environment: Platform.environment,
|
||||
workingDirectory: launch.workingDirectory,
|
||||
);
|
||||
_localProcess = process;
|
||||
unawaited(process.stdout.drain<void>());
|
||||
unawaited(process.stderr.drain<void>());
|
||||
final endpoint = Uri(scheme: 'http', host: '127.0.0.1', port: port);
|
||||
final deadline = DateTime.now().add(const Duration(seconds: 8));
|
||||
while (DateTime.now().isBefore(deadline)) {
|
||||
if (_localProcess != process) {
|
||||
break;
|
||||
}
|
||||
final exitCode = await process.exitCode.timeout(
|
||||
const Duration(milliseconds: 20),
|
||||
onTimeout: () => -1,
|
||||
);
|
||||
if (exitCode != -1) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
await _acpClient.request(
|
||||
method: 'acp.capabilities',
|
||||
params: const <String, dynamic>{},
|
||||
endpointOverride: endpoint,
|
||||
);
|
||||
return endpoint;
|
||||
} catch (_) {
|
||||
await Future<void>.delayed(const Duration(milliseconds: 120));
|
||||
}
|
||||
}
|
||||
await dispose();
|
||||
return null;
|
||||
}
|
||||
Future<void> dispose() => _bridge.dispose();
|
||||
|
||||
Map<String, dynamic> _castMap(Object? value) {
|
||||
if (value is Map<String, dynamic>) {
|
||||
|
||||
@ -1,45 +1,12 @@
|
||||
import 'dart:async';
|
||||
import 'dart:io';
|
||||
|
||||
import 'embedded_agent_launch_policy.dart';
|
||||
import 'gateway_acp_client.dart';
|
||||
import 'go_core.dart';
|
||||
import 'go_acp_stdio_bridge.dart';
|
||||
import 'runtime_dispatch_resolver.dart';
|
||||
import 'runtime_external_code_agents.dart';
|
||||
|
||||
typedef GoRuntimeDispatchProcessStarter =
|
||||
Future<Process> Function(
|
||||
String executable,
|
||||
List<String> arguments, {
|
||||
Map<String, String>? environment,
|
||||
String? workingDirectory,
|
||||
});
|
||||
|
||||
class GoRuntimeDispatchDesktopClient implements RuntimeDispatchResolver {
|
||||
GoRuntimeDispatchDesktopClient({
|
||||
GatewayAcpClient? acpClient,
|
||||
GoCoreLocator? goCoreLocator,
|
||||
GoRuntimeDispatchProcessStarter? processStarter,
|
||||
}) : _acpClient = acpClient ?? GatewayAcpClient(endpointResolver: () => null),
|
||||
_goCoreLocator = goCoreLocator ?? GoCoreLocator(),
|
||||
_processStarter =
|
||||
processStarter ??
|
||||
((executable, arguments, {environment, workingDirectory}) {
|
||||
return Process.start(
|
||||
executable,
|
||||
arguments,
|
||||
environment: environment,
|
||||
workingDirectory: workingDirectory,
|
||||
);
|
||||
});
|
||||
GoRuntimeDispatchDesktopClient({GoAcpStdioBridge? bridge})
|
||||
: _bridge = bridge ?? GoAcpStdioBridge();
|
||||
|
||||
final GatewayAcpClient _acpClient;
|
||||
final GoCoreLocator _goCoreLocator;
|
||||
final GoRuntimeDispatchProcessStarter _processStarter;
|
||||
|
||||
Process? _localProcess;
|
||||
Uri? _localEndpoint;
|
||||
Future<Uri?>? _localEndpointFuture;
|
||||
final GoAcpStdioBridge _bridge;
|
||||
|
||||
@override
|
||||
Future<String?> selectProviderId({
|
||||
@ -47,11 +14,7 @@ class GoRuntimeDispatchDesktopClient implements RuntimeDispatchResolver {
|
||||
String preferredProviderId = '',
|
||||
Iterable<String> requiredCapabilities = const <String>[],
|
||||
}) async {
|
||||
final endpoint = await _ensureLocalEndpoint();
|
||||
if (endpoint == null) {
|
||||
return null;
|
||||
}
|
||||
final response = await _acpClient.request(
|
||||
final response = await _bridge.request(
|
||||
method: 'xworkmate.dispatch.resolve',
|
||||
params: <String, dynamic>{
|
||||
'preferredProviderId': preferredProviderId.trim(),
|
||||
@ -61,7 +24,6 @@ class GoRuntimeDispatchDesktopClient implements RuntimeDispatchResolver {
|
||||
.toList(growable: false),
|
||||
'providers': providers.map(_providerToJson).toList(growable: false),
|
||||
},
|
||||
endpointOverride: endpoint,
|
||||
);
|
||||
final result = _castMap(response['result']);
|
||||
return result['providerId']?.toString().trim().isNotEmpty == true
|
||||
@ -77,11 +39,7 @@ class GoRuntimeDispatchDesktopClient implements RuntimeDispatchResolver {
|
||||
required Map<String, dynamic> nodeState,
|
||||
required Map<String, dynamic> nodeInfo,
|
||||
}) async {
|
||||
final endpoint = await _ensureLocalEndpoint();
|
||||
if (endpoint == null) {
|
||||
return const RuntimeDispatchResolution(metadata: <String, dynamic>{});
|
||||
}
|
||||
final response = await _acpClient.request(
|
||||
final response = await _bridge.request(
|
||||
method: 'xworkmate.dispatch.resolve',
|
||||
params: <String, dynamic>{
|
||||
'preferredProviderId': preferredProviderId.trim(),
|
||||
@ -93,7 +51,6 @@ class GoRuntimeDispatchDesktopClient implements RuntimeDispatchResolver {
|
||||
'nodeState': nodeState,
|
||||
'nodeInfo': nodeInfo,
|
||||
},
|
||||
endpointOverride: endpoint,
|
||||
);
|
||||
final result = _castMap(response['result']);
|
||||
return RuntimeDispatchResolution(
|
||||
@ -109,20 +66,7 @@ class GoRuntimeDispatchDesktopClient implements RuntimeDispatchResolver {
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> dispose() async {
|
||||
final process = _localProcess;
|
||||
_localProcess = null;
|
||||
_localEndpoint = null;
|
||||
_localEndpointFuture = null;
|
||||
if (process != null) {
|
||||
try {
|
||||
process.kill();
|
||||
} catch (_) {
|
||||
// Best effort only.
|
||||
}
|
||||
}
|
||||
await _acpClient.dispose();
|
||||
}
|
||||
Future<void> dispose() => _bridge.dispose();
|
||||
|
||||
Map<String, dynamic> _providerToJson(ExternalCodeAgentProvider provider) {
|
||||
return <String, dynamic>{
|
||||
@ -133,79 +77,6 @@ class GoRuntimeDispatchDesktopClient implements RuntimeDispatchResolver {
|
||||
};
|
||||
}
|
||||
|
||||
Future<Uri?> _ensureLocalEndpoint() async {
|
||||
if (_localEndpoint != null) {
|
||||
return _localEndpoint;
|
||||
}
|
||||
final inFlight = _localEndpointFuture;
|
||||
if (inFlight != null) {
|
||||
return inFlight;
|
||||
}
|
||||
final next = _startLocalProcess();
|
||||
_localEndpointFuture = next;
|
||||
try {
|
||||
_localEndpoint = await next;
|
||||
return _localEndpoint;
|
||||
} finally {
|
||||
_localEndpointFuture = null;
|
||||
}
|
||||
}
|
||||
|
||||
Future<Uri?> _startLocalProcess() async {
|
||||
final launch = await _goCoreLocator.locate();
|
||||
if (launch == null) {
|
||||
return null;
|
||||
}
|
||||
if (shouldBlockGoCoreLaunch(
|
||||
launch,
|
||||
isAppleHost: Platform.isIOS || Platform.isMacOS,
|
||||
)) {
|
||||
return null;
|
||||
}
|
||||
final reservedSocket = await ServerSocket.bind(
|
||||
InternetAddress.loopbackIPv4,
|
||||
0,
|
||||
);
|
||||
final port = reservedSocket.port;
|
||||
await reservedSocket.close();
|
||||
final listenAddress = '127.0.0.1:$port';
|
||||
final process = await _processStarter(
|
||||
launch.executable,
|
||||
<String>[...launch.arguments, 'serve', '--listen', listenAddress],
|
||||
environment: Platform.environment,
|
||||
workingDirectory: launch.workingDirectory,
|
||||
);
|
||||
_localProcess = process;
|
||||
unawaited(process.stdout.drain<void>());
|
||||
unawaited(process.stderr.drain<void>());
|
||||
final endpoint = Uri(scheme: 'http', host: '127.0.0.1', port: port);
|
||||
final deadline = DateTime.now().add(const Duration(seconds: 8));
|
||||
while (DateTime.now().isBefore(deadline)) {
|
||||
if (_localProcess != process) {
|
||||
break;
|
||||
}
|
||||
final exitCode = await process.exitCode.timeout(
|
||||
const Duration(milliseconds: 20),
|
||||
onTimeout: () => -1,
|
||||
);
|
||||
if (exitCode != -1) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
await _acpClient.request(
|
||||
method: 'acp.capabilities',
|
||||
params: const <String, dynamic>{},
|
||||
endpointOverride: endpoint,
|
||||
);
|
||||
return endpoint;
|
||||
} catch (_) {
|
||||
await Future<void>.delayed(const Duration(milliseconds: 120));
|
||||
}
|
||||
}
|
||||
await dispose();
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<String, dynamic> _castMap(Object? value) {
|
||||
if (value is Map<String, dynamic>) {
|
||||
return value;
|
||||
|
||||
@ -579,16 +579,21 @@ GoTaskServiceResult goTaskServiceResultFromAcpResponse(
|
||||
String? completedMessage,
|
||||
}) {
|
||||
final result = _castMap(response['result']);
|
||||
final responseText =
|
||||
(result['output']?.toString().trim().isNotEmpty == true
|
||||
? result['output'].toString().trim()
|
||||
: result['summary']?.toString().trim().isNotEmpty == true
|
||||
? result['summary'].toString().trim()
|
||||
: result['message']?.toString().trim() ?? '')
|
||||
.trim();
|
||||
final primaryText =
|
||||
(completedMessage?.trim().isNotEmpty == true
|
||||
? completedMessage!.trim()
|
||||
: responseText.isNotEmpty
|
||||
? responseText
|
||||
: streamedText.trim().isNotEmpty
|
||||
? streamedText.trim()
|
||||
: (result['output']?.toString().trim().isNotEmpty == true
|
||||
? result['output'].toString().trim()
|
||||
: result['summary']?.toString().trim().isNotEmpty == true
|
||||
? result['summary'].toString().trim()
|
||||
: result['message']?.toString().trim() ?? ''))
|
||||
: '')
|
||||
.trim();
|
||||
return GoTaskServiceResult(
|
||||
success: _boolValue(result['success']) ?? true,
|
||||
|
||||
@ -2,29 +2,70 @@
|
||||
library;
|
||||
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
|
||||
import 'package:flutter_test/flutter_test.dart';
|
||||
import 'package:xworkmate/runtime/external_code_agent_acp_desktop_transport.dart';
|
||||
import 'package:xworkmate/runtime/gateway_acp_client.dart';
|
||||
import 'package:xworkmate/runtime/go_acp_stdio_bridge.dart';
|
||||
import 'package:xworkmate/runtime/go_task_service_client.dart';
|
||||
import 'package:xworkmate/runtime/runtime_models.dart';
|
||||
|
||||
void main() {
|
||||
group('ExternalCodeAgentAcpDesktopTransport', () {
|
||||
test('uses resolved gateway endpoint for local gateway sessions', () async {
|
||||
final server = await _AcpFakeServer.start();
|
||||
addTearDown(server.close);
|
||||
|
||||
final transport = ExternalCodeAgentAcpDesktopTransport(
|
||||
acpClient: GatewayAcpClient(endpointResolver: () => null),
|
||||
endpointResolver: (target) => switch (target) {
|
||||
AssistantExecutionTarget.local => server.baseHttpUri,
|
||||
_ => null,
|
||||
test('uses direct Go ACP stdio bridge for desktop task execution', () async {
|
||||
late final _FakeGoAcpStdioBridge bridge;
|
||||
bridge = _FakeGoAcpStdioBridge(
|
||||
handler: (method, params) async {
|
||||
switch (method) {
|
||||
case 'acp.capabilities':
|
||||
return <String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'id': 'capabilities',
|
||||
'result': <String, dynamic>{
|
||||
'singleAgent': true,
|
||||
'multiAgent': true,
|
||||
'providers': <String>['codex'],
|
||||
'capabilities': <String, dynamic>{
|
||||
'single_agent': true,
|
||||
'multi_agent': true,
|
||||
'providers': <String>['codex'],
|
||||
},
|
||||
},
|
||||
};
|
||||
case 'xworkmate.providers.sync':
|
||||
return <String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'id': 'sync',
|
||||
'result': <String, dynamic>{'ok': true},
|
||||
};
|
||||
case 'session.start':
|
||||
bridge.emit(<String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'method': 'session.update',
|
||||
'params': <String, dynamic>{
|
||||
'sessionId': 'session-local',
|
||||
'threadId': 'thread-local',
|
||||
'turnId': 'turn-1',
|
||||
'type': 'delta',
|
||||
'delta': 'gateway-',
|
||||
},
|
||||
});
|
||||
return <String, dynamic>{
|
||||
'jsonrpc': '2.0',
|
||||
'id': 'start',
|
||||
'result': <String, dynamic>{
|
||||
'success': true,
|
||||
'message': 'gateway-ok',
|
||||
'summary': 'gateway-ok',
|
||||
'turnId': 'turn-1',
|
||||
},
|
||||
};
|
||||
}
|
||||
throw StateError('Unexpected method: $method');
|
||||
},
|
||||
);
|
||||
final transport = ExternalCodeAgentAcpDesktopTransport(bridge: bridge);
|
||||
|
||||
final updates = <GoTaskServiceUpdate>[];
|
||||
final result = await transport.executeTask(
|
||||
const GoTaskServiceRequest(
|
||||
sessionId: 'session-local',
|
||||
@ -42,112 +83,52 @@ void main() {
|
||||
agentId: '',
|
||||
metadata: <String, dynamic>{},
|
||||
),
|
||||
onUpdate: (_) {},
|
||||
onUpdate: updates.add,
|
||||
);
|
||||
|
||||
expect(result.success, isTrue);
|
||||
expect(result.message, 'gateway-ok');
|
||||
expect(server.lastHttpRequestPath, '/acp/rpc');
|
||||
expect(server.rpcMethods, contains('session.start'));
|
||||
expect(server.lastSessionMode, 'gateway-chat');
|
||||
});
|
||||
|
||||
test('reports missing endpoint when gateway target cannot resolve', () async {
|
||||
final transport = ExternalCodeAgentAcpDesktopTransport(
|
||||
acpClient: GatewayAcpClient(endpointResolver: () => null),
|
||||
endpointResolver: (_) => null,
|
||||
);
|
||||
|
||||
await expectLater(
|
||||
() => transport.executeTask(
|
||||
const GoTaskServiceRequest(
|
||||
sessionId: 'session-local',
|
||||
threadId: 'thread-local',
|
||||
target: AssistantExecutionTarget.local,
|
||||
prompt: 'ping local gateway',
|
||||
workingDirectory: '/tmp',
|
||||
model: '',
|
||||
thinking: '',
|
||||
selectedSkills: <String>[],
|
||||
inlineAttachments: <GatewayChatAttachmentPayload>[],
|
||||
localAttachments: <CollaborationAttachment>[],
|
||||
aiGatewayBaseUrl: '',
|
||||
aiGatewayApiKey: '',
|
||||
agentId: '',
|
||||
metadata: <String, dynamic>{},
|
||||
),
|
||||
onUpdate: (_) {},
|
||||
),
|
||||
throwsA(
|
||||
isA<GatewayAcpException>().having(
|
||||
(error) => error.code,
|
||||
'code',
|
||||
'EXTERNAL_ACP_ENDPOINT_MISSING',
|
||||
),
|
||||
),
|
||||
expect(
|
||||
bridge.recordedMethods,
|
||||
containsAll(<String>['xworkmate.providers.sync', 'session.start']),
|
||||
);
|
||||
expect(updates.single.text, 'gateway-');
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
class _AcpFakeServer {
|
||||
_AcpFakeServer._(this._server);
|
||||
class _FakeGoAcpStdioBridge extends GoAcpStdioBridge {
|
||||
_FakeGoAcpStdioBridge({required this.handler});
|
||||
|
||||
final HttpServer _server;
|
||||
final List<String> rpcMethods = <String>[];
|
||||
String? lastHttpRequestPath;
|
||||
String? lastSessionMode;
|
||||
final Future<Map<String, dynamic>> Function(
|
||||
String method,
|
||||
Map<String, dynamic> params,
|
||||
)
|
||||
handler;
|
||||
|
||||
Uri get baseHttpUri => Uri.parse('http://127.0.0.1:${_server.port}');
|
||||
final StreamController<Map<String, dynamic>> _notificationsController =
|
||||
StreamController<Map<String, dynamic>>.broadcast();
|
||||
final List<String> recordedMethods = <String>[];
|
||||
|
||||
static Future<_AcpFakeServer> start() async {
|
||||
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
|
||||
final fake = _AcpFakeServer._(server);
|
||||
unawaited(fake._listen());
|
||||
return fake;
|
||||
@override
|
||||
Stream<Map<String, dynamic>> get notifications => _notificationsController.stream;
|
||||
|
||||
void emit(Map<String, dynamic> notification) {
|
||||
_notificationsController.add(notification);
|
||||
}
|
||||
|
||||
Future<void> close() async {
|
||||
await _server.close(force: true);
|
||||
@override
|
||||
Future<Map<String, dynamic>> request({
|
||||
required String method,
|
||||
required Map<String, dynamic> params,
|
||||
Duration timeout = const Duration(seconds: 120),
|
||||
}) async {
|
||||
recordedMethods.add(method);
|
||||
return handler(method, params);
|
||||
}
|
||||
|
||||
Future<void> _listen() async {
|
||||
await for (final request in _server) {
|
||||
if (request.uri.path == '/acp/rpc' && request.method == 'POST') {
|
||||
lastHttpRequestPath = request.uri.path;
|
||||
await _handleHttpRpc(request);
|
||||
continue;
|
||||
}
|
||||
request.response.statusCode = HttpStatus.notFound;
|
||||
await request.response.close();
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _handleHttpRpc(HttpRequest request) async {
|
||||
final body = await utf8.decodeStream(request);
|
||||
final envelope = (jsonDecode(body) as Map).cast<String, dynamic>();
|
||||
final id = envelope['id'];
|
||||
final method = envelope['method']?.toString() ?? '';
|
||||
final params =
|
||||
(envelope['params'] as Map?)?.cast<String, dynamic>() ??
|
||||
const <String, dynamic>{};
|
||||
rpcMethods.add(method);
|
||||
|
||||
request.response.headers.set(
|
||||
HttpHeaders.contentTypeHeader,
|
||||
'text/event-stream; charset=utf-8',
|
||||
);
|
||||
if (method == 'session.start' || method == 'session.message') {
|
||||
lastSessionMode = params['mode']?.toString();
|
||||
request.response.write(
|
||||
'data: ${jsonEncode(<String, dynamic>{'jsonrpc': '2.0', 'id': id, 'result': <String, dynamic>{'success': true, 'message': 'gateway-ok', 'summary': 'gateway-ok', 'turnId': 'turn-1'}})}\n\n',
|
||||
);
|
||||
await request.response.close();
|
||||
return;
|
||||
}
|
||||
request.response.write(
|
||||
'data: ${jsonEncode(<String, dynamic>{'jsonrpc': '2.0', 'id': id, 'result': <String, dynamic>{'singleAgent': true, 'multiAgent': true, 'providers': <String>['codex'], 'capabilities': <String, dynamic>{'single_agent': true, 'multi_agent': true, 'providers': <String>['codex']}}})}\n\n',
|
||||
);
|
||||
await request.response.close();
|
||||
@override
|
||||
Future<void> dispose() async {
|
||||
await _notificationsController.close();
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user