diff --git a/packages/core/src/event.ts b/packages/core/src/event.ts index 7a33eedc4..b483b280d 100644 --- a/packages/core/src/event.ts +++ b/packages/core/src/event.ts @@ -5,7 +5,7 @@ import { and, asc, eq, gt } from "drizzle-orm" import { Database } from "./database/database" import { EventSequenceTable, EventTable } from "./event/sql" import { Location } from "./location" -import { externalID, type ExternalID, NonNegativeInt, withStatics } from "./schema" +import { externalID, type ExternalID, withStatics } from "./schema" import { Identifier } from "./util/identifier" import { LayerNode } from "./effect/layer-node" import { isDeepStrictEqual } from "node:util" @@ -19,16 +19,9 @@ export const ID = Schema.String.check(Schema.isStartsWith("evt_")).pipe( ) export type ID = typeof ID.Type -/** - * Durable aggregate continuation position for embedded replay streams. - * TODO: Decide whether a future HTTP / SDK surface should expose an opaque cursor instead. - */ -export const Cursor = NonNegativeInt.pipe(Schema.brand("EventV2.Cursor")) -export type Cursor = typeof Cursor.Type - export type Definition = { readonly type: Type - readonly sync?: { + readonly durable?: { readonly version: number readonly aggregate: string } @@ -41,20 +34,16 @@ export type Payload = { readonly id: ID readonly type: D["type"] readonly data: Data - /** Durable aggregate order, populated while synchronized events are projected. */ - readonly seq?: number - readonly version?: number + readonly durable?: { + readonly aggregateID: string + readonly seq: number + readonly version: number + } readonly location?: Location.Ref readonly metadata?: Record - /** Internal replay marker for projectors that own non-replicated operational state. */ - readonly replay?: boolean } -export type Projector = (event: Payload) => Effect.Effect -type AnyProjector = (event: Payload) => Effect.Effect -export type CommitGuard = (event: Payload) => Effect.Effect -export type Listener = (event: Payload) => Effect.Effect -export type Sync = (event: Payload) => Effect.Effect +export type Subscriber = (event: Payload) => Effect.Effect export type Unsubscribe = Effect.Effect export type SerializedEvent = { @@ -65,13 +54,8 @@ export type SerializedEvent = { readonly data: Record } -export type CursorEvent = { - readonly cursor: Cursor - readonly event: E -} - -export class InvalidSyncEventError extends Schema.TaggedErrorClass()( - "EventV2.InvalidSyncEvent", +export class InvalidDurableEventError extends Schema.TaggedErrorClass()( + "EventV2.InvalidDurableEvent", { type: Schema.String, message: Schema.String, @@ -83,19 +67,11 @@ export function versionedType(type: string, version: number) { } export const registry = new Map() -type SyncDefinition = Definition & { - readonly sync: NonNullable - readonly encode: (data: unknown) => unknown - readonly decode: (data: unknown) => unknown -} -const syncRegistry = new Map() - -// Synchronized events cross a JSON boundary, so their data schemas must encode and decode without services. -const syncCodec = (definition: Definition) => definition.data as Schema.Codec +const durableRegistry = new Map() export function define(input: { readonly type: Type - readonly sync?: { + readonly durable?: { readonly version: number readonly aggregate: string } @@ -106,28 +82,26 @@ export function define= existing.sync.version) { + if ( + input.durable === undefined || + existing?.durable === undefined || + input.durable.version >= existing.durable.version + ) { registry.set(input.type, definition) } - if (input.sync) - syncRegistry.set( - versionedType(input.type, input.sync.version), - Object.assign(definition, { - encode: Schema.encodeUnknownSync(syncCodec(definition)), - decode: Schema.decodeUnknownSync(syncCodec(definition)), - }) as SyncDefinition, - ) + if (input.durable) + durableRegistry.set(versionedType(input.type, input.durable.version), definition) return definition as Schema.Schema>>> & Definition> } @@ -140,7 +114,7 @@ export interface PublishOptions { readonly id?: ID readonly metadata?: Record readonly location?: Location.Ref - /** Local operational projection committed atomically with a new synchronized event. Not replayed or serialized. */ + /** Local operational projection committed atomically with a new durable event. Not replayed or serialized. */ readonly commit?: (seq: number) => Effect.Effect } @@ -152,14 +126,13 @@ export interface Interface { ) => Effect.Effect> readonly subscribe: (definition: D) => Stream.Stream> readonly all: () => Stream.Stream - readonly aggregateEvents: (input: { + readonly durable: (input: { readonly aggregateID: string - readonly after?: Cursor - }) => Stream.Stream - readonly sync: (handler: Sync) => Effect.Effect - readonly listen: (listener: Listener) => Effect.Effect - readonly beforeCommit: (guard: CommitGuard) => Effect.Effect - readonly project: (definition: D, projector: Projector) => Effect.Effect + readonly after?: number + }) => Stream.Stream + /** @deprecated Use `all()` and consume the returned stream. */ + readonly listen: (listener: Subscriber) => Effect.Effect + readonly project: (definition: D, projector: Subscriber) => Effect.Effect readonly replay: ( event: SerializedEvent, options?: { readonly publish?: boolean; readonly ownerID?: string; readonly strictOwner?: boolean }, @@ -182,37 +155,37 @@ export const layerWith = (options?: LayerOptions) => Layer.effect( Service, Effect.gen(function* () { - const all = yield* PubSub.unbounded() - const synchronized = new Map>>() - const typed = new Map>() - const projectors = new Map() - const commitGuards = new Array() - const listeners = new Array() - const syncHandlers = new Array() + const pubsub = { + all: yield* PubSub.unbounded(), + durable: new Map>>(), + typed: new Map>(), + } + const projectors = new Map() + const listeners = new Array() const { db } = yield* Database.Service const getOrCreate = (definition: Definition) => Effect.gen(function* () { - const existing = typed.get(definition.type) + const existing = pubsub.typed.get(definition.type) if (existing) return existing - const pubsub = yield* PubSub.unbounded() - typed.set(definition.type, pubsub) - return pubsub + const created = yield* PubSub.unbounded() + pubsub.typed.set(definition.type, created) + return created }) yield* Effect.addFinalizer(() => Effect.gen(function* () { - yield* PubSub.shutdown(all) + yield* PubSub.shutdown(pubsub.all) yield* Effect.forEach( - synchronized.values(), + pubsub.durable.values(), (pubsubs) => Effect.forEach(pubsubs, PubSub.shutdown, { discard: true }), { discard: true }, ) - yield* Effect.forEach(typed.values(), PubSub.shutdown, { discard: true }) + yield* Effect.forEach(pubsub.typed.values(), PubSub.shutdown, { discard: true }) }), ) - function commitSyncEvent( + function commitDurableEvent( event: Payload, input?: { readonly seq: number @@ -224,28 +197,20 @@ export const layerWith = (options?: LayerOptions) => ) { return Effect.gen(function* () { const definition = registry.get(event.type) - const sync = definition?.sync - if (sync) { - if (event.version !== sync.version) { - yield* Effect.die( - new InvalidSyncEventError({ - type: event.type, - message: `Expected event version ${sync.version}, got ${event.version}`, - }), - ) - } - const aggregateID = (event.data as Record)[sync.aggregate] + const durable = definition?.durable + if (durable) { + const aggregateID = (event.data as Record)[durable.aggregate] if (typeof aggregateID !== "string") { yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, - message: `Expected string aggregate field ${sync.aggregate}`, + message: `Expected string aggregate field ${durable.aggregate}`, }), ) } else { if (input && input.aggregateID !== aggregateID) { yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, message: `Aggregate mismatch: expected ${input.aggregateID}, got ${aggregateID}`, }), @@ -265,12 +230,12 @@ export const layerWith = (options?: LayerOptions) => .get() .pipe(Effect.orDie) const latest = row?.seq ?? -1 - const encoded = syncRegistry - .get(versionedType(definition.type, sync.version))! - .encode(event.data) as Record + const encoded = Schema.encodeUnknownSync( + definition.data as Schema.Codec, + )(event.data) as Record if (input?.strictOwner && row?.ownerID && row.ownerID !== input.ownerID) { yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, message: `Replay owner mismatch for aggregate ${aggregateID}: expected ${row.ownerID}, got ${input.ownerID ?? "none"}`, }), @@ -285,7 +250,7 @@ export const layerWith = (options?: LayerOptions) => .pipe(Effect.orDie) if ( stored?.id === event.id && - stored.type === versionedType(definition.type, sync.version) && + stored.type === versionedType(definition.type, durable.version) && isDeepStrictEqual(stored.data, encoded) ) { if (input.ownerID && row?.ownerID == null) { @@ -299,7 +264,7 @@ export const layerWith = (options?: LayerOptions) => return } yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, message: `Replay diverged at aggregate ${aggregateID} sequence ${input.seq}`, }), @@ -311,7 +276,7 @@ export const layerWith = (options?: LayerOptions) => const seq = input?.seq ?? latest + 1 if (input && seq !== latest + 1) { yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, message: `Sequence mismatch for aggregate ${aggregateID}: expected ${latest + 1}, got ${seq}`, }), @@ -325,16 +290,17 @@ export const layerWith = (options?: LayerOptions) => .pipe(Effect.orDie) if (stored) yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, message: `Event ${event.id} already exists at aggregate ${stored.aggregateID} sequence ${stored.seq}`, }), ) - for (const guard of commitGuards) { - yield* guard(event) - } + const committed = { + ...event, + durable: { aggregateID, seq, version: durable.version }, + } as Payload for (const projector of list) { - yield* projector({ ...event, seq } as Payload) + yield* projector(committed) } if (commit) yield* commit(seq) yield* db @@ -356,7 +322,7 @@ export const layerWith = (options?: LayerOptions) => id: event.id, aggregate_id: aggregateID, seq, - type: versionedType(definition.type, sync.version), + type: versionedType(definition.type, durable.version), data: encoded, }, ]) @@ -369,8 +335,8 @@ export const layerWith = (options?: LayerOptions) => .pipe(Effect.orDie) if (committed) { yield* Effect.forEach( - synchronized.get(committed.aggregateID) ?? [], - (pubsub) => PubSub.publish(pubsub, undefined), + pubsub.durable.get(committed.aggregateID) ?? [], + (wake) => PubSub.publish(wake, undefined), { discard: true }, ) } @@ -384,19 +350,25 @@ export const layerWith = (options?: LayerOptions) => function publishEvent(event: Payload, commit?: PublishOptions["commit"]) { return Effect.gen(function* () { - const durable = registry.get(event.type)?.sync !== undefined - if (!durable && commit) + const definition = registry.get(event.type) + if (!definition?.durable && commit) return yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, - message: "Local commit hooks require a synchronized event", + message: "Local commit hooks require a durable event", }), ) - if (durable) { - const committed = yield* commitSyncEvent(event as Payload, undefined, commit) + if (definition?.durable) { + const committed = yield* commitDurableEvent(event as Payload, undefined, commit) if (committed) { - event = { ...event, seq: committed.seq } - yield* Effect.forEach(syncHandlers, (sync) => observe(event as Payload, "sync", sync), { discard: true }) + event = { + ...event, + durable: { + aggregateID: committed.aggregateID, + seq: committed.seq, + version: definition.durable.version, + }, + } yield* notify(event as Payload, true) return event } @@ -406,12 +378,12 @@ export const layerWith = (options?: LayerOptions) => }) } - const observe = (event: Payload, kind: "sync" | "listener", observer: (event: Payload) => Effect.Effect) => + const observe = (event: Payload, observer: (event: Payload) => Effect.Effect) => Effect.suspend(() => observer(event)).pipe( Effect.catchCauseIf( (cause) => !Cause.hasInterrupts(cause), (cause) => - Effect.logError("Event observer failed", { eventID: event.id, eventType: event.type, kind, cause }), + Effect.logError("Event listener failed", { eventID: event.id, eventType: event.type, cause }), ), ) @@ -419,12 +391,12 @@ export const layerWith = (options?: LayerOptions) => return Effect.gen(function* () { yield* Effect.forEach( listeners, - (listener) => (isolateListeners ? observe(event, "listener", listener) : listener(event)), + (listener) => (isolateListeners ? observe(event, listener) : listener(event)), { discard: true }, ) - const pubsub = typed.get(event.type) - if (pubsub) yield* PubSub.publish(pubsub, event) - yield* PubSub.publish(all, event) + const typed = pubsub.typed.get(event.type) + if (typed) yield* PubSub.publish(typed, event) + yield* PubSub.publish(pubsub.all, event) }) } @@ -441,7 +413,6 @@ export const layerWith = (options?: LayerOptions) => id: options?.id ?? ID.create(), ...(options?.metadata ? { metadata: options.metadata } : {}), type: definition.type, - ...(definition.sync === undefined ? {} : { version: definition.sync.version }), ...(location ? { location } : {}), data, } as Payload, @@ -455,27 +426,37 @@ export const layerWith = (options?: LayerOptions) => options?: { readonly publish?: boolean; readonly ownerID?: string; readonly strictOwner?: boolean }, ) { return Effect.gen(function* () { - const definition = syncRegistry.get(event.type) - if (!definition) { + const definition = durableRegistry.get(event.type) + if (!definition?.durable) { yield* Effect.die( - new InvalidSyncEventError({ type: event.type, message: `Unknown sync event type ${event.type}` }), + new InvalidDurableEventError({ type: event.type, message: `Unknown durable event type ${event.type}` }), ) } else { const payload = { id: event.id, type: definition.type, - version: definition.sync.version, - data: definition.decode(event.data), - replay: true, + data: Schema.decodeUnknownSync( + definition.data as Schema.Codec, + )(event.data), } as Payload - const committed = yield* commitSyncEvent(payload, { + const committed = yield* commitDurableEvent(payload, { seq: event.seq, aggregateID: event.aggregateID, ownerID: options?.ownerID, strictOwner: options?.strictOwner, }) if (committed && options?.publish) { - yield* notify({ ...payload, seq: committed.seq }, true) + yield* notify( + { + ...payload, + durable: { + aggregateID: committed.aggregateID, + seq: committed.seq, + version: definition.durable.version, + }, + }, + true, + ) } } }) @@ -490,7 +471,7 @@ export const layerWith = (options?: LayerOptions) => if (!source) return undefined if (events.some((event) => event.aggregateID !== source)) { yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: events[0]?.type ?? "unknown", message: "Replay events must belong to the same aggregate", }), @@ -501,7 +482,7 @@ export const layerWith = (options?: LayerOptions) => const seq = start + index if (event.seq !== seq) { yield* Effect.die( - new InvalidSyncEventError({ + new InvalidDurableEventError({ type: event.type, message: `Replay sequence mismatch at index ${index}: expected ${seq}, got ${event.seq}`, }), @@ -540,22 +521,18 @@ export const layerWith = (options?: LayerOptions) => Stream.map((event) => event as Payload), ) - const streamAll = (): Stream.Stream => Stream.fromPubSub(all) + const streamAll = (): Stream.Stream => Stream.fromPubSub(pubsub.all) - const decodeSerializedEvent = (event: SerializedEvent): CursorEvent => { - const definition = syncRegistry.get(event.type) - if (!definition) { - throw new InvalidSyncEventError({ type: event.type, message: `Unknown sync event type ${event.type}` }) + const decodeSerializedEvent = (event: SerializedEvent): Payload => { + const definition = durableRegistry.get(event.type) + if (!definition?.durable) { + throw new InvalidDurableEventError({ type: event.type, message: `Unknown durable event type ${event.type}` }) } return { - cursor: Cursor.make(event.seq), - event: { - id: event.id, - type: definition.type, - version: definition.sync.version, - seq: event.seq, - data: definition.decode(event.data), - }, + id: event.id, + type: definition.type, + durable: { aggregateID: event.aggregateID, seq: event.seq, version: definition.durable.version }, + data: Schema.decodeUnknownSync(definition.data as Schema.Codec)(event.data), } } @@ -583,43 +560,43 @@ export const layerWith = (options?: LayerOptions) => ), ) - const subscribeSynchronized = (aggregateID: string) => + const subscribeDurable = (aggregateID: string) => Effect.gen(function* () { - const pubsub = yield* PubSub.sliding(1) - const subscription = yield* PubSub.subscribe(pubsub) + const wake = yield* PubSub.sliding(1) + const subscription = yield* PubSub.subscribe(wake) yield* Effect.acquireRelease( Effect.sync(() => { - const pubsubs = synchronized.get(aggregateID) ?? new Set() - pubsubs.add(pubsub) - synchronized.set(aggregateID, pubsubs) + const wakes = pubsub.durable.get(aggregateID) ?? new Set() + wakes.add(wake) + pubsub.durable.set(aggregateID, wakes) }), () => Effect.sync(() => { - const pubsubs = synchronized.get(aggregateID) - pubsubs?.delete(pubsub) - if (pubsubs?.size === 0) synchronized.delete(aggregateID) - }).pipe(Effect.andThen(PubSub.shutdown(pubsub))), + const wakes = pubsub.durable.get(aggregateID) + wakes?.delete(wake) + if (wakes?.size === 0) pubsub.durable.delete(aggregateID) + }).pipe(Effect.andThen(PubSub.shutdown(wake))), ) return subscription }) - const streamEvents = (input: { + const durable = (input: { readonly aggregateID: string - readonly after?: Cursor - }): Stream.Stream => + readonly after?: number + }): Stream.Stream => Stream.unwrap( Effect.gen(function* () { - const synchronized = yield* subscribeSynchronized(input.aggregateID) - let cursor = input.after ?? -1 - const read = Effect.suspend(() => readAfter(input.aggregateID, cursor)).pipe( + const wakes = yield* subscribeDurable(input.aggregateID) + let sequence = input.after ?? -1 + const read = Effect.suspend(() => readAfter(input.aggregateID, sequence)).pipe( Effect.tap((events) => Effect.sync(() => { - cursor = events.at(-1)?.cursor ?? cursor + sequence = events.at(-1)?.durable?.seq ?? sequence }), ), ) const historical = yield* read - const live = Stream.fromSubscription(synchronized).pipe( + const live = Stream.fromSubscription(wakes).pipe( Stream.mapEffect(() => read), Stream.flattenIterable, ) @@ -627,7 +604,7 @@ export const layerWith = (options?: LayerOptions) => }), ) - const listen = (listener: Listener): Effect.Effect => + const listen = (listener: Subscriber): Effect.Effect => Effect.sync(() => { listeners.push(listener) return Effect.sync(() => { @@ -636,21 +613,7 @@ export const layerWith = (options?: LayerOptions) => }) }) - const sync = (handler: Sync): Effect.Effect => - Effect.sync(() => { - syncHandlers.push(handler) - return Effect.sync(() => { - const index = syncHandlers.indexOf(handler) - if (index >= 0) syncHandlers.splice(index, 1) - }) - }) - - const beforeCommit = (guard: CommitGuard): Effect.Effect => - Effect.sync(() => { - commitGuards.push(guard) - }) - - const project = (definition: D, projector: Projector): Effect.Effect => + const project = (definition: D, projector: Subscriber): Effect.Effect => Effect.sync(() => { const list = projectors.get(definition.type) ?? [] list.push((event) => projector(event as Payload)) @@ -661,10 +624,8 @@ export const layerWith = (options?: LayerOptions) => publish, subscribe, all: streamAll, - aggregateEvents: streamEvents, - sync, + durable, listen, - beforeCommit, project, replay, replayAll, diff --git a/packages/core/src/public/session.ts b/packages/core/src/public/session.ts index 6c61aff3b..2610cec00 100644 --- a/packages/core/src/public/session.ts +++ b/packages/core/src/public/session.ts @@ -1,7 +1,6 @@ export * as Session from "./session" import { Effect, Schema, Stream } from "effect" -import { EventV2 } from "../event" import { ModelV2 } from "../model" import { SessionV2 } from "../session" import { MessageDecodeError } from "../session/error" @@ -34,9 +33,7 @@ export type Delivery = SessionInput.Delivery export const ListInput = SessionV2.ListInput export type ListInput = SessionV2.ListInput -export const EventCursor = EventV2.Cursor -export type EventCursor = EventV2.Cursor -export type Event = EventV2.CursorEvent +export type Event = SessionEvent.DurableEvent export const NotFoundError = SessionV2.NotFoundError export type NotFoundError = SessionV2.NotFoundError @@ -99,7 +96,7 @@ export interface MessageInput { export interface EventsInput { readonly sessionID: ID - readonly after?: EventCursor + readonly after?: number } export interface Interface { diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index d5163cf88..be314e1f9 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -124,8 +124,8 @@ export interface Interface { ) => Effect.Effect readonly events: (input: { sessionID: SessionSchema.ID - after?: EventV2.Cursor - }) => Stream.Stream, NotFoundError> + after?: number + }) => Stream.Stream readonly switchAgent: (input: { sessionID: SessionSchema.ID agent: string @@ -339,11 +339,9 @@ export const layer = Layer.effect( Stream.unwrap( result .get(input.sessionID) - .pipe(Effect.as(events.aggregateEvents({ aggregateID: input.sessionID, after: input.after }))), + .pipe(Effect.as(events.durable({ aggregateID: input.sessionID, after: input.after }))), ).pipe( - Stream.filter((event): event is EventV2.CursorEvent => - isDurableSessionEvent(event.event), - ), + Stream.filter((event): event is SessionEvent.DurableEvent => isDurableSessionEvent(event)), ), prompt: Effect.fn("V2Session.prompt")((input) => Effect.uninterruptible( @@ -413,9 +411,9 @@ export const layer = Layer.effect( sessionID, timestamp: yield* DateTime.now, }) - if (event.seq === undefined) + if (event.durable === undefined) return yield* Effect.die("Interrupt request event is missing aggregate sequence") - yield* execution.interrupt(sessionID, event.seq) + yield* execution.interrupt(sessionID, event.durable.seq) }), ), ), diff --git a/packages/core/src/session/event.ts b/packages/core/src/session/event.ts index 3472cc114..5eaf03716 100644 --- a/packages/core/src/session/event.ts +++ b/packages/core/src/session/event.ts @@ -27,13 +27,13 @@ const Base = { } const options = { - sync: { + durable: { aggregate: "sessionID", version: 1, }, } as const const stepSettlementOptions = { - sync: { + durable: { aggregate: "sessionID", version: 2, }, @@ -456,7 +456,7 @@ export namespace Compaction { export const Ended = EventV2.define({ type: "session.next.compaction.ended", - sync: { aggregate: "sessionID", version: 2 }, + durable: { aggregate: "sessionID", version: 2 }, schema: { ...Base, messageID: SessionMessageID.ID, diff --git a/packages/core/src/session/input.ts b/packages/core/src/session/input.ts index 041c62998..f8bc2b0e6 100644 --- a/packages/core/src/session/input.ts +++ b/packages/core/src/session/input.ts @@ -74,11 +74,11 @@ export const admit = Effect.fn("SessionInput.admit")(function* ( }) .pipe( Effect.flatMap((event) => - event.seq === undefined + event.durable === undefined ? Effect.die("Prompt admission event is missing aggregate sequence") : Effect.succeed( new Admitted({ - admittedSeq: event.seq, + admittedSeq: event.durable.seq, id: input.id, sessionID: input.sessionID, prompt: input.prompt, @@ -117,13 +117,6 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio readonly timeCreated: DateTime.Utc }, ) { - const message = yield* db - .select({ id: SessionMessageTable.id }) - .from(SessionMessageTable) - .where(eq(SessionMessageTable.id, input.id)) - .get() - .pipe(Effect.orDie) - if (message) return yield* Effect.die(new LifecycleConflict({ id: input.id })) const stored = yield* db .insert(SessionInputTable) .values({ @@ -208,37 +201,6 @@ const matchesPrompt = (input: Admitted, expected: { readonly sessionID: SessionS input.sessionID === expected.sessionID && JSON.stringify(encodePrompt(input.prompt)) === JSON.stringify(encodePrompt(expected.prompt)) -export const guardReservedID = Effect.fn("SessionInput.guardReservedID")(function* ( - db: DatabaseService, - event: EventV2.Payload, -) { - if ( - Schema.is(SessionEvent.PromptLifecycle.Admitted)(event) || - Schema.is(SessionEvent.PromptLifecycle.Promoted)(event) - ) - return - const id = reservedID(event) - if (id === undefined) return - const admitted = yield* db - .select({ id: SessionInputTable.id }) - .from(SessionInputTable) - .where(eq(SessionInputTable.id, id)) - .get() - .pipe(Effect.orDie) - if (admitted === undefined) return - return yield* Effect.die(new LifecycleConflict({ id })) -}) - -const reservedID = (event: EventV2.Payload) => { - if (Schema.is(SessionEvent.Step.Started)(event)) return event.data.assistantMessageID - if (Schema.is(SessionEvent.AgentSwitched)(event)) return event.data.messageID - if (Schema.is(SessionEvent.ModelSwitched)(event)) return event.data.messageID - if (Schema.is(SessionEvent.Prompted)(event)) return event.data.messageID - if (Schema.is(SessionEvent.Synthetic)(event)) return event.data.messageID - if (Schema.is(SessionEvent.Shell.Started)(event)) return event.data.messageID - if (Schema.is(SessionEvent.Compaction.Started)(event)) return event.data.messageID -} - export const projectLegacyPrompted = Effect.fn("SessionInput.projectLegacyPrompted")(function* ( db: DatabaseService, input: { diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index caf63de78..b8945494f 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -115,7 +115,7 @@ function run(db: DatabaseService, event: SessionEvent.Event) { const decodeRow = (row: typeof SessionMessageTable.$inferSelect) => decodeMessage({ ...row.data, id: row.id, type: row.type }) const updateMessage = (message: SessionMessage.Message) => { - if (event.seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence") + if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence") const encoded = encodeMessage(message) const { id, type, ...data } = encoded return db @@ -192,7 +192,7 @@ function run(db: DatabaseService, event: SessionEvent.Event) { } function insertMessage(db: DatabaseService, event: SessionEvent.Event, message: SessionMessage.Message) { - if (event.seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence") + if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence") const encoded = encodeMessage(message) const { id, type, ...data } = encoded return db @@ -201,7 +201,7 @@ function insertMessage(db: DatabaseService, event: SessionEvent.Event, message: id: SessionMessage.ID.make(id), session_id: event.data.sessionID, type, - seq: event.seq, + seq: event.durable.seq, time_created: DateTime.toEpochMillis(message.time.created), data, }) @@ -213,7 +213,6 @@ export const layer = Layer.effectDiscard( Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service - yield* events.beforeCommit((event) => SessionInput.guardReservedID(db, event)) yield* events.project(SessionV1.Event.Created, (event) => Effect.gen(function* () { const stored = yield* db @@ -331,7 +330,7 @@ export const layer = Layer.effectDiscard( }), ) yield* events.project(SessionEvent.AgentSwitched, (event) => { - if (event.seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence") + if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence") return db .update(SessionTable) .set({ agent: event.data.agent, time_updated: DateTime.toEpochMillis(event.data.timestamp) }) @@ -340,7 +339,7 @@ export const layer = Layer.effectDiscard( .pipe( Effect.orDie, Effect.andThen(run(db, event)), - Effect.andThen(SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.seq)), + Effect.andThen(SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.durable.seq)), ) }) yield* events.project(SessionEvent.ModelSwitched, (event) => @@ -352,9 +351,9 @@ export const layer = Layer.effectDiscard( .run() .pipe(Effect.orDie) yield* run(db, event) - if (event.seq === undefined) - return yield* Effect.die("Synchronized Session event is missing aggregate sequence") - yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.seq) + if (event.durable === undefined) + return yield* Effect.die("Durable Session event is missing aggregate sequence") + yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.durable.seq) }), ) yield* events.project(SessionEvent.Prompted, (event) => @@ -368,24 +367,24 @@ export const layer = Layer.effectDiscard( .pipe(Effect.orDie) if (existing) return yield* Effect.die(new PromptAlreadyProjected()) yield* run(db, event) - if (event.seq === undefined) - return yield* Effect.die("Synchronized Session event is missing aggregate sequence") + if (event.durable === undefined) + return yield* Effect.die("Durable Session event is missing aggregate sequence") yield* SessionInput.projectLegacyPrompted(db, { id: messageID, sessionID: event.data.sessionID, prompt: event.data.prompt, delivery: event.data.delivery, timeCreated: event.data.timestamp, - promotedSeq: event.seq, + promotedSeq: event.durable.seq, }) }), ) yield* events.project(SessionEvent.PromptLifecycle.Admitted, (event) => Effect.gen(function* () { - if (event.seq === undefined) - return yield* Effect.die("Synchronized Session event is missing aggregate sequence") + if (event.durable === undefined) + return yield* Effect.die("Durable Session event is missing aggregate sequence") yield* SessionInput.projectAdmitted(db, { - admittedSeq: event.seq, + admittedSeq: event.durable.seq, id: event.data.messageID, sessionID: event.data.sessionID, prompt: event.data.prompt, @@ -396,8 +395,8 @@ export const layer = Layer.effectDiscard( ) yield* events.project(SessionEvent.PromptLifecycle.Promoted, (event) => Effect.gen(function* () { - if (event.seq === undefined) - return yield* Effect.die("Synchronized Session event is missing aggregate sequence") + if (event.durable === undefined) + return yield* Effect.die("Durable Session event is missing aggregate sequence") yield* insertMessage( db, event, @@ -406,18 +405,14 @@ export const layer = Layer.effectDiscard( sessionID: event.data.sessionID, prompt: event.data.prompt, timeCreated: event.data.timeCreated, - promotedSeq: event.seq, + promotedSeq: event.durable.seq, }), ) }), ) yield* events.project(SessionEvent.InterruptRequested, () => Effect.void) - yield* events.project(SessionEvent.ContextUpdated, (event) => { - if (!event.replay || event.seq === undefined) return run(db, event) - return run(db, event).pipe( - Effect.andThen(SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.seq)), - ) - }) + // TODO: Reconstruct context epoch replacement state during replay without adding replay state to every EventV2 payload. + yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event)) yield* events.project(SessionEvent.Synthetic, (event) => run(db, event)) yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event)) yield* events.project(SessionEvent.Shell.Ended, (event) => run(db, event)) @@ -436,9 +431,9 @@ export const layer = Layer.effectDiscard( yield* events.project(SessionEvent.Reasoning.Ended, (event) => run(db, event)) // yield* events.project(SessionEvent.Retried, (event) => run(db, event)) yield* events.project(SessionEvent.Compaction.Ended, (event) => { - if (event.version === 1) return Effect.void - const seq = event.seq - if (seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence") + if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence") + if (event.durable.version === 1) return Effect.void + const seq = event.durable.seq return Effect.gen(function* () { yield* run(db, event) yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, seq) diff --git a/packages/core/src/v1/session.ts b/packages/core/src/v1/session.ts index 34bb72968..181ba9807 100644 --- a/packages/core/src/v1/session.ts +++ b/packages/core/src/v1/session.ts @@ -502,7 +502,7 @@ export type WithParts = { } const options = { - sync: { + durable: { aggregate: "sessionID", version: 1, }, diff --git a/packages/core/test/event.test.ts b/packages/core/test/event.test.ts index cd8ba6925..c28d902ac 100644 --- a/packages/core/test/event.test.ts +++ b/packages/core/test/event.test.ts @@ -30,7 +30,7 @@ const Message = EventV2.define({ const SyncMessage = EventV2.define({ type: "test.sync", - sync: { + durable: { version: 1, aggregate: "id", }, @@ -42,7 +42,7 @@ const SyncMessage = EventV2.define({ const SyncSent = EventV2.define({ type: "test.sent", - sync: { + durable: { version: 1, aggregate: "messageID", }, @@ -61,7 +61,7 @@ const GlobalMessage = EventV2.define({ const VersionedMessage = EventV2.define({ type: "test.versioned", - sync: { + durable: { version: 2, aggregate: "id", }, @@ -73,7 +73,7 @@ const VersionedMessage = EventV2.define({ const SyncTimestamp = EventV2.define({ type: "test.timestamp", - sync: { + durable: { version: 1, aggregate: "id", }, @@ -132,7 +132,7 @@ describe("EventV2", () => { const event = yield* events.publish(VersionedMessage, { id: "one", text: "hello" }) expect(event.type).toBe("test.versioned") - expect(event.version).toBe(2) + expect(event.durable?.version).toBe(2) }), ) @@ -146,12 +146,12 @@ describe("EventV2", () => { Effect.sync(() => { const latest = EventV2.define({ type: "test.out-of-order", - sync: { version: 2, aggregate: "id" }, + durable: { version: 2, aggregate: "id" }, schema: { id: Schema.String }, }) EventV2.define({ type: "test.out-of-order", - sync: { version: 1, aggregate: "id" }, + durable: { version: 1, aggregate: "id" }, schema: { id: Schema.String }, }) @@ -190,7 +190,7 @@ describe("EventV2", () => { }), ) - it.effect("commits local operational state inside a new synchronized event transaction", () => + it.effect("commits local operational state inside a new durable event transaction", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() @@ -207,7 +207,7 @@ describe("EventV2", () => { }), ) - it.effect("rolls back the synchronized event and projector when the local commit fails", () => + it.effect("rolls back the durable event and projector when the local commit fails", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service @@ -236,7 +236,7 @@ describe("EventV2", () => { const events = yield* EventV2.Service const exit = yield* events.publish(Message, { text: "hello" }, { commit: () => Effect.void }).pipe(Effect.exit) - expect(String(exit)).toContain("Local commit hooks require a synchronized event") + expect(String(exit)).toContain("Local commit hooks require a durable event") }), ) @@ -290,7 +290,6 @@ describe("EventV2", () => { Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() - yield* events.sync(() => Effect.die("sync defect")) yield* events.listen(() => { throw new Error("listener defect") }) @@ -303,7 +302,7 @@ describe("EventV2", () => { const event = yield* events.publish(SyncMessage, { id: "one", text: "hello" }) expect(received).toEqual([SyncMessage.type]) - expect(event.seq).toBeNumber() + expect(event.durable?.seq).toBeNumber() }), ) @@ -336,49 +335,7 @@ describe("EventV2", () => { }), ) - it.effect("does not synchronize live-only events", () => - Effect.gen(function* () { - const events = yield* EventV2.Service - const synchronized = new Array() - const unsubscribe = yield* events.sync((event) => - Effect.sync(() => { - synchronized.push(event.type) - }), - ) - yield* Effect.addFinalizer(() => unsubscribe) - - yield* events.publish(Message, { text: "live only" }) - yield* events.publish(SyncMessage, { id: "one", text: "durable" }) - - expect(synchronized).toEqual([SyncMessage.type]) - }), - ) - - it.effect("synchronizes only after the durable event commits", () => - Effect.gen(function* () { - const events = yield* EventV2.Service - const { db } = yield* Database.Service - const synchronized = new Array() - yield* events.sync((event) => - db - .select({ id: EventTable.id }) - .from(EventTable) - .where(eq(EventTable.id, event.id)) - .get() - .pipe( - Effect.orDie, - Effect.map((row) => synchronized.push(row !== undefined)), - Effect.asVoid, - ), - ) - - yield* events.publish(SyncMessage, { id: EventV2.ID.create(), text: "durable" }) - - expect(synchronized).toEqual([true]) - }), - ) - - it.effect("inserts sync event rows on publish", () => + it.effect("inserts durable event rows on publish", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service @@ -398,7 +355,7 @@ describe("EventV2", () => { }), ) - it.effect("increments sync event seq per aggregate", () => + it.effect("increments durable event seq per aggregate", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service @@ -417,22 +374,22 @@ describe("EventV2", () => { }), ) - it.effect("replays durable aggregate events after a cursor and tails new events", () => + it.effect("replays durable aggregate events after a sequence and tails new events", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" }) yield* events.publish(SyncMessage, { id: aggregateID, text: "one" }) const fiber = yield* events - .aggregateEvents({ aggregateID, after: EventV2.Cursor.make(0) }) + .durable({ aggregateID, after: 0 }) .pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow yield* events.publish(SyncMessage, { id: aggregateID, text: "two" }) - expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual([ - [EventV2.Cursor.make(1), { id: aggregateID, text: "one" }], - [EventV2.Cursor.make(2), { id: aggregateID, text: "two" }], + expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual([ + [1, { id: aggregateID, text: "one" }], + [2, { id: aggregateID, text: "two" }], ]) }), ) @@ -443,19 +400,19 @@ describe("EventV2", () => { const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" }) const fiber = yield* events - .aggregateEvents({ aggregateID }) + .durable({ aggregateID }) .pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped) yield* events.publish(SyncMessage, { id: aggregateID, text: "one" }) expect( Array.from(yield* Fiber.join(fiber)).map((event) => [ - event.cursor, - (event.event.data as { text: string }).text, + event.durable?.seq, + (event.data as { text: string }).text, ]), ).toEqual([ - [EventV2.Cursor.make(0), "zero"], - [EventV2.Cursor.make(1), "one"], + [0, "zero"], + [1, "one"], ]) }), ) @@ -477,7 +434,7 @@ describe("EventV2", () => { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const fiber = yield* events - .aggregateEvents({ aggregateID }) + .durable({ aggregateID }) .pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) yield* Deferred.await(readStarted) @@ -485,8 +442,8 @@ describe("EventV2", () => { yield* events.publish(SyncMessage, { id: aggregateID, text: "during handoff" }) yield* Deferred.succeed(continueRead, undefined) - expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual([ - [EventV2.Cursor.make(0), { id: aggregateID, text: "during handoff" }], + expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual([ + [0, { id: aggregateID, text: "during handoff" }], ]) }).pipe(Effect.provide(Layer.mergeAll(database, eventLayer))) }), @@ -498,7 +455,7 @@ describe("EventV2", () => { const aggregateID = EventV2.ID.create() const count = 64 const fiber = yield* events - .aggregateEvents({ aggregateID }) + .durable({ aggregateID }) .pipe(Stream.take(count), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow @@ -506,9 +463,9 @@ describe("EventV2", () => { yield* events.publish(SyncMessage, { id: aggregateID, text: String(index) }) } - expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual( + expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual( Array.from({ length: count }, (_, index) => [ - EventV2.Cursor.make(index), + index, { id: aggregateID, text: String(index) }, ]), ) @@ -520,14 +477,14 @@ describe("EventV2", () => { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const fiber = yield* events - .aggregateEvents({ aggregateID }) + .durable({ aggregateID }) .pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow yield* events.publish(Message, { text: "live only" }) yield* events.publish(SyncMessage, { id: aggregateID, text: "durable" }) - expect(Array.from(yield* Fiber.join(fiber)).map((event) => event.event.type)).toEqual([SyncMessage.type]) + expect(Array.from(yield* Fiber.join(fiber)).map((event) => event.type)).toEqual([SyncMessage.type]) }), ) @@ -550,7 +507,7 @@ describe("EventV2", () => { }), ) - it.effect("replays sync events through projectors", () => + it.effect("replays durable events through projectors", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() @@ -706,7 +663,7 @@ describe("EventV2", () => { }) .pipe(Effect.exit) - expect(String(exit)).toContain("Unknown sync event type") + expect(String(exit)).toContain("Unknown durable event type") }), ) @@ -843,7 +800,7 @@ describe("EventV2", () => { const replayed = { id: published.id, type: EventV2.versionedType(SyncMessage.type, 1), - seq: published.seq!, + seq: published.durable!.seq, aggregateID, data: published.data, } @@ -988,7 +945,7 @@ describe("EventV2", () => { yield* events.replay(replayed, { publish: true }) yield* events.replay(replayed, { publish: true }) - expect(received).toMatchObject([{ id: replayed.id, seq: 0, data: replayed.data }]) + expect(received).toMatchObject([{ id: replayed.id, durable: { seq: 0, version: 1 }, data: replayed.data }]) }), ) @@ -1110,7 +1067,7 @@ describe("EventV2", () => { }), ) - it.effect("remove clears sync event sequence", () => + it.effect("remove clears durable event sequence", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() diff --git a/packages/core/test/session-create.test.ts b/packages/core/test/session-create.test.ts index 3551ec52f..471e86ff9 100644 --- a/packages/core/test/session-create.test.ts +++ b/packages/core/test/session-create.test.ts @@ -220,8 +220,8 @@ describe("SessionV2.create", () => { expect( Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(2), Stream.runCollect)), ).toMatchObject([ - { cursor: 1, event: { type: "session.next.prompt.admitted", data: { prompt: { text: "Hello" } } } }, - { cursor: 2, event: { type: "session.next.prompt.promoted" } }, + { durable: { seq: 1 }, type: "session.next.prompt.admitted", data: { prompt: { text: "Hello" } } }, + { durable: { seq: 2 }, type: "session.next.prompt.promoted" }, ]) }), ) @@ -355,7 +355,7 @@ describe("SessionV2.create", () => { expect(yield* session.get(created.id)).toMatchObject({ model }) expect( Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(1), Stream.runCollect)), - ).toMatchObject([{ event: { type: "session.next.model.switched", data: { model } } }]) + ).toMatchObject([{ type: "session.next.model.switched", data: { model } }]) }), ) diff --git a/packages/core/test/session-projector.test.ts b/packages/core/test/session-projector.test.ts index f84f60f30..df9ac731b 100644 --- a/packages/core/test/session-projector.test.ts +++ b/packages/core/test/session-projector.test.ts @@ -162,7 +162,7 @@ describe("SessionProjector", () => { expect( yield* db.select().from(SessionInputTable).where(eq(SessionInputTable.id, id)).get().pipe(Effect.orDie), - ).toMatchObject({ promoted_seq: event.seq }) + ).toMatchObject({ promoted_seq: event.durable?.seq }) }), ) @@ -334,134 +334,6 @@ describe("SessionProjector", () => { }), ) - it.effect("rejects a Prompted event that conflicts with an admitted inbox row", () => - Effect.gen(function* () { - const { db } = yield* Database.Service - yield* db - .insert(ProjectTable) - .values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] }) - .run() - .pipe(Effect.orDie) - yield* db - .insert(SessionTable) - .values({ - id: sessionID, - project_id: Project.ID.global, - slug: "test", - directory: "/project", - title: "test", - version: "test", - }) - .run() - .pipe(Effect.orDie) - const events = yield* EventV2.Service - const id = SessionMessage.ID.make("msg_conflict") - yield* SessionInput.admit(db, events, { - id, - sessionID, - prompt: new Prompt({ text: "admitted" }), - delivery: "steer", - }) - - const exit = yield* events - .publish(SessionEvent.Prompted, { - sessionID, - messageID: id, - timestamp: created, - prompt: new Prompt({ text: "different" }), - delivery: "steer", - }) - .pipe(Effect.exit) - - expect(String(exit)).toContain("SessionInput.LifecycleConflict") - expect( - yield* db.select().from(SessionInputTable).where(eq(SessionInputTable.id, id)).get().pipe(Effect.orDie), - ).toMatchObject({ promoted_seq: null }) - }), - ) - - it.effect("rejects an assistant message ID that conflicts with an admitted inbox row", () => - Effect.gen(function* () { - const { db } = yield* Database.Service - yield* db - .insert(ProjectTable) - .values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] }) - .run() - .pipe(Effect.orDie) - yield* db - .insert(SessionTable) - .values({ - id: sessionID, - project_id: Project.ID.global, - slug: "test", - directory: "/project", - title: "test", - version: "test", - }) - .run() - .pipe(Effect.orDie) - const events = yield* EventV2.Service - const id = SessionMessage.ID.make("msg_conflict") - yield* SessionInput.admit(db, events, { - id, - sessionID, - prompt: new Prompt({ text: "admitted" }), - delivery: "steer", - }) - - const exit = yield* events - .publish(SessionEvent.Step.Started, { - sessionID, - timestamp: created, - assistantMessageID: id, - agent: "build", - model, - }) - .pipe(Effect.exit) - - expect(String(exit)).toContain("SessionInput.LifecycleConflict") - expect( - yield* db.select().from(SessionMessageTable).where(eq(SessionMessageTable.id, id)).get().pipe(Effect.orDie), - ).toBeUndefined() - }), - ) - - it.effect("rejects a Prompted delivery mode that conflicts with an admitted inbox row", () => - Effect.gen(function* () { - const { db } = yield* Database.Service - yield* db - .insert(ProjectTable) - .values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] }) - .run() - .pipe(Effect.orDie) - yield* db - .insert(SessionTable) - .values({ - id: sessionID, - project_id: Project.ID.global, - slug: "test", - directory: "/project", - title: "test", - version: "test", - }) - .run() - .pipe(Effect.orDie) - const events = yield* EventV2.Service - const id = SessionMessage.ID.make("msg_delivery_conflict") - const prompt = new Prompt({ text: "admitted" }) - yield* SessionInput.admit(db, events, { id, sessionID, prompt, delivery: "queue" }) - - const exit = yield* events - .publish(SessionEvent.Prompted, { sessionID, messageID: id, timestamp: created, prompt, delivery: "steer" }) - .pipe(Effect.exit) - - expect(String(exit)).toContain("SessionInput.LifecycleConflict") - expect( - yield* db.select().from(SessionInputTable).where(eq(SessionInputTable.id, id)).get().pipe(Effect.orDie), - ).toMatchObject({ delivery: "queue", promoted_seq: null }) - }), - ) - it.effect("does not revive a stale incomplete in-memory assistant projection", () => Effect.gen(function* () { const stale = new SessionMessage.Assistant({ diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index d663cb715..5b6cc1ce1 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -177,7 +177,7 @@ describe("SessionV2.prompt", () => { }), ) - it.effect("streams durable Session events after an aggregate cursor", () => + it.effect("streams durable Session events after an aggregate sequence", () => Effect.gen(function* () { yield* setup const session = yield* SessionV2.Service @@ -191,17 +191,17 @@ describe("SessionV2.prompt", () => { yield* SessionInput.promoteSteers(db, events, sessionID, Number.MAX_SAFE_INTEGER) const streamed = Array.from(yield* Fiber.join(fiber)) - expect(streamed.map((event) => [event.cursor, event.event.type])).toEqual([ - [EventV2.Cursor.make(0), "session.next.prompt.admitted"], - [EventV2.Cursor.make(1), "session.next.prompt.admitted"], - [EventV2.Cursor.make(2), "session.next.prompt.promoted"], - [EventV2.Cursor.make(3), "session.next.prompt.promoted"], + expect(streamed.map((event) => [event.durable?.seq, event.type])).toEqual([ + [0, "session.next.prompt.admitted"], + [1, "session.next.prompt.admitted"], + [2, "session.next.prompt.promoted"], + [3, "session.next.prompt.promoted"], ]) expect( Array.from( - yield* session.events({ sessionID, after: streamed[0]!.cursor }).pipe(Stream.take(1), Stream.runCollect), - ).map((event) => [event.cursor, event.event.type]), - ).toEqual([[EventV2.Cursor.make(1), "session.next.prompt.admitted"]]) + yield* session.events({ sessionID, after: streamed[0]!.durable?.seq }).pipe(Stream.take(1), Stream.runCollect), + ).map((event) => [event.durable?.seq, event.type]), + ).toEqual([[1, "session.next.prompt.admitted"]]) }), ) @@ -472,58 +472,6 @@ describe("SessionV2.prompt", () => { }), ) - it.effect("rejects an input ID already used by a durable non-prompt event", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - yield* events.publish(SessionEvent.Synthetic, { - sessionID, - messageID, - timestamp: yield* DateTime.now, - text: "Collision", - }) - - const failure = yield* session - .prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "Collision" }), resume: false }) - .pipe(Effect.flip) - - expect(failure._tag).toBe("Session.PromptConflictError") - expect(yield* admitted(messageID)).toBeUndefined() - }), - ) - - it.effect("rejects a durable event ID reserved by an admitted prompt without poisoning promotion", () => - Effect.gen(function* () { - yield* setup - const { db } = yield* Database.Service - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - const prompt = new Prompt({ text: "Reserved prompt" }) - yield* session.prompt({ id: messageID, sessionID, prompt, resume: false }) - - const failure = yield* events - .publish(SessionEvent.Synthetic, { - sessionID, - messageID, - timestamp: yield* DateTime.now, - text: "Conflicting synthetic", - }) - .pipe(Effect.catchDefect(Effect.succeed)) - - expect(String(failure)).toContain("SessionInput.LifecycleConflict") - expect(yield* admitted(messageID)).not.toHaveProperty("promotedSeq") - expect(yield* session.messages({ sessionID })).toEqual([]) - - yield* SessionInput.promoteSteers(db, events, sessionID, Number.MAX_SAFE_INTEGER) - - expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 1 }) - expect(yield* session.messages({ sessionID })).toMatchObject([ - { id: messageID, type: "user", text: "Reserved prompt" }, - ]) - }), - ) - it.effect("rejects reuse of one globally unique message ID across sessions", () => Effect.gen(function* () { yield* setup diff --git a/packages/core/test/session-runner-tool-events.test.ts b/packages/core/test/session-runner-tool-events.test.ts index 3d4a858cb..f2d18cfa2 100644 --- a/packages/core/test/session-runner-tool-events.test.ts +++ b/packages/core/test/session-runner-tool-events.test.ts @@ -19,17 +19,17 @@ const capture = () => { Effect.sync(() => { const event = { id: EventV2.ID.create(), type: definition.type, data } as EventV2.Payload published.push({ - type: definition.sync ? EventV2.versionedType(definition.type, definition.sync.version) : definition.type, + type: definition.durable + ? EventV2.versionedType(definition.type, definition.durable.version) + : definition.type, data, }) return event }), subscribe: () => Stream.empty, all: () => Stream.empty, - aggregateEvents: () => Stream.empty, - sync: () => Effect.succeed(Effect.void), + durable: () => Stream.empty, listen: () => Effect.succeed(Effect.void), - beforeCommit: () => Effect.void, project: () => Effect.void, replay: () => Effect.void, replayAll: () => Effect.succeed(undefined), diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 0cc275085..393f5526a 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -1355,34 +1355,6 @@ describe("SessionRunnerLLM", () => { }), ) - it.effect("replays retained context projections while replacement is pending", () => - Effect.gen(function* () { - yield* setup - const session = yield* SessionV2.Service - const events = yield* EventV2.Service - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) - - requests.length = 0 - response = [] - yield* session.resume(sessionID) - systemBaseline = "Changed context" - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false }) - yield* session.resume(sessionID) - yield* events.publish(SessionEvent.ModelSwitched, { - sessionID, - messageID: SessionMessage.ID.create(), - timestamp: DateTime.makeUnsafe(1), - model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") }, - }) - - yield* replaySessionProjection(sessionID) - systemBaseline = "Replacement context" - yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false }) - yield* session.resume(sessionID) - expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Replacement context"]) - }), - ) - it.effect("replaces the baseline lazily after completed compaction without reopening replacement on replay", () => Effect.gen(function* () { yield* setup diff --git a/packages/opencode/src/event-v2-bridge.ts b/packages/opencode/src/event-v2-bridge.ts index 14a8053f9..4cef7311d 100644 --- a/packages/opencode/src/event-v2-bridge.ts +++ b/packages/opencode/src/event-v2-bridge.ts @@ -45,9 +45,9 @@ export const layer = Layer.effect( workspace: workspaceID, payload: { id: event.id, type: event.type, properties: event.data }, }) - const sync = EventV2.registry.get(event.type)?.sync - if (sync === undefined || event.seq === undefined || event.version === undefined) return - const aggregateID = (event.data as Record)[sync.aggregate] + const durable = EventV2.registry.get(event.type)?.durable + if (durable === undefined || event.durable === undefined) return + const aggregateID = (event.data as Record)[durable.aggregate] if (typeof aggregateID !== "string") return GlobalBus.emit("event", { directory: event.location?.directory ?? ctx?.directory, @@ -57,8 +57,8 @@ export const layer = Layer.effect( type: "sync", syncEvent: { id: event.id, - type: EventV2.versionedType(event.type, event.version), - seq: event.seq, + type: EventV2.versionedType(event.type, event.durable.version), + seq: event.durable.seq, aggregateID, data: event.data, }, diff --git a/packages/opencode/src/server/routes/instance/httpapi/groups/global.ts b/packages/opencode/src/server/routes/instance/httpapi/groups/global.ts index 87556a1ad..3e1315486 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/groups/global.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/groups/global.ts @@ -16,13 +16,13 @@ const GlobalHealth = Schema.Struct({ const SyncEventSchemas = EventV2.registry .values() .flatMap((definition) => { - if (!definition.sync) return [] + if (!definition.durable) return [] return [ Schema.Struct({ type: Schema.Literal("sync"), id: EventV2.ID, syncEvent: Schema.Struct({ - type: Schema.Literal(EventV2.versionedType(definition.type, definition.sync.version)), + type: Schema.Literal(EventV2.versionedType(definition.type, definition.durable.version)), id: EventV2.ID, seq: Schema.Finite, aggregateID: Schema.String,