fix(core): bound web tool failures (#33259)
This commit is contained in:
parent
823d327401
commit
69f1ec22e3
30
packages/core/src/tool/http-body.ts
Normal file
30
packages/core/src/tool/http-body.ts
Normal file
@ -0,0 +1,30 @@
|
||||
import { Effect, Stream } from "effect"
|
||||
import { HttpClientResponse } from "effect/unstable/http"
|
||||
|
||||
export const collectBoundedResponseBody = (
|
||||
response: HttpClientResponse.HttpClientResponse,
|
||||
maximumBytes: number,
|
||||
tooLarge: () => Error,
|
||||
) =>
|
||||
Effect.gen(function* () {
|
||||
const contentLength = response.headers["content-length"]
|
||||
const parsedSize = contentLength ? Number.parseInt(contentLength, 10) : undefined
|
||||
const declaredSize =
|
||||
parsedSize !== undefined && Number.isSafeInteger(parsedSize) && parsedSize >= 0 ? parsedSize : undefined
|
||||
if (declaredSize !== undefined && declaredSize > maximumBytes) return yield* Effect.fail(tooLarge())
|
||||
let body = Buffer.allocUnsafe(Math.min(maximumBytes, declaredSize || 64 * 1024))
|
||||
let size = 0
|
||||
yield* Stream.runForEach(response.stream, (chunk) => {
|
||||
if (chunk.byteLength === 0) return Effect.void
|
||||
if (size + chunk.byteLength > maximumBytes) return Effect.fail(tooLarge())
|
||||
if (size + chunk.byteLength > body.byteLength) {
|
||||
const grown = Buffer.allocUnsafe(Math.min(maximumBytes, Math.max(size + chunk.byteLength, body.byteLength * 2)))
|
||||
body.copy(grown, 0, 0, size)
|
||||
body = grown
|
||||
}
|
||||
body.set(chunk, size)
|
||||
size += chunk.byteLength
|
||||
return Effect.void
|
||||
})
|
||||
return body.subarray(0, size)
|
||||
})
|
||||
@ -1,11 +1,12 @@
|
||||
export * as WebFetchTool from "./webfetch"
|
||||
|
||||
import { ToolFailure } from "@opencode-ai/llm"
|
||||
import { Duration, Effect, Layer, Schema, Stream } from "effect"
|
||||
import { Duration, Effect, Layer, Schema } from "effect"
|
||||
import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
|
||||
import { Parser } from "htmlparser2"
|
||||
import TurndownService from "turndown"
|
||||
import { PermissionV2 } from "../permission"
|
||||
import { collectBoundedResponseBody } from "./http-body"
|
||||
import { Tool } from "./tool"
|
||||
import { Tools } from "./tools"
|
||||
|
||||
@ -86,24 +87,11 @@ const execute = (http: HttpClient.HttpClient, url: string, format: Format, userA
|
||||
http.execute(request(url, format, userAgent)).pipe(Effect.flatMap(HttpClientResponse.filterStatusOk))
|
||||
|
||||
const collectBody = (response: HttpClientResponse.HttpClientResponse) =>
|
||||
Effect.gen(function* () {
|
||||
const contentLength = response.headers["content-length"]
|
||||
if (contentLength && Number.parseInt(contentLength, 10) > MAX_RESPONSE_BYTES) {
|
||||
return yield* Effect.fail(new Error(`Response too large (exceeds ${MAX_RESPONSE_BYTES} byte limit)`))
|
||||
}
|
||||
const chunks: Uint8Array[] = []
|
||||
let size = 0
|
||||
yield* Stream.runForEach(response.stream, (chunk) =>
|
||||
Effect.gen(function* () {
|
||||
size += chunk.byteLength
|
||||
if (size > MAX_RESPONSE_BYTES)
|
||||
return yield* Effect.fail(new Error(`Response too large (exceeds ${MAX_RESPONSE_BYTES} byte limit)`))
|
||||
chunks.push(chunk)
|
||||
return undefined
|
||||
}),
|
||||
)
|
||||
return Buffer.concat(chunks, size)
|
||||
})
|
||||
collectBoundedResponseBody(
|
||||
response,
|
||||
MAX_RESPONSE_BYTES,
|
||||
() => new Error(`Response too large (exceeds ${MAX_RESPONSE_BYTES} byte limit)`),
|
||||
)
|
||||
|
||||
const mimeFrom = (contentType: string) => contentType.split(";", 1)[0]?.trim().toLowerCase() ?? ""
|
||||
const isImageAttachment = (mime: string) =>
|
||||
@ -171,12 +159,16 @@ export const layer = Layer.effectDiscard(
|
||||
orElse: () => Effect.fail(new Error("Request timed out")),
|
||||
}),
|
||||
)
|
||||
const content = convert(new TextDecoder().decode(body), contentType, input.format)
|
||||
const content = new TextDecoder().decode(body)
|
||||
const output = yield* Effect.try({
|
||||
try: () => convert(content, contentType, input.format),
|
||||
catch: (error) => error,
|
||||
})
|
||||
return {
|
||||
url: input.url,
|
||||
contentType,
|
||||
format: input.format,
|
||||
output: content,
|
||||
output,
|
||||
}
|
||||
}).pipe(Effect.mapError(() => new ToolFailure({ message: `Unable to fetch ${input.url}` }))),
|
||||
}),
|
||||
|
||||
@ -9,6 +9,7 @@ import { PositiveInt } from "../schema"
|
||||
import { PermissionV2 } from "../permission"
|
||||
import { Tool } from "./tool"
|
||||
import { Tools } from "./tools"
|
||||
import { collectBoundedResponseBody } from "./http-body"
|
||||
import { checksum } from "../util/encode"
|
||||
|
||||
export const name = "websearch"
|
||||
@ -164,10 +165,12 @@ const callMcp = <F extends Schema.Struct.Fields>(
|
||||
)
|
||||
return yield* Effect.gen(function* () {
|
||||
const response = yield* HttpClient.filterStatusOk(http).execute(request)
|
||||
const body = yield* response.text
|
||||
if (Buffer.byteLength(body, "utf8") > MAX_RESPONSE_BYTES)
|
||||
return yield* Effect.fail(new Error(`${tool} response exceeded ${MAX_RESPONSE_BYTES} bytes`))
|
||||
return yield* parseResponse(body)
|
||||
const body = yield* collectBoundedResponseBody(
|
||||
response,
|
||||
MAX_RESPONSE_BYTES,
|
||||
() => new Error(`${tool} response exceeded ${MAX_RESPONSE_BYTES} bytes`),
|
||||
)
|
||||
return yield* parseResponse(body.toString("utf8"))
|
||||
}).pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration: Duration.seconds(25),
|
||||
|
||||
@ -176,6 +176,25 @@ describe("WebFetchTool registration", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("returns an error result when HTML-to-Markdown conversion throws", () =>
|
||||
Effect.gen(function* () {
|
||||
reset()
|
||||
respond = () =>
|
||||
Effect.succeed(
|
||||
new Response("<div>".repeat(10_000) + "content" + "</div>".repeat(10_000), {
|
||||
headers: { "content-type": "text/html" },
|
||||
}),
|
||||
)
|
||||
const registry = yield* ToolRegistry.Service
|
||||
const url = "https://1.1.1.1/deep-html"
|
||||
|
||||
expect(yield* executeTool(registry, call({ url, format: "markdown" }))).toEqual({
|
||||
type: "error",
|
||||
value: `Unable to fetch ${url}`,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("rejects declared and streamed oversized bodies", () =>
|
||||
Effect.gen(function* () {
|
||||
reset()
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import { beforeEach, describe, expect, test } from "bun:test"
|
||||
import { Effect, Layer, Schema } from "effect"
|
||||
import { HttpClient, HttpClientResponse } from "effect/unstable/http"
|
||||
import { PermissionV2 } from "@opencode-ai/core/permission"
|
||||
@ -66,8 +66,14 @@ interface Request {
|
||||
const requests: Request[] = []
|
||||
const assertions: PermissionV2.AssertInput[] = []
|
||||
let responseBody = payload("search results")
|
||||
let makeResponse = () => new Response(responseBody, { status: 200 })
|
||||
let config: WebSearchTool.Config = { enableExa: false, enableParallel: false }
|
||||
|
||||
beforeEach(() => {
|
||||
responseBody = payload("search results")
|
||||
makeResponse = () => new Response(responseBody, { status: 200 })
|
||||
})
|
||||
|
||||
const http = Layer.succeed(
|
||||
HttpClient.HttpClient,
|
||||
HttpClient.make((request) =>
|
||||
@ -78,7 +84,7 @@ const http = Layer.succeed(
|
||||
headers: request.headers,
|
||||
body: JSON.parse(new TextDecoder().decode(request.body.body)),
|
||||
})
|
||||
return HttpClientResponse.fromWeb(request, new Response(responseBody, { status: 200 }))
|
||||
return HttpClientResponse.fromWeb(request, makeResponse())
|
||||
}),
|
||||
),
|
||||
)
|
||||
@ -270,7 +276,22 @@ describe("WebSearchTool registration", () => {
|
||||
Effect.gen(function* () {
|
||||
requests.length = 0
|
||||
assertions.length = 0
|
||||
responseBody = "x".repeat(WebSearchTool.MAX_RESPONSE_BYTES + 1)
|
||||
let chunksRead = 0
|
||||
let cancelled = false
|
||||
makeResponse = () =>
|
||||
new Response(
|
||||
new ReadableStream({
|
||||
pull(controller) {
|
||||
chunksRead++
|
||||
if (chunksRead === 10) throw new Error("response was not stopped at the byte limit")
|
||||
controller.enqueue(new Uint8Array(64 * 1024))
|
||||
},
|
||||
cancel() {
|
||||
cancelled = true
|
||||
},
|
||||
}),
|
||||
{ status: 200 },
|
||||
)
|
||||
config = { provider: "exa", enableExa: false, enableParallel: false }
|
||||
const registry = yield* ToolRegistry.Service
|
||||
|
||||
@ -281,6 +302,8 @@ describe("WebSearchTool registration", () => {
|
||||
call: { type: "tool-call", id: "call-large-response", name: "websearch", input: { query: "too much" } },
|
||||
}),
|
||||
).toEqual({ type: "error", value: "Unable to search the web for too much" })
|
||||
expect(chunksRead).toBeLessThan(10)
|
||||
expect(cancelled).toBe(true)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
Loading…
Reference in New Issue
Block a user