/**
 * Type aliases used by this module.
 *
 * NOTE(review): the type arguments on Set / Map / Promise / ReturnType below were
 * reconstructed from how the fields are used in this file (string thread ids,
 * string turn ids and labels, a completion promise resolved with the state itself,
 * a handle returned by setTimeout) — confirm against the protocol definitions.
 *
 * @typedef {import("./app-server-protocol").AppServerNotification} AppServerNotification
 * @typedef {import("./app-server-protocol").ReviewTarget} ReviewTarget
 * @typedef {import("./app-server-protocol").ThreadItem} ThreadItem
 * @typedef {import("./app-server-protocol").ThreadResumeParams} ThreadResumeParams
 * @typedef {import("./app-server-protocol").ThreadStartParams} ThreadStartParams
 * @typedef {import("./app-server-protocol").Turn} Turn
 * @typedef {import("./app-server-protocol").UserInput} UserInput
 * @typedef {((update: string | { message: string, phase: string | null, threadId?: string | null, turnId?: string | null, stderrMessage?: string | null, logTitle?: string | null, logBody?: string | null }) => void)} ProgressReporter
 * @typedef {{
 *   threadId: string,
 *   rootThreadId: string,
 *   threadIds: Set<string>,
 *   threadTurnIds: Map<string, string>,
 *   threadLabels: Map<string, string>,
 *   turnId: string | null,
 *   bufferedNotifications: AppServerNotification[],
 *   completion: Promise<TurnCaptureState>,
 *   resolveCompletion: (state: TurnCaptureState) => void,
 *   rejectCompletion: (error: unknown) => void,
 *   finalTurn: Turn | null,
 *   completed: boolean,
 *   finalAnswerSeen: boolean,
 *   pendingCollaborations: Set<string>,
 *   activeSubagentTurns: Set<string>,
 *   completionTimer: ReturnType<typeof setTimeout> | null,
 *   lastAgentMessage: string,
 *   reviewText: string,
 *   reasoningSummary: string[],
 *   error: unknown,
 *   messages: Array<{ lifecycle: string, phase: string | null, text: string }>,
 *   fileChanges: ThreadItem[],
 *   commandExecutions: ThreadItem[],
 *   onProgress: ProgressReporter | null
 * }} TurnCaptureState
 */
import { readJsonFile } from "./fs.mjs";
import { BROKER_BUSY_RPC_CODE, BROKER_ENDPOINT_ENV, CodexAppServerClient } from "./app-server.mjs";
import { loadBrokerSession } from "./broker-lifecycle.mjs";
import { binaryAvailable, runCommand } from "./process.mjs";

const SERVICE_NAME = "claude_code_codex_plugin";
const TASK_THREAD_PREFIX = "Codex Companion Task";
const DEFAULT_CONTINUE_PROMPT =
  "Continue from the current thread state. Pick the next highest-value step and follow through until the task is resolved.";

/** Error text thrown by the exported entry points when the Codex CLI cannot be used. */
const CODEX_UNAVAILABLE_MESSAGE =
  "Codex CLI is not installed or is missing required runtime support. Install it with `npm install -g @openai/codex`, then rerun `/codex:setup`.";

/**
 * Normalizes captured stderr: strips trailing whitespace per line and drops empty
 * lines and the known "could not update PATH" warning.
 *
 * @param {string | null | undefined} stderr Raw stderr text; a missing value is treated as empty.
 * @returns {string} Remaining lines joined with "\n".
 */
function cleanCodexStderr(stderr) {
  // Coerce first so a client without captured stderr does not throw on `.split`.
  return String(stderr ?? "")
    .split(/\r?\n/)
    .map((line) => line.trimEnd())
    .filter((line) => line && !line.startsWith("WARNING: proceeding, even though we could not update PATH:"))
    .join("\n");
}

/**
 * Builds the payload for a "thread/start" request, applying this module's defaults
 * (no approvals, read-only sandbox, ephemeral thread).
 *
 * @returns {ThreadStartParams}
 */
function buildThreadParams(cwd, options = {}) {
  return {
    cwd,
    model: options.model ?? null,
    approvalPolicy: options.approvalPolicy ?? "never",
    sandbox: options.sandbox ?? "read-only",
    serviceName: SERVICE_NAME,
    ephemeral: options.ephemeral ?? true,
    experimentalRawEvents: false
  };
}

/**
 * Builds the payload for a "thread/resume" request. Only `model`, `approvalPolicy`
 * and `sandbox` are read from `options`.
 *
 * @returns {ThreadResumeParams}
 */
function buildResumeParams(threadId, cwd, options = {}) {
  return {
    threadId,
    cwd,
    model: options.model ?? null,
    approvalPolicy: options.approvalPolicy ?? "never",
    sandbox: options.sandbox ?? "read-only"
  };
}

