diff --git a/packages/core/src/database/migration.gen.ts b/packages/core/src/database/migration.gen.ts index fd778414a..19b1b5684 100644 --- a/packages/core/src/database/migration.gen.ts +++ b/packages/core/src/database/migration.gen.ts @@ -38,5 +38,6 @@ export const migrations = ( import("./migration/20260611192811_lush_chimera"), import("./migration/20260612174303_project_dir_strategy"), import("./migration/20260622142730_simplify_session_context_epoch"), + import("./migration/20260622170816_reset_v2_session_state"), ]) ).map((module) => module.default) satisfies DatabaseMigration.Migration[] diff --git a/packages/core/src/database/migration/20260622170816_reset_v2_session_state.ts b/packages/core/src/database/migration/20260622170816_reset_v2_session_state.ts new file mode 100644 index 000000000..b771a64bb --- /dev/null +++ b/packages/core/src/database/migration/20260622170816_reset_v2_session_state.ts @@ -0,0 +1,17 @@ +import { Effect } from "effect" +import type { DatabaseMigration } from "../migration" + +export default { + id: "20260622170816_reset_v2_session_state", + up(tx) { + return Effect.gen(function* () { + yield* tx.run(`DELETE FROM \`session_context_epoch\`;`) + yield* tx.run(`DELETE FROM \`session_input\`;`) + yield* tx.run(`DELETE FROM \`session_message\`;`) + yield* tx.run(`DELETE FROM \`event\`;`) + yield* tx.run(`DELETE FROM \`event_sequence\`;`) + yield* tx.run(`UPDATE \`session\` SET \`workspace_id\` = NULL WHERE \`workspace_id\` IS NOT NULL;`) + yield* tx.run(`DELETE FROM \`workspace\`;`) + }) + }, +} satisfies DatabaseMigration.Migration diff --git a/packages/core/src/session/event.ts b/packages/core/src/session/event.ts index 97e334617..d4b773d18 100644 --- a/packages/core/src/session/event.ts +++ b/packages/core/src/session/event.ts @@ -436,20 +436,9 @@ export namespace Compaction { }) export type Delta = typeof Delta.Type - // Retain the unpublished v1 decoder so stored beta events remain replayable. - export const EndedV1 = EventV2.define({ - type: "session.next.compaction.ended", - ...options, - schema: { - ...Base, - text: Schema.String, - include: Schema.String.pipe(Schema.optional), - }, - }) - export const Ended = EventV2.define({ type: "session.next.compaction.ended", - durable: { aggregate: "sessionID", version: 2 }, + ...options, schema: { ...Base, messageID: SessionMessageID.ID, diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index 50bfad465..2c87dfb8d 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -417,9 +417,7 @@ export const layer = Layer.effectDiscard( yield* events.project(SessionEvent.Reasoning.Started, (event) => run(db, event)) yield* events.project(SessionEvent.Reasoning.Ended, (event) => run(db, event)) // yield* events.project(SessionEvent.Retried, (event) => run(db, event)) - yield* events.project(SessionEvent.Compaction.Ended, (event) => - event.durable?.version === 1 ? Effect.void : run(db, event), - ) + yield* events.project(SessionEvent.Compaction.Ended, (event) => run(db, event)) }), ) diff --git a/packages/core/test/database-migration.test.ts b/packages/core/test/database-migration.test.ts index 914243a58..835f41931 100644 --- a/packages/core/test/database-migration.test.ts +++ b/packages/core/test/database-migration.test.ts @@ -14,6 +14,8 @@ import sessionMessageProjectionOrderMigration from "@opencode-ai/core/database/m import eventSourcedSessionInputMigration from "@opencode-ai/core/database/migration/20260604172448_event_sourced_session_input" import contextEpochAgentMigration from "@opencode-ai/core/database/migration/20260605042240_add_context_epoch_agent" import simplifyIntegrationCredentialsMigration from "@opencode-ai/core/database/migration/20260611192811_lush_chimera" +import resetV2SessionStateMigration from "@opencode-ai/core/database/migration/20260622170816_reset_v2_session_state" +import { EventV2 } from "@opencode-ai/core/event" import { ProjectV2 } from "@opencode-ai/core/project" import { ProjectTable } from "@opencode-ai/core/project/sql" import { AbsolutePath } from "@opencode-ai/core/schema" @@ -22,6 +24,8 @@ import { SessionTable } from "@opencode-ai/core/session/sql" import sessionMetadataMigration from "@opencode-ai/core/database/migration/20260511173437_session-metadata" import type { SqlClient as SqlClientService } from "effect/unstable/sql/SqlClient" import { Database } from "@opencode-ai/core/database/database" +import { SessionProjector } from "@opencode-ai/core/session/projector" +import { SessionV1 } from "@opencode-ai/core/v1/session" import { tmpdir } from "./fixture/tmpdir" const run = (effect: Effect.Effect) => @@ -226,6 +230,94 @@ describe("DatabaseMigration", () => { ) }) + test("preserves canonical V1 state and restarts its event stream", async () => { + await run( + Effect.gen(function* () { + const db = yield* makeDb + yield* db.run(sql`PRAGMA foreign_keys = ON`) + yield* DatabaseMigration.apply(db) + yield* db.run( + sql`INSERT INTO project (id, worktree, time_created, time_updated, sandboxes) VALUES ('global', '/project', 1, 1, '[]')`, + ) + yield* db.run( + sql`INSERT INTO workspace (id, type, project_id, time_used) VALUES ('workspace', 'local', 'global', 1)`, + ) + yield* db.run( + sql`INSERT INTO session (id, project_id, workspace_id, slug, directory, title, version, time_created, time_updated) VALUES ('session', 'global', 'workspace', 'session', '/project', 'Before', 'test', 1, 1)`, + ) + yield* db.run( + sql`INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES ('message', 'session', 1, 1, '{}')`, + ) + yield* db.run( + sql`INSERT INTO part (id, message_id, session_id, time_created, time_updated, data) VALUES ('part', 'message', 'session', 1, 1, '{}')`, + ) + yield* db.run(sql`INSERT INTO event_sequence (aggregate_id, seq) VALUES ('session', 9)`) + yield* db.run( + sql`INSERT INTO event (id, aggregate_id, seq, type, data) VALUES ('event', 'session', 9, 'session.updated.1', '{}')`, + ) + yield* db.run( + sql`INSERT INTO session_input (id, session_id, prompt, delivery, admitted_seq, time_created) VALUES ('input', 'session', '{}', 'steer', 9, 1)`, + ) + yield* db.run( + sql`INSERT INTO session_message (id, session_id, type, seq, time_created, time_updated, data) VALUES ('projected', 'session', 'user', 9, 1, 1, '{}')`, + ) + yield* db.run( + sql`INSERT INTO session_context_epoch (session_id, baseline, snapshot, baseline_seq) VALUES ('session', 'baseline', '{}', 9)`, + ) + yield* db.run(sql`DELETE FROM migration WHERE id = ${resetV2SessionStateMigration.id}`) + yield* DatabaseMigration.applyOnly(db, [resetV2SessionStateMigration]) + + const database = Layer.succeed(Database.Service, { db }) + const events = EventV2.layer.pipe(Layer.provide(database)) + yield* EventV2.Service.use((service) => + service.publish(SessionV1.Event.Updated, { + sessionID: SessionSchema.ID.make("session"), + info: { + id: SessionSchema.ID.make("session"), + slug: "session", + projectID: ProjectV2.ID.global, + directory: "/project", + title: "After", + version: "test", + time: { created: 1, updated: 2 }, + }, + }), + ).pipe( + Effect.provide( + Layer.merge(events, SessionProjector.layer.pipe(Layer.provide(events), Layer.provide(database))), + ), + ) + + expect( + yield* db.get(sql` + SELECT + (SELECT title FROM session WHERE id = 'session') AS title, + (SELECT workspace_id FROM session WHERE id = 'session') AS workspaceID, + (SELECT COUNT(*) FROM message WHERE id = 'message') AS messages, + (SELECT COUNT(*) FROM part WHERE id = 'part') AS parts, + (SELECT COUNT(*) FROM workspace) AS workspaces, + (SELECT COUNT(*) FROM session_input) AS sessionInputs, + (SELECT COUNT(*) FROM session_message) AS sessionMessages, + (SELECT COUNT(*) FROM session_context_epoch) AS contextEpochs, + (SELECT seq FROM event_sequence WHERE aggregate_id = 'session') AS seq, + (SELECT type FROM event WHERE aggregate_id = 'session') AS eventType + `), + ).toEqual({ + title: "After", + workspaceID: null, + messages: 1, + parts: 1, + workspaces: 0, + sessionInputs: 0, + sessionMessages: 0, + contextEpochs: 0, + seq: 0, + eventType: "session.updated.1", + }) + }), + ) + }) + test("resets incompatible projected Session messages before adding sequence order", async () => { await run( Effect.gen(function* () { diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index e27957f88..b15ff9347 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -3637,7 +3637,7 @@ export type SyncEventSessionNextCompactionEnded = { type: "sync" id: string syncEvent: { - type: "session.next.compaction.ended.2" + type: "session.next.compaction.ended.1" id: string seq: number aggregateID: string diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 7501fc4b9..0d8dc1aec 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -25744,7 +25744,7 @@ "properties": { "type": { "type": "string", - "enum": ["session.next.compaction.ended.2"] + "enum": ["session.next.compaction.ended.1"] }, "id": { "type": "string", diff --git a/specs/v2/schema-changelog.md b/specs/v2/schema-changelog.md index f32948c82..6d9c0efdd 100644 --- a/specs/v2/schema-changelog.md +++ b/specs/v2/schema-changelog.md @@ -1,5 +1,11 @@ # V2 Schema Changelog +## 2026-06-22: Reset Unpublished Compaction Event + +- Replace the unpublished `session.next.compaction.ended.1` payload with the current checkpoint payload and remove its legacy decoder. +- Reset experimental events, sequences, Session inputs, projected Session messages, Context Epochs, synchronized workspace rows, and Session workspace links. +- Preserve canonical V1 `session`, `message`, and `part` rows. + ## 2026-06-22: Make Session Interruption Process-Local - Remove the unprojected `session.next.interrupt.requested.1` event from the experimental durable Session event union and generated SDK. @@ -11,7 +17,7 @@ - Preserve the existing structured summary contract and update prior summaries with newly compacted history. - Store token-bounded recent history as plain serialized text inside the checkpoint instead of replaying provider-native messages. - Keep compaction starts durable and progress deltas live-only; activate history cutover only from a durable completed summary. -- Version the completed event as `session.next.compaction.ended.2` rather than changing the existing synchronized v1 payload in place. +- Store the completed event with the current checkpoint payload containing stable message identity, reason, summary, and recent context. - Reload the replacement Context Epoch and continue the original pending turn after compaction. - Preserve full durable history; compaction changes only the active model representation. - Defer provider-overflow recovery, explicit manual compaction, and deterministic old tool-result pruning. diff --git a/specs/v2/session.md b/specs/v2/session.md index ea22e6008..994675832 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -107,7 +107,7 @@ Before each provider turn, the runner estimates the complete model-visible reque Compaction keeps the full transcript durable while replacing its active model representation with one hidden checkpoint containing a structured rolling summary and token-bounded serialized recent context. Provider-native assistant, reasoning, and tool messages never survive across the boundary, avoiding signature and encrypted-reasoning failures when the earlier prefix changes. -`session.next.compaction.started.1` durably identifies the attempt. Compaction deltas are live-only progress. `session.next.compaction.ended.2` durably stores the final summary and serialized recent context; only this completed event projects a model-visible compaction message. On the next provider attempt, the runner observes that completed compaction and directly renders a fresh Context Epoch baseline. A failed or interrupted attempt therefore leaves the previous history boundary active. +`session.next.compaction.started.1` durably identifies the attempt. Compaction deltas are live-only progress. `session.next.compaction.ended.1` durably stores the final summary and serialized recent context; only this completed event projects a model-visible compaction message. On the next provider attempt, the runner observes that completed compaction and directly renders a fresh Context Epoch baseline. A failed or interrupted attempt therefore leaves the previous history boundary active. Repeated compactions update the previous structured summary with newly compacted messages. The runner then reloads projected history and executes the original pending turn.