Batch 4: sink gateway runtime into go core

This commit is contained in:
Haitao Pan 2026-03-29 16:29:00 +08:00
parent b10c42d11e
commit 9542dc3768
12 changed files with 3290 additions and 479 deletions

View File

@ -0,0 +1,159 @@
package acp
import (
"strings"
"time"
"xworkmate/go_core/internal/gatewayruntime"
"xworkmate/go_core/internal/shared"
)
func handleGatewayConnect(
server *Server,
params map[string]any,
notify func(map[string]any),
) map[string]any {
request := gatewayruntime.ConnectRequest{
RuntimeID: strings.TrimSpace(shared.StringArg(params, "runtimeId", "")),
Mode: strings.TrimSpace(shared.StringArg(params, "mode", "unconfigured")),
ClientID: strings.TrimSpace(shared.StringArg(params, "clientId", "")),
Locale: strings.TrimSpace(shared.StringArg(params, "locale", "")),
UserAgent: strings.TrimSpace(shared.StringArg(params, "userAgent", "")),
ConnectAuthMode: strings.TrimSpace(shared.StringArg(params, "connectAuthMode", "")),
ConnectAuthFields: parseGatewayRuntimeStringSlice(params["connectAuthFields"]),
ConnectAuthSources: parseGatewayRuntimeStringSlice(params["connectAuthSources"]),
HasSharedAuth: parseBool(params["hasSharedAuth"]),
HasDeviceToken: parseBool(params["hasDeviceToken"]),
Endpoint: gatewayruntime.Endpoint{
Host: strings.TrimSpace(shared.StringArg(asMap(params["endpoint"]), "host", "")),
Port: parsePositiveInt(asMap(params["endpoint"])["port"]),
TLS: parseBool(asMap(params["endpoint"])["tls"]),
},
PackageInfo: gatewayruntime.PackageInfo{
AppName: strings.TrimSpace(shared.StringArg(asMap(params["packageInfo"]), "appName", "")),
PackageName: strings.TrimSpace(shared.StringArg(asMap(params["packageInfo"]), "packageName", "")),
Version: strings.TrimSpace(shared.StringArg(asMap(params["packageInfo"]), "version", "")),
BuildNumber: strings.TrimSpace(shared.StringArg(asMap(params["packageInfo"]), "buildNumber", "")),
},
DeviceInfo: gatewayruntime.DeviceInfo{
Platform: strings.TrimSpace(shared.StringArg(asMap(params["deviceInfo"]), "platform", "")),
PlatformVersion: strings.TrimSpace(shared.StringArg(asMap(params["deviceInfo"]), "platformVersion", "")),
DeviceFamily: strings.TrimSpace(shared.StringArg(asMap(params["deviceInfo"]), "deviceFamily", "")),
ModelIdentifier: strings.TrimSpace(shared.StringArg(asMap(params["deviceInfo"]), "modelIdentifier", "")),
},
Identity: gatewayruntime.DeviceIdentity{
DeviceID: strings.TrimSpace(shared.StringArg(asMap(params["identity"]), "deviceId", "")),
PublicKeyBase64URL: strings.TrimSpace(shared.StringArg(asMap(params["identity"]), "publicKeyBase64Url", "")),
PrivateKeyBase64URL: strings.TrimSpace(shared.StringArg(asMap(params["identity"]), "privateKeyBase64Url", "")),
},
Auth: gatewayruntime.AuthConfig{
Token: strings.TrimSpace(shared.StringArg(asMap(params["auth"]), "token", "")),
DeviceToken: strings.TrimSpace(shared.StringArg(asMap(params["auth"]), "deviceToken", "")),
Password: strings.TrimSpace(shared.StringArg(asMap(params["auth"]), "password", "")),
},
}
result := server.gateway.Connect(request, notify)
return map[string]any{
"ok": result.OK,
"snapshot": result.Snapshot,
"auth": result.Auth,
"returnedDeviceToken": result.ReturnedDeviceToken,
"error": result.Error,
}
}
func handleGatewayRequest(
server *Server,
params map[string]any,
notify func(map[string]any),
) map[string]any {
timeout := time.Duration(parsePositiveInt(params["timeoutMs"])) * time.Millisecond
result := server.gateway.Request(
strings.TrimSpace(shared.StringArg(params, "runtimeId", "")),
strings.TrimSpace(shared.StringArg(params, "method", "")),
asMap(params["params"]),
timeout,
notify,
)
return map[string]any{
"ok": result.OK,
"payload": result.Payload,
"error": result.Error,
}
}
func handleGatewayDisconnect(
server *Server,
params map[string]any,
notify func(map[string]any),
) map[string]any {
server.gateway.Disconnect(
strings.TrimSpace(shared.StringArg(params, "runtimeId", "")),
notify,
)
return map[string]any{"accepted": true}
}
func asMap(value any) map[string]any {
if typed, ok := value.(map[string]any); ok {
return typed
}
if typed, ok := value.(map[string]interface{}); ok {
return typed
}
return map[string]any{}
}
func parseGatewayRuntimeStringSlice(value any) []string {
list, ok := value.([]any)
if !ok {
if typed, ok := value.([]string); ok {
return append([]string(nil), typed...)
}
return nil
}
result := make([]string, 0, len(list))
for _, item := range list {
text := strings.TrimSpace(shared.StringArg(map[string]any{"value": item}, "value", ""))
if text == "" {
continue
}
result = append(result, text)
}
return result
}
func parseBool(value any) bool {
switch typed := value.(type) {
case bool:
return typed
case string:
return shared.BoolArg(typed, false)
case float64:
return typed != 0
case int:
return typed != 0
default:
return false
}
}
func parsePositiveInt(value any) int {
switch typed := value.(type) {
case int:
if typed > 0 {
return typed
}
case int64:
if typed > 0 {
return int(typed)
}
case float64:
if typed > 0 {
return int(typed)
}
case string:
return shared.IntArg(typed, 0)
}
return 0
}

View File

@ -15,6 +15,7 @@ import (
"github.com/gorilla/websocket"
"xworkmate/go_core/internal/dispatch"
"xworkmate/go_core/internal/gatewayruntime"
"xworkmate/go_core/internal/mounts"
"xworkmate/go_core/internal/shared"
)
@ -45,6 +46,7 @@ type Server struct {
mu sync.Mutex
sessions map[string]*session
queues map[string]chan task
gateway *gatewayruntime.Manager
}
var wsUpgrader = websocket.Upgrader{
@ -88,6 +90,7 @@ func NewServer() *Server {
return &Server{
sessions: make(map[string]*session),
queues: make(map[string]chan task),
gateway: gatewayruntime.NewManager(),
}
}
@ -272,6 +275,12 @@ func (s *Server) handleRequest(
return handleDispatchResolve(request.Params), nil
case "xworkmate.mounts.reconcile":
return handleMountReconcile(request.Params), nil
case "xworkmate.gateway.connect":
return handleGatewayConnect(s, request.Params, notify), nil
case "xworkmate.gateway.request":
return handleGatewayRequest(s, request.Params, notify), nil
case "xworkmate.gateway.disconnect":
return handleGatewayDisconnect(s, request.Params, notify), nil
default:
return nil, &shared.RPCError{
Code: -32601,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,272 @@
package gatewayruntime
import (
"encoding/json"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/gorilla/websocket"
)
func TestManagerConnectAndRequest(t *testing.T) {
server := newFakeGatewayServer(t)
defer server.Close()
manager := NewManager()
manager.ReconnectDelay = 20 * time.Millisecond
notifications := make([]map[string]any, 0, 8)
var mu sync.Mutex
notify := func(message map[string]any) {
mu.Lock()
defer mu.Unlock()
notifications = append(notifications, message)
}
result := manager.Connect(buildTestConnectRequest(server.Port()), notify)
if !result.OK {
t.Fatalf("expected connect success, got %#v", result.Error)
}
if result.ReturnedDeviceToken != "device-token-1" {
t.Fatalf("expected returned device token, got %#v", result.ReturnedDeviceToken)
}
requestResult := manager.Request(
"runtime-1",
"health",
map[string]any{},
2*time.Second,
notify,
)
if !requestResult.OK {
t.Fatalf("expected health success, got %#v", requestResult.Error)
}
payload, ok := requestResult.Payload.(map[string]any)
if !ok || payload["status"] != "ok" {
t.Fatalf("unexpected health payload %#v", requestResult.Payload)
}
mu.Lock()
defer mu.Unlock()
if len(notifications) == 0 {
t.Fatalf("expected notifications during connect")
}
}
func TestManagerReconnectsAfterSocketClose(t *testing.T) {
server := newFakeGatewayServer(t)
server.closeAfterConnect.Store(true)
defer server.Close()
manager := NewManager()
manager.ReconnectDelay = 25 * time.Millisecond
reconnected := make(chan struct{}, 1)
notify := func(message map[string]any) {
params := asMap(message["params"])
if strings.TrimSpace(stringValue(message["method"])) != "xworkmate.gateway.snapshot" {
return
}
snapshot := asMap(params["snapshot"])
if snapshot["status"] == "connected" && server.ConnectCount() >= 2 {
select {
case reconnected <- struct{}{}:
default:
}
}
}
result := manager.Connect(buildTestConnectRequest(server.Port()), notify)
if !result.OK {
t.Fatalf("expected connect success, got %#v", result.Error)
}
select {
case <-reconnected:
case <-time.After(3 * time.Second):
t.Fatalf("expected reconnect to complete; connect count=%d", server.ConnectCount())
}
}
func TestManagerSuppressesReconnectForPairingRequired(t *testing.T) {
server := newFakeGatewayServer(t)
server.connectErrorCode = "NOT_PAIRED"
server.connectErrorDetailCode = "PAIRING_REQUIRED"
defer server.Close()
manager := NewManager()
manager.ReconnectDelay = 20 * time.Millisecond
result := manager.Connect(buildTestConnectRequest(server.Port()), func(map[string]any) {})
if result.OK {
t.Fatalf("expected connect failure")
}
time.Sleep(120 * time.Millisecond)
if server.ConnectCount() != 1 {
t.Fatalf("expected reconnect suppression, got %d connect attempts", server.ConnectCount())
}
}
type fakeGatewayServer struct {
server *http.Server
listener net.Listener
connectCount atomic.Int32
closeAfterConnect atomic.Bool
connectErrorCode string
connectErrorDetailCode string
}
func newFakeGatewayServer(t *testing.T) *fakeGatewayServer {
t.Helper()
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
fake := &fakeGatewayServer{listener: listener}
upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
_ = conn.WriteJSON(map[string]any{
"type": "event",
"event": "connect.challenge",
"payload": map[string]any{
"nonce": "nonce-1",
},
})
for {
_, payload, err := conn.ReadMessage()
if err != nil {
return
}
var frame map[string]any
if err := json.Unmarshal(payload, &frame); err != nil {
continue
}
if frame["type"] != "req" {
continue
}
id := frame["id"]
method := stringValue(frame["method"])
switch method {
case "connect":
fake.connectCount.Add(1)
if fake.connectErrorCode != "" {
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": false,
"error": map[string]any{
"code": fake.connectErrorCode,
"message": "connect failed",
"details": map[string]any{
"code": fake.connectErrorDetailCode,
},
},
})
continue
}
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"server": map[string]any{"host": "127.0.0.1"},
"snapshot": map[string]any{
"sessionDefaults": map[string]any{"mainSessionKey": "main"},
},
"auth": map[string]any{
"role": "operator",
"scopes": defaultOperatorScopes,
"deviceToken": "device-token-1",
},
},
})
if fake.closeAfterConnect.Load() && fake.connectCount.Load() == 1 {
go func() {
time.Sleep(20 * time.Millisecond)
_ = conn.Close()
}()
}
case "health":
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{
"status": "ok",
},
})
default:
_ = conn.WriteJSON(map[string]any{
"type": "res",
"id": id,
"ok": true,
"payload": map[string]any{},
})
}
}
})
fake.server = &http.Server{Handler: mux}
go func() {
_ = fake.server.Serve(listener)
}()
return fake
}
func (f *fakeGatewayServer) Port() int {
return f.listener.Addr().(*net.TCPAddr).Port
}
func (f *fakeGatewayServer) ConnectCount() int {
return int(f.connectCount.Load())
}
func (f *fakeGatewayServer) Close() {
_ = f.server.Close()
}
func buildTestConnectRequest(port int) ConnectRequest {
return ConnectRequest{
RuntimeID: "runtime-1",
Mode: "remote",
ClientID: "openclaw-macos",
Locale: "en_US",
UserAgent: "XWorkmate/1.0.0",
Endpoint: Endpoint{
Host: "127.0.0.1",
Port: port,
TLS: false,
},
ConnectAuthMode: "shared-token",
ConnectAuthFields: []string{"token"},
ConnectAuthSources: []string{"shared:form"},
HasSharedAuth: true,
HasDeviceToken: false,
PackageInfo: PackageInfo{
AppName: "XWorkmate",
Version: "1.0.0",
},
DeviceInfo: DeviceInfo{
Platform: "macos",
PlatformVersion: "14.0",
DeviceFamily: "Mac",
ModelIdentifier: "Mac14,5",
},
Identity: DeviceIdentity{
DeviceID: "device-1",
PublicKeyBase64URL: "test-public-key-value",
PrivateKeyBase64URL: "test-private-key-value",
},
Auth: AuthConfig{
Token: "shared-token",
},
}
}

