// Listing header (from the code viewer): 2122 lines, 76 KiB, TypeScript
import { describe, expect } from "bun:test"
|
|
import {
|
|
LLMClient,
|
|
LLMError,
|
|
LLMEvent,
|
|
Model,
|
|
Tool,
|
|
TransportReason,
|
|
type LLMClientShape,
|
|
type LLMRequest,
|
|
} from "@opencode-ai/llm"
|
|
import * as OpenAIChat from "@opencode-ai/llm/protocols/openai-chat"
|
|
import { Database } from "@opencode-ai/core/database/database"
|
|
import { EventV2 } from "@opencode-ai/core/event"
|
|
import { PermissionV2 } from "@opencode-ai/core/permission"
|
|
import { EventTable } from "@opencode-ai/core/event/sql"
|
|
import { Project } from "@opencode-ai/core/project"
|
|
import { ProjectTable } from "@opencode-ai/core/project/sql"
|
|
import { QuestionV2 } from "@opencode-ai/core/question"
|
|
import { AbsolutePath } from "@opencode-ai/core/schema"
|
|
import { SessionV2 } from "@opencode-ai/core/session"
|
|
import { SessionEvent } from "@opencode-ai/core/session/event"
|
|
import { SessionInput } from "@opencode-ai/core/session/input"
|
|
import { Prompt } from "@opencode-ai/core/session/prompt"
|
|
import { SessionProjector } from "@opencode-ai/core/session/projector"
|
|
import { SessionExecution } from "@opencode-ai/core/session/execution"
|
|
import { SessionRunCoordinator } from "@opencode-ai/core/session/run-coordinator"
|
|
import { SessionRunner } from "@opencode-ai/core/session/runner"
|
|
import * as SessionRunnerLLM from "@opencode-ai/core/session/runner/llm"
|
|
import { SessionRunnerModel } from "@opencode-ai/core/session/runner/model"
|
|
import { ToolRegistry } from "@opencode-ai/core/tool-registry"
|
|
import { SessionMessageTable, SessionTable } from "@opencode-ai/core/session/sql"
|
|
import { SessionStore } from "@opencode-ai/core/session/store"
|
|
import { ModelV2 } from "@opencode-ai/core/model"
|
|
import { ProviderV2 } from "@opencode-ai/core/provider"
|
|
import { Cause, DateTime, Deferred, Effect, Fiber, Layer, Schema, Stream } from "effect"
|
|
import { asc, eq } from "drizzle-orm"
|
|
import { testEffect } from "./lib/effect"
|
|
|
|
const database = Database.layerFromPath(":memory:")
|
|
const events = EventV2.layer.pipe(Layer.provide(database))
|
|
const questions = QuestionV2.layer.pipe(Layer.provide(events))
|
|
const projector = SessionProjector.layer.pipe(Layer.provide(events), Layer.provide(database))
|
|
const store = SessionStore.layer.pipe(Layer.provide(database))
|
|
const requests: LLMRequest[] = []
|
|
let response: LLMEvent[] = []
|
|
let responses: LLMEvent[][] | undefined
|
|
let responseStream: Stream.Stream<LLMEvent, LLMError> | undefined
|
|
let streamGate: Deferred.Deferred<void> | undefined
|
|
let streamStarted: Deferred.Deferred<void> | undefined
|
|
let streamFailure: LLMError | undefined
|
|
let toolExecutionGate: Deferred.Deferred<void> | undefined
|
|
let toolExecutionsStarted: Deferred.Deferred<void> | undefined
|
|
let activeToolExecutions = 0
|
|
let maxActiveToolExecutions = 0
|
|
const client = Layer.succeed(
|
|
LLMClient.Service,
|
|
LLMClient.Service.of({
|
|
prepare: () => Effect.die("unused"),
|
|
stream: ((request: LLMRequest) => {
|
|
requests.push(request)
|
|
if (responseStream) {
|
|
const stream = responseStream
|
|
responseStream = undefined
|
|
return stream
|
|
}
|
|
const events = streamFailure
|
|
? Stream.fail(streamFailure)
|
|
: Stream.fromIterable(responses === undefined ? response : (responses.shift() ?? []))
|
|
if (!streamGate) return events
|
|
return Stream.unwrap(
|
|
(streamStarted ? Deferred.succeed(streamStarted, undefined) : Effect.void).pipe(
|
|
Effect.andThen(Deferred.await(streamGate)),
|
|
Effect.as(events),
|
|
),
|
|
)
|
|
}) as unknown as LLMClientShape["stream"],
|
|
generate: () => Effect.die("unused"),
|
|
}),
|
|
)
|
|
const model = Model.make({ id: "fake-model", provider: "fake", route: OpenAIChat.route })
|
|
const authorizations: ToolRegistry.AuthorizeInput[] = []
|
|
const executions: string[] = []
|
|
const permission = Layer.succeed(
|
|
PermissionV2.Service,
|
|
PermissionV2.Service.of({
|
|
assert: () => Effect.die("unused"),
|
|
ask: () => Effect.die("unused"),
|
|
reply: () => Effect.die("unused"),
|
|
get: () => Effect.die("unused"),
|
|
forSession: () => Effect.die("unused"),
|
|
list: () => Effect.die("unused"),
|
|
}),
|
|
)
|
|
const registry = ToolRegistry.layer.pipe(Layer.provide(permission))
|
|
const echo = Layer.effectDiscard(
|
|
ToolRegistry.Service.use((registry) =>
|
|
registry.contribute((editor) => {
|
|
;(editor.set("echo", {
|
|
authorize: (input) =>
|
|
Effect.sync(() => {
|
|
authorizations.push(input)
|
|
}),
|
|
tool: Tool.make({
|
|
description: "Echo text",
|
|
parameters: Schema.Struct({ text: Schema.String }),
|
|
success: Schema.Struct({ text: Schema.String }),
|
|
toModelOutput: ({ output }) => [{ type: "text", text: output.text }],
|
|
execute: ({ text }) =>
|
|
Effect.gen(function* () {
|
|
executions.push(text)
|
|
activeToolExecutions++
|
|
maxActiveToolExecutions = Math.max(maxActiveToolExecutions, activeToolExecutions)
|
|
if (activeToolExecutions === 5 && toolExecutionsStarted) {
|
|
yield* Deferred.succeed(toolExecutionsStarted, undefined)
|
|
}
|
|
if (toolExecutionGate) yield* Deferred.await(toolExecutionGate)
|
|
return { text }
|
|
}).pipe(Effect.ensuring(Effect.sync(() => activeToolExecutions--))),
|
|
}),
|
|
}),
|
|
editor.set("defect", {
|
|
tool: Tool.make({
|
|
description: "Fail unexpectedly",
|
|
parameters: Schema.Struct({}),
|
|
success: Schema.Struct({}),
|
|
execute: () => Effect.die("unexpected tool defect"),
|
|
}),
|
|
}))
|
|
}),
|
|
),
|
|
).pipe(Layer.provide(registry))
|
|
const models = SessionRunnerModel.layerWith(() => Effect.succeed(model))
|
|
const runner = SessionRunnerLLM.layer.pipe(
|
|
Layer.provide(database),
|
|
Layer.provide(store),
|
|
Layer.provide(events),
|
|
Layer.provide(client),
|
|
Layer.provide(registry),
|
|
Layer.provide(models),
|
|
)
|
|
const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner))
|
|
const execution = Layer.effect(
|
|
SessionExecution.Service,
|
|
SessionRunCoordinator.Service.pipe(
|
|
Effect.map((coordinator) => SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake })),
|
|
),
|
|
).pipe(Layer.provide(coordinator))
|
|
const sessions = SessionV2.layer.pipe(
|
|
Layer.provide(events),
|
|
Layer.provide(database),
|
|
Layer.provide(store),
|
|
Layer.provide(Project.defaultLayer),
|
|
Layer.provide(execution),
|
|
)
|
|
const it = testEffect(
|
|
Layer.mergeAll(
|
|
database,
|
|
events,
|
|
questions,
|
|
projector,
|
|
store,
|
|
client,
|
|
permission,
|
|
registry,
|
|
echo,
|
|
models,
|
|
runner,
|
|
coordinator,
|
|
execution,
|
|
sessions,
|
|
),
|
|
)
|
|
const sessionID = SessionV2.ID.make("ses_runner_test")
|
|
const otherSessionID = SessionV2.ID.make("ses_runner_other")
|
|
|
|
const insertSession = (id: SessionV2.ID) =>
|
|
Effect.gen(function* () {
|
|
const { db } = yield* Database.Service
|
|
yield* db
|
|
.insert(SessionTable)
|
|
.values({
|
|
id,
|
|
project_id: Project.ID.global,
|
|
slug: id,
|
|
directory: "/project",
|
|
title: "test",
|
|
version: "test",
|
|
})
|
|
.onConflictDoNothing()
|
|
.run()
|
|
.pipe(Effect.orDie)
|
|
})
|
|
|
|
const setup = Effect.gen(function* () {
|
|
const { db } = yield* Database.Service
|
|
response = []
|
|
responses = undefined
|
|
streamFailure = undefined
|
|
responseStream = undefined
|
|
streamGate = undefined
|
|
streamStarted = undefined
|
|
toolExecutionGate = undefined
|
|
toolExecutionsStarted = undefined
|
|
activeToolExecutions = 0
|
|
maxActiveToolExecutions = 0
|
|
yield* db
|
|
.insert(ProjectTable)
|
|
.values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] })
|
|
.onConflictDoNothing()
|
|
.run()
|
|
.pipe(Effect.orDie)
|
|
yield* insertSession(sessionID)
|
|
})
|
|
|
|
const providerUnavailable = () =>
|
|
new LLMError({
|
|
module: "test",
|
|
method: "stream",
|
|
reason: new TransportReason({ message: "Provider unavailable" }),
|
|
})
|
|
|
|
const userTexts = (request: LLMRequest) =>
|
|
request.messages.flatMap((message) =>
|
|
message.role === "user"
|
|
? message.content.flatMap((content) => (content.type === "text" ? [content.text] : []))
|
|
: [],
|
|
)
|
|
|
|
const replaySessionProjection = (id: SessionV2.ID) =>
|
|
Effect.gen(function* () {
|
|
const { db } = yield* Database.Service
|
|
const events = yield* EventV2.Service
|
|
const recorded = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, id))
|
|
.orderBy(asc(EventTable.seq))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
|
|
yield* events.remove(id)
|
|
yield* db.delete(SessionMessageTable).where(eq(SessionMessageTable.session_id, id)).run().pipe(Effect.orDie)
|
|
yield* events.replayAll(
|
|
recorded.map((event) => ({
|
|
id: event.id,
|
|
aggregateID: event.aggregate_id,
|
|
seq: event.seq,
|
|
type: event.type,
|
|
data: event.data,
|
|
})),
|
|
)
|
|
})
|
|
|
|
type FragmentKind = "text" | "reasoning" | "tool input"
|
|
|
|
type FragmentFixture = {
|
|
readonly delta: EventV2.Definition
|
|
readonly completeEvents: LLMEvent[]
|
|
readonly partialEvents: LLMEvent[]
|
|
readonly expectedAssistant: unknown
|
|
readonly expectedContent: unknown
|
|
}
|
|
|
|
const fragmentKinds: readonly FragmentKind[] = ["text", "reasoning", "tool input"]
|
|
|
|
const fragmentID = (kind: FragmentKind, suffix: string) => `${kind === "tool input" ? "call" : kind}-${suffix}`
|
|
|
|
const fragmentFixture = (kind: FragmentKind, id: string, chunks: readonly string[]): FragmentFixture => {
|
|
const text = chunks.join("")
|
|
switch (kind) {
|
|
case "text": {
|
|
const partialEvents = [
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.textStart({ id }),
|
|
...chunks.map((text) => LLMEvent.textDelta({ id, text })),
|
|
]
|
|
const expectedContent = { type: "text", id, text }
|
|
return {
|
|
delta: SessionEvent.Text.Delta,
|
|
partialEvents,
|
|
completeEvents: [
|
|
...partialEvents,
|
|
LLMEvent.textEnd({ id }),
|
|
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
|
|
LLMEvent.finish({ reason: "stop" }),
|
|
],
|
|
expectedAssistant: { type: "assistant", finish: "stop", content: [expectedContent] },
|
|
expectedContent,
|
|
}
|
|
}
|
|
case "reasoning": {
|
|
const partialEvents = [
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.reasoningStart({ id }),
|
|
...chunks.map((text) => LLMEvent.reasoningDelta({ id, text })),
|
|
]
|
|
const expectedContent = { type: "reasoning", id, text }
|
|
return {
|
|
delta: SessionEvent.Reasoning.Delta,
|
|
partialEvents,
|
|
completeEvents: [
|
|
...partialEvents,
|
|
LLMEvent.reasoningEnd({ id }),
|
|
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
|
|
LLMEvent.finish({ reason: "stop" }),
|
|
],
|
|
expectedAssistant: { type: "assistant", finish: "stop", content: [expectedContent] },
|
|
expectedContent,
|
|
}
|
|
}
|
|
case "tool input": {
|
|
const partialEvents = [
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.toolInputStart({ id, name: "echo" }),
|
|
...chunks.map((text) => LLMEvent.toolInputDelta({ id, name: "echo", text })),
|
|
]
|
|
const expectedContent = { type: "tool", id, state: { status: "pending", input: text } }
|
|
return {
|
|
delta: SessionEvent.Tool.Input.Delta,
|
|
partialEvents,
|
|
completeEvents: [...partialEvents, LLMEvent.toolInputEnd({ id, name: "echo" })],
|
|
expectedAssistant: { type: "assistant", content: [expectedContent] },
|
|
expectedContent,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
const verifyEphemeralDeltas = (kind: FragmentKind) =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
const prompt = `Stream ${kind}`
|
|
const chunks = Array.from({ length: 32 }, (_, index) => `${index},`)
|
|
const fixture = fragmentFixture(kind, fragmentID(kind, "many"), chunks)
|
|
const expectedContext = [{ type: "user", text: prompt }, fixture.expectedAssistant]
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false })
|
|
const events = yield* EventV2.Service
|
|
const live = yield* events.subscribe(fixture.delta).pipe(Stream.take(32), Stream.runCollect, Effect.forkScoped)
|
|
yield* Effect.yieldNow
|
|
response = fixture.completeEvents
|
|
|
|
yield* session.resume(sessionID)
|
|
|
|
const { db } = yield* Database.Service
|
|
const deltas = yield* db
|
|
.select({ type: EventTable.type })
|
|
.from(EventTable)
|
|
.where(eq(EventTable.type, EventV2.versionedType(fixture.delta.type, 1)))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
expect(Array.from(yield* Fiber.join(live))).toHaveLength(32)
|
|
expect(deltas).toHaveLength(0)
|
|
expect(yield* session.context(sessionID)).toMatchObject(expectedContext)
|
|
|
|
yield* replaySessionProjection(sessionID)
|
|
|
|
expect(yield* session.context(sessionID)).toMatchObject(expectedContext)
|
|
})
|
|
|
|
const verifyPartialFlushOnFailure = (kind: FragmentKind) =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
const prompt = `Fail after ${kind}`
|
|
const fixture = fragmentFixture(kind, fragmentID(kind, "partial"), ["Partial"])
|
|
const failure = providerUnavailable()
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false })
|
|
responseStream = Stream.concat(Stream.fromIterable(fixture.partialEvents), Stream.fail(failure))
|
|
|
|
expect(yield* session.resume(sessionID).pipe(Effect.flip)).toBe(failure)
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: prompt },
|
|
{
|
|
type: "assistant",
|
|
finish: "error",
|
|
error: { type: "unknown", message: "Provider unavailable" },
|
|
content: [fixture.expectedContent],
|
|
},
|
|
])
|
|
})
|
|
|
|
const verifyPartialFlushOnInterruption = (kind: FragmentKind) =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
const prompt = `Interrupt after ${kind}`
|
|
const fixture = fragmentFixture(kind, fragmentID(kind, "interrupted"), ["Partial"])
|
|
const streamed = yield* Deferred.make<void>()
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: prompt }), resume: false })
|
|
responseStream = Stream.concat(
|
|
Stream.fromIterable(fixture.partialEvents),
|
|
Stream.fromEffect(Deferred.succeed(streamed, undefined)).pipe(Stream.flatMap(() => Stream.never)),
|
|
)
|
|
|
|
const runner = yield* SessionRunner.Service
|
|
const fiber = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild)
|
|
yield* Deferred.await(streamed)
|
|
yield* Fiber.interrupt(fiber)
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: prompt },
|
|
{
|
|
type: "assistant",
|
|
content: [
|
|
kind === "tool input"
|
|
? { type: "tool", id: fragmentID(kind, "interrupted"), state: { status: "error" } }
|
|
: fixture.expectedContent,
|
|
],
|
|
},
|
|
])
|
|
})
|
|
|
|
describe("SessionRunnerLLM", () => {
|
|
it.effect("starts a real runner turn after default prompt recording", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
requests.length = 0
|
|
responses = undefined
|
|
streamGate = undefined
|
|
streamStarted = undefined
|
|
response = []
|
|
|
|
const message = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run automatically" }) })
|
|
|
|
expect(requests).toHaveLength(1)
|
|
expect(yield* session.messages({ sessionID })).toEqual([message])
|
|
}),
|
|
)
|
|
|
|
it.effect("streams one request with registry definitions from chronological V2 user history", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
|
|
|
|
requests.length = 0
|
|
responses = undefined
|
|
streamGate = undefined
|
|
streamStarted = undefined
|
|
response = []
|
|
yield* session.resume(sessionID)
|
|
|
|
expect(requests).toHaveLength(1)
|
|
expect(requests[0]?.model).toBe(model)
|
|
expect(requests[0]?.tools.map((tool) => tool.name)).toEqual(["echo", "defect"])
|
|
expect(requests[0]?.messages.map((message) => ({ role: message.role, content: message.content }))).toEqual([
|
|
{ role: "user", content: [{ type: "text", text: "First" }] },
|
|
{ role: "user", content: [{ type: "text", text: "Second" }] },
|
|
])
|
|
expect(yield* session.messages({ sessionID })).toHaveLength(2)
|
|
}),
|
|
)
|
|
|
|
it.effect("projects reasoning and tool events without executing or continuing tools", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Use tools" }), resume: false })
|
|
|
|
requests.length = 0
|
|
responses = undefined
|
|
streamGate = undefined
|
|
streamStarted = undefined
|
|
response = [
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.reasoningStart({ id: "reasoning-1" }),
|
|
LLMEvent.reasoningDelta({ id: "reasoning-1", text: "Think" }),
|
|
LLMEvent.reasoningEnd({ id: "reasoning-1" }),
|
|
LLMEvent.toolInputStart({ id: "call-error", name: "write" }),
|
|
LLMEvent.toolInputDelta({ id: "call-error", name: "write", text: '{"path":"README.md"}' }),
|
|
LLMEvent.toolInputEnd({ id: "call-error", name: "write" }),
|
|
LLMEvent.toolCall({ id: "call-error", name: "write", input: { path: "README.md" }, providerExecuted: true }),
|
|
LLMEvent.toolError({ id: "call-error", name: "write", message: "Denied" }),
|
|
LLMEvent.toolResult({ id: "call-error", name: "write", result: { type: "error", value: "Denied" } }),
|
|
LLMEvent.toolCall({
|
|
id: "call-provider",
|
|
name: "web_search",
|
|
input: { query: "hello" },
|
|
providerExecuted: true,
|
|
providerMetadata: { fake: { source: "provider" } },
|
|
}),
|
|
LLMEvent.toolResult({
|
|
id: "call-provider",
|
|
name: "web_search",
|
|
result: {
|
|
type: "content",
|
|
value: [
|
|
{ type: "text", text: "Hello" },
|
|
{ type: "media", mediaType: "image/png", data: "data:image/png;base64,aGVsbG8=", filename: "hello.png" },
|
|
],
|
|
},
|
|
providerExecuted: true,
|
|
providerMetadata: { fake: { source: "provider" } },
|
|
}),
|
|
LLMEvent.stepFinish({
|
|
index: 0,
|
|
reason: "tool-calls",
|
|
usage: {
|
|
inputTokens: 10,
|
|
nonCachedInputTokens: 8,
|
|
outputTokens: 4,
|
|
reasoningTokens: 1,
|
|
cacheReadInputTokens: 2,
|
|
},
|
|
}),
|
|
LLMEvent.finish({ reason: "tool-calls" }),
|
|
]
|
|
|
|
yield* session.resume(sessionID)
|
|
|
|
expect(requests).toHaveLength(1)
|
|
expect(requests[0]?.tools.map((tool) => tool.name)).toEqual(["echo", "defect"])
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: "Use tools" },
|
|
{
|
|
type: "assistant",
|
|
finish: "tool-calls",
|
|
tokens: { input: 8, output: 3, reasoning: 1, cache: { read: 2, write: 0 } },
|
|
content: [
|
|
{ type: "reasoning", id: "reasoning-1", text: "Think" },
|
|
{
|
|
type: "tool",
|
|
id: "call-error",
|
|
name: "write",
|
|
state: {
|
|
status: "error",
|
|
input: { path: "README.md" },
|
|
error: { type: "unknown", message: "Denied" },
|
|
},
|
|
},
|
|
{
|
|
type: "tool",
|
|
id: "call-provider",
|
|
name: "web_search",
|
|
provider: { executed: true, metadata: { fake: { source: "provider" } } },
|
|
state: {
|
|
status: "completed",
|
|
input: { query: "hello" },
|
|
structured: {},
|
|
content: [
|
|
{ type: "text", text: "Hello" },
|
|
{ type: "file", mime: "image/png", source: { type: "data", data: "aGVsbG8=" }, name: "hello.png" },
|
|
],
|
|
},
|
|
},
|
|
],
|
|
},
|
|
])
|
|
}),
|
|
)
|
|
|
|
it.effect("continues with reloaded history after durably settling one local tool call", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo this" }), resume: false })
|
|
|
|
requests.length = 0
|
|
authorizations.length = 0
|
|
executions.length = 0
|
|
streamGate = undefined
|
|
streamStarted = undefined
|
|
responses = [
|
|
[
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }),
|
|
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
|
|
LLMEvent.finish({ reason: "tool-calls" }),
|
|
],
|
|
[
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.textStart({ id: "text-final" }),
|
|
LLMEvent.textDelta({ id: "text-final", text: "Done" }),
|
|
LLMEvent.textEnd({ id: "text-final" }),
|
|
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
|
|
LLMEvent.finish({ reason: "stop" }),
|
|
],
|
|
]
|
|
|
|
yield* session.resume(sessionID)
|
|
|
|
expect(requests).toHaveLength(2)
|
|
expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "tool"])
|
|
expect(authorizations).toMatchObject([{ sessionID, call: { id: "call-echo", name: "echo" } }])
|
|
expect(executions).toEqual(["hello"])
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: "Echo this" },
|
|
{
|
|
type: "assistant",
|
|
finish: "tool-calls",
|
|
content: [
|
|
{
|
|
type: "tool",
|
|
id: "call-echo",
|
|
name: "echo",
|
|
state: {
|
|
status: "completed",
|
|
input: { text: "hello" },
|
|
structured: { text: "hello" },
|
|
content: [{ type: "text", text: "hello" }],
|
|
},
|
|
},
|
|
],
|
|
},
|
|
{ type: "assistant", finish: "stop", content: [{ type: "text", id: "text-final", text: "Done" }] },
|
|
])
|
|
}),
|
|
)
|
|
|
|
it.effect("restores durable reasoning provider metadata in a second-turn request", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Think first" }), resume: false })
|
|
|
|
requests.length = 0
|
|
response = [
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.reasoningStart({ id: "reasoning-anthropic" }),
|
|
LLMEvent.reasoningDelta({ id: "reasoning-anthropic", text: "Signed thought" }),
|
|
LLMEvent.reasoningEnd({ id: "reasoning-anthropic", providerMetadata: { anthropic: { signature: "sig_1" } } }),
|
|
LLMEvent.reasoningStart({
|
|
id: "reasoning-openai",
|
|
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: null } },
|
|
}),
|
|
LLMEvent.reasoningDelta({ id: "reasoning-openai", text: "Encrypted thought" }),
|
|
LLMEvent.reasoningEnd({
|
|
id: "reasoning-openai",
|
|
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } },
|
|
}),
|
|
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
|
|
LLMEvent.finish({ reason: "stop" }),
|
|
]
|
|
yield* session.resume(sessionID)
|
|
yield* replaySessionProjection(sessionID)
|
|
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: "Think first" },
|
|
{
|
|
type: "assistant",
|
|
content: [
|
|
{ type: "reasoning", text: "Signed thought", providerMetadata: { anthropic: { signature: "sig_1" } } },
|
|
{
|
|
type: "reasoning",
|
|
text: "Encrypted thought",
|
|
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } },
|
|
},
|
|
],
|
|
},
|
|
])
|
|
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
|
|
response = []
|
|
yield* session.resume(sessionID)
|
|
|
|
expect(requests[1]?.messages[1]?.content).toEqual([
|
|
{ type: "reasoning", text: "Signed thought", providerMetadata: { anthropic: { signature: "sig_1" } } },
|
|
{
|
|
type: "reasoning",
|
|
text: "Encrypted thought",
|
|
providerMetadata: { openai: { itemId: "rs_1", reasoningEncryptedContent: "encrypted-state" } },
|
|
},
|
|
])
|
|
}),
|
|
)
|
|
|
|
it.effect("replays durable provider-executed tool results inline in a second-turn request", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Search first" }), resume: false })
|
|
|
|
requests.length = 0
|
|
response = [
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.toolCall({
|
|
id: "hosted-search",
|
|
name: "web_search",
|
|
input: { query: "Effect" },
|
|
providerExecuted: true,
|
|
providerMetadata: { openai: { itemId: "hosted-search" } },
|
|
}),
|
|
LLMEvent.toolResult({
|
|
id: "hosted-search",
|
|
name: "web_search",
|
|
result: { type: "json", value: [{ title: "Effect" }] },
|
|
providerExecuted: true,
|
|
providerMetadata: { anthropic: { blockType: "web_search_tool_result" } },
|
|
}),
|
|
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
|
|
LLMEvent.finish({ reason: "stop" }),
|
|
]
|
|
yield* session.resume(sessionID)
|
|
yield* replaySessionProjection(sessionID)
|
|
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Continue" }), resume: false })
|
|
response = []
|
|
yield* session.resume(sessionID)
|
|
|
|
expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "assistant", "user"])
|
|
expect(requests[1]?.messages[1]?.content).toMatchObject([
|
|
{
|
|
type: "tool-call",
|
|
id: "hosted-search",
|
|
name: "web_search",
|
|
input: { query: "Effect" },
|
|
providerExecuted: true,
|
|
providerMetadata: { openai: { itemId: "hosted-search" } },
|
|
},
|
|
{
|
|
type: "tool-result",
|
|
id: "hosted-search",
|
|
name: "web_search",
|
|
result: { type: "json", value: [{ title: "Effect" }] },
|
|
providerExecuted: true,
|
|
providerMetadata: { anthropic: { blockType: "web_search_tool_result" } },
|
|
},
|
|
])
|
|
}),
|
|
)
|
|
|
|
it.effect("starts recorded local tools eagerly and awaits settlement before continuing", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo five times" }), resume: false })
|
|
|
|
requests.length = 0
|
|
executions.length = 0
|
|
toolExecutionGate = yield* Deferred.make<void>()
|
|
toolExecutionsStarted = yield* Deferred.make<void>()
|
|
const providerGate = yield* Deferred.make<void>()
|
|
response = []
|
|
responses = undefined
|
|
const initial = Stream.fromIterable([
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
...Array.from({ length: 5 }, (_, index) =>
|
|
LLMEvent.toolCall({ id: `call-echo-${index}`, name: "echo", input: { text: `${index}` } }),
|
|
),
|
|
])
|
|
const final = Stream.fromIterable([
|
|
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
|
|
LLMEvent.finish({ reason: "tool-calls" }),
|
|
])
|
|
streamGate = undefined
|
|
responseStream = Stream.concat(
|
|
initial,
|
|
Stream.fromEffect(Deferred.await(providerGate)).pipe(Stream.flatMap(() => final)),
|
|
)
|
|
|
|
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
|
|
yield* Deferred.await(toolExecutionsStarted)
|
|
|
|
expect(executions).toHaveLength(5)
|
|
expect(maxActiveToolExecutions).toBe(5)
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: "Echo five times" },
|
|
{
|
|
type: "assistant",
|
|
content: Array.from({ length: 5 }, (_, index) => ({
|
|
type: "tool",
|
|
id: `call-echo-${index}`,
|
|
state: { status: "running", input: { text: `${index}` } },
|
|
})),
|
|
},
|
|
])
|
|
|
|
yield* Deferred.succeed(providerGate, undefined)
|
|
yield* Effect.yieldNow
|
|
expect(requests).toHaveLength(1)
|
|
|
|
yield* Deferred.succeed(toolExecutionGate, undefined)
|
|
yield* Fiber.join(run)
|
|
toolExecutionGate = undefined
|
|
toolExecutionsStarted = undefined
|
|
|
|
expect(executions).toHaveLength(5)
|
|
expect(maxActiveToolExecutions).toBe(5)
|
|
expect(requests).toHaveLength(2)
|
|
}),
|
|
)
|
|
|
|
it.effect("settles repeated provider-local tool call IDs against their owning assistant messages", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Echo twice" }), resume: false })
|
|
|
|
requests.length = 0
|
|
executions.length = 0
|
|
responses = [
|
|
[
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.toolCall({ id: "tool_0", name: "echo", input: { text: "first" } }),
|
|
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
|
|
LLMEvent.finish({ reason: "tool-calls" }),
|
|
],
|
|
[
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.toolCall({ id: "tool_0", name: "echo", input: { text: "second" } }),
|
|
LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
|
|
LLMEvent.finish({ reason: "tool-calls" }),
|
|
],
|
|
[],
|
|
]
|
|
|
|
yield* session.resume(sessionID)
|
|
|
|
expect(executions).toEqual(["first", "second"])
|
|
expect(requests).toHaveLength(3)
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: "Echo twice" },
|
|
{
|
|
type: "assistant",
|
|
content: [
|
|
{
|
|
type: "tool",
|
|
id: "tool_0",
|
|
state: { status: "completed", structured: { text: "first" }, content: [{ type: "text", text: "first" }] },
|
|
},
|
|
],
|
|
},
|
|
{
|
|
type: "assistant",
|
|
content: [
|
|
{
|
|
type: "tool",
|
|
id: "tool_0",
|
|
state: {
|
|
status: "completed",
|
|
structured: { text: "second" },
|
|
content: [{ type: "text", text: "second" }],
|
|
},
|
|
},
|
|
],
|
|
},
|
|
])
|
|
|
|
yield* replaySessionProjection(sessionID)
|
|
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: "Echo twice" },
|
|
{
|
|
type: "assistant",
|
|
content: [
|
|
{
|
|
type: "tool",
|
|
id: "tool_0",
|
|
state: { status: "completed", structured: { text: "first" }, content: [{ type: "text", text: "first" }] },
|
|
},
|
|
],
|
|
},
|
|
{
|
|
type: "assistant",
|
|
content: [
|
|
{
|
|
type: "tool",
|
|
id: "tool_0",
|
|
state: {
|
|
status: "completed",
|
|
structured: { text: "second" },
|
|
content: [{ type: "text", text: "second" }],
|
|
},
|
|
},
|
|
],
|
|
},
|
|
])
|
|
}),
|
|
)
|
|
|
|
it.effect("joins concurrent resume calls into one active provider run", () =>
|
|
Effect.gen(function* () {
|
|
yield* setup
|
|
const session = yield* SessionV2.Service
|
|
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run once" }), resume: false })
|
|
|
|
requests.length = 0
|
|
responses = undefined
|
|
response = [
|
|
LLMEvent.stepStart({ index: 0 }),
|
|
LLMEvent.textStart({ id: "text-once" }),
|
|
LLMEvent.textDelta({ id: "text-once", text: "Once" }),
|
|
LLMEvent.textEnd({ id: "text-once" }),
|
|
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
|
|
LLMEvent.finish({ reason: "stop" }),
|
|
]
|
|
streamGate = yield* Deferred.make<void>()
|
|
streamStarted = yield* Deferred.make<void>()
|
|
|
|
const first = yield* session.resume(sessionID).pipe(Effect.forkChild)
|
|
yield* Deferred.await(streamStarted)
|
|
const second = yield* session.resume(sessionID).pipe(Effect.forkChild)
|
|
yield* Effect.yieldNow
|
|
|
|
expect(requests).toHaveLength(1)
|
|
yield* Deferred.succeed(streamGate, undefined)
|
|
yield* Fiber.join(first)
|
|
yield* Fiber.join(second)
|
|
streamGate = undefined
|
|
streamStarted = undefined
|
|
|
|
expect(requests).toHaveLength(1)
|
|
expect(yield* session.context(sessionID)).toMatchObject([
|
|
{ type: "user", text: "Run once" },
|
|
{ type: "assistant", finish: "stop", content: [{ type: "text", id: "text-once", text: "Once" }] },
|
|
])
|
|
}),
|
|
)
|
|
|
|
// Scenario: while the first provider request is held open, a second prompt is
// recorded. The test expects a second request carrying both user texts.
it.effect("steers an active provider turn with newly recorded prompts", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })

    // Builds a fresh scripted turn that finishes with reason "stop".
    const stopTurn = () => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [stopTurn(), stopTurn()]
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    // Fork the resume so the test can record another prompt while it is in flight.
    const activeRun = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Change direction" }) })
    yield* Deferred.succeed(streamGate, undefined)
    yield* Fiber.join(activeRun)
    streamGate = undefined
    streamStarted = undefined
    yield* Effect.yieldNow

    expect(requests).toHaveLength(2)
    expect(userTexts(requests[0]!)).toEqual(["Start working"])
    expect(userTexts(requests[1]!)).toEqual(["Start working", "Change direction"])

    const context = yield* sessions.context(sessionID)
    const messageTypes = context.map((message) => message.type)
    expect(messageTypes).toEqual(["user", "assistant", "user", "assistant"])
  }),
)
|
|
|
|
// Scenario: a prompt with delivery "queue" is recorded during a tool-calling turn.
// The test expects it to appear only in the third request, after the first
// activity's tool continuation request.
it.effect("starts queued input after the active activity settles", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })

    // Builds a fresh scripted turn that finishes with reason "stop".
    const stopTurn = () => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [
      [
        LLMEvent.stepStart({ index: 0 }),
        LLMEvent.toolCall({ id: "call-echo", name: "echo", input: { text: "hello" } }),
        LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
        LLMEvent.finish({ reason: "tool-calls" }),
      ],
      stopTurn(),
      stopTurn(),
    ]
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    const activeRun = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    yield* sessions.prompt({
      sessionID,
      prompt: new Prompt({ text: "Wait until the next activity" }),
      delivery: "queue",
    })
    yield* Deferred.succeed(streamGate, undefined)
    yield* Fiber.join(activeRun)
    streamGate = undefined
    streamStarted = undefined

    expect(requests).toHaveLength(3)
    // The first two requests only see the initial prompt.
    expect(userTexts(requests[0]!)).toEqual(["Start working"])
    expect(userTexts(requests[1]!)).toEqual(["Start working"])
    expect(userTexts(requests[2]!)).toEqual(["Start working", "Wait until the next activity"])
  }),
)
|
|
|
|
// Scenario: two prompts are queued while a turn is in flight. The test expects one
// additional request per queued prompt, in the order they were queued.
it.effect("runs queued active inputs as separate FIFO activities", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })

    // Builds a fresh scripted turn that finishes with reason "stop".
    const stopTurn = () => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [stopTurn(), stopTurn(), stopTurn()]
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    const activeRun = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Queue first" }), delivery: "queue" })
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Queue second" }), delivery: "queue" })
    yield* Deferred.succeed(streamGate, undefined)
    yield* Fiber.join(activeRun)
    streamGate = undefined
    streamStarted = undefined

    expect(requests).toHaveLength(3)
    expect(userTexts(requests[0]!)).toEqual(["Start working"])
    expect(userTexts(requests[1]!)).toEqual(["Start working", "Queue first"])
    expect(userTexts(requests[2]!)).toEqual(["Start working", "Queue first", "Queue second"])
  }),
)
|
|
|
|
// Scenario: a default-delivery prompt and a queued prompt are both recorded without
// resuming. A single resume is then expected to issue two requests, the second of
// which adds the queued text.
it.effect("opens queued input after idle steering activity settles", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Start steering activity" }), resume: false })
    yield* sessions.prompt({
      sessionID,
      prompt: new Prompt({ text: "Queue later activity" }),
      delivery: "queue",
      resume: false,
    })

    // Builds a fresh scripted turn that finishes with reason "stop".
    const stopTurn = () => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [stopTurn(), stopTurn()]

    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(2)
    expect(userTexts(requests[0]!)).toEqual(["Start steering activity"])
    expect(userTexts(requests[1]!)).toEqual(["Start steering activity", "Queue later activity"])
  }),
)
|
|
|
|
// Scenario: two prompts are queued during the first request, then two default-delivery
// prompts are recorded during the second request. The test expects the two later
// prompts to land together in the third request, ahead of "Queue second" in the fourth.
it.effect("coalesces steers into the active queued activity before starting the next queued activity", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })

    // Builds a fresh scripted turn that finishes with reason "stop".
    const stopTurn = () => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [stopTurn(), stopTurn(), stopTurn(), stopTurn()]
    const firstGate = yield* Deferred.make<void>()
    const secondGate = yield* Deferred.make<void>()
    streamGate = firstGate

    const activeRun = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    // Yield until the first request has been recorded.
    while (requests.length < 1) yield* Effect.yieldNow
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Queue first" }), delivery: "queue" })
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Queue second" }), delivery: "queue" })
    // Install the second gate before releasing the first one.
    streamGate = secondGate
    yield* Deferred.succeed(firstGate, undefined)
    // Yield until the second request has been recorded.
    while (requests.length < 2) yield* Effect.yieldNow
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Steer first queued activity" }) })
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Also steer first queued activity" }) })
    yield* Deferred.succeed(secondGate, undefined)
    yield* Fiber.join(activeRun)
    streamGate = undefined

    const textsAfterSteers = [
      "Start working",
      "Queue first",
      "Steer first queued activity",
      "Also steer first queued activity",
    ]

    expect(requests).toHaveLength(4)
    expect(userTexts(requests[0]!)).toEqual(["Start working"])
    expect(userTexts(requests[1]!)).toEqual(["Start working", "Queue first"])
    expect(userTexts(requests[2]!)).toEqual(textsAfterSteers)
    expect(userTexts(requests[3]!)).toEqual([...textsAfterSteers, "Queue second"])
  }),
)
|
|
|
|
// Scenario: two default-delivery prompts are recorded during one in-flight request.
// The test expects exactly one follow-up request containing both, and no further
// request after an extra coordinator wake.
it.effect("coalesces multiple active steering prompts into one continuation turn", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })

    // Builds a fresh scripted turn that finishes with reason "stop".
    const stopTurn = () => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [stopTurn(), stopTurn()]
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    const activeRun = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "First steer" }) })
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Second steer" }) })
    yield* Deferred.succeed(streamGate, undefined)
    yield* Fiber.join(activeRun)
    streamGate = undefined
    streamStarted = undefined
    yield* Effect.yieldNow

    expect(requests).toHaveLength(2)
    expect(userTexts(requests[1]!)).toEqual(["Start working", "First steer", "Second steer"])

    // An additional wake must not produce a third request.
    const coordinator = yield* SessionRunCoordinator.Service
    yield* coordinator.wake(sessionID)
    yield* Effect.yieldNow
    expect(requests).toHaveLength(2)
  }),
)
|
|
|
|
// Scenario: the in-flight run is configured to fail with `streamFailure`, and a new
// prompt is recorded before the gate opens. The forked run must fail with that error,
// and a second request containing the new prompt is still expected.
it.effect("runs steering input accepted while the active provider turn fails", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Start working" }), resume: false })

    requests.length = 0
    responses = undefined
    response = []
    streamFailure = providerUnavailable()
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    const activeRun = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Recover with this" }) })
    yield* Deferred.succeed(streamGate, undefined)

    // Flip turns the expected failure into the success channel so it can be asserted.
    const surfaced = yield* Fiber.join(activeRun).pipe(Effect.flip)
    expect(surfaced).toBe(streamFailure)

    streamFailure = undefined
    streamGate = undefined
    streamStarted = undefined
    yield* Effect.yieldNow

    expect(requests).toHaveLength(2)
    expect(userTexts(requests[1]!)).toEqual(["Start working", "Recover with this"])
  }),
)
|
|
|
|
// Scenario: events for a started step and a called (non provider-executed) tool are
// published by hand with no result. Resuming is expected to send one request with
// user/assistant/tool messages and to show the tool as errored in the session context.
it.effect("durably fails local tools left running by a prior process before continuing", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const eventBus = yield* EventV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Recover interrupted tool" }), resume: false })
    yield* SessionInput.promoteSteers((yield* Database.Service).db, eventBus, sessionID)

    const callID = "call-interrupted"

    // Hand-publish the event sequence up to the tool call, without any tool result.
    const step = yield* eventBus.publish(SessionEvent.Step.Started, {
      sessionID,
      timestamp: yield* DateTime.now,
      agent: "build",
      model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") },
    })
    yield* eventBus.publish(SessionEvent.Tool.Input.Started, {
      sessionID,
      timestamp: yield* DateTime.now,
      assistantMessageID: step.id,
      callID,
      name: "echo",
    })
    yield* eventBus.publish(SessionEvent.Tool.Input.Ended, {
      sessionID,
      timestamp: yield* DateTime.now,
      assistantMessageID: step.id,
      callID,
      text: '{"text":"stale"}',
    })
    yield* eventBus.publish(SessionEvent.Tool.Called, {
      sessionID,
      timestamp: yield* DateTime.now,
      assistantMessageID: step.id,
      callID,
      tool: "echo",
      input: { text: "stale" },
      provider: { executed: false },
    })

    requests.length = 0
    response = []
    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(1)
    const roles = requests[0]?.messages.map((message) => message.role)
    expect(roles).toEqual(["user", "assistant", "tool"])

    const context = yield* sessions.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Recover interrupted tool" },
      {
        type: "assistant",
        content: [
          {
            type: "tool",
            id: callID,
            state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
          },
        ],
      },
    ])
  }),
)
|
|
|
|
// Scenario: same hand-published sequence as for a local tool, but the tool call is
// marked provider-executed. The request is expected to carry only user and assistant
// messages, with the tool call and an error tool result inside the assistant content.
it.effect("durably fails hosted tools left running by a prior process before continuing inline", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const eventBus = yield* EventV2.Service
    yield* sessions.prompt({
      sessionID,
      prompt: new Prompt({ text: "Recover interrupted hosted tool" }),
      resume: false,
    })
    yield* SessionInput.promoteSteers((yield* Database.Service).db, eventBus, sessionID)

    const callID = "call-hosted-interrupted"

    // Hand-publish the event sequence up to the hosted tool call, without a result.
    const step = yield* eventBus.publish(SessionEvent.Step.Started, {
      sessionID,
      timestamp: yield* DateTime.now,
      agent: "build",
      model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") },
    })
    yield* eventBus.publish(SessionEvent.Tool.Input.Started, {
      sessionID,
      timestamp: yield* DateTime.now,
      assistantMessageID: step.id,
      callID,
      name: "web_search",
    })
    yield* eventBus.publish(SessionEvent.Tool.Input.Ended, {
      sessionID,
      timestamp: yield* DateTime.now,
      assistantMessageID: step.id,
      callID,
      text: '{"query":"stale"}',
    })
    yield* eventBus.publish(SessionEvent.Tool.Called, {
      sessionID,
      timestamp: yield* DateTime.now,
      assistantMessageID: step.id,
      callID,
      tool: "web_search",
      input: { query: "stale" },
      provider: { executed: true, metadata: { openai: { itemId: "call-hosted-interrupted" } } },
    })

    requests.length = 0
    response = []
    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(1)
    const roles = requests[0]?.messages.map((message) => message.role)
    expect(roles).toEqual(["user", "assistant"])

    const assistantContent = requests[0]?.messages[1]?.content
    expect(assistantContent).toMatchObject([
      {
        type: "tool-call",
        id: callID,
        providerExecuted: true,
        providerMetadata: { openai: { itemId: "call-hosted-interrupted" } },
      },
      { type: "tool-result", id: callID, providerExecuted: true, result: { type: "error" } },
    ])
  }),
)
|
|
|
|
// Scenario: only Step.Started and Tool.Input.Started are published, so the tool input
// never ends. Resuming is expected to send one request with user/assistant/tool
// messages and to show the tool in an error state.
it.effect("durably fails pending tool input left by a prior process before continuing", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const eventBus = yield* EventV2.Service
    yield* sessions.prompt({
      sessionID,
      prompt: new Prompt({ text: "Recover interrupted tool input" }),
      resume: false,
    })
    yield* SessionInput.promoteSteers((yield* Database.Service).db, eventBus, sessionID)

    const callID = "call-pending-interrupted"

    const step = yield* eventBus.publish(SessionEvent.Step.Started, {
      sessionID,
      timestamp: yield* DateTime.now,
      agent: "build",
      model: { id: ModelV2.ID.make("fake-model"), providerID: ProviderV2.ID.make("fake") },
    })
    yield* eventBus.publish(SessionEvent.Tool.Input.Started, {
      sessionID,
      timestamp: yield* DateTime.now,
      assistantMessageID: step.id,
      callID,
      name: "echo",
    })

    requests.length = 0
    response = []
    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(1)
    const roles = requests[0]?.messages.map((message) => message.role)
    expect(roles).toEqual(["user", "assistant", "tool"])

    const context = yield* sessions.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Recover interrupted tool input" },
      { type: "assistant", content: [{ type: "tool", id: callID, state: { status: "error" } }] },
    ])
  }),
)
|
|
|
|
// Scenario: a queued prompt is recorded without resuming. A coordinator wake alone is
// expected to produce one request carrying that prompt.
it.effect("starts the first queued activity when woken while idle", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const queuedText = "Wait for fresh activity"
    yield* sessions.prompt({
      sessionID,
      prompt: new Prompt({ text: queuedText }),
      delivery: "queue",
      resume: false,
    })

    requests.length = 0
    const coordinator = yield* SessionRunCoordinator.Service
    yield* coordinator.wake(sessionID)
    yield* Effect.yieldNow

    expect(requests).toHaveLength(1)
    expect(userTexts(requests[0]!)).toEqual([queuedText])
  }),
)
|
|
|
|
// Scenario: 26 prompts are queued and each gets one scripted "stop" turn. A single
// resume is expected to issue one request per queued prompt, the last containing
// every queued text.
it.effect("does not spend one activity step budget across queued activities", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const queuedTexts = Array.from({ length: 26 }, (_, position) => `Queued activity ${position + 1}`)
    for (const text of queuedTexts) {
      yield* sessions.prompt({ sessionID, prompt: new Prompt({ text }), delivery: "queue", resume: false })
    }

    requests.length = 0
    // One terminal turn per queued prompt.
    responses = queuedTexts.map(() => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ])

    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(queuedTexts.length)
    const lastRequest = requests.at(-1)!
    expect(userTexts(lastRequest)).toEqual(queuedTexts)
  }),
)
|
|
|
|
// Scenario: a projector registered for SessionEvent.Prompted dies with a defect while
// a flag is set. The first resume must surface that defect. After the flag is cleared,
// a coordinator wake is expected to produce a request with the original prompt.
it.effect("retries inbox input after prompt projection rolls back", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const eventBus = yield* EventV2.Service
    const defect = new Error("fail after prompt promotion")
    let shouldFail = true
    yield* eventBus.project(SessionEvent.Prompted, () => {
      if (shouldFail) return Effect.die(defect)
      return Effect.void
    })
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Recover promoted input" }), resume: false })

    // catchDefect turns the defect into a value so it can be compared by identity.
    const surfaced = yield* sessions.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))
    expect(surfaced).toBe(defect)

    shouldFail = false
    requests.length = 0
    response = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    const coordinator = yield* SessionRunCoordinator.Service
    yield* coordinator.wake(sessionID)
    // Yield until a request has been recorded.
    while (requests.length === 0) yield* Effect.yieldNow

    expect(userTexts(requests[0]!)).toEqual(["Recover promoted input"])
  }),
)
|
|
|
|
// Scenario: resumes for two different sessions are forked while the stream gate is
// still closed. Both requests are expected to be recorded before the gate opens.
it.effect("runs different sessions concurrently", () =>
  Effect.gen(function* () {
    yield* setup
    yield* insertSession(otherSessionID)
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Run first" }), resume: false })
    yield* sessions.prompt({ sessionID: otherSessionID, prompt: new Prompt({ text: "Run second" }), resume: false })

    requests.length = 0
    responses = undefined
    response = []
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    const primaryRun = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    const secondaryRun = yield* sessions.resume(otherSessionID).pipe(Effect.forkChild)
    yield* Effect.yieldNow

    // Both sessions have issued a request although the gate is still closed.
    expect(requests).toHaveLength(2)
    yield* Deferred.succeed(streamGate, undefined)
    yield* Fiber.join(primaryRun)
    yield* Fiber.join(secondaryRun)
    streamGate = undefined
    streamStarted = undefined
  }),
)
|
|
|
|
// Scenario: two resumes of the same session are forked while a failing stream is held
// open. Only one request is expected, both fibers must end with equal exits, and a
// later resume must issue a second request.
it.effect("fans out one failed run and allows a later retry", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Retry after failure" }), resume: false })

    requests.length = 0
    responses = undefined
    response = []
    streamFailure = providerUnavailable()
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    const leader = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    const follower = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Effect.yieldNow

    // The second resume must not have triggered its own request.
    expect(requests).toHaveLength(1)
    yield* Deferred.succeed(streamGate, undefined)
    const [leaderExit, followerExit] = yield* Effect.all([Fiber.await(leader), Fiber.await(follower)])
    expect(followerExit).toEqual(leaderExit)

    streamFailure = undefined
    streamGate = undefined
    streamStarted = undefined
    yield* sessions.resume(sessionID)
    expect(requests).toHaveLength(2)
  }),
)
|
|
|
|
// Scenario: the first scripted turn calls a tool named "missing". The test expects an
// errored tool entry with message "Unknown tool: missing", followed by a second
// assistant message carrying the text from the second scripted turn.
it.effect("durably settles local tool failures before continuing", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Call missing" }), resume: false })

    const missingToolTurn = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: "call-missing", name: "missing", input: {} }),
      LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
      LLMEvent.finish({ reason: "tool-calls" }),
    ]
    const recoveryTurn = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.textStart({ id: "text-after-error" }),
      LLMEvent.textDelta({ id: "text-after-error", text: "Recovered" }),
      LLMEvent.textEnd({ id: "text-after-error" }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [missingToolTurn, recoveryTurn]
    streamGate = undefined
    streamStarted = undefined

    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(2)
    const context = yield* sessions.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Call missing" },
      {
        type: "assistant",
        content: [
          {
            type: "tool",
            id: "call-missing",
            state: { status: "error", error: { message: "Unknown tool: missing" } },
          },
        ],
      },
      { type: "assistant", finish: "stop", content: [{ type: "text", id: "text-after-error", text: "Recovered" }] },
    ])
  }),
)
|
|
|
|
// Scenario: the first scripted turn calls a tool named "defect". The test expects two
// requests and an errored tool entry with message "unexpected tool defect".
it.effect("durably settles unexpected local tool defects before continuing", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Call defect" }), resume: false })

    const defectToolTurn = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: "call-defect", name: "defect", input: {} }),
      LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
      LLMEvent.finish({ reason: "tool-calls" }),
    ]

    requests.length = 0
    // The second scripted turn is empty.
    responses = [defectToolTurn, []]

    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(2)
    const context = yield* sessions.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Call defect" },
      {
        type: "assistant",
        content: [
          {
            type: "tool",
            id: "call-defect",
            state: { status: "error", error: { message: "unexpected tool defect" } },
          },
        ],
      },
    ])
  }),
)
|
|
|
|
// Scenario: a "question" tool that asks via QuestionV2 is registered and called by the
// first scripted turn. Once a pending question shows up it is rejected. The run must
// then end with an interrupt-only failure, issue no second request, and leave the tool
// marked as interrupted.
it.effect("interrupts runner continuation when a question is dismissed", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const registry = yield* ToolRegistry.Service
    const questions = yield* QuestionV2.Service
    const transform = yield* registry.transform()
    yield* transform((tools) =>
      tools.set("question", {
        tool: Tool.make({
          description: "Ask the user",
          parameters: Schema.Struct({}),
          success: Schema.Struct({}),
        }),
        // Asks with an empty question list; failures of the ask become defects.
        execute: ({ sessionID }) => questions.ask({ sessionID, questions: [] }).pipe(Effect.as({}), Effect.orDie),
      }),
    )
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Ask then stop" }), resume: false })

    const questionTurn = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: "call-question", name: "question", input: {} }),
      LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
      LLMEvent.finish({ reason: "tool-calls" }),
    ]

    requests.length = 0
    responses = [questionTurn, []]

    // Capture the Exit so that joining the fiber does not fail the test itself.
    const runFiber = yield* sessions.resume(sessionID).pipe(Effect.exit, Effect.forkChild)

    // Poll until the tool's question is listed as pending.
    let open = yield* questions.list()
    while (open.length === 0) {
      yield* Effect.yieldNow
      open = yield* questions.list()
    }
    yield* questions.reject(open[0]!.id)
    const exit = yield* Fiber.join(runFiber)

    expect(exit._tag).toBe("Failure")
    if (exit._tag === "Failure") expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
    expect(requests).toHaveLength(1)

    const context = yield* sessions.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Ask then stop" },
      {
        type: "assistant",
        content: [
          {
            type: "tool",
            id: "call-question",
            state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
          },
        ],
      },
    ])
  }),
)
|
|
|
|
// Scenario: the provider stream emits a tool call and then fails. The tool execution
// is held behind `toolExecutionGate` until the test releases it. The run must fail
// with the stream failure, and the session context must show the tool as completed.
it.effect("awaits started local tools before surfacing provider stream failure", () =>
  Effect.gen(function* () {
    yield* setup
    const session = yield* SessionV2.Service
    yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Settle before failing" }), resume: false })
    const failure = providerUnavailable()
    // Clear recorded executions first, as the sibling interruption tests do, so the
    // wait loop below cannot be satisfied by an entry left over from earlier work.
    executions.length = 0
    toolExecutionGate = yield* Deferred.make<void>()
    responseStream = Stream.concat(
      Stream.fromIterable([
        LLMEvent.stepStart({ index: 0 }),
        LLMEvent.toolCall({ id: "call-before-failure", name: "echo", input: { text: "settle" } }),
      ]),
      Stream.fail(failure),
    )

    const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
    // Yield until the tool execution has been recorded, then once more before
    // releasing the gate.
    while (executions.length === 0) yield* Effect.yieldNow
    yield* Effect.yieldNow
    yield* Deferred.succeed(toolExecutionGate, undefined)
    // Flip turns the expected failure into the success channel so it can be asserted.
    expect(yield* Fiber.join(run).pipe(Effect.flip)).toBe(failure)
    toolExecutionGate = undefined

    expect(yield* session.context(sessionID)).toMatchObject([
      { type: "user", text: "Settle before failing" },
      {
        type: "assistant",
        content: [
          { type: "tool", id: "call-before-failure", state: { status: "completed", structured: { text: "settle" } } },
        ],
      },
    ])
  }),
)
|
|
|
|
// Scenario: the provider stream emits a tool call and then never ends, while the tool
// is held behind `toolExecutionGate`. The forked runner fiber is interrupted. The tool
// must show as errored, both before and after replaying the session projection, and a
// later resume must send user/assistant/tool messages.
it.effect("durably fails blocked local tools when a provider turn is interrupted", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt blocked tool" }), resume: false })
    executions.length = 0
    toolExecutionGate = yield* Deferred.make<void>()

    const eventsBeforeHang = Stream.fromIterable([
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: "call-before-interrupt", name: "echo", input: { text: "blocked" } }),
    ])
    responseStream = Stream.concat(eventsBeforeHang, Stream.never)

    const runner = yield* SessionRunner.Service
    const runFiber = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild)
    // Yield until the tool execution has been recorded.
    while (executions.length === 0) yield* Effect.yieldNow
    yield* Fiber.interrupt(runFiber)
    toolExecutionGate = undefined

    const runExit = yield* Fiber.await(runFiber)
    expect(runExit).toMatchObject({ _tag: "Failure" })

    const contextBeforeReplay = yield* sessions.context(sessionID)
    expect(contextBeforeReplay).toMatchObject([
      { type: "user", text: "Interrupt blocked tool" },
      {
        type: "assistant",
        content: [
          {
            type: "tool",
            id: "call-before-interrupt",
            state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
          },
        ],
      },
    ])

    yield* replaySessionProjection(sessionID)

    const contextAfterReplay = yield* sessions.context(sessionID)
    expect(contextAfterReplay).toMatchObject([
      { type: "user", text: "Interrupt blocked tool" },
      { type: "assistant", content: [{ type: "tool", id: "call-before-interrupt", state: { status: "error" } }] },
    ])

    requests.length = 0
    responseStream = undefined
    response = []
    yield* sessions.resume(sessionID)
    const roles = requests[0]?.messages.map((message) => message.role)
    expect(roles).toEqual(["user", "assistant", "tool"])
  }),
)
|
|
|
|
// Scenario: the scripted turn finishes with a tool call whose execution is held behind
// `toolExecutionGate`. The forked runner fiber is interrupted while the tool is still
// held, and the tool must show as errored with the interruption message.
it.effect("durably fails blocked local tools when interrupted while awaiting settlement", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt tool settlement" }), resume: false })
    executions.length = 0
    toolExecutionGate = yield* Deferred.make<void>()
    response = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: "call-await-interrupt", name: "echo", input: { text: "blocked" } }),
      LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
      LLMEvent.finish({ reason: "tool-calls" }),
    ]

    const runner = yield* SessionRunner.Service
    const runFiber = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild)
    // Yield until the tool execution has been recorded.
    while (executions.length === 0) yield* Effect.yieldNow
    yield* Fiber.interrupt(runFiber)
    toolExecutionGate = undefined

    const runExit = yield* Fiber.await(runFiber)
    expect(runExit).toMatchObject({ _tag: "Failure" })

    const context = yield* sessions.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Interrupt tool settlement" },
      {
        type: "assistant",
        content: [
          {
            type: "tool",
            id: "call-await-interrupt",
            state: { status: "error", error: { type: "unknown", message: "Tool execution interrupted" } },
          },
        ],
      },
    ])
  }),
)
|
|
|
|
// Scenario: 25 scripted turns each end with a tool call. The resume must fail with
// SessionRunner.StepLimitExceededError (limit 25) after exactly 25 requests and
// 25 tool executions.
it.effect("fails after the bounded number of local tool continuation steps", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Loop forever" }), resume: false })

    const stepLimit = 25

    requests.length = 0
    authorizations.length = 0
    executions.length = 0
    streamGate = undefined
    streamStarted = undefined
    responses = Array.from({ length: stepLimit }, (_, turn) => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: `call-echo-${turn}`, name: "echo", input: { text: `${turn}` } }),
      LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
      LLMEvent.finish({ reason: "tool-calls" }),
    ])

    // Flip turns the expected failure into the success channel so it can be asserted.
    const surfaced = yield* sessions.resume(sessionID).pipe(Effect.flip)

    expect(surfaced).toMatchObject({ _tag: "SessionRunner.StepLimitExceededError", sessionID, limit: stepLimit })
    expect(requests).toHaveLength(stepLimit)
    expect(executions).toHaveLength(stepLimit)
  }),
)
|
|
|
|
// Scenario: a coordinator wake is issued while the first of 25 tool-calling turns is
// held open. The run must still fail with the step-limit error, and the request count
// must stay at 25 afterwards.
it.effect("does not restart a capped tool loop for a coalesced stale wake", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    const coordinator = yield* SessionRunCoordinator.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Loop forever" }), resume: false })

    requests.length = 0
    responses = Array.from({ length: 25 }, (_, turn) => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: `call-capped-${turn}`, name: "echo", input: { text: `${turn}` } }),
      LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
      LLMEvent.finish({ reason: "tool-calls" }),
    ])
    streamGate = yield* Deferred.make<void>()
    streamStarted = yield* Deferred.make<void>()

    const runFiber = yield* sessions.resume(sessionID).pipe(Effect.forkChild)
    yield* Deferred.await(streamStarted)
    // Wake the session while its run is still in flight.
    yield* coordinator.wake(sessionID)
    yield* Deferred.succeed(streamGate, undefined)

    const surfaced = yield* Fiber.join(runFiber).pipe(Effect.flip)
    expect(surfaced).toMatchObject({ _tag: "SessionRunner.StepLimitExceededError" })
    streamGate = undefined
    streamStarted = undefined
    yield* Effect.yieldNow

    expect(requests).toHaveLength(25)
  }),
)
|
|
|
|
// Scenario: 24 tool-calling turns are followed by one "stop" turn. The resume is
// expected to succeed after exactly 25 requests.
it.effect("accepts a terminal response on the final bounded provider turn", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Finish at the limit" }), resume: false })

    const toolTurns = Array.from({ length: 24 }, (_, turn) => [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: `call-terminal-${turn}`, name: "echo", input: { text: `${turn}` } }),
      LLMEvent.stepFinish({ index: 0, reason: "tool-calls" }),
      LLMEvent.finish({ reason: "tool-calls" }),
    ])
    const terminalTurn = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]

    requests.length = 0
    responses = [...toolTurns, terminalTurn]

    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(25)
  }),
)
|
|
|
|
// Scenario: the scripted turn starts a step and then emits a providerError event. The
// resume itself succeeds, and the assistant message must carry finish "error" with
// the provider's message.
it.effect("projects provider errors as terminal assistant step failures", () =>
  Effect.gen(function* () {
    yield* setup
    const sessions = yield* SessionV2.Service
    yield* sessions.prompt({ sessionID, prompt: new Prompt({ text: "Fail durably" }), resume: false })

    requests.length = 0
    responses = undefined
    streamGate = undefined
    streamStarted = undefined
    response = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.providerError({ message: "Provider unavailable" }),
    ]

    yield* sessions.resume(sessionID)

    expect(requests).toHaveLength(1)
    const context = yield* sessions.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Fail durably" },
      { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } },
    ])
  }),
)
|
|
|
|
it.effect("projects provider errors emitted before assistant step start", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Fail before step" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    // The scripted response contains only a provider error: no step-start event precedes it.
    requests.length = 0
    const errorOnly = [LLMEvent.providerError({ message: "Provider unavailable" })]
    response = errorOnly

    yield* service.resume(sessionID)

    expect(requests).toHaveLength(1)

    // An assistant entry finished as an error is still expected in the context.
    const context = yield* service.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Fail before step" },
      { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } },
    ])
  }),
)
|
|
|
|
it.effect("projects raw provider stream failures as terminal assistant step failures", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Fail raw stream durably" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    // Here the stream itself fails, rather than emitting a provider-error event.
    const streamFailure = providerUnavailable()
    responseStream = Stream.fail(streamFailure)

    // Flip the effect so the failure becomes the value; it must be the very same instance.
    const surfaced = yield* Effect.flip(service.resume(sessionID))
    expect(surfaced).toBe(streamFailure)

    // NOTE(review): replaySessionProjection presumably rebuilds the projection from stored
    // events, which would make this a durability check — confirm against the helper.
    yield* replaySessionProjection(sessionID)
    const context = yield* service.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Fail raw stream durably" },
      { type: "assistant", finish: "error", error: { type: "unknown", message: "Provider unavailable" } },
    ])
  }),
)
|
|
|
|
it.effect("does not continue automatically after a provider error follows a local tool call", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Do not continue failed provider" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    requests.length = 0
    // Remember how many executions were already recorded so only new ones are compared below.
    const priorExecutions = executions.length
    // One step: an echo tool call, then a provider error in the same response.
    const toolCallThenError = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({ id: "call-before-provider-error", name: "echo", input: { text: "settled" } }),
      LLMEvent.providerError({ message: "Provider unavailable" }),
    ]
    response = toolCallThenError

    yield* service.resume(sessionID)

    // Exactly one request is expected, i.e. no follow-up turn after the error.
    expect(requests).toHaveLength(1)
    // The tool call that preceded the error is still expected to have been executed once.
    const newExecutions = executions.slice(priorExecutions)
    expect(newExecutions).toEqual(["settled"])
  }),
)
|
|
|
|
it.effect("durably fails a hosted tool when its provider errors before returning a result", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Fail hosted tool durably" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    requests.length = 0
    // A provider-executed web_search call is announced, then the provider errors
    // without any result for that call appearing in the response.
    const hostedCallThenError = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({
        id: "call-hosted-provider-error",
        name: "web_search",
        input: { query: "effect" },
        providerExecuted: true,
      }),
      LLMEvent.providerError({ message: "Provider unavailable" }),
    ]
    response = hostedCallThenError

    yield* service.resume(sessionID)

    expect(requests).toHaveLength(1)

    // The tool part is expected to end up in the error state.
    const context = yield* service.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Fail hosted tool durably" },
      {
        type: "assistant",
        content: [{ type: "tool", id: "call-hosted-provider-error", state: { status: "error" } }],
      },
    ])
  }),
)
|
|
|
|
it.effect("durably fails a hosted tool left unresolved at normal provider EOF", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Fail hosted tool at EOF" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    // The scripted response stops right after the provider-executed tool call:
    // no tool result, step finish, or finish event follows it.
    const unresolvedHostedCall = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({
        id: "call-hosted-eof",
        name: "web_search",
        input: { query: "effect" },
        providerExecuted: true,
      }),
    ]
    response = unresolvedHostedCall

    yield* service.resume(sessionID)
    // NOTE(review): presumably rebuilds the projection from stored events — confirm against the helper.
    yield* replaySessionProjection(sessionID)

    // The dangling tool call is expected to be recorded in the error state.
    const context = yield* service.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Fail hosted tool at EOF" },
      { type: "assistant", content: [{ type: "tool", id: "call-hosted-eof", state: { status: "error" } }] },
    ])
  }),
)
|
|
|
|
it.effect("durably fails a hosted tool left unresolved by a raw provider stream failure", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Fail hosted tool on raw failure" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    // The stream emits a step start and a provider-executed tool call, then fails outright.
    const streamFailure = providerUnavailable()
    const emittedBeforeFailure = Stream.fromIterable([
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolCall({
        id: "call-hosted-raw-failure",
        name: "web_search",
        input: { query: "effect" },
        providerExecuted: true,
      }),
    ])
    responseStream = Stream.concat(emittedBeforeFailure, Stream.fail(streamFailure))

    // Flip the effect so the failure becomes the value; it must be the very same instance.
    const surfaced = yield* Effect.flip(service.resume(sessionID))
    expect(surfaced).toBe(streamFailure)

    // NOTE(review): presumably rebuilds the projection from stored events — confirm against the helper.
    yield* replaySessionProjection(sessionID)

    // Both the assistant entry and its unresolved tool part are expected to be marked as errors.
    const context = yield* service.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Fail hosted tool on raw failure" },
      {
        type: "assistant",
        finish: "error",
        error: { type: "unknown", message: "Provider unavailable" },
        content: [{ type: "tool", id: "call-hosted-raw-failure", state: { status: "error" } }],
      },
    ])
  }),
)
|
|
|
|
it.effect("keeps interleaved assistant text blocks separate", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Two blocks" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    responses = undefined
    streamGate = undefined
    streamStarted = undefined
    // Two text blocks are open at the same time: their start, delta and end events alternate.
    const interleavedText = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.textStart({ id: "text-1" }),
      LLMEvent.textStart({ id: "text-2" }),
      LLMEvent.textDelta({ id: "text-1", text: "First" }),
      LLMEvent.textDelta({ id: "text-2", text: "Second" }),
      LLMEvent.textEnd({ id: "text-1" }),
      LLMEvent.textEnd({ id: "text-2" }),
      LLMEvent.stepFinish({ index: 0, reason: "stop" }),
      LLMEvent.finish({ reason: "stop" }),
    ]
    response = interleavedText

    yield* service.resume(sessionID)

    // Each id must keep its own content entry with only its own text.
    const context = yield* service.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Two blocks" },
      {
        type: "assistant",
        content: [
          { type: "text", id: "text-1", text: "First" },
          { type: "text", id: "text-2", text: "Second" },
        ],
      },
    ])
  }),
)
|
|
|
|
// Register the same three scenarios once per entry of `fragmentKinds`;
// each test body is the shared verifier helper invoked with that entry.
for (const fragment of fragmentKinds) {
  it.effect(`broadcasts provider ${fragment} deltas without storing projection rewrites`, () =>
    verifyEphemeralDeltas(fragment),
  )

  it.effect(`durably closes partial ${fragment} when the provider stream fails`, () =>
    verifyPartialFlushOnFailure(fragment),
  )

  it.effect(`durably closes partial ${fragment} when the provider stream is interrupted`, () =>
    verifyPartialFlushOnInterruption(fragment),
  )
}
|
|
|
|
it.effect("rejects duplicate streamed text starts", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service

    responses = undefined
    streamGate = undefined
    streamStarted = undefined
    // The same text id is started twice in a row.
    const duplicateStarts = [LLMEvent.textStart({ id: "text-1" }), LLMEvent.textStart({ id: "text-1" })]
    response = duplicateStarts

    // Turn a defect into a success value so its payload can be asserted directly.
    const outcome = yield* service.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))
    expect(outcome).toBe("Duplicate text start: text-1")
  }),
)
|
|
|
|
it.effect("transitions streamed raw tool input to parsed called input", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const prompt = new Prompt({ text: "Call provider tool" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    responses = undefined
    streamGate = undefined
    streamStarted = undefined
    // The tool input is first streamed as raw JSON text (start / delta / end), then the
    // tool call event for the same id arrives carrying the parsed input object.
    const streamedToolInput = [
      LLMEvent.stepStart({ index: 0 }),
      LLMEvent.toolInputStart({ id: "call-parsed", name: "web_search" }),
      LLMEvent.toolInputDelta({ id: "call-parsed", name: "web_search", text: '{"query":"hello"}' }),
      LLMEvent.toolInputEnd({ id: "call-parsed", name: "web_search" }),
      LLMEvent.toolCall({
        id: "call-parsed",
        name: "web_search",
        input: { query: "hello" },
        providerExecuted: true,
      }),
    ]
    response = streamedToolInput

    yield* service.resume(sessionID)

    // No result follows the provider-executed call; the tool part is expected in the
    // error state while still exposing the parsed input.
    const context = yield* service.context(sessionID)
    expect(context).toMatchObject([
      { type: "user", text: "Call provider tool" },
      {
        type: "assistant",
        content: [{ type: "tool", id: "call-parsed", state: { status: "error", input: { query: "hello" } } }],
      },
    ])
  }),
)
|
|
|
|
it.effect("rejects malformed streamed tool input ordering", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service

    responses = undefined
    streamGate = undefined
    streamStarted = undefined
    // A tool input delta for "call-1" arrives with no preceding tool input start.
    const deltaWithoutStart = [LLMEvent.toolInputDelta({ id: "call-1", name: "read", text: "{}" })]
    response = deltaWithoutStart

    // Turn a defect into a success value so its payload can be asserted directly.
    const outcome = yield* service.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))
    expect(outcome).toBe("Tool input delta before start: call-1")
  }),
)
|
|
})
|