/**
 * Wraps a prompt string as a single text input item.
 *
 * @returns {UserInput[]}
 */
function buildTurnInput(prompt) {
  return [{ type: "text", text: prompt, text_elements: [] }];
}

/**
 * Collapses whitespace runs to single spaces, trims, and truncates to `limit`
 * characters, ending with "..." when truncated.
 *
 * @param {unknown} text Value to shorten; null/undefined become "".
 * @param {number} [limit] Maximum length of the returned string.
 * @returns {string}
 */
function shorten(text, limit = 72) {
  const normalized = String(text ?? "").trim().replace(/\s+/g, " ");
  if (!normalized) {
    return "";
  }
  if (normalized.length <= limit) {
    return normalized;
  }
  return `${normalized.slice(0, limit - 3)}...`;
}

/** Returns true when the command text contains a test/lint/build/type-check style keyword. */
function looksLikeVerificationCommand(command) {
  return /\b(test|tests|lint|build|typecheck|type-check|check|verify|validate|pytest|jest|vitest|cargo test|npm test|pnpm test|yarn test|go test|mvn test|gradle test|tsc|eslint|ruff)\b/i.test(
    command
  );
}

/** Builds a thread name of the form "<TASK_THREAD_PREFIX>: <prompt excerpt>", or just the prefix for an empty prompt. */
function buildTaskThreadName(prompt) {
  const excerpt = shorten(prompt, 56);
  return excerpt ? `${TASK_THREAD_PREFIX}: ${excerpt}` : TASK_THREAD_PREFIX;
}

/** Reads `params.threadId` from a notification, or null when absent. */
function extractThreadId(message) {
  return message?.params?.threadId ?? null;
}

/** Reads the turn id from `params.turnId`, falling back to `params.turn.id`, or null when neither is set. */
function extractTurnId(message) {
  if (message?.params?.turnId) {
    return message.params.turnId;
  }
  if (message?.params?.turn?.id) {
    return message.params.turn.id;
  }
  return null;
}

/** Returns the distinct `path` values found across the `changes` of the given file-change items. */
function collectTouchedFiles(fileChanges) {
  const paths = new Set();
  for (const fileChange of fileChanges) {
    for (const change of fileChange.changes ?? []) {
      if (change.path) {
        paths.add(change.path);
      }
    }
  }
  return [...paths];
}

/** Collapses whitespace runs to single spaces and trims; null/undefined become "". */
function normalizeReasoningText(text) {
  return String(text ?? "").replace(/\s+/g, " ").trim();
}

/**
 * Recursively flattens a reasoning payload into normalized, non-empty strings.
 * Strings are normalized, arrays are flattened, and for objects the first of
 * `text` (when a string), `summary`, `content`, `parts` is followed.
 *
 * @returns {string[]}
 */
function extractReasoningSections(value) {
  if (!value) {
    return [];
  }
  if (typeof value === "string") {
    const normalized = normalizeReasoningText(value);
    return normalized ? [normalized] : [];
  }
  if (Array.isArray(value)) {
    return value.flatMap((entry) => extractReasoningSections(entry));
  }
  if (typeof value === "object") {
    if (typeof value.text === "string") {
      return extractReasoningSections(value.text);
    }
    if ("summary" in value) {
      return extractReasoningSections(value.summary);
    }
    if ("content" in value) {
      return extractReasoningSections(value.content);
    }
    if ("parts" in value) {
      return extractReasoningSections(value.parts);
    }
  }
  return [];
}

/**
 * Concatenates two section lists, normalizing each entry and dropping empties
 * and duplicates while keeping first-seen order.
 *
 * @returns {string[]}
 */
function mergeReasoningSections(existingSections, nextSections) {
  const seen = new Set();
  const merged = [];
  for (const section of [...existingSections, ...nextSections]) {
    const normalized = normalizeReasoningText(section);
    if (!normalized || seen.has(normalized)) {
      continue;
    }
    seen.add(normalized);
    merged.push(normalized);
  }
  return merged;
}

/**
 * Reports a progress message. Sends the bare string when there is no phase and no
 * extra fields, otherwise an object `{ message, phase, ...extra }`. Does nothing
 * without a reporter or a message.
 *
 * @param {ProgressReporter | null | undefined} onProgress
 * @param {string | null | undefined} message
 * @param {string | null | undefined} [phase]
 */
function emitProgress(onProgress, message, phase = null, extra = {}) {
  if (!onProgress || !message) {
    return;
  }
  if (!phase && Object.keys(extra).length === 0) {
    onProgress(message);
    return;
  }
  onProgress({ message, phase, ...extra });
}