View File

@ -0,0 +1,129 @@
package gatewayruntime
import "time"
const (
defaultProtocolVersion = 3
defaultReconnectDelay = 2 * time.Second
defaultConnectTimeout = 10 * time.Second
defaultChallengeWait = 2 * time.Second
defaultRequestTimeout = 15 * time.Second
)
var defaultOperatorScopes = []string{
"operator.admin",
"operator.read",
"operator.write",
"operator.approvals",
"operator.pairing",
}
type Endpoint struct {
Host string
Port int
TLS bool
}
type PackageInfo struct {
AppName string
PackageName string
Version string
BuildNumber string
}
type DeviceInfo struct {
Platform string
PlatformVersion string
DeviceFamily string
ModelIdentifier string
}
func (d DeviceInfo) PlatformLabel() string {
if d.PlatformVersion == "" {
return d.Platform
}
return d.Platform + " " + d.PlatformVersion
}
type DeviceIdentity struct {
DeviceID string
PublicKeyBase64URL string
PrivateKeyBase64URL string
}
type AuthConfig struct {
Token string
DeviceToken string
Password string
}
type ConnectRequest struct {
RuntimeID string
Mode string
ClientID string
Locale string
UserAgent string
Endpoint Endpoint
ConnectAuthMode string
ConnectAuthFields []string
ConnectAuthSources []string
HasSharedAuth bool
HasDeviceToken bool
PackageInfo PackageInfo
DeviceInfo DeviceInfo
Identity DeviceIdentity
Auth AuthConfig
}
type ConnectResult struct {
OK bool
Snapshot map[string]any
Auth map[string]any
ReturnedDeviceToken string
Error map[string]any
}
type RequestResult struct {
OK bool
Payload any
Error map[string]any
}
type GatewayError struct {
Message string
Code string
Details map[string]any
}
func (e *GatewayError) Error() string {
if e == nil {
return ""
}
return e.Message
}
func (e *GatewayError) DetailCode() string {
if e == nil || e.Details == nil {
return ""
}
if value, ok := e.Details["code"].(string); ok {
return value
}
return ""
}
func (e *GatewayError) Map() map[string]any {
if e == nil {
return map[string]any{}
}
payload := map[string]any{
"message": e.Message,
}
if e.Code != "" {
payload["code"] = e.Code
}
if len(e.Details) > 0 {
payload["details"] = e.Details
}
return payload
}

View File

