feat(core): interrupt v2 session execution (#30850)

This commit is contained in:
Kit Langton 2026-06-05 12:06:40 -04:00 committed by GitHub
parent 41bd9124f4
commit 12e38866ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1047 additions and 112 deletions

View File

@ -143,7 +143,7 @@ const table = sqliteTable("session", {
- Keep durable prompt admission separate from model execution. `SessionV2.prompt(...)` admits one durable `session_input` row before scheduling advisory `SessionExecution.wake(sessionID)` unless `resume: false` requests admit-only behavior. The serialized runner promotes admitted inputs into visible user messages at safe boundaries. - Keep durable prompt admission separate from model execution. `SessionV2.prompt(...)` admits one durable `session_input` row before scheduling advisory `SessionExecution.wake(sessionID)` unless `resume: false` requests admit-only behavior. The serialized runner promotes admitted inputs into visible user messages at safe boundaries.
- Reusing a Session ID adopts the existing Session. Reusing a prompt message ID reconciles an exact retry only when Session, prompt, and delivery mode match; conflicting reuse fails. Historical projected prompts lazily synthesize promoted inbox records during exact retry. - Reusing a Session ID adopts the existing Session. Reusing a prompt message ID reconciles an exact retry only when Session, prompt, and delivery mode match; conflicting reuse fails. Historical projected prompts lazily synthesize promoted inbox records during exact retry.
- Keep `SessionExecution` process-global and Session-ID based. It discovers placement through the read-side `SessionStore` and `LocationServiceMap.get(session.location)`; no layer should take a Session ID. - Keep `SessionExecution` process-global and Session-ID based. Its local implementation owns the process-local Session coordinator and discovers placement through `SessionStore` plus `LocationServiceMap.get(session.location)` only when a drain starts; no layer should take a Session ID. V2 interruption targets the active process-local ownership chain for that Session; idle or missing interruption is a no-op.
- Keep `SessionRunner`, model resolution, tool registry, permissions, and filesystem Location-scoped. Omitted `Location.workspaceID` means implicit-local placement; explicit workspace identity remains reserved for future placement semantics. - Keep `SessionRunner`, model resolution, tool registry, permissions, and filesystem Location-scoped. Omitted `Location.workspaceID` means implicit-local placement; explicit workspace identity remains reserved for future placement semantics.
- Preserve one explicit `llm.stream(request)` call per provider turn and reload projected history before durable continuation. Do not bridge through legacy `SessionPrompt.loop(...)` or delegate orchestration to an in-memory tool loop. - Preserve one explicit `llm.stream(request)` call per provider turn and reload projected history before durable continuation. Do not bridge through legacy `SessionPrompt.loop(...)` or delegate orchestration to an in-memory tool loop.
- Keep local Session drains process-local until clustering is implemented. `SessionRunCoordinator` joins explicit same-Session resumes, coalesces prompt wakeups, and allows different Sessions to run concurrently. Advisory wakes drain eligible durable inbox rows only; post-crash activity recovery requires a separate explicit design before it may retry provider work. - Keep local Session drains process-local until clustering is implemented. `SessionRunCoordinator` joins explicit same-Session resumes, coalesces prompt wakeups, and allows different Sessions to run concurrently. Advisory wakes drain eligible durable inbox rows only; post-crash activity recovery requires a separate explicit design before it may retry provider work.

View File

@ -40,7 +40,6 @@ import { LLMClient } from "@opencode-ai/llm"
import { RequestExecutor } from "@opencode-ai/llm/route" import { RequestExecutor } from "@opencode-ai/llm/route"
import * as SessionRunnerLLM from "./session/runner/llm" import * as SessionRunnerLLM from "./session/runner/llm"
import { SessionRunnerModel } from "./session/runner/model" import { SessionRunnerModel } from "./session/runner/model"
import { SessionRunCoordinator } from "./session/run-coordinator"
import { SystemContextBuiltIns } from "./system-context/builtins" import { SystemContextBuiltIns } from "./system-context/builtins"
import { FetchHttpClient } from "effect/unstable/http" import { FetchHttpClient } from "effect/unstable/http"
@ -87,7 +86,6 @@ export class LocationServiceMap extends LayerMap.Service<LocationServiceMap>()("
Layer.provide(model), Layer.provide(model),
Layer.provide(skillGuidance), Layer.provide(skillGuidance),
) )
const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner))
return Layer.mergeAll( return Layer.mergeAll(
services, services,
commits, commits,
@ -97,7 +95,6 @@ export class LocationServiceMap extends LayerMap.Service<LocationServiceMap>()("
questions, questions,
model, model,
runner, runner,
coordinator,
builtInTools, builtInTools,
).pipe(Layer.fresh) ).pipe(Layer.fresh)
}, },

View File

