refactor(core): simplify session run coordination (#33388)

This commit is contained in:
Kit Langton 2026-06-22 18:16:30 +02:00 committed by GitHub
parent f50e4accf3
commit fe840d42b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 287 additions and 1454 deletions

View File

@ -1,7 +1,7 @@
export * as SessionV2 from "./session"
export * from "./session/schema"
import { Cause, DateTime, Effect, Layer, Schema, Context, Stream } from "effect"
import { DateTime, Effect, Layer, Schema, Context, Stream } from "effect"
import { and, asc, desc, eq, gt, like, lt, or, type SQL } from "drizzle-orm"
import { ProjectV2 } from "./project"
import { WorkspaceV2 } from "./workspace"
@ -25,7 +25,6 @@ import { fromRow } from "./session/info"
import { SessionRunner } from "./session/runner/index"
import { SessionStore } from "./session/store"
import { SessionExecution } from "./session/execution"
import { logFailure } from "./session/logging"
import { MessageDecodeError } from "./session/error"
import { SessionEvent } from "./session/event"
import { SessionInput } from "./session/input"
@ -168,20 +167,6 @@ export const layer = Layer.effect(
const store = yield* SessionStore.Service
const decodeMessage = Schema.decodeUnknownEffect(SessionMessage.Message)
const isDurableSessionEvent = Schema.is(SessionEvent.Durable)
const scope = yield* Effect.scope
const enqueueWake = (admitted: SessionInput.Admitted) =>
execution.wake(admitted.sessionID, admitted.admittedSeq).pipe(
Effect.tapCause((cause) =>
Cause.hasInterruptsOnly(cause)
? Effect.void
: logFailure("Failed to wake Session", admitted.sessionID, cause),
),
Effect.ignore,
Effect.forkIn(scope, { startImmediately: true }),
Effect.asVoid,
)
const decode = (row: typeof SessionMessageTable.$inferSelect) =>
decodeMessage({ ...row.data, id: row.id, type: row.type }).pipe(
Effect.mapError(
@ -342,10 +327,6 @@ export const layer = Layer.effect(
Effect.uninterruptible(
Effect.gen(function* () {
yield* result.get(input.sessionID)
const returnPrompt = Effect.fnUntraced(function* (admitted: SessionInput.Admitted) {
if (input.resume !== false) yield* enqueueWake(admitted)
return admitted
}, Effect.uninterruptible)
const messageID = input.id ?? SessionMessage.ID.create()
const delivery = input.delivery ?? "steer"
const expected = { sessionID: input.sessionID, messageID, prompt: input.prompt, delivery }
@ -363,7 +344,8 @@ export const layer = Layer.effect(
)
if (!SessionInput.equivalent(admitted, expected))
return yield* new PromptConflictError({ sessionID: input.sessionID, messageID })
return yield* returnPrompt(admitted)
if (input.resume !== false) yield* execution.wake(admitted.sessionID)
return admitted
}),
),
),
@ -404,19 +386,7 @@ export const layer = Layer.effect(
yield* execution.resume(sessionID)
}),
interrupt: Effect.fn("V2Session.interrupt")((sessionID) =>
Effect.uninterruptible(
Effect.gen(function* () {
const session = yield* store.get(sessionID)
if (!session) return yield* execution.interrupt(sessionID)
const event = yield* events.publish(SessionEvent.InterruptRequested, {
sessionID,
timestamp: yield* DateTime.now,
})
if (event.durable === undefined)
return yield* Effect.die("Interrupt request event is missing aggregate sequence")
yield* execution.interrupt(sessionID, event.durable.seq)
}),
),
Effect.uninterruptible(execution.interrupt(sessionID)),
),
})

View File

@ -118,13 +118,6 @@ export namespace PromptLifecycle {
export type Promoted = typeof Promoted.Type
}
export const InterruptRequested = EventV2.define({
type: "session.next.interrupt.requested",
...options,
schema: Base,
})
export type InterruptRequested = typeof InterruptRequested.Type
export const ContextUpdated = EventV2.define({
type: "session.next.context.updated",
...options,
@ -475,7 +468,6 @@ const DurableDefinitions = [
Prompted,
PromptLifecycle.Admitted,
PromptLifecycle.Promoted,
InterruptRequested,
ContextUpdated,
Synthetic,
Shell.Started,

View File

@ -5,12 +5,12 @@ import { SessionRunner } from "./runner/index"
import { SessionSchema } from "./schema"
export interface Interface {
/** Explicitly drain one Session, making at least one provider attempt. */
/** Starts execution while idle or joins the active execution. */
readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect<void, SessionRunner.RunError>
/** Schedule a drain after durable work is recorded. Repeated wakeups may coalesce. */
readonly wake: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect<void, SessionRunner.RunError>
/** Registers newly recorded work. Repeated wakeups may coalesce. */
readonly wake: (sessionID: SessionSchema.ID) => Effect.Effect<void>
/** Interrupt active work owned by this process. Idle interruption is a no-op. */
readonly interrupt: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect<void>
readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect<void>
}
/** Routes execution from a Session ID to the runner owned by that Session's Location. */

View File

@ -1,11 +1,10 @@
import { Effect, Layer } from "effect"
import { Cause, Effect, Layer } from "effect"
import { LocationServiceMap } from "../../location-layer"
import { SessionRunCoordinator } from "../run-coordinator"
import { SessionRunner } from "../runner"
import { SessionSchema } from "../schema"
import { SessionStore } from "../store"
import { SessionExecution } from "../execution"
import { logFailure } from "../logging"
/** Current-process routing for implicit-local Locations. Future remote placement belongs here. */
export const layer = Layer.effect(
@ -13,15 +12,19 @@ export const layer = Layer.effect(
Effect.gen(function* () {
const store = yield* SessionStore.Service
const locations = yield* LocationServiceMap
const coordinator = yield* SessionRunCoordinator.make<SessionSchema.ID, void, SessionRunner.RunError>({
drain: Effect.fnUntraced(function* (sessionID: SessionSchema.ID, mode) {
const coordinator = yield* SessionRunCoordinator.make<SessionSchema.ID, SessionRunner.RunError>({
drain: Effect.fnUntraced(function* (sessionID: SessionSchema.ID, force) {
const session = yield* store.get(sessionID)
if (!session) return yield* Effect.die(`Session not found: ${sessionID}`)
return yield* SessionRunner.Service.use((runner) => runner.run({ sessionID, force: mode === "run" })).pipe(
return yield* SessionRunner.Service.use((runner) => runner.run({ sessionID, force })).pipe(
Effect.provide(locations.get(session.location)),
Effect.tapCause((cause) =>
Cause.hasInterruptsOnly(cause)
? Effect.void
: Effect.logError("Failed to drain Session", cause).pipe(Effect.annotateLogs({ sessionID })),
),
)
}),
onFailure: (sessionID, cause) => logFailure("Failed to drain Session", sessionID, cause),
})
return SessionExecution.Service.of({

View File

@ -1,8 +0,0 @@
import { Cause, Effect } from "effect"
import { SessionSchema } from "./schema"
export const logFailure = (
message: "Failed to drain Session" | "Failed to wake Session",
sessionID: SessionSchema.ID,
cause: Cause.Cause<unknown>,
) => Effect.logError(message, cause).pipe(Effect.annotateLogs({ sessionID }))

View File

@ -138,7 +138,6 @@ export function update(adapter: Adapter, event: SessionEvent.Event) {
},
"session.next.prompt.admitted": () => Effect.void,
"session.next.prompt.promoted": () => Effect.void,
"session.next.interrupt.requested": () => Effect.void,
"session.next.context.updated": (event) =>
adapter.appendMessage(
new SessionMessage.System({

View File

@ -399,7 +399,6 @@ export const layer = Layer.effectDiscard(
)
}),
)
yield* events.project(SessionEvent.InterruptRequested, () => Effect.void)
yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event))
yield* events.project(SessionEvent.Synthetic, (event) => run(db, event))
yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event))

View File

@ -1,106 +1,46 @@
export * as SessionRunCoordinator from "./run-coordinator"
import { Cause, Context, Deferred, Effect, Exit, Fiber, FiberSet, Layer, Scope } from "effect"
import { SessionRunner } from "./runner"
import { logFailure } from "./logging"
import { SessionSchema } from "./schema"
import { Deferred, Effect, Exit, Fiber, FiberSet, Scope } from "effect"
export type Mode = "run" | "wake"
/** Why one drain generation should run. Explicit runs dominate advisory wakes when demands coalesce. */
type Demand = { readonly _tag: "run" } | { readonly _tag: "wake"; readonly seq?: number }
/**
* Runs at most one drain chain per key while allowing different keys to drain concurrently.
*
* For each key:
*
* idle --run/wake--> draining --run/wake--> draining + one coalesced rerun --> idle
*
* `run` is an explicit drain request. It starts a chain or joins the current chain and
* upgrades a pending follow-up so the caller receives explicit-run semantics.
*
* `wake` reports that durable work may now be available. It starts a chain while idle or
* requests one coalesced follow-up while draining. Repeated wakes collapse together.
*
* `interrupt` stops the current ownership chain. Advisory wakes from before the interrupt
* boundary are suppressed; advisory wakes after the boundary run after cleanup.
*/
export interface Coordinator<Key, A, E> {
/** Starts or joins one explicit drain generation. */
readonly run: (key: Key) => Effect.Effect<A, E>
/** Coalesces one wake-up after durable work is recorded. */
readonly wake: (key: Key, seq?: number) => Effect.Effect<void>
/** Waits until the current ownership chain settles. */
readonly awaitIdle: (key: Key) => Effect.Effect<void, E>
/** Interrupts the active ownership chain without automatically draining pending wakes. */
readonly interrupt: (key: Key, seq?: number) => Effect.Effect<void>
/** Serializes execution for each key while allowing different keys to run concurrently. */
export interface Coordinator<Key, E> {
/** Starts execution while idle or joins the active execution. */
readonly run: (key: Key) => Effect.Effect<void, E>
/** Registers one coalesced follow-up after newly recorded work. */
readonly wake: (key: Key) => Effect.Effect<void>
/** Stops active execution and waits for its cleanup. */
readonly interrupt: (key: Key) => Effect.Effect<void>
}
/** One Session's process-local execution lane: one active demand and at most one coalesced follow-up. */
type Entry<A, E> = {
readonly done: Deferred.Deferred<A, E>
readonly settled: Deferred.Deferred<Exit.Exit<A, E>>
current: Demand
pending?: Demand
explicitWaiter?: Deferred.Deferred<A, E>
interruptSeq?: number
type Entry<E> = {
readonly done: Deferred.Deferred<void, E>
owner?: Fiber.Fiber<void, never>
pendingWake: boolean
stopping: boolean
}
/** Combines follow-up demand: runs dominate, while wakes retain the newest durable admission sequence. */
const coalesce = (left: Demand | undefined, right: Demand): Demand => {
if (left?._tag === "run" || right._tag === "run") return { _tag: "run" }
return { _tag: "wake", seq: maxSeq(left?.seq, right.seq) }
}
const maxSeq = (left: number | undefined, right: number | undefined) => {
if (left === undefined) return right
if (right === undefined) return left
return Math.max(left, right)
}
/** Constructs a scoped coordinator. Every in-memory transition is synchronous. */
export const make = <Key, A, E>(options: {
readonly drain: (key: Key, mode: Mode) => Effect.Effect<A, E>
readonly onFailure?: (key: Key, cause: Cause.Cause<E>) => Effect.Effect<void>
}): Effect.Effect<Coordinator<Key, A, E>, never, Scope.Scope> =>
export const make = <Key, E>(options: {
readonly drain: (key: Key, force: boolean) => Effect.Effect<void, E>
}): Effect.Effect<Coordinator<Key, E>, never, Scope.Scope> =>
Effect.gen(function* () {
const active = new Map<Key, Entry<A, E>>()
const interruptSeq = new Map<Key, number>()
const report = yield* FiberSet.makeRuntime<never, void, never>()
const active = new Map<Key, Entry<E>>()
const fork = yield* FiberSet.makeRuntime<never, void, never>()
const shutdown = Deferred.makeUnsafe<void>()
let closed = false
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
closed = true
Deferred.doneUnsafe(shutdown, Effect.void)
active.clear()
interruptSeq.clear()
}),
)
const makeEntry = (current: Demand, explicitWaiter?: Deferred.Deferred<A, E>): Entry<A, E> => ({
done: Deferred.makeUnsafe<A, E>(),
settled: Deferred.makeUnsafe<Exit.Exit<A, E>>(),
current,
explicitWaiter,
const makeEntry = (): Entry<E> => ({
done: Deferred.makeUnsafe<void, E>(),
pendingWake: false,
stopping: false,
})
const start = (key: Key, entry: Entry<A, E>, demand: Demand, successor = false) => {
const start = (key: Key, entry: Entry<E>, force: boolean, successor = false) => {
const ready = Deferred.makeUnsafe<void>()
const drain = Effect.suspend(() => options.drain(key, demand._tag))
// Initial work retains immediate-start behavior but cannot run before ownership is published.
// Observer-started successors yield once so synchronous drains cannot recurse on the JS stack.
const owner = fork(
(successor
? Effect.yieldNow.pipe(Effect.andThen(drain))
: Deferred.await(ready).pipe(Effect.andThen(drain))
? Effect.yieldNow
: Deferred.await(ready)
).pipe(
Effect.onExit((exit) => Effect.sync(() => settle(key, entry, demand, exit))),
Effect.andThen(Effect.suspend(() => options.drain(key, force))),
Effect.onExit((exit) => Effect.sync(() => settle(key, entry, exit))),
Effect.exit,
Effect.asVoid,
),
@ -109,176 +49,57 @@ export const make = <Key, A, E>(options: {
if (!successor) Deferred.doneUnsafe(ready, Effect.void)
}
const settle = (key: Key, entry: Entry<A, E>, demand: Demand, exit: Exit.Exit<A, E>) => {
if (closed) {
Deferred.doneUnsafe(entry.done, exit)
Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
return
}
if (demand._tag === "run" && entry.explicitWaiter !== undefined) {
Deferred.doneUnsafe(entry.explicitWaiter, exit)
entry.explicitWaiter = undefined
}
if (entry.stopping && demand._tag === "wake" && entry.explicitWaiter !== undefined) {
Deferred.doneUnsafe(entry.explicitWaiter, exit)
entry.explicitWaiter = undefined
}
if (active.get(key) !== entry) {
Deferred.doneUnsafe(entry.done, exit)
Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
return
}
if (exit._tag === "Success" && !entry.stopping) {
if (entry.pending !== undefined) {
const pending = entry.pending
entry.pending = undefined
entry.current = pending
start(key, entry, pending, true)
return
}
active.delete(key)
Deferred.doneUnsafe(entry.done, exit)
Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
const settle = (key: Key, entry: Entry<E>, exit: Exit.Exit<void, E>) => {
if (Exit.isSuccess(exit) && !entry.stopping && entry.pendingWake) {
entry.pendingWake = false
start(key, entry, false, true)
return
}
const successor = entry.pending !== undefined ? makeEntry(entry.pending, entry.explicitWaiter) : undefined
const successor = entry.pendingWake ? makeEntry() : undefined
if (successor === undefined) active.delete(key)
else active.set(key, successor)
if (successor !== undefined) start(key, successor, successor.current, true)
Deferred.doneUnsafe(entry.done, exit)
Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
if (
exit._tag === "Failure" &&
!(entry.stopping && Cause.hasInterruptsOnly(exit.cause)) &&
demand._tag === "wake" &&
options.onFailure !== undefined
) {
report(Effect.suspend(() => options.onFailure!(key, exit.cause)))
else {
active.set(key, successor)
start(key, successor, false, true)
}
Deferred.doneUnsafe(entry.done, exit)
}
const wake = (key: Key, seq?: number) =>
Effect.sync(() => {
if (closed) return
if (!isAfterInterrupt(key, seq)) return
const run = (key: Key): Effect.Effect<void, E> =>
Effect.uninterruptibleMask((restore) => {
const entry = active.get(key)
if (entry !== undefined) {
if (!acceptsWake(entry, seq)) return
entry.pending = coalesce(entry.pending, { _tag: "wake", seq })
if (entry.stopping) return restore(Deferred.await(entry.done).pipe(Effect.andThen(run(key))))
return restore(Deferred.await(entry.done))
}
const next = makeEntry()
active.set(key, next)
start(key, next, true)
return restore(Deferred.await(next.done))
})
const wake = (key: Key) =>
Effect.sync(() => {
const entry = active.get(key)
if (entry !== undefined) {
entry.pendingWake = true
return
}
const next = makeEntry({ _tag: "wake", seq })
const next = makeEntry()
active.set(key, next)
start(key, next, next.current)
start(key, next, false)
})
const awaitIdle = (key: Key): Effect.Effect<void, E> =>
Effect.gen(function* () {
let firstFailure: Cause.Cause<E> | undefined
while (!closed) {
const entry = active.get(key)
if (entry === undefined) break
const exit = yield* Effect.raceFirst(
Deferred.await(entry.settled),
Deferred.await(shutdown).pipe(Effect.as(Exit.void)),
)
if (closed) break
if (exit._tag === "Failure" && firstFailure === undefined) firstFailure = exit.cause
}
if (firstFailure !== undefined) return yield* Effect.failCause(firstFailure)
})
const interrupt = (key: Key, seq?: number): Effect.Effect<void> =>
const interrupt = (key: Key): Effect.Effect<void> =>
Effect.suspend(() => {
const entry = active.get(key)
const latest = interruptSeq.get(key)
if (seq !== undefined && latest !== undefined && seq <= latest)
return entry?.stopping && entry.owner !== undefined ? Fiber.interrupt(entry.owner) : Effect.void
if (seq !== undefined) interruptSeq.set(key, seq)
if (entry?.owner === undefined) return Effect.void
if (
seq !== undefined &&
entry.current._tag === "wake" &&
entry.current.seq !== undefined &&
entry.current.seq > seq
)
return Effect.void
if (entry.stopping) {
entry.interruptSeq = maxSeq(entry.interruptSeq, seq)
suppressPendingAtOrBefore(entry, seq)
return Fiber.interrupt(entry.owner)
}
entry.stopping = true
entry.interruptSeq = seq
suppressPendingAtOrBefore(entry, seq)
entry.pendingWake = false
return Fiber.interrupt(entry.owner)
})
return { run, wake, awaitIdle, interrupt }
function run(key: Key): Effect.Effect<A, E> {
return Effect.uninterruptibleMask((restore) => {
if (closed) return Effect.interrupt
const entry = active.get(key)
if (entry !== undefined) {
if (entry.stopping) {
return restore(Deferred.await(entry.settled).pipe(Effect.andThen(run(key))))
}
if (entry.current._tag === "wake") {
entry.pending = coalesce(entry.pending, { _tag: "run" })
entry.explicitWaiter ??= Deferred.makeUnsafe<A, E>()
return restore(awaitRun(entry.explicitWaiter))
}
return restore(awaitRun(entry.done))
}
const next = makeEntry({ _tag: "run" })
active.set(key, next)
start(key, next, next.current)
return restore(awaitRun(next.done))
})
}
function awaitRun(done: Deferred.Deferred<A, E>): Effect.Effect<A, E> {
return Effect.raceFirst(Deferred.await(done), Deferred.await(shutdown).pipe(Effect.andThen(Effect.interrupt)))
}
function acceptsWake(entry: Entry<A, E>, seq: number | undefined) {
return !entry.stopping || (entry.interruptSeq !== undefined && seq !== undefined && seq > entry.interruptSeq)
}
function isAfterInterrupt(key: Key, seq: number | undefined) {
const latest = interruptSeq.get(key)
return latest === undefined || (seq !== undefined && seq > latest)
}
function suppressPendingAtOrBefore(entry: Entry<A, E>, seq: number | undefined) {
if (
entry.pending?._tag === "wake" &&
seq !== undefined &&
entry.pending.seq !== undefined &&
entry.pending.seq > seq
)
return
entry.pending = undefined
}
return { run, wake, interrupt }
})
export interface Interface extends Coordinator<SessionSchema.ID, void, SessionRunner.RunError> {}
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/SessionRunCoordinator") {}
export const layer = Layer.effect(
Service,
SessionRunner.Service.pipe(
Effect.flatMap((runner) =>
make<SessionSchema.ID, void, SessionRunner.RunError>({
drain: (sessionID, mode) => runner.run({ sessionID, force: mode === "run" }),
onFailure: (sessionID, cause) => logFailure("Failed to drain Session", sessionID, cause),
}),
),
Effect.map(Service.of),
),
)

View File

@ -21,7 +21,7 @@ export interface Interface {
/** Drains eligible durable work. Explicit runs perform one provider attempt even when no work is eligible. */
readonly run: (input: {
readonly sessionID: SessionSchema.ID
readonly force?: boolean
readonly force: boolean
}) => Effect.Effect<void, RunError>
}

View File

@ -346,14 +346,14 @@ export const layer = Layer.effect(
const run = Effect.fn("SessionRunner.run")(function* (input: {
readonly sessionID: SessionSchema.ID
readonly force?: boolean
readonly force: boolean
}) {
const hasSteer = yield* SessionInput.hasPending(db, input.sessionID, "steer")
const hasQueue = hasSteer ? false : yield* SessionInput.hasPending(db, input.sessionID, "queue")
if (input.force !== true && !hasSteer && !hasQueue) return
if (!input.force && !hasSteer && !hasQueue) return
yield* failInterruptedTools(input.sessionID)
let promotion: SessionInput.Delivery | undefined = hasSteer ? "steer" : hasQueue ? "queue" : undefined
let openActivity = input.force === true || hasSteer || hasQueue
let openActivity = input.force || hasSteer || hasQueue
while (openActivity) {
let needsContinuation = true
for (let step = 1; needsContinuation; step++) {

View File

@ -1,30 +0,0 @@
import { describe, expect, test } from "bun:test"
import { Cause, Effect, Logger } from "effect"
import { logFailure } from "@opencode-ai/core/session/logging"
import { SessionSchema } from "@opencode-ai/core/session/schema"
describe("Session logging", () => {
for (const message of ["Failed to drain Session", "Failed to wake Session"] as const) {
test(`renders the cause for ${message}`, async () => {
const entries: Array<ReturnType<typeof Logger.formatStructured.log>> = []
const logger = Logger.formatStructured.pipe(
Logger.map((entry): void => {
entries.push(entry)
}),
)
await logFailure(
message,
SessionSchema.ID.make("session-123"),
Cause.fail({ _tag: "SessionFailure", detail: { code: "nested-code" } }),
).pipe(Effect.provide(Logger.layer([logger])), Effect.runPromise)
expect(entries).toHaveLength(1)
expect(entries[0]?.message).toBe(message)
expect(entries[0]?.annotations).toEqual({ sessionID: "session-123" })
expect(entries[0]?.cause).toContain("SessionFailure")
expect(entries[0]?.cause).toContain("nested-code")
expect(entries[0]?.cause).not.toContain("[Object")
})
}
})

View File

@ -20,9 +20,7 @@ import { testEffect } from "./lib/effect"
const executionCalls: SessionV2.ID[] = []
const interruptCalls: SessionV2.ID[] = []
const interruptSeqs: Array<number | undefined> = []
const wakeCalls: SessionV2.ID[] = []
const wakeSeqs: Array<number | undefined> = []
const execution = Layer.succeed(
SessionExecution.Service,
SessionExecution.Service.of({
@ -30,15 +28,13 @@ const execution = Layer.succeed(
Effect.sync(() => {
executionCalls.push(sessionID)
}),
interrupt: (sessionID, seq) =>
interrupt: (sessionID) =>
Effect.sync(() => {
interruptCalls.push(sessionID)
interruptSeqs.push(seq)
}),
wake: (sessionID, seq) =>
wake: (sessionID) =>
Effect.sync(() => {
wakeCalls.push(sessionID)
wakeSeqs.push(seq)
}),
}),
)
@ -109,15 +105,6 @@ const eventCount = (type: string) =>
),
)
const interruptEvent = Database.Service.use(({ db }) =>
db
.select()
.from(EventTable)
.where(eq(EventTable.type, "session.next.interrupt.requested.1"))
.get()
.pipe(Effect.orDie),
)
describe("SessionV2.prompt", () => {
it.effect("delegates execution continuation through SessionExecution", () =>
Effect.gen(function* () {
@ -131,19 +118,14 @@ describe("SessionV2.prompt", () => {
}),
)
it.effect("delegates interruption through SessionExecution", () =>
it.effect("delegates process-local interruption through SessionExecution", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
interruptCalls.length = 0
interruptSeqs.length = 0
yield* session.interrupt(sessionID)
expect(interruptCalls).toEqual([sessionID])
expect(interruptSeqs).toHaveLength(1)
expect(typeof interruptSeqs[0]).toBe("number")
expect(yield* eventCount("session.next.interrupt.requested.1")).toBe(1)
expect(yield* interruptEvent).toMatchObject({ aggregate_id: sessionID, seq: interruptSeqs[0] })
expect(yield* session.messages({ sessionID })).toEqual([])
}),
)
@ -152,11 +134,9 @@ describe("SessionV2.prompt", () => {
Effect.gen(function* () {
const session = yield* SessionV2.Service
interruptCalls.length = 0
interruptSeqs.length = 0
yield* session.interrupt(SessionV2.ID.make("ses_missing"))
expect(interruptCalls).toEqual([SessionV2.ID.make("ses_missing")])
expect(interruptSeqs).toEqual([undefined])
}),
)
@ -515,13 +495,11 @@ describe("SessionV2.prompt", () => {
const session = yield* SessionV2.Service
executionCalls.length = 0
wakeCalls.length = 0
wakeSeqs.length = 0
const admitted = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) })
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) })
expect(executionCalls).toEqual([])
expect(wakeCalls).toEqual([sessionID])
expect(wakeSeqs).toEqual([admitted.admittedSeq])
}),
)
@ -531,9 +509,8 @@ describe("SessionV2.prompt", () => {
const session = yield* SessionV2.Service
executionCalls.length = 0
wakeCalls.length = 0
wakeSeqs.length = 0
const admitted = yield* session.prompt({
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Run explicitly" }),
resume: true,
@ -541,7 +518,6 @@ describe("SessionV2.prompt", () => {
expect(executionCalls).toEqual([])
expect(wakeCalls).toEqual([sessionID])
expect(wakeSeqs).toEqual([admitted.admittedSeq])
}),
)
@ -551,13 +527,11 @@ describe("SessionV2.prompt", () => {
const session = yield* SessionV2.Service
executionCalls.length = 0
wakeCalls.length = 0
wakeSeqs.length = 0
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Do not run" }), resume: false })
expect(executionCalls).toEqual([])
expect(wakeCalls).toEqual([])
expect(wakeSeqs).toEqual([])
}),
)
})

File diff suppressed because it is too large Load Diff

View File

@ -16,6 +16,7 @@ import { Prompt } from "@opencode-ai/core/session/prompt"
import { SessionProjector } from "@opencode-ai/core/session/projector"
import { SessionExecution } from "@opencode-ai/core/session/execution"
import { SessionRunCoordinator } from "@opencode-ai/core/session/run-coordinator"
import { SessionRunner } from "@opencode-ai/core/session/runner"
import * as SessionRunnerLLM from "@opencode-ai/core/session/runner/llm"
import { SessionRunnerModel } from "@opencode-ai/core/session/runner/model"
import { ToolRegistry } from "@opencode-ai/core/tool/registry"
@ -83,19 +84,20 @@ const runner = SessionRunnerLLM.defaultLayer.pipe(
Layer.provide(referenceGuidance),
Layer.provide(config),
)
const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner))
const execution = Layer.effect(
SessionExecution.Service,
SessionRunCoordinator.Service.pipe(
Effect.map((coordinator) =>
SessionExecution.Service.of({
resume: coordinator.run,
wake: coordinator.wake,
interrupt: coordinator.interrupt,
}),
),
),
).pipe(Layer.provide(coordinator))
Effect.gen(function* () {
const sessionRunner = yield* SessionRunner.Service
const coordinator = yield* SessionRunCoordinator.make<SessionV2.ID, SessionRunner.RunError>({
drain: (sessionID, force) => sessionRunner.run({ sessionID, force }),
})
return SessionExecution.Service.of({
resume: coordinator.run,
wake: coordinator.wake,
interrupt: coordinator.interrupt,
})
}),
).pipe(Layer.provide(runner))
const sessions = SessionV2.layer.pipe(
Layer.provide(EventV2.defaultLayer),
Layer.provide(Database.defaultLayer),
@ -120,7 +122,6 @@ const it = testEffect(
skillGuidance,
config,
runner,
coordinator,
execution,
sessions,
),

View File

@ -244,19 +244,20 @@ const runner = SessionRunnerLLM.layer.pipe(
Layer.provide(referenceGuidance),
Layer.provide(config),
)
const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner))
const execution = Layer.effect(
SessionExecution.Service,
SessionRunCoordinator.Service.pipe(
Effect.map((coordinator) =>
SessionExecution.Service.of({
resume: coordinator.run,
wake: coordinator.wake,
interrupt: coordinator.interrupt,
}),
),
),
).pipe(Layer.provide(coordinator))
Effect.gen(function* () {
const sessionRunner = yield* SessionRunner.Service
const coordinator = yield* SessionRunCoordinator.make<SessionV2.ID, SessionRunner.RunError>({
drain: (sessionID, force) => sessionRunner.run({ sessionID, force }),
})
return SessionExecution.Service.of({
resume: coordinator.run,
wake: coordinator.wake,
interrupt: coordinator.interrupt,
})
}),
).pipe(Layer.provide(runner))
const sessions = SessionV2.layer.pipe(
Layer.provide(EventV2.defaultLayer),
Layer.provide(Database.defaultLayer),
@ -283,7 +284,6 @@ const it = testEffect(
skillGuidance,
config,
runner,
coordinator,
execution,
sessions,
),
@ -681,7 +681,6 @@ describe("SessionRunnerLLM", () => {
systemUnavailable = false
yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "First" }) })
yield* (yield* SessionRunCoordinator.Service).awaitIdle(sessionID)
expect(requests).toHaveLength(1)
expect(requests[0]?.messages.map((message) => message.role)).toEqual(["user"])
@ -2161,7 +2160,7 @@ describe("SessionRunnerLLM", () => {
expect(requests).toHaveLength(2)
expect(userTexts(requests[1]!)).toEqual(["Start working", "First steer", "Second steer"])
yield* (yield* SessionRunCoordinator.Service).wake(sessionID)
yield* (yield* SessionExecution.Service).wake(sessionID)
yield* Effect.yieldNow
expect(requests).toHaveLength(2)
}),
@ -2367,7 +2366,7 @@ describe("SessionRunnerLLM", () => {
})
requests.length = 0
yield* (yield* SessionRunCoordinator.Service).wake(sessionID)
yield* (yield* SessionExecution.Service).wake(sessionID)
yield* Effect.yieldNow
expect(requests).toHaveLength(1)
@ -2417,7 +2416,7 @@ describe("SessionRunnerLLM", () => {
LLMEvent.finish({ reason: "stop" }),
]
yield* (yield* SessionRunCoordinator.Service).wake(sessionID)
yield* (yield* SessionExecution.Service).wake(sessionID)
while (requests.length === 0) yield* Effect.yieldNow
expect(userTexts(requests[0]!)).toEqual(["Recover promoted input"])

View File

@ -21,7 +21,6 @@ export type Event =
| EventSessionNextPrompted
| EventSessionNextPromptAdmitted
| EventSessionNextPromptPromoted
| EventSessionNextInterruptRequested
| EventSessionNextContextUpdated
| EventSessionNextSynthetic
| EventSessionNextShellStarted
@ -876,14 +875,6 @@ export type GlobalEvent = {
timeCreated: number
}
}
| {
id: string
type: "session.next.interrupt.requested"
properties: {
timestamp: number
sessionID: string
}
}
| {
id: string
type: "session.next.context.updated"
@ -1638,7 +1629,6 @@ export type GlobalEvent = {
| SyncEventSessionNextPrompted
| SyncEventSessionNextPromptAdmitted
| SyncEventSessionNextPromptPromoted
| SyncEventSessionNextInterruptRequested
| SyncEventSessionNextContextUpdated
| SyncEventSessionNextSynthetic
| SyncEventSessionNextShellStarted
@ -2781,7 +2771,6 @@ export type V2Event =
| V2EventSessionNextPrompted
| V2EventSessionNextPromptAdmitted
| V2EventSessionNextPromptPromoted
| V2EventSessionNextInterruptRequested
| V2EventSessionNextContextUpdated
| V2EventSessionNextSynthetic
| V2EventSessionNextShellStarted
@ -3249,21 +3238,6 @@ export type SyncEventSessionNextPromptPromoted = {
}
}
export type SyncEventSessionNextInterruptRequested = {
type: "sync"
id: string
syncEvent: {
type: "session.next.interrupt.requested.1"
id: string
seq: number
aggregateID: string
data: {
timestamp: number
sessionID: string
}
}
}
export type SyncEventSessionNextContextUpdated = {
type: "sync"
id: string
@ -4570,24 +4544,6 @@ export type V2EventSessionNextPromptPromoted = {
}
}
export type V2EventSessionNextInterruptRequested = {
id: string
metadata?: {
[key: string]: unknown
}
durable?: {
aggregateID: string
seq: number
version: number
}
location?: LocationRef
type: "session.next.interrupt.requested"
data: {
timestamp: number
sessionID: string
}
}
export type V2EventSessionNextContextUpdated = {
id: string
metadata?: {
@ -6224,15 +6180,6 @@ export type EventSessionNextPromptPromoted = {
}
}
export type EventSessionNextInterruptRequested = {
id: string
type: "session.next.interrupt.requested"
properties: {
timestamp: number
sessionID: string
}
}
export type EventSessionNextContextUpdated = {
id: string
type: "session.next.context.updated"

View File

@ -14689,9 +14689,6 @@
{
"$ref": "#/components/schemas/EventSessionNextPromptPromoted"
},
{
"$ref": "#/components/schemas/EventSessionNextInterruptRequested"
},
{
"$ref": "#/components/schemas/EventSessionNextContextUpdated"
},
@ -17297,35 +17294,6 @@
"required": ["id", "type", "properties"],
"additionalProperties": false
},
{
"type": "object",
"properties": {
"id": {
"type": "string",
"pattern": "^evt_"
},
"type": {
"type": "string",
"enum": ["session.next.interrupt.requested"]
},
"properties": {
"type": "object",
"properties": {
"timestamp": {
"type": "number"
},
"sessionID": {
"type": "string",
"pattern": "^ses"
}
},
"required": ["timestamp", "sessionID"],
"additionalProperties": false
}
},
"required": ["id", "type", "properties"],
"additionalProperties": false
},
{
"type": "object",
"properties": {
@ -19867,9 +19835,6 @@
{
"$ref": "#/components/schemas/SyncEventSessionNextPromptPromoted"
},
{
"$ref": "#/components/schemas/SyncEventSessionNextInterruptRequested"
},
{
"$ref": "#/components/schemas/SyncEventSessionNextContextUpdated"
},
@ -23111,9 +23076,6 @@
{
"$ref": "#/components/schemas/V2EventSessionNextPromptPromoted"
},
{
"$ref": "#/components/schemas/V2EventSessionNextInterruptRequested"
},
{
"$ref": "#/components/schemas/V2EventSessionNextContextUpdated"
},
@ -24498,56 +24460,6 @@
"required": ["type", "id", "syncEvent"],
"additionalProperties": false
},
"SyncEventSessionNextInterruptRequested": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["sync"]
},
"id": {
"type": "string",
"pattern": "^evt_"
},
"syncEvent": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["session.next.interrupt.requested.1"]
},
"id": {
"type": "string",
"pattern": "^evt_"
},
"seq": {
"type": "number"
},
"aggregateID": {
"type": "string"
},
"data": {
"type": "object",
"properties": {
"timestamp": {
"type": "number"
},
"sessionID": {
"type": "string",
"pattern": "^ses"
}
},
"required": ["timestamp", "sessionID"],
"additionalProperties": false
}
},
"required": ["type", "id", "seq", "aggregateID", "data"],
"additionalProperties": false
}
},
"required": ["type", "id", "syncEvent"],
"additionalProperties": false
},
"SyncEventSessionNextContextUpdated": {
"type": "object",
"properties": {
@ -28771,57 +28683,6 @@
"required": ["id", "type", "data"],
"additionalProperties": false
},
"V2EventSessionNextInterruptRequested": {
"type": "object",
"properties": {
"id": {
"type": "string",
"pattern": "^evt_"
},
"metadata": {
"type": "object"
},
"durable": {
"type": "object",
"properties": {
"aggregateID": {
"type": "string"
},
"seq": {
"type": "integer"
},
"version": {
"type": "integer"
}
},
"required": ["aggregateID", "seq", "version"],
"additionalProperties": false
},
"location": {
"$ref": "#/components/schemas/LocationRef"
},
"type": {
"type": "string",
"enum": ["session.next.interrupt.requested"]
},
"data": {
"type": "object",
"properties": {
"timestamp": {
"type": "number"
},
"sessionID": {
"type": "string",
"pattern": "^ses"
}
},
"required": ["timestamp", "sessionID"],
"additionalProperties": false
}
},
"required": ["id", "type", "data"],
"additionalProperties": false
},
"V2EventSessionNextContextUpdated": {
"type": "object",
"properties": {
@ -33460,35 +33321,6 @@
"required": ["id", "type", "properties"],
"additionalProperties": false
},
"EventSessionNextInterruptRequested": {
"type": "object",
"properties": {
"id": {
"type": "string",
"pattern": "^evt_"
},
"type": {
"type": "string",
"enum": ["session.next.interrupt.requested"]
},
"properties": {
"type": "object",
"properties": {
"timestamp": {
"type": "number"
},
"sessionID": {
"type": "string",
"pattern": "^ses"
}
},
"required": ["timestamp", "sessionID"],
"additionalProperties": false
}
},
"required": ["id", "type", "properties"],
"additionalProperties": false
},
"EventSessionNextContextUpdated": {
"type": "object",
"properties": {

View File

@ -1,5 +1,10 @@
# V2 Schema Changelog
## 2026-06-22: Make Session Interruption Process-Local
- Remove the unprojected `session.next.interrupt.requested.1` event from the experimental durable Session event union and generated SDK.
- No canonical V1 data requires migration; experimental V2 event history containing the retired event is disposable.
## 2026-06-05: Execute Automatic Session Compaction
- Trigger automatic compaction before provider turns using the complete estimated request and absolute model-aware headroom.

View File

@ -20,10 +20,10 @@ sessions.prompt({ id?, sessionID, prompt, delivery?, resume? })
-> resume false admits only
sessions.interrupt(sessionID)
-> interrupts the active ownership chain on this process
-> waits for active drain cleanup and settlement
-> suppresses reruns already queued before interruption
-> preserves durable inbox rows for a later fresh wake or resume
-> interrupts active execution on this process
-> waits for runner cleanup and settlement
-> clears a coalesced follow-up wake already registered with this coordinator
-> preserves durable inbox rows for a later wake or resume
-> idle or missing Session is a no-op
```
@ -152,12 +152,12 @@ Inbox delivery is explicit:
Execution has two entry points:
- `run` is an explicit resume. It joins an active drain chain or starts one, and performs at least one provider attempt even when no input is eligible.
- `run` is an explicit resume. It joins any active execution or starts a forced drain while idle. A forced drain bypasses the no-eligible-input guard, but preparation may still fail before a provider attempt.
- `wake` reports newly recorded durable inbox work. Repeated wakes coalesce. A wake calls the provider only when it can promote eligible input.
Post-crash activity recovery is intentionally deferred. A wake does not infer that ambiguous provider work is safe to retry after an input has already been promoted. Explicit `run` may deliberately continue from durable projected history. A future recovery slice should model durable activity identity, provider-dispatch ambiguity, required continuation, queue-opener reservation, retry policy, and visible recovery status together.
A process-global `SessionRunCoordinator` serializes each local Session drain chain while allowing different Sessions to drain concurrently. It enters the Session's current Location only when a drain starts, so interruption targets process execution ownership rather than Location cache identity. Interruption establishes a local ownership-chain boundary by stopping the current chain while preserving pending/unpromoted durable inbox rows for a later fresh wake and projected history for explicit resume. A Location runner also fences every new provider turn against its captured Location so a moved Session cannot begin another turn through source-Location tools or context. An already-dispatched provider turn may still settle source-Location calls until a future move-control slice interrupts active ownership. Automatic startup discovery, durable multi-node ownership, stale-owner fencing, and retry policy remain future work.
A process-global `SessionRunCoordinator` serializes execution for each local Session while allowing different Sessions to run concurrently. Resumes join active execution, overlapping wakes coalesce into one follow-up, and interruption stops current process-local execution without deleting durable inbox work. The runner enters the Session's current Location when execution starts and fences each new provider turn against that Location.
Inbox promotion coalesces pending steers in durable admission order and opens one queued activity at a time in FIFO order. Add explicit inbox backlog and steering-batch limits before exposing broad multi-caller admission or untrusted queue growth.

View File

@ -33,12 +33,7 @@ through legacy `SessionPrompt.loop(...)`:
Prompt admission now uses a durable `session_input` inbox rather than immediate
transcript projection. `steer` inputs coalesce into the active activity at the
next safe provider-turn boundary. `queue` inputs form a FIFO of future activities
that open one at a time. A process-global `SessionRunCoordinator` coalesces process-local wakeups
around settlement races. Explicit `run` resumes perform at least one provider
attempt; advisory `wake` notifications call the provider only for eligible inbox
work. Steers coalesce into the active activity at
safe provider boundaries; queued inputs open later activities one at a time in
FIFO order.
that open one at a time.
Next reviewed slices: