diff --git a/CONTEXT.md b/CONTEXT.md index 1c12ba641..faf8ce9d1 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -24,7 +24,7 @@ A durable chronological instruction that tells the model the newly effective sta _Avoid_: System update, system notification, raw text diff **Context Epoch**: -The span during which one effective agent's initially rendered **System Context** remains immutable, ending at compaction or another baseline-replacing transition. +The span during which one initially rendered **System Context** remains the immutable provider-cache baseline, ending at completed compaction, Session movement, or an incompatible context transition that requires a fresh baseline. **Baseline System Context**: The full **System Context** rendered at the start of a **Context Epoch**. @@ -75,31 +75,28 @@ The host-supplied environment overlay applied by the server when creating a PTY, - Each **Context Source** loader returns one coherent typed value. `SystemContext.make(...)` hides that value type so differently typed sources compose uniformly. Its codec compares and stores that value; its pure renderers produce model-visible baseline, update, and removal text only when needed. - `SystemContext.initialize(...)` observes a composed **System Context** once and produces a fresh **Baseline System Context** with its **Context Snapshot**. - `SystemContext.reconcile(...)` observes a composed **System Context** once and returns exactly one next action: unchanged, updated, replacement ready, or replacement blocked. -- `SystemContext.replace(...)` represents an explicit baseline-replacing transition such as compaction or model/provider switch; it either produces a fresh generation or reports that replacement is blocked by unavailable admitted context. -- Context Epoch preparation retries until stable after optimistic revision mismatches so concurrent replacement requests cannot terminate an otherwise valid safe-boundary run. +- `SystemContext.replace(...)` renders a fresh generation after completed compaction or another baseline-replacing transition; it reports replacement blocked while previously admitted context is unavailable. - **Unavailable Context** uses stale-while-revalidate semantics and is distinct from a successfully loaded absence, which may emit removal text. - Ordinary **Context Source** loaders return values directly; loaders that intentionally use stale-while-revalidate may explicitly return **Unavailable Context**. - Nested project instruction discovery after successful reads remains a follow-up; when implemented, discovered instructions must be admitted durably at the next **Safe Provider-Turn Boundary**. - Location-scoped services naturally re-resolve effective context when a moved session next runs in its destination location. - Moving a Session clears its active **Context Epoch**, so the destination must initialize a complete baseline before another prompt can promote. -- Context Epoch initialization is fenced against the authoritative Session Location, so an old-Location runner cannot recreate source context after a concurrent move. - Instruction discovery, source identity, persistence, and file loading belong to the instruction service; the **System Context** abstraction only composes effectful producers and renders loaded values. - The first instruction-service slice observes global and upward project `AGENTS.md` files as one ordered aggregate **Context Source** at each **Safe Provider-Turn Boundary**. - Built-in and instruction context producers register through the **System Context Registry** with stable contribution keys. Plugin-defined context registration and hot-reload lifecycle remain a follow-up built on the same scoped registry seam. - Selected-agent available-skill guidance is a **Context Source** composed with Location-wide registry sources immediately before Context Epoch admission. It lists only names and descriptions permitted for that agent; skill bodies and locations are exposed only through the permission-checked `skill` tool. -- Switching the selected agent requests **Context Epoch** replacement. A switch admitted after the current **Safe Provider-Turn Boundary** applies to the next provider turn while leaving the already-prepared baseline durable. Epoch creation is fenced against the authoritative effective agent, and retries re-observe the current agent. -- A cross-agent replacement must complete before another provider turn; unavailable admitted context blocks that replacement instead of exposing the previous agent's privileged baseline. +- The selected agent and model are sampled when a provider turn starts. Changes admitted after that boundary apply to the next provider turn and do not restart the current turn. +- Selected-agent available-skill guidance remains a **Context Source**. An agent switch that changes that guidance produces a **Mid-Conversation System Message** while preserving the current baseline. - Local tool authorization and pending permission requests retain the effective agent of the provider turn that issued the call; a later agent switch cannot change that call's policy. - Context source changes never wake idle sessions; the next naturally scheduled **Safe Provider-Turn Boundary** loads and compares current values lazily. - Once admitted, a **Mid-Conversation System Message** remains durable even if the following provider attempt fails and is replayed unchanged on retry. - **Mid-Conversation System Messages** remain durable Session-message history; normal user-facing transcript surfaces may hide them. - The date **Context Source** initially preserves host-local calendar-date behavior; a configured user timezone may replace that default later. - A **Context Epoch** begins with one immutable **Baseline System Context**. -- A **Context Epoch** durably records the effective agent that owns its **Baseline System Context**. - A **Baseline System Context** is stored durably and reused verbatim across process restarts within its **Context Epoch**. - A **Baseline System Context** durably preserves the exact joined text used for the active provider-cache prefix. -- Compaction or a model/provider switch starts a new **Context Epoch** because the baseline can be replaced without preserving the prior provider cache. -- A model/provider switch always starts a new **Context Epoch** while preserving chronological conversation history. +- Completed compaction starts a new **Context Epoch** on the next provider attempt, folding the current complete **System Context** into a fresh baseline and removing earlier **Mid-Conversation System Messages** from active model history. +- A model/provider switch preserves the current **Context Epoch** and chronological conversation history; the new selection applies to the next provider turn. - **Model Request Options** remain provider-semantic through Catalog resolution. The Session runner maps them into the LLM package's provider-option namespace; the selected protocol adapter alone owns provider wire encoding. - **Generation Controls**, protocol-semantic **Model Request Options**, and compatibility request body fields are separate Catalog domains. A shared ingestion adapter partitions legacy and models.dev AI-SDK-shaped options before routing. - The **PTY Environment** is a server concern rather than a Core PTY concern. PTY creation merges caller values, then the host overlay, then Core-forced terminal invariants such as `TERM` and `OPENCODE_TERMINAL`. diff --git a/packages/core/schema.json b/packages/core/schema.json index c041a4e01..5f71bfdb6 100644 --- a/packages/core/schema.json +++ b/packages/core/schema.json @@ -1,8 +1,10 @@ { "version": "7", "dialect": "sqlite", - "id": "169a0f0f-d58f-479f-b024-fa1c7b9a09db", - "prevIds": ["abd2f920-b822-49af-b8a7-2e48367d424f"], + "id": "f14a9b18-8207-487e-a3d3-227e629ba9ad", + "prevIds": [ + "169a0f0f-d58f-479f-b024-fa1c7b9a09db" + ], "ddl": [ { "name": "workspace", @@ -900,16 +902,6 @@ "entityType": "columns", "table": "session_context_epoch" }, - { - "type": "text", - "notNull": true, - "autoincrement": false, - "default": "'build'", - "generated": null, - "name": "agent", - "entityType": "columns", - "table": "session_context_epoch" - }, { "type": "text", "notNull": true, @@ -930,26 +922,6 @@ "entityType": "columns", "table": "session_context_epoch" }, - { - "type": "integer", - "notNull": false, - "autoincrement": false, - "default": null, - "generated": null, - "name": "replacement_seq", - "entityType": "columns", - "table": "session_context_epoch" - }, - { - "type": "integer", - "notNull": true, - "autoincrement": false, - "default": "0", - "generated": null, - "name": "revision", - "entityType": "columns", - "table": "session_context_epoch" - }, { "type": "text", "notNull": false, @@ -1511,9 +1483,13 @@ "table": "session_share" }, { - "columns": ["project_id"], + "columns": [ + "project_id" + ], "tableTo": "project", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1522,9 +1498,13 @@ "table": "workspace" }, { - "columns": ["active_account_id"], + "columns": [ + "active_account_id" + ], "tableTo": "account", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "SET NULL", "nameExplicit": false, @@ -1533,9 +1513,13 @@ "table": "account_state" }, { - "columns": ["aggregate_id"], + "columns": [ + "aggregate_id" + ], "tableTo": "event_sequence", - "columnsTo": ["aggregate_id"], + "columnsTo": [ + "aggregate_id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1544,9 +1528,13 @@ "table": "event" }, { - "columns": ["project_id"], + "columns": [ + "project_id" + ], "tableTo": "project", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1555,9 +1543,13 @@ "table": "permission" }, { - "columns": ["project_id"], + "columns": [ + "project_id" + ], "tableTo": "project", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1566,9 +1558,13 @@ "table": "project_directory" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "tableTo": "session", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1577,9 +1573,13 @@ "table": "message" }, { - "columns": ["message_id"], + "columns": [ + "message_id" + ], "tableTo": "message", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1588,9 +1588,13 @@ "table": "part" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "tableTo": "session", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1599,9 +1603,13 @@ "table": "session_context_epoch" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "tableTo": "session", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1610,9 +1618,13 @@ "table": "session_input" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "tableTo": "session", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1621,9 +1633,13 @@ "table": "session_message" }, { - "columns": ["project_id"], + "columns": [ + "project_id" + ], "tableTo": "project", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1632,9 +1648,13 @@ "table": "session" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "tableTo": "session", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1643,9 +1663,13 @@ "table": "todo" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "tableTo": "session", - "columnsTo": ["id"], + "columnsTo": [ + "id" + ], "onUpdate": "NO ACTION", "onDelete": "CASCADE", "nameExplicit": false, @@ -1654,133 +1678,174 @@ "table": "session_share" }, { - "columns": ["email", "url"], + "columns": [ + "email", + "url" + ], "nameExplicit": false, "name": "control_account_pk", "entityType": "pks", "table": "control_account" }, { - "columns": ["project_id", "directory"], + "columns": [ + "project_id", + "directory" + ], "nameExplicit": false, "name": "project_directory_pk", "entityType": "pks", "table": "project_directory" }, { - "columns": ["session_id", "position"], + "columns": [ + "session_id", + "position" + ], "nameExplicit": false, "name": "todo_pk", "entityType": "pks", "table": "todo" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "workspace_pk", "table": "workspace", "entityType": "pks" }, { - "columns": ["name"], + "columns": [ + "name" + ], "nameExplicit": false, "name": "data_migration_pk", "table": "data_migration", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "account_state_pk", "table": "account_state", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "account_pk", "table": "account", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "credential_pk", "table": "credential", "entityType": "pks" }, { - "columns": ["aggregate_id"], + "columns": [ + "aggregate_id" + ], "nameExplicit": false, "name": "event_sequence_pk", "table": "event_sequence", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "event_pk", "table": "event", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "permission_pk", "table": "permission", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "project_pk", "table": "project", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "message_pk", "table": "message", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "part_pk", "table": "part", "entityType": "pks" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "nameExplicit": false, "name": "session_context_epoch_pk", "table": "session_context_epoch", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "session_input_pk", "table": "session_input", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "session_message_pk", "table": "session_message", "entityType": "pks" }, { - "columns": ["id"], + "columns": [ + "id" + ], "nameExplicit": false, "name": "session_pk", "table": "session", "entityType": "pks" }, { - "columns": ["session_id"], + "columns": [ + "session_id" + ], "nameExplicit": false, "name": "session_share_pk", "table": "session_share", diff --git a/packages/core/src/database/migration.gen.ts b/packages/core/src/database/migration.gen.ts index 1e915bb3c..fd778414a 100644 --- a/packages/core/src/database/migration.gen.ts +++ b/packages/core/src/database/migration.gen.ts @@ -37,5 +37,6 @@ export const migrations = ( import("./migration/20260611035744_credential"), import("./migration/20260611192811_lush_chimera"), import("./migration/20260612174303_project_dir_strategy"), + import("./migration/20260622142730_simplify_session_context_epoch"), ]) ).map((module) => module.default) satisfies DatabaseMigration.Migration[] diff --git a/packages/core/src/database/migration/20260622142730_simplify_session_context_epoch.ts b/packages/core/src/database/migration/20260622142730_simplify_session_context_epoch.ts new file mode 100644 index 000000000..1520bac4c --- /dev/null +++ b/packages/core/src/database/migration/20260622142730_simplify_session_context_epoch.ts @@ -0,0 +1,13 @@ +import { Effect } from "effect" +import type { DatabaseMigration } from "../migration" + +export default { + id: "20260622142730_simplify_session_context_epoch", + up(tx) { + return Effect.gen(function* () { + yield* tx.run(`ALTER TABLE \`session_context_epoch\` DROP COLUMN \`agent\`;`) + yield* tx.run(`ALTER TABLE \`session_context_epoch\` DROP COLUMN \`replacement_seq\`;`) + yield* tx.run(`ALTER TABLE \`session_context_epoch\` DROP COLUMN \`revision\`;`) + }) + }, +} satisfies DatabaseMigration.Migration diff --git a/packages/core/src/database/schema.gen.ts b/packages/core/src/database/schema.gen.ts index 5c044ec60..ed60fde6c 100644 --- a/packages/core/src/database/schema.gen.ts +++ b/packages/core/src/database/schema.gen.ts @@ -149,11 +149,8 @@ export default { CREATE TABLE \`session_context_epoch\` ( \`session_id\` text PRIMARY KEY, \`baseline\` text NOT NULL, - \`agent\` text DEFAULT 'build' NOT NULL, \`snapshot\` text NOT NULL, \`baseline_seq\` integer NOT NULL, - \`replacement_seq\` integer, - \`revision\` integer DEFAULT 0 NOT NULL, CONSTRAINT \`fk_session_context_epoch_session_id_session_id_fk\` FOREIGN KEY (\`session_id\`) REFERENCES \`session\`(\`id\`) ON DELETE CASCADE ); `) diff --git a/packages/core/src/session/context-epoch.ts b/packages/core/src/session/context-epoch.ts index 1fb8df92e..18624706a 100644 --- a/packages/core/src/session/context-epoch.ts +++ b/packages/core/src/session/context-epoch.ts @@ -1,54 +1,31 @@ export * as SessionContextEpoch from "./context-epoch" -import { and, eq, isNull, lt, or, sql } from "drizzle-orm" +import { eq } from "drizzle-orm" import { DateTime, Effect, Schema } from "effect" -import { AgentV2 } from "../agent" import type { Database } from "../database/database" import { EventV2 } from "../event" -import { Location } from "../location" import { SystemContext } from "../system-context/index" import { ContextSnapshotDecodeError } from "./error" import { SessionEvent } from "./event" +import { SessionHistory } from "./history" import { SessionInput } from "./input" import { SessionMessageID } from "./message-id" import { SessionSchema } from "./schema" -import { SessionContextEpochTable, SessionTable } from "./sql" +import { SessionContextEpochTable } from "./sql" type DatabaseService = Database.Interface["db"] -class RevisionMismatch extends Error {} -class LocationMismatch extends Error {} -export class AgentMismatch extends Error {} -export class AgentReplacementBlocked extends Schema.TaggedErrorClass()( - "SessionContextEpoch.AgentReplacementBlocked", - { sessionID: SessionSchema.ID, previous: AgentV2.ID, current: AgentV2.ID }, -) {} - -const retryRevisionMismatch = (attempt: () => Effect.Effect): Effect.Effect => - attempt().pipe( - Effect.catchDefect((defect) => - defect instanceof RevisionMismatch - ? Effect.yieldNow.pipe(Effect.andThen(retryRevisionMismatch(attempt))) - : Effect.die(defect), - ), - ) - interface Prepared { readonly baseline: string readonly baselineSeq: number - readonly revision: number } export function initialize( db: DatabaseService, context: Effect.Effect, sessionID: SessionSchema.ID, - location: Location.Ref, - agent: AgentV2.ID, ): Effect.Effect { - return retryRevisionMismatch(() => initializeOnce(db, context, sessionID, location, agent)).pipe( - Effect.withSpan("SessionContextEpoch.initialize"), - ) + return initializeOnce(db, context, sessionID).pipe(Effect.withSpan("SessionContextEpoch.initialize")) } export function prepare( @@ -56,12 +33,8 @@ export function prepare( events: EventV2.Interface, context: Effect.Effect, sessionID: SessionSchema.ID, - location: Location.Ref, - agent: AgentV2.ID, -): Effect.Effect { - return retryRevisionMismatch(() => prepareOnce(db, events, context, sessionID, location, agent)).pipe( - Effect.withSpan("SessionContextEpoch.prepare"), - ) +): Effect.Effect { + return prepareOnce(db, events, context, sessionID).pipe(Effect.withSpan("SessionContextEpoch.prepare")) } const prepareOnce = Effect.fnUntraced(function* ( @@ -69,57 +42,50 @@ const prepareOnce = Effect.fnUntraced(function* ( events: EventV2.Interface, context: Effect.Effect, sessionID: SessionSchema.ID, - location: Location.Ref, - agent: AgentV2.ID, ) { - const [value, stored] = yield* Effect.all([context, find(db, sessionID)], { concurrency: "unbounded" }) + const [value, stored, compaction] = yield* Effect.all( + [context, find(db, sessionID), SessionHistory.latestCompaction(db, sessionID)], + { concurrency: "unbounded" }, + ) if (!stored) { const generation = yield* SystemContext.initialize(value) - const baselineSeq = yield* insert(db, sessionID, location, agent, generation) - return { baseline: generation.baseline, baselineSeq, revision: 0 } + const baselineSeq = yield* insert(db, sessionID, generation) + return { baseline: generation.baseline, baselineSeq } } const snapshot = yield* Schema.decodeUnknownEffect(SystemContext.Snapshot)(stored.snapshot).pipe( Effect.mapError((error) => new ContextSnapshotDecodeError({ sessionID, details: String(error) })), ) - const replacingAgent = stored.agent !== agent - const result = - stored.replacement_seq === null && !replacingAgent - ? yield* SystemContext.reconcile(value, snapshot) - : yield* SystemContext.replace(value, snapshot) - if (result._tag === "ReplacementBlocked" && replacingAgent) { - yield* fence(db, sessionID, agent, stored.revision) - return yield* new AgentReplacementBlocked({ sessionID, previous: stored.agent, current: agent }) - } + const replacementSeq = compaction !== undefined && compaction.seq > stored.baseline_seq ? compaction.seq : undefined + const result = replacementSeq + ? yield* SystemContext.replace(value, snapshot) + : yield* SystemContext.reconcile(value, snapshot) if (result._tag === "Unchanged" || result._tag === "ReplacementBlocked") { - yield* fence(db, sessionID, agent, stored.revision) - return { baseline: stored.baseline, baselineSeq: stored.baseline_seq, revision: stored.revision } + return { baseline: stored.baseline, baselineSeq: stored.baseline_seq } } if (result._tag === "ReplacementReady") { - const replacementSeq = stored.replacement_seq ?? (yield* SessionInput.latestSeq(db, sessionID)) - yield* replace(db, sessionID, agent, stored.revision, replacementSeq, result.generation) - return { baseline: result.generation.baseline, baselineSeq: replacementSeq, revision: stored.revision + 1 } + const baselineSeq = replacementSeq ?? (yield* SessionInput.latestSeq(db, sessionID)) + yield* replace(db, sessionID, baselineSeq, result.generation) + return { baseline: result.generation.baseline, baselineSeq } } yield* events.publish( SessionEvent.ContextUpdated, { sessionID, messageID: SessionMessageID.ID.create(), timestamp: yield* DateTime.now, text: result.text }, - { commit: () => advance(db, sessionID, stored.revision, result.snapshot).pipe(Effect.orDie) }, + { commit: () => advance(db, sessionID, result.snapshot).pipe(Effect.orDie) }, ) - return { baseline: stored.baseline, baselineSeq: stored.baseline_seq, revision: stored.revision + 1 } + return { baseline: stored.baseline, baselineSeq: stored.baseline_seq } }) const initializeOnce = Effect.fnUntraced(function* ( db: DatabaseService, context: Effect.Effect, sessionID: SessionSchema.ID, - location: Location.Ref, - agent: AgentV2.ID, ) { if (yield* exists(db, sessionID)) return const generation = yield* context.pipe(Effect.flatMap(SystemContext.initialize)) - const baselineSeq = yield* insert(db, sessionID, location, agent, generation) - return { baseline: generation.baseline, baselineSeq, revision: 0 } + const baselineSeq = yield* insert(db, sessionID, generation) + return { baseline: generation.baseline, baselineSeq } }) const exists = Effect.fn("SessionContextEpoch.exists")(function* (db: DatabaseService, sessionID: SessionSchema.ID) { @@ -142,39 +108,6 @@ const find = Effect.fn("SessionContextEpoch.find")(function* (db: DatabaseServic .pipe(Effect.orDie) }) -const requireAgentSelection = Effect.fnUntraced(function* ( - db: DatabaseService, - sessionID: SessionSchema.ID, - agent: AgentV2.ID, -) { - const selected = yield* db - .select({ agent: SessionTable.agent }) - .from(SessionTable) - .where(eq(SessionTable.id, sessionID)) - .get() - .pipe(Effect.orDie) - if (!selected || (selected.agent !== null && selected.agent !== agent)) return yield* Effect.die(new AgentMismatch()) -}) - -export const requestReplacement = Effect.fn("SessionContextEpoch.requestReplacement")(function* ( - db: DatabaseService, - sessionID: SessionSchema.ID, - seq: number, -) { - return yield* db - .update(SessionContextEpochTable) - .set({ replacement_seq: seq, revision: sql`${SessionContextEpochTable.revision} + 1` }) - .where( - and( - eq(SessionContextEpochTable.session_id, sessionID), - lt(SessionContextEpochTable.baseline_seq, seq), - or(isNull(SessionContextEpochTable.replacement_seq), lt(SessionContextEpochTable.replacement_seq, seq)), - ), - ) - .run() - .pipe(Effect.orDie) -}) - export const reset = Effect.fn("SessionContextEpoch.reset")(function* ( db: DatabaseService, sessionID: SessionSchema.ID, @@ -189,155 +122,53 @@ export const reset = Effect.fn("SessionContextEpoch.reset")(function* ( const insert = Effect.fnUntraced(function* ( db: DatabaseService, sessionID: SessionSchema.ID, - location: Location.Ref, - agent: AgentV2.ID, generation: SystemContext.Generation, ) { - return yield* db - .transaction( - () => - Effect.gen(function* () { - const placed = yield* db - .select({ agent: SessionTable.agent }) - .from(SessionTable) - .where( - and( - eq(SessionTable.id, sessionID), - eq(SessionTable.directory, location.directory), - location.workspaceID === undefined - ? isNull(SessionTable.workspace_id) - : eq(SessionTable.workspace_id, location.workspaceID), - ), - ) - .get() - .pipe(Effect.orDie) - if (!placed) return yield* Effect.die(new LocationMismatch()) - if (placed.agent !== null && placed.agent !== agent) return yield* Effect.die(new AgentMismatch()) - const baselineSeq = yield* SessionInput.latestSeq(db, sessionID) - yield* db - .insert(SessionContextEpochTable) - .values({ - session_id: sessionID, - baseline: generation.baseline, - agent, - snapshot: generation.snapshot, - baseline_seq: baselineSeq, - revision: 0, - }) - .onConflictDoNothing() - .returning({ sessionID: SessionContextEpochTable.session_id }) - .get() - .pipe( - Effect.orDie, - Effect.flatMap((inserted) => (inserted ? Effect.void : Effect.die(new RevisionMismatch()))), - ) - return baselineSeq - }), - { behavior: "immediate" }, - ) + const baselineSeq = yield* SessionInput.latestSeq(db, sessionID) + yield* db + .insert(SessionContextEpochTable) + .values({ + session_id: sessionID, + baseline: generation.baseline, + snapshot: generation.snapshot, + baseline_seq: baselineSeq, + }) + .run() .pipe(Effect.orDie) + return baselineSeq }) const replace = Effect.fnUntraced(function* ( db: DatabaseService, sessionID: SessionSchema.ID, - agent: AgentV2.ID, - expectedRevision: number, baselineSeq: number, generation: SystemContext.Generation, ) { - yield* db - .transaction( - () => - Effect.gen(function* () { - yield* requireAgentSelection(db, sessionID, agent) - const updated = yield* db - .update(SessionContextEpochTable) - .set({ - baseline: generation.baseline, - agent, - snapshot: generation.snapshot, - baseline_seq: baselineSeq, - replacement_seq: null, - revision: expectedRevision + 1, - }) - .where( - and( - eq(SessionContextEpochTable.session_id, sessionID), - eq(SessionContextEpochTable.revision, expectedRevision), - ), - ) - .returning({ revision: SessionContextEpochTable.revision }) - .get() - .pipe(Effect.orDie) - if (!updated) return yield* Effect.die(new RevisionMismatch()) - }), - { behavior: "immediate" }, - ) - .pipe(Effect.orDie) -}) - -const fence = Effect.fnUntraced(function* ( - db: DatabaseService, - sessionID: SessionSchema.ID, - agent: AgentV2.ID, - expectedRevision: number, -) { - const current = yield* db - .select({ selected: SessionTable.agent, revision: SessionContextEpochTable.revision }) - .from(SessionContextEpochTable) - .innerJoin(SessionTable, eq(SessionTable.id, SessionContextEpochTable.session_id)) - .where(eq(SessionContextEpochTable.session_id, sessionID)) - .get() - .pipe(Effect.orDie) - if (!current || (current.selected !== null && current.selected !== agent)) - return yield* Effect.die(new AgentMismatch()) - if (current.revision !== expectedRevision) return yield* Effect.die(new RevisionMismatch()) -}) - -export const current = Effect.fn("SessionContextEpoch.current")(function* ( - db: DatabaseService, - sessionID: SessionSchema.ID, - agent: AgentV2.ID, - revision: number, -) { - const value = yield* db - .select({ - agent: SessionContextEpochTable.agent, - selected: SessionTable.agent, - revision: SessionContextEpochTable.revision, + const updated = yield* db + .update(SessionContextEpochTable) + .set({ + baseline: generation.baseline, + snapshot: generation.snapshot, + baseline_seq: baselineSeq, }) - .from(SessionContextEpochTable) - .innerJoin(SessionTable, eq(SessionTable.id, SessionContextEpochTable.session_id)) .where(eq(SessionContextEpochTable.session_id, sessionID)) + .returning({ sessionID: SessionContextEpochTable.session_id }) .get() .pipe(Effect.orDie) - return ( - value !== undefined && - value.agent === agent && - (value.selected === null || value.selected === agent) && - value.revision === revision - ) + if (!updated) return yield* Effect.die("Context Epoch not found") }) const advance = Effect.fnUntraced(function* ( db: DatabaseService, sessionID: SessionSchema.ID, - expectedRevision: number, snapshot: SystemContext.Snapshot, ) { const updated = yield* db .update(SessionContextEpochTable) - .set({ snapshot, revision: expectedRevision + 1 }) - .where( - and( - eq(SessionContextEpochTable.session_id, sessionID), - eq(SessionContextEpochTable.revision, expectedRevision), - isNull(SessionContextEpochTable.replacement_seq), - ), - ) - .returning({ revision: SessionContextEpochTable.revision }) + .set({ snapshot }) + .where(eq(SessionContextEpochTable.session_id, sessionID)) + .returning({ sessionID: SessionContextEpochTable.session_id }) .get() .pipe(Effect.orDie) - if (!updated) return yield* Effect.die(new RevisionMismatch()) + if (!updated) return yield* Effect.die("Context Epoch not found") }) diff --git a/packages/core/src/session/history.ts b/packages/core/src/session/history.ts index 285c1bcd5..fb55ab075 100644 --- a/packages/core/src/session/history.ts +++ b/packages/core/src/session/history.ts @@ -10,9 +10,9 @@ type DatabaseService = Database.Interface["db"] const decode = Schema.decodeUnknownEffect(SessionMessage.Message) -const latestCompaction = Effect.fnUntraced(function* (db: DatabaseService, sessionID: SessionSchema.ID) { +export const latestCompaction = Effect.fnUntraced(function* (db: DatabaseService, sessionID: SessionSchema.ID) { return yield* db - .select() + .select({ seq: SessionMessageTable.seq }) .from(SessionMessageTable) .where(and(eq(SessionMessageTable.session_id, sessionID), eq(SessionMessageTable.type, "compaction"))) .orderBy(desc(SessionMessageTable.seq)) diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index bffe4e74c..30af9f2e6 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -329,19 +329,14 @@ export const layer = Layer.effectDiscard( if (next) yield* applyUsage(db, sessionID, next) }), ) - yield* events.project(SessionEvent.AgentSwitched, (event) => { - if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence") - return db + yield* events.project(SessionEvent.AgentSwitched, (event) => + db .update(SessionTable) .set({ agent: event.data.agent, time_updated: DateTime.toEpochMillis(event.data.timestamp) }) .where(eq(SessionTable.id, event.data.sessionID)) .run() - .pipe( - Effect.orDie, - Effect.andThen(run(db, event)), - Effect.andThen(SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.durable.seq)), - ) - }) + .pipe(Effect.orDie, Effect.andThen(run(db, event))), + ) yield* events.project(SessionEvent.ModelSwitched, (event) => Effect.gen(function* () { yield* db @@ -351,8 +346,6 @@ export const layer = Layer.effectDiscard( .run() .pipe(Effect.orDie) yield* run(db, event) - if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") - yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.durable.seq) }), ) yield* events.project(SessionEvent.Prompted, (event) => @@ -407,7 +400,6 @@ export const layer = Layer.effectDiscard( }), ) yield* events.project(SessionEvent.InterruptRequested, () => Effect.void) - // TODO: Reconstruct context epoch replacement state during replay without adding replay state to every EventV2 payload. yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event)) yield* events.project(SessionEvent.Synthetic, (event) => run(db, event)) yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event)) @@ -426,15 +418,9 @@ export const layer = Layer.effectDiscard( yield* events.project(SessionEvent.Reasoning.Started, (event) => run(db, event)) yield* events.project(SessionEvent.Reasoning.Ended, (event) => run(db, event)) // yield* events.project(SessionEvent.Retried, (event) => run(db, event)) - yield* events.project(SessionEvent.Compaction.Ended, (event) => { - if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence") - if (event.durable.version === 1) return Effect.void - const seq = event.durable.seq - return Effect.gen(function* () { - yield* run(db, event) - yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, seq) - }) - }) + yield* events.project(SessionEvent.Compaction.Ended, (event) => + event.durable?.version === 1 ? Effect.void : run(db, event), + ) }), ) diff --git a/packages/core/src/session/runner/index.ts b/packages/core/src/session/runner/index.ts index 4060cc6b0..2210b57b3 100644 --- a/packages/core/src/session/runner/index.ts +++ b/packages/core/src/session/runner/index.ts @@ -6,7 +6,6 @@ import { SessionSchema } from "../schema" import type { ContextSnapshotDecodeError, MessageDecodeError } from "../error" import { SessionRunnerModel } from "./model" import type { SystemContext } from "../../system-context/index" -import type { SessionContextEpoch } from "../context-epoch" import type { ToolOutputStore } from "../../tool-output-store" export type RunError = @@ -15,7 +14,6 @@ export type RunError = | MessageDecodeError | ContextSnapshotDecodeError | SystemContext.InitializationBlocked - | SessionContextEpoch.AgentReplacementBlocked | ToolOutputStore.Error /** Runs one local continuation from already-recorded Session history. */ diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index 5d84e985a..756a11928 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -8,7 +8,7 @@ import { isContextOverflowFailure, type ProviderErrorEvent, } from "@opencode-ai/llm" -import { Cause, DateTime, Effect, FiberSet, Layer, Option, Schema, Semaphore, Stream } from "effect" +import { Cause, DateTime, Effect, FiberSet, Layer, Option, Semaphore, Stream } from "effect" import { AgentV2 } from "../../agent" import { Config } from "../../config" import { Database } from "../../database/database" @@ -141,8 +141,8 @@ export const layer = Layer.effect( cause.reasons.some((reason) => Cause.isDieReason(reason) && reason.defect instanceof QuestionV2.RejectedError) type TurnTransition = - // Request preparation observed a concurrent Session change and must restart from durable state. - | { readonly _tag: "RebuildPreparedTurn"; readonly promotion?: SessionInput.Delivery } + // Automatic compaction completed; rebuild the request from compacted history. + | { readonly _tag: "ContinueAfterCompaction" } // Overflow compaction completed; rebuild once through the path without overflow recovery. | { readonly _tag: "ContinueAfterOverflowCompaction" } @@ -152,20 +152,11 @@ export const layer = Layer.effect( } } - const rebuildPreparedTurn = (promotion?: SessionInput.Delivery) => - new TurnTransitionError({ _tag: "RebuildPreparedTurn", promotion }) + const continueAfterCompaction = new TurnTransitionError({ _tag: "ContinueAfterCompaction" }) const continueAfterOverflowCompaction = new TurnTransitionError({ _tag: "ContinueAfterOverflowCompaction", }) - const retryAgentMismatch = (promotion: SessionInput.Delivery | undefined) => - Effect.catchDefect((defect) => - defect instanceof SessionContextEpoch.AgentMismatch - ? Effect.die(rebuildPreparedTurn(promotion)) - : Effect.die(defect), - ) - - const sameModel = Schema.toEquivalence(Schema.UndefinedOr(ModelV2.Ref)) const loadSystemContext = (agent: AgentV2.Selection) => Effect.all([systemContext.load(), skillGuidance.load(agent), referenceGuidance.load()], { concurrency: "unbounded", @@ -181,13 +172,7 @@ export const layer = Layer.effect( if (session.location.directory !== location.directory || session.location.workspaceID !== location.workspaceID) return yield* Effect.interrupt const agent = yield* agents.select(session.agent) - const initialized = yield* SessionContextEpoch.initialize( - db, - loadSystemContext(agent), - session.id, - session.location, - agent.id, - ).pipe(retryAgentMismatch(promotion)) + const initialized = yield* SessionContextEpoch.initialize(db, loadSystemContext(agent), session.id) const toolFibers = yield* FiberSet.make() let needsContinuation = false if (promotion) { @@ -199,18 +184,7 @@ export const layer = Layer.effect( } } const system = - initialized ?? - (yield* SessionContextEpoch.prepare( - db, - events, - loadSystemContext(agent), - session.id, - session.location, - agent.id, - ).pipe(retryAgentMismatch(undefined))) - const current = yield* getSession(sessionID) - if ((yield* agents.select(current.agent)).id !== agent.id || !sameModel(current.model, session.model)) - return yield* Effect.die(rebuildPreparedTurn()) + initialized ?? (yield* SessionContextEpoch.prepare(db, events, loadSystemContext(agent), session.id)) const model = yield* models.resolve(session) const entries = yield* SessionHistory.entriesForRunner(db, session.id, system.baselineSeq) const context = entries.map((entry) => entry.message) @@ -228,7 +202,7 @@ export const layer = Layer.effect( toolChoice: isLastStep ? "none" : undefined, }) if (yield* compaction.compactIfNeeded({ sessionID: session.id, entries, model, request })) - return yield* Effect.die(rebuildPreparedTurn()) + return yield* Effect.die(continueAfterCompaction) const publisher = createLLMEventPublisher(events, { sessionID: session.id, agent: agent.id, @@ -242,8 +216,6 @@ export const layer = Layer.effect( const publish = (event: LLMEvent, outputPaths: ReadonlyArray = []) => withPublication(publisher.publish(event, outputPaths)) let overflowFailure: ProviderErrorEvent | undefined - if (!(yield* SessionContextEpoch.current(db, session.id, agent.id, system.revision))) - return yield* Effect.die(rebuildPreparedTurn()) const providerStream = llm.stream(request).pipe( Stream.runForEach((event) => Effect.gen(function* () { @@ -352,7 +324,7 @@ export const layer = Layer.effect( if (defect.transition._tag === "ContinueAfterOverflowCompaction") return yield* Effect.die("Post-compaction provider attempt cannot recover another overflow") yield* Effect.yieldNow - return yield* runAfterOverflowCompaction(sessionID, defect.transition.promotion, step) + return yield* runAfterOverflowCompaction(sessionID, undefined, step) }), ), ) @@ -366,7 +338,7 @@ export const layer = Layer.effect( yield* Effect.yieldNow if (defect.transition._tag === "ContinueAfterOverflowCompaction") return yield* runAfterOverflowCompaction(sessionID, undefined, step) - return yield* runTurn(sessionID, defect.transition.promotion, step) + return yield* runTurn(sessionID, undefined, step) }), ), ) diff --git a/packages/core/src/session/sql.ts b/packages/core/src/session/sql.ts index ca3d8e1b5..a9499554b 100644 --- a/packages/core/src/session/sql.ts +++ b/packages/core/src/session/sql.ts @@ -170,9 +170,6 @@ export const SessionContextEpochTable = sqliteTable("session_context_epoch", { .primaryKey() .references(() => SessionTable.id, { onDelete: "cascade" }), baseline: text().notNull(), - agent: text().$type().notNull().default(AgentV2.defaultID), snapshot: text({ mode: "json" }).notNull().$type(), baseline_seq: integer().notNull(), - replacement_seq: integer(), - revision: integer().notNull().default(0), }) diff --git a/packages/core/test/database-migration.test.ts b/packages/core/test/database-migration.test.ts index d7126f76e..914243a58 100644 --- a/packages/core/test/database-migration.test.ts +++ b/packages/core/test/database-migration.test.ts @@ -71,9 +71,9 @@ describe("DatabaseMigration", () => { ).toEqual({ name: "session_context_epoch" }) expect( yield* db.get( - sql`SELECT name, dflt_value FROM pragma_table_info('session_context_epoch') WHERE name = 'agent'`, + sql`SELECT name FROM pragma_table_info('session_context_epoch') WHERE name IN ('agent', 'replacement_seq', 'revision')`, ), - ).toEqual({ name: "agent", dflt_value: "'build'" }) + ).toBeUndefined() expect(yield* db.get(sql`SELECT count(*) as count FROM migration`)).toEqual({ count: migrations.length }) expect( yield* db.all( diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 862bb56d3..f37a4c357 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -360,12 +360,12 @@ const setupOverflowRecovery = Effect.gen(function* () { return session }) -const userTexts = (request: LLMRequest) => +const messageTexts = (request: LLMRequest, role: "user" | "system") => request.messages.flatMap((message) => - message.role === "user" - ? message.content.flatMap((content) => (content.type === "text" ? [content.text] : [])) - : [], + message.role === role ? message.content.flatMap((content) => (content.type === "text" ? [content.text] : [])) : [], ) +const userTexts = (request: LLMRequest) => messageTexts(request, "user") +const systemTexts = (request: LLMRequest) => messageTexts(request, "system") const replaySessionProjection = (id: SessionV2.ID) => Effect.gen(function* () { @@ -746,39 +746,6 @@ describe("SessionRunnerLLM", () => { }), ) - it.effect("does not create a source Location epoch after a concurrent Session move", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - const { db } = yield* Database.Service - let moved = false - systemLoadHook = Effect.suspend(() => { - if (moved) return Effect.void - moved = true - return events - .publish(SessionEvent.Moved, { - sessionID, - timestamp: DateTime.makeUnsafe(1), - location: Location.Ref.make({ directory: AbsolutePath.make("/moved") }), - }) - .pipe(Effect.asVoid) - }) - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - - expect(Exit.isFailure(yield* session.resume(sessionID).pipe(Effect.exit))).toBe(true) - expect(yield* SessionInput.hasPending(db, sessionID, "steer")).toBe(true) - expect( - yield* db - .select() - .from(SessionContextEpochTable) - .where(eq(SessionContextEpochTable.session_id, sessionID)) - .get(), - ).toBeUndefined() - expect((yield* session.get(sessionID)).location.directory).toBe(AbsolutePath.make("/moved")) - }), - ) - it.effect("reuses one durable baseline after the context producer changes", () => Effect.gen(function* () { yield* setup @@ -890,7 +857,7 @@ describe("SessionRunnerLLM", () => { }), ) - it.effect("composes selected-agent skill guidance and replaces it after an agent switch", () => + it.effect("updates selected-agent skill guidance after an agent switch", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -913,12 +880,13 @@ describe("SessionRunnerLLM", () => { expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context\n\nBuild skills"], - ["Initial context\n\nReviewer skills"], + ["Initial context\n\nBuild skills"], ]) + expect(systemTexts(requests[1]!)).toContainEqual(expect.stringContaining("Reviewer skills")) }), ) - it.effect("retries first-epoch preparation when the selected agent changes during observation", () => + it.effect("keeps the sampled agent when selection changes during observation", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -945,88 +913,12 @@ describe("SessionRunnerLLM", () => { yield* session.resume(sessionID) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ - ["Initial context\n\nReviewer skills"], + ["Initial context\n\nBuild skills"], ]) }), ) - it.effect("opens a queued activity once when the selected agent changes during observation", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - skillBaselines.set(AgentV2.ID.make("build"), "Build skills") - skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") - let switched = false - systemLoadHook = Effect.suspend(() => { - if (switched) return Effect.void - switched = true - return events - .publish(SessionEvent.AgentSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(1), - agent: "reviewer", - }) - .pipe(Effect.asVoid) - }) - yield* session.prompt({ - sessionID, - prompt: new Prompt({ text: "Queued" }), - delivery: "queue", - resume: false, - }) - - requests.length = 0 - response = [] - yield* session.resume(sessionID) - - expect(requests).toHaveLength(1) - expect((yield* session.context(sessionID)).filter((message) => message.type === "user")).toHaveLength(1) - }), - ) - - it.effect("retries an agent switch before the final provider-dispatch boundary", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - const { db } = yield* Database.Service - skillBaselines.set(AgentV2.ID.make("build"), "Build skills") - skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") - let switched = false - modelResolveHook = Effect.suspend(() => { - if (switched) return Effect.void - switched = true - return events - .publish(SessionEvent.AgentSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(1), - agent: "reviewer", - }) - .pipe(Effect.asVoid) - }) - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - - requests.length = 0 - response = [] - yield* session.resume(sessionID) - expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ - ["Initial context\n\nReviewer skills"], - ]) - expect( - yield* db - .select({ replacementSeq: SessionContextEpochTable.replacement_seq }) - .from(SessionContextEpochTable) - .where(eq(SessionContextEpochTable.session_id, sessionID)) - .get() - .pipe(Effect.orDie), - ).toEqual({ replacementSeq: null }) - }), - ) - - it.effect("retries a model switch before the final provider-dispatch boundary", () => + it.effect("keeps the sampled model when selection changes during model resolution", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -1049,145 +941,11 @@ describe("SessionRunnerLLM", () => { requests.length = 0 response = [] yield* session.resume(sessionID) - expect(requests.map((request) => request.model)).toEqual([replacementModel]) + expect(requests.map((request) => request.model)).toEqual([model]) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([["Initial context"]]) }), ) - it.effect("fences an unchanged epoch read across an agent ABA replacement request", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - const { db } = yield* Database.Service - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - response = [] - yield* session.resume(sessionID) - let switched = false - systemLoadHook = Effect.suspend(() => { - if (switched) return Effect.void - switched = true - return events - .publish(SessionEvent.AgentSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(1), - agent: AgentV2.ID.make("reviewer"), - }) - .pipe( - Effect.andThen( - events.publish(SessionEvent.AgentSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(2), - agent: AgentV2.defaultID, - }), - ), - Effect.asVoid, - ) - }) - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) - - requests.length = 0 - yield* session.resume(sessionID) - - expect(requests).toHaveLength(1) - expect( - yield* db - .select({ replacementSeq: SessionContextEpochTable.replacement_seq }) - .from(SessionContextEpochTable) - .where(eq(SessionContextEpochTable.session_id, sessionID)) - .get() - .pipe(Effect.orDie), - ).toEqual({ replacementSeq: null }) - }), - ) - - it.effect("rejects stale agent guidance when committing an existing-epoch replacement", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - const { db } = yield* Database.Service - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - response = [] - yield* session.resume(sessionID) - yield* events.publish(SessionEvent.AgentSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(1), - agent: AgentV2.ID.make("reviewer"), - }) - const context = (text: string) => - Effect.succeed( - SystemContext.make({ - key: systemContextKey, - codec: Schema.toCodecJson(Schema.String), - load: Effect.succeed(text), - baseline: String, - update: (_previous, current) => current, - }), - ) - const location = (yield* session.get(sessionID)).location - - expect( - yield* SessionContextEpoch.prepare( - db, - events, - context("Stale build context"), - sessionID, - location, - AgentV2.defaultID, - ).pipe(Effect.catchDefect(Effect.succeed)), - ).toBeInstanceOf(SessionContextEpoch.AgentMismatch) - - expect( - yield* SessionContextEpoch.prepare( - db, - events, - context("Reviewer context"), - sessionID, - location, - AgentV2.ID.make("reviewer"), - ), - ).toMatchObject({ baseline: "Reviewer context" }) - }), - ) - - it.effect("blocks a cross-agent provider turn while replacement context is unavailable", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - skillBaselines.set(AgentV2.defaultID, "Build skills") - skillBaselines.set(AgentV2.ID.make("reviewer"), "Reviewer skills") - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - response = [] - yield* session.resume(sessionID) - yield* events.publish(SessionEvent.AgentSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(1), - agent: AgentV2.ID.make("reviewer"), - }) - systemUnavailable = true - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) - - requests.length = 0 - const blocked = yield* session.resume(sessionID).pipe(Effect.exit) - expect(Exit.isFailure(blocked)).toBe(true) - if (Exit.isFailure(blocked)) - expect(Cause.squash(blocked.cause)).toBeInstanceOf(SessionContextEpoch.AgentReplacementBlocked) - expect(requests).toHaveLength(0) - - systemUnavailable = false - yield* session.resume(sessionID) - expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ - ["Initial context\n\nReviewer skills"], - ]) - }), - ) - it.effect("admits removed context as a chronological System message", () => Effect.gen(function* () { yield* setup @@ -1209,7 +967,7 @@ describe("SessionRunnerLLM", () => { }), ) - it.effect("replaces the baseline lazily after a model switch and drops prior System updates", () => + it.effect("keeps the baseline and chronological System updates after a model switch", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -1235,24 +993,26 @@ describe("SessionRunnerLLM", () => { expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], ["Initial context"], - ["Replacement context"], + ["Initial context"], ]) expect(requests[1]?.messages.map((message) => message.role)).toEqual(["user", "user", "system"]) - expect(requests[2]?.messages.map((message) => message.role)).toEqual(["user", "user", "user"]) + expect(requests[2]?.messages.filter((message) => message.role === "system")).toHaveLength(2) expect((yield* session.context(sessionID)).map((message) => message.type)).toEqual([ "user", "user", + "system", "model-switched", "user", + "system", ]) yield* replaySessionProjection(sessionID) - expect(yield* session.messages({ sessionID })).toHaveLength(5) + expect(yield* session.messages({ sessionID })).toHaveLength(6) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fourth" }), resume: false }) yield* session.resume(sessionID) }), ) - it.effect("defers replacement while admitted context is temporarily unavailable", () => + it.effect("preserves the baseline while context is temporarily unavailable", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -1279,81 +1039,12 @@ describe("SessionRunnerLLM", () => { expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], ["Initial context"], - ["Replacement context"], + ["Initial context"], ]) }), ) - it.effect("advances a pending replacement to the latest invalidation boundary", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - const { db } = yield* Database.Service - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - response = [] - yield* session.resume(sessionID) - - yield* events.publish(SessionEvent.ModelSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(1), - model: { id: ModelV2.ID.make("replacement-1"), providerID: ProviderV2.ID.make("fake") }, - }) - yield* events.publish(SessionEvent.ModelSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(2), - model: { id: ModelV2.ID.make("replacement-2"), providerID: ProviderV2.ID.make("fake") }, - }) - const latest = yield* SessionInput.latestSeq(db, sessionID) - - expect( - yield* db - .select({ replacementSeq: SessionContextEpochTable.replacement_seq }) - .from(SessionContextEpochTable) - .where(eq(SessionContextEpochTable.session_id, sessionID)) - .get() - .pipe(Effect.orDie), - ).toEqual({ replacementSeq: latest }) - }), - ) - - it.effect("retries epoch preparation until observation-time invalidations settle", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - response = [] - yield* session.resume(sessionID) - - requests.length = 0 - systemBaseline = "Changed context" - let invalidations = 0 - systemLoadHook = Effect.suspend(() => { - if (invalidations === 4) return Effect.void - invalidations++ - return events - .publish(SessionEvent.ModelSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(invalidations), - model: { id: ModelV2.ID.make(`replacement-${invalidations}`), providerID: ProviderV2.ID.make("fake") }, - }) - .pipe(Effect.asVoid) - }) - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) - - yield* session.resume(sessionID) - - expect(invalidations).toBe(4) - expect(requests).toHaveLength(1) - expect(requests[0]?.system.map((part) => part.text)).toEqual(["Changed context"]) - }), - ) - - it.effect("replaces the baseline lazily after completed compaction without reopening replacement on replay", () => + it.effect("rebuilds the baseline directly after completed compaction", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -1580,7 +1271,7 @@ describe("SessionRunnerLLM", () => { }), ) - it.effect("preserves effective System updates while compaction replacement is blocked", () => + it.effect("preserves effective System updates while compaction rebaseline is blocked", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -1613,16 +1304,7 @@ describe("SessionRunnerLLM", () => { yield* session.resume(sessionID) expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Initial context"]) - expect( - requests - .at(-1) - ?.messages.some( - (message) => - message.role === "system" && - message.content[0]?.type === "text" && - message.content[0].text === "Changed context", - ), - ).toBe(true) + expect(systemTexts(requests.at(-1)!)).toContain("Changed context") }), ) @@ -1821,8 +1503,9 @@ describe("SessionRunnerLLM", () => { expect(requests.map((request) => request.model)).toEqual([model, replacementModel]) expect(requests.map((request) => request.system.map((part) => part.text))).toEqual([ ["Initial context"], - ["Replacement context"], + ["Initial context"], ]) + expect(systemTexts(requests[1]!)).toContain("Replacement context") }), ) diff --git a/specs/v2/schema-changelog.md b/specs/v2/schema-changelog.md index 4356c234a..a9a08b501 100644 --- a/specs/v2/schema-changelog.md +++ b/specs/v2/schema-changelog.md @@ -800,3 +800,23 @@ Change: Compatibility: - Existing Context Epoch rows backfill the default `build` agent and reconcile to another selected agent at the next safe provider-turn boundary. + +## 2026-06-22: Simplify Session Context Rebaselining + +Affected schema: + +- Remove `session_context_epoch.agent`, `session_context_epoch.replacement_seq`, and `session_context_epoch.revision`. +- No synchronized event, public HTTP API, or generated SDK schema changes. + +Change: + +- Sample the effective agent and model once for each provider turn; selection changes apply to the next turn. +- Preserve the immutable baseline and admit ordinary System Context changes as chronological `ContextUpdated` messages. +- Rebuild the baseline directly after completed compaction instead of maintaining pending replacement state. +- Preserve the old baseline and its effective chronological updates while a post-compaction baseline cannot be rendered completely. +- Rely on the process-local Session execution lane instead of optimistic concurrency state between Context Epoch writers. + +Compatibility: + +- Existing Context Epoch rows migrate in place by dropping the obsolete selection and pending-replacement columns. +- Model and agent switches no longer discard earlier chronological System Context updates by forcing a new baseline. diff --git a/specs/v2/session.md b/specs/v2/session.md index ed30890fa..43c93b175 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -46,7 +46,7 @@ Projected hosted tools preserve call-side and settlement-side provider metadata ## Context Epochs -V2 Sessions persist the exact privileged System Context shown to the model. A Context Epoch owns one effective agent, one immutable baseline, and a model-hidden structured snapshot used to compare independently observed Context Sources. Environment facts, the host-local date, ambient global/upward-project `AGENTS.md` files, and selected-agent available-skill guidance are the initial sources. Location-wide sources come from the System Context Registry; selected-agent guidance composes with them immediately before Context Epoch admission. +V2 Sessions persist the exact privileged System Context shown to the model. A Context Epoch stores one immutable provider-cache baseline and a model-hidden structured snapshot used to compare independently observed Context Sources. Environment facts, the host-local date, ambient global/upward-project `AGENTS.md` files, and selected-agent available-skill guidance are the initial sources. Location-wide sources come from the System Context Registry; selected-agent guidance composes with them immediately before Context Epoch admission. The first complete observation initializes the epoch before any pending prompt becomes model-visible. If initial context is temporarily unavailable, execution stops while the prompt remains pending and retryable. On later provider turns, the runner promotes eligible input first, then reconciles current sources at the safe boundary. Changed context becomes one durable chronological System message, and its event commit advances the epoch snapshot atomically. @@ -72,7 +72,7 @@ Client Runner System Context Registry C │ ├─ Baseline + chronological history ─────────────────────────────────────────────────────────────────────────▶ ``` -Agent switches, model switches, and completed compactions request lazy baseline replacement. A switch admitted after the current safe provider-turn boundary applies to the next provider turn while leaving the already-prepared baseline durable. Before another cross-agent provider turn, the replacement must complete; unavailable admitted context blocks instead of exposing the prior agent's privileged baseline. A Session move clears the epoch so the destination Location must initialize a complete baseline before another provider turn. Epoch creation and replacement are fenced against the authoritative Session Location/effective agent and the epoch revision, preventing stale or ABA-observed context from becoming durable. +Agent and model selection are provider-turn scoped. A switch admitted after the current safe provider-turn boundary applies to the next provider turn without restarting the current turn or replacing the baseline. Agent-specific skill guidance remains a Context Source, so changed guidance is admitted as a chronological System message. A completed compaction causes the next provider attempt to render a fresh baseline directly from current complete context. A Session move clears the epoch so the destination Location initializes a complete baseline on its next run. ```text Session Epoch @@ -83,11 +83,8 @@ Session Epoch │ │ reconcile chronological update │ │ ◀─────────────────────────────────╯ │ │ - ├─ request replacement ───────────▶ - │ │ - │ ├─────────────────────────────────────╮ - │ │ replace after complete observation │ - │ ◀─────────────────────────────────────╯ + ├─ completed compaction ──────────▶ + │ ├─ render fresh baseline │ │ ├─ clear after Location move ─────▶ ``` @@ -110,7 +107,7 @@ Before each provider turn, the runner estimates the complete model-visible reque Compaction keeps the full transcript durable while replacing its active model representation with one hidden checkpoint containing a structured rolling summary and token-bounded serialized recent context. Provider-native assistant, reasoning, and tool messages never survive across the boundary, avoiding signature and encrypted-reasoning failures when the earlier prefix changes. -`session.next.compaction.started.1` durably identifies the attempt. Compaction deltas are live-only progress. `session.next.compaction.ended.2` durably stores the final summary and serialized recent context; only this completed event projects a model-visible compaction message and requests Context Epoch replacement. A failed or interrupted attempt therefore leaves the previous history boundary active. +`session.next.compaction.started.1` durably identifies the attempt. Compaction deltas are live-only progress. `session.next.compaction.ended.2` durably stores the final summary and serialized recent context; only this completed event projects a model-visible compaction message. On the next provider attempt, the runner observes that completed compaction and directly renders a fresh Context Epoch baseline. A failed or interrupted attempt therefore leaves the previous history boundary active. Repeated compactions update the previous structured summary with newly compacted messages. The runner then reloads projected history and executes the original pending turn. @@ -138,7 +135,7 @@ Status: `complete` is usable in the native V2 path, `partial` covers only part o | Per-turn request assembly | Plugin message, system, parameter, and header transforms | missing | Design V2 plugin hooks and lifecycle semantics. | | Per-turn request assembly | Model variants and request settings | partial | Apply effective agent options and future plugin-mutated request settings. | | Per-turn request assembly | Structured-output policy | missing | Add prompt format, generated tool, tool choice, and model-visible policy together. | -| Per-turn request assembly | Automatic/context-pressure compaction | partial | V2 replays completed compactions and replaces epochs but cannot initiate compaction. | +| Per-turn request assembly | Automatic/context-pressure compaction | complete | V2 initiates automatic and overflow-triggered compaction, then rebuilds the baseline from the completed checkpoint. | | Prompt/reference expansion | Durable typed prompt attachments | complete | None. | | Prompt/reference expansion | Native template and `@` mention expansion | missing | Parse and resolve native V2 prompt input before durable admission. | | Prompt/reference expansion | File, directory, media, and MCP-resource materialization | partial | Materialize and normalize sources instead of lowering unresolved attachment metadata. | diff --git a/specs/v2/todo.md b/specs/v2/todo.md index 002139cdd..5d77cbea3 100644 --- a/specs/v2/todo.md +++ b/specs/v2/todo.md @@ -55,8 +55,8 @@ Next reviewed slices: - integrate the new BackgroundJob service with V2 tool execution: support background bash jobs and background agent dispatch with durable status observation, completion delivery, and explicit cancellation / continuation semantics -- add compaction, durable/clustered interruption, retries, and stale-owner fencing - only as their slices become concrete +- add durable/clustered interruption, retries, and stale-owner fencing only as + their slices become concrete ### Deferred durable activity recovery @@ -75,10 +75,6 @@ Design post-crash activity recovery as one explicit slice. It should model: - retry budget, backoff, visible recovery status, startup discovery, and future clustered ownership fencing -## Rework compaction - Aiden? - -The new agent loop needs to trigger compaction properly - ## Plugin API design - James? We need to figure out how we want server plugins to work and what hooks are useful.