/**
 * Reports a log-style update carrying `stderrMessage`, `logTitle` and `logBody`;
 * missing fields default to null (or "" for `message`).
 */
function emitLogEvent(onProgress, options = {}) {
  if (!onProgress) {
    return;
  }
  onProgress({
    message: options.message ?? "",
    phase: options.phase ?? null,
    stderrMessage: options.stderrMessage ?? null,
    logTitle: options.logTitle ?? null,
    logBody: options.logBody ?? null
  });
}

/**
 * Returns the display label for a non-root thread (its registered label, else its id),
 * or null for a missing id or for the root/current thread.
 */
function labelForThread(state, threadId) {
  if (!threadId || threadId === state.rootThreadId || threadId === state.threadId) {
    return null;
  }
  return state.threadLabels.get(threadId) ?? threadId;
}

/**
 * Adds a thread id to the tracked set and records a label for it, taken from the first
 * present of `threadName`, `name`, `agentNickname`, `agentRole`, or the existing label.
 */
function registerThread(state, threadId, options = {}) {
  if (!threadId) {
    return;
  }
  state.threadIds.add(threadId);
  const label =
    options.threadName ??
    options.name ??
    options.agentNickname ??
    options.agentRole ??
    state.threadLabels.get(threadId) ??
    null;
  if (label) {
    state.threadLabels.set(threadId, label);
  }
}

/**
 * Maps a started item to a `{ message, phase }` progress update, or null for item
 * types that are not reported.
 */
function describeStartedItem(state, item) {
  switch (item.type) {
    case "enteredReviewMode":
      return { message: `Reviewer started: ${item.review}`, phase: "reviewing" };
    case "commandExecution":
      return {
        message: `Running command: ${shorten(item.command, 96)}`,
        phase: looksLikeVerificationCommand(item.command) ? "verifying" : "running"
      };
    case "fileChange":
      // Tolerate a missing `changes` list, matching collectTouchedFiles.
      return { message: `Applying ${(item.changes ?? []).length} file change(s).`, phase: "editing" };
    case "mcpToolCall":
      return { message: `Calling ${item.server}/${item.tool}.`, phase: "investigating" };
    case "dynamicToolCall":
      return { message: `Running tool: ${item.tool}.`, phase: "investigating" };
    case "collabAgentToolCall": {
      const subagents = (item.receiverThreadIds ?? []).map((threadId) => labelForThread(state, threadId) ?? threadId);
      const summary =
        subagents.length > 0
          ? `Starting subagent ${subagents.join(", ")} via collaboration tool: ${item.tool}.`
          : `Starting collaboration tool: ${item.tool}.`;
      return { message: summary, phase: "investigating" };
    }
    case "webSearch":
      return { message: `Searching: ${shorten(item.query, 96)}`, phase: "investigating" };
    default:
      return null;
  }
}

/**
 * Maps a completed item to a `{ message, phase }` progress update, or null for item
 * types that are not reported.
 */
function describeCompletedItem(state, item) {
  switch (item.type) {
    case "commandExecution": {
      const exitCode = item.exitCode ?? "?";
      const statusLabel = item.status === "completed" ? "completed" : item.status;
      return {
        message: `Command ${statusLabel}: ${shorten(item.command, 96)} (exit ${exitCode})`,
        phase: looksLikeVerificationCommand(item.command) ? "verifying" : "running"
      };
    }
    case "fileChange":
      return { message: `File changes ${item.status}.`, phase: "editing" };
    case "mcpToolCall":
      return { message: `Tool ${item.server}/${item.tool} ${item.status}.`, phase: "investigating" };
    case "dynamicToolCall":
      return { message: `Tool ${item.tool} ${item.status}.`, phase: "investigating" };
    case "collabAgentToolCall": {
      const subagents = (item.receiverThreadIds ?? []).map((threadId) => labelForThread(state, threadId) ?? threadId);
      const summary =
        subagents.length > 0
          ? `Subagent ${subagents.join(", ")} ${item.status}.`
          : `Collaboration tool ${item.tool} ${item.status}.`;
      return { message: summary, phase: "investigating" };
    }
    case "exitedReviewMode":
      return { message: "Reviewer finished.", phase: "finalizing" };
    default:
      return null;
  }
}

/**
 * Creates the mutable state used to capture one turn, including a `completion`
 * promise together with its resolve/reject functions.
 *
 * @returns {TurnCaptureState}
 */
