From fe840d42b859f6439be8374774cdddbb6f93468b Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 22 Jun 2026 18:16:30 +0200 Subject: [PATCH] refactor(core): simplify session run coordination (#33388) --- packages/core/src/session.ts | 38 +- packages/core/src/session/event.ts | 8 - packages/core/src/session/execution.ts | 8 +- packages/core/src/session/execution/local.ts | 15 +- packages/core/src/session/logging.ts | 8 - packages/core/src/session/message-updater.ts | 1 - packages/core/src/session/projector.ts | 1 - packages/core/src/session/run-coordinator.ts | 289 +---- packages/core/src/session/runner/index.ts | 2 +- packages/core/src/session/runner/llm.ts | 6 +- packages/core/test/session-logging.test.ts | 30 - packages/core/test/session-prompt.test.ts | 36 +- .../core/test/session-run-coordinator.test.ts | 998 +++--------------- .../core/test/session-runner-recorded.test.ts | 25 +- packages/core/test/session-runner.test.ts | 31 +- packages/sdk/js/src/v2/gen/types.gen.ts | 53 - packages/sdk/openapi.json | 168 --- specs/v2/schema-changelog.md | 5 + specs/v2/session.md | 12 +- specs/v2/todo.md | 7 +- 20 files changed, 287 insertions(+), 1454 deletions(-) delete mode 100644 packages/core/src/session/logging.ts delete mode 100644 packages/core/test/session-logging.test.ts diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 7454e0fa7..5dae69973 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -1,7 +1,7 @@ export * as SessionV2 from "./session" export * from "./session/schema" -import { Cause, DateTime, Effect, Layer, Schema, Context, Stream } from "effect" +import { DateTime, Effect, Layer, Schema, Context, Stream } from "effect" import { and, asc, desc, eq, gt, like, lt, or, type SQL } from "drizzle-orm" import { ProjectV2 } from "./project" import { WorkspaceV2 } from "./workspace" @@ -25,7 +25,6 @@ import { fromRow } from "./session/info" import { SessionRunner } from "./session/runner/index" import { SessionStore } from "./session/store" import { SessionExecution } from "./session/execution" -import { logFailure } from "./session/logging" import { MessageDecodeError } from "./session/error" import { SessionEvent } from "./session/event" import { SessionInput } from "./session/input" @@ -168,20 +167,6 @@ export const layer = Layer.effect( const store = yield* SessionStore.Service const decodeMessage = Schema.decodeUnknownEffect(SessionMessage.Message) const isDurableSessionEvent = Schema.is(SessionEvent.Durable) - const scope = yield* Effect.scope - - const enqueueWake = (admitted: SessionInput.Admitted) => - execution.wake(admitted.sessionID, admitted.admittedSeq).pipe( - Effect.tapCause((cause) => - Cause.hasInterruptsOnly(cause) - ? Effect.void - : logFailure("Failed to wake Session", admitted.sessionID, cause), - ), - Effect.ignore, - Effect.forkIn(scope, { startImmediately: true }), - Effect.asVoid, - ) - const decode = (row: typeof SessionMessageTable.$inferSelect) => decodeMessage({ ...row.data, id: row.id, type: row.type }).pipe( Effect.mapError( @@ -342,10 +327,6 @@ export const layer = Layer.effect( Effect.uninterruptible( Effect.gen(function* () { yield* result.get(input.sessionID) - const returnPrompt = Effect.fnUntraced(function* (admitted: SessionInput.Admitted) { - if (input.resume !== false) yield* enqueueWake(admitted) - return admitted - }, Effect.uninterruptible) const messageID = input.id ?? SessionMessage.ID.create() const delivery = input.delivery ?? "steer" const expected = { sessionID: input.sessionID, messageID, prompt: input.prompt, delivery } @@ -363,7 +344,8 @@ export const layer = Layer.effect( ) if (!SessionInput.equivalent(admitted, expected)) return yield* new PromptConflictError({ sessionID: input.sessionID, messageID }) - return yield* returnPrompt(admitted) + if (input.resume !== false) yield* execution.wake(admitted.sessionID) + return admitted }), ), ), @@ -404,19 +386,7 @@ export const layer = Layer.effect( yield* execution.resume(sessionID) }), interrupt: Effect.fn("V2Session.interrupt")((sessionID) => - Effect.uninterruptible( - Effect.gen(function* () { - const session = yield* store.get(sessionID) - if (!session) return yield* execution.interrupt(sessionID) - const event = yield* events.publish(SessionEvent.InterruptRequested, { - sessionID, - timestamp: yield* DateTime.now, - }) - if (event.durable === undefined) - return yield* Effect.die("Interrupt request event is missing aggregate sequence") - yield* execution.interrupt(sessionID, event.durable.seq) - }), - ), + Effect.uninterruptible(execution.interrupt(sessionID)), ), }) diff --git a/packages/core/src/session/event.ts b/packages/core/src/session/event.ts index 5eaf03716..97e334617 100644 --- a/packages/core/src/session/event.ts +++ b/packages/core/src/session/event.ts @@ -118,13 +118,6 @@ export namespace PromptLifecycle { export type Promoted = typeof Promoted.Type } -export const InterruptRequested = EventV2.define({ - type: "session.next.interrupt.requested", - ...options, - schema: Base, -}) -export type InterruptRequested = typeof InterruptRequested.Type - export const ContextUpdated = EventV2.define({ type: "session.next.context.updated", ...options, @@ -475,7 +468,6 @@ const DurableDefinitions = [ Prompted, PromptLifecycle.Admitted, PromptLifecycle.Promoted, - InterruptRequested, ContextUpdated, Synthetic, Shell.Started, diff --git a/packages/core/src/session/execution.ts b/packages/core/src/session/execution.ts index 9a99145bf..a08912e95 100644 --- a/packages/core/src/session/execution.ts +++ b/packages/core/src/session/execution.ts @@ -5,12 +5,12 @@ import { SessionRunner } from "./runner/index" import { SessionSchema } from "./schema" export interface Interface { - /** Explicitly drain one Session, making at least one provider attempt. */ + /** Starts execution while idle or joins the active execution. */ readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect - /** Schedule a drain after durable work is recorded. Repeated wakeups may coalesce. */ - readonly wake: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect + /** Registers newly recorded work. Repeated wakeups may coalesce. */ + readonly wake: (sessionID: SessionSchema.ID) => Effect.Effect /** Interrupt active work owned by this process. Idle interruption is a no-op. */ - readonly interrupt: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect + readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect } /** Routes execution from a Session ID to the runner owned by that Session's Location. */ diff --git a/packages/core/src/session/execution/local.ts b/packages/core/src/session/execution/local.ts index 8f1b1763a..7e0e3ca70 100644 --- a/packages/core/src/session/execution/local.ts +++ b/packages/core/src/session/execution/local.ts @@ -1,11 +1,10 @@ -import { Effect, Layer } from "effect" +import { Cause, Effect, Layer } from "effect" import { LocationServiceMap } from "../../location-layer" import { SessionRunCoordinator } from "../run-coordinator" import { SessionRunner } from "../runner" import { SessionSchema } from "../schema" import { SessionStore } from "../store" import { SessionExecution } from "../execution" -import { logFailure } from "../logging" /** Current-process routing for implicit-local Locations. Future remote placement belongs here. */ export const layer = Layer.effect( @@ -13,15 +12,19 @@ export const layer = Layer.effect( Effect.gen(function* () { const store = yield* SessionStore.Service const locations = yield* LocationServiceMap - const coordinator = yield* SessionRunCoordinator.make({ - drain: Effect.fnUntraced(function* (sessionID: SessionSchema.ID, mode) { + const coordinator = yield* SessionRunCoordinator.make({ + drain: Effect.fnUntraced(function* (sessionID: SessionSchema.ID, force) { const session = yield* store.get(sessionID) if (!session) return yield* Effect.die(`Session not found: ${sessionID}`) - return yield* SessionRunner.Service.use((runner) => runner.run({ sessionID, force: mode === "run" })).pipe( + return yield* SessionRunner.Service.use((runner) => runner.run({ sessionID, force })).pipe( Effect.provide(locations.get(session.location)), + Effect.tapCause((cause) => + Cause.hasInterruptsOnly(cause) + ? Effect.void + : Effect.logError("Failed to drain Session", cause).pipe(Effect.annotateLogs({ sessionID })), + ), ) }), - onFailure: (sessionID, cause) => logFailure("Failed to drain Session", sessionID, cause), }) return SessionExecution.Service.of({ diff --git a/packages/core/src/session/logging.ts b/packages/core/src/session/logging.ts deleted file mode 100644 index c579ec15d..000000000 --- a/packages/core/src/session/logging.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Cause, Effect } from "effect" -import { SessionSchema } from "./schema" - -export const logFailure = ( - message: "Failed to drain Session" | "Failed to wake Session", - sessionID: SessionSchema.ID, - cause: Cause.Cause, -) => Effect.logError(message, cause).pipe(Effect.annotateLogs({ sessionID })) diff --git a/packages/core/src/session/message-updater.ts b/packages/core/src/session/message-updater.ts index cf1eb2ced..2c836dcd0 100644 --- a/packages/core/src/session/message-updater.ts +++ b/packages/core/src/session/message-updater.ts @@ -138,7 +138,6 @@ export function update(adapter: Adapter, event: SessionEvent.Event) { }, "session.next.prompt.admitted": () => Effect.void, "session.next.prompt.promoted": () => Effect.void, - "session.next.interrupt.requested": () => Effect.void, "session.next.context.updated": (event) => adapter.appendMessage( new SessionMessage.System({ diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index 30af9f2e6..50bfad465 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -399,7 +399,6 @@ export const layer = Layer.effectDiscard( ) }), ) - yield* events.project(SessionEvent.InterruptRequested, () => Effect.void) yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event)) yield* events.project(SessionEvent.Synthetic, (event) => run(db, event)) yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event)) diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index d52b63e8f..6597842be 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -1,106 +1,46 @@ export * as SessionRunCoordinator from "./run-coordinator" -import { Cause, Context, Deferred, Effect, Exit, Fiber, FiberSet, Layer, Scope } from "effect" -import { SessionRunner } from "./runner" -import { logFailure } from "./logging" -import { SessionSchema } from "./schema" +import { Deferred, Effect, Exit, Fiber, FiberSet, Scope } from "effect" -export type Mode = "run" | "wake" - -/** Why one drain generation should run. Explicit runs dominate advisory wakes when demands coalesce. */ -type Demand = { readonly _tag: "run" } | { readonly _tag: "wake"; readonly seq?: number } - -/** - * Runs at most one drain chain per key while allowing different keys to drain concurrently. - * - * For each key: - * - * idle --run/wake--> draining --run/wake--> draining + one coalesced rerun --> idle - * - * `run` is an explicit drain request. It starts a chain or joins the current chain and - * upgrades a pending follow-up so the caller receives explicit-run semantics. - * - * `wake` reports that durable work may now be available. It starts a chain while idle or - * requests one coalesced follow-up while draining. Repeated wakes collapse together. - * - * `interrupt` stops the current ownership chain. Advisory wakes from before the interrupt - * boundary are suppressed; advisory wakes after the boundary run after cleanup. - */ -export interface Coordinator { - /** Starts or joins one explicit drain generation. */ - readonly run: (key: Key) => Effect.Effect - /** Coalesces one wake-up after durable work is recorded. */ - readonly wake: (key: Key, seq?: number) => Effect.Effect - /** Waits until the current ownership chain settles. */ - readonly awaitIdle: (key: Key) => Effect.Effect - /** Interrupts the active ownership chain without automatically draining pending wakes. */ - readonly interrupt: (key: Key, seq?: number) => Effect.Effect +/** Serializes execution for each key while allowing different keys to run concurrently. */ +export interface Coordinator { + /** Starts execution while idle or joins the active execution. */ + readonly run: (key: Key) => Effect.Effect + /** Registers one coalesced follow-up after newly recorded work. */ + readonly wake: (key: Key) => Effect.Effect + /** Stops active execution and waits for its cleanup. */ + readonly interrupt: (key: Key) => Effect.Effect } -/** One Session's process-local execution lane: one active demand and at most one coalesced follow-up. */ -type Entry = { - readonly done: Deferred.Deferred - readonly settled: Deferred.Deferred> - current: Demand - pending?: Demand - explicitWaiter?: Deferred.Deferred - interruptSeq?: number +type Entry = { + readonly done: Deferred.Deferred owner?: Fiber.Fiber + pendingWake: boolean stopping: boolean } -/** Combines follow-up demand: runs dominate, while wakes retain the newest durable admission sequence. */ -const coalesce = (left: Demand | undefined, right: Demand): Demand => { - if (left?._tag === "run" || right._tag === "run") return { _tag: "run" } - return { _tag: "wake", seq: maxSeq(left?.seq, right.seq) } -} - -const maxSeq = (left: number | undefined, right: number | undefined) => { - if (left === undefined) return right - if (right === undefined) return left - return Math.max(left, right) -} - -/** Constructs a scoped coordinator. Every in-memory transition is synchronous. */ -export const make = (options: { - readonly drain: (key: Key, mode: Mode) => Effect.Effect - readonly onFailure?: (key: Key, cause: Cause.Cause) => Effect.Effect -}): Effect.Effect, never, Scope.Scope> => +export const make = (options: { + readonly drain: (key: Key, force: boolean) => Effect.Effect +}): Effect.Effect, never, Scope.Scope> => Effect.gen(function* () { - const active = new Map>() - const interruptSeq = new Map() - const report = yield* FiberSet.makeRuntime() + const active = new Map>() const fork = yield* FiberSet.makeRuntime() - const shutdown = Deferred.makeUnsafe() - let closed = false - yield* Effect.addFinalizer(() => - Effect.sync(() => { - closed = true - Deferred.doneUnsafe(shutdown, Effect.void) - active.clear() - interruptSeq.clear() - }), - ) - const makeEntry = (current: Demand, explicitWaiter?: Deferred.Deferred): Entry => ({ - done: Deferred.makeUnsafe(), - settled: Deferred.makeUnsafe>(), - current, - explicitWaiter, + const makeEntry = (): Entry => ({ + done: Deferred.makeUnsafe(), + pendingWake: false, stopping: false, }) - const start = (key: Key, entry: Entry, demand: Demand, successor = false) => { + const start = (key: Key, entry: Entry, force: boolean, successor = false) => { const ready = Deferred.makeUnsafe() - const drain = Effect.suspend(() => options.drain(key, demand._tag)) - // Initial work retains immediate-start behavior but cannot run before ownership is published. - // Observer-started successors yield once so synchronous drains cannot recurse on the JS stack. const owner = fork( (successor - ? Effect.yieldNow.pipe(Effect.andThen(drain)) - : Deferred.await(ready).pipe(Effect.andThen(drain)) + ? Effect.yieldNow + : Deferred.await(ready) ).pipe( - Effect.onExit((exit) => Effect.sync(() => settle(key, entry, demand, exit))), + Effect.andThen(Effect.suspend(() => options.drain(key, force))), + Effect.onExit((exit) => Effect.sync(() => settle(key, entry, exit))), Effect.exit, Effect.asVoid, ), @@ -109,176 +49,57 @@ export const make = (options: { if (!successor) Deferred.doneUnsafe(ready, Effect.void) } - const settle = (key: Key, entry: Entry, demand: Demand, exit: Exit.Exit) => { - if (closed) { - Deferred.doneUnsafe(entry.done, exit) - Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) - return - } - if (demand._tag === "run" && entry.explicitWaiter !== undefined) { - Deferred.doneUnsafe(entry.explicitWaiter, exit) - entry.explicitWaiter = undefined - } - if (entry.stopping && demand._tag === "wake" && entry.explicitWaiter !== undefined) { - Deferred.doneUnsafe(entry.explicitWaiter, exit) - entry.explicitWaiter = undefined - } - if (active.get(key) !== entry) { - Deferred.doneUnsafe(entry.done, exit) - Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) - return - } - if (exit._tag === "Success" && !entry.stopping) { - if (entry.pending !== undefined) { - const pending = entry.pending - entry.pending = undefined - entry.current = pending - start(key, entry, pending, true) - return - } - active.delete(key) - Deferred.doneUnsafe(entry.done, exit) - Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) + const settle = (key: Key, entry: Entry, exit: Exit.Exit) => { + if (Exit.isSuccess(exit) && !entry.stopping && entry.pendingWake) { + entry.pendingWake = false + start(key, entry, false, true) return } - const successor = entry.pending !== undefined ? makeEntry(entry.pending, entry.explicitWaiter) : undefined + const successor = entry.pendingWake ? makeEntry() : undefined if (successor === undefined) active.delete(key) - else active.set(key, successor) - if (successor !== undefined) start(key, successor, successor.current, true) - Deferred.doneUnsafe(entry.done, exit) - Deferred.doneUnsafe(entry.settled, Effect.succeed(exit)) - if ( - exit._tag === "Failure" && - !(entry.stopping && Cause.hasInterruptsOnly(exit.cause)) && - demand._tag === "wake" && - options.onFailure !== undefined - ) { - report(Effect.suspend(() => options.onFailure!(key, exit.cause))) + else { + active.set(key, successor) + start(key, successor, false, true) } + Deferred.doneUnsafe(entry.done, exit) } - const wake = (key: Key, seq?: number) => - Effect.sync(() => { - if (closed) return - if (!isAfterInterrupt(key, seq)) return + const run = (key: Key): Effect.Effect => + Effect.uninterruptibleMask((restore) => { const entry = active.get(key) if (entry !== undefined) { - if (!acceptsWake(entry, seq)) return - entry.pending = coalesce(entry.pending, { _tag: "wake", seq }) + if (entry.stopping) return restore(Deferred.await(entry.done).pipe(Effect.andThen(run(key)))) + return restore(Deferred.await(entry.done)) + } + + const next = makeEntry() + active.set(key, next) + start(key, next, true) + return restore(Deferred.await(next.done)) + }) + + const wake = (key: Key) => + Effect.sync(() => { + const entry = active.get(key) + if (entry !== undefined) { + entry.pendingWake = true return } - const next = makeEntry({ _tag: "wake", seq }) + const next = makeEntry() active.set(key, next) - start(key, next, next.current) + start(key, next, false) }) - const awaitIdle = (key: Key): Effect.Effect => - Effect.gen(function* () { - let firstFailure: Cause.Cause | undefined - while (!closed) { - const entry = active.get(key) - if (entry === undefined) break - const exit = yield* Effect.raceFirst( - Deferred.await(entry.settled), - Deferred.await(shutdown).pipe(Effect.as(Exit.void)), - ) - if (closed) break - if (exit._tag === "Failure" && firstFailure === undefined) firstFailure = exit.cause - } - if (firstFailure !== undefined) return yield* Effect.failCause(firstFailure) - }) - - const interrupt = (key: Key, seq?: number): Effect.Effect => + const interrupt = (key: Key): Effect.Effect => Effect.suspend(() => { const entry = active.get(key) - const latest = interruptSeq.get(key) - if (seq !== undefined && latest !== undefined && seq <= latest) - return entry?.stopping && entry.owner !== undefined ? Fiber.interrupt(entry.owner) : Effect.void - if (seq !== undefined) interruptSeq.set(key, seq) if (entry?.owner === undefined) return Effect.void - if ( - seq !== undefined && - entry.current._tag === "wake" && - entry.current.seq !== undefined && - entry.current.seq > seq - ) - return Effect.void - if (entry.stopping) { - entry.interruptSeq = maxSeq(entry.interruptSeq, seq) - suppressPendingAtOrBefore(entry, seq) - return Fiber.interrupt(entry.owner) - } entry.stopping = true - entry.interruptSeq = seq - suppressPendingAtOrBefore(entry, seq) + entry.pendingWake = false return Fiber.interrupt(entry.owner) }) - return { run, wake, awaitIdle, interrupt } - - function run(key: Key): Effect.Effect { - return Effect.uninterruptibleMask((restore) => { - if (closed) return Effect.interrupt - const entry = active.get(key) - if (entry !== undefined) { - if (entry.stopping) { - return restore(Deferred.await(entry.settled).pipe(Effect.andThen(run(key)))) - } - if (entry.current._tag === "wake") { - entry.pending = coalesce(entry.pending, { _tag: "run" }) - entry.explicitWaiter ??= Deferred.makeUnsafe() - return restore(awaitRun(entry.explicitWaiter)) - } - return restore(awaitRun(entry.done)) - } - - const next = makeEntry({ _tag: "run" }) - active.set(key, next) - start(key, next, next.current) - return restore(awaitRun(next.done)) - }) - } - - function awaitRun(done: Deferred.Deferred): Effect.Effect { - return Effect.raceFirst(Deferred.await(done), Deferred.await(shutdown).pipe(Effect.andThen(Effect.interrupt))) - } - - function acceptsWake(entry: Entry, seq: number | undefined) { - return !entry.stopping || (entry.interruptSeq !== undefined && seq !== undefined && seq > entry.interruptSeq) - } - - function isAfterInterrupt(key: Key, seq: number | undefined) { - const latest = interruptSeq.get(key) - return latest === undefined || (seq !== undefined && seq > latest) - } - - function suppressPendingAtOrBefore(entry: Entry, seq: number | undefined) { - if ( - entry.pending?._tag === "wake" && - seq !== undefined && - entry.pending.seq !== undefined && - entry.pending.seq > seq - ) - return - entry.pending = undefined - } + return { run, wake, interrupt } }) - -export interface Interface extends Coordinator {} - -export class Service extends Context.Service()("@opencode/v2/SessionRunCoordinator") {} - -export const layer = Layer.effect( - Service, - SessionRunner.Service.pipe( - Effect.flatMap((runner) => - make({ - drain: (sessionID, mode) => runner.run({ sessionID, force: mode === "run" }), - onFailure: (sessionID, cause) => logFailure("Failed to drain Session", sessionID, cause), - }), - ), - Effect.map(Service.of), - ), -) diff --git a/packages/core/src/session/runner/index.ts b/packages/core/src/session/runner/index.ts index 2210b57b3..634075dd9 100644 --- a/packages/core/src/session/runner/index.ts +++ b/packages/core/src/session/runner/index.ts @@ -21,7 +21,7 @@ export interface Interface { /** Drains eligible durable work. Explicit runs perform one provider attempt even when no work is eligible. */ readonly run: (input: { readonly sessionID: SessionSchema.ID - readonly force?: boolean + readonly force: boolean }) => Effect.Effect } diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index 756a11928..ddd2bf4e1 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -346,14 +346,14 @@ export const layer = Layer.effect( const run = Effect.fn("SessionRunner.run")(function* (input: { readonly sessionID: SessionSchema.ID - readonly force?: boolean + readonly force: boolean }) { const hasSteer = yield* SessionInput.hasPending(db, input.sessionID, "steer") const hasQueue = hasSteer ? false : yield* SessionInput.hasPending(db, input.sessionID, "queue") - if (input.force !== true && !hasSteer && !hasQueue) return + if (!input.force && !hasSteer && !hasQueue) return yield* failInterruptedTools(input.sessionID) let promotion: SessionInput.Delivery | undefined = hasSteer ? "steer" : hasQueue ? "queue" : undefined - let openActivity = input.force === true || hasSteer || hasQueue + let openActivity = input.force || hasSteer || hasQueue while (openActivity) { let needsContinuation = true for (let step = 1; needsContinuation; step++) { diff --git a/packages/core/test/session-logging.test.ts b/packages/core/test/session-logging.test.ts deleted file mode 100644 index 3d6cff2e4..000000000 --- a/packages/core/test/session-logging.test.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { describe, expect, test } from "bun:test" -import { Cause, Effect, Logger } from "effect" -import { logFailure } from "@opencode-ai/core/session/logging" -import { SessionSchema } from "@opencode-ai/core/session/schema" - -describe("Session logging", () => { - for (const message of ["Failed to drain Session", "Failed to wake Session"] as const) { - test(`renders the cause for ${message}`, async () => { - const entries: Array> = [] - const logger = Logger.formatStructured.pipe( - Logger.map((entry): void => { - entries.push(entry) - }), - ) - - await logFailure( - message, - SessionSchema.ID.make("session-123"), - Cause.fail({ _tag: "SessionFailure", detail: { code: "nested-code" } }), - ).pipe(Effect.provide(Logger.layer([logger])), Effect.runPromise) - - expect(entries).toHaveLength(1) - expect(entries[0]?.message).toBe(message) - expect(entries[0]?.annotations).toEqual({ sessionID: "session-123" }) - expect(entries[0]?.cause).toContain("SessionFailure") - expect(entries[0]?.cause).toContain("nested-code") - expect(entries[0]?.cause).not.toContain("[Object") - }) - } -}) diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index c84a3ab30..166b5deed 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -20,9 +20,7 @@ import { testEffect } from "./lib/effect" const executionCalls: SessionV2.ID[] = [] const interruptCalls: SessionV2.ID[] = [] -const interruptSeqs: Array = [] const wakeCalls: SessionV2.ID[] = [] -const wakeSeqs: Array = [] const execution = Layer.succeed( SessionExecution.Service, SessionExecution.Service.of({ @@ -30,15 +28,13 @@ const execution = Layer.succeed( Effect.sync(() => { executionCalls.push(sessionID) }), - interrupt: (sessionID, seq) => + interrupt: (sessionID) => Effect.sync(() => { interruptCalls.push(sessionID) - interruptSeqs.push(seq) }), - wake: (sessionID, seq) => + wake: (sessionID) => Effect.sync(() => { wakeCalls.push(sessionID) - wakeSeqs.push(seq) }), }), ) @@ -109,15 +105,6 @@ const eventCount = (type: string) => ), ) -const interruptEvent = Database.Service.use(({ db }) => - db - .select() - .from(EventTable) - .where(eq(EventTable.type, "session.next.interrupt.requested.1")) - .get() - .pipe(Effect.orDie), -) - describe("SessionV2.prompt", () => { it.effect("delegates execution continuation through SessionExecution", () => Effect.gen(function* () { @@ -131,19 +118,14 @@ describe("SessionV2.prompt", () => { }), ) - it.effect("delegates interruption through SessionExecution", () => + it.effect("delegates process-local interruption through SessionExecution", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service interruptCalls.length = 0 - interruptSeqs.length = 0 yield* session.interrupt(sessionID) expect(interruptCalls).toEqual([sessionID]) - expect(interruptSeqs).toHaveLength(1) - expect(typeof interruptSeqs[0]).toBe("number") - expect(yield* eventCount("session.next.interrupt.requested.1")).toBe(1) - expect(yield* interruptEvent).toMatchObject({ aggregate_id: sessionID, seq: interruptSeqs[0] }) expect(yield* session.messages({ sessionID })).toEqual([]) }), ) @@ -152,11 +134,9 @@ describe("SessionV2.prompt", () => { Effect.gen(function* () { const session = yield* SessionV2.Service interruptCalls.length = 0 - interruptSeqs.length = 0 yield* session.interrupt(SessionV2.ID.make("ses_missing")) expect(interruptCalls).toEqual([SessionV2.ID.make("ses_missing")]) - expect(interruptSeqs).toEqual([undefined]) }), ) @@ -515,13 +495,11 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service executionCalls.length = 0 wakeCalls.length = 0 - wakeSeqs.length = 0 - const admitted = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) }) + yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) }) expect(executionCalls).toEqual([]) expect(wakeCalls).toEqual([sessionID]) - expect(wakeSeqs).toEqual([admitted.admittedSeq]) }), ) @@ -531,9 +509,8 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service executionCalls.length = 0 wakeCalls.length = 0 - wakeSeqs.length = 0 - const admitted = yield* session.prompt({ + yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run explicitly" }), resume: true, @@ -541,7 +518,6 @@ describe("SessionV2.prompt", () => { expect(executionCalls).toEqual([]) expect(wakeCalls).toEqual([sessionID]) - expect(wakeSeqs).toEqual([admitted.admittedSeq]) }), ) @@ -551,13 +527,11 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service executionCalls.length = 0 wakeCalls.length = 0 - wakeSeqs.length = 0 yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Do not run" }), resume: false }) expect(executionCalls).toEqual([]) expect(wakeCalls).toEqual([]) - expect(wakeSeqs).toEqual([]) }), ) }) diff --git a/packages/core/test/session-run-coordinator.test.ts b/packages/core/test/session-run-coordinator.test.ts index 39fb5779d..909ba6487 100644 --- a/packages/core/test/session-run-coordinator.test.ts +++ b/packages/core/test/session-run-coordinator.test.ts @@ -1,5 +1,5 @@ import { describe, expect } from "bun:test" -import { Cause, Deferred, Effect, Exit, Fiber, Layer, Scope } from "effect" +import { Cause, Deferred, Effect, Exit, Fiber, Layer } from "effect" import { SessionRunCoordinator } from "@opencode-ai/core/session/run-coordinator" import { testEffect } from "./lib/effect" @@ -22,14 +22,39 @@ describe("SessionRunCoordinator", () => { expect(runs).toBe(1) yield* Deferred.succeed(gate, undefined) - yield* Fiber.join(first) - yield* Fiber.join(second) + yield* Effect.all([Fiber.join(first), Fiber.join(second)]) expect(runs).toBe(1) }), ), ) - it.effect("starts a drain when woken while idle", () => + it.effect("joins a wake-started execution without forcing a successor", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const gate = yield* Deferred.make() + const forces: boolean[] = [] + const coordinator = yield* SessionRunCoordinator.make({ + drain: (_key, force) => + Effect.sync(() => forces.push(force)).pipe( + Effect.andThen(Deferred.succeed(started, undefined)), + Effect.andThen(Deferred.await(gate)), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(started) + const resumed = yield* coordinator.run("session").pipe(Effect.forkChild) + yield* Effect.yieldNow + yield* Deferred.succeed(gate, undefined) + yield* Fiber.join(resumed) + + expect(forces).toEqual([false]) + }), + ), + ) + + it.effect("starts execution when woken while idle", () => Effect.scoped( Effect.gen(function* () { const drained = yield* Deferred.make() @@ -41,62 +66,11 @@ describe("SessionRunCoordinator", () => { ), ) - it.effect("does nothing when interrupted while idle", () => - Effect.scoped( - Effect.gen(function* () { - const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.void }) - - yield* coordinator.interrupt("session") - }), - ), - ) - - it.effect("suppresses stale wakes after an idle interrupt boundary", () => - Effect.scoped( - Effect.gen(function* () { - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.sync(() => runs++) }) - - yield* coordinator.interrupt("session", 2) - yield* coordinator.wake("session", 1) - yield* coordinator.awaitIdle("session") - expect(runs).toBe(0) - - yield* coordinator.wake("session", 3) - yield* coordinator.awaitIdle("session") - expect(runs).toBe(1) - }), - ), - ) - - it.effect("does not interrupt a wake newer than the interrupt boundary", () => - Effect.scoped( - Effect.gen(function* () { - const started = yield* Deferred.make() - const gate = yield* Deferred.make() - const interrupted = yield* Deferred.make() - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Deferred.succeed(started, undefined).pipe( - Effect.andThen(Deferred.await(gate)), - Effect.onInterrupt(() => Deferred.succeed(interrupted, undefined)), - ), - }) - - yield* coordinator.wake("session", 3) - yield* Deferred.await(started) - yield* coordinator.interrupt("session", 2) - expect(yield* Deferred.isDone(interrupted)).toBeFalse() - yield* Deferred.succeed(gate, undefined) - yield* coordinator.awaitIdle("session") - }), - ), - ) - - it.effect("preserves a queued wake newer than the interrupt boundary", () => + it.effect("coalesces wakes received during active execution", () => Effect.scoped( Effect.gen(function* () { const firstStarted = yield* Deferred.make() + const firstGate = yield* Deferred.make() const secondStarted = yield* Deferred.make() let runs = 0 const coordinator = yield* SessionRunCoordinator.make({ @@ -104,569 +78,33 @@ describe("SessionRunCoordinator", () => { Effect.sync(() => ++runs).pipe( Effect.flatMap((run) => run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never)) + ? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Deferred.await(firstGate))) : Deferred.succeed(secondStarted, undefined), ), ), }) - yield* coordinator.wake("session", 1) + const resumed = yield* coordinator.run("session").pipe(Effect.forkChild) yield* Deferred.await(firstStarted) - yield* coordinator.wake("session", 3) - yield* coordinator.interrupt("session", 2) - yield* Deferred.await(secondStarted) - yield* coordinator.awaitIdle("session").pipe(Effect.exit) - - expect(runs).toBe(2) - }), - ), - ) - - it.effect("interrupts only the requested key", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - const secondGate = yield* Deferred.make() - const secondInterrupted = yield* Deferred.make() - const coordinator = yield* SessionRunCoordinator.make({ - drain: (key: string) => - key === "first" - ? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never)) - : Deferred.succeed(secondStarted, undefined).pipe( - Effect.andThen(Deferred.await(secondGate)), - Effect.onInterrupt(() => Deferred.succeed(secondInterrupted, undefined)), - ), - }) - - yield* coordinator.wake("first") - yield* coordinator.wake("second") - yield* Effect.all([Deferred.await(firstStarted), Deferred.await(secondStarted)]) - - yield* coordinator.interrupt("first") - expect(yield* Deferred.isDone(secondInterrupted)).toBeFalse() - yield* Deferred.succeed(secondGate, undefined) - yield* coordinator.awaitIdle("second") - }), - ), - ) - - it.effect("interrupts the active drain and suppresses its queued wake", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const interrupted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => Deferred.succeed(interrupted, undefined)), - ) - : Effect.void, - ), - ), - }) - - const run = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Deferred.await(firstStarted) - yield* coordinator.wake("session") - - yield* coordinator.interrupt("session") - yield* Deferred.await(interrupted) - yield* coordinator.awaitIdle("session") - const exit = yield* Fiber.await(run) - expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue() - expect(runs).toBe(1) - yield* coordinator.interrupt("session") - }), - ), - ) - - it.effect("suppresses a wake received during interruption cleanup", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const firstInterrupted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ) - : Deferred.succeed(secondStarted, undefined), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(firstStarted) - const interrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild) - yield* Effect.yieldNow - yield* coordinator.wake("session", 1) - yield* Deferred.await(firstInterrupted) - expect(runs).toBe(1) - yield* Deferred.succeed(cleanupGate, undefined) - yield* Fiber.join(interrupt) - yield* coordinator.awaitIdle("session") - - expect(runs).toBe(1) - yield* coordinator.wake("session", 3) - yield* Deferred.await(secondStarted) - yield* coordinator.awaitIdle("session") - expect(runs).toBe(2) - }), - ), - ) - - it.effect("remembers a wake received after the interrupt boundary during cleanup", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const firstInterrupted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ) - : Deferred.succeed(secondStarted, undefined), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(firstStarted) - const interrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild) - yield* Deferred.await(firstInterrupted) - yield* coordinator.wake("session", 3) - const staleInterrupt = yield* coordinator.interrupt("session", 1).pipe(Effect.forkChild) - expect(runs).toBe(1) - yield* Deferred.succeed(cleanupGate, undefined) - yield* Fiber.join(interrupt) - yield* Fiber.join(staleInterrupt) - yield* Deferred.await(secondStarted) - yield* coordinator.awaitIdle("session") - - expect(runs).toBe(2) - }), - ), - ) - - it.effect("moves the stop barrier forward for repeated interrupts", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const firstInterrupted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ) - : Deferred.succeed(secondStarted, undefined), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(firstStarted) - const firstInterrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild) - yield* Deferred.await(firstInterrupted) - yield* coordinator.wake("session", 3) - const secondInterrupt = yield* coordinator.interrupt("session", 4).pipe(Effect.forkChild) - yield* Deferred.succeed(cleanupGate, undefined) - yield* Fiber.join(firstInterrupt) - yield* Fiber.join(secondInterrupt) - yield* coordinator.awaitIdle("session") - expect(runs).toBe(1) - - yield* coordinator.wake("session", 5) - yield* Deferred.await(secondStarted) - yield* coordinator.awaitIdle("session") - expect(runs).toBe(2) - }), - ), - ) - - it.effect("interrupts an explicit run queued before the interruption request", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 ? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never)) : Effect.void, - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(firstStarted) - const run = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Effect.yieldNow - - yield* coordinator.interrupt("session") - const exit = yield* Fiber.await(run) - expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue() - expect(runs).toBe(1) - }), - ), - ) - - it.effect("settles a pre-interrupt explicit run only after active wake cleanup", () => - Effect.scoped( - Effect.gen(function* () { - const started = yield* Deferred.make() - const cleanupStarted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const runSettled = yield* Deferred.make() - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Deferred.succeed(started, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(started) - const run = yield* coordinator - .run("session") - .pipe(Effect.exit, Effect.ensuring(Deferred.succeed(runSettled, undefined)), Effect.forkChild) - const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) - yield* Deferred.await(cleanupStarted) - - expect(yield* Deferred.isDone(runSettled)).toBeFalse() - yield* Deferred.succeed(cleanupGate, undefined) - const runExit = yield* Fiber.join(run) - expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue() - yield* Fiber.join(interrupt) - }), - ), - ) - - it.effect("starts an explicit run arriving during interrupt cleanup after the stop barrier", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const cleanupStarted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ) - : Deferred.succeed(secondStarted, undefined), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(firstStarted) - const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) - yield* Deferred.await(cleanupStarted) - const run = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Deferred.succeed(cleanupGate, undefined) - yield* Fiber.join(interrupt) - yield* Fiber.join(run) - yield* Deferred.await(secondStarted) - expect(runs).toBe(2) - }), - ), - ) - - it.effect("interrupts pre-stop waiters and runs post-stop waiters after cleanup", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const cleanupStarted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ) - : Deferred.succeed(secondStarted, undefined), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(firstStarted) - const before = yield* coordinator.run("session").pipe(Effect.exit, Effect.forkChild) - const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) - yield* Deferred.await(cleanupStarted) - const after = yield* coordinator.run("session").pipe(Effect.exit, Effect.forkChild) - yield* Deferred.succeed(cleanupGate, undefined) - - const beforeExit = yield* Fiber.join(before) - expect(Exit.isFailure(beforeExit) && Cause.hasInterruptsOnly(beforeExit.cause)).toBeTrue() - yield* Fiber.join(interrupt) - yield* Fiber.join(after) - yield* Deferred.await(secondStarted) - expect(runs).toBe(2) - }), - ), - ) - - it.effect("waits for interrupt cleanup before settling callers", () => - Effect.scoped( - Effect.gen(function* () { - const started = yield* Deferred.make() - const cleanupStarted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const runSettled = yield* Deferred.make() - const idleSettled = yield* Deferred.make() - const interruptSettled = yield* Deferred.make() - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Deferred.succeed(started, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ), - }) - - const run = yield* coordinator - .run("session") - .pipe(Effect.ensuring(Deferred.succeed(runSettled, undefined)), Effect.forkChild) - yield* Deferred.await(started) - const idle = yield* coordinator - .awaitIdle("session") - .pipe(Effect.exit, Effect.ensuring(Deferred.succeed(idleSettled, undefined)), Effect.forkChild) - const interrupt = yield* coordinator - .interrupt("session") - .pipe(Effect.ensuring(Deferred.succeed(interruptSettled, undefined)), Effect.forkChild) - yield* Deferred.await(cleanupStarted) - - expect(yield* Deferred.isDone(runSettled)).toBeFalse() - expect(yield* Deferred.isDone(idleSettled)).toBeFalse() - expect(yield* Deferred.isDone(interruptSettled)).toBeFalse() - yield* Deferred.succeed(cleanupGate, undefined) - const runExit = yield* Fiber.await(run) - const idleExit = yield* Fiber.join(idle) - expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue() - expect(Exit.isFailure(idleExit) && Cause.hasInterruptsOnly(idleExit.cause)).toBeTrue() - yield* Fiber.join(interrupt) - }), - ), - ) - - it.effect("joins concurrent interruption requests for one active drain", () => - Effect.scoped( - Effect.gen(function* () { - const started = yield* Deferred.make() - const cleanupStarted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Deferred.succeed(started, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(started) - const first = yield* coordinator.interrupt("session").pipe(Effect.forkChild) - yield* Deferred.await(cleanupStarted) - const second = yield* coordinator.interrupt("session").pipe(Effect.forkChild) - yield* Deferred.succeed(cleanupGate, undefined) - - yield* Fiber.join(first) - yield* Fiber.join(second) - }), - ), - ) - - it.effect("does not discard a post-stop explicit run when interrupted again", () => - Effect.scoped( - Effect.gen(function* () { - const firstStarted = yield* Deferred.make() - const cleanupStarted = yield* Deferred.make() - const cleanupGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(firstStarted, undefined).pipe( - Effect.andThen(Effect.never), - Effect.onInterrupt(() => - Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), - ), - ) - : Deferred.succeed(secondStarted, undefined), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(firstStarted) - const firstInterrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) - yield* Deferred.await(cleanupStarted) - const run = yield* coordinator.run("session").pipe(Effect.forkChild) - const secondInterrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) - yield* Deferred.succeed(cleanupGate, undefined) - - yield* Effect.all([Fiber.join(firstInterrupt), Fiber.join(secondInterrupt), Fiber.join(run)]) - yield* Deferred.await(secondStarted) - expect(runs).toBe(2) - }), - ), - ) - - it.effect("coalesces wakes received during an active run", () => - Effect.scoped( - Effect.gen(function* () { - const gate = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe(Effect.flatMap((run) => (run === 1 ? Deferred.await(gate) : Effect.void))), - }) - - const first = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Effect.yieldNow yield* Effect.all([coordinator.wake("session"), coordinator.wake("session"), coordinator.wake("session")], { concurrency: "unbounded", }) - yield* Deferred.succeed(gate, undefined) - yield* Fiber.join(first) - - expect(runs).toBe(2) - }), - ), - ) - - it.effect("waits for a coalesced ownership chain to become idle", () => - Effect.scoped( - Effect.gen(function* () { - const firstGate = yield* Deferred.make() - const secondGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - const idleSettled = yield* Deferred.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.await(firstGate) - : Deferred.succeed(secondStarted, undefined).pipe(Effect.andThen(Deferred.await(secondGate))), - ), - ), - }) - - yield* coordinator.wake("session") - const idle = yield* coordinator - .awaitIdle("session") - .pipe(Effect.andThen(Deferred.succeed(idleSettled, undefined)), Effect.forkChild) - yield* coordinator.wake("session") yield* Deferred.succeed(firstGate, undefined) yield* Deferred.await(secondStarted) - expect(yield* Deferred.isDone(idleSettled)).toBeFalse() - yield* Deferred.succeed(secondGate, undefined) - yield* Fiber.join(idle) + yield* Fiber.join(resumed) expect(runs).toBe(2) }), ), ) - it.effect("reports the first defect after a failed chain becomes idle", () => - Effect.scoped( - Effect.gen(function* () { - const firstGate = yield* Deferred.make() - const secondGate = yield* Deferred.make() - const secondStarted = yield* Deferred.make() - const defect = new Error("defect") - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => - Effect.sync(() => ++runs).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.await(firstGate).pipe(Effect.andThen(Effect.die(defect))) - : Deferred.succeed(secondStarted, undefined).pipe(Effect.andThen(Deferred.await(secondGate))), - ), - ), - }) - - yield* coordinator.wake("session") - const idle = yield* coordinator - .awaitIdle("session") - .pipe(Effect.catchDefect(Effect.succeed), Effect.forkChild({ startImmediately: true })) - yield* coordinator.wake("session") - yield* Deferred.succeed(firstGate, undefined) - yield* Deferred.await(secondStarted) - yield* Deferred.succeed(secondGate, undefined) - - expect(yield* Fiber.join(idle)).toBe(defect) - expect(runs).toBe(2) - }), - ), - ) - - it.effect("runs again when woken during the coalesced drain", () => + it.effect("runs again when woken during the follow-up", () => Effect.scoped( Effect.gen(function* () { const firstGate = yield* Deferred.make() const secondStarted = yield* Deferred.make() const secondGate = yield* Deferred.make() + const thirdStarted = yield* Deferred.make() let runs = 0 const coordinator = yield* SessionRunCoordinator.make({ drain: () => @@ -676,196 +114,168 @@ describe("SessionRunCoordinator", () => { ? Deferred.await(firstGate) : run === 2 ? Deferred.succeed(secondStarted, undefined).pipe(Effect.andThen(Deferred.await(secondGate))) - : Effect.void, + : Deferred.succeed(thirdStarted, undefined), ), ), }) - const first = yield* coordinator.run("session").pipe(Effect.forkChild) + const resumed = yield* coordinator.run("session").pipe(Effect.forkChild) yield* Effect.yieldNow yield* coordinator.wake("session") yield* Deferred.succeed(firstGate, undefined) yield* Deferred.await(secondStarted) yield* coordinator.wake("session") yield* Deferred.succeed(secondGate, undefined) - yield* Fiber.join(first) + yield* Deferred.await(thirdStarted) + yield* Fiber.join(resumed) expect(runs).toBe(3) }), ), ) - it.effect("starts one successor after a wake races with failure", () => + it.effect("does nothing when interrupted while idle", () => + Effect.scoped( + Effect.gen(function* () { + const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.void }) + yield* coordinator.interrupt("session") + }), + ), + ) + + it.effect("interrupts active execution and clears its pending wake", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const interrupted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.andThen(Deferred.succeed(started, undefined)), + Effect.andThen(Effect.never), + Effect.onInterrupt(() => Deferred.succeed(interrupted, undefined)), + ), + }) + + const resumed = yield* coordinator.run("session").pipe(Effect.forkChild) + yield* Deferred.await(started) + yield* coordinator.wake("session") + yield* coordinator.interrupt("session") + yield* Deferred.await(interrupted) + + const exit = yield* Fiber.await(resumed) + expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue() + expect(runs).toBe(1) + }), + ), + ) + + it.effect("runs a wake registered during interruption cleanup", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => ++runs).pipe( + Effect.flatMap((run) => + run === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ) + : Deferred.succeed(secondStarted, undefined), + ), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.await(cleanupStarted) + yield* coordinator.wake("session") + yield* Deferred.succeed(cleanupGate, undefined) + yield* Fiber.join(interrupt) + yield* Deferred.await(secondStarted) + + expect(runs).toBe(2) + }), + ), + ) + + it.effect("starts a resume registered during interruption cleanup", () => + Effect.scoped( + Effect.gen(function* () { + const firstStarted = yield* Deferred.make() + const cleanupStarted = yield* Deferred.make() + const cleanupGate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() + const forces: boolean[] = [] + const coordinator = yield* SessionRunCoordinator.make({ + drain: (_key, force) => { + forces.push(force) + return forces.length === 1 + ? Deferred.succeed(firstStarted, undefined).pipe( + Effect.andThen(Effect.never), + Effect.onInterrupt(() => + Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))), + ), + ) + : Deferred.succeed(secondStarted, undefined) + }, + }) + + yield* coordinator.wake("session") + yield* Deferred.await(firstStarted) + const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild) + yield* Deferred.await(cleanupStarted) + const resumed = yield* coordinator.run("session").pipe(Effect.forkChild) + yield* Deferred.succeed(cleanupGate, undefined) + yield* Effect.all([Fiber.join(interrupt), Fiber.join(resumed)]) + yield* Deferred.await(secondStarted) + + expect(forces).toEqual([false, true]) + }), + ), + ) + + it.effect("starts one follow-up when a wake races with failure", () => Effect.scoped( Effect.gen(function* () { const gate = yield* Deferred.make() + const secondStarted = yield* Deferred.make() const failure = new Error("failed") let runs = 0 const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.sync(() => ++runs).pipe( Effect.flatMap((run) => - run === 1 ? Deferred.await(gate).pipe(Effect.andThen(Effect.fail(failure))) : Effect.void, + run === 1 + ? Deferred.await(gate).pipe(Effect.andThen(Effect.fail(failure))) + : Deferred.succeed(secondStarted, undefined), ), ), }) - const first = yield* coordinator.run("session").pipe(Effect.forkChild) + const resumed = yield* coordinator.run("session").pipe(Effect.forkChild) yield* Effect.yieldNow yield* coordinator.wake("session") yield* Deferred.succeed(gate, undefined) - expect(yield* Fiber.join(first).pipe(Effect.flip)).toBe(failure) - yield* Effect.yieldNow + expect(yield* Fiber.join(resumed).pipe(Effect.flip)).toBe(failure) + yield* Deferred.await(secondStarted) expect(runs).toBe(2) }), ), ) - it.effect("upgrades an active wake when an explicit run joins it", () => - Effect.scoped( - Effect.gen(function* () { - const wakeStarted = yield* Deferred.make() - const wakeGate = yield* Deferred.make() - const modes: SessionRunCoordinator.Mode[] = [] - const coordinator = yield* SessionRunCoordinator.make({ - drain: (_key, mode) => - Effect.sync(() => modes.push(mode)).pipe( - Effect.andThen( - mode === "wake" - ? Deferred.succeed(wakeStarted, undefined).pipe(Effect.andThen(Deferred.await(wakeGate))) - : Effect.void, - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(wakeStarted) - const run = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Deferred.succeed(wakeGate, undefined) - yield* Fiber.join(run) - - expect(modes).toEqual(["wake", "run"]) - }), - ), - ) - - it.effect("upgrades a recursive wake drain when an explicit run joins it", () => - Effect.scoped( - Effect.gen(function* () { - const runGate = yield* Deferred.make() - const wakeStarted = yield* Deferred.make() - const wakeGate = yield* Deferred.make() - const forcedStarted = yield* Deferred.make() - const modes: SessionRunCoordinator.Mode[] = [] - const coordinator = yield* SessionRunCoordinator.make({ - drain: (_key, mode) => - Effect.gen(function* () { - modes.push(mode) - if (modes.length === 1) return yield* Deferred.await(runGate) - if (modes.length === 2) - return yield* Deferred.succeed(wakeStarted, undefined).pipe(Effect.andThen(Deferred.await(wakeGate))) - yield* Deferred.succeed(forcedStarted, undefined) - }), - }) - - const first = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Effect.yieldNow - yield* coordinator.wake("session") - yield* Deferred.succeed(runGate, undefined) - yield* Deferred.await(wakeStarted) - const second = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Deferred.succeed(wakeGate, undefined) - yield* Deferred.await(forcedStarted) - yield* Fiber.join(first) - yield* Fiber.join(second) - - expect(modes).toEqual(["run", "wake", "run"]) - }), - ), - ) - - it.effect("propagates an upgraded explicit run failure before a successful advisory successor", () => - Effect.scoped( - Effect.gen(function* () { - const wakeStarted = yield* Deferred.make() - const wakeGate = yield* Deferred.make() - const runStarted = yield* Deferred.make() - const runGate = yield* Deferred.make() - const advisoryStarted = yield* Deferred.make() - const failure = new Error("explicit run failed") - const modes: SessionRunCoordinator.Mode[] = [] - const coordinator = yield* SessionRunCoordinator.make({ - drain: (_key, mode) => - Effect.sync(() => modes.push(mode)).pipe( - Effect.flatMap((run) => - run === 1 - ? Deferred.succeed(wakeStarted, undefined).pipe(Effect.andThen(Deferred.await(wakeGate))) - : run === 2 - ? Deferred.succeed(runStarted, undefined).pipe( - Effect.andThen(Deferred.await(runGate)), - Effect.andThen(Effect.fail(failure)), - ) - : Deferred.succeed(advisoryStarted, undefined), - ), - ), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(wakeStarted) - const run = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Deferred.succeed(wakeGate, undefined) - yield* Deferred.await(runStarted) - yield* coordinator.wake("session") - yield* Deferred.succeed(runGate, undefined) - yield* Deferred.await(advisoryStarted) - - expect(yield* Fiber.join(run).pipe(Effect.flip)).toBe(failure) - expect(modes).toEqual(["wake", "run", "wake"]) - }), - ), - ) - - it.effect("settles active callers when its owning scope closes", () => - Effect.gen(function* () { - const scope = yield* Scope.make() - const started = yield* Deferred.make() - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => Deferred.succeed(started, undefined).pipe(Effect.andThen(Effect.never)), - }).pipe(Scope.provide(scope)) - - const run = yield* coordinator.run("session").pipe(Effect.forkChild) - yield* Deferred.await(started) - const idle = yield* coordinator.awaitIdle("session").pipe(Effect.forkChild) - yield* Effect.yieldNow - yield* Scope.close(scope, Exit.void) - - const runExit = yield* Fiber.await(run) - const idleExit = yield* Fiber.await(idle) - expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue() - expect(Exit.isSuccess(idleExit)).toBeTrue() - }), - ) - - it.effect("does not start work after its owning scope closes", () => - Effect.gen(function* () { - const scope = yield* Scope.make() - let runs = 0 - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => Effect.sync(() => runs++), - }).pipe(Scope.provide(scope)) - yield* Scope.close(scope, Exit.void) - - yield* coordinator.wake("session") - yield* coordinator.awaitIdle("session") - const runExit = yield* coordinator.run("session").pipe(Effect.exit) - - expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue() - expect(runs).toBe(0) - }), - ) - - it.effect("does not cancel the owner when one joined waiter is interrupted", () => + it.effect("does not cancel execution when a joined waiter is interrupted", () => Effect.scoped( Effect.gen(function* () { const gate = yield* Deferred.make() @@ -904,105 +314,29 @@ describe("SessionRunCoordinator", () => { const second = yield* coordinator.run("second").pipe(Effect.forkChild) yield* Deferred.await(bothStarted) yield* Deferred.succeed(gate, undefined) - yield* Fiber.join(first) - yield* Fiber.join(second) + yield* Effect.all([Fiber.join(first), Fiber.join(second)]) }), ), ) - it.effect("reports an advisory drain failure exactly once", () => - Effect.scoped( - Effect.gen(function* () { - const failure = new Error("wake failed") - const reported: Cause.Cause[] = [] - const reportedOnce = yield* Deferred.make() - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => Effect.fail(failure), - onFailure: (_key, cause) => - Effect.sync(() => reported.push(cause)).pipe(Effect.andThen(Deferred.succeed(reportedOnce, undefined))), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(reportedOnce) - yield* Effect.yieldNow - - expect(reported).toHaveLength(1) - expect(Cause.squash(reported[0]!)).toBe(failure) - }), - ), - ) - - it.effect("contains defects thrown while constructing an advisory failure report", () => - Effect.scoped( - Effect.gen(function* () { - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => Effect.fail(new Error("wake failed")), - onFailure: () => { - throw new Error("report defect") - }, - }) - - yield* coordinator.wake("session") - yield* coordinator.awaitIdle("session").pipe(Effect.exit) - yield* coordinator.wake("session") - yield* coordinator.awaitIdle("session").pipe(Effect.exit) - }), - ), - ) - - it.effect("reports an independently interrupted advisory drain", () => - Effect.scoped( - Effect.gen(function* () { - const reported = yield* Deferred.make>() - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => Effect.interrupt, - onFailure: (_key, cause) => Deferred.succeed(reported, cause).pipe(Effect.asVoid), - }) - - yield* coordinator.wake("session") - - expect(Cause.hasInterruptsOnly(yield* Deferred.await(reported))).toBeTrue() - }), - ), - ) - - it.effect("does not report deliberate interruption as an advisory failure", () => - Effect.scoped( - Effect.gen(function* () { - const started = yield* Deferred.make() - const reported: Cause.Cause[] = [] - const coordinator = yield* SessionRunCoordinator.make({ - drain: () => Deferred.succeed(started, undefined).pipe(Effect.andThen(Effect.never)), - onFailure: (_key, cause) => Effect.sync(() => reported.push(cause)), - }) - - yield* coordinator.wake("session") - yield* Deferred.await(started) - yield* coordinator.interrupt("session") - yield* Effect.yieldNow - - expect(reported).toEqual([]) - }), - ), - ) - - it.effect("trampolines many synchronous self-waking drains", () => + it.effect("trampolines synchronous self-waking execution", () => Effect.scoped( Effect.gen(function* () { const limit = 20_000 + const completed = yield* Deferred.make() let runs = 0 let wake: (key: string) => Effect.Effect = () => Effect.void - const coordinator = yield* SessionRunCoordinator.make({ + const coordinator = yield* SessionRunCoordinator.make({ drain: (key) => Effect.sync(() => ++runs).pipe( - Effect.tap((run) => (run < limit ? wake(key) : Effect.void)), + Effect.tap((run) => (run < limit ? wake(key) : Deferred.succeed(completed, undefined))), Effect.asVoid, ), }) wake = coordinator.wake yield* coordinator.wake("session") - yield* coordinator.awaitIdle("session") + yield* Deferred.await(completed) expect(runs).toBe(limit) }), diff --git a/packages/core/test/session-runner-recorded.test.ts b/packages/core/test/session-runner-recorded.test.ts index 91d7a2447..65e90cb6d 100644 --- a/packages/core/test/session-runner-recorded.test.ts +++ b/packages/core/test/session-runner-recorded.test.ts @@ -16,6 +16,7 @@ import { Prompt } from "@opencode-ai/core/session/prompt" import { SessionProjector } from "@opencode-ai/core/session/projector" import { SessionExecution } from "@opencode-ai/core/session/execution" import { SessionRunCoordinator } from "@opencode-ai/core/session/run-coordinator" +import { SessionRunner } from "@opencode-ai/core/session/runner" import * as SessionRunnerLLM from "@opencode-ai/core/session/runner/llm" import { SessionRunnerModel } from "@opencode-ai/core/session/runner/model" import { ToolRegistry } from "@opencode-ai/core/tool/registry" @@ -83,19 +84,20 @@ const runner = SessionRunnerLLM.defaultLayer.pipe( Layer.provide(referenceGuidance), Layer.provide(config), ) -const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner)) const execution = Layer.effect( SessionExecution.Service, - SessionRunCoordinator.Service.pipe( - Effect.map((coordinator) => - SessionExecution.Service.of({ - resume: coordinator.run, - wake: coordinator.wake, - interrupt: coordinator.interrupt, - }), - ), - ), -).pipe(Layer.provide(coordinator)) + Effect.gen(function* () { + const sessionRunner = yield* SessionRunner.Service + const coordinator = yield* SessionRunCoordinator.make({ + drain: (sessionID, force) => sessionRunner.run({ sessionID, force }), + }) + return SessionExecution.Service.of({ + resume: coordinator.run, + wake: coordinator.wake, + interrupt: coordinator.interrupt, + }) + }), +).pipe(Layer.provide(runner)) const sessions = SessionV2.layer.pipe( Layer.provide(EventV2.defaultLayer), Layer.provide(Database.defaultLayer), @@ -120,7 +122,6 @@ const it = testEffect( skillGuidance, config, runner, - coordinator, execution, sessions, ), diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index f37a4c357..6e97ab793 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -244,19 +244,20 @@ const runner = SessionRunnerLLM.layer.pipe( Layer.provide(referenceGuidance), Layer.provide(config), ) -const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner)) const execution = Layer.effect( SessionExecution.Service, - SessionRunCoordinator.Service.pipe( - Effect.map((coordinator) => - SessionExecution.Service.of({ - resume: coordinator.run, - wake: coordinator.wake, - interrupt: coordinator.interrupt, - }), - ), - ), -).pipe(Layer.provide(coordinator)) + Effect.gen(function* () { + const sessionRunner = yield* SessionRunner.Service + const coordinator = yield* SessionRunCoordinator.make({ + drain: (sessionID, force) => sessionRunner.run({ sessionID, force }), + }) + return SessionExecution.Service.of({ + resume: coordinator.run, + wake: coordinator.wake, + interrupt: coordinator.interrupt, + }) + }), +).pipe(Layer.provide(runner)) const sessions = SessionV2.layer.pipe( Layer.provide(EventV2.defaultLayer), Layer.provide(Database.defaultLayer), @@ -283,7 +284,6 @@ const it = testEffect( skillGuidance, config, runner, - coordinator, execution, sessions, ), @@ -681,7 +681,6 @@ describe("SessionRunnerLLM", () => { systemUnavailable = false yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "First" }) }) - yield* (yield* SessionRunCoordinator.Service).awaitIdle(sessionID) expect(requests).toHaveLength(1) expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user"]) @@ -2161,7 +2160,7 @@ describe("SessionRunnerLLM", () => { expect(requests).toHaveLength(2) expect(userTexts(requests[1]!)).toEqual(["Start working", "First steer", "Second steer"]) - yield* (yield* SessionRunCoordinator.Service).wake(sessionID) + yield* (yield* SessionExecution.Service).wake(sessionID) yield* Effect.yieldNow expect(requests).toHaveLength(2) }), @@ -2367,7 +2366,7 @@ describe("SessionRunnerLLM", () => { }) requests.length = 0 - yield* (yield* SessionRunCoordinator.Service).wake(sessionID) + yield* (yield* SessionExecution.Service).wake(sessionID) yield* Effect.yieldNow expect(requests).toHaveLength(1) @@ -2417,7 +2416,7 @@ describe("SessionRunnerLLM", () => { LLMEvent.finish({ reason: "stop" }), ] - yield* (yield* SessionRunCoordinator.Service).wake(sessionID) + yield* (yield* SessionExecution.Service).wake(sessionID) while (requests.length === 0) yield* Effect.yieldNow expect(userTexts(requests[0]!)).toEqual(["Recover promoted input"]) diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index b2900e8d6..e27957f88 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -21,7 +21,6 @@ export type Event = | EventSessionNextPrompted | EventSessionNextPromptAdmitted | EventSessionNextPromptPromoted - | EventSessionNextInterruptRequested | EventSessionNextContextUpdated | EventSessionNextSynthetic | EventSessionNextShellStarted @@ -876,14 +875,6 @@ export type GlobalEvent = { timeCreated: number } } - | { - id: string - type: "session.next.interrupt.requested" - properties: { - timestamp: number - sessionID: string - } - } | { id: string type: "session.next.context.updated" @@ -1638,7 +1629,6 @@ export type GlobalEvent = { | SyncEventSessionNextPrompted | SyncEventSessionNextPromptAdmitted | SyncEventSessionNextPromptPromoted - | SyncEventSessionNextInterruptRequested | SyncEventSessionNextContextUpdated | SyncEventSessionNextSynthetic | SyncEventSessionNextShellStarted @@ -2781,7 +2771,6 @@ export type V2Event = | V2EventSessionNextPrompted | V2EventSessionNextPromptAdmitted | V2EventSessionNextPromptPromoted - | V2EventSessionNextInterruptRequested | V2EventSessionNextContextUpdated | V2EventSessionNextSynthetic | V2EventSessionNextShellStarted @@ -3249,21 +3238,6 @@ export type SyncEventSessionNextPromptPromoted = { } } -export type SyncEventSessionNextInterruptRequested = { - type: "sync" - id: string - syncEvent: { - type: "session.next.interrupt.requested.1" - id: string - seq: number - aggregateID: string - data: { - timestamp: number - sessionID: string - } - } -} - export type SyncEventSessionNextContextUpdated = { type: "sync" id: string @@ -4570,24 +4544,6 @@ export type V2EventSessionNextPromptPromoted = { } } -export type V2EventSessionNextInterruptRequested = { - id: string - metadata?: { - [key: string]: unknown - } - durable?: { - aggregateID: string - seq: number - version: number - } - location?: LocationRef - type: "session.next.interrupt.requested" - data: { - timestamp: number - sessionID: string - } -} - export type V2EventSessionNextContextUpdated = { id: string metadata?: { @@ -6224,15 +6180,6 @@ export type EventSessionNextPromptPromoted = { } } -export type EventSessionNextInterruptRequested = { - id: string - type: "session.next.interrupt.requested" - properties: { - timestamp: number - sessionID: string - } -} - export type EventSessionNextContextUpdated = { id: string type: "session.next.context.updated" diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index d9aee6051..7501fc4b9 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -14689,9 +14689,6 @@ { "$ref": "#/components/schemas/EventSessionNextPromptPromoted" }, - { - "$ref": "#/components/schemas/EventSessionNextInterruptRequested" - }, { "$ref": "#/components/schemas/EventSessionNextContextUpdated" }, @@ -17297,35 +17294,6 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, - { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.interrupt.requested"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - } - }, - "required": ["timestamp", "sessionID"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, { "type": "object", "properties": { @@ -19867,9 +19835,6 @@ { "$ref": "#/components/schemas/SyncEventSessionNextPromptPromoted" }, - { - "$ref": "#/components/schemas/SyncEventSessionNextInterruptRequested" - }, { "$ref": "#/components/schemas/SyncEventSessionNextContextUpdated" }, @@ -23111,9 +23076,6 @@ { "$ref": "#/components/schemas/V2EventSessionNextPromptPromoted" }, - { - "$ref": "#/components/schemas/V2EventSessionNextInterruptRequested" - }, { "$ref": "#/components/schemas/V2EventSessionNextContextUpdated" }, @@ -24498,56 +24460,6 @@ "required": ["type", "id", "syncEvent"], "additionalProperties": false }, - "SyncEventSessionNextInterruptRequested": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["sync"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "syncEvent": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["session.next.interrupt.requested.1"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "seq": { - "type": "number" - }, - "aggregateID": { - "type": "string" - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - } - }, - "required": ["timestamp", "sessionID"], - "additionalProperties": false - } - }, - "required": ["type", "id", "seq", "aggregateID", "data"], - "additionalProperties": false - } - }, - "required": ["type", "id", "syncEvent"], - "additionalProperties": false - }, "SyncEventSessionNextContextUpdated": { "type": "object", "properties": { @@ -28771,57 +28683,6 @@ "required": ["id", "type", "data"], "additionalProperties": false }, - "V2EventSessionNextInterruptRequested": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "metadata": { - "type": "object" - }, - "durable": { - "type": "object", - "properties": { - "aggregateID": { - "type": "string" - }, - "seq": { - "type": "integer" - }, - "version": { - "type": "integer" - } - }, - "required": ["aggregateID", "seq", "version"], - "additionalProperties": false - }, - "location": { - "$ref": "#/components/schemas/LocationRef" - }, - "type": { - "type": "string", - "enum": ["session.next.interrupt.requested"] - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - } - }, - "required": ["timestamp", "sessionID"], - "additionalProperties": false - } - }, - "required": ["id", "type", "data"], - "additionalProperties": false - }, "V2EventSessionNextContextUpdated": { "type": "object", "properties": { @@ -33460,35 +33321,6 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, - "EventSessionNextInterruptRequested": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.interrupt.requested"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - } - }, - "required": ["timestamp", "sessionID"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, "EventSessionNextContextUpdated": { "type": "object", "properties": { diff --git a/specs/v2/schema-changelog.md b/specs/v2/schema-changelog.md index a9a08b501..f32948c82 100644 --- a/specs/v2/schema-changelog.md +++ b/specs/v2/schema-changelog.md @@ -1,5 +1,10 @@ # V2 Schema Changelog +## 2026-06-22: Make Session Interruption Process-Local + +- Remove the unprojected `session.next.interrupt.requested.1` event from the experimental durable Session event union and generated SDK. +- No canonical V1 data requires migration; experimental V2 event history containing the retired event is disposable. + ## 2026-06-05: Execute Automatic Session Compaction - Trigger automatic compaction before provider turns using the complete estimated request and absolute model-aware headroom. diff --git a/specs/v2/session.md b/specs/v2/session.md index 43c93b175..ea22e6008 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -20,10 +20,10 @@ sessions.prompt({ id?, sessionID, prompt, delivery?, resume? }) -> resume false admits only sessions.interrupt(sessionID) - -> interrupts the active ownership chain on this process - -> waits for active drain cleanup and settlement - -> suppresses reruns already queued before interruption - -> preserves durable inbox rows for a later fresh wake or resume + -> interrupts active execution on this process + -> waits for runner cleanup and settlement + -> clears a coalesced follow-up wake already registered with this coordinator + -> preserves durable inbox rows for a later wake or resume -> idle or missing Session is a no-op ``` @@ -152,12 +152,12 @@ Inbox delivery is explicit: Execution has two entry points: -- `run` is an explicit resume. It joins an active drain chain or starts one, and performs at least one provider attempt even when no input is eligible. +- `run` is an explicit resume. It joins any active execution or starts a forced drain while idle. A forced drain bypasses the no-eligible-input guard, but preparation may still fail before a provider attempt. - `wake` reports newly recorded durable inbox work. Repeated wakes coalesce. A wake calls the provider only when it can promote eligible input. Post-crash activity recovery is intentionally deferred. A wake does not infer that ambiguous provider work is safe to retry after an input has already been promoted. Explicit `run` may deliberately continue from durable projected history. A future recovery slice should model durable activity identity, provider-dispatch ambiguity, required continuation, queue-opener reservation, retry policy, and visible recovery status together. -A process-global `SessionRunCoordinator` serializes each local Session drain chain while allowing different Sessions to drain concurrently. It enters the Session's current Location only when a drain starts, so interruption targets process execution ownership rather than Location cache identity. Interruption establishes a local ownership-chain boundary by stopping the current chain while preserving pending/unpromoted durable inbox rows for a later fresh wake and projected history for explicit resume. A Location runner also fences every new provider turn against its captured Location so a moved Session cannot begin another turn through source-Location tools or context. An already-dispatched provider turn may still settle source-Location calls until a future move-control slice interrupts active ownership. Automatic startup discovery, durable multi-node ownership, stale-owner fencing, and retry policy remain future work. +A process-global `SessionRunCoordinator` serializes execution for each local Session while allowing different Sessions to run concurrently. Resumes join active execution, overlapping wakes coalesce into one follow-up, and interruption stops current process-local execution without deleting durable inbox work. The runner enters the Session's current Location when execution starts and fences each new provider turn against that Location. Inbox promotion coalesces pending steers in durable admission order and opens one queued activity at a time in FIFO order. Add explicit inbox backlog and steering-batch limits before exposing broad multi-caller admission or untrusted queue growth. diff --git a/specs/v2/todo.md b/specs/v2/todo.md index 5d77cbea3..893b1dc2d 100644 --- a/specs/v2/todo.md +++ b/specs/v2/todo.md @@ -33,12 +33,7 @@ through legacy `SessionPrompt.loop(...)`: Prompt admission now uses a durable `session_input` inbox rather than immediate transcript projection. `steer` inputs coalesce into the active activity at the next safe provider-turn boundary. `queue` inputs form a FIFO of future activities -that open one at a time. A process-global `SessionRunCoordinator` coalesces process-local wakeups -around settlement races. Explicit `run` resumes perform at least one provider -attempt; advisory `wake` notifications call the provider only for eligible inbox -work. Steers coalesce into the active activity at -safe provider boundaries; queued inputs open later activities one at a time in -FIFO order. +that open one at a time. Next reviewed slices: