Merge pull request #286 from joelev/fix/multi-session-http
fix: support multiple concurrent HTTP clients
This commit is contained in:
commit
e3bc5ccdc3
92
src/mcp.ts
92
src/mcp.ts
@ -14,6 +14,7 @@ import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mc
|
||||
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
|
||||
import { WebStandardStreamableHTTPServerTransport }
|
||||
from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js";
|
||||
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
|
||||
import { z } from "zod";
|
||||
import {
|
||||
createStore,
|
||||
@ -552,12 +553,31 @@ export type HttpServerHandle = {
|
||||
*/
|
||||
export async function startMcpHttpServer(port: number, options?: { quiet?: boolean }): Promise<HttpServerHandle> {
|
||||
const store = createStore();
|
||||
const mcpServer = createMcpServer(store);
|
||||
const transport = new WebStandardStreamableHTTPServerTransport({
|
||||
sessionIdGenerator: () => randomUUID(),
|
||||
enableJsonResponse: true,
|
||||
});
|
||||
await mcpServer.connect(transport);
|
||||
|
||||
// Session map: each client gets its own McpServer + Transport pair (MCP spec requirement).
|
||||
// The store is shared — it's stateless SQLite, safe for concurrent access.
|
||||
const sessions = new Map<string, WebStandardStreamableHTTPServerTransport>();
|
||||
|
||||
async function createSession(): Promise<WebStandardStreamableHTTPServerTransport> {
|
||||
const transport = new WebStandardStreamableHTTPServerTransport({
|
||||
sessionIdGenerator: () => randomUUID(),
|
||||
enableJsonResponse: true,
|
||||
onsessioninitialized: (sessionId: string) => {
|
||||
sessions.set(sessionId, transport);
|
||||
log(`${ts()} New session ${sessionId} (${sessions.size} active)`);
|
||||
},
|
||||
});
|
||||
const server = createMcpServer(store);
|
||||
await server.connect(transport);
|
||||
|
||||
transport.onclose = () => {
|
||||
if (transport.sessionId) {
|
||||
sessions.delete(transport.sessionId);
|
||||
}
|
||||
};
|
||||
|
||||
return transport;
|
||||
}
|
||||
|
||||
const startTime = Date.now();
|
||||
const quiet = options?.quiet ?? false;
|
||||
@ -669,8 +689,38 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
|
||||
for (const [k, v] of Object.entries(nodeReq.headers)) {
|
||||
if (typeof v === "string") headers[k] = v;
|
||||
}
|
||||
|
||||
// Route to existing session or create new one on initialize
|
||||
const sessionId = headers["mcp-session-id"];
|
||||
let transport: WebStandardStreamableHTTPServerTransport;
|
||||
|
||||
if (sessionId) {
|
||||
const existing = sessions.get(sessionId);
|
||||
if (!existing) {
|
||||
nodeRes.writeHead(404, { "Content-Type": "application/json" });
|
||||
nodeRes.end(JSON.stringify({
|
||||
jsonrpc: "2.0",
|
||||
error: { code: -32001, message: "Session not found" },
|
||||
id: body?.id ?? null,
|
||||
}));
|
||||
return;
|
||||
}
|
||||
transport = existing;
|
||||
} else if (isInitializeRequest(body)) {
|
||||
transport = await createSession();
|
||||
} else {
|
||||
nodeRes.writeHead(400, { "Content-Type": "application/json" });
|
||||
nodeRes.end(JSON.stringify({
|
||||
jsonrpc: "2.0",
|
||||
error: { code: -32000, message: "Bad Request: Missing session ID" },
|
||||
id: body?.id ?? null,
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
const request = new Request(url, { method: "POST", headers, body: rawBody });
|
||||
const response = await transport.handleRequest(request, { parsedBody: body });
|
||||
|
||||
nodeRes.writeHead(response.status, Object.fromEntries(response.headers));
|
||||
nodeRes.end(Buffer.from(await response.arrayBuffer()));
|
||||
log(`${ts()} POST /mcp ${label} (${Date.now() - reqStart}ms)`);
|
||||
@ -678,11 +728,34 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
|
||||
}
|
||||
|
||||
if (pathname === "/mcp") {
|
||||
const url = `http://localhost:${port}${pathname}`;
|
||||
const headers: Record<string, string> = {};
|
||||
for (const [k, v] of Object.entries(nodeReq.headers)) {
|
||||
if (typeof v === "string") headers[k] = v;
|
||||
}
|
||||
|
||||
// GET/DELETE must have a valid session
|
||||
const sessionId = headers["mcp-session-id"];
|
||||
if (!sessionId) {
|
||||
nodeRes.writeHead(400, { "Content-Type": "application/json" });
|
||||
nodeRes.end(JSON.stringify({
|
||||
jsonrpc: "2.0",
|
||||
error: { code: -32000, message: "Bad Request: Missing session ID" },
|
||||
id: null,
|
||||
}));
|
||||
return;
|
||||
}
|
||||
const transport = sessions.get(sessionId);
|
||||
if (!transport) {
|
||||
nodeRes.writeHead(404, { "Content-Type": "application/json" });
|
||||
nodeRes.end(JSON.stringify({
|
||||
jsonrpc: "2.0",
|
||||
error: { code: -32001, message: "Session not found" },
|
||||
id: null,
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
const url = `http://localhost:${port}${pathname}`;
|
||||
const rawBody = nodeReq.method !== "GET" && nodeReq.method !== "HEAD" ? await collectBody(nodeReq) : undefined;
|
||||
const request = new Request(url, { method: nodeReq.method || "GET", headers, ...(rawBody ? { body: rawBody } : {}) });
|
||||
const response = await transport.handleRequest(request);
|
||||
@ -711,7 +784,10 @@ export async function startMcpHttpServer(port: number, options?: { quiet?: boole
|
||||
const stop = async () => {
|
||||
if (stopping) return;
|
||||
stopping = true;
|
||||
await transport.close();
|
||||
for (const transport of sessions.values()) {
|
||||
await transport.close();
|
||||
}
|
||||
sessions.clear();
|
||||
httpServer.close();
|
||||
store.close();
|
||||
await disposeDefaultLlamaCpp();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user