refactor(core): drop legacy compaction event (#33404)

This commit is contained in:
Kit Langton 2026-06-22 21:39:41 +02:00 committed by GitHub
parent 130957288e
commit 1787fa4261
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 122 additions and 19 deletions

View File

@ -38,5 +38,6 @@ export const migrations = (
import("./migration/20260611192811_lush_chimera"),
import("./migration/20260612174303_project_dir_strategy"),
import("./migration/20260622142730_simplify_session_context_epoch"),
import("./migration/20260622170816_reset_v2_session_state"),
])
).map((module) => module.default) satisfies DatabaseMigration.Migration[]

View File

@ -0,0 +1,17 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260622170816_reset_v2_session_state",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`DELETE FROM \`session_context_epoch\`;`)
yield* tx.run(`DELETE FROM \`session_input\`;`)
yield* tx.run(`DELETE FROM \`session_message\`;`)
yield* tx.run(`DELETE FROM \`event\`;`)
yield* tx.run(`DELETE FROM \`event_sequence\`;`)
yield* tx.run(`UPDATE \`session\` SET \`workspace_id\` = NULL WHERE \`workspace_id\` IS NOT NULL;`)
yield* tx.run(`DELETE FROM \`workspace\`;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -436,20 +436,9 @@ export namespace Compaction {
})
export type Delta = typeof Delta.Type
// Retain the unpublished v1 decoder so stored beta events remain replayable.
export const EndedV1 = EventV2.define({
type: "session.next.compaction.ended",
...options,
schema: {
...Base,
text: Schema.String,
include: Schema.String.pipe(Schema.optional),
},
})
export const Ended = EventV2.define({
type: "session.next.compaction.ended",
durable: { aggregate: "sessionID", version: 2 },
...options,
schema: {
...Base,
messageID: SessionMessageID.ID,

View File

@ -417,9 +417,7 @@ export const layer = Layer.effectDiscard(
yield* events.project(SessionEvent.Reasoning.Started, (event) => run(db, event))
yield* events.project(SessionEvent.Reasoning.Ended, (event) => run(db, event))
// yield* events.project(SessionEvent.Retried, (event) => run(db, event))
yield* events.project(SessionEvent.Compaction.Ended, (event) =>
event.durable?.version === 1 ? Effect.void : run(db, event),
)
yield* events.project(SessionEvent.Compaction.Ended, (event) => run(db, event))
}),
)

View File

@ -14,6 +14,8 @@ import sessionMessageProjectionOrderMigration from "@opencode-ai/core/database/m
import eventSourcedSessionInputMigration from "@opencode-ai/core/database/migration/20260604172448_event_sourced_session_input"
import contextEpochAgentMigration from "@opencode-ai/core/database/migration/20260605042240_add_context_epoch_agent"
import simplifyIntegrationCredentialsMigration from "@opencode-ai/core/database/migration/20260611192811_lush_chimera"
import resetV2SessionStateMigration from "@opencode-ai/core/database/migration/20260622170816_reset_v2_session_state"
import { EventV2 } from "@opencode-ai/core/event"
import { ProjectV2 } from "@opencode-ai/core/project"
import { ProjectTable } from "@opencode-ai/core/project/sql"
import { AbsolutePath } from "@opencode-ai/core/schema"
@ -22,6 +24,8 @@ import { SessionTable } from "@opencode-ai/core/session/sql"
import sessionMetadataMigration from "@opencode-ai/core/database/migration/20260511173437_session-metadata"
import type { SqlClient as SqlClientService } from "effect/unstable/sql/SqlClient"
import { Database } from "@opencode-ai/core/database/database"
import { SessionProjector } from "@opencode-ai/core/session/projector"
import { SessionV1 } from "@opencode-ai/core/v1/session"
import { tmpdir } from "./fixture/tmpdir"
const run = <A, E>(effect: Effect.Effect<A, E, SqlClientService>) =>
@ -226,6 +230,94 @@ describe("DatabaseMigration", () => {
)
})
test("preserves canonical V1 state and restarts its event stream", async () => {
await run(
Effect.gen(function* () {
const db = yield* makeDb
yield* db.run(sql`PRAGMA foreign_keys = ON`)
yield* DatabaseMigration.apply(db)
yield* db.run(
sql`INSERT INTO project (id, worktree, time_created, time_updated, sandboxes) VALUES ('global', '/project', 1, 1, '[]')`,
)
yield* db.run(
sql`INSERT INTO workspace (id, type, project_id, time_used) VALUES ('workspace', 'local', 'global', 1)`,
)
yield* db.run(
sql`INSERT INTO session (id, project_id, workspace_id, slug, directory, title, version, time_created, time_updated) VALUES ('session', 'global', 'workspace', 'session', '/project', 'Before', 'test', 1, 1)`,
)
yield* db.run(
sql`INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES ('message', 'session', 1, 1, '{}')`,
)
yield* db.run(
sql`INSERT INTO part (id, message_id, session_id, time_created, time_updated, data) VALUES ('part', 'message', 'session', 1, 1, '{}')`,
)
yield* db.run(sql`INSERT INTO event_sequence (aggregate_id, seq) VALUES ('session', 9)`)
yield* db.run(
sql`INSERT INTO event (id, aggregate_id, seq, type, data) VALUES ('event', 'session', 9, 'session.updated.1', '{}')`,
)
yield* db.run(
sql`INSERT INTO session_input (id, session_id, prompt, delivery, admitted_seq, time_created) VALUES ('input', 'session', '{}', 'steer', 9, 1)`,
)
yield* db.run(
sql`INSERT INTO session_message (id, session_id, type, seq, time_created, time_updated, data) VALUES ('projected', 'session', 'user', 9, 1, 1, '{}')`,
)
yield* db.run(
sql`INSERT INTO session_context_epoch (session_id, baseline, snapshot, baseline_seq) VALUES ('session', 'baseline', '{}', 9)`,
)
yield* db.run(sql`DELETE FROM migration WHERE id = ${resetV2SessionStateMigration.id}`)
yield* DatabaseMigration.applyOnly(db, [resetV2SessionStateMigration])
const database = Layer.succeed(Database.Service, { db })
const events = EventV2.layer.pipe(Layer.provide(database))
yield* EventV2.Service.use((service) =>
service.publish(SessionV1.Event.Updated, {
sessionID: SessionSchema.ID.make("session"),
info: {
id: SessionSchema.ID.make("session"),
slug: "session",
projectID: ProjectV2.ID.global,
directory: "/project",
title: "After",
version: "test",
time: { created: 1, updated: 2 },
},
}),
).pipe(
Effect.provide(
Layer.merge(events, SessionProjector.layer.pipe(Layer.provide(events), Layer.provide(database))),
),
)
expect(
yield* db.get(sql`
SELECT
(SELECT title FROM session WHERE id = 'session') AS title,
(SELECT workspace_id FROM session WHERE id = 'session') AS workspaceID,
(SELECT COUNT(*) FROM message WHERE id = 'message') AS messages,
(SELECT COUNT(*) FROM part WHERE id = 'part') AS parts,
(SELECT COUNT(*) FROM workspace) AS workspaces,
(SELECT COUNT(*) FROM session_input) AS sessionInputs,
(SELECT COUNT(*) FROM session_message) AS sessionMessages,
(SELECT COUNT(*) FROM session_context_epoch) AS contextEpochs,
(SELECT seq FROM event_sequence WHERE aggregate_id = 'session') AS seq,
(SELECT type FROM event WHERE aggregate_id = 'session') AS eventType
`),
).toEqual({
title: "After",
workspaceID: null,
messages: 1,
parts: 1,
workspaces: 0,
sessionInputs: 0,
sessionMessages: 0,
contextEpochs: 0,
seq: 0,
eventType: "session.updated.1",
})
}),
)
})
test("resets incompatible projected Session messages before adding sequence order", async () => {
await run(
Effect.gen(function* () {

View File

@ -3637,7 +3637,7 @@ export type SyncEventSessionNextCompactionEnded = {
type: "sync"
id: string
syncEvent: {
type: "session.next.compaction.ended.2"
type: "session.next.compaction.ended.1"
id: string
seq: number
aggregateID: string

View File

@ -25744,7 +25744,7 @@
"properties": {
"type": {
"type": "string",
"enum": ["session.next.compaction.ended.2"]
"enum": ["session.next.compaction.ended.1"]
},
"id": {
"type": "string",

View File

@ -1,5 +1,11 @@
# V2 Schema Changelog
## 2026-06-22: Reset Unpublished Compaction Event
- Replace the unpublished `session.next.compaction.ended.1` payload with the current checkpoint payload and remove its legacy decoder.
- Reset experimental events, sequences, Session inputs, projected Session messages, Context Epochs, synchronized workspace rows, and Session workspace links.
- Preserve canonical V1 `session`, `message`, and `part` rows.
## 2026-06-22: Make Session Interruption Process-Local
- Remove the unprojected `session.next.interrupt.requested.1` event from the experimental durable Session event union and generated SDK.
@ -11,7 +17,7 @@
- Preserve the existing structured summary contract and update prior summaries with newly compacted history.
- Store token-bounded recent history as plain serialized text inside the checkpoint instead of replaying provider-native messages.
- Keep compaction starts durable and progress deltas live-only; activate history cutover only from a durable completed summary.
- Version the completed event as `session.next.compaction.ended.2` rather than changing the existing synchronized v1 payload in place.
- Store the completed event with the current checkpoint payload containing stable message identity, reason, summary, and recent context.
- Reload the replacement Context Epoch and continue the original pending turn after compaction.
- Preserve full durable history; compaction changes only the active model representation.
- Defer provider-overflow recovery, explicit manual compaction, and deterministic old tool-result pruning.

View File

@ -107,7 +107,7 @@ Before each provider turn, the runner estimates the complete model-visible reque
Compaction keeps the full transcript durable while replacing its active model representation with one hidden checkpoint containing a structured rolling summary and token-bounded serialized recent context. Provider-native assistant, reasoning, and tool messages never survive across the boundary, avoiding signature and encrypted-reasoning failures when the earlier prefix changes.
`session.next.compaction.started.1` durably identifies the attempt. Compaction deltas are live-only progress. `session.next.compaction.ended.2` durably stores the final summary and serialized recent context; only this completed event projects a model-visible compaction message. On the next provider attempt, the runner observes that completed compaction and directly renders a fresh Context Epoch baseline. A failed or interrupted attempt therefore leaves the previous history boundary active.
`session.next.compaction.started.1` durably identifies the attempt. Compaction deltas are live-only progress. `session.next.compaction.ended.1` durably stores the final summary and serialized recent context; only this completed event projects a model-visible compaction message. On the next provider attempt, the runner observes that completed compaction and directly renders a fresh Context Epoch baseline. A failed or interrupted attempt therefore leaves the previous history boundary active.
Repeated compactions update the previous structured summary with newly compacted messages. The runner then reloads projected history and executes the original pending turn.