function createTurnCaptureState(threadId, options = {}) {
  let resolveCompletion;
  let rejectCompletion;
  const completion = new Promise((resolve, reject) => {
    resolveCompletion = resolve;
    rejectCompletion = reject;
  });

  return {
    threadId,
    rootThreadId: threadId,
    threadIds: new Set([threadId]),
    threadTurnIds: new Map(),
    threadLabels: new Map(),
    turnId: null,
    bufferedNotifications: [],
    completion,
    resolveCompletion,
    rejectCompletion,
    finalTurn: null,
    completed: false,
    finalAnswerSeen: false,
    pendingCollaborations: new Set(),
    activeSubagentTurns: new Set(),
    completionTimer: null,
    lastAgentMessage: "",
    reviewText: "",
    reasoningSummary: [],
    error: null,
    messages: [],
    fileChanges: [],
    commandExecutions: [],
    onProgress: options.onProgress ?? null
  };
}

/** Cancels the pending inferred-completion timer, if one is set. */
function clearCompletionTimer(state) {
  if (state.completionTimer) {
    clearTimeout(state.completionTimer);
    state.completionTimer = null;
  }
}

/**
 * Marks the turn as completed (once) and resolves `state.completion` with the state.
 * Without an explicit `turn`, a placeholder final turn with status "completed" is
 * recorded. `options.inferred` additionally emits a progress note.
 */
function completeTurn(state, turn = null, options = {}) {
  if (state.completed) {
    return;
  }

  clearCompletionTimer(state);
  state.completed = true;

  if (turn) {
    state.finalTurn = turn;
    if (!state.turnId) {
      state.turnId = turn.id;
    }
  } else if (!state.finalTurn) {
    state.finalTurn = { id: state.turnId ?? "inferred-turn", status: "completed" };
  }

  if (options.inferred) {
    emitProgress(
      state.onProgress,
      "Turn completion inferred after the main thread finished and subagent work drained.",
      "finalizing"
    );
  }

  state.resolveCompletion(state);
}

/**
 * Arms a 250 ms timer that completes the turn as "inferred" when a final answer has
 * been seen and no collaborations or subagent turns are outstanding. The conditions
 * are re-checked when the timer fires.
 */
function scheduleInferredCompletion(state) {
  if (state.completed || state.finalTurn || !state.finalAnswerSeen) {
    return;
  }
  if (state.pendingCollaborations.size > 0 || state.activeSubagentTurns.size > 0) {
    return;
  }

  clearCompletionTimer(state);
  state.completionTimer = setTimeout(() => {
    state.completionTimer = null;
    if (state.completed || state.finalTurn || !state.finalAnswerSeen) {
      return;
    }
    if (state.pendingCollaborations.size > 0 || state.activeSubagentTurns.size > 0) {
      return;
    }
    completeTurn(state, null, { inferred: true });
  }, 250);
  // `unref` is called only when the timer handle provides it.
  state.completionTimer.unref?.();
}

/**
 * Returns true when a notification targets a tracked thread and, if both a tracked
 * turn id and a message turn id are known, those ids match.
 */
function belongsToTurn(state, message) {
  const messageThreadId = extractThreadId(message);
  if (!messageThreadId || !state.threadIds.has(messageThreadId)) {
    return false;
  }

  const trackedTurnId = state.threadTurnIds.get(messageThreadId) ?? null;
  const messageTurnId = extractTurnId(message);
  return trackedTurnId === null || messageTurnId === null || messageTurnId === trackedTurnId;
}

/**
 * Folds one started/completed item into the capture state: tracks collaborations and
 * their receiver threads, agent messages, review output, reasoning summaries, file
 * changes and command executions, emitting log events where applicable.
 *
 * @param {TurnCaptureState} state
 * @param {ThreadItem} item
 * @param {string} lifecycle "started" or "completed" as passed by applyTurnNotification.
 * @param {string | null} [threadId] Thread the item came from; null is treated as the main thread.
 */
