From 383a2e5cf152ba0ae320e70e5301821f00c7dde5 Mon Sep 17 00:00:00 2001 From: Joel Johnson Date: Tue, 3 Mar 2026 22:22:49 -0500 Subject: [PATCH] fix: support multiple concurrent HTTP clients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The HTTP MCP server creates a single Transport + McpServer pair at startup. Once the first client initializes, all subsequent clients are rejected with "Server already initialized" — making the HTTP mode unusable for reconnect, crash recovery, or multi-client scenarios. Replace the singleton with a per-session architecture: each initialize request creates its own McpServer + Transport pair, stored in a sessions Map keyed by session ID. The shared Store (SQLite) is stateless and safe for concurrent access. Key changes: - createSession() factory creates fresh McpServer + Transport per client - POST /mcp routes by mcp-session-id header to existing sessions - New initialize requests (no session header) create new sessions - Unknown session IDs return 404 per MCP spec - Missing session IDs return 400 - onsessioninitialized callback stores sessions at the right time - transport.onclose cleans up the sessions Map - Shutdown iterates all active sessions Tested with 3+ concurrent clients, session cleanup via DELETE, cross-session isolation, and rapid session creation. Fixes #195 Closes #163 Co-Authored-By: Claude Opus 4.6 --- src/mcp.ts | 92 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 84 insertions(+), 8 deletions(-) diff --git a/src/mcp.ts b/src/mcp.ts index ebd45eb..d5e1fbf 100644 --- a/src/mcp.ts +++ b/src/mcp.ts @@ -14,6 +14,7 @@ import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mc import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js"; +import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"; import { z } from "zod"; import { createStore, @@ -552,12 +553,31 @@ export type HttpServerHandle = { */ export async function startMcpHttpServer(port: number, options?: { quiet?: boolean }): Promise { const store = createStore(); - const mcpServer = createMcpServer(store); - const transport = new WebStandardStreamableHTTPServerTransport({ - sessionIdGenerator: () => randomUUID(), - enableJsonResponse: true, - }); - await mcpServer.connect(transport); + + // Session map: each client gets its own McpServer + Transport pair (MCP spec requirement). + // The store is shared — it's stateless SQLite, safe for concurrent access. + const sessions = new Map(); + + async function createSession(): Promise { + const transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + enableJsonResponse: true, + onsessioninitialized: (sessionId: string) => { + sessions.set(sessionId, transport); + log(`${ts()} New session ${sessionId} (${sessions.size} active)`); + }, + }); + const server = createMcpServer(store); + await server.connect(transport); + + transport.onclose = () => { + if (transport.sessionId) { + sessions.delete(transport.sessionId); + } + }; + + return transport; + } const startTime = Date.now(); const quiet = options?.quiet ?? false; @@ -669,8 +689,38 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole for (const [k, v] of Object.entries(nodeReq.headers)) { if (typeof v === "string") headers[k] = v; } + + // Route to existing session or create new one on initialize + const sessionId = headers["mcp-session-id"]; + let transport: WebStandardStreamableHTTPServerTransport; + + if (sessionId) { + const existing = sessions.get(sessionId); + if (!existing) { + nodeRes.writeHead(404, { "Content-Type": "application/json" }); + nodeRes.end(JSON.stringify({ + jsonrpc: "2.0", + error: { code: -32001, message: "Session not found" }, + id: body?.id ?? null, + })); + return; + } + transport = existing; + } else if (isInitializeRequest(body)) { + transport = await createSession(); + } else { + nodeRes.writeHead(400, { "Content-Type": "application/json" }); + nodeRes.end(JSON.stringify({ + jsonrpc: "2.0", + error: { code: -32000, message: "Bad Request: Missing session ID" }, + id: body?.id ?? null, + })); + return; + } + const request = new Request(url, { method: "POST", headers, body: rawBody }); const response = await transport.handleRequest(request, { parsedBody: body }); + nodeRes.writeHead(response.status, Object.fromEntries(response.headers)); nodeRes.end(Buffer.from(await response.arrayBuffer())); log(`${ts()} POST /mcp ${label} (${Date.now() - reqStart}ms)`); @@ -678,11 +728,34 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole } if (pathname === "/mcp") { - const url = `http://localhost:${port}${pathname}`; const headers: Record = {}; for (const [k, v] of Object.entries(nodeReq.headers)) { if (typeof v === "string") headers[k] = v; } + + // GET/DELETE must have a valid session + const sessionId = headers["mcp-session-id"]; + if (!sessionId) { + nodeRes.writeHead(400, { "Content-Type": "application/json" }); + nodeRes.end(JSON.stringify({ + jsonrpc: "2.0", + error: { code: -32000, message: "Bad Request: Missing session ID" }, + id: null, + })); + return; + } + const transport = sessions.get(sessionId); + if (!transport) { + nodeRes.writeHead(404, { "Content-Type": "application/json" }); + nodeRes.end(JSON.stringify({ + jsonrpc: "2.0", + error: { code: -32001, message: "Session not found" }, + id: null, + })); + return; + } + + const url = `http://localhost:${port}${pathname}`; const rawBody = nodeReq.method !== "GET" && nodeReq.method !== "HEAD" ? await collectBody(nodeReq) : undefined; const request = new Request(url, { method: nodeReq.method || "GET", headers, ...(rawBody ? { body: rawBody } : {}) }); const response = await transport.handleRequest(request); @@ -711,7 +784,10 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole const stop = async () => { if (stopping) return; stopping = true; - await transport.close(); + for (const transport of sessions.values()) { + await transport.close(); + } + sessions.clear(); httpServer.close(); store.close(); await disposeDefaultLlamaCpp();