fix(openai): preserve websocket idle state (#30586)
This commit is contained in:
parent
400f0fb100
commit
7f8412ec3e
@@ -362,6 +362,10 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug
|
||||
for (const websocketFetch of websocketFetches) websocketFetch.close()
|
||||
websocketFetches.length = 0
|
||||
},
|
||||
// Plugin event hook. Every event other than "session.deleted" is ignored.
// For a deleted session, each websocket fetch in `websocketFetches` is asked to
// remove its state for that session id (`input.event.properties.info.id`).
async event(input) {
|
||||
if (input.event.type !== "session.deleted") return
|
||||
for (const websocketFetch of websocketFetches) websocketFetch.remove(input.event.properties.info.id)
|
||||
},
|
||||
provider: {
|
||||
id: "openai",
|
||||
async models(provider, ctx) {
|
||||
|
||||
@@ -121,6 +121,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
|
||||
onConnectionInvalid: (error) => {
|
||||
log.warn("websocket invalidated", { key, error: error.message })
|
||||
entry.busy = false
|
||||
entry.lastUsedAt = Date.now()
|
||||
if (!entry.fallback) recordStreamFailure(entry)
|
||||
invalidate(entry)
|
||||
resolveFirstEvent(false)
|
||||
@@ -186,6 +187,7 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
|
||||
const now = Date.now()
|
||||
for (const [key, entry] of pool) {
|
||||
if (entry.busy) continue
|
||||
if (entry.fallback) continue
|
||||
if (now - entry.lastUsedAt < idleTimeout) continue
|
||||
log.debug("websocket idle prune", { key })
|
||||
invalidate(entry)
|
||||
@@ -200,7 +202,16 @@ export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
|
||||
pool.clear()
|
||||
}
|
||||
|
||||
return Object.assign(websocketFetch, { close })
|
||||
/**
 * Drops the pooled entry that belongs to a session.
 *
 * Looks up the pool under the key `${sessionID}:conversation`. When no entry
 * exists this is a no-op; otherwise the removal is logged at debug level, the
 * entry is passed to `invalidate`, and the key is deleted from the pool.
 *
 * NOTE(review): only the `:conversation` key is handled here; if a session can
 * own entries under other key suffixes they are not touched — confirm.
 *
 * @param sessionID - id of the session whose pooled entry should be removed
 */
function remove(sessionID: string) {
|
||||
const key = `${sessionID}:conversation`
|
||||
const entry = pool.get(key)
|
||||
if (!entry) return
|
||||
log.debug("websocket pool remove", { key })
|
||||
invalidate(entry)
|
||||
pool.delete(key)
|
||||
}
|
||||
|
||||
return Object.assign(websocketFetch, { close, remove })
|
||||
}
|
||||
|
||||
function connectionLimitError(event: Record<string, unknown>) {
|
||||
|
||||
@@ -211,7 +211,7 @@ describe("plugin.openai.ws-pool", () => {
|
||||
fetch.close()
|
||||
})
|
||||
|
||||
test("prunes HTTP fallback after its idle timeout", async () => {
|
||||
test("keeps HTTP fallback active after its idle timeout", async () => {
|
||||
let websocketAttempts = 0
|
||||
await using server = await createRejectingWebSocketServer(() => websocketAttempts++)
|
||||
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
|
||||
@@ -226,12 +226,86 @@ describe("plugin.openai.ws-pool", () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50))
|
||||
const second = await fetch(server.url, streamRequest())
|
||||
|
||||
expect(await second.text()).toBe("http")
|
||||
expect(websocketAttempts).toBe(1)
|
||||
expect(server.httpRequests).toHaveLength(2)
|
||||
fetch.close()
|
||||
})
|
||||
|
||||
// Scenario: the server counts websocket attempts via the callback and the first
// request is answered with the body "http". After `fetch.remove("session-1")`,
// the second request is expected to trigger a second websocket attempt
// (websocketAttempts === 2) and again be answered with "http" (two HTTP requests
// in total), i.e. the fallback state did not survive the removal.
// NOTE(review): presumably streamRequest() targets session "session-1" — the
// helper is not visible here, confirm.
test("removes HTTP fallback when its session is deleted", async () => {
|
||||
let websocketAttempts = 0
|
||||
await using server = await createRejectingWebSocketServer(() => websocketAttempts++)
|
||||
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
|
||||
url: server.url,
|
||||
connectTimeout: 100,
|
||||
streamRetries: 0,
|
||||
})
|
||||
|
||||
const first = await fetch(server.url, streamRequest())
|
||||
expect(await first.text()).toBe("http")
|
||||
fetch.remove("session-1")
|
||||
const second = await fetch(server.url, streamRequest())
|
||||
|
||||
expect(await second.text()).toBe("http")
|
||||
expect(websocketAttempts).toBe(2)
|
||||
expect(server.httpRequests).toHaveLength(2)
|
||||
fetch.close()
|
||||
})
|
||||
|
||||
// Scenario: on the first connection the server sends only a text delta and never
// a "response.completed" frame, so the first response is still streaming when
// `fetch.remove("session-1")` is called. Reading that response must then fail
// with "WebSocket closed before response.completed". The second request is
// expected to open a new connection (connections === 2), on which the server
// sends "response.completed", and the body must contain "data: [DONE]".
// NOTE(review): presumably streamRequest() targets session "session-1" — the
// helper is not visible here, confirm.
test("terminates active websocket connections when their session is deleted", async () => {
|
||||
let connections = 0
|
||||
await using server = await createWebSocketServer((socket) => {
|
||||
connections += 1
|
||||
socket.once("message", () => {
|
||||
if (connections === 1) {
|
||||
socket.send(JSON.stringify({ type: "response.output_text.delta", delta: "started" }))
|
||||
return
|
||||
}
|
||||
socket.send(JSON.stringify({ type: "response.completed", response: { id: "resp_after_remove" } }))
|
||||
})
|
||||
})
|
||||
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
|
||||
url: server.url,
|
||||
})
|
||||
|
||||
const first = await fetch(server.url, streamRequest())
|
||||
const firstText = first.text()
|
||||
fetch.remove("session-1")
|
||||
expect((await readTextError(firstText)).message).toContain("WebSocket closed before response.completed")
|
||||
|
||||
const second = await fetch(server.url, streamRequest())
|
||||
|
||||
expect(await second.text()).toContain("data: [DONE]")
|
||||
expect(connections).toBe(2)
|
||||
fetch.close()
|
||||
})
|
||||
|
||||
// Scenario: the server completes every request immediately and counts both
// connections and socket "close" events. With idleTimeout set to 20, the test
// waits until the server has seen one socket close after the first completed
// response, then issues a second request and expects it to be served over a
// new connection (connections === 2).
test("prunes idle websocket connections after completed responses", async () => {
|
||||
let connections = 0
|
||||
let closed = 0
|
||||
await using server = await createWebSocketServer((socket) => {
|
||||
connections += 1
|
||||
socket.once("close", () => closed++)
|
||||
socket.once("message", () => {
|
||||
socket.send(JSON.stringify({ type: "response.completed", response: { id: `resp_${connections}` } }))
|
||||
})
|
||||
})
|
||||
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
|
||||
url: server.url,
|
||||
idleTimeout: 20,
|
||||
})
|
||||
|
||||
const first = await fetch(server.url, streamRequest())
|
||||
expect(await first.text()).toContain("data: [DONE]")
|
||||
await waitFor(() => closed === 1, "idle websocket was not pruned")
|
||||
|
||||
const second = await fetch(server.url, streamRequest())
|
||||
|
||||
expect(await second.text()).toContain("data: [DONE]")
|
||||
expect(connections).toBe(2)
|
||||
fetch.close()
|
||||
})
|
||||
|
||||
test("invalidates but does not reuse a socket after terminal failure frames", async () => {
|
||||
let connections = 0
|
||||
await using server = await createWebSocketServer((socket) => {
|
||||
@@ -459,6 +533,31 @@ describe("plugin.openai.ws-pool", () => {
|
||||
fetch.close()
|
||||
})
|
||||
|
||||
// Scenario: the server accepts websocket connections but never replies to a
// message. The pool is created with idleTimeout 500 and streamRetries 1. After
// a 250 ms pause the first request must fail with "idle timeout waiting for
// websocket"; after a further 300 ms the second request must be answered with
// "http". Across both requests the server is expected to have seen exactly two
// websocket connections and one HTTP request.
// NOTE(review): the 250 ms / 300 ms pauses look chosen relative to the 500 ms
// idle timeout so that a prune pass falls between the two requests — confirm
// against the pool's prune scheduling, which is not visible here.
test("keeps websocket retry state until the failed stream becomes idle", async () => {
|
||||
let connections = 0
|
||||
await using server = await createWebSocketServer((socket) => {
|
||||
connections += 1
|
||||
socket.once("message", () => {})
|
||||
})
|
||||
const fetch = OpenAIWebSocketPool.createWebSocketFetch({
|
||||
url: server.url,
|
||||
idleTimeout: 500,
|
||||
streamRetries: 1,
|
||||
})
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 250))
|
||||
const first = await fetch(server.url, streamRequest())
|
||||
expect((await readTextError(first.text())).message).toContain("idle timeout waiting for websocket")
|
||||
await new Promise((resolve) => setTimeout(resolve, 300))
|
||||
|
||||
const second = await fetch(server.url, streamRequest())
|
||||
|
||||
expect(await second.text()).toBe("http")
|
||||
expect(connections).toBe(2)
|
||||
expect(server.httpRequests).toHaveLength(1)
|
||||
fetch.close()
|
||||
})
|
||||
|
||||
test("retries failed websocket streams before using HTTP fallback", async () => {
|
||||
await using server = await createWebSocketServer((socket) => {
|
||||
socket.once("message", () => {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user