refactor(mcp): simplify service helpers (#31549)
This commit is contained in:
parent
c939aa04fe
commit
381eabb970
@ -115,7 +115,7 @@ function isMcpConfigured(entry: McpEntry): entry is ConfigMCPV1.Info {
|
||||
const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_")
|
||||
const MAX_LIST_PAGES = 1_000
|
||||
|
||||
function remoteURL(key: string, value: string) {
|
||||
function remoteURL(value: string) {
|
||||
if (URL.canParse(value)) return new URL(value)
|
||||
}
|
||||
|
||||
@ -197,11 +197,7 @@ function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number
|
||||
}
|
||||
|
||||
function defs(client: MCPClient, timeout?: number) {
|
||||
return listTools(client, timeout ?? DEFAULT_TIMEOUT).pipe(
|
||||
Effect.catch((err) => {
|
||||
return Effect.succeed(undefined)
|
||||
}),
|
||||
)
|
||||
return listTools(client, timeout ?? DEFAULT_TIMEOUT).pipe(Effect.catch(() => Effect.void))
|
||||
}
|
||||
|
||||
function fetchFromClient<T extends { name: string }>(
|
||||
@ -212,9 +208,7 @@ function fetchFromClient<T extends { name: string }>(
|
||||
) {
|
||||
return Effect.tryPromise({
|
||||
try: () => listFn(client),
|
||||
catch: (e: any) => {
|
||||
return e
|
||||
},
|
||||
catch: (error) => error,
|
||||
}).pipe(
|
||||
Effect.tapError((error) =>
|
||||
Effect.logWarning(`failed to get ${label}`, {
|
||||
@ -323,7 +317,7 @@ export const layer = Layer.effect(
|
||||
) {
|
||||
const oauthDisabled = mcp.oauth === false
|
||||
const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
|
||||
const url = remoteURL(key, mcp.url)
|
||||
const url = remoteURL(mcp.url)
|
||||
if (!url) {
|
||||
return {
|
||||
client: undefined as MCPClient | undefined,
|
||||
@ -407,12 +401,10 @@ export const layer = Layer.effect(
|
||||
}
|
||||
|
||||
lastStatus = { status: "failed" as const, error: lastError.message }
|
||||
return Effect.succeed(undefined)
|
||||
return Effect.void
|
||||
}),
|
||||
)
|
||||
if (result) {
|
||||
return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status }
|
||||
}
|
||||
if (result) return { client: result.client, status: { status: "connected" } as Status }
|
||||
// If this was an auth error, stop trying other transports
|
||||
if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break
|
||||
}
|
||||
@ -486,8 +478,8 @@ export const layer = Layer.effect(
|
||||
if (process.platform === "win32") return [] as number[]
|
||||
const pids: number[] = []
|
||||
const queue = [pid]
|
||||
while (queue.length > 0) {
|
||||
const current = queue.shift()!
|
||||
for (let index = 0; index < queue.length; index++) {
|
||||
const current = queue[index]
|
||||
const handle = yield* spawner.spawn(ChildProcess.make("pgrep", ["-P", String(current)], { stdin: "ignore" }))
|
||||
const text = yield* Stream.mkString(Stream.decodeText(handle.stdout))
|
||||
yield* handle.exitCode
|
||||
@ -674,30 +666,20 @@ export const layer = Layer.effect(
|
||||
const config = cfg.mcp ?? {}
|
||||
const defaultTimeout = cfg.experimental?.mcp_timeout
|
||||
|
||||
const connectedClients = Object.entries(s.clients).filter(
|
||||
([clientName]) => s.status[clientName]?.status === "connected",
|
||||
)
|
||||
|
||||
yield* Effect.forEach(
|
||||
connectedClients,
|
||||
([clientName, client]) =>
|
||||
Effect.gen(function* () {
|
||||
const mcpConfig = config[clientName]
|
||||
const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : s.config[clientName]
|
||||
|
||||
const listed = s.defs[clientName]
|
||||
if (!listed) {
|
||||
yield* Effect.logWarning("missing cached tools for connected server", { clientName })
|
||||
return
|
||||
}
|
||||
|
||||
const timeout = entry?.timeout ?? defaultTimeout
|
||||
for (const mcpTool of listed) {
|
||||
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
|
||||
}
|
||||
}),
|
||||
{ concurrency: "unbounded" },
|
||||
)
|
||||
for (const [clientName, client] of Object.entries(s.clients)) {
|
||||
if (s.status[clientName]?.status !== "connected") continue
|
||||
const mcpConfig = config[clientName]
|
||||
const entry = mcpConfig && isMcpConfigured(mcpConfig) ? mcpConfig : s.config[clientName]
|
||||
const listed = s.defs[clientName]
|
||||
if (!listed) {
|
||||
yield* Effect.logWarning("missing cached tools for connected server", { clientName })
|
||||
continue
|
||||
}
|
||||
const timeout = entry?.timeout ?? defaultTimeout
|
||||
for (const mcpTool of listed) {
|
||||
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
|
||||
}
|
||||
}
|
||||
return result
|
||||
})
|
||||
|
||||
@ -758,9 +740,7 @@ export const layer = Layer.effect(
|
||||
}
|
||||
return yield* Effect.tryPromise({
|
||||
try: () => fn(client),
|
||||
catch: (e: any) => {
|
||||
return e
|
||||
},
|
||||
catch: (error) => error,
|
||||
}).pipe(
|
||||
Effect.tapError((error) =>
|
||||
Effect.logError(`failed to ${label}`, {
|
||||
@ -809,7 +789,7 @@ export const layer = Layer.effect(
|
||||
const mcpConfig = yield* requireMcpConfig(mcpName)
|
||||
if (mcpConfig.type !== "remote") throw new Error(`MCP server ${mcpName} is not a remote server`)
|
||||
if (mcpConfig.oauth === false) throw new Error(`MCP server ${mcpName} has OAuth explicitly disabled`)
|
||||
const url = remoteURL(mcpName, mcpConfig.url)
|
||||
const url = remoteURL(mcpConfig.url)
|
||||
if (!url) throw new Error(`Invalid MCP URL for "${mcpName}"`)
|
||||
|
||||
// OAuth config is optional - if not provided, we'll use auto-discovery
|
||||
@ -881,7 +861,7 @@ export const layer = Layer.effect(
|
||||
: undefined
|
||||
if (!client || !listed) {
|
||||
yield* Effect.tryPromise(() => client?.close() ?? Promise.resolve()).pipe(Effect.ignore)
|
||||
return { status: "failed", error: "Failed to get tools" } as Status
|
||||
return { status: "failed", error: "Failed to get tools" } satisfies Status
|
||||
}
|
||||
|
||||
const s = yield* InstanceState.get(state)
|
||||
@ -936,7 +916,7 @@ export const layer = Layer.effect(
|
||||
}).pipe(Effect.option)
|
||||
|
||||
if (Option.isNone(result)) {
|
||||
return { status: "failed", error: "OAuth completion failed" } as Status
|
||||
return { status: "failed", error: "OAuth completion failed" } satisfies Status
|
||||
}
|
||||
|
||||
yield* auth.clearCodeVerifier(mcpName)
|
||||
@ -965,9 +945,9 @@ export const layer = Layer.effect(
|
||||
|
||||
const getAuthStatus = Effect.fn("MCP.getAuthStatus")(function* (mcpName: string) {
|
||||
const entry = yield* auth.get(mcpName)
|
||||
if (!entry?.tokens) return "not_authenticated" as AuthStatus
|
||||
if (!entry?.tokens) return "not_authenticated"
|
||||
const expired = yield* auth.isTokenExpired(mcpName)
|
||||
return (expired ? "expired" : "authenticated") as AuthStatus
|
||||
return expired ? "expired" : "authenticated"
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
|
||||
Loading…
Reference in New Issue
Block a user