function recordItem(state, item, lifecycle, threadId = null) {
  const fromMainThread = !threadId || threadId === state.threadId;

  if (item.type === "collabAgentToolCall") {
    if (fromMainThread) {
      if (lifecycle === "started" || item.status === "inProgress") {
        state.pendingCollaborations.add(item.id);
      } else if (lifecycle === "completed") {
        state.pendingCollaborations.delete(item.id);
        scheduleInferredCompletion(state);
      }
    }
    for (const receiverThreadId of item.receiverThreadIds ?? []) {
      registerThread(state, receiverThreadId);
    }
  }

  if (item.type === "agentMessage") {
    state.messages.push({ lifecycle, phase: item.phase ?? null, text: item.text ?? "" });
    if (item.text) {
      if (fromMainThread) {
        state.lastAgentMessage = item.text;
        if (lifecycle === "completed" && item.phase === "final_answer") {
          state.finalAnswerSeen = true;
          scheduleInferredCompletion(state);
        }
      }
      if (lifecycle === "completed") {
        const sourceLabel = labelForThread(state, threadId);
        emitLogEvent(state.onProgress, {
          message: sourceLabel
            ? `Subagent ${sourceLabel}: ${shorten(item.text, 96)}`
            : `Assistant message captured: ${shorten(item.text, 96)}`,
          stderrMessage: null,
          phase: item.phase === "final_answer" ? "finalizing" : null,
          logTitle: sourceLabel ? `Subagent ${sourceLabel} message` : "Assistant message",
          logBody: item.text
        });
      }
    }
    return;
  }

  if (item.type === "exitedReviewMode") {
    state.reviewText = item.review ?? "";
    if (lifecycle === "completed" && item.review) {
      emitLogEvent(state.onProgress, {
        message: "Review output captured.",
        stderrMessage: null,
        phase: "finalizing",
        logTitle: "Review output",
        logBody: item.review
      });
    }
    return;
  }

  if (item.type === "reasoning" && lifecycle === "completed") {
    const nextSections = extractReasoningSections(item.summary);
    state.reasoningSummary = mergeReasoningSections(state.reasoningSummary, nextSections);
    if (nextSections.length > 0) {
      const sourceLabel = labelForThread(state, threadId);
      emitLogEvent(state.onProgress, {
        message: sourceLabel
          ? `Subagent ${sourceLabel} reasoning: ${shorten(nextSections[0], 96)}`
          : `Reasoning summary captured: ${shorten(nextSections[0], 96)}`,
        stderrMessage: null,
        logTitle: sourceLabel ? `Subagent ${sourceLabel} reasoning summary` : "Reasoning summary",
        logBody: nextSections.map((section) => `- ${section}`).join("\n")
      });
    }
    return;
  }

  if (item.type === "fileChange" && lifecycle === "completed") {
    state.fileChanges.push(item);
    return;
  }

  if (item.type === "commandExecution" && lifecycle === "completed") {
    state.commandExecutions.push(item);
  }
}

/**
 * Applies one app-server notification to the capture state, dispatching on
 * `message.method`. Unknown methods are ignored.
 */
function applyTurnNotification(state, message) {
  switch (message.method) {
    case "thread/started":
      registerThread(state, message.params.thread.id, {
        threadName: message.params.thread.name,
        name: message.params.thread.name,
        agentNickname: message.params.thread.agentNickname,
        agentRole: message.params.thread.agentRole
      });
      break;
    case "thread/name/updated":
      registerThread(state, message.params.threadId, { threadName: message.params.threadName ?? null });
      break;
    case "turn/started":
      registerThread(state, message.params.threadId);
      state.threadTurnIds.set(message.params.threadId, message.params.turn.id);
      // A turn on any thread other than the main one counts as subagent work.
      if ((message.params.threadId ?? null) !== state.threadId) {
        state.activeSubagentTurns.add(message.params.threadId);
      }
      emitProgress(
        state.onProgress,
        `Turn started (${message.params.turn.id}).`,
        "starting",
        (message.params.threadId ?? null) === state.threadId
          ? { threadId: message.params.threadId ?? null, turnId: message.params.turn.id ?? null }
          : {}
      );
      break;
    case "item/started":
      recordItem(state, message.params.item, "started", message.params.threadId ?? null);
      {
        const update = describeStartedItem(state, message.params.item);
        emitProgress(state.onProgress, update?.message, update?.phase ?? null);
      }
      break;
    case "item/completed":
      recordItem(state, message.params.item, "completed", message.params.threadId ?? null);
      {
        const update = describeCompletedItem(state, message.params.item);
        emitProgress(state.onProgress, update?.message, update?.phase ?? null);
      }
      break;
    case "error":
      state.error = message.params.error;
      emitProgress(state.onProgress, `Codex error: ${message.params.error.message}`, "failed");
      break;
    case "turn/completed":
      // Subagent turn endings only update bookkeeping; they never complete the capture directly.
      if ((message.params.threadId ?? null) !== state.threadId) {
        state.activeSubagentTurns.delete(message.params.threadId);
        scheduleInferredCompletion(state);
        break;
      }
      emitProgress(
        state.onProgress,
        `Turn ${message.params.turn.status === "completed" ? "completed" : message.params.turn.status}.`,
        "finalizing"
      );
      completeTurn(state, message.params.turn);
      break;
    default:
      break;
  }
}

/**
 * Runs `startRequest` and captures the resulting turn's notifications until the turn
 * completes. Notifications arriving before the turn id is known are buffered and
 * replayed afterwards; notifications that do not belong to the turn are passed to the
 * previously installed handler. The previous handler is restored on exit.
 *
 * @param {object} client App-server client exposing `notificationHandler` and `setNotificationHandler`.
 * @param {string} threadId Thread the turn runs on.
 * @param {() => Promise<any>} startRequest Sends the request that starts the turn.
 * @param {{ onProgress?: ProgressReporter | null, onResponse?: (response: any, state: TurnCaptureState) => void }} [options]
 * @returns {Promise<TurnCaptureState>}
 */
