fix(opencode): recover expired MCP sessions (#32088)

This commit is contained in:
Aiden Cline 2026-06-13 11:20:59 -05:00 committed by GitHub
parent 45e4606fa4
commit c7dee9c609
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 481 additions and 1 deletions

View File

@ -914,6 +914,7 @@
"patchedDependencies": {
"solid-js@1.9.10": "patches/solid-js@1.9.10.patch",
"virtua@0.49.1": "patches/virtua@0.49.1.patch",
"@modelcontextprotocol/sdk@1.29.0": "patches/@modelcontextprotocol%2Fsdk@1.29.0.patch",
"gcp-metadata@8.1.2": "patches/gcp-metadata@8.1.2.patch",
"@ai-sdk/google@3.0.73": "patches/@ai-sdk%2Fgoogle@3.0.73.patch",
"@ai-sdk/xai@3.0.82": "patches/@ai-sdk%2Fxai@3.0.82.patch",

View File

@ -149,6 +149,7 @@
"@ai-sdk/xai@3.0.82": "patches/@ai-sdk%2Fxai@3.0.82.patch",
"gcp-metadata@8.1.2": "patches/gcp-metadata@8.1.2.patch",
"pacote@21.5.0": "patches/pacote@21.5.0.patch",
"@ai-sdk/google@3.0.73": "patches/@ai-sdk%2Fgoogle@3.0.73.patch"
"@ai-sdk/google@3.0.73": "patches/@ai-sdk%2Fgoogle@3.0.73.patch",
"@modelcontextprotocol/sdk@1.29.0": "patches/@modelcontextprotocol%2Fsdk@1.29.0.patch"
}
}

View File

@ -0,0 +1,50 @@
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"
import { LATEST_PROTOCOL_VERSION } from "@modelcontextprotocol/sdk/types.js"
const posts: Array<{ method: string; session: string | null }> = []
let initializeCount = 0
let pingCount = 0
const server = Bun.serve({
port: 0,
async fetch(request) {
if (request.method === "GET") return new Response(null, { status: 405 })
if (request.method === "DELETE") return new Response(null, { status: 200 })
const message = (await request.json()) as { id?: number; method: string }
const session = request.headers.get("mcp-session-id")
posts.push({ method: message.method, session })
if (message.method === "initialize") {
initializeCount++
return Response.json(
{
jsonrpc: "2.0",
id: message.id,
result: {
protocolVersion: LATEST_PROTOCOL_VERSION,
capabilities: {},
serverInfo: { name: "test", version: "1" },
},
},
{ headers: { "mcp-session-id": initializeCount === 1 ? "expired" : "replacement" } },
)
}
if (message.method === "notifications/initialized") return new Response(null, { status: 202 })
pingCount++
if (pingCount === 1) return new Response("Session not found", { status: 404 })
return Response.json({ jsonrpc: "2.0", id: message.id, result: {} })
},
})
const client = new Client({ name: "test", version: "1" })
try {
await client.connect(new StreamableHTTPClientTransport(server.url))
await client.ping()
process.stdout.write(JSON.stringify(posts))
} finally {
await client.close()
server.stop(true)
}

View File

@ -0,0 +1,27 @@
import path from "node:path"
import { describe, expect, test } from "bun:test"
describe("mcp session recovery", () => {
test("reinitializes and retries once after a session-bound POST returns 404", async () => {
const child = Bun.spawn([process.execPath, path.join(import.meta.dir, "../fixture/mcp-session-recovery.ts")], {
cwd: path.join(import.meta.dir, "../.."),
stdout: "pipe",
stderr: "pipe",
})
const [code, stdout, stderr] = await Promise.all([
child.exited,
Bun.readableStreamToText(child.stdout),
Bun.readableStreamToText(child.stderr),
])
expect(code, stderr).toBe(0)
expect(JSON.parse(stdout)).toEqual([
{ method: "initialize", session: null },
{ method: "notifications/initialized", session: "expired" },
{ method: "ping", session: "expired" },
{ method: "initialize", session: null },
{ method: "notifications/initialized", session: "replacement" },
{ method: "ping", session: "replacement" },
])
})
})

