fix(core): recover v2 context overflow (#31005)
Commit 820c984d47 (parent c814f84c87).
@ -69,6 +69,13 @@ type Dependencies = {
|
||||
readonly config: readonly Config.Entry[]
|
||||
}
|
||||
|
||||
/**
 * Argument shape shared by `compactIfNeeded` and `compactAfterOverflow`.
 */
type Input = {
|
||||
// Session the compaction belongs to; published with the compaction-ended event.
readonly sessionID: SessionSchema.ID
|
||||
// History entries handed to `select` and searched for a previous compaction summary.
readonly entries: readonly Entry[]
|
||||
// Model whose `route.defaults.limits` supply the context and output budgets.
readonly model: Model
|
||||
// The request about to be sent; its system, messages and tools are size-estimated.
readonly request: LLMRequest
|
||||
}
|
||||
|
||||
const estimate = (value: unknown) => Token.estimate(JSON.stringify(value))
|
||||
|
||||
const truncate = (value: string) =>
|
||||
@ -160,21 +167,10 @@ export const buildPrompt = (input: { readonly previousSummary?: string; readonly
|
||||
|
||||
export const make = (dependencies: Dependencies) => {
|
||||
const config = settings(dependencies.config)
|
||||
return Effect.fn("SessionCompaction.compactIfNeeded")(function* (input: {
|
||||
readonly sessionID: SessionSchema.ID
|
||||
readonly entries: readonly Entry[]
|
||||
readonly model: Model
|
||||
readonly request: LLMRequest
|
||||
}) {
|
||||
const compactAfterOverflow = Effect.fn("SessionCompaction.compactAfterOverflow")(function* (input: Input) {
|
||||
const context = input.model.route.defaults.limits?.context
|
||||
if (!config.auto || context === undefined || context <= 0) return false
|
||||
if (context === undefined || context <= 0) return false
|
||||
const output = input.request.generation?.maxTokens ?? input.model.route.defaults.limits?.output ?? 0
|
||||
if (
|
||||
estimate({ system: input.request.system, messages: input.request.messages, tools: input.request.tools }) <=
|
||||
context - Math.max(output, config.buffer)
|
||||
)
|
||||
return false
|
||||
|
||||
const selected = select(input.entries, config.tokens)
|
||||
const previousSummary = input.entries.find((entry) => entry.message.type === "compaction")?.message
|
||||
if (!selected || (selected.head.length === 0 && previousSummary?.type !== "compaction")) return false
|
||||
@ -193,7 +189,8 @@ export const make = (dependencies: Dependencies) => {
|
||||
})
|
||||
|
||||
const chunks: string[] = []
|
||||
yield* dependencies.llm
|
||||
let failed = false
|
||||
const summarized = yield* dependencies.llm
|
||||
.stream(
|
||||
LLM.request({
|
||||
model: input.model,
|
||||
@ -204,13 +201,15 @@ export const make = (dependencies: Dependencies) => {
|
||||
)
|
||||
.pipe(
|
||||
Stream.runForEach((event) => {
|
||||
if (!LLMEvent.is.textDelta(event)) return Effect.void
|
||||
chunks.push(event.text)
|
||||
if (LLMEvent.is.providerError(event)) failed = true
|
||||
if (LLMEvent.is.textDelta(event)) chunks.push(event.text)
|
||||
return Effect.void
|
||||
}),
|
||||
Effect.as(true),
|
||||
Effect.catchTag("LLM.Error", () => Effect.succeed(false)),
|
||||
)
|
||||
const summary = chunks.join("")
|
||||
if (!summary.trim()) return yield* Effect.die("Compaction returned an empty summary")
|
||||
if (!summarized || failed || !summary.trim()) return false
|
||||
yield* dependencies.events.publish(SessionEvent.Compaction.Ended, {
|
||||
sessionID: input.sessionID,
|
||||
messageID,
|
||||
@ -221,4 +220,20 @@ export const make = (dependencies: Dependencies) => {
|
||||
})
|
||||
return true
|
||||
})
|
||||
/**
 * Proactive compaction entry point.
 *
 * Returns `false` without compacting when auto-compaction is disabled, when the
 * model declares no positive context limit, or when the estimated request still
 * fits inside the context window. Otherwise delegates to `compactAfterOverflow`
 * and returns its result.
 */
const compactIfNeeded = Effect.fn("SessionCompaction.compactIfNeeded")(function* (input: Input) {
  if (!config.auto) return false

  const limits = input.model.route.defaults.limits
  const window = limits?.context
  if (window === undefined || window <= 0) return false

  // Keep room for the reply: the larger of the output budget and the configured buffer.
  const outputBudget = input.request.generation?.maxTokens ?? limits?.output ?? 0
  const reserved = Math.max(outputBudget, config.buffer)

  const { system, messages, tools } = input.request
  const estimated = estimate({ system, messages, tools })
  if (estimated <= window - reserved) return false

  return yield* compactAfterOverflow(input)
})
|
||||
return {
|
||||
compactIfNeeded,
|
||||
compactAfterOverflow,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,13 @@
|
||||
import { LLM, LLMClient, LLMError, LLMEvent, SystemPart } from "@opencode-ai/llm"
|
||||
import { Cause, DateTime, Effect, FiberSet, Layer, Schema, Semaphore, Stream } from "effect"
|
||||
import {
|
||||
LLM,
|
||||
LLMClient,
|
||||
LLMError,
|
||||
LLMEvent,
|
||||
SystemPart,
|
||||
isContextOverflowFailure,
|
||||
type ProviderErrorEvent,
|
||||
} from "@opencode-ai/llm"
|
||||
import { Cause, DateTime, Effect, FiberSet, Layer, Option, Schema, Semaphore, Stream } from "effect"
|
||||
import { AgentV2 } from "../../agent"
|
||||
import { Config } from "../../config"
|
||||
import { Database } from "../../database/database"
|
||||
@ -91,7 +99,7 @@ export const layer = Layer.effect(
|
||||
const skillGuidance = yield* SkillGuidance.Service
|
||||
const config = yield* Config.Service
|
||||
const db = (yield* Database.Service).db
|
||||
const compact = SessionCompaction.make({ events, llm, config: yield* config.entries() })
|
||||
const compaction = SessionCompaction.make({ events, llm, config: yield* config.entries() })
|
||||
const getSession = Effect.fn("SessionRunner.getSession")(function* (sessionID: SessionSchema.ID) {
|
||||
const session = yield* store.get(sessionID)
|
||||
if (!session) return yield* Effect.die(`Session not found: ${sessionID}`)
|
||||
@ -130,14 +138,29 @@ export const layer = Layer.effect(
|
||||
const isQuestionRejected = (cause: Cause.Cause<unknown>) =>
|
||||
cause.reasons.some((reason) => Cause.isDieReason(reason) && reason.defect instanceof QuestionV2.RejectedError)
|
||||
|
||||
class RetryTurn extends Error {
|
||||
constructor(readonly promotion: SessionInput.Delivery | undefined) {
|
||||
type TurnTransition =
|
||||
// Request preparation observed a concurrent Session change and must restart from durable state.
|
||||
| { readonly _tag: "RebuildPreparedTurn"; readonly promotion?: SessionInput.Delivery }
|
||||
// Overflow compaction completed; rebuild once through the path without overflow recovery.
|
||||
| { readonly _tag: "ContinueAfterOverflowCompaction" }
|
||||
|
||||
class TurnTransitionError extends Error {
|
||||
constructor(readonly transition: TurnTransition) {
|
||||
super()
|
||||
}
|
||||
}
|
||||
|
||||
const rebuildPreparedTurn = (promotion?: SessionInput.Delivery) =>
|
||||
new TurnTransitionError({ _tag: "RebuildPreparedTurn", promotion })
|
||||
const continueAfterOverflowCompaction = new TurnTransitionError({
|
||||
_tag: "ContinueAfterOverflowCompaction",
|
||||
})
|
||||
|
||||
const retryAgentMismatch = (promotion: SessionInput.Delivery | undefined) =>
|
||||
Effect.catchDefect((defect) =>
|
||||
defect instanceof SessionContextEpoch.AgentMismatch ? Effect.die(new RetryTurn(promotion)) : Effect.die(defect),
|
||||
defect instanceof SessionContextEpoch.AgentMismatch
|
||||
? Effect.die(rebuildPreparedTurn(promotion))
|
||||
: Effect.die(defect),
|
||||
)
|
||||
|
||||
const sameModel = Schema.toEquivalence(Schema.UndefinedOr(ModelV2.Ref))
|
||||
@ -149,6 +172,7 @@ export const layer = Layer.effect(
|
||||
const runTurnAttempt = Effect.fn("SessionRunner.runTurn")(function* (
|
||||
sessionID: SessionSchema.ID,
|
||||
promotion: SessionInput.Delivery | undefined,
|
||||
recoverOverflow?: typeof compaction.compactAfterOverflow,
|
||||
) {
|
||||
const session = yield* getSession(sessionID)
|
||||
if (session.location.directory !== location.directory || session.location.workspaceID !== location.workspaceID)
|
||||
@ -183,7 +207,7 @@ export const layer = Layer.effect(
|
||||
).pipe(retryAgentMismatch(undefined)))
|
||||
const current = yield* getSession(sessionID)
|
||||
if ((yield* agents.select(current.agent)).id !== agent.id || !sameModel(current.model, session.model))
|
||||
return yield* Effect.die(new RetryTurn(undefined))
|
||||
return yield* Effect.die(rebuildPreparedTurn())
|
||||
const model = yield* models.resolve(session)
|
||||
const entries = yield* SessionHistory.entriesForRunner(db, session.id, system.baselineSeq)
|
||||
const context = entries.map((entry) => entry.message)
|
||||
@ -195,8 +219,8 @@ export const layer = Layer.effect(
|
||||
messages: toLLMMessages(context, model),
|
||||
tools: yield* tools.definitions(),
|
||||
})
|
||||
if (yield* compact({ sessionID: session.id, entries, model, request }))
|
||||
return yield* Effect.die(new RetryTurn(undefined))
|
||||
if (yield* compaction.compactIfNeeded({ sessionID: session.id, entries, model, request }))
|
||||
return yield* Effect.die(rebuildPreparedTurn())
|
||||
const publisher = createLLMEventPublisher(events, {
|
||||
sessionID: session.id,
|
||||
agent: agent.id,
|
||||
@ -209,11 +233,19 @@ export const layer = Layer.effect(
|
||||
const withPublication = Semaphore.makeUnsafe(1).withPermit
|
||||
const publish = (event: LLMEvent, outputPaths: ReadonlyArray<string> = []) =>
|
||||
withPublication(publisher.publish(event, outputPaths))
|
||||
let overflowFailure: ProviderErrorEvent | undefined
|
||||
if (!(yield* SessionContextEpoch.current(db, session.id, agent.id, system.revision)))
|
||||
return yield* Effect.die(new RetryTurn(undefined))
|
||||
return yield* Effect.die(rebuildPreparedTurn())
|
||||
const providerStream = llm.stream(request).pipe(
|
||||
Stream.runForEach((event) =>
|
||||
Effect.gen(function* () {
|
||||
if (overflowFailure || publisher.hasProviderError()) return
|
||||
if (LLMEvent.is.providerError(event)) {
|
||||
if (isContextOverflowFailure(event) && !publisher.hasAssistantStarted()) {
|
||||
overflowFailure = event
|
||||
return
|
||||
}
|
||||
}
|
||||
yield* publish(event)
|
||||
if (event.type !== "tool-call" || event.providerExecuted) return
|
||||
needsContinuation = true
|
||||
@ -248,13 +280,17 @@ export const layer = Layer.effect(
|
||||
return yield* Effect.uninterruptibleMask((restore) =>
|
||||
Effect.gen(function* () {
|
||||
const stream = yield* restore(providerStream).pipe(Effect.exit)
|
||||
let llmFailure: LLMError | undefined
|
||||
if (stream._tag === "Failure") {
|
||||
for (const reason of stream.cause.reasons) {
|
||||
if (!Cause.isFailReason(reason)) continue
|
||||
if (reason.error instanceof LLMError) llmFailure = reason.error
|
||||
}
|
||||
}
|
||||
const failure =
|
||||
stream._tag === "Failure" ? Option.getOrUndefined(Cause.findErrorOption(stream.cause)) : undefined
|
||||
if (
|
||||
recoverOverflow &&
|
||||
!publisher.hasAssistantStarted() &&
|
||||
isContextOverflowFailure(overflowFailure ?? failure) &&
|
||||
(yield* restore(recoverOverflow({ sessionID: session.id, entries, model, request })))
|
||||
)
|
||||
return yield* Effect.die(continueAfterOverflowCompaction)
|
||||
if (overflowFailure) yield* publish(overflowFailure)
|
||||
const llmFailure = failure instanceof LLMError ? failure : undefined
|
||||
if (llmFailure && !publisher.hasProviderError()) {
|
||||
yield* withPublication(publisher.failUnsettledTools("Provider did not return a tool result", true))
|
||||
yield* withPublication(
|
||||
@ -290,17 +326,38 @@ export const layer = Layer.effect(
|
||||
}),
|
||||
)
|
||||
}, Effect.scoped)
|
||||
const runTurn: (
|
||||
type RunTurn = (
|
||||
sessionID: SessionSchema.ID,
|
||||
promotion: SessionInput.Delivery | undefined,
|
||||
) => Effect.Effect<boolean, RunError> = (sessionID, promotion) =>
|
||||
runTurnAttempt(sessionID, promotion).pipe(
|
||||
Effect.catchDefect((defect) =>
|
||||
defect instanceof RetryTurn
|
||||
? Effect.yieldNow.pipe(Effect.andThen(runTurn(sessionID, defect.promotion)))
|
||||
: Effect.die(defect),
|
||||
) => Effect.Effect<boolean, RunError>
|
||||
|
||||
/**
 * Runs a turn attempt after an overflow compaction has already happened.
 *
 * `runTurnAttempt` is called without its optional third argument, so this path
 * has no overflow-recovery callback. Turn-transition defects raised by the
 * attempt are handled here; every other defect is re-raised unchanged.
 */
const runAfterOverflowCompaction: RunTurn = Effect.fnUntraced(function* (sessionID, promotion) {
|
||||
return yield* runTurnAttempt(sessionID, promotion).pipe(
|
||||
Effect.catchDefect(
|
||||
Effect.fnUntraced(function* (defect) {
|
||||
// Not a turn transition: propagate the original defect.
if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect)
|
||||
// A second overflow-compaction request is not accepted on this path; die with a fixed message.
if (defect.transition._tag === "ContinueAfterOverflowCompaction")
|
||||
return yield* Effect.die("Post-compaction provider attempt cannot recover another overflow")
|
||||
// Remaining transition: yield once, then retry this same path with the transition's promotion.
yield* Effect.yieldNow
|
||||
return yield* runAfterOverflowCompaction(sessionID, defect.transition.promotion)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
/**
 * Primary turn entry point.
 *
 * Runs a turn attempt with `compaction.compactAfterOverflow` supplied as the
 * overflow-recovery callback and interprets turn-transition defects raised by
 * the attempt. Defects of any other kind are re-raised unchanged.
 */
const runTurn: RunTurn = Effect.fnUntraced(function* (sessionID, promotion) {
|
||||
return yield* runTurnAttempt(sessionID, promotion, compaction.compactAfterOverflow).pipe(
|
||||
Effect.catchDefect(
|
||||
Effect.fnUntraced(function* (defect) {
|
||||
// Not a turn transition: propagate the original defect.
if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect)
|
||||
// Yield once before starting the follow-up attempt, whichever transition was requested.
yield* Effect.yieldNow
|
||||
// Overflow compaction finished: continue on the path that has no recovery callback, with no promotion.
if (defect.transition._tag === "ContinueAfterOverflowCompaction")
|
||||
return yield* runAfterOverflowCompaction(sessionID, undefined)
|
||||
// Otherwise restart this path, carrying over the promotion recorded on the transition.
return yield* runTurn(sessionID, defect.transition.promotion)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
const run = Effect.fn("SessionRunner.run")(function* (input: {
|
||||
readonly sessionID: SessionSchema.ID
|
||||
|
||||
@ -165,7 +165,7 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
|
||||
const startToolInput = Effect.fnUntraced(function* (event: { readonly id: string; readonly name: string }) {
|
||||
if (tools.has(event.id)) return yield* Effect.die(`Duplicate tool input start: ${event.id}`)
|
||||
const assistantMessageID = yield* currentAssistantMessageID()
|
||||
const assistantMessageID = yield* startAssistant()
|
||||
tools.set(event.id, {
|
||||
assistantMessageID,
|
||||
name: event.name,
|
||||
@ -224,7 +224,6 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
) {
|
||||
switch (event.type) {
|
||||
case "step-start":
|
||||
yield* startAssistant()
|
||||
return
|
||||
case "text-start":
|
||||
yield* text.start(event.id)
|
||||
@ -381,7 +380,7 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
yield* events.publish(SessionEvent.Step.Ended, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: yield* timestamp,
|
||||
assistantMessageID: yield* currentAssistantMessageID(),
|
||||
assistantMessageID: yield* startAssistant(),
|
||||
finish: event.reason,
|
||||
cost: 0,
|
||||
tokens: tokens(event.usage),
|
||||
@ -402,5 +401,12 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
}
|
||||
})
|
||||
|
||||
return { publish, flush, failUnsettledTools, hasProviderError: () => providerFailed, startAssistant }
|
||||
return {
|
||||
publish,
|
||||
flush,
|
||||
failUnsettledTools,
|
||||
hasAssistantStarted: () => assistantMessageID !== undefined,
|
||||
hasProviderError: () => providerFailed,
|
||||
startAssistant,
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import {
|
||||
Model,
|
||||
Tool,
|
||||
TransportReason,
|
||||
InvalidRequestReason,
|
||||
type LLMClientShape,
|
||||
type LLMRequest,
|
||||
} from "@opencode-ai/llm"
|
||||
@ -104,6 +105,11 @@ const compactModel = Model.make({
|
||||
provider: "fake",
|
||||
route: OpenAIChat.route.with({ limits: { context: 4_000, output: 50 } }),
|
||||
})
|
||||
const recoveryModel = Model.make({
|
||||
id: "recovery",
|
||||
provider: "fake",
|
||||
route: OpenAIChat.route.with({ limits: { context: 20_000, output: 1_000 } }),
|
||||
})
|
||||
const authorizations: ToolRegistry.AuthorizeInput[] = []
|
||||
const executions: string[] = []
|
||||
const permission = Layer.succeed(
|
||||
@ -348,6 +354,21 @@ const providerUnavailable = () =>
|
||||
reason: new TransportReason({ message: "Provider unavailable" }),
|
||||
})
|
||||
|
||||
/**
 * Test fixture for the overflow-recovery cases.
 *
 * Runs the common `setup`, records one earlier exchange in the session (a long
 * user prompt answered by a canned "Earlier answer" response), then switches the
 * current model to `recoveryModel` and clears the recorded requests so each test
 * only observes the requests it triggers itself. Returns the session service.
 */
const setupOverflowRecovery = Effect.gen(function* () {
|
||||
yield* setup
|
||||
const session = yield* SessionV2.Service
|
||||
// Canned provider response used for the seeding turn below.
response = fragmentFixture("text", "text-earlier", ["Earlier answer"]).completeEvents
|
||||
yield* session.prompt({
|
||||
sessionID,
|
||||
// The phrase is repeated 700 times to make the stored history large.
prompt: new Prompt({ text: "Earlier question ".repeat(700) }),
|
||||
resume: false,
|
||||
})
|
||||
yield* session.resume(sessionID)
|
||||
currentModel = recoveryModel
|
||||
// Forget the seeding request so request-count assertions start from zero.
requests.length = 0
|
||||
return session
|
||||
})
|
||||
|
||||
const userTexts = (request: LLMRequest) =>
|
||||
request.messages.flatMap((message) =>
|
||||
message.role === "user"
|
||||
@ -1461,6 +1482,131 @@ describe("SessionRunnerLLM", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("forces one compaction and retries after provider context overflow", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* setupOverflowRecovery
|
||||
responses = [
|
||||
[
|
||||
LLMEvent.stepStart({ index: 0 }),
|
||||
LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }),
|
||||
],
|
||||
fragmentFixture("text", "text-summary", ["## Goal\n- Recover overflow"]).completeEvents,
|
||||
fragmentFixture("text", "text-final", ["Recovered"]).completeEvents,
|
||||
]
|
||||
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
|
||||
yield* session.resume(sessionID)
|
||||
|
||||
expect(requests).toHaveLength(3)
|
||||
expect(userTexts(requests[1])[0]).toContain("## Goal")
|
||||
expect(userTexts(requests[2])[0]).toContain("<summary>\n## Goal\n- Recover overflow\n</summary>")
|
||||
expect(yield* session.context(sessionID)).toMatchObject([
|
||||
{ type: "compaction", summary: "## Goal\n- Recover overflow" },
|
||||
{ type: "assistant", finish: "stop" },
|
||||
])
|
||||
yield* replaySessionProjection(sessionID)
|
||||
expect(yield* session.context(sessionID)).toMatchObject([
|
||||
{ type: "compaction" },
|
||||
{ type: "assistant", finish: "stop" },
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("persists a second context overflow after one recovery", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* setupOverflowRecovery
|
||||
const overflow = () => [
|
||||
LLMEvent.stepStart({ index: 0 }),
|
||||
LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }),
|
||||
]
|
||||
responses = [
|
||||
overflow(),
|
||||
fragmentFixture("text", "text-summary", ["## Goal\n- Recover once"]).completeEvents,
|
||||
overflow(),
|
||||
]
|
||||
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
|
||||
yield* session.resume(sessionID)
|
||||
|
||||
expect(requests).toHaveLength(3)
|
||||
expect(yield* session.context(sessionID)).toMatchObject([
|
||||
{ type: "compaction" },
|
||||
{ type: "assistant", finish: "error", error: { message: "prompt too long" } },
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("recovers once from a raw context overflow failure", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* setupOverflowRecovery
|
||||
responseStream = Stream.fail(
|
||||
new LLMError({
|
||||
module: "test",
|
||||
method: "stream",
|
||||
reason: new InvalidRequestReason({
|
||||
message: "prompt too long",
|
||||
classification: "context-overflow",
|
||||
}),
|
||||
}),
|
||||
)
|
||||
responses = [
|
||||
fragmentFixture("text", "text-summary", ["## Goal\n- Recover raw overflow"]).completeEvents,
|
||||
fragmentFixture("text", "text-final", ["Recovered"]).completeEvents,
|
||||
]
|
||||
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
|
||||
yield* session.resume(sessionID)
|
||||
|
||||
expect(requests).toHaveLength(3)
|
||||
expect(yield* session.context(sessionID)).toMatchObject([
|
||||
{ type: "compaction", summary: "## Goal\n- Recover raw overflow" },
|
||||
{ type: "assistant", finish: "stop" },
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("publishes the original overflow when recovery summarization fails", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* setupOverflowRecovery
|
||||
responses = [
|
||||
[LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" })],
|
||||
[LLMEvent.providerError({ message: "summary unavailable" })],
|
||||
]
|
||||
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
|
||||
yield* session.resume(sessionID)
|
||||
|
||||
expect(requests).toHaveLength(2)
|
||||
const context = yield* session.context(sessionID)
|
||||
expect(context.some((message) => message.type === "compaction")).toBe(false)
|
||||
expect(context.slice(-2)).toMatchObject([
|
||||
{ type: "user", text: "Continue" },
|
||||
{ type: "assistant", finish: "error", error: { message: "prompt too long" } },
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("interrupts overflow recovery while the summary provider is running", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* setupOverflowRecovery
|
||||
responses = [
|
||||
[LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" })],
|
||||
fragmentFixture("text", "text-summary", ["## Goal\n- Interrupted"]).completeEvents,
|
||||
]
|
||||
const firstGate = yield* Deferred.make<void>()
|
||||
const summaryGate = yield* Deferred.make<void>()
|
||||
streamGate = firstGate
|
||||
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
|
||||
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
|
||||
while (requests.length < 1) yield* Effect.yieldNow
|
||||
streamGate = summaryGate
|
||||
yield* Deferred.succeed(firstGate, undefined)
|
||||
while (requests.length < 2) yield* Effect.yieldNow
|
||||
|
||||
yield* session.interrupt(sessionID)
|
||||
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
|
||||
streamGate = undefined
|
||||
expect(requests).toHaveLength(2)
|
||||
expect((yield* session.context(sessionID)).some((message) => message.type === "compaction")).toBe(false)
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("preserves effective System updates while compaction replacement is blocked", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* setup
|
||||
@ -3113,6 +3259,35 @@ describe("SessionRunnerLLM", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("does not recover context overflow after durable assistant output", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* setup
|
||||
const session = yield* SessionV2.Service
|
||||
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail after output" }), resume: false })
|
||||
|
||||
requests.length = 0
|
||||
response = [
|
||||
LLMEvent.stepStart({ index: 0 }),
|
||||
LLMEvent.textStart({ id: "text-partial" }),
|
||||
LLMEvent.textDelta({ id: "text-partial", text: "Partial" }),
|
||||
LLMEvent.textEnd({ id: "text-partial" }),
|
||||
LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }),
|
||||
]
|
||||
yield* session.resume(sessionID)
|
||||
|
||||
expect(requests).toHaveLength(1)
|
||||
expect(yield* session.context(sessionID)).toMatchObject([
|
||||
{ type: "user", text: "Fail after output" },
|
||||
{
|
||||
type: "assistant",
|
||||
finish: "error",
|
||||
error: { message: "prompt too long" },
|
||||
content: [{ type: "text", text: "Partial" }],
|
||||
},
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("projects raw provider stream failures as terminal assistant step failures", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* setup
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
export { LLMClient } from "./route/client"
|
||||
export { Auth } from "./route/auth"
|
||||
export { Provider } from "./provider"
|
||||
export { isContextOverflow, isContextOverflowFailure } from "./provider-error"
|
||||
export type {
|
||||
RouteModelInput,
|
||||
RouteRoutedModelInput,
|
||||
|
||||
@ -18,6 +18,7 @@ import {
|
||||
type ToolResultPart,
|
||||
} from "../schema"
|
||||
import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
|
||||
import { isContextOverflow } from "../provider-error"
|
||||
import * as Cache from "./utils/cache"
|
||||
import { Lifecycle } from "./utils/lifecycle"
|
||||
import { ToolStream } from "./utils/tool-stream"
|
||||
@ -786,7 +787,12 @@ const providerErrorMessage = (event: AnthropicEvent): string => {
|
||||
|
||||
const onError = (state: ParserState, event: AnthropicEvent): StepResult => [
|
||||
state,
|
||||
[LLMEvent.providerError({ message: providerErrorMessage(event) })],
|
||||
[
|
||||
LLMEvent.providerError({
|
||||
message: providerErrorMessage(event),
|
||||
classification: isContextOverflow(event.error?.message ?? "") ? "context-overflow" : undefined,
|
||||
}),
|
||||
],
|
||||
]
|
||||
|
||||
const step = (state: ParserState, event: AnthropicEvent) => {
|
||||
|
||||
@ -15,6 +15,7 @@ import {
|
||||
type ToolResultPart,
|
||||
} from "../schema"
|
||||
import { BedrockEventStream } from "./bedrock-event-stream"
|
||||
import { isContextOverflow } from "../provider-error"
|
||||
import { JsonObject, optionalArray, ProviderShared } from "./shared"
|
||||
import { BedrockAuth } from "./utils/bedrock-auth"
|
||||
import { BedrockCache } from "./utils/bedrock-cache"
|
||||
@ -582,7 +583,16 @@ const step = (state: ParserState, event: BedrockEvent) =>
|
||||
if (event.validationException || event.throttlingException) {
|
||||
const message =
|
||||
event.validationException?.message ?? event.throttlingException?.message ?? "Bedrock Converse error"
|
||||
return [state, [LLMEvent.providerError({ message, retryable: event.throttlingException !== undefined })]] as const
|
||||
return [
|
||||
state,
|
||||
[
|
||||
LLMEvent.providerError({
|
||||
message,
|
||||
classification: event.validationException && isContextOverflow(message) ? "context-overflow" : undefined,
|
||||
retryable: event.throttlingException !== undefined,
|
||||
}),
|
||||
],
|
||||
] as const
|
||||
}
|
||||
|
||||
return [state, []] as const
|
||||
|
||||
@ -18,6 +18,7 @@ import {
|
||||
type ToolResultPart,
|
||||
} from "../schema"
|
||||
import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
|
||||
import { isContextOverflow } from "../provider-error"
|
||||
import { OpenAIOptions } from "./utils/openai-options"
|
||||
import { Lifecycle } from "./utils/lifecycle"
|
||||
import { ToolStream } from "./utils/tool-stream"
|
||||
@ -880,14 +881,23 @@ const providerErrorMessage = (event: OpenAIResponsesEvent, fallback: string): st
|
||||
return message || code || fallback
|
||||
}
|
||||
|
||||
/**
 * Builds the provider-error event for a failed OpenAI Responses stream.
 *
 * The event is classified as a context overflow when the error code is
 * `context_length_exceeded` or when the resolved message matches
 * `isContextOverflow`; otherwise it carries no classification.
 */
const providerError = (event: OpenAIResponsesEvent, fallback: string) => {
  // Prefer the event-level code, then the nested response error code; empty values collapse to undefined.
  const errorCode = event.code || event.response?.error?.code || undefined
  const text = providerErrorMessage(event, fallback)
  const overflowed = errorCode === "context_length_exceeded" || isContextOverflow(text)
  return LLMEvent.providerError({
    message: text,
    classification: overflowed ? "context-overflow" : undefined,
  })
}
|
||||
|
||||
const onResponseFailed = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
|
||||
state,
|
||||
[LLMEvent.providerError({ message: providerErrorMessage(event, "OpenAI Responses response failed") })],
|
||||
[providerError(event, "OpenAI Responses response failed")],
|
||||
]
|
||||
|
||||
const onError = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
|
||||
state,
|
||||
[LLMEvent.providerError({ message: providerErrorMessage(event, "OpenAI Responses stream error") })],
|
||||
[providerError(event, "OpenAI Responses stream error")],
|
||||
]
|
||||
|
||||
const step = (state: ParserState, event: OpenAIResponsesEvent) => {
|
||||
|
||||
packages/llm/src/provider-error.ts (new file, 32 lines)
@ -0,0 +1,32 @@
|
||||
import { Schema } from "effect"
|
||||
import { LLMError, ProviderErrorEvent } from "./schema"
|
||||
|
||||
/**
 * Error-message fragments that indicate a request exceeded the model's
 * context window. Every pattern is case-insensitive and none uses the `g`
 * flag, so `test` keeps no state between calls.
 */
const patterns: readonly RegExp[] = [
  /prompt is too long/i,
  /input is too long for requested model/i,
  /exceeds the context window/i,
  /input token count.*exceeds the maximum/i,
  /maximum prompt length is \d+/i,
  /reduce the length of the messages/i,
  /maximum context length is \d+ tokens/i,
  /exceeds the limit of \d+/i,
  /exceeds the available context size/i,
  /greater than the context length/i,
  /context window exceeds limit/i,
  /exceeded model token limit/i,
  /context[_ ]length[_ ]exceeded/i,
  /request entity too large/i,
  /context length is only \d+ tokens/i,
  /input length.*exceeds.*context length/i,
  /prompt too long; exceeded (?:max )?context length/i,
  /too large for model with \d+ maximum context length/i,
  /model_context_window_exceeded/i,
]

/**
 * Matches a message that starts with a bare 400 or 413 status and reports no
 * body, e.g. `413 status code (no body)` or `400 (no body)`.
 */
const bodilessStatus = /^4(00|13)\s*(status code)?\s*\(no body\)/i

/**
 * Reports whether a provider error message describes a context overflow.
 *
 * @param message Raw error text (a message or a response body).
 * @returns `true` when the text matches any known overflow pattern or is a
 * bodiless 400/413 status message; `false` otherwise, including for the empty
 * string.
 */
export const isContextOverflow = (message: string): boolean =>
  patterns.some((pattern) => pattern.test(message)) || bodilessStatus.test(message)
|
||||
|
||||
/**
 * Reports whether a failure carries the "context-overflow" classification.
 *
 * Two shapes are recognised: an `LLMError` whose reason is an invalid-request
 * reason classified as a context overflow, and a value that validates as a
 * `ProviderErrorEvent` with that same classification. Anything else is `false`.
 */
export const isContextOverflowFailure = (failure: unknown) => {
  if (failure instanceof LLMError) {
    const { reason } = failure
    return reason._tag === "InvalidRequest" && reason.classification === "context-overflow"
  }
  if (!Schema.is(ProviderErrorEvent)(failure)) return false
  return failure.classification === "context-overflow"
}
|
||||
@ -22,6 +22,7 @@ import {
|
||||
TransportReason,
|
||||
UnknownProviderReason,
|
||||
} from "../schema"
|
||||
import { isContextOverflow } from "../provider-error"
|
||||
|
||||
export interface Interface {
|
||||
readonly execute: (
|
||||
@ -249,8 +250,18 @@ const statusReason = (input: {
|
||||
http: input.http,
|
||||
})
|
||||
}
|
||||
if (input.status === 400 || input.status === 404 || input.status === 409 || input.status === 422) {
|
||||
return new InvalidRequestReason({ message: input.message, http: input.http })
|
||||
if (
|
||||
input.status === 400 ||
|
||||
input.status === 404 ||
|
||||
input.status === 409 ||
|
||||
input.status === 413 ||
|
||||
input.status === 422
|
||||
) {
|
||||
return new InvalidRequestReason({
|
||||
message: input.message,
|
||||
classification: isContextOverflow(body) ? "context-overflow" : undefined,
|
||||
http: input.http,
|
||||
})
|
||||
}
|
||||
if (input.status >= 500 || retryableStatus(input.status)) {
|
||||
return new ProviderInternalReason({
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
import { Schema } from "effect"
|
||||
import { ModelID, ProviderID, ProviderMetadata, RouteID } from "./ids"
|
||||
|
||||
export const ProviderFailureClassification = Schema.Literal("context-overflow")
|
||||
export type ProviderFailureClassification = typeof ProviderFailureClassification.Type
|
||||
|
||||
export class HttpRequestDetails extends Schema.Class<HttpRequestDetails>("LLM.HttpRequestDetails")({
|
||||
method: Schema.String,
|
||||
url: Schema.String,
|
||||
@ -32,6 +35,7 @@ export class InvalidRequestReason extends Schema.Class<InvalidRequestReason>("LL
|
||||
_tag: Schema.tag("InvalidRequest"),
|
||||
message: Schema.String,
|
||||
parameter: Schema.optional(Schema.String),
|
||||
classification: Schema.optional(ProviderFailureClassification),
|
||||
providerMetadata: Schema.optional(ProviderMetadata),
|
||||
http: Schema.optional(HttpContext),
|
||||
}) {
|
||||
|
||||
@ -2,6 +2,7 @@ import { Schema } from "effect"
|
||||
import { ContentBlockID, FinishReason, ProtocolID, ProviderMetadata, RouteID, ToolCallID } from "./ids"
|
||||
import { ModelSchema } from "./options"
|
||||
import { ToolOutput, ToolResultValue } from "./messages"
|
||||
import { ProviderFailureClassification } from "./errors"
|
||||
|
||||
/**
|
||||
* Token usage reported by an LLM provider.
|
||||
@ -199,6 +200,7 @@ export type Finish = Schema.Schema.Type<typeof Finish>
|
||||
export const ProviderErrorEvent = Schema.Struct({
|
||||
type: Schema.tag("provider-error"),
|
||||
message: Schema.String,
|
||||
classification: Schema.optional(ProviderFailureClassification),
|
||||
retryable: Schema.optional(Schema.Boolean),
|
||||
providerMetadata: Schema.optional(ProviderMetadata),
|
||||
}).annotate({ identifier: "LLM.Event.ProviderError" })
|
||||
|
||||
@ -73,6 +73,46 @@ const expectLLMError = (error: unknown) => {
|
||||
const errorHttp = (error: LLMError) => ("http" in error.reason ? error.reason.http : undefined)
|
||||
|
||||
describe("RequestExecutor", () => {
|
||||
// Feeds the executor a single 400 response whose JSON body has error code
// "context_length_exceeded", then checks the resulting failure (obtained via
// Effect.flip) is an InvalidRequest reason classified as "context-overflow".
it.effect("classifies context overflow responses", () =>
|
||||
Effect.gen(function* () {
|
||||
const executor = yield* RequestExecutor.Service
|
||||
const error = yield* executor.execute(request).pipe(Effect.flip)
|
||||
|
|
||||
expectLLMError(error)
|
||||
expect(error.reason).toMatchObject({ _tag: "InvalidRequest", classification: "context-overflow" })
|
||||
}).pipe(
|
||||
Effect.provide(
|
||||
responsesLayer([
|
||||
new Response('{"error":{"code":"context_length_exceeded","message":"prompt too long"}}', {
|
||||
status: 400,
|
||||
}),
|
||||
]),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
// Feeds the executor a plain-text 413 response ("request too large") and checks
// the failure is an InvalidRequest reason whose `classification` is undefined.
it.effect("does not classify generic HTTP 413 payload errors as context overflow", () =>
|
||||
Effect.gen(function* () {
|
||||
const executor = yield* RequestExecutor.Service
|
||||
const error = yield* executor.execute(request).pipe(Effect.flip)
|
||||
|
|
||||
expectLLMError(error)
|
||||
expect(error.reason).toMatchObject({ _tag: "InvalidRequest" })
|
||||
expect("classification" in error.reason ? error.reason.classification : undefined).toBeUndefined()
|
||||
}).pipe(Effect.provide(responsesLayer([new Response("request too large", { status: 413 })]))),
|
||||
)
|
||||
|
||||
// Feeds the executor a plain-text 400 response ("invalid parameter") and checks
// the failure is an InvalidRequest reason whose `classification` is undefined.
it.effect("does not classify ordinary invalid requests as context overflow", () =>
|
||||
Effect.gen(function* () {
|
||||
const executor = yield* RequestExecutor.Service
|
||||
const error = yield* executor.execute(request).pipe(Effect.flip)
|
||||
|
|
||||
expectLLMError(error)
|
||||
expect(error.reason).toMatchObject({ _tag: "InvalidRequest" })
|
||||
expect("classification" in error.reason ? error.reason.classification : undefined).toBeUndefined()
|
||||
}).pipe(Effect.provide(responsesLayer([new Response("invalid parameter", { status: 400 })]))),
|
||||
)
|
||||
|
||||
it.effect("returns redacted diagnostics for retryable rate limits", () =>
|
||||
Effect.gen(function* () {
|
||||
const executor = yield* RequestExecutor.Service
|
||||
|
||||
@ -477,6 +477,29 @@ describe("Anthropic Messages route", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
// Replays a fixed SSE "error" event of type invalid_request_error with the
// message "prompt is too long: 210000 tokens" and expects the response events
// to be exactly one provider-error event classified as "context-overflow".
it.effect("classifies prompt-too-long provider errors", () =>
|
||||
Effect.gen(function* () {
|
||||
const response = yield* LLMClient.generate(request).pipe(
|
||||
Effect.provide(
|
||||
fixedResponse(
|
||||
sseEvents({
|
||||
type: "error",
|
||||
error: { type: "invalid_request_error", message: "prompt is too long: 210000 tokens" },
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
|
||||
expect(response.events).toEqual([
|
||||
{
|
||||
type: "provider-error",
|
||||
message: "invalid_request_error: prompt is too long: 210000 tokens",
|
||||
classification: "context-overflow",
|
||||
},
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("falls back to error type when no message is present", () =>
|
||||
Effect.gen(function* () {
|
||||
const response = yield* LLMClient.generate(request).pipe(
|
||||
|
||||
@ -351,6 +351,23 @@ describe("Bedrock Converse route", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
// Replays an event-stream body holding a "validationException" with the message
// "Input is too long for requested model" and expects the first provider-error
// event to be classified as "context-overflow" with retryable set to false.
it.effect("classifies input-too-long validation exceptions", () =>
|
||||
Effect.gen(function* () {
|
||||
const response = yield* LLMClient.generate(baseRequest).pipe(
|
||||
Effect.provide(
|
||||
fixedBytes(eventStreamBody(["validationException", { message: "Input is too long for requested model" }])),
|
||||
),
|
||||
)
|
||||
|
|
||||
expect(response.events.find((event) => event.type === "provider-error")).toEqual({
|
||||
type: "provider-error",
|
||||
message: "Input is too long for requested model",
|
||||
classification: "context-overflow",
|
||||
retryable: false,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("rejects requests with no auth path", () =>
|
||||
Effect.gen(function* () {
|
||||
const unsignedModel = AmazonBedrock.configure({
|
||||
|
||||
@ -1351,7 +1351,13 @@ describe("OpenAI Responses route", () => {
|
||||
),
|
||||
)
|
||||
|
||||
expect(response.events).toEqual([{ type: "provider-error", message: "context_length_exceeded: prompt too long" }])
|
||||
expect(response.events).toEqual([
|
||||
{
|
||||
type: "provider-error",
|
||||
message: "context_length_exceeded: prompt too long",
|
||||
classification: "context-overflow",
|
||||
},
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ import { APICallError } from "ai"
|
||||
import { STATUS_CODES } from "http"
|
||||
import { iife } from "@/util/iife"
|
||||
import type { ProviderV2 } from "@opencode-ai/core/provider"
|
||||
import { isContextOverflow } from "@opencode-ai/llm"
|
||||
|
||||
export class HeaderTimeoutError extends Error {
|
||||
public override readonly name = "ProviderHeaderTimeoutError"
|
||||
@ -19,30 +20,6 @@ export class ResponseStreamError extends Error {
|
||||
}
|
||||
}
|
||||
|
||||
// Adapted from overflow detection patterns in:
|
||||
// https://github.com/badlogic/pi-mono/blob/main/packages/ai/src/utils/overflow.ts
|
||||
// Case-insensitive regexes tested against an error message string to recognise
// context-overflow failures. The trailing comment on each entry names the
// provider (or generic case) whose wording it targets.
const OVERFLOW_PATTERNS = [
|
||||
/prompt is too long/i, // Anthropic
|
||||
/input is too long for requested model/i, // Amazon Bedrock
|
||||
/exceeds the context window/i, // OpenAI (Completions + Responses API message text)
|
||||
/input token count.*exceeds the maximum/i, // Google (Gemini)
|
||||
/maximum prompt length is \d+/i, // xAI (Grok)
|
||||
/reduce the length of the messages/i, // Groq
|
||||
/maximum context length is \d+ tokens/i, // OpenRouter, DeepSeek, vLLM
|
||||
/exceeds the limit of \d+/i, // GitHub Copilot
|
||||
/exceeds the available context size/i, // llama.cpp server
|
||||
/greater than the context length/i, // LM Studio
|
||||
/context window exceeds limit/i, // MiniMax
|
||||
/exceeded model token limit/i, // Kimi For Coding, Moonshot
|
||||
/context[_ ]length[_ ]exceeded/i, // Generic fallback
|
||||
/request entity too large/i, // HTTP 413
|
||||
/context length is only \d+ tokens/i, // vLLM
|
||||
/input length.*exceeds.*context length/i, // vLLM
|
||||
/prompt too long; exceeded (?:max )?context length/i, // Ollama explicit overflow error
|
||||
/too large for model with \d+ maximum context length/i, // Mistral
|
||||
/model_context_window_exceeded/i, // z.ai non-standard finish_reason surfaced as error text
|
||||
]
|
||||
|
||||
function isOpenAiErrorRetryable(e: APICallError) {
|
||||
const status = e.statusCode
|
||||
if (!status) return e.isRetryable
|
||||
@ -52,15 +29,6 @@ function isOpenAiErrorRetryable(e: APICallError) {
|
||||
|
||||
// Providers not reliably handled in this function:
|
||||
// - z.ai: can accept overflow silently (needs token-count/context-window checks)
|
||||
// Returns true when `message` matches any regex in OVERFLOW_PATTERNS, or when it
// starts with "400" or "413", optionally followed by "status code", and then
// "(no body)" (case-insensitive). Otherwise returns false.
function isOverflow(message: string) {
|
||||
if (OVERFLOW_PATTERNS.some((p) => p.test(message))) return true
|
||||
|
|
||||
// Providers/status patterns handled outside of regex list:
|
||||
// - Cerebras: often returns "400 (no body)" / "413 (no body)"
|
||||
// - Mistral: often returns "400 (no body)" / "413 (no body)"
|
||||
return /^4(00|13)\s*(status code)?\s*\(no body\)/i.test(message)
|
||||
}
|
||||
|
||||
function message(providerID: ProviderV2.ID, e: APICallError) {
|
||||
return iife(() => {
|
||||
const msg = e.message
|
||||
@ -197,7 +165,7 @@ export type ParsedAPICallError =
|
||||
export function parseAPICallError(input: { providerID: ProviderV2.ID; error: APICallError }): ParsedAPICallError {
|
||||
const m = message(input.providerID, input.error)
|
||||
const body = json(input.error.responseBody)
|
||||
if (isOverflow(m) || input.error.statusCode === 413 || body?.error?.code === "context_length_exceeded") {
|
||||
if (isContextOverflow(m) || input.error.statusCode === 413 || body?.error?.code === "context_length_exceeded") {
|
||||
return {
|
||||
type: "context_overflow",
|
||||
message: m,
|
||||
|
||||
@ -98,7 +98,7 @@ Current Context Epoch follow-ups:
|
||||
|
||||
- Add configured, remote, and nested instruction sources with explicit precedence and removal semantics.
|
||||
- Add durable post-crash activity recovery for promoted or provider-dispatched work.
|
||||
- Add provider-overflow recovery and explicit manual compaction on top of automatic request-budget compaction.
|
||||
- Add explicit manual compaction on top of automatic request-budget compaction.
|
||||
- Add operational metrics for observation latency, unavailable sources, contention, baseline size, and chronological-update growth.
|
||||
- Consider watcher-backed per-file caching only if measurements show direct safe-boundary observation is too expensive.
|
||||
- Expose plugin-defined Context Sources only after plugin reload and scoped cleanup semantics are designed.
|
||||
@ -112,7 +112,9 @@ Compaction keeps the full transcript durable while replacing its active model re
|
||||
|
||||
`session.next.compaction.started.1` durably identifies the attempt. Compaction deltas are live-only progress. `session.next.compaction.ended.2` durably stores the final summary and serialized recent context; only this completed event projects a model-visible compaction message and requests Context Epoch replacement. A failed or interrupted attempt therefore leaves the previous history boundary active.
|
||||
|
||||
Repeated compactions update the previous structured summary with newly compacted messages. The runner then reloads projected history and executes the original pending turn. Provider overflow recovery and deterministic old tool-result pruning remain separate follow-ups.
|
||||
Repeated compactions update the previous structured summary with newly compacted messages. The runner then reloads projected history and executes the original pending turn.
|
||||
|
||||
When a provider rejects a request as context overflow before durable assistant output or tool activity, the runner attempts one overflow-triggered compaction even when the local estimate did not predict pressure. A completed checkpoint rebuilds the same logical provider turn with one remaining physical attempt. A second overflow, unavailable compaction, or overflow after durable output becomes the ordinary terminal failure; recovery never loops or replays partial side effects. Deterministic old tool-result pruning remains a separate follow-up.
|
||||
|
||||
## V1 Runtime Context Parity
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user