diff --git a/packages/core/src/tool/http-body.ts b/packages/core/src/tool/http-body.ts new file mode 100644 index 000000000..7cb534a44 --- /dev/null +++ b/packages/core/src/tool/http-body.ts @@ -0,0 +1,30 @@ +import { Effect, Stream } from "effect" +import { HttpClientResponse } from "effect/unstable/http" + +export const collectBoundedResponseBody = ( + response: HttpClientResponse.HttpClientResponse, + maximumBytes: number, + tooLarge: () => Error, +) => + Effect.gen(function* () { + const contentLength = response.headers["content-length"] + const parsedSize = contentLength ? Number.parseInt(contentLength, 10) : undefined + const declaredSize = + parsedSize !== undefined && Number.isSafeInteger(parsedSize) && parsedSize >= 0 ? parsedSize : undefined + if (declaredSize !== undefined && declaredSize > maximumBytes) return yield* Effect.fail(tooLarge()) + let body = Buffer.allocUnsafe(Math.min(maximumBytes, declaredSize || 64 * 1024)) + let size = 0 + yield* Stream.runForEach(response.stream, (chunk) => { + if (chunk.byteLength === 0) return Effect.void + if (size + chunk.byteLength > maximumBytes) return Effect.fail(tooLarge()) + if (size + chunk.byteLength > body.byteLength) { + const grown = Buffer.allocUnsafe(Math.min(maximumBytes, Math.max(size + chunk.byteLength, body.byteLength * 2))) + body.copy(grown, 0, 0, size) + body = grown + } + body.set(chunk, size) + size += chunk.byteLength + return Effect.void + }) + return body.subarray(0, size) + }) diff --git a/packages/core/src/tool/webfetch.ts b/packages/core/src/tool/webfetch.ts index 1e209e500..2ce0868d9 100644 --- a/packages/core/src/tool/webfetch.ts +++ b/packages/core/src/tool/webfetch.ts @@ -1,11 +1,12 @@ export * as WebFetchTool from "./webfetch" import { ToolFailure } from "@opencode-ai/llm" -import { Duration, Effect, Layer, Schema, Stream } from "effect" +import { Duration, Effect, Layer, Schema } from "effect" import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" import { Parser } from "htmlparser2" import TurndownService from "turndown" import { PermissionV2 } from "../permission" +import { collectBoundedResponseBody } from "./http-body" import { Tool } from "./tool" import { Tools } from "./tools" @@ -86,24 +87,11 @@ const execute = (http: HttpClient.HttpClient, url: string, format: Format, userA http.execute(request(url, format, userAgent)).pipe(Effect.flatMap(HttpClientResponse.filterStatusOk)) const collectBody = (response: HttpClientResponse.HttpClientResponse) => - Effect.gen(function* () { - const contentLength = response.headers["content-length"] - if (contentLength && Number.parseInt(contentLength, 10) > MAX_RESPONSE_BYTES) { - return yield* Effect.fail(new Error(`Response too large (exceeds ${MAX_RESPONSE_BYTES} byte limit)`)) - } - const chunks: Uint8Array[] = [] - let size = 0 - yield* Stream.runForEach(response.stream, (chunk) => - Effect.gen(function* () { - size += chunk.byteLength - if (size > MAX_RESPONSE_BYTES) - return yield* Effect.fail(new Error(`Response too large (exceeds ${MAX_RESPONSE_BYTES} byte limit)`)) - chunks.push(chunk) - return undefined - }), - ) - return Buffer.concat(chunks, size) - }) + collectBoundedResponseBody( + response, + MAX_RESPONSE_BYTES, + () => new Error(`Response too large (exceeds ${MAX_RESPONSE_BYTES} byte limit)`), + ) const mimeFrom = (contentType: string) => contentType.split(";", 1)[0]?.trim().toLowerCase() ?? "" const isImageAttachment = (mime: string) => @@ -171,12 +159,16 @@ export const layer = Layer.effectDiscard( orElse: () => Effect.fail(new Error("Request timed out")), }), ) - const content = convert(new TextDecoder().decode(body), contentType, input.format) + const content = new TextDecoder().decode(body) + const output = yield* Effect.try({ + try: () => convert(content, contentType, input.format), + catch: (error) => error, + }) return { url: input.url, contentType, format: input.format, - output: content, + output, } }).pipe(Effect.mapError(() => new ToolFailure({ message: `Unable to fetch ${input.url}` }))), }), diff --git a/packages/core/src/tool/websearch.ts b/packages/core/src/tool/websearch.ts index cea19c17e..14c10377e 100644 --- a/packages/core/src/tool/websearch.ts +++ b/packages/core/src/tool/websearch.ts @@ -9,6 +9,7 @@ import { PositiveInt } from "../schema" import { PermissionV2 } from "../permission" import { Tool } from "./tool" import { Tools } from "./tools" +import { collectBoundedResponseBody } from "./http-body" import { checksum } from "../util/encode" export const name = "websearch" @@ -164,10 +165,12 @@ const callMcp = ( ) return yield* Effect.gen(function* () { const response = yield* HttpClient.filterStatusOk(http).execute(request) - const body = yield* response.text - if (Buffer.byteLength(body, "utf8") > MAX_RESPONSE_BYTES) - return yield* Effect.fail(new Error(`${tool} response exceeded ${MAX_RESPONSE_BYTES} bytes`)) - return yield* parseResponse(body) + const body = yield* collectBoundedResponseBody( + response, + MAX_RESPONSE_BYTES, + () => new Error(`${tool} response exceeded ${MAX_RESPONSE_BYTES} bytes`), + ) + return yield* parseResponse(body.toString("utf8")) }).pipe( Effect.timeoutOrElse({ duration: Duration.seconds(25), diff --git a/packages/core/test/tool-webfetch.test.ts b/packages/core/test/tool-webfetch.test.ts index b2541e3e2..5a856ffaf 100644 --- a/packages/core/test/tool-webfetch.test.ts +++ b/packages/core/test/tool-webfetch.test.ts @@ -176,6 +176,25 @@ describe("WebFetchTool registration", () => { }), ) + it.effect("returns an error result when HTML-to-Markdown conversion throws", () => + Effect.gen(function* () { + reset() + respond = () => + Effect.succeed( + new Response("
".repeat(10_000) + "content" + "
".repeat(10_000), { + headers: { "content-type": "text/html" }, + }), + ) + const registry = yield* ToolRegistry.Service + const url = "https://1.1.1.1/deep-html" + + expect(yield* executeTool(registry, call({ url, format: "markdown" }))).toEqual({ + type: "error", + value: `Unable to fetch ${url}`, + }) + }), + ) + it.effect("rejects declared and streamed oversized bodies", () => Effect.gen(function* () { reset() diff --git a/packages/core/test/tool-websearch.test.ts b/packages/core/test/tool-websearch.test.ts index dc38a9c35..9715dd5c7 100644 --- a/packages/core/test/tool-websearch.test.ts +++ b/packages/core/test/tool-websearch.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, test } from "bun:test" +import { beforeEach, describe, expect, test } from "bun:test" import { Effect, Layer, Schema } from "effect" import { HttpClient, HttpClientResponse } from "effect/unstable/http" import { PermissionV2 } from "@opencode-ai/core/permission" @@ -66,8 +66,14 @@ interface Request { const requests: Request[] = [] const assertions: PermissionV2.AssertInput[] = [] let responseBody = payload("search results") +let makeResponse = () => new Response(responseBody, { status: 200 }) let config: WebSearchTool.Config = { enableExa: false, enableParallel: false } +beforeEach(() => { + responseBody = payload("search results") + makeResponse = () => new Response(responseBody, { status: 200 }) +}) + const http = Layer.succeed( HttpClient.HttpClient, HttpClient.make((request) => @@ -78,7 +84,7 @@ const http = Layer.succeed( headers: request.headers, body: JSON.parse(new TextDecoder().decode(request.body.body)), }) - return HttpClientResponse.fromWeb(request, new Response(responseBody, { status: 200 })) + return HttpClientResponse.fromWeb(request, makeResponse()) }), ), ) @@ -270,7 +276,22 @@ describe("WebSearchTool registration", () => { Effect.gen(function* () { requests.length = 0 assertions.length = 0 - responseBody = "x".repeat(WebSearchTool.MAX_RESPONSE_BYTES + 1) + let chunksRead = 0 + let cancelled = false + makeResponse = () => + new Response( + new ReadableStream({ + pull(controller) { + chunksRead++ + if (chunksRead === 10) throw new Error("response was not stopped at the byte limit") + controller.enqueue(new Uint8Array(64 * 1024)) + }, + cancel() { + cancelled = true + }, + }), + { status: 200 }, + ) config = { provider: "exa", enableExa: false, enableParallel: false } const registry = yield* ToolRegistry.Service @@ -281,6 +302,8 @@ describe("WebSearchTool registration", () => { call: { type: "tool-call", id: "call-large-response", name: "websearch", input: { query: "too much" } }, }), ).toEqual({ type: "error", value: "Unable to search the web for too much" }) + expect(chunksRead).toBeLessThan(10) + expect(cancelled).toBe(true) }), ) })