View File

@ -0,0 +1,401 @@
diff --git a/dist/cjs/client/index.js b/dist/cjs/client/index.js
index 6ac1da14dc7f6211ae70f7711c124b76098816d8..adb5b7bd45514a406a0f7e40b64631c101584c84 100644
--- a/dist/cjs/client/index.js
+++ b/dist/cjs/client/index.js
@@ -288,41 +288,16 @@ class Client extends protocol_js_1.Protocol {
}
async connect(transport, options) {
await super.connect(transport);
+ transport.onsessionexpired = async () => {
+ await this._initialize(transport);
+ };
// When transport sessionId is already set this means we are trying to reconnect.
// In this case we don't need to initialize again.
if (transport.sessionId !== undefined) {
return;
}
try {
- const result = await this.request({
- method: 'initialize',
- params: {
- protocolVersion: types_js_1.LATEST_PROTOCOL_VERSION,
- capabilities: this._capabilities,
- clientInfo: this._clientInfo
- }
- }, types_js_1.InitializeResultSchema, options);
- if (result === undefined) {
- throw new Error(`Server sent invalid initialize result: ${result}`);
- }
- if (!types_js_1.SUPPORTED_PROTOCOL_VERSIONS.includes(result.protocolVersion)) {
- throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`);
- }
- this._serverCapabilities = result.capabilities;
- this._serverVersion = result.serverInfo;
- // HTTP transports must set the protocol version in each header after initialization.
- if (transport.setProtocolVersion) {
- transport.setProtocolVersion(result.protocolVersion);
- }
- this._instructions = result.instructions;
- await this.notification({
- method: 'notifications/initialized'
- });
- // Set up list changed handlers now that we know server capabilities
- if (this._pendingListChangedConfig) {
- this._setupListChangedHandlers(this._pendingListChangedConfig);
- this._pendingListChangedConfig = undefined;
- }
+ await this._initialize(transport, options);
}
catch (error) {
// Disconnect if initialization fails.
@@ -330,6 +305,37 @@ class Client extends protocol_js_1.Protocol {
throw error;
}
}
+ async _initialize(transport, options) {
+ const result = await this.request({
+ method: 'initialize',
+ params: {
+ protocolVersion: types_js_1.LATEST_PROTOCOL_VERSION,
+ capabilities: this._capabilities,
+ clientInfo: this._clientInfo
+ }
+ }, types_js_1.InitializeResultSchema, options);
+ if (result === undefined) {
+ throw new Error(`Server sent invalid initialize result: ${result}`);
+ }
+ if (!types_js_1.SUPPORTED_PROTOCOL_VERSIONS.includes(result.protocolVersion)) {
+ throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`);
+ }
+ this._serverCapabilities = result.capabilities;
+ this._serverVersion = result.serverInfo;
+ // HTTP transports must set the protocol version in each header after initialization.
+ if (transport.setProtocolVersion) {
+ transport.setProtocolVersion(result.protocolVersion);
+ }
+ this._instructions = result.instructions;
+ await this.notification({
+ method: 'notifications/initialized'
+ });
+ // Set up list changed handlers now that we know server capabilities
+ if (this._pendingListChangedConfig) {
+ this._setupListChangedHandlers(this._pendingListChangedConfig);
+ this._pendingListChangedConfig = undefined;
+ }
+ }
/**
* After initialization has completed, this will be populated with the server's reported capabilities.
*/
diff --git a/dist/cjs/client/streamableHttp.js b/dist/cjs/client/streamableHttp.js
index a29a7d3a0f14d9cd800ef5b296485237350c666f..c362ae5fe6c62c8c8eae7e2e61de1eedff5443c9 100644
--- a/dist/cjs/client/streamableHttp.js
+++ b/dist/cjs/client/streamableHttp.js
@@ -290,7 +290,38 @@ class StreamableHTTPClientTransport {
this.onclose?.();
}
async send(message, options) {
+ return this._send(message, options, false);
+ }
+ async _recoverSession(expiredSessionId) {
+ if (this._sessionRecovery) {
+ await this._sessionRecovery;
+ return true;
+ }
+ if (this._sessionId !== expiredSessionId)
+ return true;
+ this._sessionId = undefined;
+ this._sessionRecovery = Promise.resolve().then(() => this.onsessionexpired?.());
try {
+ await this._sessionRecovery;
+ }
+ catch (error) {
+ this._sessionId = undefined;
+ await this.close();
+ throw error;
+ }
+ finally {
+ this._sessionRecovery = undefined;
+ }
+ return true;
+ }
+ async _send(message, options, isSessionRetry) {
+ try {
+ if (this._sessionRecovery && !(0, types_js_1.isInitializeRequest)(message) && !(0, types_js_1.isInitializedNotification)(message)) {
+ await this._sessionRecovery;
+ if (options?.isRequestActive?.() === false) {
+ throw new Error('Request is no longer active');
+ }
+ }
const { resumptionToken, onresumptiontoken } = options || {};
if (resumptionToken) {
// If we have at last event ID, we need to reconnect the SSE stream
@@ -298,6 +329,7 @@ class StreamableHTTPClientTransport {
return;
}
const headers = await this._commonHeaders();
+ const requestSessionId = headers.get('mcp-session-id') ?? undefined;
headers.set('content-type', 'application/json');
headers.set('accept', 'application/json, text/event-stream');
const init = {
@@ -310,11 +342,20 @@ class StreamableHTTPClientTransport {
const response = await (this._fetch ?? fetch)(this._url, init);
// Handle session ID received during initialization
const sessionId = response.headers.get('mcp-session-id');
- if (sessionId) {
+ if (sessionId && (requestSessionId === undefined || this._sessionId === requestSessionId)) {
this._sessionId = sessionId;
}
if (!response.ok) {
const text = await response.text().catch(() => null);
+ if (response.status === 404 && requestSessionId && !isSessionRetry && !(0, types_js_1.isInitializedNotification)(message)) {
+ const recovered = await this._recoverSession(requestSessionId);
+ if (options?.isRequestActive?.() === false) {
+ throw new Error('Request is no longer active');
+ }
+ if (recovered) {
+ return this._send(message, options, true);
+ }
+ }
if (response.status === 401 && this._authProvider) {
// Prevent infinite recursion when server returns 401 after successful auth
if (this._hasCompletedAuthFlow) {
@@ -335,7 +376,7 @@ class StreamableHTTPClientTransport {
// Mark that we completed auth flow
this._hasCompletedAuthFlow = true;
// Purposely _not_ awaited, so we don't call onerror twice
- return this.send(message);
+ return this._send(message, options, isSessionRetry);
}
if (response.status === 403 && this._authProvider) {
const { resourceMetadataUrl, scope, error } = (0, auth_js_1.extractWWWAuthenticateParams)(response);
@@ -362,7 +403,7 @@ class StreamableHTTPClientTransport {
if (result !== 'AUTHORIZED') {
throw new auth_js_1.UnauthorizedError();
}
- return this.send(message);
+ return this._send(message, options, isSessionRetry);
}
}
throw new StreamableHTTPError(response.status, `Error POSTing to endpoint: ${text}`);
diff --git a/dist/cjs/shared/protocol.js b/dist/cjs/shared/protocol.js
index 3617e787f0ba70447c99501aee7aa67584d89758..4a96d6a0328fa348b96f3869ab7e0bb77538182b 100644
--- a/dist/cjs/shared/protocol.js
+++ b/dist/cjs/shared/protocol.js
@@ -744,7 +744,12 @@ class Protocol {
}
else {
// No related task - send through transport normally
- this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => {
+ this._transport.send(jsonrpcRequest, {
+ relatedRequestId,
+ resumptionToken,
+ onresumptiontoken,
+ isRequestActive: () => this._responseHandlers.has(messageId)
+ }).catch(error => {
this._cleanupTimeout(messageId);
reject(error);
});
diff --git a/dist/esm/client/index.js b/dist/esm/client/index.js
index 49b12c6cd918c457420fef7ad5528a9443d1a191..2afe2e22e960f26c9d516ef135d89f8eb9e4caff 100644
--- a/dist/esm/client/index.js
+++ b/dist/esm/client/index.js
@@ -284,41 +284,16 @@ export class Client extends Protocol {
}
async connect(transport, options) {
await super.connect(transport);
+ transport.onsessionexpired = async () => {
+ await this._initialize(transport);
+ };
// When transport sessionId is already set this means we are trying to reconnect.
// In this case we don't need to initialize again.
if (transport.sessionId !== undefined) {
return;
}
try {
- const result = await this.request({
- method: 'initialize',
- params: {
- protocolVersion: LATEST_PROTOCOL_VERSION,
- capabilities: this._capabilities,
- clientInfo: this._clientInfo
- }
- }, InitializeResultSchema, options);
- if (result === undefined) {
- throw new Error(`Server sent invalid initialize result: ${result}`);
- }
- if (!SUPPORTED_PROTOCOL_VERSIONS.includes(result.protocolVersion)) {
- throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`);
- }
- this._serverCapabilities = result.capabilities;
- this._serverVersion = result.serverInfo;
- // HTTP transports must set the protocol version in each header after initialization.
- if (transport.setProtocolVersion) {
- transport.setProtocolVersion(result.protocolVersion);
- }
- this._instructions = result.instructions;
- await this.notification({
- method: 'notifications/initialized'
- });
- // Set up list changed handlers now that we know server capabilities
- if (this._pendingListChangedConfig) {
- this._setupListChangedHandlers(this._pendingListChangedConfig);
- this._pendingListChangedConfig = undefined;
- }
+ await this._initialize(transport, options);
}
catch (error) {
// Disconnect if initialization fails.
@@ -326,6 +301,37 @@ export class Client extends Protocol {
throw error;
}
}
+ async _initialize(transport, options) {
+ const result = await this.request({
+ method: 'initialize',
+ params: {
+ protocolVersion: LATEST_PROTOCOL_VERSION,
+ capabilities: this._capabilities,
+ clientInfo: this._clientInfo
+ }
+ }, InitializeResultSchema, options);
+ if (result === undefined) {
+ throw new Error(`Server sent invalid initialize result: ${result}`);
+ }
+ if (!SUPPORTED_PROTOCOL_VERSIONS.includes(result.protocolVersion)) {
+ throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`);
+ }
+ this._serverCapabilities = result.capabilities;
+ this._serverVersion = result.serverInfo;
+ // HTTP transports must set the protocol version in each header after initialization.
+ if (transport.setProtocolVersion) {
+ transport.setProtocolVersion(result.protocolVersion);
+ }
+ this._instructions = result.instructions;
+ await this.notification({
+ method: 'notifications/initialized'
+ });
+ // Set up list changed handlers now that we know server capabilities
+ if (this._pendingListChangedConfig) {
+ this._setupListChangedHandlers(this._pendingListChangedConfig);
+ this._pendingListChangedConfig = undefined;
+ }
+ }
/**
* After initialization has completed, this will be populated with the server's reported capabilities.
*/
diff --git a/dist/esm/client/streamableHttp.js b/dist/esm/client/streamableHttp.js
index 624172aa24ae255a67c083f9c19053343e4a0581..ac75b14545fda44aff7ff4d97cc5da884fcc627a 100644
--- a/dist/esm/client/streamableHttp.js
+++ b/dist/esm/client/streamableHttp.js
@@ -1,5 +1,5 @@
import { createFetchWithInit, normalizeHeaders } from '../shared/transport.js';
-import { isInitializedNotification, isJSONRPCRequest, isJSONRPCResultResponse, JSONRPCMessageSchema } from '../types.js';
+import { isInitializedNotification, isInitializeRequest, isJSONRPCRequest, isJSONRPCResultResponse, JSONRPCMessageSchema } from '../types.js';
import { auth, extractWWWAuthenticateParams, UnauthorizedError } from './auth.js';
import { EventSourceParserStream } from 'eventsource-parser/stream';
// Default reconnection options for StreamableHTTP connections
@@ -286,7 +286,38 @@ export class StreamableHTTPClientTransport {
this.onclose?.();
}
async send(message, options) {
+ return this._send(message, options, false);
+ }
+ async _recoverSession(expiredSessionId) {
+ if (this._sessionRecovery) {
+ await this._sessionRecovery;
+ return true;
+ }
+ if (this._sessionId !== expiredSessionId)
+ return true;
+ this._sessionId = undefined;
+ this._sessionRecovery = Promise.resolve().then(() => this.onsessionexpired?.());
try {
+ await this._sessionRecovery;
+ }
+ catch (error) {
+ this._sessionId = undefined;
+ await this.close();
+ throw error;
+ }
+ finally {
+ this._sessionRecovery = undefined;
+ }
+ return true;
+ }
+ async _send(message, options, isSessionRetry) {
+ try {
+ if (this._sessionRecovery && !isInitializeRequest(message) && !isInitializedNotification(message)) {
+ await this._sessionRecovery;
+ if (options?.isRequestActive?.() === false) {
+ throw new Error('Request is no longer active');
+ }
+ }
const { resumptionToken, onresumptiontoken } = options || {};
if (resumptionToken) {
// If we have at last event ID, we need to reconnect the SSE stream
@@ -294,6 +325,7 @@ export class StreamableHTTPClientTransport {
return;
}
const headers = await this._commonHeaders();
+ const requestSessionId = headers.get('mcp-session-id') ?? undefined;
headers.set('content-type', 'application/json');
headers.set('accept', 'application/json, text/event-stream');
const init = {
@@ -306,11 +338,20 @@ export class StreamableHTTPClientTransport {
const response = await (this._fetch ?? fetch)(this._url, init);
// Handle session ID received during initialization
const sessionId = response.headers.get('mcp-session-id');
- if (sessionId) {
+ if (sessionId && (requestSessionId === undefined || this._sessionId === requestSessionId)) {
this._sessionId = sessionId;
}
if (!response.ok) {
const text = await response.text().catch(() => null);
+ if (response.status === 404 && requestSessionId && !isSessionRetry && !isInitializedNotification(message)) {
+ const recovered = await this._recoverSession(requestSessionId);
+ if (options?.isRequestActive?.() === false) {
+ throw new Error('Request is no longer active');
+ }
+ if (recovered) {
+ return this._send(message, options, true);
+ }
+ }
if (response.status === 401 && this._authProvider) {
// Prevent infinite recursion when server returns 401 after successful auth
if (this._hasCompletedAuthFlow) {
@@ -331,7 +372,7 @@ export class StreamableHTTPClientTransport {
// Mark that we completed auth flow
this._hasCompletedAuthFlow = true;
// Purposely _not_ awaited, so we don't call onerror twice
- return this.send(message);
+ return this._send(message, options, isSessionRetry);
}
if (response.status === 403 && this._authProvider) {
const { resourceMetadataUrl, scope, error } = extractWWWAuthenticateParams(response);
@@ -358,7 +399,7 @@ export class StreamableHTTPClientTransport {
if (result !== 'AUTHORIZED') {
throw new UnauthorizedError();
}
- return this.send(message);
+ return this._send(message, options, isSessionRetry);
}
}
throw new StreamableHTTPError(response.status, `Error POSTing to endpoint: ${text}`);
diff --git a/dist/esm/shared/protocol.js b/dist/esm/shared/protocol.js
index bfa2b7120a0f50c569364ea5264e6f811076f44f..abd8dfd707c155f71dae7aeeeeaf7547368ac749 100644
--- a/dist/esm/shared/protocol.js
+++ b/dist/esm/shared/protocol.js
@@ -740,7 +740,12 @@ export class Protocol {
}
else {
// No related task - send through transport normally
- this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => {
+ this._transport.send(jsonrpcRequest, {
+ relatedRequestId,
+ resumptionToken,
+ onresumptiontoken,
+ isRequestActive: () => this._responseHandlers.has(messageId)
+ }).catch(error => {
this._cleanupTimeout(messageId);
reject(error);
});