@ -51,6 +51,7 @@ export const layer = Layer.effect(
}), }),
get: sessions.get, get: sessions.get,
list: sessions.list, list: sessions.list,
interrupt: sessions.interrupt,
prompt: (input) => prompt: (input) =>
sessions.prompt({ sessions.prompt({
id: input.id, id: input.id,

View File

@ -84,6 +84,8 @@ export interface Interface {
readonly get: (sessionID: ID) => Effect.Effect<Info, NotFoundError> readonly get: (sessionID: ID) => Effect.Effect<Info, NotFoundError>
readonly list: (input?: ListInput) => Effect.Effect<Info[]> readonly list: (input?: ListInput) => Effect.Effect<Info[]>
readonly prompt: (input: PromptInput) => Effect.Effect<Admission, NotFoundError | PromptConflictError> readonly prompt: (input: PromptInput) => Effect.Effect<Admission, NotFoundError | PromptConflictError>
/** Interrupt the active V2 execution chain for one Session on this process. Interrupting an idle or missing Session is a no-op. */
readonly interrupt: (sessionID: ID) => Effect.Effect<void>
readonly messages: (input: MessagesInput) => Effect.Effect<Message[], NotFoundError | MessageDecodeError> readonly messages: (input: MessagesInput) => Effect.Effect<Message[], NotFoundError | MessageDecodeError>
readonly message: (input: MessageInput) => Effect.Effect<Message | undefined> readonly message: (input: MessageInput) => Effect.Effect<Message | undefined>
readonly context: (sessionID: ID) => Effect.Effect<Message[], NotFoundError | MessageDecodeError> readonly context: (sessionID: ID) => Effect.Effect<Message[], NotFoundError | MessageDecodeError>

View File

@ -1,7 +1,7 @@
export * as SessionV2 from "./session" export * as SessionV2 from "./session"
export * from "./session/schema" export * from "./session/schema"
import { Cause, Effect, Layer, Schema, Context, Stream } from "effect" import { Cause, DateTime, Effect, Layer, Schema, Context, Stream } from "effect"
import { and, asc, desc, eq, gt, like, lt, or, type SQL } from "drizzle-orm" import { and, asc, desc, eq, gt, like, lt, or, type SQL } from "drizzle-orm"
import { ProjectV2 } from "./project" import { ProjectV2 } from "./project"
import { WorkspaceV2 } from "./workspace" import { WorkspaceV2 } from "./workspace"
@ -155,6 +155,7 @@ export interface Interface {
readonly compact: (input: CompactInput) => Effect.Effect<void, NotFoundError | OperationUnavailableError> readonly compact: (input: CompactInput) => Effect.Effect<void, NotFoundError | OperationUnavailableError>
readonly wait: (id: SessionSchema.ID) => Effect.Effect<void, NotFoundError | OperationUnavailableError> readonly wait: (id: SessionSchema.ID) => Effect.Effect<void, NotFoundError | OperationUnavailableError>
readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect<void, NotFoundError | SessionRunner.RunError> readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect<void, NotFoundError | SessionRunner.RunError>
readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect<void>
} }
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/Session") {} export class Service extends Context.Service<Service, Interface>()("@opencode/v2/Session") {}
@ -171,13 +172,13 @@ export const layer = Layer.effect(
const isDurableSessionEvent = Schema.is(SessionEvent.Durable) const isDurableSessionEvent = Schema.is(SessionEvent.Durable)
const scope = yield* Effect.scope const scope = yield* Effect.scope
const enqueueWake = (sessionID: SessionSchema.ID) => const enqueueWake = (admitted: SessionInput.Admitted) =>
execution.wake(sessionID).pipe( execution.wake(admitted.sessionID, admitted.admittedSeq).pipe(
Effect.tapCause((cause) => Effect.tapCause((cause) =>
Cause.hasInterruptsOnly(cause) Cause.hasInterruptsOnly(cause)
? Effect.void ? Effect.void
: Effect.logError("Failed to wake Session").pipe( : Effect.logError("Failed to wake Session").pipe(
Effect.annotateLogs("sessionID", sessionID), Effect.annotateLogs("sessionID", admitted.sessionID),
Effect.annotateLogs("cause", cause), Effect.annotateLogs("cause", cause),
), ),
), ),
@ -351,7 +352,7 @@ export const layer = Layer.effect(
Effect.gen(function* () { Effect.gen(function* () {
yield* result.get(input.sessionID) yield* result.get(input.sessionID)
const returnPrompt = Effect.fnUntraced(function* (admitted: SessionInput.Admitted) { const returnPrompt = Effect.fnUntraced(function* (admitted: SessionInput.Admitted) {
if (input.resume !== false) yield* enqueueWake(input.sessionID) if (input.resume !== false) yield* enqueueWake(admitted)
return admitted return admitted
}, Effect.uninterruptible) }, Effect.uninterruptible)
const messageID = input.id ?? SessionMessage.ID.create() const messageID = input.id ?? SessionMessage.ID.create()
@ -399,6 +400,20 @@ export const layer = Layer.effect(
yield* result.get(sessionID) yield* result.get(sessionID)
yield* execution.resume(sessionID) yield* execution.resume(sessionID)
}), }),
interrupt: Effect.fn("V2Session.interrupt")((sessionID) =>
Effect.uninterruptible(
Effect.gen(function* () {
const session = yield* store.get(sessionID)
if (!session) return yield* execution.interrupt(sessionID)
const event = yield* events.publish(SessionEvent.InterruptRequested, {
sessionID,
timestamp: yield* DateTime.now,
})
if (event.seq === undefined) return yield* Effect.die("Interrupt request event is missing aggregate sequence")
yield* execution.interrupt(sessionID, event.seq)
}),
),
),
}) })
return result return result

View File

@ -119,6 +119,13 @@ export namespace PromptLifecycle {
export type Promoted = typeof Promoted.Type export type Promoted = typeof Promoted.Type
} }
export const InterruptRequested = EventV2.define({
type: "session.next.interrupt.requested",
...options,
schema: Base,
})
export type InterruptRequested = typeof InterruptRequested.Type
export const ContextUpdated = EventV2.define({ export const ContextUpdated = EventV2.define({
type: "session.next.context.updated", type: "session.next.context.updated",
...options, ...options,
@ -455,6 +462,7 @@ const DurableDefinitions = [
Prompted, Prompted,
PromptLifecycle.Admitted, PromptLifecycle.Admitted,
PromptLifecycle.Promoted, PromptLifecycle.Promoted,
InterruptRequested,
ContextUpdated, ContextUpdated,
Synthetic, Synthetic,
Shell.Started, Shell.Started,

View File

@ -8,11 +8,16 @@ export interface Interface {
/** Explicitly drain one Session, making at least one provider attempt. */ /** Explicitly drain one Session, making at least one provider attempt. */
readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect<void, SessionRunner.RunError> readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect<void, SessionRunner.RunError>
/** Schedule a drain after durable work is recorded. Repeated wakeups may coalesce. */ /** Schedule a drain after durable work is recorded. Repeated wakeups may coalesce. */
readonly wake: (sessionID: SessionSchema.ID) => Effect.Effect<void, SessionRunner.RunError> readonly wake: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect<void, SessionRunner.RunError>
/** Interrupt active work owned by this process. Idle interruption is a no-op. */
readonly interrupt: (sessionID: SessionSchema.ID, seq?: number) => Effect.Effect<void>
} }
/** Routes execution from a Session ID to the runner owned by that Session's Location. */ /** Routes execution from a Session ID to the runner owned by that Session's Location. */
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/SessionExecution") {} export class Service extends Context.Service<Service, Interface>()("@opencode/v2/SessionExecution") {}
/** Low-level compatibility layer for callers that only need durable Session recording. */ /** Low-level compatibility layer for callers that only need durable Session recording. */
export const noopLayer = Layer.succeed(Service, Service.of({ resume: () => Effect.void, wake: () => Effect.void })) export const noopLayer = Layer.succeed(
Service,
Service.of({ resume: () => Effect.void, wake: () => Effect.void, interrupt: () => Effect.void }),
)

View File

@ -1,6 +1,7 @@
import { Effect, Layer } from "effect" import { Effect, Layer } from "effect"
import { LocationServiceMap } from "../../location-layer" import { LocationServiceMap } from "../../location-layer"
import { SessionRunCoordinator } from "../run-coordinator" import { SessionRunCoordinator } from "../run-coordinator"
import { SessionRunner } from "../runner"
import { SessionSchema } from "../schema" import { SessionSchema } from "../schema"
import { SessionStore } from "../store" import { SessionStore } from "../store"
import { SessionExecution } from "../execution" import { SessionExecution } from "../execution"
@ -11,25 +12,25 @@ export const layer = Layer.effect(
Effect.gen(function* () { Effect.gen(function* () {
const store = yield* SessionStore.Service const store = yield* SessionStore.Service
const locations = yield* LocationServiceMap const locations = yield* LocationServiceMap
const scope = yield* Effect.scope const coordinator = yield* SessionRunCoordinator.make<SessionSchema.ID, void, SessionRunner.RunError>({
const withCoordinator = Effect.fnUntraced(function* <A, E>( drain: Effect.fnUntraced(function* (sessionID: SessionSchema.ID, mode) {
sessionID: SessionSchema.ID, const session = yield* store.get(sessionID)
use: (coordinator: SessionRunCoordinator.Interface) => Effect.Effect<A, E>, if (!session) return yield* Effect.die(`Session not found: ${sessionID}`)
) { return yield* SessionRunner.Service.use((runner) => runner.run({ sessionID, force: mode === "run" })).pipe(
const session = yield* store.get(sessionID) Effect.provide(locations.get(session.location)),
if (!session) return yield* Effect.die(`Session not found: ${sessionID}`) )
return yield* SessionRunCoordinator.Service.use(use).pipe(Effect.provide(locations.get(session.location))) }),
onFailure: (sessionID, cause) =>
Effect.logError("Failed to drain Session").pipe(
Effect.annotateLogs("sessionID", sessionID),
Effect.annotateLogs("cause", cause),
),
}) })
return SessionExecution.Service.of({ return SessionExecution.Service.of({
resume: Effect.fn("SessionExecution.resume")(function* (sessionID) { interrupt: coordinator.interrupt,
return yield* withCoordinator(sessionID, (coordinator) => coordinator.run(sessionID)) resume: coordinator.run,
}), wake: coordinator.wake,
wake: Effect.fn("SessionExecution.wake")(function* (sessionID) {
yield* withCoordinator(sessionID, (coordinator) =>
coordinator.wake(sessionID).pipe(Effect.andThen(coordinator.awaitIdle(sessionID))),
).pipe(Effect.forkIn(scope), Effect.asVoid)
}),
}) })
}), }),
) )

View File

@ -159,6 +159,7 @@ export function update(adapter: Adapter, event: SessionEvent.Event) {
}, },
"session.next.prompt.admitted": () => Effect.void, "session.next.prompt.admitted": () => Effect.void,
"session.next.prompt.promoted": () => Effect.void, "session.next.prompt.promoted": () => Effect.void,
"session.next.interrupt.requested": () => Effect.void,
"session.next.context.updated": (event) => "session.next.context.updated": (event) =>
adapter.appendMessage( adapter.appendMessage(
new SessionMessage.System({ new SessionMessage.System({

View File

@ -428,6 +428,7 @@ export const layer = Layer.effectDiscard(
) )
}), }),
) )
yield* events.project(SessionEvent.InterruptRequested, () => Effect.void)
yield* events.project(SessionEvent.ContextUpdated, (event) => { yield* events.project(SessionEvent.ContextUpdated, (event) => {
if (!event.replay || event.seq === undefined) return run(db, event) if (!event.replay || event.seq === undefined) return run(db, event)
return run(db, event).pipe( return run(db, event).pipe(

View File

@ -1,11 +1,14 @@
export * as SessionRunCoordinator from "./run-coordinator" export * as SessionRunCoordinator from "./run-coordinator"
import { Cause, Context, Deferred, Effect, Exit, FiberSet, Layer, Scope } from "effect" import { Cause, Context, Deferred, Effect, Exit, Fiber, FiberSet, Layer, Scope } from "effect"
import { SessionRunner } from "./runner" import { SessionRunner } from "./runner"
import { SessionSchema } from "./schema" import { SessionSchema } from "./schema"
export type Mode = "run" | "wake" export type Mode = "run" | "wake"
/** Why one drain generation should run. Explicit runs dominate advisory wakes when demands coalesce. */
type Demand = { readonly _tag: "run" } | { readonly _tag: "wake"; readonly seq?: number }
/** /**
* Runs at most one drain chain per key while allowing different keys to drain concurrently. * Runs at most one drain chain per key while allowing different keys to drain concurrently.
* *
@ -18,24 +21,44 @@ export type Mode = "run" | "wake"
* *
* `wake` reports that durable work may now be available. It starts a chain while idle or * `wake` reports that durable work may now be available. It starts a chain while idle or
* requests one coalesced follow-up while draining. Repeated wakes collapse together. * requests one coalesced follow-up while draining. Repeated wakes collapse together.
*
* `interrupt` stops the current ownership chain. Advisory wakes from before the interrupt
* boundary are suppressed; advisory wakes after the boundary run after cleanup.
*/ */
export interface Coordinator<Key, A, E> { export interface Coordinator<Key, A, E> {
/** Starts or joins one explicit drain generation. */ /** Starts or joins one explicit drain generation. */
readonly run: (key: Key) => Effect.Effect<A, E> readonly run: (key: Key) => Effect.Effect<A, E>
/** Coalesces one wake-up after durable work is recorded. */ /** Coalesces one wake-up after durable work is recorded. */
readonly wake: (key: Key) => Effect.Effect<void> readonly wake: (key: Key, seq?: number) => Effect.Effect<void>
/** Waits until the current ownership chain settles. */ /** Waits until the current ownership chain settles. */
readonly awaitIdle: (key: Key) => Effect.Effect<void, E> readonly awaitIdle: (key: Key) => Effect.Effect<void, E>
/** Interrupts the active ownership chain without automatically draining pending wakes. */
readonly interrupt: (key: Key, seq?: number) => Effect.Effect<void>
} }
/** One Session's process-local execution lane: one active demand and at most one coalesced follow-up. */
type Entry<A, E> = { type Entry<A, E> = {
readonly done: Deferred.Deferred<A, E> readonly done: Deferred.Deferred<A, E>
mode: Mode readonly settled: Deferred.Deferred<Exit.Exit<A, E>>
rerun?: Mode current: Demand
explicit?: Deferred.Deferred<A, E> pending?: Demand
explicitWaiter?: Deferred.Deferred<A, E>
interruptSeq?: number
owner?: Fiber.Fiber<void, never>
stopping: boolean
} }
const strongest = (left: Mode | undefined, right: Mode): Mode => (left === "run" || right === "run" ? "run" : "wake") /** Combines follow-up demand: runs dominate, while wakes retain the newest durable admission sequence. */
const coalesce = (left: Demand | undefined, right: Demand): Demand => {
if (left?._tag === "run" || right._tag === "run") return { _tag: "run" }
return { _tag: "wake", seq: maxSeq(left?.seq, right.seq) }
}
const maxSeq = (left: number | undefined, right: number | undefined) => {
if (left === undefined) return right
if (right === undefined) return left
return Math.max(left, right)
}
/** Constructs a scoped coordinator. Every in-memory transition is synchronous. */ /** Constructs a scoped coordinator. Every in-memory transition is synchronous. */
export const make = <Key, A, E>(options: { export const make = <Key, A, E>(options: {
@ -44,7 +67,8 @@ export const make = <Key, A, E>(options: {
}): Effect.Effect<Coordinator<Key, A, E>, never, Scope.Scope> => }): Effect.Effect<Coordinator<Key, A, E>, never, Scope.Scope> =>
Effect.gen(function* () { Effect.gen(function* () {
const active = new Map<Key, Entry<A, E>>() const active = new Map<Key, Entry<A, E>>()
const scope = yield* Effect.scope const interruptSeq = new Map<Key, number>()
const report = yield* FiberSet.makeRuntime<never, void, never>()
const fork = yield* FiberSet.makeRuntime<never, void, never>() const fork = yield* FiberSet.makeRuntime<never, void, never>()
const shutdown = Deferred.makeUnsafe<void>() const shutdown = Deferred.makeUnsafe<void>()
let closed = false let closed = false
@ -53,67 +77,97 @@ export const make = <Key, A, E>(options: {
closed = true closed = true
Deferred.doneUnsafe(shutdown, Effect.void) Deferred.doneUnsafe(shutdown, Effect.void)
active.clear() active.clear()
interruptSeq.clear()
}), }),
) )
const makeEntry = (mode: Mode, explicit?: Deferred.Deferred<A, E>): Entry<A, E> => ({ const makeEntry = (current: Demand, explicitWaiter?: Deferred.Deferred<A, E>): Entry<A, E> => ({
done: Deferred.makeUnsafe<A, E>(), done: Deferred.makeUnsafe<A, E>(),
mode, settled: Deferred.makeUnsafe<Exit.Exit<A, E>>(),
explicit, current,
explicitWaiter,
stopping: false,
}) })
const start = (key: Key, entry: Entry<A, E>, mode: Mode) => { const start = (key: Key, entry: Entry<A, E>, demand: Demand, successor = false) => {
fork(own(key, entry, mode)) const ready = Deferred.makeUnsafe<void>()
const drain = Effect.suspend(() => options.drain(key, demand._tag))
// Initial work retains immediate-start behavior but cannot run before ownership is published.
// Observer-started successors yield once so synchronous drains cannot recurse on the JS stack.
const owner = fork(
(successor ? Effect.yieldNow.pipe(Effect.andThen(drain)) : Deferred.await(ready).pipe(Effect.andThen(drain))).pipe(
Effect.onExit((exit) => Effect.sync(() => settle(key, entry, demand, exit))),
Effect.exit,
Effect.asVoid,
),
)
entry.owner = owner
if (!successor) Deferred.doneUnsafe(ready, Effect.void)
} }
const own = (key: Key, entry: Entry<A, E>, mode: Mode): Effect.Effect<void> => const settle = (key: Key, entry: Entry<A, E>, demand: Demand, exit: Exit.Exit<A, E>) => {
Effect.suspend(() => options.drain(key, mode)).pipe( if (closed) {
Effect.exit, Deferred.doneUnsafe(entry.done, exit)
Effect.flatMap((exit) => { Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
if (closed) return Deferred.done(entry.done, exit).pipe(Effect.asVoid) return
if (mode === "run" && entry.explicit !== undefined) { }
Deferred.doneUnsafe(entry.explicit, exit) if (demand._tag === "run" && entry.explicitWaiter !== undefined) {
entry.explicit = undefined Deferred.doneUnsafe(entry.explicitWaiter, exit)
} entry.explicitWaiter = undefined
if (exit._tag === "Success") { }
if (active.get(key) !== entry) return Deferred.done(entry.done, exit).pipe(Effect.asVoid) if (entry.stopping && demand._tag === "wake" && entry.explicitWaiter !== undefined) {
if (entry.rerun !== undefined) { Deferred.doneUnsafe(entry.explicitWaiter, exit)
const mode = entry.rerun entry.explicitWaiter = undefined
entry.rerun = undefined }
entry.mode = mode if (active.get(key) !== entry) {
return own(key, entry, mode) Deferred.doneUnsafe(entry.done, exit)
} Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
active.delete(key) return
return Deferred.done(entry.done, exit).pipe(Effect.asVoid) }
} if (exit._tag === "Success" && !entry.stopping) {
if (entry.pending !== undefined) {
const pending = entry.pending
entry.pending = undefined
entry.current = pending
start(key, entry, pending, true)
return
}
active.delete(key)
Deferred.doneUnsafe(entry.done, exit)
Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
return
}
const successor = const successor = entry.pending !== undefined ? makeEntry(entry.pending, entry.explicitWaiter) : undefined
active.get(key) === entry && entry.rerun !== undefined ? makeEntry(entry.rerun, entry.explicit) : undefined if (successor === undefined) active.delete(key)
if (successor === undefined) active.delete(key) else active.set(key, successor)
else { if (successor !== undefined) start(key, successor, successor.current, true)
active.set(key, successor) Deferred.doneUnsafe(entry.done, exit)
} Deferred.doneUnsafe(entry.settled, Effect.succeed(exit))
if (successor !== undefined) start(key, successor, successor.mode) if (
const report = exit._tag === "Failure" &&
mode === "wake" && options.onFailure !== undefined !(entry.stopping && Cause.hasInterruptsOnly(exit.cause)) &&
? options.onFailure(key, exit.cause).pipe(Effect.forkIn(scope), Effect.asVoid) demand._tag === "wake" &&
: Effect.void options.onFailure !== undefined
return Deferred.done(entry.done, exit).pipe(Effect.andThen(report), Effect.asVoid) ) {
}), report(Effect.suspend(() => options.onFailure!(key, exit.cause)))
) }
}
const wake = (key: Key) => const wake = (key: Key, seq?: number) =>
Effect.sync(() => { Effect.sync(() => {
if (closed) return if (closed) return
if (!isAfterInterrupt(key, seq)) return
const entry = active.get(key) const entry = active.get(key)
if (entry !== undefined) { if (entry !== undefined) {
entry.rerun = strongest(entry.rerun, "wake") if (!acceptsWake(entry, seq)) return
entry.pending = coalesce(entry.pending, { _tag: "wake", seq })
return return
} }
const next = makeEntry("wake") const next = makeEntry({ _tag: "wake", seq })
active.set(key, next) active.set(key, next)
start(key, next, "wake") start(key, next, next.current)
}) })
const awaitIdle = (key: Key): Effect.Effect<void, E> => const awaitIdle = (key: Key): Effect.Effect<void, E> =>
@ -123,7 +177,7 @@ export const make = <Key, A, E>(options: {
const entry = active.get(key) const entry = active.get(key)
if (entry === undefined) break if (entry === undefined) break
const exit = yield* Effect.raceFirst( const exit = yield* Effect.raceFirst(
Deferred.await(entry.done).pipe(Effect.exit), Deferred.await(entry.settled),
Deferred.await(shutdown).pipe(Effect.as(Exit.void)), Deferred.await(shutdown).pipe(Effect.as(Exit.void)),
) )
if (closed) break if (closed) break
@ -132,24 +186,48 @@ export const make = <Key, A, E>(options: {
if (firstFailure !== undefined) return yield* Effect.failCause(firstFailure) if (firstFailure !== undefined) return yield* Effect.failCause(firstFailure)
}) })
return { run, wake, awaitIdle } const interrupt = (key: Key, seq?: number): Effect.Effect<void> =>
Effect.suspend(() => {
const entry = active.get(key)
const latest = interruptSeq.get(key)
if (seq !== undefined && latest !== undefined && seq <= latest)
return entry?.stopping && entry.owner !== undefined ? Fiber.interrupt(entry.owner) : Effect.void
if (seq !== undefined) interruptSeq.set(key, seq)
if (entry?.owner === undefined) return Effect.void
if (seq !== undefined && entry.current._tag === "wake" && entry.current.seq !== undefined && entry.current.seq > seq)
return Effect.void
if (entry.stopping) {
entry.interruptSeq = maxSeq(entry.interruptSeq, seq)
suppressPendingAtOrBefore(entry, seq)
return Fiber.interrupt(entry.owner)
}
entry.stopping = true
entry.interruptSeq = seq
suppressPendingAtOrBefore(entry, seq)
return Fiber.interrupt(entry.owner)
})
return { run, wake, awaitIdle, interrupt }
function run(key: Key): Effect.Effect<A, E> { function run(key: Key): Effect.Effect<A, E> {
return Effect.uninterruptibleMask((restore) => { return Effect.uninterruptibleMask((restore) => {
if (closed) return Effect.interrupt if (closed) return Effect.interrupt
const entry = active.get(key) const entry = active.get(key)
if (entry !== undefined) { if (entry !== undefined) {
if (entry.mode === "wake") { if (entry.stopping) {
entry.rerun = "run" return restore(Deferred.await(entry.settled).pipe(Effect.andThen(run(key))))
entry.explicit ??= Deferred.makeUnsafe<A, E>() }
return restore(awaitRun(entry.explicit)) if (entry.current._tag === "wake") {
entry.pending = coalesce(entry.pending, { _tag: "run" })
entry.explicitWaiter ??= Deferred.makeUnsafe<A, E>()
return restore(awaitRun(entry.explicitWaiter))
} }
return restore(awaitRun(entry.done)) return restore(awaitRun(entry.done))
} }
const next = makeEntry("run") const next = makeEntry({ _tag: "run" })
active.set(key, next) active.set(key, next)
start(key, next, "run") start(key, next, next.current)
return restore(awaitRun(next.done)) return restore(awaitRun(next.done))
}) })
} }
@ -157,6 +235,21 @@ export const make = <Key, A, E>(options: {
function awaitRun(done: Deferred.Deferred<A, E>): Effect.Effect<A, E> { function awaitRun(done: Deferred.Deferred<A, E>): Effect.Effect<A, E> {
return Effect.raceFirst(Deferred.await(done), Deferred.await(shutdown).pipe(Effect.andThen(Effect.interrupt))) return Effect.raceFirst(Deferred.await(done), Deferred.await(shutdown).pipe(Effect.andThen(Effect.interrupt)))
} }
function acceptsWake(entry: Entry<A, E>, seq: number | undefined) {
return !entry.stopping || (entry.interruptSeq !== undefined && seq !== undefined && seq > entry.interruptSeq)
}
function isAfterInterrupt(key: Key, seq: number | undefined) {
const latest = interruptSeq.get(key)
return latest === undefined || (seq !== undefined && seq > latest)
}
function suppressPendingAtOrBefore(entry: Entry<A, E>, seq: number | undefined) {
if (entry.pending?._tag === "wake" && seq !== undefined && entry.pending.seq !== undefined && entry.pending.seq > seq)
return
entry.pending = undefined
}
}) })
export interface Interface extends Coordinator<SessionSchema.ID, void, SessionRunner.RunError> {} export interface Interface extends Coordinator<SessionSchema.ID, void, SessionRunner.RunError> {}
@ -165,19 +258,17 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/v2
export const layer = Layer.effect( export const layer = Layer.effect(
Service, Service,
Effect.gen(function* () { SessionRunner.Service.pipe(
const runner = yield* SessionRunner.Service Effect.flatMap((runner) =>
return Service.of( make<SessionSchema.ID, void, SessionRunner.RunError>({
yield* make<SessionSchema.ID, void, SessionRunner.RunError>({
drain: (sessionID, mode) => runner.run({ sessionID, force: mode === "run" }), drain: (sessionID, mode) => runner.run({ sessionID, force: mode === "run" }),
onFailure: (sessionID, cause) => onFailure: (sessionID, cause) =>
Cause.hasInterruptsOnly(cause) Effect.logError("Failed to drain Session").pipe(
? Effect.void Effect.annotateLogs("sessionID", sessionID),
: Effect.logError("Failed to drain Session").pipe( Effect.annotateLogs("cause", cause),
Effect.annotateLogs("sessionID", sessionID), ),
Effect.annotateLogs("cause", cause),
),
}), }),
) ),
}), Effect.map(Service.of),
),
) )

View File

@ -3,6 +3,7 @@ import { Cause, DateTime, Effect, FiberSet, Layer, Schema, Semaphore, Stream } f
import { AgentV2 } from "../../agent" import { AgentV2 } from "../../agent"
import { Database } from "../../database/database" import { Database } from "../../database/database"
import { EventV2 } from "../../event" import { EventV2 } from "../../event"
import { Location } from "../../location"
import { ModelV2 } from "../../model" import { ModelV2 } from "../../model"
import { ProviderV2 } from "../../provider" import { ProviderV2 } from "../../provider"
import { QuestionV2 } from "../../question" import { QuestionV2 } from "../../question"
@ -82,6 +83,7 @@ export const layer = Layer.effect(
const tools = yield* ToolRegistry.Service const tools = yield* ToolRegistry.Service
const models = yield* SessionRunnerModel.Service const models = yield* SessionRunnerModel.Service
const store = yield* SessionStore.Service const store = yield* SessionStore.Service
const location = yield* Location.Service
const systemContext = yield* SystemContextRegistry.Service const systemContext = yield* SystemContextRegistry.Service
const skillGuidance = yield* SkillGuidance.Service const skillGuidance = yield* SkillGuidance.Service
const db = (yield* Database.Service).db const db = (yield* Database.Service).db
@ -144,6 +146,8 @@ export const layer = Layer.effect(
promotion: SessionInput.Delivery | undefined, promotion: SessionInput.Delivery | undefined,
) { ) {
const session = yield* getSession(sessionID) const session = yield* getSession(sessionID)
if (session.location.directory !== location.directory || session.location.workspaceID !== location.workspaceID)
return yield* Effect.interrupt
const agent = yield* agents.select(session.agent) const agent = yield* agents.select(session.agent)
const initialized = yield* SessionContextEpoch.initialize( const initialized = yield* SessionContextEpoch.initialize(
db, db,

View File

@ -17,6 +17,7 @@ describe("public native OpenCode API", () => {
"create", "create",
"events", "events",
"get", "get",
"interrupt",
"list", "list",
"message", "message",
"messages", "messages",

View File

@ -23,7 +23,10 @@ const events = EventV2.layer.pipe(Layer.provide(database))
const projector = SessionProjector.layer.pipe(Layer.provide(events), Layer.provide(database)) const projector = SessionProjector.layer.pipe(Layer.provide(events), Layer.provide(database))
const store = SessionStore.layer.pipe(Layer.provide(database)) const store = SessionStore.layer.pipe(Layer.provide(database))
const executionCalls: SessionV2.ID[] = [] const executionCalls: SessionV2.ID[] = []
const interruptCalls: SessionV2.ID[] = []
const interruptSeqs: Array<number | undefined> = []
const wakeCalls: SessionV2.ID[] = [] const wakeCalls: SessionV2.ID[] = []
const wakeSeqs: Array<number | undefined> = []
const execution = Layer.succeed( const execution = Layer.succeed(
SessionExecution.Service, SessionExecution.Service,
SessionExecution.Service.of({ SessionExecution.Service.of({
@ -31,9 +34,15 @@ const execution = Layer.succeed(
Effect.sync(() => { Effect.sync(() => {
executionCalls.push(sessionID) executionCalls.push(sessionID)
}), }),
wake: (sessionID) => interrupt: (sessionID, seq) =>
Effect.sync(() => {
interruptCalls.push(sessionID)
interruptSeqs.push(seq)
}),
wake: (sessionID, seq) =>
Effect.sync(() => { Effect.sync(() => {
wakeCalls.push(sessionID) wakeCalls.push(sessionID)
wakeSeqs.push(seq)
}), }),
}), }),
) )
@ -95,6 +104,15 @@ const eventCount = (type: string) =>
), ),
) )
const interruptEvent = Database.Service.use(({ db }) =>
db
.select()
.from(EventTable)
.where(eq(EventTable.type, "session.next.interrupt.requested.1"))
.get()
.pipe(Effect.orDie),
)
describe("SessionV2.prompt", () => { describe("SessionV2.prompt", () => {
it.effect("delegates execution continuation through SessionExecution", () => it.effect("delegates execution continuation through SessionExecution", () =>
Effect.gen(function* () { Effect.gen(function* () {
@ -108,6 +126,35 @@ describe("SessionV2.prompt", () => {
}), }),
) )
it.effect("delegates interruption through SessionExecution", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
interruptCalls.length = 0
interruptSeqs.length = 0
yield* session.interrupt(sessionID)
expect(interruptCalls).toEqual([sessionID])
expect(interruptSeqs).toHaveLength(1)
expect(typeof interruptSeqs[0]).toBe("number")
expect(yield* eventCount("session.next.interrupt.requested.1")).toBe(1)
expect(yield* interruptEvent).toMatchObject({ aggregate_id: sessionID, seq: interruptSeqs[0] })
expect(yield* session.messages({ sessionID })).toEqual([])
}),
)
it.effect("delegates interruption without requiring a recorded Session", () =>
Effect.gen(function* () {
const session = yield* SessionV2.Service
interruptCalls.length = 0
interruptSeqs.length = 0
yield* session.interrupt(SessionV2.ID.make("ses_missing"))
expect(interruptCalls).toEqual([SessionV2.ID.make("ses_missing")])
expect(interruptSeqs).toEqual([undefined])
}),
)
it.effect("durably admits one user message before transcript promotion", () => it.effect("durably admits one user message before transcript promotion", () =>
Effect.gen(function* () { Effect.gen(function* () {
yield* setup yield* setup
@ -513,11 +560,13 @@ describe("SessionV2.prompt", () => {
const session = yield* SessionV2.Service const session = yield* SessionV2.Service
executionCalls.length = 0 executionCalls.length = 0
wakeCalls.length = 0 wakeCalls.length = 0
wakeSeqs.length = 0
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) }) const admitted = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run by default" }) })
expect(executionCalls).toEqual([]) expect(executionCalls).toEqual([])
expect(wakeCalls).toEqual([sessionID]) expect(wakeCalls).toEqual([sessionID])
expect(wakeSeqs).toEqual([admitted.admittedSeq])
}), }),
) )
@ -527,11 +576,13 @@ describe("SessionV2.prompt", () => {
const session = yield* SessionV2.Service const session = yield* SessionV2.Service
executionCalls.length = 0 executionCalls.length = 0
wakeCalls.length = 0 wakeCalls.length = 0
wakeSeqs.length = 0
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run explicitly" }), resume: true }) const admitted = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Run explicitly" }), resume: true })
expect(executionCalls).toEqual([]) expect(executionCalls).toEqual([])
expect(wakeCalls).toEqual([sessionID]) expect(wakeCalls).toEqual([sessionID])
expect(wakeSeqs).toEqual([admitted.admittedSeq])
}), }),
) )
@ -541,11 +592,13 @@ describe("SessionV2.prompt", () => {
const session = yield* SessionV2.Service const session = yield* SessionV2.Service
executionCalls.length = 0 executionCalls.length = 0
wakeCalls.length = 0 wakeCalls.length = 0
wakeSeqs.length = 0
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Do not run" }), resume: false }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Do not run" }), resume: false })
expect(executionCalls).toEqual([]) expect(executionCalls).toEqual([])
expect(wakeCalls).toEqual([]) expect(wakeCalls).toEqual([])
expect(wakeSeqs).toEqual([])
}), }),
) )
}) })

