From f43b0d3afd80f4bb3b13982a25e350525d9ab189 Mon Sep 17 00:00:00 2001 From: Aiden Cline <63023139+rekram1-node@users.noreply.github.com> Date: Wed, 10 Jun 2026 14:38:08 -0500 Subject: [PATCH] fix(mcp): apply timeouts to catalog requests (#31618) --- packages/opencode/src/mcp/catalog.ts | 138 +++++++++++++++++++ packages/opencode/src/mcp/index.ts | 196 +++++---------------------- 2 files changed, 170 insertions(+), 164 deletions(-) create mode 100644 packages/opencode/src/mcp/catalog.ts diff --git a/packages/opencode/src/mcp/catalog.ts b/packages/opencode/src/mcp/catalog.ts new file mode 100644 index 000000000..71979dfd6 --- /dev/null +++ b/packages/opencode/src/mcp/catalog.ts @@ -0,0 +1,138 @@ +import { Client } from "@modelcontextprotocol/sdk/client/index.js" +import { + CallToolResultSchema, + ListToolsResultSchema, + ToolSchema, + type Tool as MCPToolDef, +} from "@modelcontextprotocol/sdk/types.js" +import { dynamicTool, jsonSchema, type JSONSchema7, type Tool } from "ai" +import { Effect } from "effect" + +const DEFAULT_TIMEOUT = 30_000 +const MAX_LIST_PAGES = 1_000 + +const TolerantListToolsResultSchema = ListToolsResultSchema.extend({ + tools: ToolSchema.omit({ outputSchema: true }).array(), +}) + +export async function paginate( + list: (cursor?: string) => Promise, + items: (result: R) => T[], +) { + const result: T[] = [] + const cursors = new Set() + let cursor: string | undefined + + for (let page = 0; page < MAX_LIST_PAGES; page++) { + const page = await list(cursor) + result.push(...items(page)) + if (page.nextCursor === undefined) return result + if (cursors.has(page.nextCursor)) throw new Error(`MCP list returned duplicate cursor: ${page.nextCursor}`) + cursors.add(page.nextCursor) + cursor = page.nextCursor + } + + throw new Error(`MCP list exceeded ${MAX_LIST_PAGES} pages`) +} + +export function defs(client: Client, timeout?: number) { + return listTools(client, timeout ?? DEFAULT_TIMEOUT).pipe(Effect.catch(() => Effect.void)) +} + +export function convertTool(mcpTool: MCPToolDef, client: Client, timeout?: number): Tool { + const inputSchema: JSONSchema7 = { + ...(mcpTool.inputSchema as JSONSchema7), + type: "object", + properties: (mcpTool.inputSchema.properties ?? {}) as JSONSchema7["properties"], + additionalProperties: false, + } + + return dynamicTool({ + description: mcpTool.description ?? "", + inputSchema: jsonSchema(inputSchema), + execute: (args: unknown, options) => + client.callTool( + { + name: mcpTool.name, + arguments: (args || {}) as Record, + }, + CallToolResultSchema, + { + resetTimeoutOnProgress: true, + signal: options.abortSignal, + timeout, + }, + ), + }) +} + +export function fetch( + clientName: string, + client: Client, + list: (client: Client) => Promise, + label: string, +) { + return Effect.tryPromise({ + try: () => list(client), + catch: (error) => error, + }).pipe( + Effect.tapError((error) => + Effect.logWarning(`failed to get ${label}`, { + clientName, + error: error instanceof Error ? error.message : String(error), + }), + ), + Effect.map((items) => { + const sanitizedClient = sanitize(clientName) + return Object.fromEntries( + items.map((item) => [sanitizedClient + ":" + sanitize(item.name), { ...item, client: clientName }]), + ) + }), + Effect.orElseSucceed(() => undefined), + ) +} + +export const sanitize = (value: string) => value.replace(/[^a-zA-Z0-9_-]/g, "_") + +export function prompts(client: Client, timeout?: number) { + if (!client.getServerCapabilities()?.prompts) return Promise.resolve([]) + return paginate( + (cursor) => client.listPrompts(cursor === undefined ? undefined : { cursor }, { timeout }), + (result) => result.prompts, + ) +} + +export function resources(client: Client, timeout?: number) { + if (!client.getServerCapabilities()?.resources) return Promise.resolve([]) + return paginate( + (cursor) => client.listResources(cursor === undefined ? undefined : { cursor }, { timeout }), + (result) => result.resources, + ) +} + +function listTools(client: Client, timeout: number) { + return Effect.tryPromise({ + try: () => + paginate( + async (cursor) => { + const params = cursor === undefined ? undefined : { cursor } + try { + return await client.listTools(params, { timeout }) + } catch (error) { + if (!(error instanceof Error) || !isOutputSchemaValidationError(error)) throw error + return client.request({ method: "tools/list", params }, TolerantListToolsResultSchema, { timeout }) + } + }, + (result) => result.tools, + ), + catch: (error) => (error instanceof Error ? error : new Error(String(error))), + }) +} + +function isOutputSchemaValidationError(error: Error) { + return /can't resolve reference|resolves to more than one schema|outputSchema|schema.*reference|reference.*schema/i.test( + error.message, + ) +} + +export * as McpCatalog from "./catalog" diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index d40594bac..0ff39589a 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -1,5 +1,5 @@ import { LayerNode } from "@opencode-ai/core/effect/layer-node" -import { dynamicTool, type Tool, jsonSchema, type JSONSchema7 } from "ai" +import { type Tool } from "ai" import { ConfigV1 } from "@opencode-ai/core/v1/config/config" import { serviceUse } from "@opencode-ai/core/effect/service-use" import { Client } from "@modelcontextprotocol/sdk/client/index.js" @@ -7,13 +7,7 @@ import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/ import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js" import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js" import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js" -import { - CallToolResultSchema, - ListToolsResultSchema, - ToolSchema, - type Tool as MCPToolDef, - ToolListChangedNotificationSchema, -} from "@modelcontextprotocol/sdk/types.js" +import { type Tool as MCPToolDef, ToolListChangedNotificationSchema } from "@modelcontextprotocol/sdk/types.js" import { Config } from "@/config/config" import { ConfigMCPV1 } from "@opencode-ai/core/v1/config/mcp" import { NamedError } from "@opencode-ai/core/util/error" @@ -32,13 +26,10 @@ import { EffectBridge } from "@/effect/bridge" import { InstanceState } from "@/effect/instance-state" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" +import { McpCatalog } from "./catalog" const DEFAULT_TIMEOUT = 30_000 -const TolerantListToolsResultSchema = ListToolsResultSchema.extend({ - tools: ToolSchema.omit({ outputSchema: true }).array(), -}) - export const Resource = Schema.Struct({ name: Schema.String, uri: Schema.String, @@ -112,122 +103,10 @@ function isMcpConfigured(entry: McpEntry): entry is ConfigMCPV1.Info { return typeof entry === "object" && entry !== null && "type" in entry } -const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_") -const MAX_LIST_PAGES = 1_000 - function remoteURL(value: string) { if (URL.canParse(value)) return new URL(value) } -function isOutputSchemaValidationError(error: Error) { - return /can't resolve reference|resolves to more than one schema|outputSchema|schema.*reference|reference.*schema/i.test( - error.message, - ) -} - -async function paginate( - list: (cursor?: string) => Promise, - items: (result: R) => T[], -) { - const result: T[] = [] - const cursors = new Set() - let cursor: string | undefined - - for (let page = 0; page < MAX_LIST_PAGES; page++) { - const page = await list(cursor) - result.push(...items(page)) - if (page.nextCursor === undefined) return result - if (cursors.has(page.nextCursor)) throw new Error(`MCP list returned duplicate cursor: ${page.nextCursor}`) - cursors.add(page.nextCursor) - cursor = page.nextCursor - } - - throw new Error(`MCP list exceeded ${MAX_LIST_PAGES} pages`) -} - -function listTools(client: MCPClient, timeout: number) { - return Effect.tryPromise({ - try: () => - paginate( - async (cursor) => { - const params = cursor === undefined ? undefined : { cursor } - try { - return await client.listTools(params, { timeout }) - } catch (error) { - if (!(error instanceof Error) || !isOutputSchemaValidationError(error)) throw error - return client.request({ method: "tools/list", params }, TolerantListToolsResultSchema, { timeout }) - } - }, - (result) => result.tools, - ), - catch: (err) => (err instanceof Error ? err : new Error(String(err))), - }) -} - -// Convert MCP tool definition to AI SDK Tool type -function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool { - const inputSchema = mcpTool.inputSchema - - // Spread first, then override type to ensure it's always "object" - const schema: JSONSchema7 = { - ...(inputSchema as JSONSchema7), - type: "object", - properties: (inputSchema.properties ?? {}) as JSONSchema7["properties"], - additionalProperties: false, - } - - return dynamicTool({ - description: mcpTool.description ?? "", - inputSchema: jsonSchema(schema), - execute: async (args: unknown, options) => { - return client.callTool( - { - name: mcpTool.name, - arguments: (args || {}) as Record, - }, - CallToolResultSchema, - { - resetTimeoutOnProgress: true, - signal: options.abortSignal, - timeout, - }, - ) - }, - }) -} - -function defs(client: MCPClient, timeout?: number) { - return listTools(client, timeout ?? DEFAULT_TIMEOUT).pipe(Effect.catch(() => Effect.void)) -} - -function fetchFromClient( - clientName: string, - client: Client, - listFn: (c: Client) => Promise, - label: string, -) { - return Effect.tryPromise({ - try: () => listFn(client), - catch: (error) => error, - }).pipe( - Effect.tapError((error) => - Effect.logWarning(`failed to get ${label}`, { - clientName, - error: error instanceof Error ? error.message : String(error), - }), - ), - Effect.map((items) => { - const out: Record = {} - const sanitizedClient = sanitize(clientName) - for (const item of items) { - out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName } - } - return out - }), - Effect.orElseSucceed(() => undefined), - ) -} - interface CreateResult { mcpClient?: MCPClient status: Status @@ -465,7 +344,7 @@ export const layer = Layer.effect( } return yield* Effect.gen(function* () { - const listed = mcpClient.getServerCapabilities()?.tools ? yield* defs(mcpClient, mcp.timeout) : [] + const listed = mcpClient.getServerCapabilities()?.tools ? yield* McpCatalog.defs(mcpClient, mcp.timeout) : [] if (!listed) { return yield* Effect.fail(new Error("Failed to get tools")) } @@ -516,7 +395,7 @@ export const layer = Layer.effect( client.setNotificationHandler(ToolListChangedNotificationSchema, async () => { if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - const listed = await bridge.promise(defs(client, timeout)) + const listed = await bridge.promise(McpCatalog.defs(client, timeout)) if (!listed) return if (s.clients[name] !== client || s.status[name]?.status !== "connected") return @@ -670,6 +549,11 @@ export const layer = Layer.effect( s.status[name] = { status: "disabled" } }) + function requestTimeout(s: State, name: string, configured: McpEntry | undefined, fallback?: number) { + const staticTimeout = configured && isMcpConfigured(configured) ? configured.timeout : undefined + return s.config[name]?.timeout ?? staticTimeout ?? fallback + } + const tools = Effect.fn("MCP.tools")(function* () { const result: Record = {} const s = yield* InstanceState.get(state) @@ -681,15 +565,15 @@ export const layer = Layer.effect( for (const [clientName, client] of Object.entries(s.clients)) { if (s.status[clientName]?.status !== "connected") continue const mcpConfig = config[clientName] - const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : s.config[clientName] const listed = s.defs[clientName] if (!listed) { yield* Effect.logWarning("missing cached tools for connected server", { clientName }) continue } - const timeout = entry?.timeout ?? defaultTimeout + const timeout = requestTimeout(s, clientName, mcpConfig, defaultTimeout) for (const mcpTool of listed) { - result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout) + const key = McpCatalog.sanitize(clientName) + "_" + McpCatalog.sanitize(mcpTool.name) + result[key] = McpCatalog.convertTool(mcpTool, client, timeout) } } return result @@ -697,45 +581,31 @@ export const layer = Layer.effect( function collectFromConnected( s: State, - listFn: (c: Client) => Promise, + listFn: (c: Client, timeout?: number) => Promise, label: string, ) { - return Effect.forEach( - Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"), - ([clientName, client]) => - fetchFromClient(clientName, client, listFn, label).pipe(Effect.map((items) => Object.entries(items ?? {}))), - { concurrency: "unbounded" }, - ).pipe(Effect.map((results) => Object.fromEntries(results.flat()))) + return Effect.gen(function* () { + const cfg = yield* cfgSvc.get() + return yield* Effect.forEach( + Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"), + ([clientName, client]) => + McpCatalog.fetch( + clientName, + client, + (c) => listFn(c, requestTimeout(s, clientName, cfg.mcp?.[clientName], cfg.experimental?.mcp_timeout)), + label, + ).pipe(Effect.map((items) => Object.entries(items ?? {}))), + { concurrency: "unbounded" }, + ).pipe(Effect.map((results) => Object.fromEntries(results.flat()))) + }) } const prompts = Effect.fn("MCP.prompts")(function* () { - const s = yield* InstanceState.get(state) - return yield* collectFromConnected( - s, - (c) => - c.getServerCapabilities()?.prompts - ? paginate( - (cursor) => c.listPrompts(cursor === undefined ? undefined : { cursor }), - (result) => result.prompts, - ) - : Promise.resolve([]), - "prompts", - ) + return yield* collectFromConnected(yield* InstanceState.get(state), McpCatalog.prompts, "prompts") }) const resources = Effect.fn("MCP.resources")(function* () { - const s = yield* InstanceState.get(state) - return yield* collectFromConnected( - s, - (c) => - c.getServerCapabilities()?.resources - ? paginate( - (cursor) => c.listResources(cursor === undefined ? undefined : { cursor }), - (result) => result.resources, - ) - : Promise.resolve([]), - "resources", - ) + return yield* collectFromConnected(yield* InstanceState.get(state), McpCatalog.resources, "resources") }) const withClient = Effect.fnUntraced(function* ( @@ -751,10 +621,8 @@ export const layer = Layer.effect( return undefined } const cfg = yield* cfgSvc.get() - const configured = cfg.mcp?.[clientName] - const staticTimeout = configured && isMcpConfigured(configured) ? configured.timeout : undefined return yield* Effect.tryPromise({ - try: () => fn(client, s.config[clientName]?.timeout ?? staticTimeout ?? cfg.experimental?.mcp_timeout), + try: () => fn(client, requestTimeout(s, clientName, cfg.mcp?.[clientName], cfg.experimental?.mcp_timeout)), catch: (error) => error, }).pipe( Effect.tapError((error) => @@ -877,7 +745,7 @@ export const layer = Layer.effect( const listed = client ? client.getServerCapabilities()?.tools - ? yield* defs(client, mcpConfig.timeout) + ? yield* McpCatalog.defs(client, mcpConfig.timeout) : [] : undefined if (!client || !listed) {