async function captureTurn(client, threadId, startRequest, options = {}) {
  const state = createTurnCaptureState(threadId, options);
  const previousHandler = client.notificationHandler;

  // Single routing path used for both live and replayed notifications, so a buffered
  // "thread/started" / "thread/name/updated" is registered exactly like a live one.
  const dispatch = (message) => {
    if (message.method === "thread/started" || message.method === "thread/name/updated") {
      applyTurnNotification(state, message);
      return;
    }
    if (!belongsToTurn(state, message)) {
      if (previousHandler) {
        previousHandler(message);
      }
      return;
    }
    applyTurnNotification(state, message);
  };

  client.setNotificationHandler((message) => {
    if (!state.turnId) {
      state.bufferedNotifications.push(message);
      return;
    }
    dispatch(message);
  });

  try {
    const response = await startRequest();
    options.onResponse?.(response, state);

    state.turnId = response.turn?.id ?? null;
    if (state.turnId) {
      state.threadTurnIds.set(state.threadId, state.turnId);
    }

    for (const message of state.bufferedNotifications) {
      dispatch(message);
    }
    state.bufferedNotifications.length = 0;

    // The response itself may already carry a finished turn.
    if (response.turn?.status && response.turn.status !== "inProgress") {
      completeTurn(state, response.turn);
    }

    return await state.completion;
  } finally {
    clearCompletionTimer(state);
    client.setNotificationHandler(previousHandler ?? null);
  }
}

/**
 * Connects an app-server client for `cwd`, runs `fn(client)` and closes the client.
 * If the attempt fails because the broker reported busy, or a broker was requested and
 * the error code is ENOENT/ECONNREFUSED, `fn` is retried once on a client connected
 * with `disableBroker: true`. Any other error is rethrown.
 */
async function withAppServer(cwd, fn) {
  let client = null;
  try {
    client = await CodexAppServerClient.connect(cwd);
    const result = await fn(client);
    await client.close();
    return result;
  } catch (error) {
    const brokerRequested = client?.transport === "broker" || Boolean(process.env[BROKER_ENDPOINT_ENV]);
    const shouldRetryDirect =
      (client?.transport === "broker" && error?.rpcCode === BROKER_BUSY_RPC_CODE) ||
      (brokerRequested && (error?.code === "ENOENT" || error?.code === "ECONNREFUSED"));

    if (client) {
      // Best-effort cleanup; the original error is what matters here.
      await client.close().catch(() => {});
      client = null;
    }

    if (!shouldRetryDirect) {
      throw error;
    }

    const directClient = await CodexAppServerClient.connect(cwd, { disableBroker: true });
    try {
      return await fn(directClient);
    } finally {
      await directClient.close();
    }
  }
}

/**
 * Sends "thread/start" and, when `options.threadName` is set, names the new thread via
 * "thread/name/set". Returns the "thread/start" response.
 */
async function startThread(client, cwd, options = {}) {
  const response = await client.request("thread/start", buildThreadParams(cwd, options));
  const threadId = response.thread.id;
  if (options.threadName) {
    await client.request("thread/name/set", { threadId, name: options.threadName });
  }
  return response;
}

/** Sends "thread/resume" for the given thread and returns the response. */
async function resumeThread(client, threadId, cwd, options = {}) {
  return client.request("thread/resume", buildResumeParams(threadId, cwd, options));
}

/** Returns 0 when the final turn's status is "completed", otherwise 1. */
function buildResultStatus(turnState) {
  return turnState.finalTurn?.status === "completed" ? 0 : 1;
}

/**
 * Throws when getCodexAvailability reports the CLI as unavailable.
 *
 * @throws {Error} With the install/setup hint message.
 */
function assertCodexAvailable(cwd) {
  if (!getCodexAvailability(cwd).available) {
    throw new Error(CODEX_UNAVAILABLE_MESSAGE);
  }
}

/**
 * Probes `codex --version` and `codex app-server --help`.
 *
 * @returns {{ available: boolean, detail: string }} The first failing probe's status, or a combined success detail.
 */
export function getCodexAvailability(cwd) {
  const versionStatus = binaryAvailable("codex", ["--version"], { cwd });
  if (!versionStatus.available) {
    return versionStatus;
  }

  const appServerStatus = binaryAvailable("codex", ["app-server", "--help"], { cwd });
  if (!appServerStatus.available) {
    return {
      available: false,
      detail: `${versionStatus.detail}; advanced runtime unavailable: ${appServerStatus.detail}`
    };
  }

  return { available: true, detail: `${versionStatus.detail}; advanced runtime available` };
}

