fix(mcp): apply timeouts to catalog requests (#31618)
This commit is contained in:
parent
722f4dd416
commit
f43b0d3afd
138
packages/opencode/src/mcp/catalog.ts
Normal file
138
packages/opencode/src/mcp/catalog.ts
Normal file
@ -0,0 +1,138 @@
|
||||
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
|
||||
import {
|
||||
CallToolResultSchema,
|
||||
ListToolsResultSchema,
|
||||
ToolSchema,
|
||||
type Tool as MCPToolDef,
|
||||
} from "@modelcontextprotocol/sdk/types.js"
|
||||
import { dynamicTool, jsonSchema, type JSONSchema7, type Tool } from "ai"
|
||||
import { Effect } from "effect"
|
||||
|
||||
const DEFAULT_TIMEOUT = 30_000
|
||||
const MAX_LIST_PAGES = 1_000
|
||||
|
||||
const TolerantListToolsResultSchema = ListToolsResultSchema.extend({
|
||||
tools: ToolSchema.omit({ outputSchema: true }).array(),
|
||||
})
|
||||
|
||||
export async function paginate<T, R extends { nextCursor?: string }>(
|
||||
list: (cursor?: string) => Promise<R>,
|
||||
items: (result: R) => T[],
|
||||
) {
|
||||
const result: T[] = []
|
||||
const cursors = new Set<string>()
|
||||
let cursor: string | undefined
|
||||
|
||||
for (let page = 0; page < MAX_LIST_PAGES; page++) {
|
||||
const page = await list(cursor)
|
||||
result.push(...items(page))
|
||||
if (page.nextCursor === undefined) return result
|
||||
if (cursors.has(page.nextCursor)) throw new Error(`MCP list returned duplicate cursor: ${page.nextCursor}`)
|
||||
cursors.add(page.nextCursor)
|
||||
cursor = page.nextCursor
|
||||
}
|
||||
|
||||
throw new Error(`MCP list exceeded ${MAX_LIST_PAGES} pages`)
|
||||
}
|
||||
|
||||
export function defs(client: Client, timeout?: number) {
|
||||
return listTools(client, timeout ?? DEFAULT_TIMEOUT).pipe(Effect.catch(() => Effect.void))
|
||||
}
|
||||
|
||||
export function convertTool(mcpTool: MCPToolDef, client: Client, timeout?: number): Tool {
|
||||
const inputSchema: JSONSchema7 = {
|
||||
...(mcpTool.inputSchema as JSONSchema7),
|
||||
type: "object",
|
||||
properties: (mcpTool.inputSchema.properties ?? {}) as JSONSchema7["properties"],
|
||||
additionalProperties: false,
|
||||
}
|
||||
|
||||
return dynamicTool({
|
||||
description: mcpTool.description ?? "",
|
||||
inputSchema: jsonSchema(inputSchema),
|
||||
execute: (args: unknown, options) =>
|
||||
client.callTool(
|
||||
{
|
||||
name: mcpTool.name,
|
||||
arguments: (args || {}) as Record<string, unknown>,
|
||||
},
|
||||
CallToolResultSchema,
|
||||
{
|
||||
resetTimeoutOnProgress: true,
|
||||
signal: options.abortSignal,
|
||||
timeout,
|
||||
},
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
export function fetch<T extends { name: string }>(
|
||||
clientName: string,
|
||||
client: Client,
|
||||
list: (client: Client) => Promise<T[]>,
|
||||
label: string,
|
||||
) {
|
||||
return Effect.tryPromise({
|
||||
try: () => list(client),
|
||||
catch: (error) => error,
|
||||
}).pipe(
|
||||
Effect.tapError((error) =>
|
||||
Effect.logWarning(`failed to get ${label}`, {
|
||||
clientName,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
}),
|
||||
),
|
||||
Effect.map((items) => {
|
||||
const sanitizedClient = sanitize(clientName)
|
||||
return Object.fromEntries(
|
||||
items.map((item) => [sanitizedClient + ":" + sanitize(item.name), { ...item, client: clientName }]),
|
||||
)
|
||||
}),
|
||||
Effect.orElseSucceed(() => undefined),
|
||||
)
|
||||
}
|
||||
|
||||
export const sanitize = (value: string) => value.replace(/[^a-zA-Z0-9_-]/g, "_")
|
||||
|
||||
export function prompts(client: Client, timeout?: number) {
|
||||
if (!client.getServerCapabilities()?.prompts) return Promise.resolve([])
|
||||
return paginate(
|
||||
(cursor) => client.listPrompts(cursor === undefined ? undefined : { cursor }, { timeout }),
|
||||
(result) => result.prompts,
|
||||
)
|
||||
}
|
||||
|
||||
export function resources(client: Client, timeout?: number) {
|
||||
if (!client.getServerCapabilities()?.resources) return Promise.resolve([])
|
||||
return paginate(
|
||||
(cursor) => client.listResources(cursor === undefined ? undefined : { cursor }, { timeout }),
|
||||
(result) => result.resources,
|
||||
)
|
||||
}
|
||||
|
||||
function listTools(client: Client, timeout: number) {
|
||||
return Effect.tryPromise({
|
||||
try: () =>
|
||||
paginate(
|
||||
async (cursor) => {
|
||||
const params = cursor === undefined ? undefined : { cursor }
|
||||
try {
|
||||
return await client.listTools(params, { timeout })
|
||||
} catch (error) {
|
||||
if (!(error instanceof Error) || !isOutputSchemaValidationError(error)) throw error
|
||||
return client.request({ method: "tools/list", params }, TolerantListToolsResultSchema, { timeout })
|
||||
}
|
||||
},
|
||||
(result) => result.tools,
|
||||
),
|
||||
catch: (error) => (error instanceof Error ? error : new Error(String(error))),
|
||||
})
|
||||
}
|
||||
|
||||
function isOutputSchemaValidationError(error: Error) {
|
||||
return /can't resolve reference|resolves to more than one schema|outputSchema|schema.*reference|reference.*schema/i.test(
|
||||
error.message,
|
||||
)
|
||||
}
|
||||
|
||||
export * as McpCatalog from "./catalog"
|
||||
@ -1,5 +1,5 @@
|
||||
import { LayerNode } from "@opencode-ai/core/effect/layer-node"
|
||||
import { dynamicTool, type Tool, jsonSchema, type JSONSchema7 } from "ai"
|
||||
import { type Tool } from "ai"
|
||||
import { ConfigV1 } from "@opencode-ai/core/v1/config/config"
|
||||
import { serviceUse } from "@opencode-ai/core/effect/service-use"
|
||||
import { Client } from "@modelcontextprotocol/sdk/client/index.js"
|
||||
@ -7,13 +7,7 @@ import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/
|
||||
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"
|
||||
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
|
||||
import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js"
|
||||
import {
|
||||
CallToolResultSchema,
|
||||
ListToolsResultSchema,
|
||||
ToolSchema,
|
||||
type Tool as MCPToolDef,
|
||||
ToolListChangedNotificationSchema,
|
||||
} from "@modelcontextprotocol/sdk/types.js"
|
||||
import { type Tool as MCPToolDef, ToolListChangedNotificationSchema } from "@modelcontextprotocol/sdk/types.js"
|
||||
import { Config } from "@/config/config"
|
||||
import { ConfigMCPV1 } from "@opencode-ai/core/v1/config/mcp"
|
||||
import { NamedError } from "@opencode-ai/core/util/error"
|
||||
@ -32,13 +26,10 @@ import { EffectBridge } from "@/effect/bridge"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { McpCatalog } from "./catalog"
|
||||
|
||||
const DEFAULT_TIMEOUT = 30_000
|
||||
|
||||
const TolerantListToolsResultSchema = ListToolsResultSchema.extend({
|
||||
tools: ToolSchema.omit({ outputSchema: true }).array(),
|
||||
})
|
||||
|
||||
export const Resource = Schema.Struct({
|
||||
name: Schema.String,
|
||||
uri: Schema.String,
|
||||
@ -112,122 +103,10 @@ function isMcpConfigured(entry: McpEntry): entry is ConfigMCPV1.Info {
|
||||
return typeof entry === "object" && entry !== null && "type" in entry
|
||||
}
|
||||
|
||||
const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_")
|
||||
const MAX_LIST_PAGES = 1_000
|
||||
|
||||
function remoteURL(value: string) {
|
||||
if (URL.canParse(value)) return new URL(value)
|
||||
}
|
||||
|
||||
function isOutputSchemaValidationError(error: Error) {
|
||||
return /can't resolve reference|resolves to more than one schema|outputSchema|schema.*reference|reference.*schema/i.test(
|
||||
error.message,
|
||||
)
|
||||
}
|
||||
|
||||
async function paginate<T, R extends { nextCursor?: string }>(
|
||||
list: (cursor?: string) => Promise<R>,
|
||||
items: (result: R) => T[],
|
||||
) {
|
||||
const result: T[] = []
|
||||
const cursors = new Set<string>()
|
||||
let cursor: string | undefined
|
||||
|
||||
for (let page = 0; page < MAX_LIST_PAGES; page++) {
|
||||
const page = await list(cursor)
|
||||
result.push(...items(page))
|
||||
if (page.nextCursor === undefined) return result
|
||||
if (cursors.has(page.nextCursor)) throw new Error(`MCP list returned duplicate cursor: ${page.nextCursor}`)
|
||||
cursors.add(page.nextCursor)
|
||||
cursor = page.nextCursor
|
||||
}
|
||||
|
||||
throw new Error(`MCP list exceeded ${MAX_LIST_PAGES} pages`)
|
||||
}
|
||||
|
||||
function listTools(client: MCPClient, timeout: number) {
|
||||
return Effect.tryPromise({
|
||||
try: () =>
|
||||
paginate(
|
||||
async (cursor) => {
|
||||
const params = cursor === undefined ? undefined : { cursor }
|
||||
try {
|
||||
return await client.listTools(params, { timeout })
|
||||
} catch (error) {
|
||||
if (!(error instanceof Error) || !isOutputSchemaValidationError(error)) throw error
|
||||
return client.request({ method: "tools/list", params }, TolerantListToolsResultSchema, { timeout })
|
||||
}
|
||||
},
|
||||
(result) => result.tools,
|
||||
),
|
||||
catch: (err) => (err instanceof Error ? err : new Error(String(err))),
|
||||
})
|
||||
}
|
||||
|
||||
// Convert MCP tool definition to AI SDK Tool type
|
||||
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
|
||||
const inputSchema = mcpTool.inputSchema
|
||||
|
||||
// Spread first, then override type to ensure it's always "object"
|
||||
const schema: JSONSchema7 = {
|
||||
...(inputSchema as JSONSchema7),
|
||||
type: "object",
|
||||
properties: (inputSchema.properties ?? {}) as JSONSchema7["properties"],
|
||||
additionalProperties: false,
|
||||
}
|
||||
|
||||
return dynamicTool({
|
||||
description: mcpTool.description ?? "",
|
||||
inputSchema: jsonSchema(schema),
|
||||
execute: async (args: unknown, options) => {
|
||||
return client.callTool(
|
||||
{
|
||||
name: mcpTool.name,
|
||||
arguments: (args || {}) as Record<string, unknown>,
|
||||
},
|
||||
CallToolResultSchema,
|
||||
{
|
||||
resetTimeoutOnProgress: true,
|
||||
signal: options.abortSignal,
|
||||
timeout,
|
||||
},
|
||||
)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
function defs(client: MCPClient, timeout?: number) {
|
||||
return listTools(client, timeout ?? DEFAULT_TIMEOUT).pipe(Effect.catch(() => Effect.void))
|
||||
}
|
||||
|
||||
function fetchFromClient<T extends { name: string }>(
|
||||
clientName: string,
|
||||
client: Client,
|
||||
listFn: (c: Client) => Promise<T[]>,
|
||||
label: string,
|
||||
) {
|
||||
return Effect.tryPromise({
|
||||
try: () => listFn(client),
|
||||
catch: (error) => error,
|
||||
}).pipe(
|
||||
Effect.tapError((error) =>
|
||||
Effect.logWarning(`failed to get ${label}`, {
|
||||
clientName,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
}),
|
||||
),
|
||||
Effect.map((items) => {
|
||||
const out: Record<string, T & { client: string }> = {}
|
||||
const sanitizedClient = sanitize(clientName)
|
||||
for (const item of items) {
|
||||
out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName }
|
||||
}
|
||||
return out
|
||||
}),
|
||||
Effect.orElseSucceed(() => undefined),
|
||||
)
|
||||
}
|
||||
|
||||
interface CreateResult {
|
||||
mcpClient?: MCPClient
|
||||
status: Status
|
||||
@ -465,7 +344,7 @@ export const layer = Layer.effect(
|
||||
}
|
||||
|
||||
return yield* Effect.gen(function* () {
|
||||
const listed = mcpClient.getServerCapabilities()?.tools ? yield* defs(mcpClient, mcp.timeout) : []
|
||||
const listed = mcpClient.getServerCapabilities()?.tools ? yield* McpCatalog.defs(mcpClient, mcp.timeout) : []
|
||||
if (!listed) {
|
||||
return yield* Effect.fail(new Error("Failed to get tools"))
|
||||
}
|
||||
@ -516,7 +395,7 @@ export const layer = Layer.effect(
|
||||
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
const listed = await bridge.promise(defs(client, timeout))
|
||||
const listed = await bridge.promise(McpCatalog.defs(client, timeout))
|
||||
if (!listed) return
|
||||
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
|
||||
|
||||
@ -670,6 +549,11 @@ export const layer = Layer.effect(
|
||||
s.status[name] = { status: "disabled" }
|
||||
})
|
||||
|
||||
function requestTimeout(s: State, name: string, configured: McpEntry | undefined, fallback?: number) {
|
||||
const staticTimeout = configured && isMcpConfigured(configured) ? configured.timeout : undefined
|
||||
return s.config[name]?.timeout ?? staticTimeout ?? fallback
|
||||
}
|
||||
|
||||
const tools = Effect.fn("MCP.tools")(function* () {
|
||||
const result: Record<string, Tool> = {}
|
||||
const s = yield* InstanceState.get(state)
|
||||
@ -681,15 +565,15 @@ export const layer = Layer.effect(
|
||||
for (const [clientName, client] of Object.entries(s.clients)) {
|
||||
if (s.status[clientName]?.status !== "connected") continue
|
||||
const mcpConfig = config[clientName]
|
||||
const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : s.config[clientName]
|
||||
const listed = s.defs[clientName]
|
||||
if (!listed) {
|
||||
yield* Effect.logWarning("missing cached tools for connected server", { clientName })
|
||||
continue
|
||||
}
|
||||
const timeout = entry?.timeout ?? defaultTimeout
|
||||
const timeout = requestTimeout(s, clientName, mcpConfig, defaultTimeout)
|
||||
for (const mcpTool of listed) {
|
||||
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
|
||||
const key = McpCatalog.sanitize(clientName) + "_" + McpCatalog.sanitize(mcpTool.name)
|
||||
result[key] = McpCatalog.convertTool(mcpTool, client, timeout)
|
||||
}
|
||||
}
|
||||
return result
|
||||
@ -697,45 +581,31 @@ export const layer = Layer.effect(
|
||||
|
||||
function collectFromConnected<T extends { name: string }>(
|
||||
s: State,
|
||||
listFn: (c: Client) => Promise<T[]>,
|
||||
listFn: (c: Client, timeout?: number) => Promise<T[]>,
|
||||
label: string,
|
||||
) {
|
||||
return Effect.forEach(
|
||||
Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
|
||||
([clientName, client]) =>
|
||||
fetchFromClient(clientName, client, listFn, label).pipe(Effect.map((items) => Object.entries(items ?? {}))),
|
||||
{ concurrency: "unbounded" },
|
||||
).pipe(Effect.map((results) => Object.fromEntries<T & { client: string }>(results.flat())))
|
||||
return Effect.gen(function* () {
|
||||
const cfg = yield* cfgSvc.get()
|
||||
return yield* Effect.forEach(
|
||||
Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
|
||||
([clientName, client]) =>
|
||||
McpCatalog.fetch(
|
||||
clientName,
|
||||
client,
|
||||
(c) => listFn(c, requestTimeout(s, clientName, cfg.mcp?.[clientName], cfg.experimental?.mcp_timeout)),
|
||||
label,
|
||||
).pipe(Effect.map((items) => Object.entries(items ?? {}))),
|
||||
{ concurrency: "unbounded" },
|
||||
).pipe(Effect.map((results) => Object.fromEntries<T & { client: string }>(results.flat())))
|
||||
})
|
||||
}
|
||||
|
||||
const prompts = Effect.fn("MCP.prompts")(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
return yield* collectFromConnected(
|
||||
s,
|
||||
(c) =>
|
||||
c.getServerCapabilities()?.prompts
|
||||
? paginate(
|
||||
(cursor) => c.listPrompts(cursor === undefined ? undefined : { cursor }),
|
||||
(result) => result.prompts,
|
||||
)
|
||||
: Promise.resolve([]),
|
||||
"prompts",
|
||||
)
|
||||
return yield* collectFromConnected(yield* InstanceState.get(state), McpCatalog.prompts, "prompts")
|
||||
})
|
||||
|
||||
const resources = Effect.fn("MCP.resources")(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
return yield* collectFromConnected(
|
||||
s,
|
||||
(c) =>
|
||||
c.getServerCapabilities()?.resources
|
||||
? paginate(
|
||||
(cursor) => c.listResources(cursor === undefined ? undefined : { cursor }),
|
||||
(result) => result.resources,
|
||||
)
|
||||
: Promise.resolve([]),
|
||||
"resources",
|
||||
)
|
||||
return yield* collectFromConnected(yield* InstanceState.get(state), McpCatalog.resources, "resources")
|
||||
})
|
||||
|
||||
const withClient = Effect.fnUntraced(function* <A>(
|
||||
@ -751,10 +621,8 @@ export const layer = Layer.effect(
|
||||
return undefined
|
||||
}
|
||||
const cfg = yield* cfgSvc.get()
|
||||
const configured = cfg.mcp?.[clientName]
|
||||
const staticTimeout = configured && isMcpConfigured(configured) ? configured.timeout : undefined
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => fn(client, s.config[clientName]?.timeout ?? staticTimeout ?? cfg.experimental?.mcp_timeout),
|
||||
try: () => fn(client, requestTimeout(s, clientName, cfg.mcp?.[clientName], cfg.experimental?.mcp_timeout)),
|
||||
catch: (error) => error,
|
||||
}).pipe(
|
||||
Effect.tapError((error) =>
|
||||
@ -877,7 +745,7 @@ export const layer = Layer.effect(
|
||||
|
||||
const listed = client
|
||||
? client.getServerCapabilities()?.tools
|
||||
? yield* defs(client, mcpConfig.timeout)
|
||||
? yield* McpCatalog.defs(client, mcpConfig.timeout)
|
||||
: []
|
||||
: undefined
|
||||
if (!client || !listed) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user