opencode/packages/core/test/session-runner.test.ts
opencode-agent[bot] 7c6adcf60f
fix(core): scope v2 prompt cache by session (#31036)
Co-authored-by: opencode-agent[bot] <opencode-agent[bot]@users.noreply.github.com>
2026-06-05 19:11:32 -04:00

3537 lines
129 KiB
TypeScript

import { describe, expect } from "bun:test"
import {
LLMClient,
LLMError,
LLMEvent,
Model,
Tool,
TransportReason,
InvalidRequestReason,
type LLMClientShape,
type LLMRequest,
} from "@opencode-ai/llm"
import * as OpenAIChat from "@opencode-ai/llm/protocols/openai-chat"
import { Database } from "@opencode-ai/core/database/database"
import { EventV2 } from "@opencode-ai/core/event"
import { PermissionV2 } from "@opencode-ai/core/permission"
import { EventTable } from "@opencode-ai/core/event/sql"
import { Project } from "@opencode-ai/core/project"
import { ProjectTable } from "@opencode-ai/core/project/sql"
import { QuestionV2 } from "@opencode-ai/core/question"
import { AbsolutePath } from "@opencode-ai/core/schema"
import { SessionV2 } from "@opencode-ai/core/session"
import { ContextSnapshotDecodeError } from "@opencode-ai/core/session/error"
import { SessionEvent } from "@opencode-ai/core/session/event"
import { SessionInput } from "@opencode-ai/core/session/input"
import { SessionMessage } from "@opencode-ai/core/session/message"
import { Prompt } from "@opencode-ai/core/session/prompt"
import { SessionProjector } from "@opencode-ai/core/session/projector"
import { SessionExecution } from "@opencode-ai/core/session/execution"
import { SessionContextEpoch } from "@opencode-ai/core/session/context-epoch"
import { SessionRunCoordinator } from "@opencode-ai/core/session/run-coordinator"
import { SessionRunner } from "@opencode-ai/core/session/runner"
import * as SessionRunnerLLM from "@opencode-ai/core/session/runner/llm"
import { SessionRunnerModel } from "@opencode-ai/core/session/runner/model"
import { ToolRegistry } from "@opencode-ai/core/tool/registry"
import { ToolOutputStore } from "@opencode-ai/core/tool-output-store"
import { ApplicationTools } from "@opencode-ai/core/tool/application-tools"
import { AgentV2 } from "@opencode-ai/core/agent"
import { Config } from "@opencode-ai/core/config"
import { ConfigCompaction } from "@opencode-ai/core/config/compaction"
import { NativeTool } from "@opencode-ai/core/tool/native"
import {
SessionContextEpochTable,
SessionInputTable,
SessionMessageTable,
SessionTable,
} from "@opencode-ai/core/session/sql"
import { SessionStore } from "@opencode-ai/core/session/store"
import { SystemContext } from "@opencode-ai/core/system-context"
import { SystemContextRegistry } from "@opencode-ai/core/system-context/registry"
import { SkillGuidance } from "@opencode-ai/core/skill/guidance"
import { ModelV2 } from "@opencode-ai/core/model"
import { Location } from "@opencode-ai/core/location"
import { ProviderV2 } from "@opencode-ai/core/provider"
import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect"
import { asc, eq } from "drizzle-orm"
import { testEffect } from "./lib/effect"
const database = Database.layerFromPath(":memory:")
const events = EventV2.layer.pipe(Layer.provide(database))
const questions = QuestionV2.layer.pipe(Layer.provide(events))
const projector = SessionProjector.layer.pipe(Layer.provide(events), Layer.provide(database))
const store = SessionStore.layer.pipe(Layer.provide(database))
const requests: LLMRequest[] = []
let response: LLMEvent[] = []
let responses: LLMEvent[][] | undefined
let responseStream: Stream.Stream<LLMEvent, LLMError> | undefined
let streamGate: Deferred.Deferred<void> | undefined
let streamStarted: Deferred.Deferred<void> | undefined
let streamFailure: LLMError | undefined
let toolExecutionGate: Deferred.Deferred<void> | undefined
let toolExecutionsStarted: Deferred.Deferred<void> | undefined
let toolExecutionsReady = 5
let activeToolExecutions = 0
let maxActiveToolExecutions = 0
const client = Layer.succeed(
LLMClient.Service,
LLMClient.Service.of({
prepare: () => Effect.die("unused"),
stream: ((request: LLMRequest) => {
requests.push(request)
if (responseStream) {
const stream = responseStream
responseStream = undefined
return stream
}
const events = streamFailure
? Stream.fail(streamFailure)
: Stream.fromIterable(responses === undefined ? response : (responses.shift() ?? []))
if (!streamGate) return events
return Stream.unwrap(
(streamStarted ? Deferred.succeed(streamStarted, undefined) : Effect.void).pipe(
Effect.andThen(Deferred.await(streamGate)),
Effect.as(events),
),
)
}) as unknown as LLMClientShape["stream"],
generate: () => Effect.die("unused"),
}),
)
const model = Model.make({ id: "fake-model", provider: "fake", route: OpenAIChat.route })
const replacementModel = Model.make({ id: "replacement", provider: "fake", route: OpenAIChat.route })
const compactModel = Model.make({
id: "compact",
provider: "fake",
route: OpenAIChat.route.with({ limits: { context: 4_000, output: 50 } }),
})
const recoveryModel = Model.make({
id: "recovery",
provider: "fake",
route: OpenAIChat.route.with({ limits: { context: 20_000, output: 1_000 } }),
})
const authorizations: ToolRegistry.AuthorizeInput[] = []
const executions: string[] = []
const permission = Layer.succeed(
PermissionV2.Service,
PermissionV2.Service.of({
assert: () => Effect.die("unused"),
ask: () => Effect.die("unused"),
reply: () => Effect.die("unused"),
get: () => Effect.die("unused"),
forSession: () => Effect.die("unused"),
list: () => Effect.die("unused"),
}),
)
const applications = ApplicationTools.layer
const registry = ToolRegistry.layer.pipe(
Layer.provide(permission),
Layer.provide(applications),
Layer.provide(ToolOutputStore.defaultLayer),
)
const agents = AgentV2.layer
const echo = Layer.effectDiscard(
ToolRegistry.Service.use((registry) =>
registry.contribute((editor) => {
;(editor.set("echo", {
authorize: (input) =>
Effect.sync(() => {
authorizations.push(input)
}),
tool: Tool.make({
description: "Echo text",
parameters: Schema.Struct({ text: Schema.String }),
success: Schema.Struct({ text: Schema.String }),
toModelOutput: ({ output }) => [{ type: "text", text: output.text }],
execute: ({ text }) =>
Effect.gen(function* () {
executions.push(text)
activeToolExecutions++
maxActiveToolExecutions = Math.max(maxActiveToolExecutions, activeToolExecutions)
if (activeToolExecutions === toolExecutionsReady && toolExecutionsStarted) {
yield* Deferred.succeed(toolExecutionsStarted, undefined)
}
if (toolExecutionGate) yield* Deferred.await(toolExecutionGate)
return { text }
}).pipe(Effect.ensuring(Effect.sync(() => activeToolExecutions--))),
}),
}),
editor.set("defect", {
tool: Tool.make({
description: "Fail unexpectedly",
parameters: Schema.Struct({}),
success: Schema.Struct({}),
execute: () => Effect.die("unexpected tool defect"),
}),
}))
}),
),
).pipe(Layer.provide(registry))
let modelResolveHook = Effect.void
let currentModel = model
const models = SessionRunnerModel.layerWith((session) =>
modelResolveHook.pipe(Effect.as(session.model?.id === "replacement" ? replacementModel : currentModel)),
)
const systemContextKey = SystemContext.Key.make("test/context")
let systemBaseline = "Initial context"
let systemRemoved = false
let systemUnavailable = false
let systemLoadHook = Effect.void
const skillBaselines = new Map<AgentV2.ID, string>()
const systemContext = Layer.effectDiscard(
SystemContextRegistry.Service.pipe(
Effect.flatMap((registry) =>
registry.contribute({
key: systemContextKey,
load: Effect.sync(() =>
SystemContext.combine(
systemRemoved
? []
: [
SystemContext.make({
key: systemContextKey,
codec: Schema.toCodecJson(Schema.String),
load: systemLoadHook.pipe(
Effect.andThen(
Effect.sync(() => (systemUnavailable ? SystemContext.unavailable : systemBaseline)),
),
),
baseline: String,
update: (_previous, current) => current,
removed: () => "System context source removed: test/context",
}),
],
),
),
}),
),
),
).pipe(Layer.provideMerge(SystemContextRegistry.layer))
const location = Location.layer({ directory: AbsolutePath.make("/project") }).pipe(Layer.provide(Project.defaultLayer))
const skillGuidance = Layer.mock(SkillGuidance.Service, {
load: (agent) =>
Effect.succeed(
skillBaselines.has(agent.id)
? SystemContext.make({
key: SystemContext.Key.make("test/skill-guidance"),
codec: Schema.toCodecJson(Schema.String),
load: Effect.succeed(skillBaselines.get(agent.id)!),
baseline: String,
update: (_previous, current) => current,
removed: () => "Skill guidance removed",
})
: SystemContext.empty,
),
})
const config = Layer.succeed(
Config.Service,
Config.Service.of({
entries: () =>
Effect.succeed([
new Config.Document({
type: "document",
info: new Config.Info({
compaction: new ConfigCompaction.Info({
buffer: 3_000,
keep: new ConfigCompaction.Keep({ tokens: 1_000 }),
}),
}),
}),
]),
}),
)
const runner = SessionRunnerLLM.layer.pipe(
Layer.provide(database),
Layer.provide(store),
Layer.provide(events),
Layer.provide(client),
Layer.provide(registry),
Layer.provide(models),
Layer.provide(systemContext),
Layer.provide(location),
Layer.provide(agents),
Layer.provide(skillGuidance),
Layer.provide(config),
)
const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner))
const execution = Layer.effect(
SessionExecution.Service,
SessionRunCoordinator.Service.pipe(
Effect.map((coordinator) =>
SessionExecution.Service.of({
resume: coordinator.run,
wake: coordinator.wake,
interrupt: coordinator.interrupt,
}),
),
),
).pipe(Layer.provide(coordinator))
const sessions = SessionV2.layer.pipe(
Layer.provide(events),
Layer.provide(database),
Layer.provide(store),
Layer.provide(Project.defaultLayer),
Layer.provide(execution),
)
const it = testEffect(
Layer.mergeAll(
database,
events,
questions,
projector,
store,
client,
permission,
applications,
agents,
registry,
echo,
models,
systemContext,
location,
skillGuidance,
config,
runner,
coordinator,
execution,
sessions,
),
)
const sessionID = SessionV2.ID.make("ses_runner_test")
const otherSessionID = SessionV2.ID.make("ses_runner_other")
const insertSession = (id: SessionV2.ID) =>
Effect.gen(function* () {
const { db } = yield* Database.Service
yield* db
.insert(SessionTable)
.values({
id,
project_id: Project.ID.global,
slug: id,
directory: "/project",
title: "test",
version: "test",
})
.onConflictDoNothing()
.run()
.pipe(Effect.orDie)
})
const setup = Effect.gen(function* () {
const { db } = yield* Database.Service
response = []
systemBaseline = "Initial context"
systemRemoved = false
systemUnavailable = false
systemLoadHook = Effect.void
modelResolveHook = Effect.void
currentModel = model
skillBaselines.clear()
responses = undefined
streamFailure = undefined
responseStream = undefined
streamGate = undefined
streamStarted = undefined
toolExecutionGate = undefined
toolExecutionsStarted = undefined
toolExecutionsReady = 5
activeToolExecutions = 0
maxActiveToolExecutions = 0
yield* db
.insert(ProjectTable)
.values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] })
.onConflictDoNothing()
.run()
.pipe(Effect.orDie)
yield* insertSession(sessionID)
})
const providerUnavailable = () =>
new LLMError({
module: "test",
method: "stream",
reason: new TransportReason({ message: "Provider unavailable" }),
})
const setupOverflowRecovery = Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
response = fragmentFixture("text", "text-earlier", ["Earlier answer"]).completeEvents
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Earlier question ".repeat(700) }),
resume: false,
})
yield* session.resume(sessionID)
currentModel = recoveryModel
requests.length = 0
return session
})
const userTexts = (request: LLMRequest) =>
request.messages.flatMap((message) =>
message.role === "user"
? message.content.flatMap((content) => (content.type === "text" ? [content.text] : []))
: [],
)
const replaySessionProjection = (id: SessionV2.ID) =>
Effect.gen(function* () {
const { db } = yield* Database.Service
const events = yield* EventV2.Service
const recorded = yield* db
.select()
.from(EventTable)
.where(eq(EventTable.aggregate_id, id))
.orderBy(asc(EventTable.seq))
.all()
.pipe(Effect.orDie)
yield* events.remove(id)
yield* db.delete(SessionInputTable).where(eq(SessionInputTable.session_id, id)).run().pipe(Effect.orDie)
yield* db.delete(SessionMessageTable).where(eq(SessionMessageTable.session_id, id)).run().pipe(Effect.orDie)
yield* events.replayAll(
recorded.map((event) => ({
id: event.id,
aggregateID: event.aggregate_id,
seq: event.seq,
type: event.type,
data: event.data,
})),
)
})
type FragmentKind = "text" | "reasoning" | "tool input"
type FragmentFixture = {
readonly delta: EventV2.Definition
readonly completeEvents: LLMEvent[]
readonly partialEvents: LLMEvent[]
readonly expectedAssistant: unknown
readonly expectedContent: unknown
}
const fragmentKinds: readonly FragmentKind[] = ["text", "reasoning", "tool input"]
const fragmentID = (kind: FragmentKind, suffix: string) => `${kind === "tool input" ? "call" : kind}-${suffix}`
const fragmentFixture = (kind: FragmentKind, id: string, chunks: readonly string[]): FragmentFixture => {
const text = chunks.join("")
switch (kind) {
case "text": {
const partialEvents = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.textStart({ id }),
...chunks.map((text) => LLMEvent.textDelta({ id, text })),
]
const expectedContent = { type: "text", id, text }
return {
delta: SessionEvent.Text.Delta,
partialEvents,
completeEvents: [
...partialEvents,
LLMEvent.textEnd({ id }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
expectedAssistant: { type: "assistant", finish: "stop", content: [expectedContent] },
expectedContent,
}
}
case "reasoning": {
const partialEvents = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.reasoningStart({ id }),
...chunks.map((text) => LLMEvent.reasoningDelta({ id, text })),
]
const expectedContent = { type: "reasoning", id, text }
return {
delta: SessionEvent.Reasoning.Delta,
partialEvents,
completeEvents: [
...partialEvents,
LLMEvent.reasoningEnd({ id }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
expectedAssistant: { type: "assistant", finish: "stop", content: [expectedContent] },
expectedContent,
}
}
case "tool input": {
const partialEvents = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolInputStart({ id, name: "echo" }),
...chunks.map((text) => LLMEvent.toolInputDelta({ id, name: "echo", text })),
]
const expectedContent = { type: "tool", id, state: { status: "pending", input: text } }
return {
delta: SessionEvent.Tool.Input.Delta,
partialEvents,
completeEvents: [...partialEvents, LLMEvent.toolInputEnd({ id, name: "echo" })],
expectedAssistant: { type: "assistant", content: [expectedContent] },
expectedContent,
}
}
}
}
const verifyEphemeralDeltas = (kind: FragmentKind) =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const prompt = `Stream ${kind}`
const chunks = Array.from({ length: 32 }, (_, index) => `${index},`)
const fixture = fragmentFixture(kind, fragmentID(kind, "many"), chunks)
const expectedContext = [{ type: "user", text: prompt }, fixture.expectedAssistant]
yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false })
const events = yield* EventV2.Service
const live = yield* events.subscribe(fixture.delta).pipe(Stream.take(32), Stream.runCollect, Effect.forkScoped)
yield* Effect.yieldNow
response = fixture.completeEvents
yield* session.resume(sessionID)
const { db } = yield* Database.Service
const deltas = yield* db
.select({ type: EventTable.type })
.from(EventTable)
.where(eq(EventTable.type, EventV2.versionedType(fixture.delta.type, 1)))
.all()
.pipe(Effect.orDie)
expect(Array.from(yield* Fiber.join(live))).toHaveLength(32)
expect(deltas).toHaveLength(0)
expect(yield* session.context(sessionID)).toMatchObject(expectedContext)
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject(expectedContext)
})
const verifyPartialFlushOnFailure = (kind: FragmentKind) =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const prompt = `Fail after ${kind}`
const fixture = fragmentFixture(kind, fragmentID(kind, "partial"), ["Partial"])
const failure = providerUnavailable()
yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false })
responseStream = Stream.concat(Stream.fromIterable(fixture.partialEvents), Stream.fail(failure))
expect(yield* session.resume(sessionID).pipe(Effect.flip)).toBe(failure)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: prompt },
{
type: "assistant",
finish: "error",
error: { type: "unknown", message: "Provider unavailable" },
content: [fixture.expectedContent],
},
])
})
const verifyPartialFlushOnInterruption = (kind: FragmentKind) =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const prompt = `Interrupt after ${kind}`
const fixture = fragmentFixture(kind, fragmentID(kind, "interrupted"), ["Partial"])
const streamed = yield* Deferred.make<void>()
yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false })
responseStream = Stream.concat(
Stream.fromIterable(fixture.partialEvents),
Stream.fromEffect(Deferred.succeed(streamed, undefined)).pipe(Stream.flatMap(() => Stream.never)),
)
const runner = yield* SessionRunner.Service
const fiber = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild)
yield* Deferred.await(streamed)
yield* Fiber.interrupt(fiber)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: prompt },
{
type: "assistant",
content: [
kind === "tool input"
? { type: "tool", id: fragmentID(kind, "interrupted"), state: { status: "error" } }
: fixture.expectedContent,
],
},
])
})
describe("SessionRunnerLLM", () => {
it.effect("advertises and executes a globally attached application tool", () =>
Effect.gen(function* () {
yield* setup
const applicationTools = yield* ApplicationTools.Service
const session = yield* SessionV2.Service
const contexts: NativeTool.Context[] = []
yield* applicationTools.attach({
application_context: NativeTool.make({
description: "Read application context",
parameters: Schema.Struct({ query: Schema.String }),
success: Schema.Struct({ answer: Schema.String }),
execute: ({ query }, context) =>
Effect.sync(() => {
contexts.push(context)
return { answer: query.toUpperCase() }
}),
}),
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Use application context" }), resume: false })
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-application", name: "application_context", input: { query: "hello" } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[],
]
yield* session.resume(sessionID)
expect(requests[0]?.tools.map((tool) => tool.name)).toContain("application_context")
expect(contexts).toEqual([{ sessionID, id: "call-application", name: "application_context" }])
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Use application context" },
{
type: "assistant",
content: [
{
type: "tool",
id: "call-application",
state: { status: "completed", structured: { answer: "HELLO" } },
},
],
},
])
}),
)
it.effect("starts a real runner turn after default prompt recording", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
requests.length = 0
responses = undefined
streamGate = undefined
streamStarted = undefined
response = []
const message = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run automatically" }) })
expect(requests).toHaveLength(1)
expect(yield* session.messages({ sessionID })).toMatchObject([
{ id: message.id, type: "user", text: "Run automatically" },
])
}),
)
it.effect("streams one request with registry definitions from chronological V2 user history", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
requests.length = 0
responses = undefined
streamGate = undefined
streamStarted = undefined
response = []
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(requests[0]?.model).toBe(model)
expect(requests[0]?.tools.map((tool) => tool.name)).toEqual(["echo", "defect"])
expect(requests[0]?.messages.map((message) => ({ role: message.role, content: message.content }))).toEqual([
{ role: "user", content: [{ type: "text", text: "First" }] },
{ role: "user", content: [{ type: "text", text: "Second" }] },
])
expect(yield* session.messages({ sessionID })).toHaveLength(2)
}),
)
it.effect("retries the first provider turn after system context becomes available", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const { db } = yield* Database.Service
const messageID = SessionMessage.ID.create()
systemUnavailable = true
yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
const exit = yield* session.resume(sessionID).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(SystemContext.InitializationBlocked)
expect(requests).toHaveLength(0)
expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true)
expect(
yield* db
.select()
.from(SessionContextEpochTable)
.where(eq(SessionContextEpochTable.session_id, sessionID))
.get(),
).toBeUndefined()
systemUnavailable = false
yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "First" }) })
yield* (yield* SessionRunCoordinator.Service).awaitIdle(sessionID)
expect(requests).toHaveLength(1)
expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user"])
}),
)
it.effect("interrupts a source Location runner after a Session moves", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.Moved, {
sessionID,
timestamp: DateTime.makeUnsafe(1),
location: { directory: AbsolutePath.make("/moved") },
})
expect(
yield* db
.select()
.from(SessionContextEpochTable)
.where(eq(SessionContextEpochTable.session_id, sessionID))
.get(),
).toBeUndefined()
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
const exit = yield* session.resume(sessionID).pipe(Effect.exit)
expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBe(true)
expect(requests).toHaveLength(1)
expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true)
}),
)
it.effect("fails gracefully when a stored context snapshot cannot be decoded", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
response = []
yield* session.resume(sessionID)
yield* db
.update(SessionContextEpochTable)
.set({ snapshot: { invalid: { value: "bad" } } })
.where(eq(SessionContextEpochTable.session_id, sessionID))
.run()
.pipe(Effect.orDie)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
requests.length = 0
const exit = yield* session.resume(sessionID).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(ContextSnapshotDecodeError)
expect(requests).toHaveLength(0)
}),
)
it.effect("does not create a source Location epoch after a concurrent Session move", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const { db } = yield* Database.Service
let moved = false
systemLoadHook = Effect.suspend(() => {
if (moved) return Effect.void
moved = true
return events
.publish(SessionEvent.Moved, {
sessionID,
timestamp: DateTime.makeUnsafe(1),
location: { directory: AbsolutePath.make("/moved") },
})
.pipe(Effect.asVoid)
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
expect(Exit.isFailure(yield* session.resume(sessionID).pipe(Effect.exit))).toBe(true)
expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true)
expect(
yield* db
.select()
.from(SessionContextEpochTable)
.where(eq(SessionContextEpochTable.session_id, sessionID))
.get(),
).toBeUndefined()
expect((yield* session.get(sessionID)).location.directory).toBe(AbsolutePath.make("/moved"))
}),
)
it.effect("reuses one durable baseline after the context producer changes", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
systemBaseline = "Changed context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context"],
["Initial context"],
])
expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "user", "system"])
expect(requests[1]?.messages.at(-1)?.content).toEqual([{ type: "text", text: "Changed context" }])
expect(yield* session.messages({ sessionID })).toHaveLength(3)
const { db } = yield* Database.Service
expect(
yield* db
.select({ id: EventTable.id })
.from(EventTable)
.where(eq(EventTable.type, "session.next.context.updated.1"))
.all()
.pipe(Effect.orDie),
).toHaveLength(1)
yield* replaySessionProjection(sessionID)
expect(yield* session.messages({ sessionID })).toHaveLength(3)
}),
)
it.effect("includes the effective default agent system before durable context", () =>
Effect.gen(function* () {
yield* setup
const agent = yield* AgentV2.Service
yield* agent.update((editor) =>
editor.update(AgentV2.ID.make("build"), (agent) => {
agent.system = "Build agent instructions"
agent.mode = "primary"
}),
)
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = fragmentFixture("text", "text-build", ["Done"]).completeEvents
yield* session.resume(sessionID)
expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Build agent instructions", "Initial context"])
}),
)
it.effect("uses the configured default agent system for omitted-agent sessions", () =>
Effect.gen(function* () {
yield* setup
const agent = yield* AgentV2.Service
yield* agent.update((editor) => {
editor.update(AgentV2.ID.make("build"), (agent) => {
agent.system = "Build agent instructions"
agent.mode = "primary"
})
editor.update(AgentV2.ID.make("reviewer"), (agent) => {
agent.system = "Reviewer instructions"
agent.mode = "primary"
})
editor.default(AgentV2.ID.make("reviewer"))
})
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = fragmentFixture("text", "text-reviewer", ["Done"]).completeEvents
yield* session.resume(sessionID)
expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Reviewer instructions", "Initial context"])
expect((yield* session.messages({ sessionID }))[0]).toMatchObject({ type: "assistant", agent: "reviewer" })
}),
)
it.effect("uses an explicitly selected non-build agent system", () =>
Effect.gen(function* () {
yield* setup
const { db } = yield* Database.Service
const agent = yield* AgentV2.Service
yield* agent.update((editor) =>
editor.update(AgentV2.ID.make("reviewer"), (agent) => {
agent.system = "Reviewer instructions"
agent.mode = "primary"
}),
)
yield* db
.update(SessionTable)
.set({ agent: "reviewer" })
.where(eq(SessionTable.id, sessionID))
.run()
.pipe(Effect.orDie)
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = fragmentFixture("text", "text-selected", ["Done"]).completeEvents
yield* session.resume(sessionID)
expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Reviewer instructions", "Initial context"])
expect((yield* session.messages({ sessionID }))[0]).toMatchObject({ type: "assistant", agent: "reviewer" })
}),
)
it.effect("composes selected-agent skill guidance and replaces it after an agent switch", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
skillBaselines.set(AgentV2.ID.make("build"), "Build skills")
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills")
yield* events.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
agent: "reviewer",
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context\n\nBuild skills"],
["Initial context\n\nReviewer skills"],
])
}),
)
it.effect("retries first-epoch preparation when the selected agent changes during observation", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
skillBaselines.set(AgentV2.ID.make("build"), "Build skills")
skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills")
let switched = false
systemLoadHook = Effect.suspend(() => {
if (switched) return Effect.void
switched = true
return events
.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
agent: "reviewer",
})
.pipe(Effect.asVoid)
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context\n\nReviewer skills"],
])
}),
)
it.effect("opens a queued activity once when the selected agent changes during observation", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
skillBaselines.set(AgentV2.ID.make("build"), "Build skills")
skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills")
let switched = false
systemLoadHook = Effect.suspend(() => {
if (switched) return Effect.void
switched = true
return events
.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
agent: "reviewer",
})
.pipe(Effect.asVoid)
})
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Queued" }),
delivery: "queue",
resume: false,
})
requests.length = 0
response = []
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect((yield* session.context(sessionID)).filter((message) => message.type === "user")).toHaveLength(1)
}),
)
it.effect("retries an agent switch before the final provider-dispatch boundary", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const { db } = yield* Database.Service
skillBaselines.set(AgentV2.ID.make("build"), "Build skills")
skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills")
let switched = false
modelResolveHook = Effect.suspend(() => {
if (switched) return Effect.void
switched = true
return events
.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
agent: "reviewer",
})
.pipe(Effect.asVoid)
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context\n\nReviewer skills"],
])
expect(
yield* db
.select({ replacementSeq: SessionContextEpochTable.replacement_seq })
.from(SessionContextEpochTable)
.where(eq(SessionContextEpochTable.session_id, sessionID))
.get()
.pipe(Effect.orDie),
).toEqual({ replacementSeq: null })
}),
)
it.effect("retries a model switch before the final provider-dispatch boundary", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
let switched = false
modelResolveHook = Effect.suspend(() => {
if (switched) return Effect.void
switched = true
return events
.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") },
})
.pipe(Effect.asVoid)
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
expect(requests.map((request) => request.model)).toEqual([replacementModel])
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([["Initial context"]])
}),
)
it.effect("fences an unchanged epoch read across an agent ABA replacement request", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
response = []
yield* session.resume(sessionID)
let switched = false
systemLoadHook = Effect.suspend(() => {
if (switched) return Effect.void
switched = true
return events
.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
agent: AgentV2.ID.make("reviewer"),
})
.pipe(
Effect.andThen(
events.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(2),
agent: AgentV2.defaultID,
}),
),
Effect.asVoid,
)
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
requests.length = 0
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(
yield* db
.select({ replacementSeq: SessionContextEpochTable.replacement_seq })
.from(SessionContextEpochTable)
.where(eq(SessionContextEpochTable.session_id, sessionID))
.get()
.pipe(Effect.orDie),
).toEqual({ replacementSeq: null })
}),
)
it.effect("rejects stale agent guidance when committing an existing-epoch replacement", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
response = []
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
agent: AgentV2.ID.make("reviewer"),
})
const context = (text: string) =>
Effect.succeed(
SystemContext.make({
key: systemContextKey,
codec: Schema.toCodecJson(Schema.String),
load: Effect.succeed(text),
baseline: String,
update: (_previous, current) => current,
}),
)
const location = (yield* session.get(sessionID)).location
expect(
yield* SessionContextEpoch.prepare(
db,
events,
context("Stale build context"),
sessionID,
location,
AgentV2.defaultID,
).pipe(Effect.catchDefect(Effect.succeed)),
).toBeInstanceOf(SessionContextEpoch.AgentMismatch)
expect(
yield* SessionContextEpoch.prepare(
db,
events,
context("Reviewer context"),
sessionID,
location,
AgentV2.ID.make("reviewer"),
),
).toMatchObject({ baseline: "Reviewer context" })
}),
)
it.effect("blocks a cross-agent provider turn while replacement context is unavailable", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
skillBaselines.set(AgentV2.defaultID, "Build skills")
skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills")
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
response = []
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.AgentSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
agent: AgentV2.ID.make("reviewer"),
})
systemUnavailable = true
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
requests.length = 0
const blocked = yield* session.resume(sessionID).pipe(Effect.exit)
expect(Exit.isFailure(blocked)).toBe(true)
if (Exit.isFailure(blocked))
expect(Cause.squash(blocked.cause)).toBeInstanceOf(SessionContextEpoch.AgentReplacementBlocked)
expect(requests).toHaveLength(0)
systemUnavailable = false
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context\n\nReviewer skills"],
])
}),
)
it.effect("admits removed context as a chronological System message", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
systemRemoved = true
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "user", "system"])
expect(requests[1]?.messages.at(-1)?.content).toEqual([
{ type: "text", text: "System context source removed: test/context" },
])
expect(yield* session.messages({ sessionID })).toHaveLength(3)
}),
)
it.effect("replaces the baseline lazily after a model switch and drops prior System updates", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
systemBaseline = "Changed context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") },
})
systemBaseline = "Replacement context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false })
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context"],
["Initial context"],
["Replacement context"],
])
expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "user", "system"])
expect(requests[2]?.messages.map((message) => message.role)).toEqual(["user", "user", "user"])
expect((yield* session.context(sessionID)).map((message) => message.type)).toEqual([
"user",
"user",
"model-switched",
"user",
])
yield* replaySessionProjection(sessionID)
expect(yield* session.messages({ sessionID })).toHaveLength(5)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fourth" }), resume: false })
yield* session.resume(sessionID)
}),
)
it.effect("defers replacement while admitted context is temporarily unavailable", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") },
})
systemUnavailable = true
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
systemUnavailable = false
systemBaseline = "Replacement context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false })
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context"],
["Initial context"],
["Replacement context"],
])
}),
)
it.effect("advances a pending replacement to the latest invalidation boundary", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
response = []
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
model: { id: ModelV2.ID.make("replacement-1"), providerID: ProviderV2.ID.make("fake") },
})
yield* events.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(2),
model: { id: ModelV2.ID.make("replacement-2"), providerID: ProviderV2.ID.make("fake") },
})
const latest = yield* SessionInput.latestSeq(db, sessionID)
expect(
yield* db
.select({ replacementSeq: SessionContextEpochTable.replacement_seq })
.from(SessionContextEpochTable)
.where(eq(SessionContextEpochTable.session_id, sessionID))
.get()
.pipe(Effect.orDie),
).toEqual({ replacementSeq: latest })
}),
)
it.effect("retries epoch preparation until observation-time invalidations settle", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
response = []
yield* session.resume(sessionID)
requests.length = 0
systemBaseline = "Changed context"
let invalidations = 0
systemLoadHook = Effect.suspend(() => {
if (invalidations === 4) return Effect.void
invalidations++
return events
.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(invalidations),
model: { id: ModelV2.ID.make(`replacement-${invalidations}`), providerID: ProviderV2.ID.make("fake") },
})
.pipe(Effect.asVoid)
})
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
expect(invalidations).toBe(4)
expect(requests).toHaveLength(1)
expect(requests[0]?.system.map((part) => part.text)).toEqual(["Changed context"])
}),
)
it.effect("replays retained context projections while replacement is pending", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
systemBaseline = "Changed context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") },
})
yield* replaySessionProjection(sessionID)
systemBaseline = "Replacement context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false })
yield* session.resume(sessionID)
expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Replacement context"])
}),
)
it.effect("replaces the baseline lazily after completed compaction without reopening replacement on replay", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
const compactionID = SessionMessage.ID.create()
yield* events.publish(SessionEvent.Compaction.Started, {
sessionID,
messageID: compactionID,
timestamp: DateTime.makeUnsafe(1),
reason: "manual",
})
yield* events.publish(SessionEvent.Compaction.Ended, {
sessionID,
messageID: compactionID,
timestamp: DateTime.makeUnsafe(2),
reason: "manual",
text: "summary",
recent: "",
})
systemBaseline = "Replacement context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context"],
["Replacement context"],
])
yield* replaySessionProjection(sessionID)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false })
yield* session.resume(sessionID)
}),
)
it.effect("automatically compacts into a completed summary and retained recent turn", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
response = fragmentFixture("text", "text-first", ["Earlier answer"]).completeEvents
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Earlier question ".repeat(180) }),
resume: false,
})
yield* session.resume(sessionID)
currentModel = compactModel
requests.length = 0
responses = [
fragmentFixture("text", "text-summary", ["## Goal\n- Preserve the task"]).completeEvents,
fragmentFixture("text", "text-final", ["Continued"]).completeEvents,
]
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Recent exact request ".repeat(180) }),
resume: false,
})
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
expect(userTexts(requests[0])[0]).toContain("## Goal")
expect(userTexts(requests[1])).toHaveLength(1)
expect(userTexts(requests[1])[0]).toContain("<summary>\n## Goal\n- Preserve the task\n</summary>")
expect(userTexts(requests[1])[0]).toContain(`[User]: ${"Recent exact request ".repeat(180)}`)
const context = yield* (yield* SessionStore.Service).context(sessionID)
expect(context.map((message) => message.type)).toEqual(["compaction", "assistant"])
expect(context[0]).toMatchObject({
type: "compaction",
summary: "## Goal\n- Preserve the task",
})
requests.length = 0
responses = [
fragmentFixture("text", "text-summary-2", ["## Goal\n- Preserve the updated task"]).completeEvents,
fragmentFixture("text", "text-final-2", ["Continued again"]).completeEvents,
]
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Newest exact request ".repeat(180) }),
resume: false,
})
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
expect(userTexts(requests[0])[0]).toContain(
"<previous-summary>\n## Goal\n- Preserve the task\n</previous-summary>",
)
expect(userTexts(requests[0])[0]).toContain("Recent exact request")
expect((yield* (yield* SessionStore.Service).context(sessionID))[0]).toMatchObject({
type: "compaction",
summary: "## Goal\n- Preserve the updated task",
})
}),
)
it.effect("forces one compaction and retries after provider context overflow", () =>
Effect.gen(function* () {
const session = yield* setupOverflowRecovery
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }),
],
fragmentFixture("text", "text-summary", ["## Goal\n- Recover overflow"]).completeEvents,
fragmentFixture("text", "text-final", ["Recovered"]).completeEvents,
]
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
yield* session.resume(sessionID)
expect(requests).toHaveLength(3)
expect(userTexts(requests[1])[0]).toContain("## Goal")
expect(userTexts(requests[2])[0]).toContain("<summary>\n## Goal\n- Recover overflow\n</summary>")
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "compaction", summary: "## Goal\n- Recover overflow" },
{ type: "assistant", finish: "stop" },
])
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "compaction" },
{ type: "assistant", finish: "stop" },
])
}),
)
it.effect("persists a second context overflow after one recovery", () =>
Effect.gen(function* () {
const session = yield* setupOverflowRecovery
const overflow = () => [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }),
]
responses = [
overflow(),
fragmentFixture("text", "text-summary", ["## Goal\n- Recover once"]).completeEvents,
overflow(),
]
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
yield* session.resume(sessionID)
expect(requests).toHaveLength(3)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "compaction" },
{ type: "assistant", finish: "error", error: { message: "prompt too long" } },
])
}),
)
it.effect("recovers once from a raw context overflow failure", () =>
Effect.gen(function* () {
const session = yield* setupOverflowRecovery
responseStream = Stream.fail(
new LLMError({
module: "test",
method: "stream",
reason: new InvalidRequestReason({
message: "prompt too long",
classification: "context-overflow",
}),
}),
)
responses = [
fragmentFixture("text", "text-summary", ["## Goal\n- Recover raw overflow"]).completeEvents,
fragmentFixture("text", "text-final", ["Recovered"]).completeEvents,
]
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
yield* session.resume(sessionID)
expect(requests).toHaveLength(3)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "compaction", summary: "## Goal\n- Recover raw overflow" },
{ type: "assistant", finish: "stop" },
])
}),
)
it.effect("publishes the original overflow when recovery summarization fails", () =>
Effect.gen(function* () {
const session = yield* setupOverflowRecovery
responses = [
[LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" })],
[LLMEvent.providerError({ message: "summary unavailable" })],
]
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
const context = yield* session.context(sessionID)
expect(context.some((message) => message.type === "compaction")).toBe(false)
expect(context.slice(-2)).toMatchObject([
{ type: "user", text: "Continue" },
{ type: "assistant", finish: "error", error: { message: "prompt too long" } },
])
}),
)
it.effect("interrupts overflow recovery while the summary provider is running", () =>
Effect.gen(function* () {
const session = yield* setupOverflowRecovery
responses = [
[LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" })],
fragmentFixture("text", "text-summary", ["## Goal\n- Interrupted"]).completeEvents,
]
const firstGate = yield* Deferred.make<void>()
const summaryGate = yield* Deferred.make<void>()
streamGate = firstGate
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (requests.length < 1) yield* Effect.yieldNow
streamGate = summaryGate
yield* Deferred.succeed(firstGate, undefined)
while (requests.length < 2) yield* Effect.yieldNow
yield* session.interrupt(sessionID)
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
streamGate = undefined
expect(requests).toHaveLength(2)
expect((yield* session.context(sessionID)).some((message) => message.type === "compaction")).toBe(false)
}),
)
it.effect("preserves effective System updates while compaction replacement is blocked", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
systemBaseline = "Changed context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
const compactionID = SessionMessage.ID.create()
yield* events.publish(SessionEvent.Compaction.Started, {
sessionID,
messageID: compactionID,
timestamp: DateTime.makeUnsafe(1),
reason: "manual",
})
yield* events.publish(SessionEvent.Compaction.Ended, {
sessionID,
messageID: compactionID,
timestamp: DateTime.makeUnsafe(2),
reason: "manual",
text: "summary",
recent: "",
})
systemUnavailable = true
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false })
yield* session.resume(sessionID)
expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Initial context"])
expect(
requests
.at(-1)
?.messages.some(
(message) =>
message.role === "system" &&
message.content[0]?.type === "text" &&
message.content[0].text === "Changed context",
),
).toBe(true)
}),
)
it.effect("projects reasoning and tool events without executing or continuing tools", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Use tools" }), resume: false })
requests.length = 0
responses = undefined
streamGate = undefined
streamStarted = undefined
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.reasoningStart({ id: "reasoning-1" }),
LLMEvent.reasoningDelta({ id: "reasoning-1", text: "Think" }),
LLMEvent.reasoningEnd({ id: "reasoning-1" }),
LLMEvent.toolInputStart({ id: "call-error", name: "write" }),
LLMEvent.toolInputDelta({ id: "call-error", name: "write", text: '{"path":"README.md"}' }),
LLMEvent.toolInputEnd({ id: "call-error", name: "write" }),
LLMEvent.toolCall({ id: "call-error", name: "write", input: { path: "README.md" }, providerExecuted: true }),
LLMEvent.toolError({ id: "call-error", name: "write", message: "Denied" }),
LLMEvent.toolResult({ id: "call-error", name: "write", result: { type: "error", value: "Denied" } }),
LLMEvent.toolCall({
id: "call-provider",
name: "web_search",
input: { query: "hello" },
providerExecuted: true,
providerMetadata: { fake: { source: "provider" } },
}),
LLMEvent.toolResult({
id: "call-provider",
name: "web_search",
result: {
type: "content",
value: [
{ type: "text", text: "Hello" },
{ type: "media", mediaType: "image/png", data: "data:image/png;base64,aGVsbG8=", filename: "hello.png" },
],
},
providerExecuted: true,
providerMetadata: { fake: { source: "provider" } },
}),
LLMEvent.stepFinish({
index: 0,
reason: "tool-calls",
usage: {
inputTokens: 10,
nonCachedInputTokens: 8,
outputTokens: 4,
reasoningTokens: 1,
cacheReadInputTokens: 2,
},
}),
LLMEvent.finish({ reason: "tool-calls" }),
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(requests[0]?.tools.map((tool) => tool.name)).toEqual(["echo", "defect"])
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Use tools" },
{
type: "assistant",
finish: "tool-calls",
tokens: { input: 8, output: 3, reasoning: 1, cache: { read: 2, write: 0 } },
content: [
{ type: "reasoning", id: "reasoning-1", text: "Think" },
{
type: "tool",
id: "call-error",
name: "write",
state: {
status: "error",
input: { path: "README.md" },
error: { type: "unknown", message: "Denied" },
},
},
{
type: "tool",
id: "call-provider",
name: "web_search",
provider: { executed: true, metadata: { fake: { source: "provider" } } },
state: {
status: "completed",
input: { query: "hello" },
structured: {},
content: [
{ type: "text", text: "Hello" },
{ type: "file", mime: "image/png", source: { type: "data", data: "aGVsbG8=" }, name: "hello.png" },
],
},
},
],
},
])
}),
)
it.effect("continues with reloaded history after durably settling one local tool call", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo this" }), resume: false })
requests.length = 0
authorizations.length = 0
executions.length = 0
streamGate = undefined
streamStarted = undefined
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.textStart({ id: "text-final" }),
LLMEvent.textDelta({ id: "text-final", text: "Done" }),
LLMEvent.textEnd({ id: "text-final" }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"])
expect(authorizations).toMatchObject([{ sessionID, call: { id: "call-echo", name: "echo" } }])
expect(executions).toEqual(["hello"])
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Echo this" },
{
type: "assistant",
finish: "tool-calls",
content: [
{
type: "tool",
id: "call-echo",
name: "echo",
state: {
status: "completed",
input: { text: "hello" },
structured: { text: "hello" },
content: [{ type: "text", text: "hello" }],
},
},
],
},
{ type: "assistant", finish: "stop", content: [{ type: "text", id: "text-final", text: "Done" }] },
])
}),
)
it.effect("reloads a model switch before a tool-driven continuation turn", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo this" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
toolExecutionGate = yield* Deferred.make<void>()
toolExecutionsStarted = yield* Deferred.make<void>()
toolExecutionsReady = 1
const run = yield* Effect.forkChild(session.resume(sessionID))
yield* Deferred.await(toolExecutionsStarted)
yield* events.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") },
})
systemBaseline = "Replacement context"
yield* Deferred.succeed(toolExecutionGate, undefined)
yield* Fiber.join(run)
expect(requests.map((request) => request.model)).toEqual([model, replacementModel])
expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([
["Initial context"],
["Replacement context"],
])
}),
)
it.effect("restores durable reasoning provider metadata in a second-turn request", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Think first" }), resume: false })
requests.length = 0
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.reasoningStart({ id: "reasoning-anthropic" }),
LLMEvent.reasoningDelta({ id: "reasoning-anthropic", text: "Signed thought" }),
LLMEvent.reasoningEnd({ id: "reasoning-anthropic", providerMetadata: { anthropic: { signature: "sig_1" } } }),
LLMEvent.reasoningStart({
id: "reasoning-openai",
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: null } },
}),
LLMEvent.reasoningDelta({ id: "reasoning-openai", text: "Encrypted thought" }),
LLMEvent.reasoningEnd({
id: "reasoning-openai",
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } },
}),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
]
yield* session.resume(sessionID)
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Think first" },
{
type: "assistant",
content: [
{ type: "reasoning", text: "Signed thought", providerMetadata: { anthropic: { signature: "sig_1" } } },
{
type: "reasoning",
text: "Encrypted thought",
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } },
},
],
},
])
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
response = []
yield* session.resume(sessionID)
expect(requests[1]?.messages[1]?.content).toEqual([
{ type: "reasoning", text: "Signed thought", providerMetadata: { anthropic: { signature: "sig_1" } } },
{
type: "reasoning",
text: "Encrypted thought",
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } },
},
])
}),
)
it.effect("replays durable provider-executed tool results inline in a second-turn request", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Search first" }), resume: false })
requests.length = 0
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({
id: "hosted-search",
name: "web_search",
input: { query: "Effect" },
providerExecuted: true,
providerMetadata: { openai: { itemId: "hosted-search" } },
}),
LLMEvent.toolResult({
id: "hosted-search",
name: "web_search",
result: { type: "json", value: [{ title: "Effect" }] },
providerExecuted: true,
providerMetadata: { anthropic: { blockType: "web_search_tool_result" } },
}),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
]
yield* session.resume(sessionID)
yield* replaySessionProjection(sessionID)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
response = []
yield* session.resume(sessionID)
expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "user"])
expect(requests[1]?.messages[1]?.content).toMatchObject([
{
type: "tool-call",
id: "hosted-search",
name: "web_search",
input: { query: "Effect" },
providerExecuted: true,
providerMetadata: { openai: { itemId: "hosted-search" } },
},
{
type: "tool-result",
id: "hosted-search",
name: "web_search",
result: { type: "json", value: [{ title: "Effect" }] },
providerExecuted: true,
providerMetadata: { anthropic: { blockType: "web_search_tool_result" } },
},
])
}),
)
it.effect("starts recorded local tools eagerly and awaits settlement before continuing", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo five times" }), resume: false })
requests.length = 0
executions.length = 0
toolExecutionGate = yield* Deferred.make<void>()
toolExecutionsStarted = yield* Deferred.make<void>()
const providerGate = yield* Deferred.make<void>()
response = []
responses = undefined
const initial = Stream.fromIterable([
LLMEvent.stepStart({ index: 0 }),
...Array.from({ length: 5 }, (_, index) =>
LLMEvent.toolCall({ id: `call-echo-${index}`, name: "echo", input: { text: `${index}` } }),
),
])
const final = Stream.fromIterable([
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
])
streamGate = undefined
responseStream = Stream.concat(
initial,
Stream.fromEffect(Deferred.await(providerGate)).pipe(Stream.flatMap(() => final)),
)
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(toolExecutionsStarted)
expect(executions).toHaveLength(5)
expect(maxActiveToolExecutions).toBe(5)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Echo five times" },
{
type: "assistant",
content: Array.from({ length: 5 }, (_, index) => ({
type: "tool",
id: `call-echo-${index}`,
state: { status: "running", input: { text: `${index}` } },
})),
},
])
yield* Deferred.succeed(providerGate, undefined)
yield* Effect.yieldNow
expect(requests).toHaveLength(1)
yield* Deferred.succeed(toolExecutionGate, undefined)
yield* Fiber.join(run)
toolExecutionGate = undefined
toolExecutionsStarted = undefined
expect(executions).toHaveLength(5)
expect(maxActiveToolExecutions).toBe(5)
expect(requests).toHaveLength(2)
}),
)
it.effect("settles repeated provider-local tool call IDs against their owning assistant messages", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo twice" }), resume: false })
requests.length = 0
executions.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "tool_0", name: "echo", input: { text: "first" } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "tool_0", name: "echo", input: { text: "second" } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[],
]
yield* session.resume(sessionID)
expect(executions).toEqual(["first", "second"])
expect(requests).toHaveLength(3)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Echo twice" },
{
type: "assistant",
content: [
{
type: "tool",
id: "tool_0",
state: { status: "completed", structured: { text: "first" }, content: [{ type: "text", text: "first" }] },
},
],
},
{
type: "assistant",
content: [
{
type: "tool",
id: "tool_0",
state: {
status: "completed",
structured: { text: "second" },
content: [{ type: "text", text: "second" }],
},
},
],
},
])
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Echo twice" },
{
type: "assistant",
content: [
{
type: "tool",
id: "tool_0",
state: { status: "completed", structured: { text: "first" }, content: [{ type: "text", text: "first" }] },
},
],
},
{
type: "assistant",
content: [
{
type: "tool",
id: "tool_0",
state: {
status: "completed",
structured: { text: "second" },
content: [{ type: "text", text: "second" }],
},
},
],
},
])
}),
)
it.effect("joins concurrent resume calls into one active provider run", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run once" }), resume: false })
requests.length = 0
responses = undefined
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.textStart({ id: "text-once" }),
LLMEvent.textDelta({ id: "text-once", text: "Once" }),
LLMEvent.textEnd({ id: "text-once" }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
const second = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Effect.yieldNow
expect(requests).toHaveLength(1)
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(first)
yield* Fiber.join(second)
streamGate = undefined
streamStarted = undefined
expect(requests).toHaveLength(1)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Run once" },
{ type: "assistant", finish: "stop", content: [{ type: "text", id: "text-once", text: "Once" }] },
])
}),
)
it.effect("steers an active provider turn with newly recorded prompts", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Change direction" }) })
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(first)
streamGate = undefined
streamStarted = undefined
yield* Effect.yieldNow
expect(requests).toHaveLength(2)
expect(userTexts(requests[0]!)).toEqual(["Start working"])
expect(userTexts(requests[1]!)).toEqual(["Start working", "Change direction"])
expect((yield* session.context(sessionID)).map((message) => message.type)).toEqual([
"user",
"assistant",
"user",
"assistant",
])
}),
)
it.effect("starts queued input after the active activity settles", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Wait until the next activity" }),
delivery: "queue",
})
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(first)
streamGate = undefined
streamStarted = undefined
expect(requests).toHaveLength(3)
expect(userTexts(requests[0]!)).toEqual(["Start working"])
expect(userTexts(requests[1]!)).toEqual(["Start working"])
expect(userTexts(requests[2]!)).toEqual(["Start working", "Wait until the next activity"])
}),
)
it.effect("preserves durable queued input for a later wake after interruption", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false })
requests.length = 0
responses = [
[],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Run after interrupt" }),
delivery: "queue",
})
yield* session.interrupt(sessionID)
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
expect(requests).toHaveLength(1)
expect(yield* SessionInput.hasPending(db, sessionID, "queue")).toBe(true)
const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (requests.length < 2) yield* Effect.yieldNow
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(resumed)
streamGate = undefined
streamStarted = undefined
expect(requests).toHaveLength(2)
expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"])
expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Run after interrupt"])
}),
)
it.effect("preserves durable steering input for a later resume after interruption", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false })
requests.length = 0
responses = [
[],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Steer after interrupt" }),
})
yield* session.interrupt(sessionID)
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
expect(requests).toHaveLength(1)
expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true)
const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (requests.length < 2) yield* Effect.yieldNow
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(resumed)
streamGate = undefined
streamStarted = undefined
expect(requests).toHaveLength(2)
expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"])
expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Steer after interrupt"])
}),
)
it.effect("runs queued active inputs as separate FIFO activities", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue first" }), delivery: "queue" })
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue second" }), delivery: "queue" })
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(first)
streamGate = undefined
streamStarted = undefined
expect(requests).toHaveLength(3)
expect(userTexts(requests[0]!)).toEqual(["Start working"])
expect(userTexts(requests[1]!)).toEqual(["Start working", "Queue first"])
expect(userTexts(requests[2]!)).toEqual(["Start working", "Queue first", "Queue second"])
}),
)
it.effect("opens queued input after idle steering activity settles", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start steering activity" }), resume: false })
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Queue later activity" }),
delivery: "queue",
resume: false,
})
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
expect(userTexts(requests[0]!)).toEqual(["Start steering activity"])
expect(userTexts(requests[1]!)).toEqual(["Start steering activity", "Queue later activity"])
}),
)
it.effect("coalesces steers into the active queued activity before starting the next queued activity", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
const firstGate = yield* Deferred.make<void>()
const secondGate = yield* Deferred.make<void>()
streamGate = firstGate
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (requests.length < 1) yield* Effect.yieldNow
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue first" }), delivery: "queue" })
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue second" }), delivery: "queue" })
streamGate = secondGate
yield* Deferred.succeed(firstGate, undefined)
while (requests.length < 2) yield* Effect.yieldNow
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Steer first queued activity" }) })
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Also steer first queued activity" }) })
yield* Deferred.succeed(secondGate, undefined)
yield* Fiber.join(first)
streamGate = undefined
expect(requests).toHaveLength(4)
expect(userTexts(requests[0]!)).toEqual(["Start working"])
expect(userTexts(requests[1]!)).toEqual(["Start working", "Queue first"])
expect(userTexts(requests[2]!)).toEqual([
"Start working",
"Queue first",
"Steer first queued activity",
"Also steer first queued activity",
])
expect(userTexts(requests[3]!)).toEqual([
"Start working",
"Queue first",
"Steer first queued activity",
"Also steer first queued activity",
"Queue second",
])
}),
)
it.effect("coalesces multiple active steering prompts into one continuation turn", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First steer" }) })
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second steer" }) })
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(first)
streamGate = undefined
streamStarted = undefined
yield* Effect.yieldNow
expect(requests).toHaveLength(2)
expect(userTexts(requests[1]!)).toEqual(["Start working", "First steer", "Second steer"])
yield* (yield* SessionRunCoordinator.Service).wake(sessionID)
yield* Effect.yieldNow
expect(requests).toHaveLength(2)
}),
)
it.effect("runs steering input accepted while the active provider turn fails", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })
requests.length = 0
responses = undefined
response = []
streamFailure = providerUnavailable()
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover with this" }) })
yield* Deferred.succeed(streamGate, undefined)
expect(yield* Fiber.join(first).pipe(Effect.flip)).toBe(streamFailure)
streamFailure = undefined
streamGate = undefined
streamStarted = undefined
yield* Effect.yieldNow
expect(requests).toHaveLength(2)
expect(userTexts(requests[1]!)).toEqual(["Start working", "Recover with this"])
}),
)
it.effect("durably fails local tools left running by a prior process before continuing", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover interrupted tool" }), resume: false })
yield* SessionInput.promoteSteers((yield* Database.Service).db, events, sessionID, Number.MAX_SAFE_INTEGER)
const assistantMessageID = SessionMessage.ID.create()
yield* events.publish(SessionEvent.Step.Started, {
sessionID,
assistantMessageID,
timestamp: yield* DateTime.now,
agent: "build",
model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") },
})
yield* events.publish(SessionEvent.Tool.Input.Started, {
sessionID,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: "call-interrupted",
name: "echo",
})
yield* events.publish(SessionEvent.Tool.Input.Ended, {
sessionID,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: "call-interrupted",
text: '{"text":"stale"}',
})
yield* events.publish(SessionEvent.Tool.Called, {
sessionID,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: "call-interrupted",
tool: "echo",
input: { text: "stale" },
provider: { executed: false },
})
requests.length = 0
response = []
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"])
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Recover interrupted tool" },
{
type: "assistant",
content: [
{
type: "tool",
id: "call-interrupted",
state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
},
],
},
])
}),
)
it.effect("durably fails hosted tools left running by a prior process before continuing inline", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Recover interrupted hosted tool" }),
resume: false,
})
yield* SessionInput.promoteSteers((yield* Database.Service).db, events, sessionID, Number.MAX_SAFE_INTEGER)
const assistantMessageID = SessionMessage.ID.create()
yield* events.publish(SessionEvent.Step.Started, {
sessionID,
assistantMessageID,
timestamp: yield* DateTime.now,
agent: "build",
model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") },
})
yield* events.publish(SessionEvent.Tool.Input.Started, {
sessionID,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: "call-hosted-interrupted",
name: "web_search",
})
yield* events.publish(SessionEvent.Tool.Input.Ended, {
sessionID,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: "call-hosted-interrupted",
text: '{"query":"stale"}',
})
yield* events.publish(SessionEvent.Tool.Called, {
sessionID,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: "call-hosted-interrupted",
tool: "web_search",
input: { query: "stale" },
provider: { executed: true, metadata: { openai: { itemId: "call-hosted-interrupted" } } },
})
requests.length = 0
response = []
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant"])
expect(requests[0]?.messages[1]?.content).toMatchObject([
{
type: "tool-call",
id: "call-hosted-interrupted",
providerExecuted: true,
providerMetadata: { openai: { itemId: "call-hosted-interrupted" } },
},
{ type: "tool-result", id: "call-hosted-interrupted", providerExecuted: true, result: { type: "error" } },
])
}),
)
it.effect("durably fails pending tool input left by a prior process before continuing", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Recover interrupted tool input" }),
resume: false,
})
yield* SessionInput.promoteSteers((yield* Database.Service).db, events, sessionID, Number.MAX_SAFE_INTEGER)
const assistantMessageID = SessionMessage.ID.create()
yield* events.publish(SessionEvent.Step.Started, {
sessionID,
assistantMessageID,
timestamp: yield* DateTime.now,
agent: "build",
model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") },
})
yield* events.publish(SessionEvent.Tool.Input.Started, {
sessionID,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: "call-pending-interrupted",
name: "echo",
})
requests.length = 0
response = []
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"])
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Recover interrupted tool input" },
{ type: "assistant", content: [{ type: "tool", id: "call-pending-interrupted", state: { status: "error" } }] },
])
}),
)
it.effect("starts the first queued activity when woken while idle", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Wait for fresh activity" }),
delivery: "queue",
resume: false,
})
requests.length = 0
yield* (yield* SessionRunCoordinator.Service).wake(sessionID)
yield* Effect.yieldNow
expect(requests).toHaveLength(1)
expect(userTexts(requests[0]!)).toEqual(["Wait for fresh activity"])
}),
)
it.effect("does not spend one activity step budget across queued activities", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const queued = Array.from({ length: 26 }, (_, index) => `Queued activity ${index + 1}`)
for (const text of queued) {
yield* session.prompt({ sessionID, prompt: new Prompt({ text }), delivery: "queue", resume: false })
}
requests.length = 0
responses = queued.map(() => [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
])
yield* session.resume(sessionID)
expect(requests).toHaveLength(queued.length)
expect(userTexts(requests.at(-1)!)).toEqual(queued)
}),
)
it.effect("retries inbox input after prompt projection rolls back", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const defect = new Error("fail after prompt promotion")
let fail = true
yield* events.project(SessionEvent.PromptLifecycle.Promoted, () => (fail ? Effect.die(defect) : Effect.void))
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover promoted input" }), resume: false })
expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe(defect)
fail = false
requests.length = 0
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
]
yield* (yield* SessionRunCoordinator.Service).wake(sessionID)
while (requests.length === 0) yield* Effect.yieldNow
expect(userTexts(requests[0]!)).toEqual(["Recover promoted input"])
}),
)
it.effect("does not strand a committed promotion when a post-commit listener defects", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* events.listen((event) =>
event.type === SessionEvent.PromptLifecycle.Promoted.type
? Effect.die("fail after prompt promotion commits")
: Effect.void,
)
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Run committed promotion" }),
resume: false,
})
requests.length = 0
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(userTexts(requests[0]!)).toEqual(["Run committed promotion"])
}),
)
it.effect("runs different sessions concurrently", () =>
Effect.gen(function* () {
yield* setup
yield* insertSession(otherSessionID)
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run first" }), resume: false })
yield* session.prompt({ sessionID: otherSessionID, prompt: new Prompt({ text: "Run second" }), resume: false })
requests.length = 0
responses = undefined
response = []
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
const second = yield* session.resume(otherSessionID).pipe(Effect.forkChild)
yield* Effect.yieldNow
expect(requests).toHaveLength(2)
expect(requests.map((request) => request.providerOptions?.openai?.promptCacheKey)).toEqual([
sessionID,
otherSessionID,
])
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(first)
yield* Fiber.join(second)
streamGate = undefined
streamStarted = undefined
}),
)
it.effect("fans out one failed run and allows a later retry", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Retry after failure" }), resume: false })
requests.length = 0
responses = undefined
response = []
streamFailure = providerUnavailable()
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
const second = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Effect.yieldNow
expect(requests).toHaveLength(1)
yield* Deferred.succeed(streamGate, undefined)
const [firstExit, secondExit] = yield* Effect.all([Fiber.await(first), Fiber.await(second)])
expect(secondExit).toEqual(firstExit)
streamFailure = undefined
streamGate = undefined
streamStarted = undefined
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
}),
)
it.effect("durably settles local tool failures before continuing", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Call missing" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-missing", name: "missing", input: {} }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.textStart({ id: "text-after-error" }),
LLMEvent.textDelta({ id: "text-after-error", text: "Recovered" }),
LLMEvent.textEnd({ id: "text-after-error" }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = undefined
streamStarted = undefined
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Call missing" },
{
type: "assistant",
content: [
{
type: "tool",
id: "call-missing",
state: { status: "error", error: { message: "Unknown tool: missing" } },
},
],
},
{ type: "assistant", finish: "stop", content: [{ type: "text", id: "text-after-error", text: "Recovered" }] },
])
}),
)
it.effect("durably settles unexpected local tool defects before continuing", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Call defect" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-defect", name: "defect", input: {} }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[],
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(2)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Call defect" },
{
type: "assistant",
content: [
{
type: "tool",
id: "call-defect",
state: { status: "error", error: { message: "unexpected tool defect" } },
},
],
},
])
}),
)
it.effect("interrupts runner continuation when a question is dismissed", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const registry = yield* ToolRegistry.Service
const questions = yield* QuestionV2.Service
const transform = yield* registry.transform()
yield* transform((editor) =>
editor.set("question", {
tool: Tool.make({
description: "Ask the user",
parameters: Schema.Struct({}),
success: Schema.Struct({}),
}),
execute: ({ sessionID }) => questions.ask({ sessionID, questions: [] }).pipe(Effect.as({}), Effect.orDie),
}),
)
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Ask then stop" }), resume: false })
requests.length = 0
responses = [
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-question", name: "question", input: {} }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
],
[],
]
const run = yield* session.resume(sessionID).pipe(Effect.exit, Effect.forkChild)
let pending = yield* questions.list()
while (pending.length === 0) {
yield* Effect.yieldNow
pending = yield* questions.list()
}
yield* questions.reject(pending[0]!.id)
const exit = yield* Fiber.join(run)
expect(exit._tag).toBe("Failure")
if (exit._tag === "Failure") expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
expect(requests).toHaveLength(1)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Ask then stop" },
{
type: "assistant",
content: [
{
type: "tool",
id: "call-question",
state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
},
],
},
])
}),
)
it.effect("awaits started local tools before surfacing provider stream failure", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Settle before failing" }), resume: false })
const failure = providerUnavailable()
toolExecutionGate = yield* Deferred.make<void>()
responseStream = Stream.concat(
Stream.fromIterable([
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-before-failure", name: "echo", input: { text: "settle" } }),
]),
Stream.fail(failure),
)
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (executions.length === 0) yield* Effect.yieldNow
yield* Effect.yieldNow
yield* Deferred.succeed(toolExecutionGate, undefined)
expect(yield* Fiber.join(run).pipe(Effect.flip)).toBe(failure)
toolExecutionGate = undefined
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Settle before failing" },
{
type: "assistant",
content: [
{ type: "tool", id: "call-before-failure", state: { status: "completed", structured: { text: "settle" } } },
],
},
])
}),
)
it.effect("durably fails blocked local tools when a provider turn is interrupted", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt blocked tool" }), resume: false })
executions.length = 0
toolExecutionGate = yield* Deferred.make<void>()
responseStream = Stream.concat(
Stream.fromIterable([
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-before-interrupt", name: "echo", input: { text: "blocked" } }),
]),
Stream.never,
)
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (executions.length === 0) yield* Effect.yieldNow
yield* session.interrupt(sessionID)
toolExecutionGate = undefined
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
yield* session.interrupt(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Interrupt blocked tool" },
{
type: "assistant",
content: [
{
type: "tool",
id: "call-before-interrupt",
state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
},
],
},
])
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Interrupt blocked tool" },
{ type: "assistant", content: [{ type: "tool", id: "call-before-interrupt", state: { status: "error" } }] },
])
requests.length = 0
responseStream = undefined
response = []
yield* session.resume(sessionID)
expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"])
}),
)
it.effect("interrupts a blocked provider turn without local tool activity", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt provider" }), resume: false })
requests.length = 0
response = []
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.interrupt(sessionID)
const exit = yield* Fiber.await(run)
streamGate = undefined
streamStarted = undefined
expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue()
expect(requests).toHaveLength(1)
yield* session.interrupt(sessionID)
}),
)
it.effect("durably fails blocked local tools when interrupted while awaiting settlement", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt tool settlement" }), resume: false })
executions.length = 0
toolExecutionGate = yield* Deferred.make<void>()
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-await-interrupt", name: "echo", input: { text: "blocked" } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
]
const runner = yield* SessionRunner.Service
const run = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild)
while (executions.length === 0) yield* Effect.yieldNow
yield* Fiber.interrupt(run)
toolExecutionGate = undefined
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Interrupt tool settlement" },
{
type: "assistant",
content: [
{
type: "tool",
id: "call-await-interrupt",
state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
},
],
},
])
}),
)
it.effect("fails after the bounded number of local tool continuation steps", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Loop forever" }), resume: false })
requests.length = 0
authorizations.length = 0
executions.length = 0
streamGate = undefined
streamStarted = undefined
responses = Array.from({ length: 25 }, (_, index) => [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: `call-echo-${index}`, name: "echo", input: { text: `${index}` } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
])
const failure = yield* session.resume(sessionID).pipe(Effect.flip)
expect(failure).toMatchObject({ _tag: "SessionRunner.StepLimitExceededError", sessionID, limit: 25 })
expect(requests).toHaveLength(25)
expect(executions).toHaveLength(25)
}),
)
it.effect("does not restart a capped tool loop for a coalesced stale wake", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const coordinator = yield* SessionRunCoordinator.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Loop forever" }), resume: false })
requests.length = 0
responses = Array.from({ length: 25 }, (_, index) => [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: `call-capped-${index}`, name: "echo", input: { text: `${index}` } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
])
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* coordinator.wake(sessionID)
yield* Deferred.succeed(streamGate, undefined)
expect(yield* Fiber.join(run).pipe(Effect.flip)).toMatchObject({ _tag: "SessionRunner.StepLimitExceededError" })
streamGate = undefined
streamStarted = undefined
yield* Effect.yieldNow
expect(requests).toHaveLength(25)
}),
)
it.effect("accepts a terminal response on the final bounded provider turn", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Finish at the limit" }), resume: false })
requests.length = 0
responses = [
...Array.from({ length: 24 }, (_, index) => [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: `call-terminal-${index}`, name: "echo", input: { text: `${index}` } }),
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
LLMEvent.finish({ reason: "tool-calls" }),
]),
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(25)
}),
)
it.effect("projects provider errors as terminal assistant step failures", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail durably" }), resume: false })
requests.length = 0
responses = undefined
streamGate = undefined
streamStarted = undefined
response = [LLMEvent.stepStart({ index: 0 }), LLMEvent.providerError({ message: "Provider unavailable" })]
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Fail durably" },
{ type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } },
])
}),
)
it.effect("projects provider errors emitted before assistant step start", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail before step" }), resume: false })
requests.length = 0
response = [LLMEvent.providerError({ message: "Provider unavailable" })]
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Fail before step" },
{ type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } },
])
}),
)
it.effect("does not recover context overflow after durable assistant output", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail after output" }), resume: false })
requests.length = 0
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.textStart({ id: "text-partial" }),
LLMEvent.textDelta({ id: "text-partial", text: "Partial" }),
LLMEvent.textEnd({ id: "text-partial" }),
LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }),
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Fail after output" },
{
type: "assistant",
finish: "error",
error: { message: "prompt too long" },
content: [{ type: "text", text: "Partial" }],
},
])
}),
)
it.effect("projects raw provider stream failures as terminal assistant step failures", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail raw stream durably" }), resume: false })
const failure = providerUnavailable()
responseStream = Stream.fail(failure)
expect(yield* session.resume(sessionID).pipe(Effect.flip)).toBe(failure)
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Fail raw stream durably" },
{ type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } },
])
}),
)
it.effect("does not continue automatically after a provider error follows a local tool call", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Do not continue failed provider" }),
resume: false,
})
requests.length = 0
const executionCount = executions.length
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({ id: "call-before-provider-error", name: "echo", input: { text: "settled" } }),
LLMEvent.providerError({ message: "Provider unavailable" }),
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(executions.slice(executionCount)).toEqual(["settled"])
}),
)
it.effect("durably fails a hosted tool when its provider errors before returning a result", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail hosted tool durably" }), resume: false })
requests.length = 0
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({
id: "call-hosted-provider-error",
name: "web_search",
input: { query: "effect" },
providerExecuted: true,
}),
LLMEvent.providerError({ message: "Provider unavailable" }),
]
yield* session.resume(sessionID)
expect(requests).toHaveLength(1)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Fail hosted tool durably" },
{
type: "assistant",
content: [{ type: "tool", id: "call-hosted-provider-error", state: { status: "error" } }],
},
])
}),
)
it.effect("durably fails a hosted tool left unresolved at normal provider EOF", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail hosted tool at EOF" }), resume: false })
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({
id: "call-hosted-eof",
name: "web_search",
input: { query: "effect" },
providerExecuted: true,
}),
]
yield* session.resume(sessionID)
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Fail hosted tool at EOF" },
{ type: "assistant", content: [{ type: "tool", id: "call-hosted-eof", state: { status: "error" } }] },
])
}),
)
it.effect("durably fails a hosted tool left unresolved by a raw provider stream failure", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Fail hosted tool on raw failure" }),
resume: false,
})
const failure = providerUnavailable()
responseStream = Stream.concat(
Stream.fromIterable([
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolCall({
id: "call-hosted-raw-failure",
name: "web_search",
input: { query: "effect" },
providerExecuted: true,
}),
]),
Stream.fail(failure),
)
expect(yield* session.resume(sessionID).pipe(Effect.flip)).toBe(failure)
yield* replaySessionProjection(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Fail hosted tool on raw failure" },
{
type: "assistant",
finish: "error",
error: { type: "unknown", message: "Provider unavailable" },
content: [{ type: "tool", id: "call-hosted-raw-failure", state: { status: "error" } }],
},
])
}),
)
it.effect("keeps interleaved assistant text blocks separate", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Two blocks" }), resume: false })
responses = undefined
streamGate = undefined
streamStarted = undefined
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.textStart({ id: "text-1" }),
LLMEvent.textStart({ id: "text-2" }),
LLMEvent.textDelta({ id: "text-1", text: "First" }),
LLMEvent.textDelta({ id: "text-2", text: "Second" }),
LLMEvent.textEnd({ id: "text-1" }),
LLMEvent.textEnd({ id: "text-2" }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
]
yield* session.resume(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Two blocks" },
{
type: "assistant",
content: [
{ type: "text", id: "text-1", text: "First" },
{ type: "text", id: "text-2", text: "Second" },
],
},
])
}),
)
for (const kind of fragmentKinds) {
it.effect(`broadcasts provider ${kind} deltas without storing projection rewrites`, () =>
verifyEphemeralDeltas(kind),
)
it.effect(`durably closes partial ${kind} when the provider stream fails`, () => verifyPartialFlushOnFailure(kind))
it.effect(`durably closes partial ${kind} when the provider stream is interrupted`, () =>
verifyPartialFlushOnInterruption(kind),
)
}
it.effect("rejects duplicate streamed text starts", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
responses = undefined
streamGate = undefined
streamStarted = undefined
response = [LLMEvent.textStart({ id: "text-1" }), LLMEvent.textStart({ id: "text-1" })]
expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe(
"Duplicate text start: text-1",
)
}),
)
it.effect("transitions streamed raw tool input to parsed called input", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Call provider tool" }), resume: false })
responses = undefined
streamGate = undefined
streamStarted = undefined
response = [
LLMEvent.stepStart({ index: 0 }),
LLMEvent.toolInputStart({ id: "call-parsed", name: "web_search" }),
LLMEvent.toolInputDelta({ id: "call-parsed", name: "web_search", text: '{"query":"hello"}' }),
LLMEvent.toolInputEnd({ id: "call-parsed", name: "web_search" }),
LLMEvent.toolCall({ id: "call-parsed", name: "web_search", input: { query: "hello" }, providerExecuted: true }),
]
yield* session.resume(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Call provider tool" },
{
type: "assistant",
content: [{ type: "tool", id: "call-parsed", state: { status: "error", input: { query: "hello" } } }],
},
])
}),
)
it.effect("rejects malformed streamed tool input ordering", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
responses = undefined
streamGate = undefined
streamStarted = undefined
response = [LLMEvent.toolInputDelta({ id: "call-1", name: "read", text: "{}" })]
expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe(
"Tool input delta before start: call-1",
)
}),
)
})