diff --git a/packages/opencode/src/plugin/openai/README.md b/packages/opencode/src/plugin/openai/README.md index c359f8c60..2286b3509 100644 --- a/packages/opencode/src/plugin/openai/README.md +++ b/packages/opencode/src/plugin/openai/README.md @@ -20,9 +20,9 @@ Enabled by default on `local`, `dev`, and `beta`. On `latest` and `prod`, set `O ## Retries -- If WebSocket setup fails or it fails before its first event, replay over HTTP and keep that session on HTTP until idle-pruned. -- If the server returns `websocket_connection_limit_reached` before output, reconnect up to 5 times, then follow the same HTTP fallback. -- If a WebSocket fails after its first event, fail the stream. Do not replay partial output. +- Retry WebSocket stream/setup failures up to 5 times, then use HTTP for that session until the pool entry is idle-pruned. +- `websocket_connection_limit_reached` consumes the same retry budget and HTTP fallback. +- If a WebSocket fails after its first event, fail it as retryable rather than replaying partial output in transport. - Abort or cancel closes the socket. ## Next Steps diff --git a/packages/opencode/src/plugin/openai/ws-pool.ts b/packages/opencode/src/plugin/openai/ws-pool.ts index 0af4c57c9..331657097 100644 --- a/packages/opencode/src/plugin/openai/ws-pool.ts +++ b/packages/opencode/src/plugin/openai/ws-pool.ts @@ -1,5 +1,6 @@ import WebSocket from "ws" import * as Log from "@opencode-ai/core/util/log" +import { ProviderError } from "@/provider/error" import { isRecord } from "@/util/record" import { OpenAIWebSocket } from "./ws" @@ -13,7 +14,7 @@ export interface CreateWebSocketFetchOptions { connectTimeout?: number idleTimeout?: number maxConnectionAge?: number - connectionLimitRetries?: number + streamRetries?: number } interface PoolEntry { @@ -22,6 +23,7 @@ interface PoolEntry { lastUsedAt: number busy: boolean fallback: boolean + streamFailures: number } const DEFAULT_CONNECT_TIMEOUT = 15_000 @@ -35,7 +37,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { const connectTimeout = options?.connectTimeout ?? DEFAULT_CONNECT_TIMEOUT const idleTimeout = options?.idleTimeout ?? DEFAULT_IDLE_TIMEOUT const maxConnectionAge = options?.maxConnectionAge ?? DEFAULT_MAX_CONNECTION_AGE - const connectionLimitRetries = options?.connectionLimitRetries ?? 5 + const streamRetries = options?.streamRetries ?? 5 const pruneTimer = setInterval(() => prune(), Math.min(idleTimeout, 60_000)) if (typeof pruneTimer === "object" && "unref" in pruneTimer && typeof pruneTimer.unref === "function") { pruneTimer.unref() @@ -72,7 +74,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { } const key = `${sessionID}:conversation` - const entry = pool.get(key) ?? { lastUsedAt: Date.now(), busy: false, fallback: false } + const entry = pool.get(key) ?? { lastUsedAt: Date.now(), busy: false, fallback: false, streamFailures: 0 } pool.set(key, entry) if (entry.fallback) { @@ -87,7 +89,6 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { entry.busy = true entry.lastUsedAt = Date.now() try { - let connectionLimitAttempts = 0 entry.socket = await socket( entry, options?.url ?? url, @@ -111,15 +112,16 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { onTerminal: (event) => { entry.busy = false entry.lastUsedAt = Date.now() + entry.streamFailures = 0 if (event.type !== "response.completed" && event.type !== "response.done") { log.warn("websocket terminal failure", { key, type: event.type }) invalidate(entry) } }, onConnectionInvalid: (error) => { - log.warn("websocket invalidated", { key, error: error instanceof Error ? error.message : String(error) }) + log.warn("websocket invalidated", { key, error: error.message }) entry.busy = false - entry.fallback = true + if (!entry.fallback) recordStreamFailure(entry) invalidate(entry) resolveFirstEvent(false) }, @@ -127,51 +129,52 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { log.debug("websocket aborted", { key }) entry.busy = false entry.lastUsedAt = Date.now() + entry.streamFailures = 0 invalidate(entry) rejectFirstEvent(error) }, onRetryableTerminal: async (event) => { const error = connectionLimitError(event) if (!error) return undefined - if (connectionLimitAttempts >= connectionLimitRetries) throw error - - connectionLimitAttempts++ - log.warn("websocket connection limit reached", { key, attempt: connectionLimitAttempts }) - invalidate(entry) - entry.socket = await socket( - entry, - options?.url ?? url, - OpenAIWebSocket.normalizeHeaders(httpInit?.headers), - connectTimeout, - maxConnectionAge, - init?.signal, - ) - entry.lastUsedAt = Date.now() - return entry.socket + log.warn("websocket connection limit reached", { key }) + throw error }, }) if (await firstEvent) return response - log.debug("http fallback", { key, reason: "websocket_failed_before_first_event" }) + if (!entry.fallback) return response + log.debug("http fallback", { key, reason: "websocket_retries_exhausted" }) return httpFetch(input, httpInit) } catch (error) { entry.busy = false entry.lastUsedAt = Date.now() if (OpenAIWebSocket.isAbortError(error)) { + entry.streamFailures = 0 invalidate(entry) throw error } - entry.fallback = true + recordStreamFailure(entry) log.warn("websocket setup failed", { key, error: error instanceof Error ? error.message : String(error), - fallback: "http", + fallback: entry.fallback ? "http" : undefined, }) invalidate(entry) - return httpFetch(input, httpInit) + if (entry.fallback) return httpFetch(input, httpInit) + return failedResponse( + new ProviderError.ResponseStreamError(error instanceof Error ? error.message : String(error), { + cause: error, + }), + ) } } + function recordStreamFailure(entry: PoolEntry) { + entry.streamFailures++ + // Codex counts retries after the initial failed WebSocket attempt. + if (entry.streamFailures > streamRetries) entry.fallback = true + } + function prune() { const now = Date.now() for (const [key, entry] of pool) { @@ -198,6 +201,20 @@ function connectionLimitError(event: Record) { return new Error(typeof event.error.message === "string" ? event.error.message : CONNECTION_LIMIT_REACHED_CODE) } +function failedResponse(error: ProviderError.ResponseStreamError) { + return new Response( + new ReadableStream({ + start(controller) { + controller.error(error) + }, + }), + { + status: 200, + headers: { "content-type": "text/event-stream" }, + }, + ) +} + async function socket( entry: PoolEntry, url: string, diff --git a/packages/opencode/src/plugin/openai/ws.ts b/packages/opencode/src/plugin/openai/ws.ts index 7176ab210..cad676960 100644 --- a/packages/opencode/src/plugin/openai/ws.ts +++ b/packages/opencode/src/plugin/openai/ws.ts @@ -2,6 +2,7 @@ // fallback, and continuation state intentionally live above this file. import WebSocket from "ws" +import { ProviderError } from "@/provider/error" export const PROTOCOL_HEADER = "responses_websockets=2026-02-06" @@ -21,7 +22,7 @@ export interface StreamResponsesWebSocketOptions { onComplete?: (event: Record) => void onTerminal?: (event: Record) => void onRetryableTerminal?: (event: Record) => Promise - onConnectionInvalid?: (error: Error) => void + onConnectionInvalid?: (error: ProviderError.ResponseStreamError) => void onAbort?: (error: Error) => void } @@ -101,7 +102,7 @@ export function connectResponsesWebSocket(options: ConnectResponsesWebSocketOpti function onClose(code: number, reason: Buffer) { cleanup() - reject(closeError("WebSocket closed before open", code, reason)) + reject(new Error(closeMessage("WebSocket closed before open", code, reason))) } function onAbort() { @@ -145,7 +146,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption controller?.close() } - function invalidate(error: Error) { + function invalidate(error: ProviderError.ResponseStreamError) { if (completed) return completed = true cleanup() @@ -157,7 +158,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption if (completed) return if (!options.idleTimeout) return if (idleTimer) clearTimeout(idleTimer) - idleTimer = setTimeout(() => invalidate(new Error(message)), options.idleTimeout) + idleTimer = setTimeout(() => invalidate(new ProviderError.ResponseStreamError(message)), options.idleTimeout) if (typeof idleTimer === "object" && "unref" in idleTimer && typeof idleTimer.unref === "function") { idleTimer.unref() } @@ -166,7 +167,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption async function onMessage(data: WebSocket.RawData, isBinary: boolean) { if (completed) return if (isBinary) { - invalidate(new Error("Unexpected binary WebSocket frame")) + invalidate(new ProviderError.ResponseStreamError("Unexpected binary WebSocket frame")) return } @@ -195,7 +196,11 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption return } } catch (error) { - invalidate(error instanceof Error ? error : new Error(String(error))) + invalidate( + new ProviderError.ResponseStreamError(error instanceof Error ? error.message : String(error), { + cause: error, + }), + ) return } } @@ -230,12 +235,14 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption } function onError(error: Error) { - invalidate(error) + invalidate(new ProviderError.ResponseStreamError(error.message, { cause: error })) } function onClose(code: number, reason: Buffer) { if (completed) return - invalidate(closeError("WebSocket closed before response.completed", code, reason)) + invalidate( + new ProviderError.ResponseStreamError(closeMessage("WebSocket closed before response.completed", code, reason)), + ) } function onAbort() { @@ -272,7 +279,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption socket.send(JSON.stringify({ type: "response.create", ...payload }), (error) => { if (completed) return resetIdleTimeout("idle timeout waiting for websocket") - if (error) invalidate(error) + if (error) invalidate(new ProviderError.ResponseStreamError(error.message, { cause: error })) }) } @@ -312,11 +319,11 @@ function abortError(signal: AbortSignal | undefined) { return new DOMException(reason instanceof Error ? reason.message : "Aborted", "AbortError") } -function closeError(message: string, code: number, reason: Buffer) { +function closeMessage(message: string, code: number, reason: Buffer) { const details = [`code ${code}`] if (code === 1009) details.push("message too big") if (reason.length > 0) details.push(reason.toString()) - return new Error(`${message} (${details.join(": ")})`) + return `${message} (${details.join(": ")})` } export * as OpenAIWebSocket from "./ws" diff --git a/packages/opencode/src/provider/error.ts b/packages/opencode/src/provider/error.ts index 423383917..06b388b48 100644 --- a/packages/opencode/src/provider/error.ts +++ b/packages/opencode/src/provider/error.ts @@ -11,6 +11,14 @@ export class HeaderTimeoutError extends Error { } } +export class ResponseStreamError extends Error { + public override readonly name = "ProviderResponseStreamError" + + constructor(message: string, options?: ErrorOptions) { + super(message, options) + } +} + // Adapted from overflow detection patterns in: // https://github.com/badlogic/pi-mono/blob/main/packages/ai/src/utils/overflow.ts const OVERFLOW_PATTERNS = [ diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 29cc83a1e..68157f5f4 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -1155,6 +1155,17 @@ export function fromError( }, { cause: e }, ).toObject() + case e instanceof ProviderError.ResponseStreamError: + return new APIError( + { + message: e.message, + isRetryable: true, + metadata: { + code: e.name, + }, + }, + { cause: e }, + ).toObject() case APICallError.isInstance(e): const parsed = ProviderError.parseAPICallError({ providerID: ctx.providerID, diff --git a/packages/opencode/test/plugin/openai-ws.test.ts b/packages/opencode/test/plugin/openai-ws.test.ts index 1fe26cfcb..d27774037 100644 --- a/packages/opencode/test/plugin/openai-ws.test.ts +++ b/packages/opencode/test/plugin/openai-ws.test.ts @@ -1,8 +1,9 @@ import { describe, expect, test } from "bun:test" import { EventEmitter } from "node:events" -import type { IncomingMessage } from "node:http" +import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http" import net, { type AddressInfo, type Socket } from "node:net" import WebSocket, { WebSocketServer } from "ws" +import { ProviderError } from "../../src/provider/error" import { OpenAIWebSocket } from "../../src/plugin/openai/ws" import { OpenAIWebSocketPool, TITLE_HEADER } from "../../src/plugin/openai/ws-pool" @@ -85,7 +86,7 @@ describe("plugin.openai.ws", () => { }) test("errors the SSE stream when the server closes before a terminal event", async () => { - const invalid: string[] = [] + const invalid: Error[] = [] await using server = await createWebSocketServer((socket) => { socket.once("message", () => { socket.close(1009, "payload too large") @@ -96,13 +97,14 @@ describe("plugin.openai.ws", () => { const response = OpenAIWebSocket.streamResponsesWebSocket({ socket, body: { stream: true, input: "hi" }, - onConnectionInvalid: (error) => invalid.push(error.message), + onConnectionInvalid: (error) => invalid.push(error), }) await expect(response.text()).rejects.toThrow( "WebSocket closed before response.completed (code 1009: message too big: payload too large)", ) - expect(invalid).toEqual([ + expect(invalid[0]).toBeInstanceOf(ProviderError.ResponseStreamError) + expect(invalid.map((error) => error.message)).toEqual([ "WebSocket closed before response.completed (code 1009: message too big: payload too large)", ]) }) @@ -140,13 +142,12 @@ describe("plugin.openai.ws-pool", () => { }) const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async () => new Response("http")), }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) expect(await first.text()).toContain("data: [DONE]") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = await fetch(server.url, streamRequest()) expect(await second.text()).toContain("data: [DONE]") expect(connections).toBe(1) expect(messages).toBe(2) @@ -163,41 +164,59 @@ describe("plugin.openai.ws-pool", () => { }) const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async () => new Response("http")), maxConnectionAge: 0, }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) expect(await first.text()).toContain("data: [DONE]") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = await fetch(server.url, streamRequest()) expect(await second.text()).toContain("data: [DONE]") expect(connections).toBe(2) fetch.close() }) - test("falls back to HTTP when websocket setup fails and keeps the fallback sticky", async () => { + test("falls back to HTTP after websocket setup retries are exhausted", async () => { const attempts: string[] = [] await using server = await createRejectingWebSocketServer(() => attempts.push("websocket")) - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), connectTimeout: 100, + streamRetries: 1, }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest({ [TITLE_HEADER]: "false" })) - const second = await fetch("https://api.openai.com/v1/responses", streamRequest({ [TITLE_HEADER]: "false" })) + const first = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" })) + await expect(first.text()).rejects.toBeInstanceOf(ProviderError.ResponseStreamError) + const second = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" })) + const third = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" })) - expect(await first.text()).toBe("http") expect(await second.text()).toBe("http") - expect(attempts).toEqual(["websocket"]) - expect(httpRequests).toHaveLength(2) - expect(httpRequests[0]?.get(TITLE_HEADER)).toBeNull() - expect(httpRequests[1]?.get(TITLE_HEADER)).toBeNull() + expect(await third.text()).toBe("http") + expect(attempts).toEqual(["websocket", "websocket"]) + expect(server.httpRequests).toHaveLength(2) + expect(server.httpRequests[0]?.headers[TITLE_HEADER]).toBeUndefined() + expect(server.httpRequests[1]?.headers[TITLE_HEADER]).toBeUndefined() + fetch.close() + }) + + test("prunes HTTP fallback after its idle timeout", async () => { + let websocketAttempts = 0 + await using server = await createRejectingWebSocketServer(() => websocketAttempts++) + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + connectTimeout: 100, + idleTimeout: 20, + streamRetries: 0, + }) + + const first = await fetch(server.url, streamRequest()) + expect(await first.text()).toBe("http") + await new Promise((resolve) => setTimeout(resolve, 50)) + const second = await fetch(server.url, streamRequest()) + + expect(await second.text()).toBe("http") + expect(websocketAttempts).toBe(2) + expect(server.httpRequests).toHaveLength(2) fetch.close() }) @@ -209,26 +228,21 @@ describe("plugin.openai.ws-pool", () => { socket.send(JSON.stringify({ type: connections === 1 ? "response.failed" : "response.completed" })) }) }) - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) expect(await first.text()).toContain('data: {"type":"response.failed"}') - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = await fetch(server.url, streamRequest()) expect(await second.text()).toContain('data: {"type":"response.completed"}') expect(connections).toBe(2) - expect(httpRequests).toHaveLength(0) + expect(server.httpRequests).toHaveLength(0) fetch.close() }) - test("reconnects and replays after websocket connection limit errors", async () => { + test("retries websocket connection limit errors on the next stream attempt", async () => { let connections = 0 let messages = 0 await using server = await createWebSocketServer((socket) => { @@ -252,24 +266,21 @@ describe("plugin.openai.ws-pool", () => { socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_retry" } })) }) }) - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), }) - const response = await fetch("https://api.openai.com/v1/responses", streamRequest()) - const text = await response.text() + const first = await fetch(server.url, streamRequest()) + await expect(first.text()).rejects.toThrow("Responses websocket connection limit reached") + const second = await fetch(server.url, streamRequest()) + const text = await second.text() expect(text).not.toContain("websocket_connection_limit_reached") expect(text).toContain('data: {"type":"response.completed","response":{"id":"resp_retry"}}') expect(text).toContain("data: [DONE]") expect(connections).toBe(2) expect(messages).toBe(2) - expect(httpRequests).toHaveLength(0) + expect(server.httpRequests).toHaveLength(0) fetch.close() }) @@ -291,98 +302,159 @@ describe("plugin.openai.ws-pool", () => { ) }) }) - let httpRequests = 0 const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - connectionLimitRetries: 2, - httpFetch: mockFetch(async () => { - httpRequests += 1 - return new Response("http") - }), + streamRetries: 2, }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) + await expect(first.text()).rejects.toThrow("Responses websocket connection limit reached") + const second = await fetch(server.url, streamRequest()) + await expect(second.text()).rejects.toThrow("Responses websocket connection limit reached") + const third = await fetch(server.url, streamRequest()) + const fourth = await fetch(server.url, streamRequest()) - expect(await first.text()).toBe("http") - expect(await second.text()).toBe("http") + expect(await third.text()).toBe("http") + expect(await fourth.text()).toBe("http") expect(connections).toBe(3) - expect(httpRequests).toBe(2) + expect(server.httpRequests).toHaveLength(2) fetch.close() }) - test("replays over HTTP when websocket idles before its first event", async () => { + test("shares the websocket retry budget across stream and connection limit failures", async () => { + let connections = 0 + await using server = await createWebSocketServer((socket) => { + connections += 1 + socket.once("message", () => { + if (connections === 1) { + socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" })) + socket.terminate() + return + } + socket.send( + JSON.stringify({ + type: "error", + error: { + code: "websocket_connection_limit_reached", + message: "Responses websocket connection limit reached", + }, + }), + ) + }) + }) + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + streamRetries: 1, + }) + + const first = await fetch(server.url, streamRequest()) + await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed") + const second = await fetch(server.url, streamRequest()) + + expect(await second.text()).toBe("http") + expect(connections).toBe(2) + expect(server.httpRequests).toHaveLength(1) + fetch.close() + }) + + test("retries websocket idle failures before first event then falls back to HTTP", async () => { let connections = 0 await using server = await createWebSocketServer((socket) => { connections += 1 socket.once("message", () => {}) }) - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, idleTimeout: 20, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), + streamRetries: 1, }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) - expect(await first.text()).toBe("http") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) + await expect(first.text()).rejects.toThrow("idle timeout waiting for websocket") + const second = await fetch(server.url, streamRequest()) + const third = await fetch(server.url, streamRequest()) expect(await second.text()).toBe("http") - expect(connections).toBe(1) - expect(httpRequests).toHaveLength(2) + expect(await third.text()).toBe("http") + expect(connections).toBe(2) + expect(server.httpRequests).toHaveLength(2) fetch.close() }) - test("does not replay over HTTP after a websocket event was emitted", async () => { + test("retries failed websocket streams before using HTTP fallback", async () => { await using server = await createWebSocketServer((socket) => { socket.once("message", () => { socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" })) }) }) - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, idleTimeout: 20, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), + streamRetries: 1, }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) await expect(first.text()).rejects.toThrow("idle timeout waiting for websocket") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = await fetch(server.url, streamRequest()) + await expect(second.text()).rejects.toThrow("idle timeout waiting for websocket") + const third = await fetch(server.url, streamRequest()) - expect(await second.text()).toBe("http") - expect(httpRequests).toHaveLength(1) + expect(await third.text()).toBe("http") + expect(server.httpRequests).toHaveLength(1) + fetch.close() + }) + + test("resets websocket stream failures after a completed response", async () => { + let connections = 0 + let requests = 0 + await using server = await createWebSocketServer((socket) => { + connections += 1 + socket.on("message", () => { + requests += 1 + if (requests === 1 || requests === 3) { + socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" })) + socket.terminate() + return + } + socket.send(JSON.stringify({ type: "response.completed", response: { id: `resp_${requests}` } })) + }) + }) + const fetch = OpenAIWebSocketPool.createWebSocketFetch({ + url: server.url, + streamRetries: 1, + }) + + const first = await fetch(server.url, streamRequest()) + await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed") + const second = await fetch(server.url, streamRequest()) + expect(await second.text()).toContain("data: [DONE]") + const third = await fetch(server.url, streamRequest()) + await expect(third.text()).rejects.toThrow("WebSocket closed before response.completed") + const fourth = await fetch(server.url, streamRequest()) + + expect(await fourth.text()).toContain("data: [DONE]") + expect(connections).toBe(3) + expect(requests).toBe(4) + expect(server.httpRequests).toHaveLength(0) fetch.close() }) test("falls back to HTTP for missing session and title requests", async () => { - const httpRequests: Headers[] = [] - const fetch = OpenAIWebSocketPool.createWebSocketFetch({ - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), - }) + await using server = await createWebSocketServer(() => {}) + const fetch = OpenAIWebSocketPool.createWebSocketFetch() - const missingSession = await fetch("https://api.openai.com/v1/responses", { + const missingSession = await fetch(server.url, { method: "POST", headers: { [TITLE_HEADER]: "false" }, body: JSON.stringify({ stream: true }), }) - const title = await fetch("https://api.openai.com/v1/responses", streamRequest({ [TITLE_HEADER]: "true" })) + const title = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "true" })) expect(await missingSession.text()).toBe("http") expect(await title.text()).toBe("http") - expect(httpRequests).toHaveLength(2) - expect(httpRequests[0]?.get(TITLE_HEADER)).toBeNull() - expect(httpRequests[1]?.get(TITLE_HEADER)).toBeNull() + expect(server.httpRequests).toHaveLength(2) + expect(server.httpRequests[0]?.headers[TITLE_HEADER]).toBeUndefined() + expect(server.httpRequests[1]?.headers[TITLE_HEADER]).toBeUndefined() fetch.close() }) @@ -395,22 +467,17 @@ describe("plugin.openai.ws-pool", () => { }) }) const abort = new AbortController() - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest({}, abort.signal)) + const first = await fetch(server.url, streamRequest({}, abort.signal)) const firstText = first.text() await waitFor(() => connections === 1, "websocket did not connect") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = await fetch(server.url, streamRequest()) expect(await second.text()).toBe("http") - expect(httpRequests).toHaveLength(1) + expect(server.httpRequests).toHaveLength(1) expect(connections).toBe(1) abort.abort(new Error("stop")) await expect(firstText).rejects.toThrow("stop") @@ -419,28 +486,25 @@ describe("plugin.openai.ws-pool", () => { test("reserves a websocket lane while its socket is connecting", async () => { await using server = await createHangingTcpServer() - let httpRequests = 0 + await using fallback = await createHttpServer() const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, connectTimeout: 20, - httpFetch: mockFetch(async () => { - httpRequests += 1 - return new Response("http") - }), + streamRetries: 0, }) - const first = fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = fetch(fallback.url, streamRequest()) await waitFor(() => server.connections() === 1, "first websocket did not begin connecting") - const second = fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = fetch(fallback.url, streamRequest()) expect(await (await second).text()).toBe("http") expect(await (await first).text()).toBe("http") expect(server.connections()).toBe(1) - expect(httpRequests).toBe(2) + expect(fallback.httpRequests).toHaveLength(2) fetch.close() }) - test("replays over HTTP after an unexpected close before the first event", async () => { + test("retries unexpected closes before first event then falls back to HTTP", async () => { let connections = 0 await using server = await createWebSocketServer((socket) => { connections += 1 @@ -448,22 +512,20 @@ describe("plugin.openai.ws-pool", () => { socket.close(1001, "server shutdown") }) }) - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), + streamRetries: 1, }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) - expect(await first.text()).toBe("http") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) + await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed") + const second = await fetch(server.url, streamRequest()) + const third = await fetch(server.url, streamRequest()) expect(await second.text()).toBe("http") - expect(connections).toBe(1) - expect(httpRequests).toHaveLength(2) + expect(await third.text()).toBe("http") + expect(connections).toBe(2) + expect(server.httpRequests).toHaveLength(2) fetch.close() }) @@ -479,27 +541,22 @@ describe("plugin.openai.ws-pool", () => { socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_456" } })) }) }) - const httpRequests: Headers[] = [] const abort = new AbortController() const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest({}, abort.signal)) + const first = await fetch(server.url, streamRequest({}, abort.signal)) const firstText = first.text() await waitFor(() => connections === 1, "first websocket did not connect") abort.abort(new Error("stop")) await expect(firstText).rejects.toThrow("stop") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = await fetch(server.url, streamRequest()) expect(await second.text()).toContain("data: [DONE]") expect(connections).toBe(2) - expect(httpRequests).toHaveLength(0) + expect(server.httpRequests).toHaveLength(0) fetch.close() }) @@ -515,24 +572,19 @@ describe("plugin.openai.ws-pool", () => { socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_after_cancel" } })) }) }) - const httpRequests: Headers[] = [] const fetch = OpenAIWebSocketPool.createWebSocketFetch({ url: server.url, - httpFetch: mockFetch(async (_input, init) => { - httpRequests.push(new Headers(init?.headers)) - return new Response("http") - }), }) - const first = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const first = await fetch(server.url, streamRequest()) await waitFor(() => connections === 1, "first websocket did not connect") await first.body!.cancel("stop") - const second = await fetch("https://api.openai.com/v1/responses", streamRequest()) + const second = await fetch(server.url, streamRequest()) expect(await second.text()).toContain("data: [DONE]") expect(connections).toBe(2) - expect(httpRequests).toHaveLength(0) + expect(server.httpRequests).toHaveLength(0) fetch.close() }) }) @@ -550,20 +602,11 @@ function streamRequest(headers?: Record, signal?: AbortSignal): } } -function mockFetch( - fn: ( - input: Parameters[0], - init: Parameters[1], - ) => ReturnType, -): typeof globalThis.fetch { - return Object.assign(fn, { preconnect: globalThis.fetch.preconnect }) -} - async function createWebSocketServer(onConnection: (socket: WebSocket, request: IncomingMessage) => void) { - const server = new WebSocketServer({ host: "127.0.0.1", port: 0 }) + const http = await createHttpServer() + const server = new WebSocketServer({ server: http.server }) server.on("connection", onConnection) - await new Promise((resolve) => server.once("listening", resolve)) - return websocketServerHandle(server) + return websocketServerHandle(server, http) } async function createHangingTcpServer() { @@ -588,31 +631,53 @@ async function createHangingTcpServer() { } async function createRejectingWebSocketServer(onAttempt: () => void) { + const http = await createHttpServer() const server = new WebSocketServer({ - host: "127.0.0.1", - port: 0, + server: http.server, verifyClient(_info, callback) { onAttempt() callback(false, 401, "denied") }, }) - await new Promise((resolve) => server.once("listening", resolve)) - return websocketServerHandle(server) + return websocketServerHandle(server, http) } -function websocketServerHandle(server: WebSocketServer) { +async function createHttpServer() { + const httpRequests: IncomingMessage[] = [] + const server = createServer((request, response) => { + httpRequests.push(request) + response.writeHead(200, { "content-type": "text/plain" }) + response.end("http") + }) + await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) const address = server.address() as AddressInfo - const url = `http://127.0.0.1:${address.port}/v1/responses` return { - url, - wsUrl: url.replace(/^http/, "ws"), + server, + httpRequests, + url: `http://127.0.0.1:${address.port}/v1/responses`, + async [Symbol.asyncDispose]() { + await closeHttpServer(server) + }, + } +} + +function websocketServerHandle(server: WebSocketServer, http: Awaited>) { + return { + url: http.url, + wsUrl: http.url.replace(/^http/, "ws"), + httpRequests: http.httpRequests, async [Symbol.asyncDispose]() { for (const socket of server.clients) socket.terminate() server.close() + http.server.close() }, } } +function closeHttpServer(server: HttpServer) { + return new Promise((resolve, reject) => server.close((error) => (error ? reject(error) : resolve()))) +} + async function waitFor(predicate: () => boolean, message: string) { const started = Date.now() while (!predicate()) { diff --git a/packages/opencode/test/session/retry.test.ts b/packages/opencode/test/session/retry.test.ts index 26c55bde7..e1cbec036 100644 --- a/packages/opencode/test/session/retry.test.ts +++ b/packages/opencode/test/session/retry.test.ts @@ -172,6 +172,17 @@ describe("session.retry.retryable", () => { }) }) + test("retries websocket stream transport errors", () => { + const request = MessageV2.fromError( + new ProviderError.ResponseStreamError("WebSocket closed before response.completed (code 1006: Connection ended)"), + { providerID }, + ) + expect(MessageV2.APIError.isInstance(request)).toBe(true) + expect(SessionRetry.retryable(request, retryProvider)).toEqual({ + message: "WebSocket closed before response.completed (code 1006: Connection ended)", + }) + }) + test("does not retry context overflow errors", () => { const error = new MessageV2.ContextOverflowError({ message: "Input exceeds context window of this model",