fix(core): reset pre-launch session projections (#30728)

This commit is contained in:
Kit Langton 2026-06-04 09:43:10 -04:00 committed by GitHub
parent 69cfc44dba
commit caea930074
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 41 additions and 38 deletions

View File

@ -1,3 +1,4 @@
DELETE FROM `session_message`;--> statement-breakpoint
ALTER TABLE `session_message` ADD `seq` integer NOT NULL;--> statement-breakpoint
DROP INDEX IF EXISTS `session_message_session_time_created_id_idx`;--> statement-breakpoint
DROP INDEX IF EXISTS `session_message_session_type_time_created_id_idx`;--> statement-breakpoint

View File

@ -5,16 +5,10 @@ export default {
id: "20260603040000_session_message_projection_order",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`session_message\` ADD COLUMN \`seq\` integer NOT NULL DEFAULT 0;`)
yield* tx.run(
`UPDATE \`session_message\` SET \`seq\` = COALESCE((SELECT \`seq\` + 1 FROM \`event\` WHERE \`event\`.\`id\` = \`session_message\`.\`id\`), 0);`,
)
const unmatched = yield* tx.get<{ count: number }>(
`SELECT COUNT(*) AS \`count\` FROM \`session_message\` WHERE \`seq\` = 0;`,
)
if ((unmatched?.count ?? 0) > 0)
return yield* Effect.die("Cannot migrate session_message projections without matching durable events")
yield* tx.run(`UPDATE \`session_message\` SET \`seq\` = \`seq\` - 1;`)
// Pre-launch Session projections were written before durable event persistence
// became unconditional, so they cannot be assigned truthful aggregate order.
yield* tx.run(`DELETE FROM \`session_message\`;`)
yield* tx.run(`ALTER TABLE \`session_message\` ADD COLUMN \`seq\` integer NOT NULL;`)
yield* tx.run(`DROP INDEX IF EXISTS \`session_message_session_type_time_created_id_idx\`;`)
yield* tx.run(`CREATE INDEX \`session_message_session_seq_idx\` ON \`session_message\` (\`session_id\`,\`seq\`);`)
yield* tx.run(

View File

@ -79,13 +79,20 @@ describe("DatabaseMigration", () => {
)
})
test("backfills projected Session message order from durable event sequence", async () => {
test("resets incompatible projected Session messages before adding sequence order", async () => {
await run(
Effect.gen(function* () {
const db = yield* makeDb
yield* db.run(sql`CREATE TABLE session (id text PRIMARY KEY)`)
yield* db.run(
sql`CREATE TABLE message (id text PRIMARY KEY, session_id text NOT NULL, time_created integer NOT NULL, time_updated integer NOT NULL, data text NOT NULL)`,
)
yield* db.run(
sql`CREATE TABLE part (id text PRIMARY KEY, message_id text NOT NULL, session_id text NOT NULL, time_created integer NOT NULL, time_updated integer NOT NULL, data text NOT NULL)`,
)
yield* db.run(sql`CREATE TABLE event (id text PRIMARY KEY, seq integer NOT NULL)`)
yield* db.run(
sql`CREATE TABLE session_message (id text PRIMARY KEY, session_id text NOT NULL, type text NOT NULL, time_created integer NOT NULL, data text NOT NULL)`,
sql`CREATE TABLE session_message (id text PRIMARY KEY, session_id text NOT NULL, type text NOT NULL, time_created integer NOT NULL, time_updated integer NOT NULL, data text NOT NULL)`,
)
yield* db.run(
sql`CREATE INDEX session_message_session_time_created_id_idx ON session_message (session_id, time_created, id)`,
@ -93,40 +100,40 @@ describe("DatabaseMigration", () => {
yield* db.run(
sql`CREATE INDEX session_message_session_type_time_created_id_idx ON session_message (session_id, type, time_created, id)`,
)
yield* db.run(sql`INSERT INTO event (id, seq) VALUES ('evt_z', 0), ('evt_a', 1)`)
yield* db.run(sql`INSERT INTO session (id) VALUES ('session')`)
yield* db.run(
sql`INSERT INTO session_message (id, session_id, type, time_created, data) VALUES ('evt_z', 'session', 'user', 0, '{}'), ('evt_a', 'session', 'user', 0, '{}')`,
sql`INSERT INTO message (id, session_id, time_created, time_updated, data) VALUES ('legacy_message', 'session', 1, 1, '{"role":"user"}')`,
)
yield* db.run(
sql`INSERT INTO part (id, message_id, session_id, time_created, time_updated, data) VALUES ('legacy_part', 'legacy_message', 'session', 1, 1, '{"type":"text","text":"hello"}')`,
)
yield* db.run(
sql`INSERT INTO session_message (id, session_id, type, time_created, time_updated, data) VALUES ('stale_projection', 'session', 'user', 1, 1, '{}')`,
)
yield* DatabaseMigration.applyOnly(db, [sessionMessageProjectionOrderMigration])
expect(yield* db.all(sql`SELECT id, seq FROM session_message ORDER BY seq`)).toEqual([
{ id: "evt_z", seq: 0 },
{ id: "evt_a", seq: 1 },
expect(yield* db.all(sql`SELECT id, session_id, data FROM message`)).toEqual([
{ id: "legacy_message", session_id: "session", data: '{"role":"user"}' },
])
expect(yield* db.all(sql`SELECT id, message_id, session_id, data FROM part`)).toEqual([
{
id: "legacy_part",
message_id: "legacy_message",
session_id: "session",
data: '{"type":"text","text":"hello"}',
},
])
expect(yield* db.all(sql`SELECT id FROM session_message`)).toEqual([])
yield* db.run(
sql`INSERT INTO session_message (id, session_id, type, seq, time_created, time_updated, data) VALUES ('fresh_projection', 'session', 'user', 7, 2, 2, '{}')`,
)
expect(yield* db.get(sql`SELECT id, seq FROM session_message`)).toEqual({ id: "fresh_projection", seq: 7 })
}),
)
})
test("fails projected Session message order backfill without a durable event", async () => {
await expect(
run(
Effect.gen(function* () {
const db = yield* makeDb
yield* db.run(sql`CREATE TABLE event (id text PRIMARY KEY, seq integer NOT NULL)`)
yield* db.run(
sql`CREATE TABLE session_message (id text PRIMARY KEY, session_id text NOT NULL, type text NOT NULL, time_created integer NOT NULL, data text NOT NULL)`,
)
yield* db.run(
sql`INSERT INTO session_message (id, session_id, type, time_created, data) VALUES ('evt_missing', 'session', 'user', 0, '{}')`,
)
yield* DatabaseMigration.applyOnly(db, [sessionMessageProjectionOrderMigration])
}),
),
).rejects.toThrow("Cannot migrate session_message projections without matching durable events")
})
test("runs session usage backfill in order with schema changes", async () => {
await run(
Effect.gen(function* () {

View File

@ -103,7 +103,7 @@ Affected schema:
Change:
- Add and backfill `session_message.seq` from matching synchronized events.
- Reset pre-launch Session-message projections and add `session_message.seq` for newly projected synchronized events.
- Add event aggregate-sequence and aggregate-type-sequence indexes.
- Add Session-message sequence, type-sequence, and compatibility timestamp indexes.
@ -114,7 +114,8 @@ Reason:
Compatibility:
- Migration fails rather than inventing chronology if an existing projected Session message has no matching durable event.
- Pre-launch Session-message projections are disposable because historical versions could write them without durable creator events.
- The migration resets those projections rather than inventing chronology or blocking startup.
- The timestamp compatibility index remains for legacy or transitional query shapes.
### Structured Tool Registry And Canonical Output