View File

@ -41,6 +41,536 @@ describe("SessionRunCoordinator", () => {
), ),
) )
it.effect("does nothing when interrupted while idle", () =>
Effect.scoped(
Effect.gen(function* () {
const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.void })
yield* coordinator.interrupt("session")
}),
),
)
it.effect("suppresses stale wakes after an idle interrupt boundary", () =>
Effect.scoped(
Effect.gen(function* () {
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.sync(() => runs++) })
yield* coordinator.interrupt("session", 2)
yield* coordinator.wake("session", 1)
yield* coordinator.awaitIdle("session")
expect(runs).toBe(0)
yield* coordinator.wake("session", 3)
yield* coordinator.awaitIdle("session")
expect(runs).toBe(1)
}),
),
)
it.effect("does not interrupt a wake newer than the interrupt boundary", () =>
Effect.scoped(
Effect.gen(function* () {
const started = yield* Deferred.make<void>()
const gate = yield* Deferred.make<void>()
const interrupted = yield* Deferred.make<void>()
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Deferred.succeed(started, undefined).pipe(
Effect.andThen(Deferred.await(gate)),
Effect.onInterrupt(() => Deferred.succeed(interrupted, undefined)),
),
})
yield* coordinator.wake("session", 3)
yield* Deferred.await(started)
yield* coordinator.interrupt("session", 2)
expect(yield* Deferred.isDone(interrupted)).toBeFalse()
yield* Deferred.succeed(gate, undefined)
yield* coordinator.awaitIdle("session")
}),
),
)
it.effect("preserves a queued wake newer than the interrupt boundary", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never))
: Deferred.succeed(secondStarted, undefined),
),
),
})
yield* coordinator.wake("session", 1)
yield* Deferred.await(firstStarted)
yield* coordinator.wake("session", 3)
yield* coordinator.interrupt("session", 2)
yield* Deferred.await(secondStarted)
yield* coordinator.awaitIdle("session").pipe(Effect.exit)
expect(runs).toBe(2)
}),
),
)
it.effect("interrupts only the requested key", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
const secondGate = yield* Deferred.make<void>()
const secondInterrupted = yield* Deferred.make<void>()
const coordinator = yield* SessionRunCoordinator.make({
drain: (key: string) =>
key === "first"
? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never))
: Deferred.succeed(secondStarted, undefined).pipe(
Effect.andThen(Deferred.await(secondGate)),
Effect.onInterrupt(() => Deferred.succeed(secondInterrupted, undefined)),
),
})
yield* coordinator.wake("first")
yield* coordinator.wake("second")
yield* Effect.all([Deferred.await(firstStarted), Deferred.await(secondStarted)])
yield* coordinator.interrupt("first")
expect(yield* Deferred.isDone(secondInterrupted)).toBeFalse()
yield* Deferred.succeed(secondGate, undefined)
yield* coordinator.awaitIdle("second")
}),
),
)
it.effect("interrupts the active drain and suppresses its queued wake", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const interrupted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() => Deferred.succeed(interrupted, undefined)),
)
: Effect.void,
),
),
})
const run = yield* coordinator.run("session").pipe(Effect.forkChild)
yield* Deferred.await(firstStarted)
yield* coordinator.wake("session")
yield* coordinator.interrupt("session")
yield* Deferred.await(interrupted)
yield* coordinator.awaitIdle("session")
const exit = yield* Fiber.await(run)
expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue()
expect(runs).toBe(1)
yield* coordinator.interrupt("session")
}),
),
)
it.effect("suppresses a wake received during interruption cleanup", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const firstInterrupted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
)
: Deferred.succeed(secondStarted, undefined),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(firstStarted)
const interrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild)
yield* Effect.yieldNow
yield* coordinator.wake("session", 1)
yield* Deferred.await(firstInterrupted)
expect(runs).toBe(1)
yield* Deferred.succeed(cleanupGate, undefined)
yield* Fiber.join(interrupt)
yield* coordinator.awaitIdle("session")
expect(runs).toBe(1)
yield* coordinator.wake("session", 3)
yield* Deferred.await(secondStarted)
yield* coordinator.awaitIdle("session")
expect(runs).toBe(2)
}),
),
)
it.effect("remembers a wake received after the interrupt boundary during cleanup", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const firstInterrupted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
)
: Deferred.succeed(secondStarted, undefined),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(firstStarted)
const interrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild)
yield* Deferred.await(firstInterrupted)
yield* coordinator.wake("session", 3)
const staleInterrupt = yield* coordinator.interrupt("session", 1).pipe(Effect.forkChild)
expect(runs).toBe(1)
yield* Deferred.succeed(cleanupGate, undefined)
yield* Fiber.join(interrupt)
yield* Fiber.join(staleInterrupt)
yield* Deferred.await(secondStarted)
yield* coordinator.awaitIdle("session")
expect(runs).toBe(2)
}),
),
)
it.effect("moves the stop barrier forward for repeated interrupts", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const firstInterrupted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(firstInterrupted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
)
: Deferred.succeed(secondStarted, undefined),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(firstStarted)
const firstInterrupt = yield* coordinator.interrupt("session", 2).pipe(Effect.forkChild)
yield* Deferred.await(firstInterrupted)
yield* coordinator.wake("session", 3)
const secondInterrupt = yield* coordinator.interrupt("session", 4).pipe(Effect.forkChild)
yield* Deferred.succeed(cleanupGate, undefined)
yield* Fiber.join(firstInterrupt)
yield* Fiber.join(secondInterrupt)
yield* coordinator.awaitIdle("session")
expect(runs).toBe(1)
yield* coordinator.wake("session", 5)
yield* Deferred.await(secondStarted)
yield* coordinator.awaitIdle("session")
expect(runs).toBe(2)
}),
),
)
it.effect("interrupts an explicit run queued before the interruption request", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(Effect.andThen(Effect.never))
: Effect.void,
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(firstStarted)
const run = yield* coordinator.run("session").pipe(Effect.forkChild)
yield* Effect.yieldNow
yield* coordinator.interrupt("session")
const exit = yield* Fiber.await(run)
expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue()
expect(runs).toBe(1)
}),
),
)
it.effect("settles a pre-interrupt explicit run only after active wake cleanup", () =>
Effect.scoped(
Effect.gen(function* () {
const started = yield* Deferred.make<void>()
const cleanupStarted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const runSettled = yield* Deferred.make<void>()
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: () =>
Deferred.succeed(started, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(started)
const run = yield* coordinator
.run("session")
.pipe(Effect.exit, Effect.ensuring(Deferred.succeed(runSettled, undefined)), Effect.forkChild)
const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild)
yield* Deferred.await(cleanupStarted)
expect(yield* Deferred.isDone(runSettled)).toBeFalse()
yield* Deferred.succeed(cleanupGate, undefined)
const runExit = yield* Fiber.join(run)
expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue()
yield* Fiber.join(interrupt)
}),
),
)
it.effect("starts an explicit run arriving during interrupt cleanup after the stop barrier", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const cleanupStarted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
)
: Deferred.succeed(secondStarted, undefined),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(firstStarted)
const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild)
yield* Deferred.await(cleanupStarted)
const run = yield* coordinator.run("session").pipe(Effect.forkChild)
yield* Deferred.succeed(cleanupGate, undefined)
yield* Fiber.join(interrupt)
yield* Fiber.join(run)
yield* Deferred.await(secondStarted)
expect(runs).toBe(2)
}),
),
)
it.effect("interrupts pre-stop waiters and runs post-stop waiters after cleanup", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const cleanupStarted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
)
: Deferred.succeed(secondStarted, undefined),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(firstStarted)
const before = yield* coordinator.run("session").pipe(Effect.exit, Effect.forkChild)
const interrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild)
yield* Deferred.await(cleanupStarted)
const after = yield* coordinator.run("session").pipe(Effect.exit, Effect.forkChild)
yield* Deferred.succeed(cleanupGate, undefined)
const beforeExit = yield* Fiber.join(before)
expect(Exit.isFailure(beforeExit) && Cause.hasInterruptsOnly(beforeExit.cause)).toBeTrue()
yield* Fiber.join(interrupt)
yield* Fiber.join(after)
yield* Deferred.await(secondStarted)
expect(runs).toBe(2)
}),
),
)
it.effect("waits for interrupt cleanup before settling callers", () =>
Effect.scoped(
Effect.gen(function* () {
const started = yield* Deferred.make<void>()
const cleanupStarted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const runSettled = yield* Deferred.make<void>()
const idleSettled = yield* Deferred.make<void>()
const interruptSettled = yield* Deferred.make<void>()
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: () =>
Deferred.succeed(started, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
),
})
const run = yield* coordinator
.run("session")
.pipe(Effect.ensuring(Deferred.succeed(runSettled, undefined)), Effect.forkChild)
yield* Deferred.await(started)
const idle = yield* coordinator
.awaitIdle("session")
.pipe(Effect.exit, Effect.ensuring(Deferred.succeed(idleSettled, undefined)), Effect.forkChild)
const interrupt = yield* coordinator
.interrupt("session")
.pipe(Effect.ensuring(Deferred.succeed(interruptSettled, undefined)), Effect.forkChild)
yield* Deferred.await(cleanupStarted)
expect(yield* Deferred.isDone(runSettled)).toBeFalse()
expect(yield* Deferred.isDone(idleSettled)).toBeFalse()
expect(yield* Deferred.isDone(interruptSettled)).toBeFalse()
yield* Deferred.succeed(cleanupGate, undefined)
const runExit = yield* Fiber.await(run)
const idleExit = yield* Fiber.join(idle)
expect(Exit.isFailure(runExit) && Cause.hasInterruptsOnly(runExit.cause)).toBeTrue()
expect(Exit.isFailure(idleExit) && Cause.hasInterruptsOnly(idleExit.cause)).toBeTrue()
yield* Fiber.join(interrupt)
}),
),
)
it.effect("joins concurrent interruption requests for one active drain", () =>
Effect.scoped(
Effect.gen(function* () {
const started = yield* Deferred.make<void>()
const cleanupStarted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: () =>
Deferred.succeed(started, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(started)
const first = yield* coordinator.interrupt("session").pipe(Effect.forkChild)
yield* Deferred.await(cleanupStarted)
const second = yield* coordinator.interrupt("session").pipe(Effect.forkChild)
yield* Deferred.succeed(cleanupGate, undefined)
yield* Fiber.join(first)
yield* Fiber.join(second)
}),
),
)
it.effect("does not discard a post-stop explicit run when interrupted again", () =>
Effect.scoped(
Effect.gen(function* () {
const firstStarted = yield* Deferred.make<void>()
const cleanupStarted = yield* Deferred.make<void>()
const cleanupGate = yield* Deferred.make<void>()
const secondStarted = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: () =>
Effect.sync(() => ++runs).pipe(
Effect.flatMap((run) =>
run === 1
? Deferred.succeed(firstStarted, undefined).pipe(
Effect.andThen(Effect.never),
Effect.onInterrupt(() =>
Deferred.succeed(cleanupStarted, undefined).pipe(Effect.andThen(Deferred.await(cleanupGate))),
),
)
: Deferred.succeed(secondStarted, undefined),
),
),
})
yield* coordinator.wake("session")
yield* Deferred.await(firstStarted)
const firstInterrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild)
yield* Deferred.await(cleanupStarted)
const run = yield* coordinator.run("session").pipe(Effect.forkChild)
const secondInterrupt = yield* coordinator.interrupt("session").pipe(Effect.forkChild)
yield* Deferred.succeed(cleanupGate, undefined)
yield* Effect.all([Fiber.join(firstInterrupt), Fiber.join(secondInterrupt), Fiber.join(run)])
yield* Deferred.await(secondStarted)
expect(runs).toBe(2)
}),
),
)
it.effect("coalesces wakes received during an active run", () => it.effect("coalesces wakes received during an active run", () =>
Effect.scoped( Effect.scoped(
Effect.gen(function* () { Effect.gen(function* () {
@ -381,4 +911,103 @@ describe("SessionRunCoordinator", () => {
}), }),
), ),
) )
it.effect("reports an advisory drain failure exactly once", () =>
Effect.scoped(
Effect.gen(function* () {
const failure = new Error("wake failed")
const reported: Cause.Cause<Error>[] = []
const reportedOnce = yield* Deferred.make<void>()
const coordinator = yield* SessionRunCoordinator.make<string, void, Error>({
drain: () => Effect.fail(failure),
onFailure: (_key, cause) =>
Effect.sync(() => reported.push(cause)).pipe(Effect.andThen(Deferred.succeed(reportedOnce, undefined))),
})
yield* coordinator.wake("session")
yield* Deferred.await(reportedOnce)
yield* Effect.yieldNow
expect(reported).toHaveLength(1)
expect(Cause.squash(reported[0]!)).toBe(failure)
}),
),
)
it.effect("contains defects thrown while constructing an advisory failure report", () =>
Effect.scoped(
Effect.gen(function* () {
const coordinator = yield* SessionRunCoordinator.make<string, void, Error>({
drain: () => Effect.fail(new Error("wake failed")),
onFailure: () => {
throw new Error("report defect")
},
})
yield* coordinator.wake("session")
yield* coordinator.awaitIdle("session").pipe(Effect.exit)
yield* coordinator.wake("session")
yield* coordinator.awaitIdle("session").pipe(Effect.exit)
}),
),
)
it.effect("reports an independently interrupted advisory drain", () =>
Effect.scoped(
Effect.gen(function* () {
const reported = yield* Deferred.make<Cause.Cause<never>>()
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: () => Effect.interrupt,
onFailure: (_key, cause) => Deferred.succeed(reported, cause).pipe(Effect.asVoid),
})
yield* coordinator.wake("session")
expect(Cause.hasInterruptsOnly(yield* Deferred.await(reported))).toBeTrue()
}),
),
)
it.effect("does not report deliberate interruption as an advisory failure", () =>
Effect.scoped(
Effect.gen(function* () {
const started = yield* Deferred.make<void>()
const reported: Cause.Cause<never>[] = []
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: () => Deferred.succeed(started, undefined).pipe(Effect.andThen(Effect.never)),
onFailure: (_key, cause) => Effect.sync(() => reported.push(cause)),
})
yield* coordinator.wake("session")
yield* Deferred.await(started)
yield* coordinator.interrupt("session")
yield* Effect.yieldNow
expect(reported).toEqual([])
}),
),
)
it.effect("trampolines many synchronous self-waking drains", () =>
Effect.scoped(
Effect.gen(function* () {
const limit = 20_000
let runs = 0
let wake: (key: string) => Effect.Effect<void> = () => Effect.void
const coordinator = yield* SessionRunCoordinator.make<string, void, never>({
drain: (key) =>
Effect.sync(() => ++runs).pipe(
Effect.tap((run) => (run < limit ? wake(key) : Effect.void)),
Effect.asVoid,
),
})
wake = coordinator.wake
yield* coordinator.wake("session")
yield* coordinator.awaitIdle("session")
expect(runs).toBe(limit)
}),
),
)
}) })

