fix(opencode): preserve websocket api errors (#30321)

This commit is contained in:
Aiden Cline 2026-06-01 23:23:51 -05:00 committed by GitHub
parent 497ff36e91
commit 1b85d55a0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 127 additions and 6 deletions

View File

@ -97,9 +97,9 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
maxConnectionAge,
init?.signal,
)
let resolveFirstEvent: (started: boolean) => void = () => {}
let resolveFirstEvent: (event: boolean | OpenAIWebSocket.WrappedError) => void = () => {}
let rejectFirstEvent: (error: Error) => void = () => {}
const firstEvent = new Promise<boolean>((resolve, reject) => {
const firstEvent = new Promise<boolean | OpenAIWebSocket.WrappedError>((resolve, reject) => {
resolveFirstEvent = resolve
rejectFirstEvent = reject
})
@ -108,7 +108,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
body,
idleTimeout,
signal: init?.signal ?? undefined,
onFirstEvent: () => resolveFirstEvent(true),
onFirstEvent: (error) => resolveFirstEvent(error ?? true),
onTerminal: (event) => {
entry.busy = false
entry.lastUsedAt = Date.now()
@ -140,7 +140,14 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
throw error
},
})
if (await firstEvent) return response
const first = await firstEvent
if (first !== false) {
if (first === true || first.status < 200 || first.status > 599) return response
return new Response(first.body, {
status: first.status,
headers: { "content-type": "application/json", ...first.headers },
})
}
if (!entry.fallback) return response
log.debug("http fallback", { key, reason: "websocket_retries_exhausted" })
return httpFetch(input, httpInit)

View File

@ -2,9 +2,11 @@
// fallback, and continuation state intentionally live above this file.
import WebSocket from "ws"
import { APICallError } from "ai"
import { ProviderError } from "@/provider/error"
import { errorMessage } from "@/util/error"
import { ProxyEnv } from "@/util/proxy-env"
import { isRecord } from "@/util/record"
export const PROTOCOL_HEADER = "responses_websockets=2026-02-06"
@ -20,7 +22,7 @@ export interface StreamResponsesWebSocketOptions {
body: Record<string, unknown>
idleTimeout?: number
signal?: AbortSignal
onFirstEvent?: () => void
onFirstEvent?: (error?: WrappedError) => void
onComplete?: (event: Record<string, unknown>) => void
onTerminal?: (event: Record<string, unknown>) => void
onRetryableTerminal?: (event: Record<string, unknown>) => Promise<WebSocket | undefined>
@ -28,6 +30,12 @@ export interface StreamResponsesWebSocketOptions {
onAbort?: (error: Error) => void
}
export interface WrappedError {
status: number
headers?: Record<string, string>
body: string
}
export function toWebSocketUrl(url: string) {
return url.replace(/^http/, "ws")
}
@ -186,7 +194,7 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
}
})()
if (event?.type === "error" && !emitted && options.onRetryableTerminal) {
if (event?.type === "error" && options.onRetryableTerminal) {
cleanupSocket()
if (idleTimer) clearTimeout(idleTimer)
idleTimer = undefined
@ -210,6 +218,25 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
}
}
const wrappedError = parseWrappedError(event, text)
if (wrappedError && event) {
if (!emitted) options.onFirstEvent?.(wrappedError)
completed = true
cleanup()
options.onTerminal?.(event)
controller?.error(
new APICallError({
message: wrappedError.message,
url: socket.url,
requestBodyValues: options.body,
statusCode: wrappedError.status,
responseHeaders: wrappedError.headers,
responseBody: wrappedError.body,
}),
)
return
}
if (!emitted) options.onFirstEvent?.()
controller?.enqueue(
encoder.encode(
@ -312,6 +339,26 @@ export function streamResponsesWebSocket(options: StreamResponsesWebSocketOption
)
}
function parseWrappedError(event: Record<string, unknown> | undefined, body: string) {
if (event?.type !== "error") return
const status = event.status ?? event.status_code
if (typeof status !== "number" || (status >= 200 && status < 300)) return
return {
status,
headers: isRecord(event.headers)
? Object.fromEntries(
Object.entries(event.headers).flatMap(([key, value]) =>
typeof value === "string" || typeof value === "number" || typeof value === "boolean"
? [[key, String(value)]]
: [],
),
)
: undefined,
body,
message: isRecord(event.error) && typeof event.error.message === "string" ? event.error.message : `${status}`,
}
}
function cancelError(reason: unknown) {
if (isAbortError(reason)) return reason
if (reason instanceof Error) return reason

View File

@ -3,6 +3,7 @@ import { EventEmitter } from "node:events"
import { createServer, type IncomingMessage, type Server as HttpServer } from "node:http"
import net, { type AddressInfo, type Socket } from "node:net"
import WebSocket, { WebSocketServer } from "ws"
import { APICallError } from "ai"
import { ProviderError } from "../../src/provider/error"
import { OpenAIWebSocket } from "../../src/plugin/openai/ws"
import { OpenAIWebSocketPool, TITLE_HEADER } from "../../src/plugin/openai/ws-pool"
@ -253,6 +254,72 @@ describe("plugin.openai.ws-pool", () => {
fetch.close()
})
test("returns initial websocket error frames as HTTP-style API errors", async () => {
const error = {
type: "invalid_request_error",
message: "The model is not supported when using Codex with a ChatGPT account.",
}
const event = {
type: "error",
status: 400,
error,
headers: {
"x-codex-primary-window-minutes": 15,
ignored: { nested: true },
},
}
await using server = await createWebSocketServer((socket) => {
socket.once("message", () => {
socket.send(JSON.stringify(event))
})
})
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
})
const response = await fetch(server.url, streamRequest())
expect(response.status).toBe(400)
expect(response.headers.get("content-type")).toContain("application/json")
expect(response.headers.get("x-codex-primary-window-minutes")).toBe("15")
expect(response.headers.get("ignored")).toBeNull()
expect(await response.json()).toEqual(event)
fetch.close()
})
test("fails mid-stream wrapped websocket errors as HTTP-style API errors", async () => {
const event = {
type: "error",
status_code: 429,
error: {
type: "usage_limit_reached",
message: "The usage limit has been reached",
},
headers: {
"x-codex-primary-used-percent": "100.0",
},
}
await using server = await createWebSocketServer((socket) => {
socket.once("message", () => {
socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" }))
socket.send(JSON.stringify(event))
})
})
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
url: server.url,
})
const response = await fetch(server.url, streamRequest())
const error = await readTextError(response.text())
expect(APICallError.isInstance(error)).toBe(true)
if (!APICallError.isInstance(error)) throw new Error("Expected APICallError")
expect(error.statusCode).toBe(429)
expect(error.responseHeaders).toEqual({ "x-codex-primary-used-percent": "100.0" })
expect(error.responseBody).toBe(JSON.stringify(event))
fetch.close()
})
test("retries websocket connection limit errors on the next stream attempt", async () => {
let connections = 0
let messages = 0