/**
 * Reports whether a broker endpoint is configured, looking first at the environment
 * variable named by BROKER_ENDPOINT_ENV and then at the stored broker session for `cwd`.
 *
 * @returns {{ mode: string, label: string, detail: string, endpoint: string | null }}
 */
export function getSessionRuntimeStatus(env = process.env, cwd = process.cwd()) {
  const endpoint = env?.[BROKER_ENDPOINT_ENV] ?? loadBrokerSession(cwd)?.endpoint ?? null;
  if (endpoint) {
    return {
      mode: "shared",
      label: "shared session",
      detail: "This Claude session is configured to reuse one shared Codex runtime.",
      endpoint
    };
  }

  return {
    mode: "direct",
    label: "direct startup",
    detail: "No shared Codex runtime is active yet. The first review or task command will start one on demand.",
    endpoint: null
  };
}

/**
 * Checks availability and then runs `codex login status`; exit status 0 is reported as
 * logged in.
 *
 * @returns {{ available: boolean, loggedIn: boolean, detail: string }}
 */
export function getCodexLoginStatus(cwd) {
  const availability = getCodexAvailability(cwd);
  if (!availability.available) {
    return { available: false, loggedIn: false, detail: availability.detail };
  }

  const result = runCommand("codex", ["login", "status"], { cwd });
  if (result.error) {
    return { available: true, loggedIn: false, detail: result.error.message };
  }

  if (result.status === 0) {
    return { available: true, loggedIn: true, detail: result.stdout.trim() || "authenticated" };
  }

  return {
    available: true,
    loggedIn: false,
    detail: result.stderr.trim() || result.stdout.trim() || "not authenticated"
  };
}

/**
 * Sends "turn/interrupt" for the given thread/turn, connecting through the configured
 * broker endpoint when one exists and directly otherwise. Never throws for request
 * failures; the outcome is described in the returned object.
 *
 * @returns {Promise<{ attempted: boolean, interrupted: boolean, transport: string | null, detail: string }>}
 */
export async function interruptAppServerTurn(cwd, { threadId, turnId }) {
  if (!threadId || !turnId) {
    return { attempted: false, interrupted: false, transport: null, detail: "missing threadId or turnId" };
  }

  const availability = getCodexAvailability(cwd);
  if (!availability.available) {
    return { attempted: false, interrupted: false, transport: null, detail: availability.detail };
  }

  const brokerEndpoint = process.env[BROKER_ENDPOINT_ENV] ?? loadBrokerSession(cwd)?.endpoint ?? null;
  let client = null;
  try {
    client = brokerEndpoint
      ? await CodexAppServerClient.connect(cwd, { brokerEndpoint })
      : await CodexAppServerClient.connect(cwd, { disableBroker: true });
    await client.request("turn/interrupt", { threadId, turnId });
    return {
      attempted: true,
      interrupted: true,
      transport: client.transport,
      detail: `Interrupted ${turnId} on ${threadId}.`
    };
  } catch (error) {
    return {
      attempted: true,
      interrupted: false,
      transport: client?.transport ?? null,
      detail: error instanceof Error ? error.message : String(error)
    };
  } finally {
    await client?.close().catch(() => {});
  }
}

/**
 * Starts a read-only, ephemeral thread, issues "review/start" on it and captures the
 * review turn. With `delivery: "detached"` the capture follows the review thread id
 * returned by the server.
 *
 * @param {string} cwd
 * @param {{ model?: string, threadName?: string, delivery?: string, target?: ReviewTarget, onProgress?: ProgressReporter }} [options]
 * @throws {Error} When the Codex CLI is unavailable.
 */
export async function runAppServerReview(cwd, options = {}) {
  assertCodexAvailable(cwd);

  return withAppServer(cwd, async (client) => {
    emitProgress(options.onProgress, "Starting Codex review thread.", "starting");
    const thread = await startThread(client, cwd, {
      model: options.model,
      sandbox: "read-only",
      ephemeral: true,
      threadName: options.threadName
    });
    const sourceThreadId = thread.thread.id;
    emitProgress(options.onProgress, `Thread ready (${sourceThreadId}).`, "starting", { threadId: sourceThreadId });

    const delivery = options.delivery ?? "inline";
    const turnState = await captureTurn(
      client,
      sourceThreadId,
      () => client.request("review/start", { threadId: sourceThreadId, delivery, target: options.target }),
      {
        onProgress: options.onProgress,
        onResponse(response, state) {
          if (response.reviewThreadId) {
            state.threadIds.add(response.reviewThreadId);
            if (delivery === "detached") {
              state.threadId = response.reviewThreadId;
            }
          }
        }
      }
    );

    return {
      status: buildResultStatus(turnState),
      threadId: turnState.threadId,
      sourceThreadId,
      turnId: turnState.turnId,
      reviewText: turnState.reviewText,
      reasoningSummary: turnState.reasoningSummary,
      turn: turnState.finalTurn,
      error: turnState.error,
      stderr: cleanCodexStderr(client.stderr)
    };
  });
}

