From 7f8412ec3e8b964ae3794e5c38e67dbe100c4cc7 Mon Sep 17 00:00:00 2001 From: Aiden Cline <63023139+rekram1-node@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:23:00 -0500 Subject: [PATCH] fix(openai): preserve websocket idle state (#30586) --- packages/opencode/src/plugin/openai/codex.ts | 4 + .../opencode/src/plugin/openai/ws-pool.ts | 13 ++- .../opencode/test/plugin/openai-ws.test.ts | 101 +++++++++++++++++- 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/packages/opencode/src/plugin/openai/codex.ts b/packages/opencode/src/plugin/openai/codex.ts index efa1871eb..a577e2a82 100644 --- a/packages/opencode/src/plugin/openai/codex.ts +++ b/packages/opencode/src/plugin/openai/codex.ts @@ -362,6 +362,10 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug for (const websocketFetch of websocketFetches) websocketFetch.close() websocketFetches.length = 0 }, + async event(input) { + if (input.event.type !== "session.deleted") return + for (const websocketFetch of websocketFetches) websocketFetch.remove(input.event.properties.info.id) + }, provider: { id: "openai", async models(provider, ctx) { diff --git a/packages/opencode/src/plugin/openai/ws-pool.ts b/packages/opencode/src/plugin/openai/ws-pool.ts index abb935d58..6ece4fc74 100644 --- a/packages/opencode/src/plugin/openai/ws-pool.ts +++ b/packages/opencode/src/plugin/openai/ws-pool.ts @@ -121,6 +121,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { onConnectionInvalid: (error) => { log.warn("websocket invalidated", { key, error: error.message }) entry.busy = false + entry.lastUsedAt = Date.now() if (!entry.fallback) recordStreamFailure(entry) invalidate(entry) resolveFirstEvent(false) @@ -186,6 +187,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { const now = Date.now() for (const [key, entry] of pool) { if (entry.busy) continue + if (entry.fallback) continue if (now - entry.lastUsedAt < idleTimeout) continue log.debug("websocket idle prune", { key }) invalidate(entry) @@ -200,7 +202,16 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { pool.clear() } - return Object.assign(websocketFetch, { close }) + function remove(sessionID: string) { + const key = `${sessionID}:conversation` + const entry = pool.get(key) + if (!entry) return + log.debug("websocket pool remove", { key }) + invalidate(entry) + pool.delete(key) + } + + return Object.assign(websocketFetch, { close, remove }) } function connectionLimitError(event: Record) { diff --git a/packages/opencode/test/plugin/openai-ws.test.ts b/packages/opencode/test/plugin/openai-ws.test.ts index 1fadf97cc..cdcded9da 100644 --- a/packages/opencode/test/plugin/openai-ws.test.ts +++ b/packages/opencode/test/plugin/openai-ws.test.ts @@ -211,7 +211,7 @@ describe("plugin.openai.ws-pool", () => { fetch.close() }) - test("prunes HTTP fallback after its idle timeout", async () => { + test("keeps HTTP fallback active after its idle timeout", async () => { let websocketAttempts = 0 await using server = await createRejectingWebSocketServer(() => websocketAttempts++) const fetch = OpenAIWebSocketPool.createWebSocketFetch({ @@ -226,12 +226,86 @@ describe("plugin.openai.ws-pool", () => { await new Promise((resolve) => setTimeout(resolve, 50)) const second = await fetch(server.url, streamRequest()) + expect(await second.text()).toBe("http") + expect(websocketAttempts).toBe(1) + expect(server.httpRequests).toHaveLength(2) + fetch.close() + }) + + test("removes HTTP fallback when its session is deleted", async () => { + let websocketAttempts = 0 + await using server = await createRejectingWebSocketServer(() => websocketAttempts++) + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + connectTimeout: 100, + streamRetries: 0, + }) + + const first = await fetch(server.url, streamRequest()) + expect(await first.text()).toBe("http") + fetch.remove("session-1") + const second = await fetch(server.url, streamRequest()) + expect(await second.text()).toBe("http") expect(websocketAttempts).toBe(2) expect(server.httpRequests).toHaveLength(2) fetch.close() }) + test("terminates active websocket connections when their session is deleted", async () => { + let connections = 0 + await using server = await createWebSocketServer((socket) => { + connections += 1 + socket.once("message", () => { + if (connections === 1) { + socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" })) + return + } + socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_after_remove" } })) + }) + }) + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + }) + + const first = await fetch(server.url, streamRequest()) + const firstText = first.text() + fetch.remove("session-1") + expect((await readTextError(firstText)).message).toContain("WebSocket closed before response.completed") + + const second = await fetch(server.url, streamRequest()) + + expect(await second.text()).toContain("data: [DONE]") + expect(connections).toBe(2) + fetch.close() + }) + + test("prunes idle websocket connections after completed responses", async () => { + let connections = 0 + let closed = 0 + await using server = await createWebSocketServer((socket) => { + connections += 1 + socket.once("close", () => closed++) + socket.once("message", () => { + socket.send(JSON.stringify({ type: "response.completed", response: { id: `resp_${connections}` } })) + }) + }) + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + idleTimeout: 20, + }) + + const first = await fetch(server.url, streamRequest()) + expect(await first.text()).toContain("data: [DONE]") + await waitFor(() => closed === 1, "idle websocket was not pruned") + + const second = await fetch(server.url, streamRequest()) + + expect(await second.text()).toContain("data: [DONE]") + expect(connections).toBe(2) + fetch.close() + }) + test("invalidates but does not reuse a socket after terminal failure frames", async () => { let connections = 0 await using server = await createWebSocketServer((socket) => { @@ -459,6 +533,31 @@ describe("plugin.openai.ws-pool", () => { fetch.close() }) + test("keeps websocket retry state until the failed stream becomes idle", async () => { + let connections = 0 + await using server = await createWebSocketServer((socket) => { + connections += 1 + socket.once("message", () => {}) + }) + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + idleTimeout: 500, + streamRetries: 1, + }) + + await new Promise((resolve) => setTimeout(resolve, 250)) + const first = await fetch(server.url, streamRequest()) + expect((await readTextError(first.text())).message).toContain("idle timeout waiting for websocket") + await new Promise((resolve) => setTimeout(resolve, 300)) + + const second = await fetch(server.url, streamRequest()) + + expect(await second.text()).toBe("http") + expect(connections).toBe(2) + expect(server.httpRequests).toHaveLength(1) + fetch.close() + }) + test("retries failed websocket streams before using HTTP fallback", async () => { await using server = await createWebSocketServer((socket) => { socket.once("message", () => {