import { describe, expect } from "bun:test" import { LLMClient, LLMError, LLMEvent, Model, TransportReason, InvalidRequestReason, type LLMClientShape, type LLMRequest, } from "@opencode-ai/llm" import * as OpenAIChat from "@opencode-ai/llm/protocols/openai-chat" import { Database } from "@opencode-ai/core/database/database" import { EventV2 } from "@opencode-ai/core/event" import { PermissionV2 } from "@opencode-ai/core/permission" import { EventTable } from "@opencode-ai/core/event/sql" import { Project } from "@opencode-ai/core/project" import { ProjectTable } from "@opencode-ai/core/project/sql" import { QuestionV2 } from "@opencode-ai/core/question" import { AbsolutePath } from "@opencode-ai/core/schema" import { SessionV2 } from "@opencode-ai/core/session" import { ContextSnapshotDecodeError } from "@opencode-ai/core/session/error" import { SessionEvent } from "@opencode-ai/core/session/event" import { SessionInput } from "@opencode-ai/core/session/input" import { SessionMessage } from "@opencode-ai/core/session/message" import { Prompt } from "@opencode-ai/core/session/prompt" import { SessionProjector } from "@opencode-ai/core/session/projector" import { SessionExecution } from "@opencode-ai/core/session/execution" import { SessionContextEpoch } from "@opencode-ai/core/session/context-epoch" import { SessionRunCoordinator } from "@opencode-ai/core/session/run-coordinator" import { SessionRunner } from "@opencode-ai/core/session/runner" import * as SessionRunnerLLM from "@opencode-ai/core/session/runner/llm" import { SessionRunnerModel } from "@opencode-ai/core/session/runner/model" import { ToolRegistry } from "@opencode-ai/core/tool/registry" import { ToolOutputStore } from "@opencode-ai/core/tool-output-store" import { ApplicationTools } from "@opencode-ai/core/tool/application-tools" import { AgentV2 } from "@opencode-ai/core/agent" import { Config } from "@opencode-ai/core/config" import { ConfigCompaction } from "@opencode-ai/core/config/compaction" import { Tool } from "@opencode-ai/core/tool/tool" import { SessionContextEpochTable, SessionInputTable, SessionMessageTable, SessionTable, } from "@opencode-ai/core/session/sql" import { SessionStore } from "@opencode-ai/core/session/store" import { SystemContext } from "@opencode-ai/core/system-context" import { SystemContextRegistry } from "@opencode-ai/core/system-context/registry" import { SkillGuidance } from "@opencode-ai/core/skill/guidance" import { ReferenceGuidance } from "@opencode-ai/core/reference/guidance" import { ModelV2 } from "@opencode-ai/core/model" import { Location } from "@opencode-ai/core/location" import { ProviderV2 } from "@opencode-ai/core/provider" import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect" import { asc, eq } from "drizzle-orm" import { testEffect } from "./lib/effect" const questions = QuestionV2.layer.pipe(Layer.provide(EventV2.defaultLayer)) const requests: LLMRequest[] = [] let response: LLMEvent[] = [] let responses: LLMEvent[][] | undefined let responseStream: Stream.Stream | undefined let streamGate: Deferred.Deferred | undefined let streamStarted: Deferred.Deferred | undefined let streamFailure: LLMError | undefined let toolExecutionGate: Deferred.Deferred | undefined let toolExecutionsStarted: Deferred.Deferred | undefined let toolExecutionsReady = 5 let activeToolExecutions = 0 let maxActiveToolExecutions = 0 const client = Layer.succeed( LLMClient.Service, LLMClient.Service.of({ prepare: () => Effect.die("unused"), stream: ((request: LLMRequest) => { requests.push(request) if (responseStream) { const stream = responseStream responseStream = undefined return stream } const events = streamFailure ? Stream.fail(streamFailure) : Stream.fromIterable(responses === undefined ? response : (responses.shift() ?? [])) if (!streamGate) return events return Stream.unwrap( (streamStarted ? Deferred.succeed(streamStarted, undefined) : Effect.void).pipe( Effect.andThen(Deferred.await(streamGate)), Effect.as(events), ), ) }) as unknown as LLMClientShape["stream"], generate: () => Effect.die("unused"), }), ) const model = Model.make({ id: "fake-model", provider: "fake", route: OpenAIChat.route }) const replacementModel = Model.make({ id: "replacement", provider: "fake", route: OpenAIChat.route }) const compactModel = Model.make({ id: "compact", provider: "fake", route: OpenAIChat.route.with({ limits: { context: 4_000, output: 50 } }), }) const recoveryModel = Model.make({ id: "recovery", provider: "fake", route: OpenAIChat.route.with({ limits: { context: 20_000, output: 1_000 } }), }) const authorizations: Tool.Context[] = [] const executions: string[] = [] const permission = Layer.succeed( PermissionV2.Service, PermissionV2.Service.of({ assert: () => Effect.die("unused"), ask: () => Effect.die("unused"), reply: () => Effect.die("unused"), get: () => Effect.die("unused"), forSession: () => Effect.die("unused"), list: () => Effect.die("unused"), }), ) const applications = ApplicationTools.layer const registry = ToolRegistry.layer.pipe( Layer.provide(permission), Layer.provide(applications), Layer.provide(ToolOutputStore.defaultLayer), ) const agents = AgentV2.layer const echo = Layer.effectDiscard( ToolRegistry.Service.use((registry) => registry.register({ echo: Tool.make({ description: "Echo text", input: Schema.Struct({ text: Schema.String }), output: Schema.Struct({ text: Schema.String }), toModelOutput: ({ output }) => [{ type: "text", text: output.text }], execute: ({ text }, context) => Effect.gen(function* () { authorizations.push(context) executions.push(text) activeToolExecutions++ maxActiveToolExecutions = Math.max(maxActiveToolExecutions, activeToolExecutions) if (activeToolExecutions === toolExecutionsReady && toolExecutionsStarted) { yield* Deferred.succeed(toolExecutionsStarted, undefined) } if (toolExecutionGate) yield* Deferred.await(toolExecutionGate) return { text } }).pipe(Effect.ensuring(Effect.sync(() => activeToolExecutions--))), }), defect: Tool.make({ description: "Fail unexpectedly", input: Schema.Struct({}), output: Schema.Struct({}), execute: () => Effect.die("unexpected tool defect"), }), }), ), ).pipe(Layer.provide(registry)) let modelResolveHook = Effect.void let currentModel = model const models = SessionRunnerModel.layerWith((session) => modelResolveHook.pipe(Effect.as(session.model?.id === "replacement" ? replacementModel : currentModel)), ) const systemContextKey = SystemContext.Key.make("test/context") let systemBaseline = "Initial context" let systemRemoved = false let systemUnavailable = false let systemLoadHook = Effect.void const skillBaselines = new Map() const systemContext = Layer.effectDiscard( SystemContextRegistry.Service.pipe( Effect.flatMap((registry) => registry.register({ key: systemContextKey, load: Effect.sync(() => SystemContext.combine( systemRemoved ? [] : [ SystemContext.make({ key: systemContextKey, codec: Schema.toCodecJson(Schema.String), load: systemLoadHook.pipe( Effect.andThen( Effect.sync(() => (systemUnavailable ? SystemContext.unavailable : systemBaseline)), ), ), baseline: String, update: (_previous, current) => current, removed: () => "System context source removed: test/context", }), ], ), ), }), ), ), ).pipe(Layer.provideMerge(SystemContextRegistry.layer)) const location = Location.layer({ directory: AbsolutePath.make("/project") }).pipe(Layer.provide(Project.defaultLayer)) const skillGuidance = Layer.mock(SkillGuidance.Service, { load: (agent) => Effect.succeed( skillBaselines.has(agent.id) ? SystemContext.make({ key: SystemContext.Key.make("test/skill-guidance"), codec: Schema.toCodecJson(Schema.String), load: Effect.succeed(skillBaselines.get(agent.id)!), baseline: String, update: (_previous, current) => current, removed: () => "Skill guidance removed", }) : SystemContext.empty, ), }) const referenceGuidance = Layer.mock(ReferenceGuidance.Service, { load: () => Effect.succeed(SystemContext.empty) }) const config = Layer.succeed( Config.Service, Config.Service.of({ entries: () => Effect.succeed([ new Config.Document({ type: "document", info: new Config.Info({ compaction: new ConfigCompaction.Info({ buffer: 3_000, keep: new ConfigCompaction.Keep({ tokens: 1_000 }), }), }), }), ]), }), ) const runner = SessionRunnerLLM.layer.pipe( Layer.provide(Database.defaultLayer), Layer.provide(SessionStore.defaultLayer), Layer.provide(EventV2.defaultLayer), Layer.provide(client), Layer.provide(registry), Layer.provide(models), Layer.provide(systemContext), Layer.provide(location), Layer.provide(agents), Layer.provide(skillGuidance), Layer.provide(referenceGuidance), Layer.provide(config), ) const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner)) const execution = Layer.effect( SessionExecution.Service, SessionRunCoordinator.Service.pipe( Effect.map((coordinator) => SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake, interrupt: coordinator.interrupt, }), ), ), ).pipe(Layer.provide(coordinator)) const sessions = SessionV2.layer.pipe( Layer.provide(EventV2.defaultLayer), Layer.provide(Database.defaultLayer), Layer.provide(SessionStore.defaultLayer), Layer.provide(Project.defaultLayer), Layer.provide(execution), ) const it = testEffect( Layer.mergeAll( Database.defaultLayer, EventV2.defaultLayer, questions, SessionProjector.defaultLayer, SessionStore.defaultLayer, client, permission, applications, agents, registry, echo, models, systemContext, location, skillGuidance, config, runner, coordinator, execution, sessions, ), ) const sessionID = SessionV2.ID.make("ses_runner_test") const otherSessionID = SessionV2.ID.make("ses_runner_other") const insertSession = (id: SessionV2.ID) => Effect.gen(function* () { const { db } = yield* Database.Service yield* db .insert(SessionTable) .values({ id, project_id: Project.ID.global, slug: id, directory: "/project", title: "test", version: "test", }) .onConflictDoNothing() .run() .pipe(Effect.orDie) }) const setup = Effect.gen(function* () { const { db } = yield* Database.Service response = [] systemBaseline = "Initial context" systemRemoved = false systemUnavailable = false systemLoadHook = Effect.void modelResolveHook = Effect.void currentModel = model skillBaselines.clear() responses = undefined streamFailure = undefined responseStream = undefined streamGate = undefined streamStarted = undefined toolExecutionGate = undefined toolExecutionsStarted = undefined toolExecutionsReady = 5 activeToolExecutions = 0 maxActiveToolExecutions = 0 yield* db .insert(ProjectTable) .values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] }) .onConflictDoNothing() .run() .pipe(Effect.orDie) yield* insertSession(sessionID) }) const providerUnavailable = () => new LLMError({ module: "test", method: "stream", reason: new TransportReason({ message: "Provider unavailable" }), }) const setupOverflowRecovery = Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service response = fragmentFixture("text", "text-earlier", ["Earlier answer"]).completeEvents yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Earlier question ".repeat(700) }), resume: false, }) yield* session.resume(sessionID) currentModel = recoveryModel requests.length = 0 return session }) const userTexts = (request: LLMRequest) => request.messages.flatMap((message) => message.role === "user" ? message.content.flatMap((content) => (content.type === "text" ? [content.text] : [])) : [], ) const replaySessionProjection = (id: SessionV2.ID) => Effect.gen(function* () { const { db } = yield* Database.Service const events = yield* EventV2.Service const recorded = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, id)) .orderBy(asc(EventTable.seq)) .all() .pipe(Effect.orDie) yield* events.remove(id) yield* db.delete(SessionInputTable).where(eq(SessionInputTable.session_id, id)).run().pipe(Effect.orDie) yield* db.delete(SessionMessageTable).where(eq(SessionMessageTable.session_id, id)).run().pipe(Effect.orDie) yield* events.replayAll( recorded.map((event) => ({ id: event.id, aggregateID: event.aggregate_id, seq: event.seq, type: event.type, data: event.data, })), ) }) type FragmentKind = "text" | "reasoning" | "tool input" type FragmentFixture = { readonly delta: EventV2.Definition readonly completeEvents: LLMEvent[] readonly partialEvents: LLMEvent[] readonly expectedAssistant: unknown readonly expectedContent: unknown } const fragmentKinds: readonly FragmentKind[] = ["text", "reasoning", "tool input"] const fragmentID = (kind: FragmentKind, suffix: string) => `${kind === "tool input" ? "call" : kind}-${suffix}` const fragmentFixture = (kind: FragmentKind, id: string, chunks: readonly string[]): FragmentFixture => { const text = chunks.join("") switch (kind) { case "text": { const partialEvents = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.textStart({ id }), ...chunks.map((text) => LLMEvent.textDelta({ id, text })), ] const expectedContent = { type: "text", id, text } return { delta: SessionEvent.Text.Delta, partialEvents, completeEvents: [ ...partialEvents, LLMEvent.textEnd({ id }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], expectedAssistant: { type: "assistant", finish: "stop", content: [expectedContent] }, expectedContent, } } case "reasoning": { const partialEvents = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.reasoningStart({ id }), ...chunks.map((text) => LLMEvent.reasoningDelta({ id, text })), ] const expectedContent = { type: "reasoning", id, text } return { delta: SessionEvent.Reasoning.Delta, partialEvents, completeEvents: [ ...partialEvents, LLMEvent.reasoningEnd({ id }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], expectedAssistant: { type: "assistant", finish: "stop", content: [expectedContent] }, expectedContent, } } case "tool input": { const partialEvents = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolInputStart({ id, name: "echo" }), ...chunks.map((text) => LLMEvent.toolInputDelta({ id, name: "echo", text })), ] const expectedContent = { type: "tool", id, state: { status: "pending", input: text } } return { delta: SessionEvent.Tool.Input.Delta, partialEvents, completeEvents: [...partialEvents, LLMEvent.toolInputEnd({ id, name: "echo" })], expectedAssistant: { type: "assistant", content: [expectedContent] }, expectedContent, } } } } const verifyEphemeralDeltas = (kind: FragmentKind) => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const prompt = `Stream ${kind}` const chunks = Array.from({ length: 32 }, (_, index) => `${index},`) const fixture = fragmentFixture(kind, fragmentID(kind, "many"), chunks) const expectedContext = [{ type: "user", text: prompt }, fixture.expectedAssistant] yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false }) const events = yield* EventV2.Service const live = yield* events.subscribe(fixture.delta).pipe(Stream.take(32), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow response = fixture.completeEvents yield* session.resume(sessionID) const { db } = yield* Database.Service const deltas = yield* db .select({ type: EventTable.type }) .from(EventTable) .where(eq(EventTable.type, EventV2.versionedType(fixture.delta.type, 1))) .all() .pipe(Effect.orDie) expect(Array.from(yield* Fiber.join(live))).toHaveLength(32) expect(deltas).toHaveLength(0) expect(yield* session.context(sessionID)).toMatchObject(expectedContext) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject(expectedContext) }) const verifyPartialFlushOnFailure = (kind: FragmentKind) => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const prompt = `Fail after ${kind}` const fixture = fragmentFixture(kind, fragmentID(kind, "partial"), ["Partial"]) const failure = providerUnavailable() yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false }) responseStream = Stream.concat(Stream.fromIterable(fixture.partialEvents), Stream.fail(failure)) expect(yield* session.resume(sessionID).pipe(Effect.flip)).toBe(failure) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: prompt }, { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" }, content: [fixture.expectedContent], }, ]) }) const verifyPartialFlushOnInterruption = (kind: FragmentKind) => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const prompt = `Interrupt after ${kind}` const fixture = fragmentFixture(kind, fragmentID(kind, "interrupted"), ["Partial"]) const streamed = yield* Deferred.make() yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false }) responseStream = Stream.concat( Stream.fromIterable(fixture.partialEvents), Stream.fromEffect(Deferred.succeed(streamed, undefined)).pipe(Stream.flatMap(() => Stream.never)), ) const runner = yield* SessionRunner.Service const fiber = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild) yield* Deferred.await(streamed) yield* Fiber.interrupt(fiber) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: prompt }, { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider turn interrupted" }, content: [ kind === "tool input" ? { type: "tool", id: fragmentID(kind, "interrupted"), state: { status: "error" } } : fixture.expectedContent, ], }, ]) }) describe("SessionRunnerLLM", () => { it.effect("advertises and executes a globally attached application tool", () => Effect.gen(function* () { yield* setup const applicationTools = yield* ApplicationTools.Service const session = yield* SessionV2.Service const contexts: Tool.Context[] = [] yield* applicationTools.register({ application_context: Tool.make({ description: "Read application context", input: Schema.Struct({ query: Schema.String }), output: Schema.Struct({ answer: Schema.String }), execute: ({ query }, context) => Effect.sync(() => { contexts.push(context) return { answer: query.toUpperCase() } }), }), }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Use application context" }), resume: false }) responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-application", name: "application_context", input: { query: "hello" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [], ] yield* session.resume(sessionID) expect(requests[0]?.tools.map((tool) => tool.name)).toContain("application_context") expect(contexts).toEqual([ { sessionID, agent: AgentV2.ID.make("build"), assistantMessageID: expect.stringMatching(/^msg_/), toolCallID: "call-application", }, ]) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Use application context" }, { type: "assistant", content: [ { type: "tool", id: "call-application", state: { status: "completed", structured: { answer: "HELLO" } }, }, ], }, ]) }), ) it.effect("starts a real runner turn after default prompt recording", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service requests.length = 0 responses = undefined streamGate = undefined streamStarted = undefined response = [] const message = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run automatically" }) }) expect(requests).toHaveLength(1) expect(yield* session.messages({ sessionID })).toMatchObject([ { id: message.id, type: "user", text: "Run automatically" }, ]) }), ) it.effect("streams one request with registry definitions from chronological V2 user history", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) requests.length = 0 responses = undefined streamGate = undefined streamStarted = undefined response = [] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(requests[0]?.model).toBe(model) expect(requests[0]?.tools.map((tool) => tool.name)).toEqual(["echo", "defect"]) expect(requests[0]?.messages.map((message) => ({ role: message.role, content: message.content }))).toEqual([ { role: "user", content: [{ type: "text", text: "First" }] }, { role: "user", content: [{ type: "text", text: "Second" }] }, ]) expect(yield* session.messages({ sessionID })).toHaveLength(2) }), ) it.effect("retries the first provider turn after system context becomes available", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const { db } = yield* Database.Service const messageID = SessionMessage.ID.create() systemUnavailable = true yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 const exit = yield* session.resume(sessionID).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(SystemContext.InitializationBlocked) expect(requests).toHaveLength(0) expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) expect( yield* db .select() .from(SessionContextEpochTable) .where(eq(SessionContextEpochTable.session_id, sessionID)) .get(), ).toBeUndefined() systemUnavailable = false yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "First" }) }) yield* (yield* SessionRunCoordinator.Service).awaitIdle(sessionID) expect(requests).toHaveLength(1) expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user"]) }), ) it.effect("interrupts a source Location runner after a Session moves", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) yield* events.publish(SessionEvent.Moved, { sessionID, timestamp: DateTime.makeUnsafe(1), location: Location.Ref.make({ directory: AbsolutePath.make("/moved") }), }) expect( yield* db .select() .from(SessionContextEpochTable) .where(eq(SessionContextEpochTable.session_id, sessionID)) .get(), ).toBeUndefined() yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) const exit = yield* session.resume(sessionID).pipe(Effect.exit) expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBe(true) expect(requests).toHaveLength(1) expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) }), ) it.effect("fails gracefully when a stored context snapshot cannot be decoded", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) response = [] yield* session.resume(sessionID) yield* db .update(SessionContextEpochTable) .set({ snapshot: { invalid: { value: "bad" } } }) .where(eq(SessionContextEpochTable.session_id, sessionID)) .run() .pipe(Effect.orDie) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) requests.length = 0 const exit = yield* session.resume(sessionID).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(ContextSnapshotDecodeError) expect(requests).toHaveLength(0) }), ) it.effect("does not create a source Location epoch after a concurrent Session move", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service let moved = false systemLoadHook = Effect.suspend(() => { if (moved) return Effect.void moved = true return events .publish(SessionEvent.Moved, { sessionID, timestamp: DateTime.makeUnsafe(1), location: Location.Ref.make({ directory: AbsolutePath.make("/moved") }), }) .pipe(Effect.asVoid) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) expect(Exit.isFailure(yield* session.resume(sessionID).pipe(Effect.exit))).toBe(true) expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) expect( yield* db .select() .from(SessionContextEpochTable) .where(eq(SessionContextEpochTable.session_id, sessionID)) .get(), ).toBeUndefined() expect((yield* session.get(sessionID)).location.directory).toBe(AbsolutePath.make("/moved")) }), ) it.effect("reuses one durable baseline after the context producer changes", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) systemBaseline = "Changed context" yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], ["Initial context"], ]) expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "user", "system"]) expect(requests[1]?.messages.at(-1)?.content).toEqual([{ type: "text", text: "Changed context" }]) expect(yield* session.messages({ sessionID })).toHaveLength(3) const { db } = yield* Database.Service expect( yield* db .select({ id: EventTable.id }) .from(EventTable) .where(eq(EventTable.type, "session.next.context.updated.1")) .all() .pipe(Effect.orDie), ).toHaveLength(1) yield* replaySessionProjection(sessionID) expect(yield* session.messages({ sessionID })).toHaveLength(3) }), ) it.effect("includes the effective default agent system before durable context", () => Effect.gen(function* () { yield* setup const agent = yield* AgentV2.Service yield* agent.transform((editor) => editor.update(AgentV2.ID.make("build"), (agent) => { agent.system = "Build agent instructions" agent.mode = "primary" }), ) const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = fragmentFixture("text", "text-build", ["Done"]).completeEvents yield* session.resume(sessionID) expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Build agent instructions", "Initial context"]) }), ) it.effect("uses the configured default agent system for omitted-agent sessions", () => Effect.gen(function* () { yield* setup const agent = yield* AgentV2.Service yield* agent.transform((editor) => { editor.update(AgentV2.ID.make("build"), (agent) => { agent.system = "Build agent instructions" agent.mode = "primary" }) editor.update(AgentV2.ID.make("reviewer"), (agent) => { agent.system = "Reviewer instructions" agent.mode = "primary" }) editor.default(AgentV2.ID.make("reviewer")) }) const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = fragmentFixture("text", "text-reviewer", ["Done"]).completeEvents yield* session.resume(sessionID) expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Reviewer instructions", "Initial context"]) expect((yield* session.messages({ sessionID }))[0]).toMatchObject({ type: "assistant", agent: "reviewer" }) }), ) it.effect("uses an explicitly selected non-build agent system", () => Effect.gen(function* () { yield* setup const { db } = yield* Database.Service const agent = yield* AgentV2.Service yield* agent.transform((editor) => editor.update(AgentV2.ID.make("reviewer"), (agent) => { agent.system = "Reviewer instructions" agent.mode = "primary" }), ) yield* db .update(SessionTable) .set({ agent: "reviewer" }) .where(eq(SessionTable.id, sessionID)) .run() .pipe(Effect.orDie) const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = fragmentFixture("text", "text-selected", ["Done"]).completeEvents yield* session.resume(sessionID) expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Reviewer instructions", "Initial context"]) expect((yield* session.messages({ sessionID }))[0]).toMatchObject({ type: "assistant", agent: "reviewer" }) }), ) it.effect("composes selected-agent skill guidance and replaces it after an agent switch", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service skillBaselines.set(AgentV2.ID.make("build"), "Build skills") yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") yield* events.publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), agent: "reviewer", }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context\n\nBuild skills"], ["Initial context\n\nReviewer skills"], ]) }), ) it.effect("retries first-epoch preparation when the selected agent changes during observation", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service skillBaselines.set(AgentV2.ID.make("build"), "Build skills") skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") let switched = false systemLoadHook = Effect.suspend(() => { if (switched) return Effect.void switched = true return events .publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), agent: "reviewer", }) .pipe(Effect.asVoid) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context\n\nReviewer skills"], ]) }), ) it.effect("opens a queued activity once when the selected agent changes during observation", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service skillBaselines.set(AgentV2.ID.make("build"), "Build skills") skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") let switched = false systemLoadHook = Effect.suspend(() => { if (switched) return Effect.void switched = true return events .publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), agent: "reviewer", }) .pipe(Effect.asVoid) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queued" }), delivery: "queue", resume: false, }) requests.length = 0 response = [] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect((yield* session.context(sessionID)).filter((message) => message.type === "user")).toHaveLength(1) }), ) it.effect("retries an agent switch before the final provider-dispatch boundary", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service skillBaselines.set(AgentV2.ID.make("build"), "Build skills") skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") let switched = false modelResolveHook = Effect.suspend(() => { if (switched) return Effect.void switched = true return events .publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), agent: "reviewer", }) .pipe(Effect.asVoid) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context\n\nReviewer skills"], ]) expect( yield* db .select({ replacementSeq: SessionContextEpochTable.replacement_seq }) .from(SessionContextEpochTable) .where(eq(SessionContextEpochTable.session_id, sessionID)) .get() .pipe(Effect.orDie), ).toEqual({ replacementSeq: null }) }), ) it.effect("retries a model switch before the final provider-dispatch boundary", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service let switched = false modelResolveHook = Effect.suspend(() => { if (switched) return Effect.void switched = true return events .publish(SessionEvent.ModelSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") }, }) .pipe(Effect.asVoid) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) expect(requests.map((request) => request.model)).toEqual([replacementModel]) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([["Initial context"]]) }), ) it.effect("fences an unchanged epoch read across an agent ABA replacement request", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) response = [] yield* session.resume(sessionID) let switched = false systemLoadHook = Effect.suspend(() => { if (switched) return Effect.void switched = true return events .publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), agent: AgentV2.ID.make("reviewer"), }) .pipe( Effect.andThen( events.publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(2), agent: AgentV2.defaultID, }), ), Effect.asVoid, ) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) requests.length = 0 yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect( yield* db .select({ replacementSeq: SessionContextEpochTable.replacement_seq }) .from(SessionContextEpochTable) .where(eq(SessionContextEpochTable.session_id, sessionID)) .get() .pipe(Effect.orDie), ).toEqual({ replacementSeq: null }) }), ) it.effect("rejects stale agent guidance when committing an existing-epoch replacement", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) response = [] yield* session.resume(sessionID) yield* events.publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), agent: AgentV2.ID.make("reviewer"), }) const context = (text: string) => Effect.succeed( SystemContext.make({ key: systemContextKey, codec: Schema.toCodecJson(Schema.String), load: Effect.succeed(text), baseline: String, update: (_previous, current) => current, }), ) const location = (yield* session.get(sessionID)).location expect( yield* SessionContextEpoch.prepare( db, events, context("Stale build context"), sessionID, location, AgentV2.defaultID, ).pipe(Effect.catchDefect(Effect.succeed)), ).toBeInstanceOf(SessionContextEpoch.AgentMismatch) expect( yield* SessionContextEpoch.prepare( db, events, context("Reviewer context"), sessionID, location, AgentV2.ID.make("reviewer"), ), ).toMatchObject({ baseline: "Reviewer context" }) }), ) it.effect("blocks a cross-agent provider turn while replacement context is unavailable", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service skillBaselines.set(AgentV2.defaultID, "Build skills") skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) response = [] yield* session.resume(sessionID) yield* events.publish(SessionEvent.AgentSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), agent: AgentV2.ID.make("reviewer"), }) systemUnavailable = true yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) requests.length = 0 const blocked = yield* session.resume(sessionID).pipe(Effect.exit) expect(Exit.isFailure(blocked)).toBe(true) if (Exit.isFailure(blocked)) expect(Cause.squash(blocked.cause)).toBeInstanceOf(SessionContextEpoch.AgentReplacementBlocked) expect(requests).toHaveLength(0) systemUnavailable = false yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context\n\nReviewer skills"], ]) }), ) it.effect("admits removed context as a chronological System message", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) systemRemoved = true yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "user", "system"]) expect(requests[1]?.messages.at(-1)?.content).toEqual([ { type: "text", text: "System context source removed: test/context" }, ]) expect(yield* session.messages({ sessionID })).toHaveLength(3) }), ) it.effect("replaces the baseline lazily after a model switch and drops prior System updates", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) systemBaseline = "Changed context" yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) yield* events.publish(SessionEvent.ModelSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") }, }) systemBaseline = "Replacement context" yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false }) yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], ["Initial context"], ["Replacement context"], ]) expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "user", "system"]) expect(requests[2]?.messages.map((message) => message.role)).toEqual(["user", "user", "user"]) expect((yield* session.context(sessionID)).map((message) => message.type)).toEqual([ "user", "user", "model-switched", "user", ]) yield* replaySessionProjection(sessionID) expect(yield* session.messages({ sessionID })).toHaveLength(5) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fourth" }), resume: false }) yield* session.resume(sessionID) }), ) it.effect("defers replacement while admitted context is temporarily unavailable", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) yield* events.publish(SessionEvent.ModelSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") }, }) systemUnavailable = true yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) systemUnavailable = false systemBaseline = "Replacement context" yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false }) yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], ["Initial context"], ["Replacement context"], ]) }), ) it.effect("advances a pending replacement to the latest invalidation boundary", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) response = [] yield* session.resume(sessionID) yield* events.publish(SessionEvent.ModelSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), model: { id: ModelV2.ID.make("replacement-1"), providerID: ProviderV2.ID.make("fake") }, }) yield* events.publish(SessionEvent.ModelSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(2), model: { id: ModelV2.ID.make("replacement-2"), providerID: ProviderV2.ID.make("fake") }, }) const latest = yield* SessionInput.latestSeq(db, sessionID) expect( yield* db .select({ replacementSeq: SessionContextEpochTable.replacement_seq }) .from(SessionContextEpochTable) .where(eq(SessionContextEpochTable.session_id, sessionID)) .get() .pipe(Effect.orDie), ).toEqual({ replacementSeq: latest }) }), ) it.effect("retries epoch preparation until observation-time invalidations settle", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) response = [] yield* session.resume(sessionID) requests.length = 0 systemBaseline = "Changed context" let invalidations = 0 systemLoadHook = Effect.suspend(() => { if (invalidations === 4) return Effect.void invalidations++ return events .publish(SessionEvent.ModelSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(invalidations), model: { id: ModelV2.ID.make(`replacement-${invalidations}`), providerID: ProviderV2.ID.make("fake") }, }) .pipe(Effect.asVoid) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) expect(invalidations).toBe(4) expect(requests).toHaveLength(1) expect(requests[0]?.system.map((part) => part.text)).toEqual(["Changed context"]) }), ) it.effect("replaces the baseline lazily after completed compaction without reopening replacement on replay", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) const compactionID = SessionMessage.ID.create() yield* events.publish(SessionEvent.Compaction.Started, { sessionID, messageID: compactionID, timestamp: DateTime.makeUnsafe(1), reason: "manual", }) yield* events.publish(SessionEvent.Compaction.Ended, { sessionID, messageID: compactionID, timestamp: DateTime.makeUnsafe(2), reason: "manual", text: "summary", recent: "", }) systemBaseline = "Replacement context" yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], ["Replacement context"], ]) yield* replaySessionProjection(sessionID) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false }) yield* session.resume(sessionID) }), ) it.effect("automatically compacts into a completed summary and retained recent turn", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service response = fragmentFixture("text", "text-first", ["Earlier answer"]).completeEvents yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Earlier question ".repeat(180) }), resume: false, }) yield* session.resume(sessionID) currentModel = compactModel requests.length = 0 responses = [ fragmentFixture("text", "text-summary", ["## Goal\n- Preserve the task"]).completeEvents, fragmentFixture("text", "text-final", ["Continued"]).completeEvents, ] yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recent exact request ".repeat(180) }), resume: false, }) yield* session.resume(sessionID) expect(requests).toHaveLength(2) expect(userTexts(requests[0])[0]).toContain("## Goal") expect(userTexts(requests[1])).toHaveLength(1) expect(userTexts(requests[1])[0]).toContain("\n## Goal\n- Preserve the task\n") expect(userTexts(requests[1])[0]).toContain(`[User]: ${"Recent exact request ".repeat(180)}`) const context = yield* (yield* SessionStore.Service).context(sessionID) expect(context.map((message) => message.type)).toEqual(["compaction", "assistant"]) expect(context[0]).toMatchObject({ type: "compaction", summary: "## Goal\n- Preserve the task", }) requests.length = 0 executions.length = 0 responses = [ fragmentFixture("text", "text-summary-2", ["## Goal\n- Preserve the updated task"]).completeEvents, fragmentFixture("text", "text-final-2", ["Continued again"]).completeEvents, ] yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Newest exact request ".repeat(180) }), resume: false, }) yield* session.resume(sessionID) expect(requests).toHaveLength(2) expect(userTexts(requests[0])[0]).toContain( "\n## Goal\n- Preserve the task\n", ) expect(userTexts(requests[0])[0]).toContain("Recent exact request") expect((yield* (yield* SessionStore.Service).context(sessionID))[0]).toMatchObject({ type: "compaction", summary: "## Goal\n- Preserve the updated task", }) }), ) it.effect("forces one compaction and retries after provider context overflow", () => Effect.gen(function* () { const session = yield* setupOverflowRecovery responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }), ], fragmentFixture("text", "text-summary", ["## Goal\n- Recover overflow"]).completeEvents, fragmentFixture("text", "text-final", ["Recovered"]).completeEvents, ] yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false }) yield* session.resume(sessionID) expect(requests).toHaveLength(3) expect(userTexts(requests[1])[0]).toContain("## Goal") expect(userTexts(requests[2])[0]).toContain("\n## Goal\n- Recover overflow\n") expect(yield* session.context(sessionID)).toMatchObject([ { type: "compaction", summary: "## Goal\n- Recover overflow" }, { type: "assistant", finish: "stop" }, ]) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "compaction" }, { type: "assistant", finish: "stop" }, ]) }), ) it.effect("persists a second context overflow after one recovery", () => Effect.gen(function* () { const session = yield* setupOverflowRecovery const overflow = () => [ LLMEvent.stepStart({ index: 0 }), LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }), ] responses = [ overflow(), fragmentFixture("text", "text-summary", ["## Goal\n- Recover once"]).completeEvents, overflow(), ] yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false }) yield* session.resume(sessionID) expect(requests).toHaveLength(3) expect(yield* session.context(sessionID)).toMatchObject([ { type: "compaction" }, { type: "assistant", finish: "error", error: { message: "prompt too long" } }, ]) }), ) it.effect("recovers once from a raw context overflow failure", () => Effect.gen(function* () { const session = yield* setupOverflowRecovery responseStream = Stream.fail( new LLMError({ module: "test", method: "stream", reason: new InvalidRequestReason({ message: "prompt too long", classification: "context-overflow", }), }), ) responses = [ fragmentFixture("text", "text-summary", ["## Goal\n- Recover raw overflow"]).completeEvents, fragmentFixture("text", "text-final", ["Recovered"]).completeEvents, ] yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false }) yield* session.resume(sessionID) expect(requests).toHaveLength(3) expect(yield* session.context(sessionID)).toMatchObject([ { type: "compaction", summary: "## Goal\n- Recover raw overflow" }, { type: "assistant", finish: "stop" }, ]) }), ) it.effect("publishes the original overflow when recovery summarization fails", () => Effect.gen(function* () { const session = yield* setupOverflowRecovery responses = [ [LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" })], [LLMEvent.providerError({ message: "summary unavailable" })], ] yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false }) yield* session.resume(sessionID) expect(requests).toHaveLength(2) const context = yield* session.context(sessionID) expect(context.some((message) => message.type === "compaction")).toBe(false) expect(context.slice(-2)).toMatchObject([ { type: "user", text: "Continue" }, { type: "assistant", finish: "error", error: { message: "prompt too long" } }, ]) }), ) it.effect("interrupts overflow recovery while the summary provider is running", () => Effect.gen(function* () { const session = yield* setupOverflowRecovery responses = [ [LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" })], fragmentFixture("text", "text-summary", ["## Goal\n- Interrupted"]).completeEvents, ] const firstGate = yield* Deferred.make() const summaryGate = yield* Deferred.make() streamGate = firstGate yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false }) const run = yield* session.resume(sessionID).pipe(Effect.forkChild) while (requests.length < 1) yield* Effect.yieldNow streamGate = summaryGate yield* Deferred.succeed(firstGate, undefined) while (requests.length < 2) yield* Effect.yieldNow yield* session.interrupt(sessionID) expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) streamGate = undefined expect(requests).toHaveLength(2) expect((yield* session.context(sessionID)).some((message) => message.type === "compaction")).toBe(false) }), ) it.effect("preserves effective System updates while compaction replacement is blocked", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) requests.length = 0 response = [] yield* session.resume(sessionID) systemBaseline = "Changed context" yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.resume(sessionID) const compactionID = SessionMessage.ID.create() yield* events.publish(SessionEvent.Compaction.Started, { sessionID, messageID: compactionID, timestamp: DateTime.makeUnsafe(1), reason: "manual", }) yield* events.publish(SessionEvent.Compaction.Ended, { sessionID, messageID: compactionID, timestamp: DateTime.makeUnsafe(2), reason: "manual", text: "summary", recent: "", }) systemUnavailable = true yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false }) yield* session.resume(sessionID) expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Initial context"]) expect( requests .at(-1) ?.messages.some( (message) => message.role === "system" && message.content[0]?.type === "text" && message.content[0].text === "Changed context", ), ).toBe(true) }), ) it.effect("projects reasoning and tool events without executing or continuing tools", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Use tools" }), resume: false }) requests.length = 0 responses = undefined streamGate = undefined streamStarted = undefined response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.reasoningStart({ id: "reasoning-1" }), LLMEvent.reasoningDelta({ id: "reasoning-1", text: "Think" }), LLMEvent.reasoningEnd({ id: "reasoning-1" }), LLMEvent.toolInputStart({ id: "call-error", name: "write" }), LLMEvent.toolInputDelta({ id: "call-error", name: "write", text: '{"path":"README.md"}' }), LLMEvent.toolInputEnd({ id: "call-error", name: "write" }), LLMEvent.toolCall({ id: "call-error", name: "write", input: { path: "README.md" }, providerExecuted: true }), LLMEvent.toolError({ id: "call-error", name: "write", message: "Denied" }), LLMEvent.toolResult({ id: "call-error", name: "write", result: { type: "error", value: "Denied" } }), LLMEvent.toolCall({ id: "call-provider", name: "web_search", input: { query: "hello" }, providerExecuted: true, providerMetadata: { fake: { source: "provider" } }, }), LLMEvent.toolResult({ id: "call-provider", name: "web_search", result: { type: "content", value: [ { type: "text", text: "Hello" }, { type: "file", uri: "data:image/png;base64,aGVsbG8=", mime: "image/png", name: "hello.png" }, ], }, providerExecuted: true, providerMetadata: { fake: { source: "provider" } }, }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls", usage: { inputTokens: 10, nonCachedInputTokens: 8, outputTokens: 4, reasoningTokens: 1, cacheReadInputTokens: 2, }, }), LLMEvent.finish({ reason: "tool-calls" }), ] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(requests[0]?.tools.map((tool) => tool.name)).toEqual(["echo", "defect"]) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Use tools" }, { type: "assistant", finish: "tool-calls", tokens: { input: 8, output: 3, reasoning: 1, cache: { read: 2, write: 0 } }, content: [ { type: "reasoning", id: "reasoning-1", text: "Think" }, { type: "tool", id: "call-error", name: "write", state: { status: "error", input: { path: "README.md" }, error: { type: "unknown", message: "Denied" }, }, }, { type: "tool", id: "call-provider", name: "web_search", provider: { executed: true, metadata: { fake: { source: "provider" } } }, state: { status: "completed", input: { query: "hello" }, structured: {}, content: [ { type: "text", text: "Hello" }, { type: "file", mime: "image/png", uri: "data:image/png;base64,aGVsbG8=", name: "hello.png" }, ], }, }, ], }, ]) }), ) it.effect("continues with reloaded history after durably settling one local tool call", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo this" }), resume: false }) requests.length = 0 authorizations.length = 0 executions.length = 0 streamGate = undefined streamStarted = undefined responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.textStart({ id: "text-final" }), LLMEvent.textDelta({ id: "text-final", text: "Done" }), LLMEvent.textEnd({ id: "text-final" }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] yield* session.resume(sessionID) expect(requests).toHaveLength(2) expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"]) expect(authorizations).toMatchObject([{ sessionID, toolCallID: "call-echo" }]) expect(executions).toEqual(["hello"]) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Echo this" }, { type: "assistant", finish: "tool-calls", content: [ { type: "tool", id: "call-echo", name: "echo", state: { status: "completed", input: { text: "hello" }, structured: { text: "hello" }, content: [{ type: "text", text: "hello" }], }, }, ], }, { type: "assistant", finish: "stop", content: [{ type: "text", id: "text-final", text: "Done" }] }, ]) }), ) it.effect("reloads a model switch before a tool-driven continuation turn", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo this" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] toolExecutionGate = yield* Deferred.make() toolExecutionsStarted = yield* Deferred.make() toolExecutionsReady = 1 const run = yield* Effect.forkChild(session.resume(sessionID)) yield* Deferred.await(toolExecutionsStarted) yield* events.publish(SessionEvent.ModelSwitched, { sessionID, messageID: SessionMessage.ID.create(), timestamp: DateTime.makeUnsafe(1), model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") }, }) systemBaseline = "Replacement context" yield* Deferred.succeed(toolExecutionGate, undefined) yield* Fiber.join(run) expect(requests.map((request) => request.model)).toEqual([model, replacementModel]) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], ["Replacement context"], ]) }), ) it.effect("restores durable reasoning provider metadata in a second-turn request", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Think first" }), resume: false }) requests.length = 0 response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.reasoningStart({ id: "reasoning-anthropic" }), LLMEvent.reasoningDelta({ id: "reasoning-anthropic", text: "Signed thought" }), LLMEvent.reasoningEnd({ id: "reasoning-anthropic", providerMetadata: { anthropic: { signature: "sig_1" } } }), LLMEvent.reasoningStart({ id: "reasoning-openai", providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: null } }, }), LLMEvent.reasoningDelta({ id: "reasoning-openai", text: "Encrypted thought" }), LLMEvent.reasoningEnd({ id: "reasoning-openai", providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } }, }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ] yield* session.resume(sessionID) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Think first" }, { type: "assistant", content: [ { type: "reasoning", text: "Signed thought", providerMetadata: { anthropic: { signature: "sig_1" } } }, { type: "reasoning", text: "Encrypted thought", providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } }, }, ], }, ]) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false }) response = [] yield* session.resume(sessionID) expect(requests[1]?.messages[1]?.content).toEqual([ { type: "reasoning", text: "Signed thought", providerMetadata: { anthropic: { signature: "sig_1" } } }, { type: "reasoning", text: "Encrypted thought", providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } }, }, ]) }), ) it.effect("replays durable provider-executed tool results inline in a second-turn request", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Search first" }), resume: false }) requests.length = 0 response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "hosted-search", name: "web_search", input: { query: "Effect" }, providerExecuted: true, providerMetadata: { openai: { itemId: "hosted-search" } }, }), LLMEvent.toolResult({ id: "hosted-search", name: "web_search", result: { type: "json", value: [{ title: "Effect" }] }, providerExecuted: true, providerMetadata: { anthropic: { blockType: "web_search_tool_result" } }, }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ] yield* session.resume(sessionID) yield* replaySessionProjection(sessionID) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false }) response = [] yield* session.resume(sessionID) expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "user"]) expect(requests[1]?.messages[1]?.content).toMatchObject([ { type: "tool-call", id: "hosted-search", name: "web_search", input: { query: "Effect" }, providerExecuted: true, providerMetadata: { openai: { itemId: "hosted-search" } }, }, { type: "tool-result", id: "hosted-search", name: "web_search", result: { type: "json", value: [{ title: "Effect" }] }, providerExecuted: true, providerMetadata: { anthropic: { blockType: "web_search_tool_result" } }, }, ]) }), ) it.effect("starts recorded local tools eagerly and awaits settlement before continuing", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo five times" }), resume: false }) requests.length = 0 executions.length = 0 toolExecutionGate = yield* Deferred.make() toolExecutionsStarted = yield* Deferred.make() const providerGate = yield* Deferred.make() response = [] responses = undefined const initial = Stream.fromIterable([ LLMEvent.stepStart({ index: 0 }), ...Array.from({ length: 5 }, (_, index) => LLMEvent.toolCall({ id: `call-echo-${index}`, name: "echo", input: { text: `${index}` } }), ), ]) const final = Stream.fromIterable([ LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ]) streamGate = undefined responseStream = Stream.concat( initial, Stream.fromEffect(Deferred.await(providerGate)).pipe(Stream.flatMap(() => final)), ) const run = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(toolExecutionsStarted) expect(executions).toHaveLength(5) expect(maxActiveToolExecutions).toBe(5) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Echo five times" }, { type: "assistant", content: Array.from({ length: 5 }, (_, index) => ({ type: "tool", id: `call-echo-${index}`, state: { status: "running", input: { text: `${index}` } }, })), }, ]) yield* Deferred.succeed(providerGate, undefined) yield* Effect.yieldNow expect(requests).toHaveLength(1) yield* Deferred.succeed(toolExecutionGate, undefined) yield* Fiber.join(run) toolExecutionGate = undefined toolExecutionsStarted = undefined expect(executions).toHaveLength(5) expect(maxActiveToolExecutions).toBe(5) expect(requests).toHaveLength(2) }), ) it.effect("settles repeated provider-local tool call IDs against their owning assistant messages", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo twice" }), resume: false }) requests.length = 0 executions.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "tool_0", name: "echo", input: { text: "first" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "tool_0", name: "echo", input: { text: "second" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [], ] yield* session.resume(sessionID) expect(executions).toEqual(["first", "second"]) expect(requests).toHaveLength(3) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Echo twice" }, { type: "assistant", content: [ { type: "tool", id: "tool_0", state: { status: "completed", structured: { text: "first" }, content: [{ type: "text", text: "first" }] }, }, ], }, { type: "assistant", content: [ { type: "tool", id: "tool_0", state: { status: "completed", structured: { text: "second" }, content: [{ type: "text", text: "second" }], }, }, ], }, ]) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Echo twice" }, { type: "assistant", content: [ { type: "tool", id: "tool_0", state: { status: "completed", structured: { text: "first" }, content: [{ type: "text", text: "first" }] }, }, ], }, { type: "assistant", content: [ { type: "tool", id: "tool_0", state: { status: "completed", structured: { text: "second" }, content: [{ type: "text", text: "second" }], }, }, ], }, ]) }), ) it.effect("joins concurrent resume calls into one active provider run", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run once" }), resume: false }) requests.length = 0 responses = undefined response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.textStart({ id: "text-once" }), LLMEvent.textDelta({ id: "text-once", text: "Once" }), LLMEvent.textEnd({ id: "text-once" }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) const second = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Effect.yieldNow expect(requests).toHaveLength(1) yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(first) yield* Fiber.join(second) streamGate = undefined streamStarted = undefined expect(requests).toHaveLength(1) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Run once" }, { type: "assistant", finish: "stop", content: [{ type: "text", id: "text-once", text: "Once" }] }, ]) }), ) it.effect("steers an active provider turn with newly recorded prompts", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Change direction" }) }) yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(first) streamGate = undefined streamStarted = undefined yield* Effect.yieldNow expect(requests).toHaveLength(2) expect(userTexts(requests[0]!)).toEqual(["Start working"]) expect(userTexts(requests[1]!)).toEqual(["Start working", "Change direction"]) expect((yield* session.context(sessionID)).map((message) => message.type)).toEqual([ "user", "assistant", "user", "assistant", ]) }), ) it.effect("starts queued input after the active activity settles", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Wait until the next activity" }), delivery: "queue", }) yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(first) streamGate = undefined streamStarted = undefined expect(requests).toHaveLength(3) expect(userTexts(requests[0]!)).toEqual(["Start working"]) expect(userTexts(requests[1]!)).toEqual(["Start working"]) expect(userTexts(requests[2]!)).toEqual(["Start working", "Wait until the next activity"]) }), ) it.effect("preserves durable queued input for a later wake after interruption", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false }) requests.length = 0 responses = [ [], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const run = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run after interrupt" }), delivery: "queue", }) yield* session.interrupt(sessionID) expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) expect(requests).toHaveLength(1) expect(yield* SessionInput.hasPending(db, sessionID, "queue")).toBe(true) const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild) while (requests.length < 2) yield* Effect.yieldNow yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(resumed) streamGate = undefined streamStarted = undefined expect(requests).toHaveLength(2) expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"]) expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Run after interrupt"]) }), ) it.effect("preserves durable steering input for a later resume after interruption", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const { db } = yield* Database.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false }) requests.length = 0 responses = [ [], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const run = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Steer after interrupt" }), }) yield* session.interrupt(sessionID) expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) expect(requests).toHaveLength(1) expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild) while (requests.length < 2) yield* Effect.yieldNow yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(resumed) streamGate = undefined streamStarted = undefined expect(requests).toHaveLength(2) expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"]) expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Steer after interrupt"]) }), ) it.effect("runs queued active inputs as separate FIFO activities", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue first" }), delivery: "queue" }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue second" }), delivery: "queue" }) yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(first) streamGate = undefined streamStarted = undefined expect(requests).toHaveLength(3) expect(userTexts(requests[0]!)).toEqual(["Start working"]) expect(userTexts(requests[1]!)).toEqual(["Start working", "Queue first"]) expect(userTexts(requests[2]!)).toEqual(["Start working", "Queue first", "Queue second"]) }), ) it.effect("opens queued input after idle steering activity settles", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start steering activity" }), resume: false }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue later activity" }), delivery: "queue", resume: false, }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] yield* session.resume(sessionID) expect(requests).toHaveLength(2) expect(userTexts(requests[0]!)).toEqual(["Start steering activity"]) expect(userTexts(requests[1]!)).toEqual(["Start steering activity", "Queue later activity"]) }), ) it.effect("coalesces steers into the active queued activity before starting the next queued activity", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] const firstGate = yield* Deferred.make() const secondGate = yield* Deferred.make() streamGate = firstGate const first = yield* session.resume(sessionID).pipe(Effect.forkChild) while (requests.length < 1) yield* Effect.yieldNow yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue first" }), delivery: "queue" }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Queue second" }), delivery: "queue" }) streamGate = secondGate yield* Deferred.succeed(firstGate, undefined) while (requests.length < 2) yield* Effect.yieldNow yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Steer first queued activity" }) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Also steer first queued activity" }) }) yield* Deferred.succeed(secondGate, undefined) yield* Fiber.join(first) streamGate = undefined expect(requests).toHaveLength(4) expect(userTexts(requests[0]!)).toEqual(["Start working"]) expect(userTexts(requests[1]!)).toEqual(["Start working", "Queue first"]) expect(userTexts(requests[2]!)).toEqual([ "Start working", "Queue first", "Steer first queued activity", "Also steer first queued activity", ]) expect(userTexts(requests[3]!)).toEqual([ "Start working", "Queue first", "Steer first queued activity", "Also steer first queued activity", "Queue second", ]) }), ) it.effect("coalesces multiple active steering prompts into one continuation turn", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First steer" }) }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second steer" }) }) yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(first) streamGate = undefined streamStarted = undefined yield* Effect.yieldNow expect(requests).toHaveLength(2) expect(userTexts(requests[1]!)).toEqual(["Start working", "First steer", "Second steer"]) yield* (yield* SessionRunCoordinator.Service).wake(sessionID) yield* Effect.yieldNow expect(requests).toHaveLength(2) }), ) it.effect("runs steering input accepted while the active provider turn fails", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false }) requests.length = 0 responses = undefined response = [] streamFailure = providerUnavailable() streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover with this" }) }) yield* Deferred.succeed(streamGate, undefined) expect(yield* Fiber.join(first).pipe(Effect.flip)).toBe(streamFailure) streamFailure = undefined streamGate = undefined streamStarted = undefined yield* Effect.yieldNow expect(requests).toHaveLength(2) expect(userTexts(requests[1]!)).toEqual(["Start working", "Recover with this"]) }), ) it.effect("durably fails local tools left running by a prior process before continuing", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover interrupted tool" }), resume: false }) yield* SessionInput.promoteSteers((yield* Database.Service).db, events, sessionID, Number.MAX_SAFE_INTEGER) const assistantMessageID = SessionMessage.ID.create() yield* events.publish(SessionEvent.Step.Started, { sessionID, assistantMessageID, timestamp: yield* DateTime.now, agent: "build", model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") }, }) yield* events.publish(SessionEvent.Tool.Input.Started, { sessionID, timestamp: yield* DateTime.now, assistantMessageID, callID: "call-interrupted", name: "echo", }) yield* events.publish(SessionEvent.Tool.Input.Ended, { sessionID, timestamp: yield* DateTime.now, assistantMessageID, callID: "call-interrupted", text: '{"text":"stale"}', }) yield* events.publish(SessionEvent.Tool.Called, { sessionID, timestamp: yield* DateTime.now, assistantMessageID, callID: "call-interrupted", tool: "echo", input: { text: "stale" }, provider: { executed: false }, }) requests.length = 0 response = [] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"]) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Recover interrupted tool" }, { type: "assistant", content: [ { type: "tool", id: "call-interrupted", state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } }, }, ], }, ]) }), ) it.effect("durably fails hosted tools left running by a prior process before continuing inline", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover interrupted hosted tool" }), resume: false, }) yield* SessionInput.promoteSteers((yield* Database.Service).db, events, sessionID, Number.MAX_SAFE_INTEGER) const assistantMessageID = SessionMessage.ID.create() yield* events.publish(SessionEvent.Step.Started, { sessionID, assistantMessageID, timestamp: yield* DateTime.now, agent: "build", model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") }, }) yield* events.publish(SessionEvent.Tool.Input.Started, { sessionID, timestamp: yield* DateTime.now, assistantMessageID, callID: "call-hosted-interrupted", name: "web_search", }) yield* events.publish(SessionEvent.Tool.Input.Ended, { sessionID, timestamp: yield* DateTime.now, assistantMessageID, callID: "call-hosted-interrupted", text: '{"query":"stale"}', }) yield* events.publish(SessionEvent.Tool.Called, { sessionID, timestamp: yield* DateTime.now, assistantMessageID, callID: "call-hosted-interrupted", tool: "web_search", input: { query: "stale" }, provider: { executed: true, metadata: { openai: { itemId: "call-hosted-interrupted" } } }, }) requests.length = 0 response = [] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant"]) expect(requests[0]?.messages[1]?.content).toMatchObject([ { type: "tool-call", id: "call-hosted-interrupted", providerExecuted: true, providerMetadata: { openai: { itemId: "call-hosted-interrupted" } }, }, { type: "tool-result", id: "call-hosted-interrupted", providerExecuted: true, result: { type: "error" } }, ]) }), ) it.effect("durably fails pending tool input left by a prior process before continuing", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover interrupted tool input" }), resume: false, }) yield* SessionInput.promoteSteers((yield* Database.Service).db, events, sessionID, Number.MAX_SAFE_INTEGER) const assistantMessageID = SessionMessage.ID.create() yield* events.publish(SessionEvent.Step.Started, { sessionID, assistantMessageID, timestamp: yield* DateTime.now, agent: "build", model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") }, }) yield* events.publish(SessionEvent.Tool.Input.Started, { sessionID, timestamp: yield* DateTime.now, assistantMessageID, callID: "call-pending-interrupted", name: "echo", }) requests.length = 0 response = [] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"]) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Recover interrupted tool input" }, { type: "assistant", content: [{ type: "tool", id: "call-pending-interrupted", state: { status: "error" } }] }, ]) }), ) it.effect("starts the first queued activity when woken while idle", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Wait for fresh activity" }), delivery: "queue", resume: false, }) requests.length = 0 yield* (yield* SessionRunCoordinator.Service).wake(sessionID) yield* Effect.yieldNow expect(requests).toHaveLength(1) expect(userTexts(requests[0]!)).toEqual(["Wait for fresh activity"]) }), ) it.effect("does not spend one activity step budget across queued activities", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const queued = Array.from({ length: 26 }, (_, index) => `Queued activity ${index + 1}`) for (const text of queued) { yield* session.prompt({ sessionID, prompt: new Prompt({ text }), delivery: "queue", resume: false }) } requests.length = 0 responses = queued.map(() => [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ]) yield* session.resume(sessionID) expect(requests).toHaveLength(queued.length) expect(userTexts(requests.at(-1)!)).toEqual(queued) }), ) it.effect("retries inbox input after prompt projection rolls back", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service const defect = new Error("fail after prompt promotion") let fail = true yield* events.project(SessionEvent.PromptLifecycle.Promoted, () => (fail ? Effect.die(defect) : Effect.void)) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover promoted input" }), resume: false }) expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe(defect) fail = false requests.length = 0 response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ] yield* (yield* SessionRunCoordinator.Service).wake(sessionID) while (requests.length === 0) yield* Effect.yieldNow expect(userTexts(requests[0]!)).toEqual(["Recover promoted input"]) }), ) it.effect("does not strand a committed promotion when a post-commit listener defects", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* events.listen((event) => event.type === SessionEvent.PromptLifecycle.Promoted.type ? Effect.die("fail after prompt promotion commits") : Effect.void, ) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run committed promotion" }), resume: false, }) requests.length = 0 yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(userTexts(requests[0]!)).toEqual(["Run committed promotion"]) }), ) it.effect("runs different sessions concurrently", () => Effect.gen(function* () { yield* setup yield* insertSession(otherSessionID) const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run first" }), resume: false }) yield* session.prompt({ sessionID: otherSessionID, prompt: new Prompt({ text: "Run second" }), resume: false }) requests.length = 0 responses = undefined response = [] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) const second = yield* session.resume(otherSessionID).pipe(Effect.forkChild) yield* Effect.yieldNow expect(requests).toHaveLength(2) expect(requests.map((request) => request.providerOptions?.openai?.promptCacheKey)).toEqual([ sessionID, otherSessionID, ]) yield* Deferred.succeed(streamGate, undefined) yield* Fiber.join(first) yield* Fiber.join(second) streamGate = undefined streamStarted = undefined }), ) it.effect("bounds external session prompt cache keys", () => Effect.gen(function* () { yield* setup const externalSessionID = SessionV2.ID.fromExternal({ namespace: "discord", key: "thread-one", }) const otherExternalSessionID = SessionV2.ID.fromExternal({ namespace: "discord", key: "thread-two", }) yield* insertSession(externalSessionID) yield* insertSession(otherExternalSessionID) const session = yield* SessionV2.Service yield* session.prompt({ sessionID: externalSessionID, prompt: new Prompt({ text: "Run external session" }), resume: false, }) yield* session.prompt({ sessionID: otherExternalSessionID, prompt: new Prompt({ text: "Run other external session" }), resume: false, }) requests.length = 0 yield* session.resume(externalSessionID) yield* session.resume(otherExternalSessionID) const keys = requests.map((request) => request.providerOptions?.openai?.promptCacheKey) expect(keys).toEqual([externalSessionID.slice(4), otherExternalSessionID.slice(4)]) expect(keys.every((key) => typeof key === "string" && key.length === 64)).toBe(true) expect(keys[0]).not.toBe(keys[1]) }), ) it.effect("fans out one failed run and allows a later retry", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Retry after failure" }), resume: false }) requests.length = 0 responses = undefined response = [] streamFailure = providerUnavailable() streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const first = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) const second = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Effect.yieldNow expect(requests).toHaveLength(1) yield* Deferred.succeed(streamGate, undefined) const [firstExit, secondExit] = yield* Effect.all([Fiber.await(first), Fiber.await(second)]) expect(secondExit).toEqual(firstExit) streamFailure = undefined streamGate = undefined streamStarted = undefined yield* session.resume(sessionID) expect(requests).toHaveLength(2) }), ) it.effect("durably settles local tool failures before continuing", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Call missing" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-missing", name: "missing", input: {} }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.textStart({ id: "text-after-error" }), LLMEvent.textDelta({ id: "text-after-error", text: "Recovered" }), LLMEvent.textEnd({ id: "text-after-error" }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] streamGate = undefined streamStarted = undefined yield* session.resume(sessionID) expect(requests).toHaveLength(2) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Call missing" }, { type: "assistant", content: [ { type: "tool", id: "call-missing", state: { status: "error", error: { message: "Unknown tool: missing" } }, }, ], }, { type: "assistant", finish: "stop", content: [{ type: "text", id: "text-after-error", text: "Recovered" }] }, ]) }), ) it.effect("propagates unexpected local tool defects operationally", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Call defect" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-defect", name: "defect", input: {} }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], ] expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe("unexpected tool defect") expect(requests).toHaveLength(1) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Call defect" }, { type: "assistant", content: [ { type: "tool", id: "call-defect", state: { status: "error", error: { type: "unknown", message: "Tool execution failed: unexpected tool defect" }, }, }, ], }, ]) }), ) it.effect("interrupts runner continuation when a question is dismissed", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service const registry = yield* ToolRegistry.Service const questions = yield* QuestionV2.Service yield* registry.register({ question: Tool.make({ description: "Ask the user", input: Schema.Struct({}), output: Schema.Struct({}), execute: (_, context) => questions.ask({ sessionID: context.sessionID, questions: [] }).pipe(Effect.as({}), Effect.orDie), }), }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Ask then stop" }), resume: false }) requests.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-question", name: "question", input: {} }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [], ] const run = yield* session.resume(sessionID).pipe(Effect.exit, Effect.forkChild) let pending = yield* questions.list() while (pending.length === 0) { yield* Effect.yieldNow pending = yield* questions.list() } yield* questions.reject(pending[0]!.id) const exit = yield* Fiber.join(run) expect(exit._tag).toBe("Failure") if (exit._tag === "Failure") expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true) expect(requests).toHaveLength(1) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Ask then stop" }, { type: "assistant", content: [ { type: "tool", id: "call-question", state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } }, }, ], }, ]) }), ) it.effect("awaits started local tools before surfacing provider stream failure", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Settle before failing" }), resume: false }) const failure = providerUnavailable() toolExecutionGate = yield* Deferred.make() responseStream = Stream.concat( Stream.fromIterable([ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-before-failure", name: "echo", input: { text: "settle" } }), ]), Stream.fail(failure), ) const run = yield* session.resume(sessionID).pipe(Effect.forkChild) while (executions.length === 0) yield* Effect.yieldNow yield* Effect.yieldNow yield* Deferred.succeed(toolExecutionGate, undefined) expect(yield* Fiber.join(run).pipe(Effect.flip)).toBe(failure) toolExecutionGate = undefined expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Settle before failing" }, { type: "assistant", content: [ { type: "tool", id: "call-before-failure", state: { status: "completed", structured: { text: "settle" } } }, ], }, ]) }), ) it.effect("durably fails blocked local tools when a provider turn is interrupted", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt blocked tool" }), resume: false }) executions.length = 0 toolExecutionGate = yield* Deferred.make() responseStream = Stream.concat( Stream.fromIterable([ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-before-interrupt", name: "echo", input: { text: "blocked" } }), ]), Stream.never, ) const run = yield* session.resume(sessionID).pipe(Effect.forkChild) while (executions.length === 0) yield* Effect.yieldNow yield* session.interrupt(sessionID) toolExecutionGate = undefined expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) yield* session.interrupt(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Interrupt blocked tool" }, { type: "assistant", content: [ { type: "tool", id: "call-before-interrupt", state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } }, }, ], }, ]) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Interrupt blocked tool" }, { type: "assistant", content: [{ type: "tool", id: "call-before-interrupt", state: { status: "error" } }] }, ]) requests.length = 0 responseStream = undefined response = [] yield* session.resume(sessionID) expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"]) }), ) it.effect("interrupts a blocked provider turn without local tool activity", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt provider" }), resume: false }) requests.length = 0 response = [] streamGate = yield* Deferred.make() streamStarted = yield* Deferred.make() const run = yield* session.resume(sessionID).pipe(Effect.forkChild) yield* Deferred.await(streamStarted) yield* session.interrupt(sessionID) const exit = yield* Fiber.await(run) streamGate = undefined streamStarted = undefined expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue() expect(requests).toHaveLength(1) yield* session.interrupt(sessionID) }), ) it.effect("durably fails blocked local tools when interrupted while awaiting settlement", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt tool settlement" }), resume: false }) executions.length = 0 toolExecutionGate = yield* Deferred.make() response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-await-interrupt", name: "echo", input: { text: "blocked" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ] const runner = yield* SessionRunner.Service const run = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild) while (executions.length === 0) yield* Effect.yieldNow yield* Fiber.interrupt(run) toolExecutionGate = undefined expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Interrupt tool settlement" }, { type: "assistant", content: [ { type: "tool", id: "call-await-interrupt", state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } }, }, ], }, ]) }), ) it.effect("continues past 25 local tool steps when the agent has no step limit", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Loop forever" }), resume: false }) requests.length = 0 authorizations.length = 0 executions.length = 0 streamGate = undefined streamStarted = undefined responses = [ ...Array.from({ length: 25 }, (_, index) => [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: `call-echo-${index}`, name: "echo", input: { text: `${index}` } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ]), [ LLMEvent.stepStart({ index: 0 }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ], ] yield* session.resume(sessionID) expect(requests).toHaveLength(26) expect(executions).toHaveLength(25) }), ) it.effect("forces a text response on an agent's configured final step", () => Effect.gen(function* () { yield* setup const agents = yield* AgentV2.Service yield* agents.transform((editor) => editor.update(AgentV2.ID.make("build"), (agent) => { agent.steps = 2 }), ) const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Finish at the limit" }), resume: false }) requests.length = 0 executions.length = 0 responses = [ [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-terminal", name: "echo", input: { text: "done" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-forbidden", name: "echo", input: { text: "forbidden" } }), LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }), LLMEvent.finish({ reason: "tool-calls" }), ], ] yield* session.resume(sessionID) expect(requests).toHaveLength(2) expect(requests[0]?.toolChoice).toBeUndefined() expect(requests[1]?.toolChoice).toMatchObject({ type: "none" }) expect(requests[1]?.tools).toEqual([]) expect(requests[1]?.messages.at(-1)).toMatchObject({ role: "assistant", content: [{ type: "text", text: expect.stringContaining("MAXIMUM STEPS REACHED") }], }) expect(executions).toEqual(["done"]) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Finish at the limit" }, { type: "assistant", content: [{ type: "tool", id: "call-terminal", state: { status: "completed" } }] }, { type: "assistant", content: [{ type: "tool", id: "call-forbidden", state: { status: "error" } }] }, ]) }), ) it.effect("projects provider errors as terminal assistant step failures", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail durably" }), resume: false }) requests.length = 0 responses = undefined streamGate = undefined streamStarted = undefined response = [LLMEvent.stepStart({ index: 0 }), LLMEvent.providerError({ message: "Provider unavailable" })] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Fail durably" }, { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } }, ]) }), ) it.effect("projects provider errors emitted before assistant step start", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail before step" }), resume: false }) requests.length = 0 response = [LLMEvent.providerError({ message: "Provider unavailable" })] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Fail before step" }, { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } }, ]) }), ) it.effect("does not recover context overflow after durable assistant output", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail after output" }), resume: false }) requests.length = 0 response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.textStart({ id: "text-partial" }), LLMEvent.textDelta({ id: "text-partial", text: "Partial" }), LLMEvent.textEnd({ id: "text-partial" }), LLMEvent.providerError({ message: "prompt too long", classification: "context-overflow" }), ] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Fail after output" }, { type: "assistant", finish: "error", error: { message: "prompt too long" }, content: [{ type: "text", text: "Partial" }], }, ]) }), ) it.effect("projects raw provider stream failures as terminal assistant step failures", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail raw stream durably" }), resume: false }) const failure = providerUnavailable() responseStream = Stream.fail(failure) expect(yield* session.resume(sessionID).pipe(Effect.flip)).toBe(failure) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Fail raw stream durably" }, { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } }, ]) }), ) it.effect("does not continue automatically after a provider error follows a local tool call", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Do not continue failed provider" }), resume: false, }) requests.length = 0 const executionCount = executions.length response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-before-provider-error", name: "echo", input: { text: "settled" } }), LLMEvent.providerError({ message: "Provider unavailable" }), ] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(executions.slice(executionCount)).toEqual(["settled"]) }), ) it.effect("durably fails a hosted tool when its provider errors before returning a result", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail hosted tool durably" }), resume: false }) requests.length = 0 response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-hosted-provider-error", name: "web_search", input: { query: "effect" }, providerExecuted: true, }), LLMEvent.providerError({ message: "Provider unavailable" }), ] yield* session.resume(sessionID) expect(requests).toHaveLength(1) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Fail hosted tool durably" }, { type: "assistant", content: [{ type: "tool", id: "call-hosted-provider-error", state: { status: "error" } }], }, ]) }), ) it.effect("durably fails a hosted tool left unresolved at normal provider EOF", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail hosted tool at EOF" }), resume: false }) response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-hosted-eof", name: "web_search", input: { query: "effect" }, providerExecuted: true, }), ] yield* session.resume(sessionID) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Fail hosted tool at EOF" }, { type: "assistant", content: [{ type: "tool", id: "call-hosted-eof", state: { status: "error" } }] }, ]) }), ) it.effect("durably fails a hosted tool left unresolved by a raw provider stream failure", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail hosted tool on raw failure" }), resume: false, }) const failure = providerUnavailable() responseStream = Stream.concat( Stream.fromIterable([ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolCall({ id: "call-hosted-raw-failure", name: "web_search", input: { query: "effect" }, providerExecuted: true, }), ]), Stream.fail(failure), ) expect(yield* session.resume(sessionID).pipe(Effect.flip)).toBe(failure) yield* replaySessionProjection(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Fail hosted tool on raw failure" }, { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" }, content: [{ type: "tool", id: "call-hosted-raw-failure", state: { status: "error" } }], }, ]) }), ) it.effect("keeps interleaved assistant text blocks separate", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Two blocks" }), resume: false }) responses = undefined streamGate = undefined streamStarted = undefined response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.textStart({ id: "text-1" }), LLMEvent.textStart({ id: "text-2" }), LLMEvent.textDelta({ id: "text-1", text: "First" }), LLMEvent.textDelta({ id: "text-2", text: "Second" }), LLMEvent.textEnd({ id: "text-1" }), LLMEvent.textEnd({ id: "text-2" }), LLMEvent.stepFinish({ index: 0, reason: "stop" }), LLMEvent.finish({ reason: "stop" }), ] yield* session.resume(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Two blocks" }, { type: "assistant", content: [ { type: "text", id: "text-1", text: "First" }, { type: "text", id: "text-2", text: "Second" }, ], }, ]) }), ) for (const kind of fragmentKinds) { it.effect(`broadcasts provider ${kind} deltas without storing projection rewrites`, () => verifyEphemeralDeltas(kind), ) it.effect(`durably closes partial ${kind} when the provider stream fails`, () => verifyPartialFlushOnFailure(kind)) it.effect(`durably closes partial ${kind} when the provider stream is interrupted`, () => verifyPartialFlushOnInterruption(kind), ) } it.effect("rejects duplicate streamed text starts", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service responses = undefined streamGate = undefined streamStarted = undefined response = [LLMEvent.textStart({ id: "text-1" }), LLMEvent.textStart({ id: "text-1" })] expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe( "Duplicate text start: text-1", ) }), ) it.effect("transitions streamed raw tool input to parsed called input", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Call provider tool" }), resume: false }) responses = undefined streamGate = undefined streamStarted = undefined response = [ LLMEvent.stepStart({ index: 0 }), LLMEvent.toolInputStart({ id: "call-parsed", name: "web_search" }), LLMEvent.toolInputDelta({ id: "call-parsed", name: "web_search", text: '{"query":"hello"}' }), LLMEvent.toolInputEnd({ id: "call-parsed", name: "web_search" }), LLMEvent.toolCall({ id: "call-parsed", name: "web_search", input: { query: "hello" }, providerExecuted: true }), ] yield* session.resume(sessionID) expect(yield* session.context(sessionID)).toMatchObject([ { type: "user", text: "Call provider tool" }, { type: "assistant", content: [{ type: "tool", id: "call-parsed", state: { status: "error", input: { query: "hello" } } }], }, ]) }), ) it.effect("rejects malformed streamed tool input ordering", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service responses = undefined streamGate = undefined streamStarted = undefined response = [LLMEvent.toolInputDelta({ id: "call-1", name: "read", text: "{}" })] expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe( "Tool input delta before start: call-1", ) }), ) })