@ -30,6 +30,7 @@ import '../runtime/assistant_artifacts.dart';
import '../runtime/desktop_thread_artifact_service.dart';
import '../runtime/go_agent_core_client.dart';
import '../runtime/go_agent_core_desktop_transport.dart';
import '../runtime/go_gateway_runtime_desktop_client.dart';
import '../runtime/go_multi_agent_mount_desktop_client.dart';
import '../runtime/go_runtime_dispatch_desktop_client.dart';
import '../runtime/mode_switcher.dart';
@ -139,6 +140,7 @@ class AppController extends ChangeNotifier {
gateway: GatewayRuntime(
store: storeInternal,
identityStore: DeviceIdentityStore(storeInternal),
sessionClient: GoGatewayRuntimeDesktopClient(),
),
codex: CodexRuntime(),
configBridge: CodexConfigBridge(),

View File

@ -28,6 +28,7 @@ import '../runtime/codex_config_bridge.dart';
import '../runtime/code_agent_node_orchestrator.dart';
import '../runtime/assistant_artifacts.dart';
import '../runtime/desktop_thread_artifact_service.dart';
import '../runtime/go_gateway_runtime_desktop_client.dart';
import '../runtime/mode_switcher.dart';
import '../runtime/agent_registry.dart';
import '../runtime/multi_agent_orchestrator.dart';
@ -300,6 +301,7 @@ extension AppControllerDesktopSettingsRuntime on AppController {
final runtime = GatewayRuntime(
store: temporaryStore,
identityStore: DeviceIdentityStore(temporaryStore),
sessionClient: GoGatewayRuntimeDesktopClient(),
);
await runtime.initialize();
try {
@ -438,8 +440,7 @@ extension AppControllerDesktopSettingsRuntime on AppController {
resolvedUserHomeDirectoryInternal =
await skillDirectoryAccessServiceInternal.resolveUserHomeDirectory();
await settingsControllerInternal.initialize();
final storedAssistantThreads = await storeInternal
.loadTaskThreads();
final storedAssistantThreads = await storeInternal.loadTaskThreads();
if (disposedInternal) {
return;
}

View File

@ -0,0 +1,546 @@
part of 'gateway_runtime_core.dart';
extension GatewayRuntimeApiInternal on GatewayRuntime {
Future<Map<String, dynamic>> _healthInternal() async {
final payload = asMap(await request('health'));
snapshotInternal = snapshotInternal.copyWith(healthPayload: payload);
appendLogInternal(this, 'debug', 'health', 'health snapshot refreshed');
_notifyRuntimeChangedInternal();
return payload;
}
Future<Map<String, dynamic>> _statusInternal() async {
final payload = asMap(await request('status'));
snapshotInternal = snapshotInternal.copyWith(statusPayload: payload);
appendLogInternal(this, 'debug', 'health', 'status snapshot refreshed');
_notifyRuntimeChangedInternal();
return payload;
}
Future<List<GatewayAgentSummary>> _listAgentsInternal() async {
final payload = asMap(
await request('agents.list', params: const <String, dynamic>{}),
);
final agents = asList(payload['agents'])
.map((item) {
final map = asMap(item);
final identity = asMap(map['identity']);
return GatewayAgentSummary(
id: stringValue(map['id']) ?? 'unknown',
name:
stringValue(map['name']) ??
stringValue(identity['name']) ??
'Agent',
emoji: stringValue(identity['emoji']) ?? '·',
theme: stringValue(identity['theme']) ?? 'default',
);
})
.toList(growable: false);
if (snapshotInternal.mainSessionKey == null ||
snapshotInternal.mainSessionKey!.trim().isEmpty) {
snapshotInternal = snapshotInternal.copyWith(
mainSessionKey: stringValue(payload['mainKey']) ?? 'main',
);
_notifyRuntimeChangedInternal();
}
return agents;
}
Future<List<GatewaySessionSummary>> _listSessionsInternal({
String? agentId,
int limit = 24,
}) async {
final payload = asMap(
await request(
'sessions.list',
params: <String, dynamic>{
'includeGlobal': true,
'includeUnknown': false,
'includeDerivedTitles': true,
'includeLastMessage': true,
'limit': limit,
if (agentId != null && agentId.trim().isNotEmpty)
'agentId': agentId.trim(),
},
),
);
return asList(payload['sessions'])
.map((item) {
final map = asMap(item);
return GatewaySessionSummary(
key: stringValue(map['key']) ?? 'main',
kind: stringValue(map['kind']),
displayName:
stringValue(map['displayName']) ?? stringValue(map['label']),
surface: stringValue(map['surface']),
subject: stringValue(map['subject']),
room: stringValue(map['room']),
space: stringValue(map['space']),
updatedAtMs: doubleValue(map['updatedAt']),
sessionId: stringValue(map['sessionId']),
systemSent: boolValue(map['systemSent']),
abortedLastRun: boolValue(map['abortedLastRun']),
thinkingLevel: stringValue(map['thinkingLevel']),
verboseLevel: stringValue(map['verboseLevel']),
inputTokens: intValue(map['inputTokens']),
outputTokens: intValue(map['outputTokens']),
totalTokens: intValue(map['totalTokens']),
model: stringValue(map['model']),
contextTokens: intValue(map['contextTokens']),
derivedTitle: stringValue(map['derivedTitle']),
lastMessagePreview: stringValue(map['lastMessagePreview']),
);
})
.toList(growable: false);
}
Future<List<GatewayChatMessage>> _loadHistoryInternal(
String sessionKey, {
int limit = 120,
}) async {
final payload = asMap(
await request(
'chat.history',
params: <String, dynamic>{'sessionKey': sessionKey, 'limit': limit},
),
);
return asList(payload['messages'])
.map((item) {
final map = asMap(item);
return GatewayChatMessage(
id: randomIdInternal(),
role: stringValue(map['role']) ?? 'assistant',
text: extractMessageText(map),
timestampMs: doubleValue(map['timestamp']),
toolCallId:
stringValue(map['toolCallId']) ??
stringValue(map['tool_call_id']),
toolName:
stringValue(map['toolName']) ?? stringValue(map['tool_name']),
stopReason: stringValue(map['stopReason']),
pending: false,
error: false,
);
})
.toList(growable: false);
}
Future<String> _sendChatInternal({
required String sessionKey,
required String message,
required String thinking,
List<GatewayChatAttachmentPayload> attachments =
const <GatewayChatAttachmentPayload>[],
String? agentId,
Map<String, dynamic>? metadata,
}) async {
final runId = randomIdInternal();
final payload = asMap(
await request(
'chat.send',
params: <String, dynamic>{
'sessionKey': sessionKey,
'message': message,
'thinking': thinking,
'timeoutMs': 30000,
'idempotencyKey': runId,
if (agentId != null && agentId.trim().isNotEmpty)
'agentId': agentId.trim(),
if (metadata != null && metadata.isNotEmpty) 'metadata': metadata,
if (attachments.isNotEmpty)
'attachments': attachments
.map((attachment) => attachment.toJson())
.toList(growable: false),
},
timeout: const Duration(seconds: 35),
),
);
return stringValue(payload['runId']) ?? runId;
}
Future<void> _abortChatInternal({
required String sessionKey,
required String runId,
}) async {
await request(
'chat.abort',
params: <String, dynamic>{'sessionKey': sessionKey, 'runId': runId},
timeout: const Duration(seconds: 10),
);
}
Future<List<GatewayInstanceSummary>> _listInstancesInternal() async {
final payload = await request(
'system-presence',
params: const <String, dynamic>{},
);
return asList(payload)
.map((item) {
final map = asMap(item);
return GatewayInstanceSummary(
id: stringValue(map['id']) ?? randomIdInternal(),
host: stringValue(map['host']),
ip: stringValue(map['ip']),
version: stringValue(map['version']),
platform: stringValue(map['platform']),
deviceFamily: stringValue(map['deviceFamily']),
modelIdentifier: stringValue(map['modelIdentifier']),
lastInputSeconds: intValue(map['lastInputSeconds']),
mode: stringValue(map['mode']),
reason: stringValue(map['reason']),
text: stringValue(map['text']) ?? '',
timestampMs:
doubleValue(map['ts']) ??
DateTime.now().millisecondsSinceEpoch.toDouble(),
);
})
.toList(growable: false);
}
Future<List<GatewaySkillSummary>> _listSkillsInternal({
String? agentId,
}) async {
final payload = asMap(
await request(
'skills.status',
params: <String, dynamic>{
if (agentId != null && agentId.trim().isNotEmpty)
'agentId': agentId.trim(),
},
),
);
return asList(payload['skills'])
.map((item) {
final map = asMap(item);
return GatewaySkillSummary(
name: stringValue(map['name']) ?? 'Skill',
description: stringValue(map['description']) ?? '',
source: stringValue(map['source']) ?? 'workspace',
skillKey:
stringValue(map['skillKey']) ??
stringValue(map['name']) ??
'skill',
primaryEnv: stringValue(map['primaryEnv']),
eligible: boolValue(map['eligible']) ?? false,
disabled: boolValue(map['disabled']) ?? false,
missingBins: stringList(asMap(map['missing'])['bins']),
missingEnv: stringList(asMap(map['missing'])['env']),
missingConfig: stringList(asMap(map['missing'])['config']),
);
})
.toList(growable: false);
}
Future<List<GatewayConnectorSummary>> _listConnectorsInternal() async {
final payload = asMap(
await request(
'channels.status',
params: const <String, dynamic>{'probe': true, 'timeoutMs': 8000},
timeout: const Duration(seconds: 16),
),
);
final channelMeta = <String, Map<String, dynamic>>{
for (final entry in asList(payload['channelMeta']))
if (stringValue(asMap(entry)['id']) != null)
stringValue(asMap(entry)['id'])!: asMap(entry),
};
final labels = asMap(payload['channelLabels']);
final detailLabels = asMap(payload['channelDetailLabels']);
final accounts = asMap(payload['channelAccounts']);
final order = stringList(payload['channelOrder']);
final summaries = <GatewayConnectorSummary>[];
for (final channelId in order) {
final channelAccounts = asList(accounts[channelId]);
if (channelAccounts.isEmpty) {
final meta = channelMeta[channelId] ?? const <String, dynamic>{};
summaries.add(
GatewayConnectorSummary(
id: channelId,
label:
stringValue(meta['label']) ??
stringValue(labels[channelId]) ??
channelId,
detailLabel:
stringValue(meta['detailLabel']) ??
stringValue(detailLabels[channelId]) ??
channelId,
accountName: null,
configured: false,
enabled: false,
running: false,
connected: false,
status: 'idle',
lastError: null,
meta: const <String>[],
),
);
continue;
}
for (final account in channelAccounts) {
final map = asMap(account);
final configured = boolValue(map['configured']) ?? false;
final enabled = boolValue(map['enabled']) ?? configured;
final running = boolValue(map['running']) ?? false;
final connected =
boolValue(map['connected']) ?? boolValue(map['linked']) ?? false;
final lastError = stringValue(map['lastError']);
final status = lastError != null && lastError.trim().isNotEmpty
? 'error'
: connected
? 'connected'
: running
? 'running'
: configured
? 'configured'
: 'idle';
final mode = stringValue(map['mode']);
final tokenSource = stringValue(map['tokenSource']);
final baseUrl = stringValue(map['baseUrl']);
summaries.add(
GatewayConnectorSummary(
id: channelId,
label:
stringValue(channelMeta[channelId]?['label']) ??
stringValue(labels[channelId]) ??
channelId,
detailLabel:
stringValue(channelMeta[channelId]?['detailLabel']) ??
stringValue(detailLabels[channelId]) ??
channelId,
accountName:
stringValue(map['name']) ?? stringValue(map['accountId']),
configured: configured,
enabled: enabled,
running: running,
connected: connected,
status: status,
lastError: lastError,
meta: [
...?(mode == null ? null : <String>[mode]),
...?(tokenSource == null ? null : <String>[tokenSource]),
...?(baseUrl == null ? null : <String>[baseUrl]),
],
),
);
}
}
return summaries;
}
Future<List<GatewayModelSummary>> _listModelsInternal() async {
final payload = asMap(
await request(
'models.list',
params: const <String, dynamic>{},
timeout: const Duration(seconds: 16),
),
);
return asList(payload['models'])
.map((item) {
final map = asMap(item);
return GatewayModelSummary(
id: stringValue(map['id']) ?? 'unknown',
name:
stringValue(map['name']) ?? stringValue(map['id']) ?? 'unknown',
provider: stringValue(map['provider']) ?? 'unknown',
contextWindow: intValue(map['contextWindow']),
maxOutputTokens: intValue(map['maxOutputTokens']),
);
})
.toList(growable: false);
}
Future<List<GatewayCronJobSummary>> _listCronJobsInternal() async {
final payload = asMap(
await request(
'cron.list',
params: const <String, dynamic>{'includeDisabled': true},
timeout: const Duration(seconds: 16),
),
);
return asList(payload['jobs'])
.map((item) {
final map = asMap(item);
final state = asMap(map['state']);
return GatewayCronJobSummary(
id: stringValue(map['id']) ?? randomIdInternal(),
name: stringValue(map['name']) ?? 'Untitled job',
description: stringValue(map['description']),
enabled: boolValue(map['enabled']) ?? true,
agentId: stringValue(map['agentId']),
scheduleLabel: cronScheduleLabelInternal(asMap(map['schedule'])),
nextRunAtMs: intValue(state['nextRunAtMs']),
lastRunAtMs: intValue(state['lastRunAtMs']),
lastStatus: stringValue(state['lastStatus']),
lastError: stringValue(state['lastError']),
);
})
.toList(growable: false);
}
Future<GatewayDevicePairingList> _listDevicePairingInternal() async {
final payload = asMap(
await request(
'device.pair.list',
params: const <String, dynamic>{},
timeout: const Duration(seconds: 12),
),
);
final identity = await storeInternal.loadDeviceIdentity();
return GatewayDevicePairingList(
pending: asList(payload['pending'])
.map((item) => parsePendingDeviceInternal(asMap(item)))
.toList(growable: false),
paired: asList(payload['paired'])
.map(
(item) => parsePairedDeviceInternal(
asMap(item),
currentDeviceId: identity?.deviceId,
),
)
.toList(growable: false),
);
}
Future<GatewayPairedDevice?> _approveDevicePairingInternal(
String requestId,
) async {
appendLogInternal(this, 'info', 'pairing', 'approve request $requestId');
final payload = asMap(
await request(
'device.pair.approve',
params: <String, dynamic>{'requestId': requestId},
timeout: const Duration(seconds: 12),
),
);
final identity = await storeInternal.loadDeviceIdentity();
final device = asMap(payload['device']);
if (device.isEmpty) {
return null;
}
return parsePairedDeviceInternal(
device,
currentDeviceId: identity?.deviceId,
);
}
Future<void> _rejectDevicePairingInternal(String requestId) async {
appendLogInternal(this, 'info', 'pairing', 'reject request $requestId');
await request(
'device.pair.reject',
params: <String, dynamic>{'requestId': requestId},
timeout: const Duration(seconds: 12),
);
}
Future<void> _removePairedDeviceInternal(String deviceId) async {
appendLogInternal(this, 'info', 'pairing', 'remove device $deviceId');
await request(
'device.pair.remove',
params: <String, dynamic>{'deviceId': deviceId},
timeout: const Duration(seconds: 12),
);
}
Future<String> _rotateDeviceTokenInternal({
required String deviceId,
required String role,
List<String> scopes = const <String>[],
}) async {
appendLogInternal(
this,
'info',
'token',
'rotate role token | device: $deviceId | role: $role',
);
final payload = asMap(
await request(
'device.token.rotate',
params: <String, dynamic>{
'deviceId': deviceId,
'role': role,
if (scopes.isNotEmpty) 'scopes': scopes,
},
timeout: const Duration(seconds: 12),
),
);
final token = stringValue(payload['token']) ?? '';
final identity = await storeInternal.loadDeviceIdentity();
final resolvedRole = stringValue(payload['role']) ?? role;
if (token.isNotEmpty &&
identity != null &&
(stringValue(payload['deviceId']) ?? deviceId) == identity.deviceId) {
await storeInternal.saveDeviceToken(
deviceId: identity.deviceId,
role: resolvedRole,
token: token,
);
}
return token;
}
Future<void> _revokeDeviceTokenInternal({
required String deviceId,
required String role,
}) async {
appendLogInternal(
this,
'info',
'token',
'revoke role token | device: $deviceId | role: $role',
);
await request(
'device.token.revoke',
params: <String, dynamic>{'deviceId': deviceId, 'role': role},
timeout: const Duration(seconds: 12),
);
final identity = await storeInternal.loadDeviceIdentity();
if (identity != null && deviceId == identity.deviceId) {
await storeInternal.clearDeviceToken(
deviceId: identity.deviceId,
role: role,
);
}
}
Future<dynamic> _requestInternal(
String method, {
Map<String, dynamic>? params,
Duration timeout = const Duration(seconds: 15),
}) async {
if (sessionClientInternal != null) {
if (!isConnected) {
appendLogInternal(
this,
'warn',
'rpc',
'blocked request $method | offline',
);
throw GatewayRuntimeException('gateway not connected', code: 'OFFLINE');
}
return sessionClientInternal!.request(
runtimeId: runtimeIdInternal,
method: method,
params: params,
timeout: timeout,
);
}
if (channelInternal == null || !isConnected) {
appendLogInternal(
this,
'warn',
'rpc',
'blocked request $method | offline',
);
throw GatewayRuntimeException('gateway not connected', code: 'OFFLINE');
}
final result = await requestRawInternal(
this,
method,
params: params,
timeout: timeout,
);
return result.payload;
}
}

View File

@ -10,6 +10,7 @@ import 'package:package_info_plus/package_info_plus.dart';
import 'package:web_socket_channel/io.dart';
import '../app/app_metadata.dart';
import 'device_identity_store.dart';
import 'gateway_runtime_session_client.dart';
import 'platform_environment.dart';
import 'runtime_models.dart';
import 'secure_config_store.dart';
@ -18,15 +19,25 @@ import 'gateway_runtime_events.dart';
import 'gateway_runtime_errors.dart';
import 'gateway_runtime_helpers.dart';
part 'gateway_runtime_api.dart';
class GatewayRuntime extends ChangeNotifier with GatewayRuntimeHelpersInternal {
GatewayRuntime({
required SecureConfigStore store,
required DeviceIdentityStore identityStore,
GatewayRuntimeSessionClient? sessionClient,
String runtimeId = '',
}) : storeInternal = store,
identityStoreInternal = identityStore;
identityStoreInternal = identityStore,
sessionClientInternal = sessionClient,
runtimeIdInternal = runtimeId.trim().isNotEmpty
? runtimeId.trim()
: randomIdInternal();
final SecureConfigStore storeInternal;
final DeviceIdentityStore identityStoreInternal;
final GatewayRuntimeSessionClient? sessionClientInternal;
final String runtimeIdInternal;
final StreamController<GatewayPushEvent> eventsInternal =
StreamController<GatewayPushEvent>.broadcast();
final Map<String, Completer<RpcResponseInternal>> pendingInternal =
@ -35,6 +46,7 @@ class GatewayRuntime extends ChangeNotifier with GatewayRuntimeHelpersInternal {
IOWebSocketChannel? channelInternal;
StreamSubscription<dynamic>? socketSubscriptionInternal;
StreamSubscription<GatewayRuntimeSessionUpdate>? sessionUpdatesInternal;
Timer? reconnectTimerInternal;
GatewayConnectionProfile? desiredProfileInternal;
bool manualDisconnectInternal = false;
@ -85,6 +97,9 @@ class GatewayRuntime extends ChangeNotifier with GatewayRuntimeHelpersInternal {
}
Future<void> initialize() async {
sessionUpdatesInternal ??= sessionClientInternal?.updates.listen(
_handleSessionUpdateInternal,
);
await storeInternal.initialize();
packageInfoInternal = await loadPackageInfoInternal();
deviceInfoInternal = await loadDeviceInfoInternal();
@ -231,6 +246,98 @@ class GatewayRuntime extends ChangeNotifier with GatewayRuntimeHelpersInternal {
);
notifyListeners();
final sessionClient = sessionClientInternal;
if (sessionClient != null) {
try {
final connectResult = await sessionClient.connect(
GatewayRuntimeSessionConnectRequest(
runtimeId: runtimeIdInternal,
mode: profile.mode,
clientId: resolveClientIdInternal(),
locale: Platform.localeName,
userAgent: '$kSystemAppName/${packageInfoInternal.version}',
host: endpoint.$1,
port: endpoint.$2,
tls: endpoint.$3,
connectAuthMode: connectAuthMode,
connectAuthFields: connectAuthFields,
connectAuthSources: connectAuthSources,
hasSharedAuth: sharedToken.isNotEmpty || password.isNotEmpty,
hasDeviceToken: deviceToken.isNotEmpty,
packageInfo: packageInfoInternal,
deviceInfo: deviceInfoInternal,
identity: identity,
authToken: sharedToken,
authDeviceToken: deviceToken,
authPassword: password,
),
);
if (connectResult.returnedDeviceToken.trim().isNotEmpty) {
await storeInternal.saveDeviceToken(
deviceId: identity.deviceId,
role:
connectResult.auth['role']?.toString().trim().isNotEmpty == true
? connectResult.auth['role'].toString().trim()
: 'operator',
token: connectResult.returnedDeviceToken.trim(),
);
appendLogInternal(
this,
'info',
'auth',
'stored device token for role ${connectResult.auth['role']?.toString().trim().isNotEmpty == true ? connectResult.auth['role'].toString().trim() : 'operator'}',
);
}
snapshotInternal = connectResult.snapshot;
notifyListeners();
return;
} on GatewayRuntimeException catch (error) {
if (_shouldFallbackToDirectRuntimeInternal(error)) {
appendLogInternal(
this,
'warn',
'connect',
'go-core runtime unavailable, falling back to direct websocket | code: ${error.code ?? 'unknown'}',
);
} else {
if (error.detailCode == 'AUTH_DEVICE_TOKEN_MISMATCH' &&
deviceToken.isNotEmpty &&
sharedToken.isEmpty) {
await storeInternal.clearDeviceToken(
deviceId: identity.deviceId,
role: 'operator',
);
} else if (usedStoredDeviceTokenOnly &&
isPairingRequiredErrorInternal(error.code, error.detailCode)) {
await storeInternal.clearDeviceToken(
deviceId: identity.deviceId,
role: 'operator',
);
appendLogInternal(
this,
'warn',
'auth',
'cleared stale device token after pairing-required response',
);
}
snapshotInternal = snapshotInternal.copyWith(
status: RuntimeConnectionStatus.error,
statusText: 'Connection failed',
lastError: error.toString(),
lastErrorCode: error.code,
lastErrorDetailCode: error.detailCode,
connectAuthMode: connectAuthMode,
connectAuthFields: connectAuthFields,
connectAuthSources: connectAuthSources,
hasSharedAuth: sharedToken.isNotEmpty || password.isNotEmpty,
hasDeviceToken: deviceToken.isNotEmpty,
);
notifyListeners();
rethrow;
}
}
}
try {
final scheme = endpoint.$3 ? 'wss' : 'ws';
channelInternal = IOWebSocketChannel.connect(
@ -395,6 +502,22 @@ class GatewayRuntime extends ChangeNotifier with GatewayRuntimeHelpersInternal {
desiredProfileInternal = null;
}
reconnectTimerInternal?.cancel();
if (sessionClientInternal != null) {
await sessionClientInternal!.disconnect(runtimeId: runtimeIdInternal);
snapshotInternal =
GatewayConnectionSnapshot.initial(
mode: snapshotInternal.mode,
).copyWith(
statusText: 'Offline',
deviceId: snapshotInternal.deviceId,
authRole: snapshotInternal.authRole,
authScopes: snapshotInternal.authScopes,
hasSharedAuth: snapshotInternal.hasSharedAuth,
hasDeviceToken: snapshotInternal.hasDeviceToken,
);
notifyListeners();
return;
}
await closeSocketInternal(this);
snapshotInternal =
GatewayConnectionSnapshot.initial(mode: snapshotInternal.mode).copyWith(
@ -408,129 +531,34 @@ class GatewayRuntime extends ChangeNotifier with GatewayRuntimeHelpersInternal {
notifyListeners();
}
Future<Map<String, dynamic>> health() async {
final payload = asMap(await request('health'));
snapshotInternal = snapshotInternal.copyWith(healthPayload: payload);
appendLogInternal(this, 'debug', 'health', 'health snapshot refreshed');
notifyListeners();
return payload;
}
Future<Map<String, dynamic>> status() async {
final payload = asMap(await request('status'));
snapshotInternal = snapshotInternal.copyWith(statusPayload: payload);
appendLogInternal(this, 'debug', 'health', 'status snapshot refreshed');
notifyListeners();
return payload;
}
Future<List<GatewayAgentSummary>> listAgents() async {
final payload = asMap(
await request('agents.list', params: const <String, dynamic>{}),
);
final agents = asList(payload['agents'])
.map((item) {
final map = asMap(item);
final identity = asMap(map['identity']);
return GatewayAgentSummary(
id: stringValue(map['id']) ?? 'unknown',
name:
stringValue(map['name']) ??
stringValue(identity['name']) ??
'Agent',
emoji: stringValue(identity['emoji']) ?? '·',
theme: stringValue(identity['theme']) ?? 'default',
);
})
.toList(growable: false);
if (snapshotInternal.mainSessionKey == null ||
snapshotInternal.mainSessionKey!.trim().isEmpty) {
snapshotInternal = snapshotInternal.copyWith(
mainSessionKey: stringValue(payload['mainKey']) ?? 'main',
);
notifyListeners();
bool _shouldFallbackToDirectRuntimeInternal(GatewayRuntimeException error) {
switch (error.code) {
case 'GO_GATEWAY_RUNTIME_ENDPOINT_MISSING':
case 'GO_GATEWAY_RUNTIME_TRANSPORT_UNAVAILABLE':
case 'GO_GATEWAY_RUNTIME_WS_CONNECT_TIMEOUT':
case 'GO_GATEWAY_RUNTIME_WS_CLOSED':
case 'GO_GATEWAY_RUNTIME_WS_ERROR':
return true;
default:
return false;
}
return agents;
}
Future<Map<String, dynamic>> health() => _healthInternal();
Future<Map<String, dynamic>> status() => _statusInternal();
Future<List<GatewayAgentSummary>> listAgents() => _listAgentsInternal();
Future<List<GatewaySessionSummary>> listSessions({
String? agentId,
int limit = 24,
}) async {
final payload = asMap(
await request(
'sessions.list',
params: <String, dynamic>{
'includeGlobal': true,
'includeUnknown': false,
'includeDerivedTitles': true,
'includeLastMessage': true,
'limit': limit,
if (agentId != null && agentId.trim().isNotEmpty)
'agentId': agentId.trim(),
},
),
);
return asList(payload['sessions'])
.map((item) {
final map = asMap(item);
return GatewaySessionSummary(
key: stringValue(map['key']) ?? 'main',
kind: stringValue(map['kind']),
displayName:
stringValue(map['displayName']) ?? stringValue(map['label']),
surface: stringValue(map['surface']),
subject: stringValue(map['subject']),
room: stringValue(map['room']),
space: stringValue(map['space']),
updatedAtMs: doubleValue(map['updatedAt']),
sessionId: stringValue(map['sessionId']),
systemSent: boolValue(map['systemSent']),
abortedLastRun: boolValue(map['abortedLastRun']),
thinkingLevel: stringValue(map['thinkingLevel']),
verboseLevel: stringValue(map['verboseLevel']),
inputTokens: intValue(map['inputTokens']),
outputTokens: intValue(map['outputTokens']),
totalTokens: intValue(map['totalTokens']),
model: stringValue(map['model']),
contextTokens: intValue(map['contextTokens']),
derivedTitle: stringValue(map['derivedTitle']),
lastMessagePreview: stringValue(map['lastMessagePreview']),
);
})
.toList(growable: false);
}
}) => _listSessionsInternal(agentId: agentId, limit: limit);
Future<List<GatewayChatMessage>> loadHistory(
String sessionKey, {
int limit = 120,
}) async {
final payload = asMap(
await request(
'chat.history',
params: <String, dynamic>{'sessionKey': sessionKey, 'limit': limit},
),
);
return asList(payload['messages'])
.map((item) {
final map = asMap(item);
return GatewayChatMessage(
id: randomIdInternal(),
role: stringValue(map['role']) ?? 'assistant',
text: extractMessageText(map),
timestampMs: doubleValue(map['timestamp']),
toolCallId:
stringValue(map['toolCallId']) ??
stringValue(map['tool_call_id']),
toolName:
stringValue(map['toolName']) ?? stringValue(map['tool_name']),
stopReason: stringValue(map['stopReason']),
pending: false,
error: false,
);
})
.toList(growable: false);
}
}) => _loadHistoryInternal(sessionKey, limit: limit);
Future<String> sendChat({
required String sessionKey,
@ -540,398 +568,104 @@ class GatewayRuntime extends ChangeNotifier with GatewayRuntimeHelpersInternal {
const <GatewayChatAttachmentPayload>[],
String? agentId,
Map<String, dynamic>? metadata,
}) async {
final runId = randomIdInternal();
final payload = asMap(
await request(
'chat.send',
params: <String, dynamic>{
'sessionKey': sessionKey,
'message': message,
'thinking': thinking,
'timeoutMs': 30000,
'idempotencyKey': runId,
if (agentId != null && agentId.trim().isNotEmpty)
'agentId': agentId.trim(),
if (metadata != null && metadata.isNotEmpty) 'metadata': metadata,
if (attachments.isNotEmpty)
'attachments': attachments
.map((attachment) => attachment.toJson())
.toList(growable: false),
},
timeout: const Duration(seconds: 35),
),
);
return stringValue(payload['runId']) ?? runId;
}
}) => _sendChatInternal(
sessionKey: sessionKey,
message: message,
thinking: thinking,
attachments: attachments,
agentId: agentId,
metadata: metadata,
);
Future<void> abortChat({
required String sessionKey,
required String runId,
}) async {
await request(
'chat.abort',
params: <String, dynamic>{'sessionKey': sessionKey, 'runId': runId},
timeout: const Duration(seconds: 10),
);
}
Future<void> abortChat({required String sessionKey, required String runId}) =>
_abortChatInternal(sessionKey: sessionKey, runId: runId);
Future<List<GatewayInstanceSummary>> listInstances() async {
final payload = await request(
'system-presence',
params: const <String, dynamic>{},
);
return asList(payload)
.map((item) {
final map = asMap(item);
return GatewayInstanceSummary(
id: stringValue(map['id']) ?? randomIdInternal(),
host: stringValue(map['host']),
ip: stringValue(map['ip']),
version: stringValue(map['version']),
platform: stringValue(map['platform']),
deviceFamily: stringValue(map['deviceFamily']),
modelIdentifier: stringValue(map['modelIdentifier']),
lastInputSeconds: intValue(map['lastInputSeconds']),
mode: stringValue(map['mode']),
reason: stringValue(map['reason']),
text: stringValue(map['text']) ?? '',
timestampMs:
doubleValue(map['ts']) ??
DateTime.now().millisecondsSinceEpoch.toDouble(),
);
})
.toList(growable: false);
}
Future<List<GatewayInstanceSummary>> listInstances() =>
_listInstancesInternal();
Future<List<GatewaySkillSummary>> listSkills({String? agentId}) async {
final payload = asMap(
await request(
'skills.status',
params: <String, dynamic>{
if (agentId != null && agentId.trim().isNotEmpty)
'agentId': agentId.trim(),
},
),
);
return asList(payload['skills'])
.map((item) {
final map = asMap(item);
return GatewaySkillSummary(
name: stringValue(map['name']) ?? 'Skill',
description: stringValue(map['description']) ?? '',
source: stringValue(map['source']) ?? 'workspace',
skillKey:
stringValue(map['skillKey']) ??
stringValue(map['name']) ??
'skill',
primaryEnv: stringValue(map['primaryEnv']),
eligible: boolValue(map['eligible']) ?? false,
disabled: boolValue(map['disabled']) ?? false,
missingBins: stringList(asMap(map['missing'])['bins']),
missingEnv: stringList(asMap(map['missing'])['env']),
missingConfig: stringList(asMap(map['missing'])['config']),
);
})
.toList(growable: false);
}
Future<List<GatewaySkillSummary>> listSkills({String? agentId}) =>
_listSkillsInternal(agentId: agentId);
Future<List<GatewayConnectorSummary>> listConnectors() async {
final payload = asMap(
await request(
'channels.status',
params: const <String, dynamic>{'probe': true, 'timeoutMs': 8000},
timeout: const Duration(seconds: 16),
),
);
final channelMeta = <String, Map<String, dynamic>>{
for (final entry in asList(payload['channelMeta']))
if (stringValue(asMap(entry)['id']) != null)
stringValue(asMap(entry)['id'])!: asMap(entry),
};
final labels = asMap(payload['channelLabels']);
final detailLabels = asMap(payload['channelDetailLabels']);
final accounts = asMap(payload['channelAccounts']);
final order = stringList(payload['channelOrder']);
Future<List<GatewayConnectorSummary>> listConnectors() =>
_listConnectorsInternal();
final summaries = <GatewayConnectorSummary>[];
for (final channelId in order) {
final channelAccounts = asList(accounts[channelId]);
if (channelAccounts.isEmpty) {
final meta = channelMeta[channelId] ?? const <String, dynamic>{};
summaries.add(
GatewayConnectorSummary(
id: channelId,
label:
stringValue(meta['label']) ??
stringValue(labels[channelId]) ??
channelId,
detailLabel:
stringValue(meta['detailLabel']) ??
stringValue(detailLabels[channelId]) ??
channelId,
accountName: null,
configured: false,
enabled: false,
running: false,
connected: false,
status: 'idle',
lastError: null,
meta: const <String>[],
),
);
continue;
}
for (final account in channelAccounts) {
final map = asMap(account);
final configured = boolValue(map['configured']) ?? false;
final enabled = boolValue(map['enabled']) ?? configured;
final running = boolValue(map['running']) ?? false;
final connected =
boolValue(map['connected']) ?? boolValue(map['linked']) ?? false;
final lastError = stringValue(map['lastError']);
final status = lastError != null && lastError.trim().isNotEmpty
? 'error'
: connected
? 'connected'
: running
? 'running'
: configured
? 'configured'
: 'idle';
final mode = stringValue(map['mode']);
final tokenSource = stringValue(map['tokenSource']);
final baseUrl = stringValue(map['baseUrl']);
summaries.add(
GatewayConnectorSummary(
id: channelId,
label:
stringValue(channelMeta[channelId]?['label']) ??
stringValue(labels[channelId]) ??
channelId,
detailLabel:
stringValue(channelMeta[channelId]?['detailLabel']) ??
stringValue(detailLabels[channelId]) ??
channelId,
accountName:
stringValue(map['name']) ?? stringValue(map['accountId']),
configured: configured,
enabled: enabled,
running: running,
connected: connected,
status: status,
lastError: lastError,
meta: [
...?(mode == null ? null : <String>[mode]),
...?(tokenSource == null ? null : <String>[tokenSource]),
...?(baseUrl == null ? null : <String>[baseUrl]),
],
),
);
}
}
return summaries;
}
Future<List<GatewayModelSummary>> listModels() => _listModelsInternal();
Future<List<GatewayModelSummary>> listModels() async {
final payload = asMap(
await request(
'models.list',
params: const <String, dynamic>{},
timeout: const Duration(seconds: 16),
),
);
return asList(payload['models'])
.map((item) {
final map = asMap(item);
return GatewayModelSummary(
id: stringValue(map['id']) ?? 'unknown',
name:
stringValue(map['name']) ?? stringValue(map['id']) ?? 'unknown',
provider: stringValue(map['provider']) ?? 'unknown',
contextWindow: intValue(map['contextWindow']),
maxOutputTokens: intValue(map['maxOutputTokens']),
);
})
.toList(growable: false);
}
Future<List<GatewayCronJobSummary>> listCronJobs() => _listCronJobsInternal();
Future<List<GatewayCronJobSummary>> listCronJobs() async {
final payload = asMap(
await request(
'cron.list',
params: const <String, dynamic>{'includeDisabled': true},
timeout: const Duration(seconds: 16),
),
);
return asList(payload['jobs'])
.map((item) {
final map = asMap(item);
final state = asMap(map['state']);
return GatewayCronJobSummary(
id: stringValue(map['id']) ?? randomIdInternal(),
name: stringValue(map['name']) ?? 'Untitled job',
description: stringValue(map['description']),
enabled: boolValue(map['enabled']) ?? true,
agentId: stringValue(map['agentId']),
scheduleLabel: cronScheduleLabelInternal(asMap(map['schedule'])),
nextRunAtMs: intValue(state['nextRunAtMs']),
lastRunAtMs: intValue(state['lastRunAtMs']),
lastStatus: stringValue(state['lastStatus']),
lastError: stringValue(state['lastError']),
);
})
.toList(growable: false);
}
Future<GatewayDevicePairingList> listDevicePairing() =>
_listDevicePairingInternal();
Future<GatewayDevicePairingList> listDevicePairing() async {
final payload = asMap(
await request(
'device.pair.list',
params: const <String, dynamic>{},
timeout: const Duration(seconds: 12),
),
);
final identity = await storeInternal.loadDeviceIdentity();
return GatewayDevicePairingList(
pending: asList(payload['pending'])
.map((item) => parsePendingDeviceInternal(asMap(item)))
.toList(growable: false),
paired: asList(payload['paired'])
.map(
(item) => parsePairedDeviceInternal(
asMap(item),
currentDeviceId: identity?.deviceId,
),
)
.toList(growable: false),
);
}
Future<GatewayPairedDevice?> approveDevicePairing(String requestId) =>
_approveDevicePairingInternal(requestId);
Future<GatewayPairedDevice?> approveDevicePairing(String requestId) async {
appendLogInternal(this, 'info', 'pairing', 'approve request $requestId');
final payload = asMap(
await request(
'device.pair.approve',
params: <String, dynamic>{'requestId': requestId},
timeout: const Duration(seconds: 12),
),
);
final identity = await storeInternal.loadDeviceIdentity();
final device = asMap(payload['device']);
if (device.isEmpty) {
return null;
}
return parsePairedDeviceInternal(
device,
currentDeviceId: identity?.deviceId,
);
}
Future<void> rejectDevicePairing(String requestId) =>
_rejectDevicePairingInternal(requestId);
Future<void> rejectDevicePairing(String requestId) async {
appendLogInternal(this, 'info', 'pairing', 'reject request $requestId');
await request(
'device.pair.reject',
params: <String, dynamic>{'requestId': requestId},
timeout: const Duration(seconds: 12),
);
}
Future<void> removePairedDevice(String deviceId) async {
appendLogInternal(this, 'info', 'pairing', 'remove device $deviceId');
await request(
'device.pair.remove',
params: <String, dynamic>{'deviceId': deviceId},
timeout: const Duration(seconds: 12),
);
}
Future<void> removePairedDevice(String deviceId) =>
_removePairedDeviceInternal(deviceId);
Future<String> rotateDeviceToken({
required String deviceId,
required String role,
List<String> scopes = const <String>[],
}) async {
appendLogInternal(
this,
'info',
'token',
'rotate role token | device: $deviceId | role: $role',
);
final payload = asMap(
await request(
'device.token.rotate',
params: <String, dynamic>{
'deviceId': deviceId,
'role': role,
if (scopes.isNotEmpty) 'scopes': scopes,
},
timeout: const Duration(seconds: 12),
),
);
final token = stringValue(payload['token']) ?? '';
final identity = await storeInternal.loadDeviceIdentity();
final resolvedRole = stringValue(payload['role']) ?? role;
if (token.isNotEmpty &&
identity != null &&
(stringValue(payload['deviceId']) ?? deviceId) == identity.deviceId) {
await storeInternal.saveDeviceToken(
deviceId: identity.deviceId,
role: resolvedRole,
token: token,
);
}
return token;
}
}) => _rotateDeviceTokenInternal(
deviceId: deviceId,
role: role,
scopes: scopes,
);
Future<void> revokeDeviceToken({
required String deviceId,
required String role,
}) async {
appendLogInternal(
this,
'info',
'token',
'revoke role token | device: $deviceId | role: $role',
);
await request(
'device.token.revoke',
params: <String, dynamic>{'deviceId': deviceId, 'role': role},
timeout: const Duration(seconds: 12),
);
final identity = await storeInternal.loadDeviceIdentity();
if (identity != null && deviceId == identity.deviceId) {
await storeInternal.clearDeviceToken(
deviceId: identity.deviceId,
role: role,
);
}
}
}) => _revokeDeviceTokenInternal(deviceId: deviceId, role: role);
Future<dynamic> request(
String method, {
Map<String, dynamic>? params,
Duration timeout = const Duration(seconds: 15),
}) async {
if (channelInternal == null || !isConnected) {
appendLogInternal(
this,
'warn',
'rpc',
'blocked request $method | offline',
);
throw GatewayRuntimeException('gateway not connected', code: 'OFFLINE');
}) => _requestInternal(method, params: params, timeout: timeout);
void _notifyRuntimeChangedInternal() {
notifyListeners();
}
void _handleSessionUpdateInternal(GatewayRuntimeSessionUpdate update) {
if (update.runtimeId != runtimeIdInternal) {
return;
}
switch (update.type) {
case GatewayRuntimeSessionUpdateType.snapshot:
if (update.snapshot != null) {
snapshotInternal = update.snapshot!;
notifyListeners();
}
return;
case GatewayRuntimeSessionUpdateType.log:
final entry = update.log;
if (entry == null) {
return;
}
logsInternal.add(entry);
const maxLogEntries = 250;
if (logsInternal.length > maxLogEntries) {
logsInternal.removeRange(0, logsInternal.length - maxLogEntries);
}
notifyListeners();
return;
case GatewayRuntimeSessionUpdateType.push:
final push = update.push;
if (push != null) {
eventsInternal.add(push);
}
return;
}
final result = await requestRawInternal(
this,
method,
params: params,
timeout: timeout,
);
return result.payload;
}
@override
void dispose() {
sessionUpdatesInternal?.cancel();
unawaited(sessionClientInternal?.dispose() ?? Future<void>.value());
eventsInternal.close();
reconnectTimerInternal?.cancel();
unawaited(closeSocketInternal(this));

View File

@ -0,0 +1,259 @@
import 'gateway_runtime_errors.dart';
import 'gateway_runtime_events.dart';
import 'gateway_runtime_helpers.dart';
import 'runtime_models.dart';
class GatewayRuntimeSessionConnectRequest {
const GatewayRuntimeSessionConnectRequest({
required this.runtimeId,
required this.mode,
required this.clientId,
required this.locale,
required this.userAgent,
required this.host,
required this.port,
required this.tls,
required this.connectAuthMode,
required this.connectAuthFields,
required this.connectAuthSources,
required this.hasSharedAuth,
required this.hasDeviceToken,
required this.packageInfo,
required this.deviceInfo,
required this.identity,
required this.authToken,
required this.authDeviceToken,
required this.authPassword,
});
final String runtimeId;
final RuntimeConnectionMode mode;
final String clientId;
final String locale;
final String userAgent;
final String host;
final int port;
final bool tls;
final String connectAuthMode;
final List<String> connectAuthFields;
final List<String> connectAuthSources;
final bool hasSharedAuth;
final bool hasDeviceToken;
final RuntimePackageInfo packageInfo;
final RuntimeDeviceInfo deviceInfo;
final LocalDeviceIdentity identity;
final String authToken;
final String authDeviceToken;
final String authPassword;
Map<String, dynamic> toJson() {
return <String, dynamic>{
'runtimeId': runtimeId,
'mode': mode.name,
'clientId': clientId,
'locale': locale,
'userAgent': userAgent,
'endpoint': <String, dynamic>{'host': host, 'port': port, 'tls': tls},
'connectAuthMode': connectAuthMode,
'connectAuthFields': connectAuthFields,
'connectAuthSources': connectAuthSources,
'hasSharedAuth': hasSharedAuth,
'hasDeviceToken': hasDeviceToken,
'packageInfo': <String, dynamic>{
'appName': packageInfo.appName,
'packageName': packageInfo.packageName,
'version': packageInfo.version,
'buildNumber': packageInfo.buildNumber,
},
'deviceInfo': <String, dynamic>{
'platform': deviceInfo.platform,
'platformVersion': deviceInfo.platformVersion,
'deviceFamily': deviceInfo.deviceFamily,
'modelIdentifier': deviceInfo.modelIdentifier,
},
'identity': <String, dynamic>{
'deviceId': identity.deviceId,
'publicKeyBase64Url': identity.publicKeyBase64Url,
'privateKeyBase64Url': identity.privateKeyBase64Url,
},
'auth': <String, dynamic>{
if (authToken.trim().isNotEmpty) 'token': authToken.trim(),
if (authDeviceToken.trim().isNotEmpty)
'deviceToken': authDeviceToken.trim(),
if (authPassword.trim().isNotEmpty) 'password': authPassword.trim(),
},
};
}
}
class GatewayRuntimeSessionConnectResult {
const GatewayRuntimeSessionConnectResult({
required this.snapshot,
required this.auth,
required this.returnedDeviceToken,
required this.raw,
});
final GatewayConnectionSnapshot snapshot;
final Map<String, dynamic> auth;
final String returnedDeviceToken;
final Map<String, dynamic> raw;
factory GatewayRuntimeSessionConnectResult.fromJson(
Map<String, dynamic> json,
) {
return GatewayRuntimeSessionConnectResult(
snapshot: gatewayConnectionSnapshotFromJson(_castMap(json['snapshot'])),
auth: _castMap(json['auth']),
returnedDeviceToken: json['returnedDeviceToken']?.toString().trim() ?? '',
raw: json,
);
}
}
enum GatewayRuntimeSessionUpdateType { snapshot, log, push }
class GatewayRuntimeSessionUpdate {
const GatewayRuntimeSessionUpdate({
required this.runtimeId,
required this.type,
this.snapshot,
this.log,
this.push,
this.raw = const <String, dynamic>{},
});
final String runtimeId;
final GatewayRuntimeSessionUpdateType type;
final GatewayConnectionSnapshot? snapshot;
final RuntimeLogEntry? log;
final GatewayPushEvent? push;
final Map<String, dynamic> raw;
factory GatewayRuntimeSessionUpdate.fromNotification(
Map<String, dynamic> notification,
) {
final method = notification['method']?.toString().trim() ?? '';
final params = _castMap(notification['params']);
final runtimeId = params['runtimeId']?.toString().trim() ?? '';
switch (method) {
case 'xworkmate.gateway.snapshot':
return GatewayRuntimeSessionUpdate(
runtimeId: runtimeId,
type: GatewayRuntimeSessionUpdateType.snapshot,
snapshot: gatewayConnectionSnapshotFromJson(
_castMap(params['snapshot']),
),
raw: params,
);
case 'xworkmate.gateway.log':
return GatewayRuntimeSessionUpdate(
runtimeId: runtimeId,
type: GatewayRuntimeSessionUpdateType.log,
log: runtimeLogEntryFromJson(_castMap(params['log'])),
raw: params,
);
case 'xworkmate.gateway.push':
final event = _castMap(params['event']);
return GatewayRuntimeSessionUpdate(
runtimeId: runtimeId,
type: GatewayRuntimeSessionUpdateType.push,
push: GatewayPushEvent(
event: event['event']?.toString().trim() ?? '',
payload: event['payload'],
sequence: intValue(event['sequence']),
),
raw: params,
);
default:
throw GatewayRuntimeException(
'Unsupported gateway notification: $method',
code: 'GO_GATEWAY_RUNTIME_NOTIFICATION_UNSUPPORTED',
);
}
}
}
abstract class GatewayRuntimeSessionClient {
Stream<GatewayRuntimeSessionUpdate> get updates;
Future<GatewayRuntimeSessionConnectResult> connect(
GatewayRuntimeSessionConnectRequest request,
);
Future<dynamic> request({
required String runtimeId,
required String method,
Map<String, dynamic>? params,
Duration timeout = const Duration(seconds: 15),
});
Future<void> disconnect({required String runtimeId});
Future<void> dispose();
}
GatewayConnectionSnapshot gatewayConnectionSnapshotFromJson(
Map<String, dynamic> json,
) {
return GatewayConnectionSnapshot(
status: _statusFromJson(json['status']?.toString()),
mode: RuntimeConnectionModeCopy.fromJsonValue(json['mode']?.toString()),
statusText: json['statusText']?.toString() ?? 'Offline',
serverName: json['serverName']?.toString(),
remoteAddress: json['remoteAddress']?.toString(),
mainSessionKey: json['mainSessionKey']?.toString(),
lastError: json['lastError']?.toString(),
lastErrorCode: json['lastErrorCode']?.toString(),
lastErrorDetailCode: json['lastErrorDetailCode']?.toString(),
lastConnectedAtMs: intValue(json['lastConnectedAtMs']),
deviceId: json['deviceId']?.toString(),
authRole: json['authRole']?.toString(),
authScopes: stringList(json['authScopes']),
connectAuthMode: json['connectAuthMode']?.toString(),
connectAuthFields: stringList(json['connectAuthFields']),
connectAuthSources: stringList(json['connectAuthSources']),
hasSharedAuth: boolValue(json['hasSharedAuth']) ?? false,
hasDeviceToken: boolValue(json['hasDeviceToken']) ?? false,
healthPayload: _castNullableMap(json['healthPayload']),
statusPayload: _castNullableMap(json['statusPayload']),
);
}
RuntimeLogEntry runtimeLogEntryFromJson(Map<String, dynamic> json) {
return RuntimeLogEntry(
timestampMs:
intValue(json['timestampMs']) ?? DateTime.now().millisecondsSinceEpoch,
level: json['level']?.toString() ?? 'info',
category: json['category']?.toString() ?? 'runtime',
message: json['message']?.toString() ?? '',
);
}
RuntimeConnectionStatus _statusFromJson(String? value) {
switch (value?.trim()) {
case 'connecting':
return RuntimeConnectionStatus.connecting;
case 'connected':
return RuntimeConnectionStatus.connected;
case 'error':
return RuntimeConnectionStatus.error;
default:
return RuntimeConnectionStatus.offline;
}
}
Map<String, dynamic> _castMap(Object? value) {
if (value is Map<String, dynamic>) {
return value;
}
if (value is Map) {
return value.cast<String, dynamic>();
}
return const <String, dynamic>{};
}
Map<String, dynamic>? _castNullableMap(Object? value) {
final resolved = _castMap(value);
return resolved.isEmpty ? null : resolved;
}

View File

@ -0,0 +1,382 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'embedded_agent_launch_policy.dart';
import 'gateway_runtime_errors.dart';
import 'gateway_runtime_helpers.dart';
import 'gateway_runtime_session_client.dart';
import 'go_core.dart';
typedef GoGatewayRuntimeProcessStarter =
Future<Process> Function(
String executable,
List<String> arguments, {
Map<String, String>? environment,
String? workingDirectory,
});
class GoGatewayRuntimeDesktopClient implements GatewayRuntimeSessionClient {
GoGatewayRuntimeDesktopClient({
GoCoreLocator? goCoreLocator,
GoGatewayRuntimeProcessStarter? processStarter,
}) : _goCoreLocator = goCoreLocator ?? GoCoreLocator(),
_processStarter =
processStarter ??
((executable, arguments, {environment, workingDirectory}) {
return Process.start(
executable,
arguments,
environment: environment,
workingDirectory: workingDirectory,
);
});
final GoCoreLocator _goCoreLocator;
final GoGatewayRuntimeProcessStarter _processStarter;
final StreamController<GatewayRuntimeSessionUpdate> _updatesController =
StreamController<GatewayRuntimeSessionUpdate>.broadcast();
final Map<String, Completer<Map<String, dynamic>>> _pending =
<String, Completer<Map<String, dynamic>>>{};
Process? _localProcess;
Uri? _localEndpoint;
Future<Uri?>? _localEndpointFuture;
WebSocket? _socket;
StreamSubscription<dynamic>? _socketSubscription;
Future<void>? _socketReadyFuture;
int _requestCounter = 0;
@override
Stream<GatewayRuntimeSessionUpdate> get updates => _updatesController.stream;
@override
Future<GatewayRuntimeSessionConnectResult> connect(
GatewayRuntimeSessionConnectRequest request,
) async {
final result = await _request(
method: 'xworkmate.gateway.connect',
params: request.toJson(),
);
if (boolValue(result['ok']) != true) {
throw _gatewayErrorFromResult(
result,
fallbackMessage: 'Gateway connect failed',
);
}
return GatewayRuntimeSessionConnectResult.fromJson(result);
}
@override
Future<dynamic> request({
required String runtimeId,
required String method,
Map<String, dynamic>? params,
Duration timeout = const Duration(seconds: 15),
}) async {
final result = await _request(
method: 'xworkmate.gateway.request',
params: <String, dynamic>{
'runtimeId': runtimeId,
'method': method,
if (params != null && params.isNotEmpty) 'params': params,
'timeoutMs': timeout.inMilliseconds,
},
);
if (boolValue(result['ok']) != true) {
throw _gatewayErrorFromResult(
result,
fallbackMessage: '$method request failed',
);
}
return result['payload'];
}
@override
Future<void> disconnect({required String runtimeId}) async {
await _request(
method: 'xworkmate.gateway.disconnect',
params: <String, dynamic>{'runtimeId': runtimeId},
);
}
@override
Future<void> dispose() async {
for (final completer in _pending.values) {
if (!completer.isCompleted) {
completer.completeError(
GatewayRuntimeException(
'Go gateway runtime transport disposed',
code: 'GO_GATEWAY_RUNTIME_TRANSPORT_DISPOSED',
),
);
}
}
_pending.clear();
await _socketSubscription?.cancel();
_socketSubscription = null;
try {
await _socket?.close();
} catch (_) {
// Best effort only.
}
_socket = null;
_socketReadyFuture = null;
final process = _localProcess;
_localProcess = null;
_localEndpoint = null;
_localEndpointFuture = null;
if (process != null) {
try {
process.kill();
} catch (_) {
// Best effort only.
}
}
await _updatesController.close();
}
Future<Map<String, dynamic>> _request({
required String method,
required Map<String, dynamic> params,
}) async {
await _ensureSocketReady();
final socket = _socket;
if (socket == null) {
throw GatewayRuntimeException(
'Missing Go gateway runtime transport',
code: 'GO_GATEWAY_RUNTIME_TRANSPORT_UNAVAILABLE',
);
}
final requestId =
'${DateTime.now().microsecondsSinceEpoch}-$method-${_requestCounter++}';
final completer = Completer<Map<String, dynamic>>();
_pending[requestId] = completer;
socket.add(
jsonEncode(<String, dynamic>{
'jsonrpc': '2.0',
'id': requestId,
'method': method,
'params': params,
}),
);
try {
return await completer.future.timeout(const Duration(seconds: 120));
} finally {
_pending.remove(requestId);
}
}
Future<void> _ensureSocketReady() async {
final inFlight = _socketReadyFuture;
if (inFlight != null) {
return inFlight;
}
final next = _openSocket();
_socketReadyFuture = next;
try {
await next;
} finally {
_socketReadyFuture = null;
}
}
Future<void> _openSocket() async {
if (_socket != null) {
return;
}
final endpoint = await _ensureLocalEndpoint();
if (endpoint == null) {
throw GatewayRuntimeException(
'Missing Go gateway runtime endpoint',
code: 'GO_GATEWAY_RUNTIME_ENDPOINT_MISSING',
);
}
final wsEndpoint = endpoint.replace(
scheme: endpoint.scheme == 'https' ? 'wss' : 'ws',
path: '/acp',
);
final socket = await WebSocket.connect(wsEndpoint.toString()).timeout(
const Duration(seconds: 6),
onTimeout: () => throw GatewayRuntimeException(
'Go gateway runtime websocket connect timeout',
code: 'GO_GATEWAY_RUNTIME_WS_CONNECT_TIMEOUT',
),
);
_socket = socket;
_socketSubscription = socket.listen(
_handleSocketMessage,
onError: (Object error, StackTrace stackTrace) {
_failPending(
GatewayRuntimeException(
error.toString(),
code: 'GO_GATEWAY_RUNTIME_WS_ERROR',
),
);
},
onDone: () {
_socket = null;
_socketSubscription = null;
_failPending(
GatewayRuntimeException(
'Go gateway runtime websocket closed',
code: 'GO_GATEWAY_RUNTIME_WS_CLOSED',
),
);
},
cancelOnError: true,
);
}
void _handleSocketMessage(dynamic raw) {
final json = _decodeMap(raw);
final id = json['id']?.toString().trim();
if (id != null && id.isNotEmpty) {
final completer = _pending[id];
if (completer != null && !completer.isCompleted) {
final error = _castMap(json['error']);
if (error.isNotEmpty) {
completer.completeError(
GatewayRuntimeException(
error['message']?.toString() ??
'Go gateway runtime request failed',
code: error['code']?.toString(),
),
);
} else {
completer.complete(_castMap(json['result']));
}
}
return;
}
final method = json['method']?.toString().trim() ?? '';
if (method.isEmpty) {
return;
}
try {
_updatesController.add(
GatewayRuntimeSessionUpdate.fromNotification(json),
);
} catch (_) {
// Ignore unrelated ACP notifications.
}
}
void _failPending(GatewayRuntimeException error) {
for (final completer in _pending.values) {
if (!completer.isCompleted) {
completer.completeError(error);
}
}
_pending.clear();
}
Future<Uri?> _ensureLocalEndpoint() async {
if (_localEndpoint != null) {
return _localEndpoint;
}
final inFlight = _localEndpointFuture;
if (inFlight != null) {
return inFlight;
}
final next = _startLocalProcess();
_localEndpointFuture = next;
try {
_localEndpoint = await next;
return _localEndpoint;
} finally {
_localEndpointFuture = null;
}
}
Future<Uri?> _startLocalProcess() async {
if (shouldBlockEmbeddedAgentLaunch(
isAppleHost: Platform.isIOS || Platform.isMacOS,
)) {
return null;
}
final launch = await _goCoreLocator.locate();
if (launch == null) {
return null;
}
final reservedSocket = await ServerSocket.bind(
InternetAddress.loopbackIPv4,
0,
);
final port = reservedSocket.port;
await reservedSocket.close();
final listenAddress = '127.0.0.1:$port';
final process = await _processStarter(
launch.executable,
<String>[...launch.arguments, 'serve', '--listen', listenAddress],
environment: Platform.environment,
workingDirectory: launch.workingDirectory,
);
_localProcess = process;
unawaited(process.stdout.drain<void>());
unawaited(process.stderr.drain<void>());
final endpoint = Uri(scheme: 'http', host: '127.0.0.1', port: port);
final deadline = DateTime.now().add(const Duration(seconds: 8));
while (DateTime.now().isBefore(deadline)) {
if (_localProcess != process) {
break;
}
final exitCode = await process.exitCode.timeout(
const Duration(milliseconds: 20),
onTimeout: () => -1,
);
if (exitCode != -1) {
break;
}
try {
final probe = await WebSocket.connect(
endpoint.replace(scheme: 'ws', path: '/acp').toString(),
).timeout(const Duration(milliseconds: 300));
await probe.close();
return endpoint;
} catch (_) {
await Future<void>.delayed(const Duration(milliseconds: 120));
}
}
return null;
}
GatewayRuntimeException _gatewayErrorFromResult(
Map<String, dynamic> result, {
required String fallbackMessage,
}) {
final error = _castMap(result['error']);
return GatewayRuntimeException(
error['message']?.toString() ?? fallbackMessage,
code: error['code']?.toString(),
details: error['details'],
);
}
Map<String, dynamic> _decodeMap(dynamic raw) {
if (raw is Map<String, dynamic>) {
return raw;
}
if (raw is Map) {
return raw.cast<String, dynamic>();
}
if (raw is String) {
return _castMap(jsonDecode(raw));
}
if (raw is List<int>) {
return _castMap(jsonDecode(utf8.decode(raw)));
}
return const <String, dynamic>{};
}
Map<String, dynamic> _castMap(Object? value) {
if (value is Map<String, dynamic>) {
return value;
}
if (value is Map) {
return value.cast<String, dynamic>();
}
return const <String, dynamic>{};
}
}

View File

@ -9,6 +9,7 @@ import 'package:flutter_test/flutter_test.dart';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:xworkmate/runtime/device_identity_store.dart';
import 'package:xworkmate/runtime/gateway_runtime.dart';
import 'package:xworkmate/runtime/gateway_runtime_session_client.dart';
import 'package:xworkmate/runtime/runtime_models.dart';
import '../test_support.dart';
@ -124,6 +125,141 @@ void main() {
},
);
test(
'GatewayRuntime persists returned device token and applies go-core session notifications',
() async {
SharedPreferences.setMockInitialValues(<String, Object>{});
final store = createIsolatedTestStore();
final identityStore = DeviceIdentityStore(store);
final fakeClient = _FakeGatewayRuntimeSessionClient(
connectResult: GatewayRuntimeSessionConnectResult(
snapshot:
GatewayConnectionSnapshot.initial(
mode: RuntimeConnectionMode.remote,
).copyWith(
status: RuntimeConnectionStatus.connected,
statusText: 'Connected',
remoteAddress: '127.0.0.1:8787',
deviceId: 'device-1',
authRole: 'operator',
authScopes: const <String>['operator.admin'],
connectAuthMode: 'shared-token',
connectAuthFields: const <String>['token'],
connectAuthSources: const <String>['shared:form'],
hasSharedAuth: true,
hasDeviceToken: true,
),
auth: const <String, dynamic>{'role': 'operator'},
returnedDeviceToken: 'go-device-token',
raw: const <String, dynamic>{},
),
);
final runtime = GatewayRuntime(
store: store,
identityStore: identityStore,
sessionClient: fakeClient,
);
addTearDown(runtime.dispose);
await runtime.initialize();
await runtime.connectProfile(
GatewayConnectionProfile.defaults().copyWith(
mode: RuntimeConnectionMode.remote,
host: '127.0.0.1',
port: 8787,
tls: false,
useSetupCode: false,
),
authTokenOverride: 'shared-token-from-form',
);
final identity = await identityStore.loadOrCreate();
expect(
await store.loadDeviceToken(
deviceId: identity.deviceId,
role: 'operator',
),
'go-device-token',
);
expect(fakeClient.lastConnectRequest, isNotNull);
expect(
fakeClient.lastConnectRequest!.authToken,
'shared-token-from-form',
);
expect(runtime.snapshot.status, RuntimeConnectionStatus.connected);
final nextEvent = runtime.events.firstWhere(
(event) => event.event == 'health',
);
fakeClient.emit(
GatewayRuntimeSessionUpdate(
runtimeId: fakeClient.lastConnectRequest!.runtimeId,
type: GatewayRuntimeSessionUpdateType.log,
log: const RuntimeLogEntry(
timestampMs: 42,
level: 'info',
category: 'socket',
message: 'reconnect firing',
),
),
);
fakeClient.emit(
GatewayRuntimeSessionUpdate(
runtimeId: fakeClient.lastConnectRequest!.runtimeId,
type: GatewayRuntimeSessionUpdateType.push,
push: const GatewayPushEvent(
event: 'health',
payload: <String, dynamic>{'ok': true},
sequence: 7,
),
),
);
await Future<void>.delayed(Duration.zero);
expect(
runtime.logs.any((entry) => entry.message == 'reconnect firing'),
isTrue,
);
expect(await nextEvent, isA<GatewayPushEvent>());
},
);
test(
'GatewayRuntime falls back to direct websocket when go-core bridge is unavailable',
() async {
SharedPreferences.setMockInitialValues(<String, Object>{});
final store = createIsolatedTestStore();
final runtime = GatewayRuntime(
store: store,
identityStore: DeviceIdentityStore(store),
sessionClient: _FakeGatewayRuntimeSessionClient(
connectError: GatewayRuntimeException(
'go bridge unavailable',
code: 'GO_GATEWAY_RUNTIME_ENDPOINT_MISSING',
),
),
);
final server = await FakeGatewayRuntimeServerInternal.start();
addTearDown(runtime.dispose);
addTearDown(server.close);
await runtime.initialize();
await runtime.connectProfile(
GatewayConnectionProfile.defaults().copyWith(
mode: RuntimeConnectionMode.local,
host: '127.0.0.1',
port: server.port,
tls: false,
useSetupCode: false,
),
authTokenOverride: 'shared-token-from-form',
);
expect(server.connectAuth?['token'], 'shared-token-from-form');
expect(runtime.snapshot.status, RuntimeConnectionStatus.connected);
},
);
test(
'GatewayRuntime parses device pairing state and syncs rotated local role tokens',
() async {
@ -291,6 +427,58 @@ void main() {
);
}
class _FakeGatewayRuntimeSessionClient implements GatewayRuntimeSessionClient {
_FakeGatewayRuntimeSessionClient({this.connectResult, this.connectError});
final GatewayRuntimeSessionConnectResult? connectResult;
final GatewayRuntimeException? connectError;
final StreamController<GatewayRuntimeSessionUpdate> _updates =
StreamController<GatewayRuntimeSessionUpdate>.broadcast();
GatewayRuntimeSessionConnectRequest? lastConnectRequest;
@override
Stream<GatewayRuntimeSessionUpdate> get updates => _updates.stream;
void emit(GatewayRuntimeSessionUpdate update) {
_updates.add(update);
}
@override
Future<GatewayRuntimeSessionConnectResult> connect(
GatewayRuntimeSessionConnectRequest request,
) async {
lastConnectRequest = request;
if (connectError != null) {
throw connectError!;
}
return connectResult ??
GatewayRuntimeSessionConnectResult(
snapshot: GatewayConnectionSnapshot.initial(mode: request.mode),
auth: const <String, dynamic>{},
returnedDeviceToken: '',
raw: const <String, dynamic>{},
);
}
@override
Future<void> disconnect({required String runtimeId}) async {}
@override
Future<void> dispose() async {
await _updates.close();
}
@override
Future<dynamic> request({
required String runtimeId,
required String method,
Map<String, dynamic>? params,
Duration timeout = const Duration(seconds: 15),
}) async {
return const <String, dynamic>{};
}
}
class FakeGatewayRuntimeServerInternal {
FakeGatewayRuntimeServerInternal._(
this.serverInternal, {