/**
 * Starts a new thread (or resumes `options.resumeThreadId`), sends the prompt via
 * "turn/start" and captures the turn, returning the final message together with the
 * collected file changes and command executions.
 *
 * @throws {Error} When the Codex CLI is unavailable or no prompt can be determined.
 */
export async function runAppServerTurn(cwd, options = {}) {
  assertCodexAvailable(cwd);

  return withAppServer(cwd, async (client) => {
    let threadId;

    if (options.resumeThreadId) {
      emitProgress(options.onProgress, `Resuming thread ${options.resumeThreadId}.`, "starting");
      const response = await resumeThread(client, options.resumeThreadId, cwd, {
        model: options.model,
        sandbox: options.sandbox,
        ephemeral: false
      });
      threadId = response.thread.id;
    } else {
      emitProgress(options.onProgress, "Starting Codex task thread.", "starting");
      const response = await startThread(client, cwd, {
        model: options.model,
        sandbox: options.sandbox,
        ephemeral: options.persistThread ? false : true,
        threadName: options.persistThread ? options.threadName : options.threadName ?? null
      });
      threadId = response.thread.id;
    }

    emitProgress(options.onProgress, `Thread ready (${threadId}).`, "starting", { threadId });

    const prompt = options.prompt?.trim() || options.defaultPrompt || "";
    if (!prompt) {
      throw new Error("A prompt is required for this Codex run.");
    }

    const turnState = await captureTurn(
      client,
      threadId,
      () =>
        client.request("turn/start", {
          threadId,
          input: buildTurnInput(prompt),
          model: options.model ?? null,
          effort: options.effort ?? null,
          outputSchema: options.outputSchema ?? null
        }),
      { onProgress: options.onProgress }
    );

    return {
      status: buildResultStatus(turnState),
      threadId,
      turnId: turnState.turnId,
      finalMessage: turnState.lastAgentMessage,
      reasoningSummary: turnState.reasoningSummary,
      turn: turnState.finalTurn,
      error: turnState.error,
      stderr: cleanCodexStderr(client.stderr),
      fileChanges: turnState.fileChanges,
      touchedFiles: collectTouchedFiles(turnState.fileChanges),
      commandExecutions: turnState.commandExecutions
    };
  });
}

/**
 * Lists up to 20 app-server threads for `cwd` matching TASK_THREAD_PREFIX and returns
 * the first whose name starts with that prefix, or null.
 *
 * @throws {Error} When the Codex CLI is unavailable.
 */
export async function findLatestTaskThread(cwd) {
  assertCodexAvailable(cwd);

  return withAppServer(cwd, async (client) => {
    const response = await client.request("thread/list", {
      cwd,
      limit: 20,
      sortKey: "updated_at",
      sourceKinds: ["appServer"],
      searchTerm: TASK_THREAD_PREFIX
    });

    return (
      response.data.find((thread) => typeof thread.name === "string" && thread.name.startsWith(TASK_THREAD_PREFIX)) ??
      null
    );
  });
}

/** Public wrapper around the task-thread naming scheme. */
export function buildPersistentTaskThreadName(prompt) {
  return buildTaskThreadName(prompt);
}

/**
 * Parses `rawOutput` as JSON. Always returns an object with `parsed`, `parseError` and
 * `rawOutput`, followed by the properties of `fallback` spread on top.
 *
 * @param {string | null | undefined} rawOutput
 * @param {{ failureMessage?: string, [key: string]: unknown }} [fallback]
 */
export function parseStructuredOutput(rawOutput, fallback = {}) {
  if (!rawOutput) {
    return {
      parsed: null,
      parseError: fallback.failureMessage ?? "Codex did not return a final structured message.",
      rawOutput: rawOutput ?? "",
      ...fallback
    };
  }

  try {
    return { parsed: JSON.parse(rawOutput), parseError: null, rawOutput, ...fallback };
  } catch (error) {
    return { parsed: null, parseError: error.message, rawOutput, ...fallback };
  }
}

/** Reads and returns the JSON document at `schemaPath` via readJsonFile. */
export function readOutputSchema(schemaPath) {
  return readJsonFile(schemaPath);
}

export { DEFAULT_CONTINUE_PROMPT, TASK_THREAD_PREFIX };