#!/usr/bin/env node import fs from "node:fs"; import net from "node:net"; import path from "node:path"; import process from "node:process"; import { parseArgs } from "./lib/args.mjs"; import { BROKER_BUSY_RPC_CODE, CodexAppServerClient } from "./lib/app-server.mjs"; import { parseBrokerEndpoint } from "./lib/broker-endpoint.mjs"; const STREAMING_METHODS = new Set(["turn/start", "review/start", "thread/compact/start"]); function buildStreamThreadIds(method, params, result) { const threadIds = new Set(); if (params?.threadId) { threadIds.add(params.threadId); } if (method === "review/start" && result?.reviewThreadId) { threadIds.add(result.reviewThreadId); } return threadIds; } function buildJsonRpcError(code, message, data) { return data === undefined ? { code, message } : { code, message, data }; } function send(socket, message) { if (socket.destroyed) { return; } socket.write(`${JSON.stringify(message)}\n`); } function isInterruptRequest(message) { return message?.method === "turn/interrupt"; } function writePidFile(pidFile) { if (!pidFile) { return; } fs.mkdirSync(path.dirname(pidFile), { recursive: true }); fs.writeFileSync(pidFile, `${process.pid}\n`, "utf8"); } async function main() { const [subcommand, ...argv] = process.argv.slice(2); if (subcommand !== "serve") { throw new Error("Usage: node scripts/app-server-broker.mjs serve --endpoint [--cwd ] [--pid-file ]"); } const { options } = parseArgs(argv, { valueOptions: ["cwd", "pid-file", "endpoint"] }); if (!options.endpoint) { throw new Error("Missing required --endpoint."); } const cwd = options.cwd ? path.resolve(process.cwd(), options.cwd) : process.cwd(); const endpoint = String(options.endpoint); const listenTarget = parseBrokerEndpoint(endpoint); const pidFile = options["pid-file"] ? path.resolve(options["pid-file"]) : null; writePidFile(pidFile); const appClient = await CodexAppServerClient.connect(cwd, { disableBroker: true }); let activeRequestSocket = null; let activeStreamSocket = null; let activeStreamThreadIds = null; const sockets = new Set(); function clearSocketOwnership(socket) { if (activeRequestSocket === socket) { activeRequestSocket = null; } if (activeStreamSocket === socket) { activeStreamSocket = null; activeStreamThreadIds = null; } } function routeNotification(message) { const target = activeRequestSocket ?? activeStreamSocket; if (!target) { return; } send(target, message); if (message.method === "turn/completed" && activeStreamSocket === target) { const threadId = message.params?.threadId ?? null; if (!threadId || !activeStreamThreadIds || activeStreamThreadIds.has(threadId)) { activeStreamSocket = null; activeStreamThreadIds = null; if (activeRequestSocket === target) { activeRequestSocket = null; } } } } async function shutdown(server) { for (const socket of sockets) { socket.end(); } await appClient.close().catch(() => {}); await new Promise((resolve) => server.close(resolve)); if (listenTarget.kind === "unix" && fs.existsSync(listenTarget.path)) { fs.unlinkSync(listenTarget.path); } if (pidFile && fs.existsSync(pidFile)) { fs.unlinkSync(pidFile); } } appClient.setNotificationHandler(routeNotification); const server = net.createServer((socket) => { sockets.add(socket); socket.setEncoding("utf8"); let buffer = ""; socket.on("data", async (chunk) => { buffer += chunk; let newlineIndex = buffer.indexOf("\n"); while (newlineIndex !== -1) { const line = buffer.slice(0, newlineIndex); buffer = buffer.slice(newlineIndex + 1); newlineIndex = buffer.indexOf("\n"); if (!line.trim()) { continue; } let message; try { message = JSON.parse(line); } catch (error) { send(socket, { id: null, error: buildJsonRpcError(-32700, `Invalid JSON: ${error.message}`) }); continue; } if (message.id !== undefined && message.method === "initialize") { send(socket, { id: message.id, result: { userAgent: "codex-companion-broker" } }); continue; } if (message.method === "initialized" && message.id === undefined) { continue; } if (message.id !== undefined && message.method === "broker/shutdown") { send(socket, { id: message.id, result: {} }); await shutdown(server); process.exit(0); } if (message.id === undefined) { continue; } const allowInterruptDuringActiveStream = isInterruptRequest(message) && activeStreamSocket && activeStreamSocket !== socket && !activeRequestSocket; if ( ((activeRequestSocket && activeRequestSocket !== socket) || (activeStreamSocket && activeStreamSocket !== socket)) && !allowInterruptDuringActiveStream ) { send(socket, { id: message.id, error: buildJsonRpcError(BROKER_BUSY_RPC_CODE, "Shared Codex broker is busy.") }); continue; } if (allowInterruptDuringActiveStream) { try { const result = await appClient.request(message.method, message.params ?? {}); send(socket, { id: message.id, result }); } catch (error) { send(socket, { id: message.id, error: buildJsonRpcError(error.rpcCode ?? -32000, error.message) }); } continue; } const isStreaming = STREAMING_METHODS.has(message.method); activeRequestSocket = socket; try { const result = await appClient.request(message.method, message.params ?? {}); send(socket, { id: message.id, result }); if (isStreaming) { activeStreamSocket = socket; activeStreamThreadIds = buildStreamThreadIds(message.method, message.params ?? {}, result); } if (activeRequestSocket === socket) { activeRequestSocket = null; } } catch (error) { send(socket, { id: message.id, error: buildJsonRpcError(error.rpcCode ?? -32000, error.message) }); if (activeRequestSocket === socket) { activeRequestSocket = null; } if (activeStreamSocket === socket && !isStreaming) { activeStreamSocket = null; } } } }); socket.on("close", () => { sockets.delete(socket); clearSocketOwnership(socket); }); socket.on("error", () => { sockets.delete(socket); clearSocketOwnership(socket); }); }); process.on("SIGTERM", async () => { await shutdown(server); process.exit(0); }); process.on("SIGINT", async () => { await shutdown(server); process.exit(0); }); server.listen(listenTarget.path); } main().catch((error) => { process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); process.exit(1); });