View File

@ -20,6 +20,7 @@ import { SessionRunnerModel } from "@opencode-ai/core/session/runner/model"
import { ToolRegistry } from "@opencode-ai/core/tool/registry" import { ToolRegistry } from "@opencode-ai/core/tool/registry"
import { SessionTable } from "@opencode-ai/core/session/sql" import { SessionTable } from "@opencode-ai/core/session/sql"
import { SessionStore } from "@opencode-ai/core/session/store" import { SessionStore } from "@opencode-ai/core/session/store"
import { Location } from "@opencode-ai/core/location"
import { SystemContextRegistry } from "@opencode-ai/core/system-context/registry" import { SystemContextRegistry } from "@opencode-ai/core/system-context/registry"
import { SystemContext } from "@opencode-ai/core/system-context" import { SystemContext } from "@opencode-ai/core/system-context"
import { SkillGuidance } from "@opencode-ai/core/skill/guidance" import { SkillGuidance } from "@opencode-ai/core/skill/guidance"
@ -61,6 +62,7 @@ const model = OpenAIChat.route
.model({ id: "gpt-4o-mini" }) .model({ id: "gpt-4o-mini" })
const models = SessionRunnerModel.layerWith(() => Effect.succeed(model)) const models = SessionRunnerModel.layerWith(() => Effect.succeed(model))
const systemContext = SystemContextRegistry.layer const systemContext = SystemContextRegistry.layer
const location = Location.layer({ directory: AbsolutePath.make("/project") }).pipe(Layer.provide(Project.defaultLayer))
const skillGuidance = Layer.mock(SkillGuidance.Service, { load: () => Effect.succeed(SystemContext.empty) }) const skillGuidance = Layer.mock(SkillGuidance.Service, { load: () => Effect.succeed(SystemContext.empty) })
const runner = SessionRunnerLLM.defaultLayer.pipe( const runner = SessionRunnerLLM.defaultLayer.pipe(
Layer.provide(database), Layer.provide(database),
@ -70,6 +72,7 @@ const runner = SessionRunnerLLM.defaultLayer.pipe(
Layer.provide(registry), Layer.provide(registry),
Layer.provide(models), Layer.provide(models),
Layer.provide(systemContext), Layer.provide(systemContext),
Layer.provide(location),
Layer.provide(agents), Layer.provide(agents),
Layer.provide(skillGuidance), Layer.provide(skillGuidance),
) )
@ -77,7 +80,9 @@ const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner))
const execution = Layer.effect( const execution = Layer.effect(
SessionExecution.Service, SessionExecution.Service,
SessionRunCoordinator.Service.pipe( SessionRunCoordinator.Service.pipe(
Effect.map((coordinator) => SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake })), Effect.map((coordinator) =>
SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake, interrupt: coordinator.interrupt }),
),
), ),
).pipe(Layer.provide(coordinator)) ).pipe(Layer.provide(coordinator))
const sessions = SessionV2.layer.pipe( const sessions = SessionV2.layer.pipe(
@ -100,6 +105,7 @@ const it = testEffect(
registry, registry,
models, models,
systemContext, systemContext,
location,
skillGuidance, skillGuidance,
runner, runner,
coordinator, coordinator,

View File

@ -46,6 +46,7 @@ import { SystemContext } from "@opencode-ai/core/system-context"
import { SystemContextRegistry } from "@opencode-ai/core/system-context/registry" import { SystemContextRegistry } from "@opencode-ai/core/system-context/registry"
import { SkillGuidance } from "@opencode-ai/core/skill/guidance" import { SkillGuidance } from "@opencode-ai/core/skill/guidance"
import { ModelV2 } from "@opencode-ai/core/model" import { ModelV2 } from "@opencode-ai/core/model"
import { Location } from "@opencode-ai/core/location"
import { ProviderV2 } from "@opencode-ai/core/provider" import { ProviderV2 } from "@opencode-ai/core/provider"
import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect" import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect"
import { asc, eq } from "drizzle-orm" import { asc, eq } from "drizzle-orm"
@ -187,6 +188,7 @@ const systemContext = Layer.effectDiscard(
), ),
), ),
).pipe(Layer.provideMerge(SystemContextRegistry.layer)) ).pipe(Layer.provideMerge(SystemContextRegistry.layer))
const location = Location.layer({ directory: AbsolutePath.make("/project") }).pipe(Layer.provide(Project.defaultLayer))
const skillGuidance = Layer.mock(SkillGuidance.Service, { const skillGuidance = Layer.mock(SkillGuidance.Service, {
load: (agent) => load: (agent) =>
Effect.succeed( Effect.succeed(
@ -210,6 +212,7 @@ const runner = SessionRunnerLLM.layer.pipe(
Layer.provide(registry), Layer.provide(registry),
Layer.provide(models), Layer.provide(models),
Layer.provide(systemContext), Layer.provide(systemContext),
Layer.provide(location),
Layer.provide(agents), Layer.provide(agents),
Layer.provide(skillGuidance), Layer.provide(skillGuidance),
) )
@ -217,7 +220,9 @@ const coordinator = SessionRunCoordinator.layer.pipe(Layer.provide(runner))
const execution = Layer.effect( const execution = Layer.effect(
SessionExecution.Service, SessionExecution.Service,
SessionRunCoordinator.Service.pipe( SessionRunCoordinator.Service.pipe(
Effect.map((coordinator) => SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake })), Effect.map((coordinator) =>
SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake, interrupt: coordinator.interrupt }),
),
), ),
).pipe(Layer.provide(coordinator)) ).pipe(Layer.provide(coordinator))
const sessions = SessionV2.layer.pipe( const sessions = SessionV2.layer.pipe(
@ -242,6 +247,7 @@ const it = testEffect(
echo, echo,
models, models,
systemContext, systemContext,
location,
skillGuidance, skillGuidance,
runner, runner,
coordinator, coordinator,
@ -624,7 +630,7 @@ describe("SessionRunnerLLM", () => {
}), }),
) )
it.effect("requires a complete new baseline after a Session moves", () => it.effect("interrupts a source Location runner after a Session moves", () =>
Effect.gen(function* () { Effect.gen(function* () {
yield* setup yield* setup
const session = yield* SessionV2.Service const session = yield* SessionV2.Service
@ -648,12 +654,10 @@ describe("SessionRunnerLLM", () => {
.get(), .get(),
).toBeUndefined() ).toBeUndefined()
systemUnavailable = true
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
const exit = yield* session.resume(sessionID).pipe(Effect.exit) const exit = yield* session.resume(sessionID).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true) expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBe(true)
if (Exit.isFailure(exit)) expect(Cause.squash(exit.cause)).toBeInstanceOf(SystemContext.InitializationBlocked)
expect(requests).toHaveLength(1) expect(requests).toHaveLength(1)
expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true)
}), }),
@ -1986,6 +1990,92 @@ describe("SessionRunnerLLM", () => {
}), }),
) )
it.effect("preserves durable queued input for a later wake after interruption", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false })
requests.length = 0
responses = [
[],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Run after interrupt" }),
delivery: "queue",
})
yield* session.interrupt(sessionID)
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
expect(requests).toHaveLength(1)
expect(yield* SessionInput.hasPending(db, sessionID, "queue")).toBe(true)
const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (requests.length < 2) yield* Effect.yieldNow
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(resumed)
streamGate = undefined
streamStarted = undefined
expect(requests).toHaveLength(2)
expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"])
expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Run after interrupt"])
}),
)
it.effect("preserves durable steering input for a later resume after interruption", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt current work" }), resume: false })
requests.length = 0
responses = [
[],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Steer after interrupt" }),
})
yield* session.interrupt(sessionID)
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
expect(requests).toHaveLength(1)
expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true)
const resumed = yield* session.resume(sessionID).pipe(Effect.forkChild)
while (requests.length < 2) yield* Effect.yieldNow
yield* Deferred.succeed(streamGate, undefined)
yield* Fiber.join(resumed)
streamGate = undefined
streamStarted = undefined
expect(requests).toHaveLength(2)
expect(userTexts(requests[0]!)).toEqual(["Interrupt current work"])
expect(userTexts(requests[1]!)).toEqual(["Interrupt current work", "Steer after interrupt"])
}),
)
it.effect("runs queued active inputs as separate FIFO activities", () => it.effect("runs queued active inputs as separate FIFO activities", () =>
Effect.gen(function* () { Effect.gen(function* () {
yield* setup yield* setup
@ -2697,13 +2787,13 @@ describe("SessionRunnerLLM", () => {
Stream.never, Stream.never,
) )
const runner = yield* SessionRunner.Service const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
const run = yield* runner.run({ sessionID, force: true }).pipe(Effect.forkChild)
while (executions.length === 0) yield* Effect.yieldNow while (executions.length === 0) yield* Effect.yieldNow
yield* Fiber.interrupt(run) yield* session.interrupt(sessionID)
toolExecutionGate = undefined toolExecutionGate = undefined
expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" }) expect(yield* Fiber.await(run)).toMatchObject({ _tag: "Failure" })
yield* session.interrupt(sessionID)
expect(yield* session.context(sessionID)).toMatchObject([ expect(yield* session.context(sessionID)).toMatchObject([
{ type: "user", text: "Interrupt blocked tool" }, { type: "user", text: "Interrupt blocked tool" },
{ {
@ -2732,6 +2822,29 @@ describe("SessionRunnerLLM", () => {
}), }),
) )
it.effect("interrupts a blocked provider turn without local tool activity", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Interrupt provider" }), resume: false })
requests.length = 0
response = []
streamGate = yield* Deferred.make<void>()
streamStarted = yield* Deferred.make<void>()
const run = yield* session.resume(sessionID).pipe(Effect.forkChild)
yield* Deferred.await(streamStarted)
yield* session.interrupt(sessionID)
const exit = yield* Fiber.await(run)
streamGate = undefined
streamStarted = undefined
expect(Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)).toBeTrue()
expect(requests).toHaveLength(1)
yield* session.interrupt(sessionID)
}),
)
it.effect("durably fails blocked local tools when interrupted while awaiting settlement", () => it.effect("durably fails blocked local tools when interrupted while awaiting settlement", () =>
Effect.gen(function* () { Effect.gen(function* () {
yield* setup yield* setup

View File

@ -18,6 +18,13 @@ sessions.prompt({ id?, sessionID, prompt, delivery?, resume? })
-> exact retry schedules another wake unless resume is false -> exact retry schedules another wake unless resume is false
-> resume omitted or true schedules execution after admission -> resume omitted or true schedules execution after admission
-> resume false admits only -> resume false admits only
sessions.interrupt(sessionID)
-> interrupts the active ownership chain on this process
-> waits for active drain cleanup and settlement
-> suppresses reruns already queued before interruption
-> preserves durable inbox rows for a later fresh wake or resume
-> idle or missing Session is a no-op
``` ```
`session_input` is the durable admission inbox. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `PromptLifecycle.Promoted`. The projector atomically writes the visible user message and marks its inbox row promoted in the same event transaction. The legacy V1-to-V2 shadow bridge continues publishing ordinary `Prompted` events for already-visible V1 prompts. `session_input` is the durable admission inbox. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `PromptLifecycle.Promoted`. The projector atomically writes the visible user message and marks its inbox row promoted in the same event transaction. The legacy V1-to-V2 shadow bridge continues publishing ordinary `Prompted` events for already-visible V1 prompts.
@ -141,7 +148,7 @@ Execution has two entry points:
Post-crash activity recovery is intentionally deferred. A wake does not infer that ambiguous provider work is safe to retry after an input has already been promoted. Explicit `run` may deliberately continue from durable projected history. A future recovery slice should model durable activity identity, provider-dispatch ambiguity, required continuation, queue-opener reservation, retry policy, and visible recovery status together. Post-crash activity recovery is intentionally deferred. A wake does not infer that ambiguous provider work is safe to retry after an input has already been promoted. Explicit `run` may deliberately continue from durable projected history. A future recovery slice should model durable activity identity, provider-dispatch ambiguity, required continuation, queue-opener reservation, retry policy, and visible recovery status together.
A location-scoped `SessionRunCoordinator` serializes each Session drain chain while allowing different Sessions to drain concurrently. Automatic startup discovery, durable multi-node ownership, stale-owner fencing, interruption controls, and retry policy remain future work. A process-global `SessionRunCoordinator` serializes each local Session drain chain while allowing different Sessions to drain concurrently. It enters the Session's current Location only when a drain starts, so interruption targets process execution ownership rather than Location cache identity. Interruption establishes a local ownership-chain boundary by stopping the current chain while preserving pending/unpromoted durable inbox rows for a later fresh wake and projected history for explicit resume. A Location runner also fences every new provider turn against its captured Location so a moved Session cannot begin another turn through source-Location tools or context. An already-dispatched provider turn may still settle source-Location calls until a future move-control slice interrupts active ownership. Automatic startup discovery, durable multi-node ownership, stale-owner fencing, and retry policy remain future work.
Inbox promotion coalesces pending steers in durable admission order and opens one queued activity at a time in FIFO order. Add explicit inbox backlog and steering-batch limits before exposing broad multi-caller admission or untrusted queue growth. Inbox promotion coalesces pending steers in durable admission order and opens one queued activity at a time in FIFO order. Add explicit inbox backlog and steering-batch limits before exposing broad multi-caller admission or untrusted queue growth.

View File

@ -33,7 +33,7 @@ through legacy `SessionPrompt.loop(...)`:
Prompt admission now uses a durable `session_input` inbox rather than immediate Prompt admission now uses a durable `session_input` inbox rather than immediate
transcript projection. `steer` inputs coalesce into the active activity at the transcript projection. `steer` inputs coalesce into the active activity at the
next safe provider-turn boundary. `queue` inputs form a FIFO of future activities next safe provider-turn boundary. `queue` inputs form a FIFO of future activities
that open one at a time. A location-scoped `SessionRunCoordinator` coalesces process-local wakeups that open one at a time. A process-global `SessionRunCoordinator` coalesces process-local wakeups
around settlement races. Explicit `run` resumes perform at least one provider around settlement races. Explicit `run` resumes perform at least one provider
attempt; advisory `wake` notifications call the provider only for eligible inbox attempt; advisory `wake` notifications call the provider only for eligible inbox
work. Steers coalesce into the active activity at work. Steers coalesce into the active activity at
@ -55,7 +55,7 @@ Next reviewed slices:
- integrate the new BackgroundJob service with V2 tool execution: support background - integrate the new BackgroundJob service with V2 tool execution: support background
bash jobs and background agent dispatch with durable status observation, bash jobs and background agent dispatch with durable status observation,
completion delivery, and explicit cancellation / continuation semantics completion delivery, and explicit cancellation / continuation semantics
- add compaction, interruption, retries, and stale-owner fencing - add compaction, durable/clustered interruption, retries, and stale-owner fencing
only as their slices become concrete only as their slices become concrete
### Deferred durable activity recovery ### Deferred durable activity recovery