fix(openai): retry websocket stream failures (#29673)

This commit is contained in:
Aiden Cline 2026-05-28 01:16:18 -05:00 committed by GitHub
parent e5524f5bf9
commit 14e0b9b17f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 305 additions and 186 deletions

View File

@ -20,9 +20,9 @@ Enabled by default on `local`, `dev`, and `beta`. On `latest` and `prod`, set `O
## Retries
- If WebSocket setup fails or it fails before its first event, replay over HTTP and keep that session on HTTP until idle-pruned.
- If the server returns `websocket_connection_limit_reached` before output, reconnect up to 5 times, then follow the same HTTP fallback.
- If a WebSocket fails after its first event, fail the stream. Do not replay partial output.
- Retry WebSocket stream/setup failures up to 5 times, then use HTTP for that session until the pool entry is idle-pruned.
- `websocket_connection_limit_reached` consumes the same retry budget and HTTP fallback.
- If a WebSocket fails after its first event, fail it as retryable rather than replaying partial output in transport.
- Abort or cancel closes the socket.
## Next Steps

View File

@ -1,5 +1,6 @@
import WebSocket from "ws"
import * as Log from "@opencode-ai/core/util/log"
import { ProviderError } from "@/provider/error"
import { isRecord } from "@/util/record"
import { OpenAIWebSocket } from "./ws"
@ -13,7 +14,7 @@ export interface CreateWebSocketFetchOptions {
connectTimeout?: number
idleTimeout?: number
maxConnectionAge?: number
connectionLimitRetries?: number
streamRetries?: number
}
interface PoolEntry {
@ -22,6 +23,7 @@ interface PoolEntry {
lastUsedAt: number
busy: boolean
fallback: boolean
streamFailures: number
}
const DEFAULT_CONNECT_TIMEOUT = 15_000
@ -35,7 +37,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
const connectTimeout = options?.connectTimeout ?? DEFAULT_CONNECT_TIMEOUT
const idleTimeout = options?.idleTimeout ?? DEFAULT_IDLE_TIMEOUT
const maxConnectionAge = options?.maxConnectionAge ?? DEFAULT_MAX_CONNECTION_AGE
const connectionLimitRetries = options?.connectionLimitRetries ?? 5
const streamRetries = options?.streamRetries ?? 5
const pruneTimer = setInterval(() => prune(), Math.min(idleTimeout, 60_000))
if (typeof pruneTimer === "object" && "unref" in pruneTimer && typeof pruneTimer.unref === "function") {
pruneTimer.unref()
@ -72,7 +74,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
}
const key = `${sessionID}:conversation`
const entry = pool.get(key) ?? { lastUsedAt: Date.now(), busy: false, fallback: false }
const entry = pool.get(key) ?? { lastUsedAt: Date.now(), busy: false, fallback: false, streamFailures: 0 }
pool.set(key, entry)
if (entry.fallback) {
@ -87,7 +89,6 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
entry.busy = true
entry.lastUsedAt = Date.now()
try {
let connectionLimitAttempts = 0
entry.socket = await socket(
entry,
options?.url ?? url,
@ -111,15 +112,16 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
onTerminal: (event) => {
entry.busy = false
entry.lastUsedAt = Date.now()
entry.streamFailures = 0
if (event.type !== "response.completed" && event.type !== "response.done") {
log.warn("websocket terminal failure", { key, type: event.type })
invalidate(entry)
}
},
onConnectionInvalid: (error) => {
log.warn("websocket invalidated", { key, error: error instanceof Error ? error.message : String(error) })
log.warn("websocket invalidated", { key, error: error.message })
entry.busy = false
entry.fallback = true
if (!entry.fallback) recordStreamFailure(entry)
invalidate(entry)
resolveFirstEvent(false)
},
@ -127,51 +129,52 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
log.debug("websocket aborted", { key })
entry.busy = false
entry.lastUsedAt = Date.now()
entry.streamFailures = 0
invalidate(entry)
rejectFirstEvent(error)
},
onRetryableTerminal: async (event) => {
const error = connectionLimitError(event)
if (!error) return undefined
if (connectionLimitAttempts >= connectionLimitRetries) throw error
connectionLimitAttempts++
log.warn("websocket connection limit reached", { key, attempt: connectionLimitAttempts })
invalidate(entry)
entry.socket = await socket(
entry,
options?.url ?? url,
OpenAIWebSocket.normalizeHeaders(httpInit?.headers),
connectTimeout,
maxConnectionAge,
init?.signal,
)
entry.lastUsedAt = Date.now()
return entry.socket
log.warn("websocket connection limit reached", { key })
throw error
},
})
if (await firstEvent) return response
log.debug("http fallback", { key, reason: "websocket_failed_before_first_event" })
if (!entry.fallback) return response
log.debug("http fallback", { key, reason: "websocket_retries_exhausted" })
return httpFetch(input, httpInit)
} catch (error) {
entry.busy = false
entry.lastUsedAt = Date.now()
if (OpenAIWebSocket.isAbortError(error)) {
entry.streamFailures = 0
invalidate(entry)
throw error
}
entry.fallback = true
recordStreamFailure(entry)
log.warn("websocket setup failed", {
key,
error: error instanceof Error ? error.message : String(error),
fallback: "http",
fallback: entry.fallback ? "http" : undefined,
})
invalidate(entry)
return httpFetch(input, httpInit)
if (entry.fallback) return httpFetch(input, httpInit)
return failedResponse(
new ProviderError.ResponseStreamError(error instanceof Error ? error.message : String(error), {
cause: error,
}),
)
}
}
function recordStreamFailure(entry: PoolEntry) {
entry.streamFailures++
// Codex counts retries after the initial failed WebSocket attempt.
if (entry.streamFailures > streamRetries) entry.fallback = true
}
function prune() {
const now = Date.now()
for (const [key, entry] of pool) {
@ -198,6 +201,20 @@ function connectionLimitError(event: Record<string, unknown>) {
return new Error(typeof event.error.message === "string" ? event.error.message : CONNECTION_LIMIT_REACHED_CODE)
}
function failedResponse(error: ProviderError.ResponseStreamError) {
return new Response(
new ReadableStream({
start(controller) {
controller.error(error)
},
}),
{
status: 200,
headers: { "content-type": "text/event-stream" },
},
)
}
async function socket(
entry: PoolEntry,
url: string,

View File

@ -2,6 +2,7 @@
// fallback, and continuation state intentionally live above this file.
import WebSocket from "ws"
import { ProviderError } from "@/provider/error"
export const PROTOCOL_HEADER = "responses_websockets=2026-02-06"
@ -21,7 +22,7 @@ export interface StreamResponsesWebSocketOptions {
onComplete?: (event: Record<string, unknown>) => void
onTerminal?: (event: Record<string, unknown>) => void
onRetryableTerminal?: (event: Record<string, unknown>) => Promise<WebSocket | undefined>
onConnectionInvalid?: (error: Error) => void
onConnectionInvalid?: (error: ProviderError.ResponseStreamError) => void
onAbort?: (error: Error) => void
}
@ -101,7 +102,7 @@ export function connectResponsesWebSocket(options: ConnectResponsesWebSocketOpti
function onClose(code: number, reason: Buffer) {
cleanup()
reject(closeError("WebSocket closed before open", code, reason))
reject(new Error(closeMessage("WebSocket closed before open", code, reason)))
}
function onAbort() {
@ -145,7 +146,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
controller?.close()
}
function invalidate(error: Error) {
function invalidate(error: ProviderError.ResponseStreamError) {
if (completed) return
completed = true
cleanup()
@ -157,7 +158,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
if (completed) return
if (!options.idleTimeout) return
if (idleTimer) clearTimeout(idleTimer)
idleTimer = setTimeout(() => invalidate(new Error(message)), options.idleTimeout)
idleTimer = setTimeout(() => invalidate(new ProviderError.ResponseStreamError(message)), options.idleTimeout)
if (typeof idleTimer === "object" && "unref" in idleTimer && typeof idleTimer.unref === "function") {
idleTimer.unref()
}
@ -166,7 +167,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
async function onMessage(data: WebSocket.RawData, isBinary: boolean) {
if (completed) return
if (isBinary) {
invalidate(new Error("Unexpected binary WebSocket frame"))
invalidate(new ProviderError.ResponseStreamError("Unexpected binary WebSocket frame"))
return
}
@ -195,7 +196,11 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
return
}
} catch (error) {
invalidate(error instanceof Error ? error : new Error(String(error)))
invalidate(
new ProviderError.ResponseStreamError(error instanceof Error ? error.message : String(error), {
cause: error,
}),
)
return
}
}
@ -230,12 +235,14 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
}
function onError(error: Error) {
invalidate(error)
invalidate(new ProviderError.ResponseStreamError(error.message, { cause: error }))
}
function onClose(code: number, reason: Buffer) {
if (completed) return
invalidate(closeError("WebSocket closed before response.completed", code, reason))
invalidate(
new ProviderError.ResponseStreamError(closeMessage("WebSocket closed before response.completed", code, reason)),
)
}
function onAbort() {
@ -272,7 +279,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
socket.send(JSON.stringify({ type: "response.create", ...payload }), (error) => {
if (completed) return
resetIdleTimeout("idle timeout waiting for websocket")
if (error) invalidate(error)
if (error) invalidate(new ProviderError.ResponseStreamError(error.message, { cause: error }))
})
}
@ -312,11 +319,11 @@ function abortError(signal: AbortSignal | undefined) {
return new DOMException(reason instanceof Error ? reason.message : "Aborted", "AbortError")
}
function closeError(message: string, code: number, reason: Buffer) {
function closeMessage(message: string, code: number, reason: Buffer) {
const details = [`code ${code}`]
if (code === 1009) details.push("message too big")
if (reason.length > 0) details.push(reason.toString())
return new Error(`${message} (${details.join(": ")})`)
return `${message} (${details.join(": ")})`
}
export * as OpenAIWebSocket from "./ws"

View File

@ -11,6 +11,14 @@ export class HeaderTimeoutError extends Error {
}
}
export class ResponseStreamError extends Error {
public override readonly name = "ProviderResponseStreamError"
constructor(message: string, options?: ErrorOptions) {
super(message, options)
}
}
// Adapted from overflow detection patterns in:
// https://github.com/badlogic/pi-mono/blob/main/packages/ai/src/utils/overflow.ts
const OVERFLOW_PATTERNS = [

View File

@ -1155,6 +1155,17 @@ export function fromError(
},
{ cause: e },
).toObject()
case e instanceof ProviderError.ResponseStreamError:
return new APIError(
{
message: e.message,
isRetryable: true,
metadata: {
code: e.name,
},
},
{ cause: e },
).toObject()
case APICallError.isInstance(e):
const parsed = ProviderError.parseAPICallError({
providerID: ctx.providerID,

View File

@ -1,8 +1,9 @@
import { describe, expect, test } from "bun:test"
import { EventEmitter } from "node:events"
import type { IncomingMessage } from "node:http"
import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http"
import net, { type AddressInfo, type Socket } from "node:net"
import WebSocket, { WebSocketServer } from "ws"
import { ProviderError } from "../../src/provider/error"
import { OpenAIWebSocket } from "../../src/plugin/openai/ws"
import { OpenAIWebSocketPool, TITLE_HEADER } from "../../src/plugin/openai/ws-pool"
@ -85,7 +86,7 @@ describe("plugin.openai.ws", () => {
})
test("errors the SSE stream when the server closes before a terminal event", async () => {
const invalid: string[] = []
const invalid: Error[] = []
await using server = await createWebSocketServer((socket) => {
socket.once("message", () => {
socket.close(1009, "payload too large")
@ -96,13 +97,14 @@ describe("plugin.openai.ws", () => {
const response = OpenAIWebSocket.streamResponsesWebSocket({
socket,
body: { stream: true, input: "hi" },
onConnectionInvalid: (error) => invalid.push(error.message),
onConnectionInvalid: (error) => invalid.push(error),
})
await expect(response.text()).rejects.toThrow(
"WebSocket closed before response.completed (code 1009: message too big: payload too large)",
)
expect(invalid).toEqual([
expect(invalid[0]).toBeInstanceOf(ProviderError.ResponseStreamError)
expect(invalid.map((error) => error.message)).toEqual([
"WebSocket closed before response.completed (code 1009: message too big: payload too large)",
])
})
@ -140,13 +142,12 @@ describe("plugin.openai.ws-pool", () => {
})
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async () => new Response("http")),
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
expect(await first.text()).toContain("data: [DONE]")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toContain("data: [DONE]")
expect(connections).toBe(1)
expect(messages).toBe(2)
@ -163,41 +164,59 @@ describe("plugin.openai.ws-pool", () => {
})
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async () => new Response("http")),
maxConnectionAge: 0,
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
expect(await first.text()).toContain("data: [DONE]")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toContain("data: [DONE]")
expect(connections).toBe(2)
fetch.close()
})
test("falls back to HTTP when websocket setup fails and keeps the fallback sticky", async () => {
test("falls back to HTTP after websocket setup retries are exhausted", async () => {
const attempts: string[] = []
await using server = await createRejectingWebSocketServer(() => attempts.push("websocket"))
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
connectTimeout: 100,
streamRetries: 1,
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest({ [TITLE_HEADER]: "false" }))
const second = await fetch("https://api.openai.com/v1/responses", streamRequest({ [TITLE_HEADER]: "false" }))
const first = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" }))
await expect(first.text()).rejects.toBeInstanceOf(ProviderError.ResponseStreamError)
const second = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" }))
const third = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "false" }))
expect(await first.text()).toBe("http")
expect(await second.text()).toBe("http")
expect(attempts).toEqual(["websocket"])
expect(httpRequests).toHaveLength(2)
expect(httpRequests[0]?.get(TITLE_HEADER)).toBeNull()
expect(httpRequests[1]?.get(TITLE_HEADER)).toBeNull()
expect(await third.text()).toBe("http")
expect(attempts).toEqual(["websocket", "websocket"])
expect(server.httpRequests).toHaveLength(2)
expect(server.httpRequests[0]?.headers[TITLE_HEADER]).toBeUndefined()
expect(server.httpRequests[1]?.headers[TITLE_HEADER]).toBeUndefined()
fetch.close()
})
test("prunes HTTP fallback after its idle timeout", async () => {
let websocketAttempts = 0
await using server = await createRejectingWebSocketServer(() => websocketAttempts++)
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
connectTimeout: 100,
idleTimeout: 20,
streamRetries: 0,
})
const first = await fetch(server.url, streamRequest())
expect(await first.text()).toBe("http")
await new Promise((resolve) => setTimeout(resolve, 50))
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toBe("http")
expect(websocketAttempts).toBe(2)
expect(server.httpRequests).toHaveLength(2)
fetch.close()
})
@ -209,26 +228,21 @@ describe("plugin.openai.ws-pool", () => {
socket.send(JSON.stringify({ type: connections === 1 ? "response.failed" : "response.completed" }))
})
})
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
expect(await first.text()).toContain('data: {"type":"response.failed"}')
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toContain('data: {"type":"response.completed"}')
expect(connections).toBe(2)
expect(httpRequests).toHaveLength(0)
expect(server.httpRequests).toHaveLength(0)
fetch.close()
})
test("reconnects and replays after websocket connection limit errors", async () => {
test("retries websocket connection limit errors on the next stream attempt", async () => {
let connections = 0
let messages = 0
await using server = await createWebSocketServer((socket) => {
@ -252,24 +266,21 @@ describe("plugin.openai.ws-pool", () => {
socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_retry" } }))
})
})
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
})
const response = await fetch("https://api.openai.com/v1/responses", streamRequest())
const text = await response.text()
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("Responses websocket connection limit reached")
const second = await fetch(server.url, streamRequest())
const text = await second.text()
expect(text).not.toContain("websocket_connection_limit_reached")
expect(text).toContain('data: {"type":"response.completed","response":{"id":"resp_retry"}}')
expect(text).toContain("data: [DONE]")
expect(connections).toBe(2)
expect(messages).toBe(2)
expect(httpRequests).toHaveLength(0)
expect(server.httpRequests).toHaveLength(0)
fetch.close()
})
@ -291,98 +302,159 @@ describe("plugin.openai.ws-pool", () => {
)
})
})
let httpRequests = 0
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
connectionLimitRetries: 2,
httpFetch: mockFetch(async () => {
httpRequests += 1
return new Response("http")
}),
streamRetries: 2,
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("Responses websocket connection limit reached")
const second = await fetch(server.url, streamRequest())
await expect(second.text()).rejects.toThrow("Responses websocket connection limit reached")
const third = await fetch(server.url, streamRequest())
const fourth = await fetch(server.url, streamRequest())
expect(await first.text()).toBe("http")
expect(await second.text()).toBe("http")
expect(await third.text()).toBe("http")
expect(await fourth.text()).toBe("http")
expect(connections).toBe(3)
expect(httpRequests).toBe(2)
expect(server.httpRequests).toHaveLength(2)
fetch.close()
})
test("replays over HTTP when websocket idles before its first event", async () => {
test("shares the websocket retry budget across stream and connection limit failures", async () => {
let connections = 0
await using server = await createWebSocketServer((socket) => {
connections += 1
socket.once("message", () => {
if (connections === 1) {
socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" }))
socket.terminate()
return
}
socket.send(
JSON.stringify({
type: "error",
error: {
code: "websocket_connection_limit_reached",
message: "Responses websocket connection limit reached",
},
}),
)
})
})
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
streamRetries: 1,
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed")
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toBe("http")
expect(connections).toBe(2)
expect(server.httpRequests).toHaveLength(1)
fetch.close()
})
test("retries websocket idle failures before first event then falls back to HTTP", async () => {
let connections = 0
await using server = await createWebSocketServer((socket) => {
connections += 1
socket.once("message", () => {})
})
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
idleTimeout: 20,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
streamRetries: 1,
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
expect(await first.text()).toBe("http")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("idle timeout waiting for websocket")
const second = await fetch(server.url, streamRequest())
const third = await fetch(server.url, streamRequest())
expect(await second.text()).toBe("http")
expect(connections).toBe(1)
expect(httpRequests).toHaveLength(2)
expect(await third.text()).toBe("http")
expect(connections).toBe(2)
expect(server.httpRequests).toHaveLength(2)
fetch.close()
})
test("does not replay over HTTP after a websocket event was emitted", async () => {
test("retries failed websocket streams before using HTTP fallback", async () => {
await using server = await createWebSocketServer((socket) => {
socket.once("message", () => {
socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" }))
})
})
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
idleTimeout: 20,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
streamRetries: 1,
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("idle timeout waiting for websocket")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch(server.url, streamRequest())
await expect(second.text()).rejects.toThrow("idle timeout waiting for websocket")
const third = await fetch(server.url, streamRequest())
expect(await second.text()).toBe("http")
expect(httpRequests).toHaveLength(1)
expect(await third.text()).toBe("http")
expect(server.httpRequests).toHaveLength(1)
fetch.close()
})
test("resets websocket stream failures after a completed response", async () => {
let connections = 0
let requests = 0
await using server = await createWebSocketServer((socket) => {
connections += 1
socket.on("message", () => {
requests += 1
if (requests === 1 || requests === 3) {
socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" }))
socket.terminate()
return
}
socket.send(JSON.stringify({ type: "response.completed", response: { id: `resp_${requests}` } }))
})
})
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
streamRetries: 1,
})
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed")
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toContain("data: [DONE]")
const third = await fetch(server.url, streamRequest())
await expect(third.text()).rejects.toThrow("WebSocket closed before response.completed")
const fourth = await fetch(server.url, streamRequest())
expect(await fourth.text()).toContain("data: [DONE]")
expect(connections).toBe(3)
expect(requests).toBe(4)
expect(server.httpRequests).toHaveLength(0)
fetch.close()
})
test("falls back to HTTP for missing session and title requests", async () => {
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
})
await using server = await createWebSocketServer(() => {})
const fetch = OpenAIWebSocketPool.createWebSocketFetch()
const missingSession = await fetch("https://api.openai.com/v1/responses", {
const missingSession = await fetch(server.url, {
method: "POST",
headers: { [TITLE_HEADER]: "false" },
body: JSON.stringify({ stream: true }),
})
const title = await fetch("https://api.openai.com/v1/responses", streamRequest({ [TITLE_HEADER]: "true" }))
const title = await fetch(server.url, streamRequest({ [TITLE_HEADER]: "true" }))
expect(await missingSession.text()).toBe("http")
expect(await title.text()).toBe("http")
expect(httpRequests).toHaveLength(2)
expect(httpRequests[0]?.get(TITLE_HEADER)).toBeNull()
expect(httpRequests[1]?.get(TITLE_HEADER)).toBeNull()
expect(server.httpRequests).toHaveLength(2)
expect(server.httpRequests[0]?.headers[TITLE_HEADER]).toBeUndefined()
expect(server.httpRequests[1]?.headers[TITLE_HEADER]).toBeUndefined()
fetch.close()
})
@ -395,22 +467,17 @@ describe("plugin.openai.ws-pool", () => {
})
})
const abort = new AbortController()
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest({}, abort.signal))
const first = await fetch(server.url, streamRequest({}, abort.signal))
const firstText = first.text()
await waitFor(() => connections === 1, "websocket did not connect")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toBe("http")
expect(httpRequests).toHaveLength(1)
expect(server.httpRequests).toHaveLength(1)
expect(connections).toBe(1)
abort.abort(new Error("stop"))
await expect(firstText).rejects.toThrow("stop")
@ -419,28 +486,25 @@ describe("plugin.openai.ws-pool", () => {
test("reserves a websocket lane while its socket is connecting", async () => {
await using server = await createHangingTcpServer()
let httpRequests = 0
await using fallback = await createHttpServer()
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
connectTimeout: 20,
httpFetch: mockFetch(async () => {
httpRequests += 1
return new Response("http")
}),
streamRetries: 0,
})
const first = fetch("https://api.openai.com/v1/responses", streamRequest())
const first = fetch(fallback.url, streamRequest())
await waitFor(() => server.connections() === 1, "first websocket did not begin connecting")
const second = fetch("https://api.openai.com/v1/responses", streamRequest())
const second = fetch(fallback.url, streamRequest())
expect(await (await second).text()).toBe("http")
expect(await (await first).text()).toBe("http")
expect(server.connections()).toBe(1)
expect(httpRequests).toBe(2)
expect(fallback.httpRequests).toHaveLength(2)
fetch.close()
})
test("replays over HTTP after an unexpected close before the first event", async () => {
test("retries unexpected closes before first event then falls back to HTTP", async () => {
let connections = 0
await using server = await createWebSocketServer((socket) => {
connections += 1
@ -448,22 +512,20 @@ describe("plugin.openai.ws-pool", () => {
socket.close(1001, "server shutdown")
})
})
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
streamRetries: 1,
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
expect(await first.text()).toBe("http")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
await expect(first.text()).rejects.toThrow("WebSocket closed before response.completed")
const second = await fetch(server.url, streamRequest())
const third = await fetch(server.url, streamRequest())
expect(await second.text()).toBe("http")
expect(connections).toBe(1)
expect(httpRequests).toHaveLength(2)
expect(await third.text()).toBe("http")
expect(connections).toBe(2)
expect(server.httpRequests).toHaveLength(2)
fetch.close()
})
@ -479,27 +541,22 @@ describe("plugin.openai.ws-pool", () => {
socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_456" } }))
})
})
const httpRequests: Headers[] = []
const abort = new AbortController()
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest({}, abort.signal))
const first = await fetch(server.url, streamRequest({}, abort.signal))
const firstText = first.text()
await waitFor(() => connections === 1, "first websocket did not connect")
abort.abort(new Error("stop"))
await expect(firstText).rejects.toThrow("stop")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toContain("data: [DONE]")
expect(connections).toBe(2)
expect(httpRequests).toHaveLength(0)
expect(server.httpRequests).toHaveLength(0)
fetch.close()
})
@ -515,24 +572,19 @@ describe("plugin.openai.ws-pool", () => {
socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_after_cancel" } }))
})
})
const httpRequests: Headers[] = []
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
httpFetch: mockFetch(async (_input, init) => {
httpRequests.push(new Headers(init?.headers))
return new Response("http")
}),
})
const first = await fetch("https://api.openai.com/v1/responses", streamRequest())
const first = await fetch(server.url, streamRequest())
await waitFor(() => connections === 1, "first websocket did not connect")
await first.body!.cancel("stop")
const second = await fetch("https://api.openai.com/v1/responses", streamRequest())
const second = await fetch(server.url, streamRequest())
expect(await second.text()).toContain("data: [DONE]")
expect(connections).toBe(2)
expect(httpRequests).toHaveLength(0)
expect(server.httpRequests).toHaveLength(0)
fetch.close()
})
})
@ -550,20 +602,11 @@ function streamRequest(headers?: Record<string, string>, signal?: AbortSignal):
}
}
function mockFetch(
fn: (
input: Parameters<typeof globalThis.fetch>[0],
init: Parameters<typeof globalThis.fetch>[1],
) => ReturnType<typeof globalThis.fetch>,
): typeof globalThis.fetch {
return Object.assign(fn, { preconnect: globalThis.fetch.preconnect })
}
async function createWebSocketServer(onConnection: (socket: WebSocket, request: IncomingMessage) => void) {
const server = new WebSocketServer({ host: "127.0.0.1", port: 0 })
const http = await createHttpServer()
const server = new WebSocketServer({ server: http.server })
server.on("connection", onConnection)
await new Promise<void>((resolve) => server.once("listening", resolve))
return websocketServerHandle(server)
return websocketServerHandle(server, http)
}
async function createHangingTcpServer() {
@ -588,31 +631,53 @@ async function createHangingTcpServer() {
}
async function createRejectingWebSocketServer(onAttempt: () => void) {
const http = await createHttpServer()
const server = new WebSocketServer({
host: "127.0.0.1",
port: 0,
server: http.server,
verifyClient(_info, callback) {
onAttempt()
callback(false, 401, "denied")
},
})
await new Promise<void>((resolve) => server.once("listening", resolve))
return websocketServerHandle(server)
return websocketServerHandle(server, http)
}
function websocketServerHandle(server: WebSocketServer) {
async function createHttpServer() {
const httpRequests: IncomingMessage[] = []
const server = createServer((request, response) => {
httpRequests.push(request)
response.writeHead(200, { "content-type": "text/plain" })
response.end("http")
})
await new Promise<void>((resolve) => server.listen(0, "127.0.0.1", resolve))
const address = server.address() as AddressInfo
const url = `http://127.0.0.1:${address.port}/v1/responses`
return {
url,
wsUrl: url.replace(/^http/, "ws"),
server,
httpRequests,
url: `http://127.0.0.1:${address.port}/v1/responses`,
async [Symbol.asyncDispose]() {
await closeHttpServer(server)
},
}
}
function websocketServerHandle(server: WebSocketServer, http: Awaited<ReturnType<typeof createHttpServer>>) {
return {
url: http.url,
wsUrl: http.url.replace(/^http/, "ws"),
httpRequests: http.httpRequests,
async [Symbol.asyncDispose]() {
for (const socket of server.clients) socket.terminate()
server.close()
http.server.close()
},
}
}
function closeHttpServer(server: HttpServer) {
return new Promise<void>((resolve, reject) => server.close((error) => (error ? reject(error) : resolve())))
}
async function waitFor(predicate: () => boolean, message: string) {
const started = Date.now()
while (!predicate()) {

View File

@ -172,6 +172,17 @@ describe("session.retry.retryable", () => {
})
})
test("retries websocket stream transport errors", () => {
const request = MessageV2.fromError(
new ProviderError.ResponseStreamError("WebSocket closed before response.completed (code 1006: Connection ended)"),
{ providerID },
)
expect(MessageV2.APIError.isInstance(request)).toBe(true)
expect(SessionRetry.retryable(request, retryProvider)).toEqual({
message: "WebSocket closed before response.completed (code 1006: Connection ended)",
})
})
test("does not retry context overflow errors", () => {
const error = new MessageV2.ContextOverflowError({
message: "Input exceeds context window of this model",