refactor(core): simplify event model (#33238)

This commit is contained in:
Dax 2026-06-21 16:34:57 +02:00 committed by GitHub
parent ca006a2d20
commit fb43c15f88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 231 additions and 569 deletions

View File

@ -5,7 +5,7 @@ import { and, asc, eq, gt } from "drizzle-orm"
import { Database } from "./database/database"
import { EventSequenceTable, EventTable } from "./event/sql"
import { Location } from "./location"
import { externalID, type ExternalID, NonNegativeInt, withStatics } from "./schema"
import { externalID, type ExternalID, withStatics } from "./schema"
import { Identifier } from "./util/identifier"
import { LayerNode } from "./effect/layer-node"
import { isDeepStrictEqual } from "node:util"
@ -19,16 +19,9 @@ export const ID = Schema.String.check(Schema.isStartsWith("evt_")).pipe(
)
export type ID = typeof ID.Type
/**
* Durable aggregate continuation position for embedded replay streams.
* TODO: Decide whether a future HTTP / SDK surface should expose an opaque cursor instead.
*/
export const Cursor = NonNegativeInt.pipe(Schema.brand("EventV2.Cursor"))
export type Cursor = typeof Cursor.Type
export type Definition<Type extends string = string, DataSchema extends Schema.Top = Schema.Top> = {
readonly type: Type
readonly sync?: {
readonly durable?: {
readonly version: number
readonly aggregate: string
}
@ -41,20 +34,16 @@ export type Payload<D extends Definition = Definition> = {
readonly id: ID
readonly type: D["type"]
readonly data: Data<D>
/** Durable aggregate order, populated while synchronized events are projected. */
readonly seq?: number
readonly version?: number
readonly durable?: {
readonly aggregateID: string
readonly seq: number
readonly version: number
}
readonly location?: Location.Ref
readonly metadata?: Record<string, unknown>
/** Internal replay marker for projectors that own non-replicated operational state. */
readonly replay?: boolean
}
export type Projector<D extends Definition = Definition> = (event: Payload<D>) => Effect.Effect<void>
type AnyProjector = (event: Payload) => Effect.Effect<void>
export type CommitGuard = (event: Payload) => Effect.Effect<void>
export type Listener = (event: Payload) => Effect.Effect<void>
export type Sync = (event: Payload) => Effect.Effect<void>
export type Subscriber<D extends Definition = Definition> = (event: Payload<D>) => Effect.Effect<void>
export type Unsubscribe = Effect.Effect<void>
export type SerializedEvent = {
@ -65,13 +54,8 @@ export type SerializedEvent = {
readonly data: Record<string, unknown>
}
export type CursorEvent<E extends Payload = Payload> = {
readonly cursor: Cursor
readonly event: E
}
export class InvalidSyncEventError extends Schema.TaggedErrorClass<InvalidSyncEventError>()(
"EventV2.InvalidSyncEvent",
export class InvalidDurableEventError extends Schema.TaggedErrorClass<InvalidDurableEventError>()(
"EventV2.InvalidDurableEvent",
{
type: Schema.String,
message: Schema.String,
@ -83,19 +67,11 @@ export function versionedType(type: string, version: number) {
}
export const registry = new Map<string, Definition>()
type SyncDefinition = Definition & {
readonly sync: NonNullable<Definition["sync"]>
readonly encode: (data: unknown) => unknown
readonly decode: (data: unknown) => unknown
}
const syncRegistry = new Map<string, SyncDefinition>()
// Synchronized events cross a JSON boundary, so their data schemas must encode and decode without services.
const syncCodec = (definition: Definition) => definition.data as Schema.Codec<unknown, unknown, never, never>
const durableRegistry = new Map<string, Definition>()
export function define<const Type extends string, Fields extends Schema.Struct.Fields>(input: {
readonly type: Type
readonly sync?: {
readonly durable?: {
readonly version: number
readonly aggregate: string
}
@ -106,28 +82,26 @@ export function define<const Type extends string, Fields extends Schema.Struct.F
id: ID,
metadata: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)),
type: Schema.Literal(input.type),
version: Schema.optional(Schema.Number),
durable: Schema.optional(Schema.Struct({ aggregateID: Schema.String, seq: Schema.Number, version: Schema.Number })),
location: Schema.optional(Location.Ref),
data: Data,
}).annotate({ identifier: input.type })
const definition = Object.assign(Payload, {
type: input.type,
...(input.sync === undefined ? {} : { sync: input.sync }),
...(input.durable === undefined ? {} : { durable: input.durable }),
data: Data,
})
const existing = registry.get(input.type)
if (input.sync === undefined || existing?.sync === undefined || input.sync.version >= existing.sync.version) {
if (
input.durable === undefined ||
existing?.durable === undefined ||
input.durable.version >= existing.durable.version
) {
registry.set(input.type, definition)
}
if (input.sync)
syncRegistry.set(
versionedType(input.type, input.sync.version),
Object.assign(definition, {
encode: Schema.encodeUnknownSync(syncCodec(definition)),
decode: Schema.decodeUnknownSync(syncCodec(definition)),
}) as SyncDefinition,
)
if (input.durable)
durableRegistry.set(versionedType(input.type, input.durable.version), definition)
return definition as Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> &
Definition<Type, Schema.Struct<Fields>>
}
@ -140,7 +114,7 @@ export interface PublishOptions {
readonly id?: ID
readonly metadata?: Record<string, unknown>
readonly location?: Location.Ref
/** Local operational projection committed atomically with a new synchronized event. Not replayed or serialized. */
/** Local operational projection committed atomically with a new durable event. Not replayed or serialized. */
readonly commit?: (seq: number) => Effect.Effect<void>
}
@ -152,14 +126,13 @@ export interface Interface {
) => Effect.Effect<Payload<D>>
readonly subscribe: <D extends Definition>(definition: D) => Stream.Stream<Payload<D>>
readonly all: () => Stream.Stream<Payload>
readonly aggregateEvents: (input: {
readonly durable: (input: {
readonly aggregateID: string
readonly after?: Cursor
}) => Stream.Stream<CursorEvent>
readonly sync: (handler: Sync) => Effect.Effect<Unsubscribe>
readonly listen: (listener: Listener) => Effect.Effect<Unsubscribe>
readonly beforeCommit: (guard: CommitGuard) => Effect.Effect<void>
readonly project: <D extends Definition>(definition: D, projector: Projector<D>) => Effect.Effect<void>
readonly after?: number
}) => Stream.Stream<Payload>
/** @deprecated Use `all()` and consume the returned stream. */
readonly listen: (listener: Subscriber) => Effect.Effect<Unsubscribe>
readonly project: <D extends Definition>(definition: D, projector: Subscriber<D>) => Effect.Effect<void>
readonly replay: (
event: SerializedEvent,
options?: { readonly publish?: boolean; readonly ownerID?: string; readonly strictOwner?: boolean },
@ -182,37 +155,37 @@ export const layerWith = (options?: LayerOptions) =>
Layer.effect(
Service,
Effect.gen(function* () {
const all = yield* PubSub.unbounded<Payload>()
const synchronized = new Map<string, Set<PubSub.PubSub<void>>>()
const typed = new Map<string, PubSub.PubSub<Payload>>()
const projectors = new Map<string, AnyProjector[]>()
const commitGuards = new Array<CommitGuard>()
const listeners = new Array<Listener>()
const syncHandlers = new Array<Sync>()
const pubsub = {
all: yield* PubSub.unbounded<Payload>(),
durable: new Map<string, Set<PubSub.PubSub<void>>>(),
typed: new Map<string, PubSub.PubSub<Payload>>(),
}
const projectors = new Map<string, Subscriber[]>()
const listeners = new Array<Subscriber>()
const { db } = yield* Database.Service
const getOrCreate = (definition: Definition) =>
Effect.gen(function* () {
const existing = typed.get(definition.type)
const existing = pubsub.typed.get(definition.type)
if (existing) return existing
const pubsub = yield* PubSub.unbounded<Payload>()
typed.set(definition.type, pubsub)
return pubsub
const created = yield* PubSub.unbounded<Payload>()
pubsub.typed.set(definition.type, created)
return created
})
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
yield* PubSub.shutdown(all)
yield* PubSub.shutdown(pubsub.all)
yield* Effect.forEach(
synchronized.values(),
pubsub.durable.values(),
(pubsubs) => Effect.forEach(pubsubs, PubSub.shutdown, { discard: true }),
{ discard: true },
)
yield* Effect.forEach(typed.values(), PubSub.shutdown, { discard: true })
yield* Effect.forEach(pubsub.typed.values(), PubSub.shutdown, { discard: true })
}),
)
function commitSyncEvent(
function commitDurableEvent(
event: Payload,
input?: {
readonly seq: number
@ -224,28 +197,20 @@ export const layerWith = (options?: LayerOptions) =>
) {
return Effect.gen(function* () {
const definition = registry.get(event.type)
const sync = definition?.sync
if (sync) {
if (event.version !== sync.version) {
yield* Effect.die(
new InvalidSyncEventError({
type: event.type,
message: `Expected event version ${sync.version}, got ${event.version}`,
}),
)
}
const aggregateID = (event.data as Record<string, unknown>)[sync.aggregate]
const durable = definition?.durable
if (durable) {
const aggregateID = (event.data as Record<string, unknown>)[durable.aggregate]
if (typeof aggregateID !== "string") {
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: `Expected string aggregate field ${sync.aggregate}`,
message: `Expected string aggregate field ${durable.aggregate}`,
}),
)
} else {
if (input && input.aggregateID !== aggregateID) {
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: `Aggregate mismatch: expected ${input.aggregateID}, got ${aggregateID}`,
}),
@ -265,12 +230,12 @@ export const layerWith = (options?: LayerOptions) =>
.get()
.pipe(Effect.orDie)
const latest = row?.seq ?? -1
const encoded = syncRegistry
.get(versionedType(definition.type, sync.version))!
.encode(event.data) as Record<string, unknown>
const encoded = Schema.encodeUnknownSync(
definition.data as Schema.Codec<unknown, unknown, never, never>,
)(event.data) as Record<string, unknown>
if (input?.strictOwner && row?.ownerID && row.ownerID !== input.ownerID) {
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: `Replay owner mismatch for aggregate ${aggregateID}: expected ${row.ownerID}, got ${input.ownerID ?? "none"}`,
}),
@ -285,7 +250,7 @@ export const layerWith = (options?: LayerOptions) =>
.pipe(Effect.orDie)
if (
stored?.id === event.id &&
stored.type === versionedType(definition.type, sync.version) &&
stored.type === versionedType(definition.type, durable.version) &&
isDeepStrictEqual(stored.data, encoded)
) {
if (input.ownerID && row?.ownerID == null) {
@ -299,7 +264,7 @@ export const layerWith = (options?: LayerOptions) =>
return
}
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: `Replay diverged at aggregate ${aggregateID} sequence ${input.seq}`,
}),
@ -311,7 +276,7 @@ export const layerWith = (options?: LayerOptions) =>
const seq = input?.seq ?? latest + 1
if (input && seq !== latest + 1) {
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: `Sequence mismatch for aggregate ${aggregateID}: expected ${latest + 1}, got ${seq}`,
}),
@ -325,16 +290,17 @@ export const layerWith = (options?: LayerOptions) =>
.pipe(Effect.orDie)
if (stored)
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: `Event ${event.id} already exists at aggregate ${stored.aggregateID} sequence ${stored.seq}`,
}),
)
for (const guard of commitGuards) {
yield* guard(event)
}
const committed = {
...event,
durable: { aggregateID, seq, version: durable.version },
} as Payload
for (const projector of list) {
yield* projector({ ...event, seq } as Payload)
yield* projector(committed)
}
if (commit) yield* commit(seq)
yield* db
@ -356,7 +322,7 @@ export const layerWith = (options?: LayerOptions) =>
id: event.id,
aggregate_id: aggregateID,
seq,
type: versionedType(definition.type, sync.version),
type: versionedType(definition.type, durable.version),
data: encoded,
},
])
@ -369,8 +335,8 @@ export const layerWith = (options?: LayerOptions) =>
.pipe(Effect.orDie)
if (committed) {
yield* Effect.forEach(
synchronized.get(committed.aggregateID) ?? [],
(pubsub) => PubSub.publish(pubsub, undefined),
pubsub.durable.get(committed.aggregateID) ?? [],
(wake) => PubSub.publish(wake, undefined),
{ discard: true },
)
}
@ -384,19 +350,25 @@ export const layerWith = (options?: LayerOptions) =>
function publishEvent<D extends Definition>(event: Payload<D>, commit?: PublishOptions["commit"]) {
return Effect.gen(function* () {
const durable = registry.get(event.type)?.sync !== undefined
if (!durable && commit)
const definition = registry.get(event.type)
if (!definition?.durable && commit)
return yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: "Local commit hooks require a synchronized event",
message: "Local commit hooks require a durable event",
}),
)
if (durable) {
const committed = yield* commitSyncEvent(event as Payload, undefined, commit)
if (definition?.durable) {
const committed = yield* commitDurableEvent(event as Payload, undefined, commit)
if (committed) {
event = { ...event, seq: committed.seq }
yield* Effect.forEach(syncHandlers, (sync) => observe(event as Payload, "sync", sync), { discard: true })
event = {
...event,
durable: {
aggregateID: committed.aggregateID,
seq: committed.seq,
version: definition.durable.version,
},
}
yield* notify(event as Payload, true)
return event
}
@ -406,12 +378,12 @@ export const layerWith = (options?: LayerOptions) =>
})
}
const observe = (event: Payload, kind: "sync" | "listener", observer: (event: Payload) => Effect.Effect<void>) =>
const observe = (event: Payload, observer: (event: Payload) => Effect.Effect<void>) =>
Effect.suspend(() => observer(event)).pipe(
Effect.catchCauseIf(
(cause) => !Cause.hasInterrupts(cause),
(cause) =>
Effect.logError("Event observer failed", { eventID: event.id, eventType: event.type, kind, cause }),
Effect.logError("Event listener failed", { eventID: event.id, eventType: event.type, cause }),
),
)
@ -419,12 +391,12 @@ export const layerWith = (options?: LayerOptions) =>
return Effect.gen(function* () {
yield* Effect.forEach(
listeners,
(listener) => (isolateListeners ? observe(event, "listener", listener) : listener(event)),
(listener) => (isolateListeners ? observe(event, listener) : listener(event)),
{ discard: true },
)
const pubsub = typed.get(event.type)
if (pubsub) yield* PubSub.publish(pubsub, event)
yield* PubSub.publish(all, event)
const typed = pubsub.typed.get(event.type)
if (typed) yield* PubSub.publish(typed, event)
yield* PubSub.publish(pubsub.all, event)
})
}
@ -441,7 +413,6 @@ export const layerWith = (options?: LayerOptions) =>
id: options?.id ?? ID.create(),
...(options?.metadata ? { metadata: options.metadata } : {}),
type: definition.type,
...(definition.sync === undefined ? {} : { version: definition.sync.version }),
...(location ? { location } : {}),
data,
} as Payload<D>,
@ -455,27 +426,37 @@ export const layerWith = (options?: LayerOptions) =>
options?: { readonly publish?: boolean; readonly ownerID?: string; readonly strictOwner?: boolean },
) {
return Effect.gen(function* () {
const definition = syncRegistry.get(event.type)
if (!definition) {
const definition = durableRegistry.get(event.type)
if (!definition?.durable) {
yield* Effect.die(
new InvalidSyncEventError({ type: event.type, message: `Unknown sync event type ${event.type}` }),
new InvalidDurableEventError({ type: event.type, message: `Unknown durable event type ${event.type}` }),
)
} else {
const payload = {
id: event.id,
type: definition.type,
version: definition.sync.version,
data: definition.decode(event.data),
replay: true,
data: Schema.decodeUnknownSync(
definition.data as Schema.Codec<unknown, unknown, never, never>,
)(event.data),
} as Payload
const committed = yield* commitSyncEvent(payload, {
const committed = yield* commitDurableEvent(payload, {
seq: event.seq,
aggregateID: event.aggregateID,
ownerID: options?.ownerID,
strictOwner: options?.strictOwner,
})
if (committed && options?.publish) {
yield* notify({ ...payload, seq: committed.seq }, true)
yield* notify(
{
...payload,
durable: {
aggregateID: committed.aggregateID,
seq: committed.seq,
version: definition.durable.version,
},
},
true,
)
}
}
})
@ -490,7 +471,7 @@ export const layerWith = (options?: LayerOptions) =>
if (!source) return undefined
if (events.some((event) => event.aggregateID !== source)) {
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: events[0]?.type ?? "unknown",
message: "Replay events must belong to the same aggregate",
}),
@ -501,7 +482,7 @@ export const layerWith = (options?: LayerOptions) =>
const seq = start + index
if (event.seq !== seq) {
yield* Effect.die(
new InvalidSyncEventError({
new InvalidDurableEventError({
type: event.type,
message: `Replay sequence mismatch at index ${index}: expected ${seq}, got ${event.seq}`,
}),
@ -540,22 +521,18 @@ export const layerWith = (options?: LayerOptions) =>
Stream.map((event) => event as Payload<D>),
)
const streamAll = (): Stream.Stream<Payload> => Stream.fromPubSub(all)
const streamAll = (): Stream.Stream<Payload> => Stream.fromPubSub(pubsub.all)
const decodeSerializedEvent = (event: SerializedEvent): CursorEvent => {
const definition = syncRegistry.get(event.type)
if (!definition) {
throw new InvalidSyncEventError({ type: event.type, message: `Unknown sync event type ${event.type}` })
const decodeSerializedEvent = (event: SerializedEvent): Payload => {
const definition = durableRegistry.get(event.type)
if (!definition?.durable) {
throw new InvalidDurableEventError({ type: event.type, message: `Unknown durable event type ${event.type}` })
}
return {
cursor: Cursor.make(event.seq),
event: {
id: event.id,
type: definition.type,
version: definition.sync.version,
seq: event.seq,
data: definition.decode(event.data),
},
id: event.id,
type: definition.type,
durable: { aggregateID: event.aggregateID, seq: event.seq, version: definition.durable.version },
data: Schema.decodeUnknownSync(definition.data as Schema.Codec<unknown, unknown, never, never>)(event.data),
}
}
@ -583,43 +560,43 @@ export const layerWith = (options?: LayerOptions) =>
),
)
const subscribeSynchronized = (aggregateID: string) =>
const subscribeDurable = (aggregateID: string) =>
Effect.gen(function* () {
const pubsub = yield* PubSub.sliding<void>(1)
const subscription = yield* PubSub.subscribe(pubsub)
const wake = yield* PubSub.sliding<void>(1)
const subscription = yield* PubSub.subscribe(wake)
yield* Effect.acquireRelease(
Effect.sync(() => {
const pubsubs = synchronized.get(aggregateID) ?? new Set()
pubsubs.add(pubsub)
synchronized.set(aggregateID, pubsubs)
const wakes = pubsub.durable.get(aggregateID) ?? new Set()
wakes.add(wake)
pubsub.durable.set(aggregateID, wakes)
}),
() =>
Effect.sync(() => {
const pubsubs = synchronized.get(aggregateID)
pubsubs?.delete(pubsub)
if (pubsubs?.size === 0) synchronized.delete(aggregateID)
}).pipe(Effect.andThen(PubSub.shutdown(pubsub))),
const wakes = pubsub.durable.get(aggregateID)
wakes?.delete(wake)
if (wakes?.size === 0) pubsub.durable.delete(aggregateID)
}).pipe(Effect.andThen(PubSub.shutdown(wake))),
)
return subscription
})
const streamEvents = (input: {
const durable = (input: {
readonly aggregateID: string
readonly after?: Cursor
}): Stream.Stream<CursorEvent> =>
readonly after?: number
}): Stream.Stream<Payload> =>
Stream.unwrap(
Effect.gen(function* () {
const synchronized = yield* subscribeSynchronized(input.aggregateID)
let cursor = input.after ?? -1
const read = Effect.suspend(() => readAfter(input.aggregateID, cursor)).pipe(
const wakes = yield* subscribeDurable(input.aggregateID)
let sequence = input.after ?? -1
const read = Effect.suspend(() => readAfter(input.aggregateID, sequence)).pipe(
Effect.tap((events) =>
Effect.sync(() => {
cursor = events.at(-1)?.cursor ?? cursor
sequence = events.at(-1)?.durable?.seq ?? sequence
}),
),
)
const historical = yield* read
const live = Stream.fromSubscription(synchronized).pipe(
const live = Stream.fromSubscription(wakes).pipe(
Stream.mapEffect(() => read),
Stream.flattenIterable,
)
@ -627,7 +604,7 @@ export const layerWith = (options?: LayerOptions) =>
}),
)
const listen = (listener: Listener): Effect.Effect<Unsubscribe> =>
const listen = (listener: Subscriber): Effect.Effect<Unsubscribe> =>
Effect.sync(() => {
listeners.push(listener)
return Effect.sync(() => {
@ -636,21 +613,7 @@ export const layerWith = (options?: LayerOptions) =>
})
})
const sync = (handler: Sync): Effect.Effect<Unsubscribe> =>
Effect.sync(() => {
syncHandlers.push(handler)
return Effect.sync(() => {
const index = syncHandlers.indexOf(handler)
if (index >= 0) syncHandlers.splice(index, 1)
})
})
const beforeCommit = (guard: CommitGuard): Effect.Effect<void> =>
Effect.sync(() => {
commitGuards.push(guard)
})
const project = <D extends Definition>(definition: D, projector: Projector<D>): Effect.Effect<void> =>
const project = <D extends Definition>(definition: D, projector: Subscriber<D>): Effect.Effect<void> =>
Effect.sync(() => {
const list = projectors.get(definition.type) ?? []
list.push((event) => projector(event as Payload<D>))
@ -661,10 +624,8 @@ export const layerWith = (options?: LayerOptions) =>
publish,
subscribe,
all: streamAll,
aggregateEvents: streamEvents,
sync,
durable,
listen,
beforeCommit,
project,
replay,
replayAll,

View File

@ -1,7 +1,6 @@
export * as Session from "./session"
import { Effect, Schema, Stream } from "effect"
import { EventV2 } from "../event"
import { ModelV2 } from "../model"
import { SessionV2 } from "../session"
import { MessageDecodeError } from "../session/error"
@ -34,9 +33,7 @@ export type Delivery = SessionInput.Delivery
export const ListInput = SessionV2.ListInput
export type ListInput = SessionV2.ListInput
export const EventCursor = EventV2.Cursor
export type EventCursor = EventV2.Cursor
export type Event = EventV2.CursorEvent<SessionEvent.DurableEvent>
export type Event = SessionEvent.DurableEvent
export const NotFoundError = SessionV2.NotFoundError
export type NotFoundError = SessionV2.NotFoundError
@ -99,7 +96,7 @@ export interface MessageInput {
export interface EventsInput {
readonly sessionID: ID
readonly after?: EventCursor
readonly after?: number
}
export interface Interface {

View File

@ -124,8 +124,8 @@ export interface Interface {
) => Effect.Effect<SessionMessage.Message[], NotFoundError | MessageDecodeError>
readonly events: (input: {
sessionID: SessionSchema.ID
after?: EventV2.Cursor
}) => Stream.Stream<EventV2.CursorEvent<SessionEvent.DurableEvent>, NotFoundError>
after?: number
}) => Stream.Stream<SessionEvent.DurableEvent, NotFoundError>
readonly switchAgent: (input: {
sessionID: SessionSchema.ID
agent: string
@ -339,11 +339,9 @@ export const layer = Layer.effect(
Stream.unwrap(
result
.get(input.sessionID)
.pipe(Effect.as(events.aggregateEvents({ aggregateID: input.sessionID, after: input.after }))),
.pipe(Effect.as(events.durable({ aggregateID: input.sessionID, after: input.after }))),
).pipe(
Stream.filter((event): event is EventV2.CursorEvent<SessionEvent.DurableEvent> =>
isDurableSessionEvent(event.event),
),
Stream.filter((event): event is SessionEvent.DurableEvent => isDurableSessionEvent(event)),
),
prompt: Effect.fn("V2Session.prompt")((input) =>
Effect.uninterruptible(
@ -413,9 +411,9 @@ export const layer = Layer.effect(
sessionID,
timestamp: yield* DateTime.now,
})
if (event.seq === undefined)
if (event.durable === undefined)
return yield* Effect.die("Interrupt request event is missing aggregate sequence")
yield* execution.interrupt(sessionID, event.seq)
yield* execution.interrupt(sessionID, event.durable.seq)
}),
),
),

View File

@ -27,13 +27,13 @@ const Base = {
}
const options = {
sync: {
durable: {
aggregate: "sessionID",
version: 1,
},
} as const
const stepSettlementOptions = {
sync: {
durable: {
aggregate: "sessionID",
version: 2,
},
@ -456,7 +456,7 @@ export namespace Compaction {
export const Ended = EventV2.define({
type: "session.next.compaction.ended",
sync: { aggregate: "sessionID", version: 2 },
durable: { aggregate: "sessionID", version: 2 },
schema: {
...Base,
messageID: SessionMessageID.ID,

View File

@ -74,11 +74,11 @@ export const admit = Effect.fn("SessionInput.admit")(function* (
})
.pipe(
Effect.flatMap((event) =>
event.seq === undefined
event.durable === undefined
? Effect.die("Prompt admission event is missing aggregate sequence")
: Effect.succeed(
new Admitted({
admittedSeq: event.seq,
admittedSeq: event.durable.seq,
id: input.id,
sessionID: input.sessionID,
prompt: input.prompt,
@ -117,13 +117,6 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio
readonly timeCreated: DateTime.Utc
},
) {
const message = yield* db
.select({ id: SessionMessageTable.id })
.from(SessionMessageTable)
.where(eq(SessionMessageTable.id, input.id))
.get()
.pipe(Effect.orDie)
if (message) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
const stored = yield* db
.insert(SessionInputTable)
.values({
@ -208,37 +201,6 @@ const matchesPrompt = (input: Admitted, expected: { readonly sessionID: SessionS
input.sessionID === expected.sessionID &&
JSON.stringify(encodePrompt(input.prompt)) === JSON.stringify(encodePrompt(expected.prompt))
export const guardReservedID = Effect.fn("SessionInput.guardReservedID")(function* (
db: DatabaseService,
event: EventV2.Payload,
) {
if (
Schema.is(SessionEvent.PromptLifecycle.Admitted)(event) ||
Schema.is(SessionEvent.PromptLifecycle.Promoted)(event)
)
return
const id = reservedID(event)
if (id === undefined) return
const admitted = yield* db
.select({ id: SessionInputTable.id })
.from(SessionInputTable)
.where(eq(SessionInputTable.id, id))
.get()
.pipe(Effect.orDie)
if (admitted === undefined) return
return yield* Effect.die(new LifecycleConflict({ id }))
})
const reservedID = (event: EventV2.Payload) => {
if (Schema.is(SessionEvent.Step.Started)(event)) return event.data.assistantMessageID
if (Schema.is(SessionEvent.AgentSwitched)(event)) return event.data.messageID
if (Schema.is(SessionEvent.ModelSwitched)(event)) return event.data.messageID
if (Schema.is(SessionEvent.Prompted)(event)) return event.data.messageID
if (Schema.is(SessionEvent.Synthetic)(event)) return event.data.messageID
if (Schema.is(SessionEvent.Shell.Started)(event)) return event.data.messageID
if (Schema.is(SessionEvent.Compaction.Started)(event)) return event.data.messageID
}
export const projectLegacyPrompted = Effect.fn("SessionInput.projectLegacyPrompted")(function* (
db: DatabaseService,
input: {

View File

@ -115,7 +115,7 @@ function run(db: DatabaseService, event: SessionEvent.Event) {
const decodeRow = (row: typeof SessionMessageTable.$inferSelect) =>
decodeMessage({ ...row.data, id: row.id, type: row.type })
const updateMessage = (message: SessionMessage.Message) => {
if (event.seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence")
if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence")
const encoded = encodeMessage(message)
const { id, type, ...data } = encoded
return db
@ -192,7 +192,7 @@ function run(db: DatabaseService, event: SessionEvent.Event) {
}
function insertMessage(db: DatabaseService, event: SessionEvent.Event, message: SessionMessage.Message) {
if (event.seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence")
if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence")
const encoded = encodeMessage(message)
const { id, type, ...data } = encoded
return db
@ -201,7 +201,7 @@ function insertMessage(db: DatabaseService, event: SessionEvent.Event, message:
id: SessionMessage.ID.make(id),
session_id: event.data.sessionID,
type,
seq: event.seq,
seq: event.durable.seq,
time_created: DateTime.toEpochMillis(message.time.created),
data,
})
@ -213,7 +213,6 @@ export const layer = Layer.effectDiscard(
Effect.gen(function* () {
const events = yield* EventV2.Service
const { db } = yield* Database.Service
yield* events.beforeCommit((event) => SessionInput.guardReservedID(db, event))
yield* events.project(SessionV1.Event.Created, (event) =>
Effect.gen(function* () {
const stored = yield* db
@ -331,7 +330,7 @@ export const layer = Layer.effectDiscard(
}),
)
yield* events.project(SessionEvent.AgentSwitched, (event) => {
if (event.seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence")
if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence")
return db
.update(SessionTable)
.set({ agent: event.data.agent, time_updated: DateTime.toEpochMillis(event.data.timestamp) })
@ -340,7 +339,7 @@ export const layer = Layer.effectDiscard(
.pipe(
Effect.orDie,
Effect.andThen(run(db, event)),
Effect.andThen(SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.seq)),
Effect.andThen(SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.durable.seq)),
)
})
yield* events.project(SessionEvent.ModelSwitched, (event) =>
@ -352,9 +351,9 @@ export const layer = Layer.effectDiscard(
.run()
.pipe(Effect.orDie)
yield* run(db, event)
if (event.seq === undefined)
return yield* Effect.die("Synchronized Session event is missing aggregate sequence")
yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.seq)
if (event.durable === undefined)
return yield* Effect.die("Durable Session event is missing aggregate sequence")
yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.durable.seq)
}),
)
yield* events.project(SessionEvent.Prompted, (event) =>
@ -368,24 +367,24 @@ export const layer = Layer.effectDiscard(
.pipe(Effect.orDie)
if (existing) return yield* Effect.die(new PromptAlreadyProjected())
yield* run(db, event)
if (event.seq === undefined)
return yield* Effect.die("Synchronized Session event is missing aggregate sequence")
if (event.durable === undefined)
return yield* Effect.die("Durable Session event is missing aggregate sequence")
yield* SessionInput.projectLegacyPrompted(db, {
id: messageID,
sessionID: event.data.sessionID,
prompt: event.data.prompt,
delivery: event.data.delivery,
timeCreated: event.data.timestamp,
promotedSeq: event.seq,
promotedSeq: event.durable.seq,
})
}),
)
yield* events.project(SessionEvent.PromptLifecycle.Admitted, (event) =>
Effect.gen(function* () {
if (event.seq === undefined)
return yield* Effect.die("Synchronized Session event is missing aggregate sequence")
if (event.durable === undefined)
return yield* Effect.die("Durable Session event is missing aggregate sequence")
yield* SessionInput.projectAdmitted(db, {
admittedSeq: event.seq,
admittedSeq: event.durable.seq,
id: event.data.messageID,
sessionID: event.data.sessionID,
prompt: event.data.prompt,
@ -396,8 +395,8 @@ export const layer = Layer.effectDiscard(
)
yield* events.project(SessionEvent.PromptLifecycle.Promoted, (event) =>
Effect.gen(function* () {
if (event.seq === undefined)
return yield* Effect.die("Synchronized Session event is missing aggregate sequence")
if (event.durable === undefined)
return yield* Effect.die("Durable Session event is missing aggregate sequence")
yield* insertMessage(
db,
event,
@ -406,18 +405,14 @@ export const layer = Layer.effectDiscard(
sessionID: event.data.sessionID,
prompt: event.data.prompt,
timeCreated: event.data.timeCreated,
promotedSeq: event.seq,
promotedSeq: event.durable.seq,
}),
)
}),
)
yield* events.project(SessionEvent.InterruptRequested, () => Effect.void)
yield* events.project(SessionEvent.ContextUpdated, (event) => {
if (!event.replay || event.seq === undefined) return run(db, event)
return run(db, event).pipe(
Effect.andThen(SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.seq)),
)
})
// TODO: Reconstruct context epoch replacement state during replay without adding replay state to every EventV2 payload.
yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event))
yield* events.project(SessionEvent.Synthetic, (event) => run(db, event))
yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event))
yield* events.project(SessionEvent.Shell.Ended, (event) => run(db, event))
@ -436,9 +431,9 @@ export const layer = Layer.effectDiscard(
yield* events.project(SessionEvent.Reasoning.Ended, (event) => run(db, event))
// yield* events.project(SessionEvent.Retried, (event) => run(db, event))
yield* events.project(SessionEvent.Compaction.Ended, (event) => {
if (event.version === 1) return Effect.void
const seq = event.seq
if (seq === undefined) return Effect.die("Synchronized Session event is missing aggregate sequence")
if (event.durable === undefined) return Effect.die("Durable Session event is missing aggregate sequence")
if (event.durable.version === 1) return Effect.void
const seq = event.durable.seq
return Effect.gen(function* () {
yield* run(db, event)
yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, seq)

View File

@ -502,7 +502,7 @@ export type WithParts = {
}
const options = {
sync: {
durable: {
aggregate: "sessionID",
version: 1,
},

View File

@ -30,7 +30,7 @@ const Message = EventV2.define({
const SyncMessage = EventV2.define({
type: "test.sync",
sync: {
durable: {
version: 1,
aggregate: "id",
},
@ -42,7 +42,7 @@ const SyncMessage = EventV2.define({
const SyncSent = EventV2.define({
type: "test.sent",
sync: {
durable: {
version: 1,
aggregate: "messageID",
},
@ -61,7 +61,7 @@ const GlobalMessage = EventV2.define({
const VersionedMessage = EventV2.define({
type: "test.versioned",
sync: {
durable: {
version: 2,
aggregate: "id",
},
@ -73,7 +73,7 @@ const VersionedMessage = EventV2.define({
const SyncTimestamp = EventV2.define({
type: "test.timestamp",
sync: {
durable: {
version: 1,
aggregate: "id",
},
@ -132,7 +132,7 @@ describe("EventV2", () => {
const event = yield* events.publish(VersionedMessage, { id: "one", text: "hello" })
expect(event.type).toBe("test.versioned")
expect(event.version).toBe(2)
expect(event.durable?.version).toBe(2)
}),
)
@ -146,12 +146,12 @@ describe("EventV2", () => {
Effect.sync(() => {
const latest = EventV2.define({
type: "test.out-of-order",
sync: { version: 2, aggregate: "id" },
durable: { version: 2, aggregate: "id" },
schema: { id: Schema.String },
})
EventV2.define({
type: "test.out-of-order",
sync: { version: 1, aggregate: "id" },
durable: { version: 1, aggregate: "id" },
schema: { id: Schema.String },
})
@ -190,7 +190,7 @@ describe("EventV2", () => {
}),
)
it.effect("commits local operational state inside a new synchronized event transaction", () =>
it.effect("commits local operational state inside a new durable event transaction", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const received = new Array<string>()
@ -207,7 +207,7 @@ describe("EventV2", () => {
}),
)
it.effect("rolls back the synchronized event and projector when the local commit fails", () =>
it.effect("rolls back the durable event and projector when the local commit fails", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const { db } = yield* Database.Service
@ -236,7 +236,7 @@ describe("EventV2", () => {
const events = yield* EventV2.Service
const exit = yield* events.publish(Message, { text: "hello" }, { commit: () => Effect.void }).pipe(Effect.exit)
expect(String(exit)).toContain("Local commit hooks require a synchronized event")
expect(String(exit)).toContain("Local commit hooks require a durable event")
}),
)
@ -290,7 +290,6 @@ describe("EventV2", () => {
Effect.gen(function* () {
const events = yield* EventV2.Service
const received = new Array<string>()
yield* events.sync(() => Effect.die("sync defect"))
yield* events.listen(() => {
throw new Error("listener defect")
})
@ -303,7 +302,7 @@ describe("EventV2", () => {
const event = yield* events.publish(SyncMessage, { id: "one", text: "hello" })
expect(received).toEqual([SyncMessage.type])
expect(event.seq).toBeNumber()
expect(event.durable?.seq).toBeNumber()
}),
)
@ -336,49 +335,7 @@ describe("EventV2", () => {
}),
)
it.effect("does not synchronize live-only events", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const synchronized = new Array<string>()
const unsubscribe = yield* events.sync((event) =>
Effect.sync(() => {
synchronized.push(event.type)
}),
)
yield* Effect.addFinalizer(() => unsubscribe)
yield* events.publish(Message, { text: "live only" })
yield* events.publish(SyncMessage, { id: "one", text: "durable" })
expect(synchronized).toEqual([SyncMessage.type])
}),
)
it.effect("synchronizes only after the durable event commits", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const { db } = yield* Database.Service
const synchronized = new Array<boolean>()
yield* events.sync((event) =>
db
.select({ id: EventTable.id })
.from(EventTable)
.where(eq(EventTable.id, event.id))
.get()
.pipe(
Effect.orDie,
Effect.map((row) => synchronized.push(row !== undefined)),
Effect.asVoid,
),
)
yield* events.publish(SyncMessage, { id: EventV2.ID.create(), text: "durable" })
expect(synchronized).toEqual([true])
}),
)
it.effect("inserts sync event rows on publish", () =>
it.effect("inserts durable event rows on publish", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const { db } = yield* Database.Service
@ -398,7 +355,7 @@ describe("EventV2", () => {
}),
)
it.effect("increments sync event seq per aggregate", () =>
it.effect("increments durable event seq per aggregate", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const { db } = yield* Database.Service
@ -417,22 +374,22 @@ describe("EventV2", () => {
}),
)
it.effect("replays durable aggregate events after a cursor and tails new events", () =>
it.effect("replays durable aggregate events after a sequence and tails new events", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const aggregateID = EventV2.ID.create()
yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" })
yield* events.publish(SyncMessage, { id: aggregateID, text: "one" })
const fiber = yield* events
.aggregateEvents({ aggregateID, after: EventV2.Cursor.make(0) })
.durable({ aggregateID, after: 0 })
.pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped)
yield* Effect.yieldNow
yield* events.publish(SyncMessage, { id: aggregateID, text: "two" })
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual([
[EventV2.Cursor.make(1), { id: aggregateID, text: "one" }],
[EventV2.Cursor.make(2), { id: aggregateID, text: "two" }],
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual([
[1, { id: aggregateID, text: "one" }],
[2, { id: aggregateID, text: "two" }],
])
}),
)
@ -443,19 +400,19 @@ describe("EventV2", () => {
const aggregateID = EventV2.ID.create()
yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" })
const fiber = yield* events
.aggregateEvents({ aggregateID })
.durable({ aggregateID })
.pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped)
yield* events.publish(SyncMessage, { id: aggregateID, text: "one" })
expect(
Array.from(yield* Fiber.join(fiber)).map((event) => [
event.cursor,
(event.event.data as { text: string }).text,
event.durable?.seq,
(event.data as { text: string }).text,
]),
).toEqual([
[EventV2.Cursor.make(0), "zero"],
[EventV2.Cursor.make(1), "one"],
[0, "zero"],
[1, "one"],
])
}),
)
@ -477,7 +434,7 @@ describe("EventV2", () => {
const events = yield* EventV2.Service
const aggregateID = EventV2.ID.create()
const fiber = yield* events
.aggregateEvents({ aggregateID })
.durable({ aggregateID })
.pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
yield* Deferred.await(readStarted)
@ -485,8 +442,8 @@ describe("EventV2", () => {
yield* events.publish(SyncMessage, { id: aggregateID, text: "during handoff" })
yield* Deferred.succeed(continueRead, undefined)
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual([
[EventV2.Cursor.make(0), { id: aggregateID, text: "during handoff" }],
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual([
[0, { id: aggregateID, text: "during handoff" }],
])
}).pipe(Effect.provide(Layer.mergeAll(database, eventLayer)))
}),
@ -498,7 +455,7 @@ describe("EventV2", () => {
const aggregateID = EventV2.ID.create()
const count = 64
const fiber = yield* events
.aggregateEvents({ aggregateID })
.durable({ aggregateID })
.pipe(Stream.take(count), Stream.runCollect, Effect.forkScoped)
yield* Effect.yieldNow
@ -506,9 +463,9 @@ describe("EventV2", () => {
yield* events.publish(SyncMessage, { id: aggregateID, text: String(index) })
}
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual(
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual(
Array.from({ length: count }, (_, index) => [
EventV2.Cursor.make(index),
index,
{ id: aggregateID, text: String(index) },
]),
)
@ -520,14 +477,14 @@ describe("EventV2", () => {
const events = yield* EventV2.Service
const aggregateID = EventV2.ID.create()
const fiber = yield* events
.aggregateEvents({ aggregateID })
.durable({ aggregateID })
.pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
yield* Effect.yieldNow
yield* events.publish(Message, { text: "live only" })
yield* events.publish(SyncMessage, { id: aggregateID, text: "durable" })
expect(Array.from(yield* Fiber.join(fiber)).map((event) => event.event.type)).toEqual([SyncMessage.type])
expect(Array.from(yield* Fiber.join(fiber)).map((event) => event.type)).toEqual([SyncMessage.type])
}),
)
@ -550,7 +507,7 @@ describe("EventV2", () => {
}),
)
it.effect("replays sync events through projectors", () =>
it.effect("replays durable events through projectors", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const received = new Array<EventV2.Payload>()
@ -706,7 +663,7 @@ describe("EventV2", () => {
})
.pipe(Effect.exit)
expect(String(exit)).toContain("Unknown sync event type")
expect(String(exit)).toContain("Unknown durable event type")
}),
)
@ -843,7 +800,7 @@ describe("EventV2", () => {
const replayed = {
id: published.id,
type: EventV2.versionedType(SyncMessage.type, 1),
seq: published.seq!,
seq: published.durable!.seq,
aggregateID,
data: published.data,
}
@ -988,7 +945,7 @@ describe("EventV2", () => {
yield* events.replay(replayed, { publish: true })
yield* events.replay(replayed, { publish: true })
expect(received).toMatchObject([{ id: replayed.id, seq: 0, data: replayed.data }])
expect(received).toMatchObject([{ id: replayed.id, durable: { seq: 0, version: 1 }, data: replayed.data }])
}),
)
@ -1110,7 +1067,7 @@ describe("EventV2", () => {
}),
)
it.effect("remove clears sync event sequence", () =>
it.effect("remove clears durable event sequence", () =>
Effect.gen(function* () {
const events = yield* EventV2.Service
const received = new Array<EventV2.Payload>()

View File

@ -220,8 +220,8 @@ describe("SessionV2.create", () => {
expect(
Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(2), Stream.runCollect)),
).toMatchObject([
{ cursor: 1, event: { type: "session.next.prompt.admitted", data: { prompt: { text: "Hello" } } } },
{ cursor: 2, event: { type: "session.next.prompt.promoted" } },
{ durable: { seq: 1 }, type: "session.next.prompt.admitted", data: { prompt: { text: "Hello" } } },
{ durable: { seq: 2 }, type: "session.next.prompt.promoted" },
])
}),
)
@ -355,7 +355,7 @@ describe("SessionV2.create", () => {
expect(yield* session.get(created.id)).toMatchObject({ model })
expect(
Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(1), Stream.runCollect)),
).toMatchObject([{ event: { type: "session.next.model.switched", data: { model } } }])
).toMatchObject([{ type: "session.next.model.switched", data: { model } }])
}),
)

View File

@ -162,7 +162,7 @@ describe("SessionProjector", () => {
expect(
yield* db.select().from(SessionInputTable).where(eq(SessionInputTable.id, id)).get().pipe(Effect.orDie),
).toMatchObject({ promoted_seq: event.seq })
).toMatchObject({ promoted_seq: event.durable?.seq })
}),
)
@ -334,134 +334,6 @@ describe("SessionProjector", () => {
}),
)
it.effect("rejects a Prompted event that conflicts with an admitted inbox row", () =>
Effect.gen(function* () {
const { db } = yield* Database.Service
yield* db
.insert(ProjectTable)
.values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] })
.run()
.pipe(Effect.orDie)
yield* db
.insert(SessionTable)
.values({
id: sessionID,
project_id: Project.ID.global,
slug: "test",
directory: "/project",
title: "test",
version: "test",
})
.run()
.pipe(Effect.orDie)
const events = yield* EventV2.Service
const id = SessionMessage.ID.make("msg_conflict")
yield* SessionInput.admit(db, events, {
id,
sessionID,
prompt: new Prompt({ text: "admitted" }),
delivery: "steer",
})
const exit = yield* events
.publish(SessionEvent.Prompted, {
sessionID,
messageID: id,
timestamp: created,
prompt: new Prompt({ text: "different" }),
delivery: "steer",
})
.pipe(Effect.exit)
expect(String(exit)).toContain("SessionInput.LifecycleConflict")
expect(
yield* db.select().from(SessionInputTable).where(eq(SessionInputTable.id, id)).get().pipe(Effect.orDie),
).toMatchObject({ promoted_seq: null })
}),
)
it.effect("rejects an assistant message ID that conflicts with an admitted inbox row", () =>
Effect.gen(function* () {
const { db } = yield* Database.Service
yield* db
.insert(ProjectTable)
.values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] })
.run()
.pipe(Effect.orDie)
yield* db
.insert(SessionTable)
.values({
id: sessionID,
project_id: Project.ID.global,
slug: "test",
directory: "/project",
title: "test",
version: "test",
})
.run()
.pipe(Effect.orDie)
const events = yield* EventV2.Service
const id = SessionMessage.ID.make("msg_conflict")
yield* SessionInput.admit(db, events, {
id,
sessionID,
prompt: new Prompt({ text: "admitted" }),
delivery: "steer",
})
const exit = yield* events
.publish(SessionEvent.Step.Started, {
sessionID,
timestamp: created,
assistantMessageID: id,
agent: "build",
model,
})
.pipe(Effect.exit)
expect(String(exit)).toContain("SessionInput.LifecycleConflict")
expect(
yield* db.select().from(SessionMessageTable).where(eq(SessionMessageTable.id, id)).get().pipe(Effect.orDie),
).toBeUndefined()
}),
)
it.effect("rejects a Prompted delivery mode that conflicts with an admitted inbox row", () =>
Effect.gen(function* () {
const { db } = yield* Database.Service
yield* db
.insert(ProjectTable)
.values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] })
.run()
.pipe(Effect.orDie)
yield* db
.insert(SessionTable)
.values({
id: sessionID,
project_id: Project.ID.global,
slug: "test",
directory: "/project",
title: "test",
version: "test",
})
.run()
.pipe(Effect.orDie)
const events = yield* EventV2.Service
const id = SessionMessage.ID.make("msg_delivery_conflict")
const prompt = new Prompt({ text: "admitted" })
yield* SessionInput.admit(db, events, { id, sessionID, prompt, delivery: "queue" })
const exit = yield* events
.publish(SessionEvent.Prompted, { sessionID, messageID: id, timestamp: created, prompt, delivery: "steer" })
.pipe(Effect.exit)
expect(String(exit)).toContain("SessionInput.LifecycleConflict")
expect(
yield* db.select().from(SessionInputTable).where(eq(SessionInputTable.id, id)).get().pipe(Effect.orDie),
).toMatchObject({ delivery: "queue", promoted_seq: null })
}),
)
it.effect("does not revive a stale incomplete in-memory assistant projection", () =>
Effect.gen(function* () {
const stale = new SessionMessage.Assistant({

View File

@ -177,7 +177,7 @@ describe("SessionV2.prompt", () => {
}),
)
it.effect("streams durable Session events after an aggregate cursor", () =>
it.effect("streams durable Session events after an aggregate sequence", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
@ -191,17 +191,17 @@ describe("SessionV2.prompt", () => {
yield* SessionInput.promoteSteers(db, events, sessionID, Number.MAX_SAFE_INTEGER)
const streamed = Array.from(yield* Fiber.join(fiber))
expect(streamed.map((event) => [event.cursor, event.event.type])).toEqual([
[EventV2.Cursor.make(0), "session.next.prompt.admitted"],
[EventV2.Cursor.make(1), "session.next.prompt.admitted"],
[EventV2.Cursor.make(2), "session.next.prompt.promoted"],
[EventV2.Cursor.make(3), "session.next.prompt.promoted"],
expect(streamed.map((event) => [event.durable?.seq, event.type])).toEqual([
[0, "session.next.prompt.admitted"],
[1, "session.next.prompt.admitted"],
[2, "session.next.prompt.promoted"],
[3, "session.next.prompt.promoted"],
])
expect(
Array.from(
yield* session.events({ sessionID, after: streamed[0]!.cursor }).pipe(Stream.take(1), Stream.runCollect),
).map((event) => [event.cursor, event.event.type]),
).toEqual([[EventV2.Cursor.make(1), "session.next.prompt.admitted"]])
yield* session.events({ sessionID, after: streamed[0]!.durable?.seq }).pipe(Stream.take(1), Stream.runCollect),
).map((event) => [event.durable?.seq, event.type]),
).toEqual([[1, "session.next.prompt.admitted"]])
}),
)
@ -472,58 +472,6 @@ describe("SessionV2.prompt", () => {
}),
)
it.effect("rejects an input ID already used by a durable non-prompt event", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* events.publish(SessionEvent.Synthetic, {
sessionID,
messageID,
timestamp: yield* DateTime.now,
text: "Collision",
})
const failure = yield* session
.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "Collision" }), resume: false })
.pipe(Effect.flip)
expect(failure._tag).toBe("Session.PromptConflictError")
expect(yield* admitted(messageID)).toBeUndefined()
}),
)
it.effect("rejects a durable event ID reserved by an admitted prompt without poisoning promotion", () =>
Effect.gen(function* () {
yield* setup
const { db } = yield* Database.Service
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
const prompt = new Prompt({ text: "Reserved prompt" })
yield* session.prompt({ id: messageID, sessionID, prompt, resume: false })
const failure = yield* events
.publish(SessionEvent.Synthetic, {
sessionID,
messageID,
timestamp: yield* DateTime.now,
text: "Conflicting synthetic",
})
.pipe(Effect.catchDefect(Effect.succeed))
expect(String(failure)).toContain("SessionInput.LifecycleConflict")
expect(yield* admitted(messageID)).not.toHaveProperty("promotedSeq")
expect(yield* session.messages({ sessionID })).toEqual([])
yield* SessionInput.promoteSteers(db, events, sessionID, Number.MAX_SAFE_INTEGER)
expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 1 })
expect(yield* session.messages({ sessionID })).toMatchObject([
{ id: messageID, type: "user", text: "Reserved prompt" },
])
}),
)
it.effect("rejects reuse of one globally unique message ID across sessions", () =>
Effect.gen(function* () {
yield* setup

View File

@ -19,17 +19,17 @@ const capture = () => {
Effect.sync(() => {
const event = { id: EventV2.ID.create(), type: definition.type, data } as EventV2.Payload<typeof definition>
published.push({
type: definition.sync ? EventV2.versionedType(definition.type, definition.sync.version) : definition.type,
type: definition.durable
? EventV2.versionedType(definition.type, definition.durable.version)
: definition.type,
data,
})
return event
}),
subscribe: () => Stream.empty,
all: () => Stream.empty,
aggregateEvents: () => Stream.empty,
sync: () => Effect.succeed(Effect.void),
durable: () => Stream.empty,
listen: () => Effect.succeed(Effect.void),
beforeCommit: () => Effect.void,
project: () => Effect.void,
replay: () => Effect.void,
replayAll: () => Effect.succeed(undefined),

View File

@ -1355,34 +1355,6 @@ describe("SessionRunnerLLM", () => {
}),
)
it.effect("replays retained context projections while replacement is pending", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const events = yield* EventV2.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false })
requests.length = 0
response = []
yield* session.resume(sessionID)
systemBaseline = "Changed context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Second" }), resume: false })
yield* session.resume(sessionID)
yield* events.publish(SessionEvent.ModelSwitched, {
sessionID,
messageID: SessionMessage.ID.create(),
timestamp: DateTime.makeUnsafe(1),
model: { id: ModelV2.ID.make("replacement"), providerID: ProviderV2.ID.make("fake") },
})
yield* replaySessionProjection(sessionID)
systemBaseline = "Replacement context"
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Third" }), resume: false })
yield* session.resume(sessionID)
expect(requests.at(-1)?.system.map((part) => part.text)).toEqual(["Replacement context"])
}),
)
it.effect("replaces the baseline lazily after completed compaction without reopening replacement on replay", () =>
Effect.gen(function* () {
yield* setup

View File

@ -45,9 +45,9 @@ export const layer = Layer.effect(
workspace: workspaceID,
payload: { id: event.id, type: event.type, properties: event.data },
})
const sync = EventV2.registry.get(event.type)?.sync
if (sync === undefined || event.seq === undefined || event.version === undefined) return
const aggregateID = (event.data as Record<string, unknown>)[sync.aggregate]
const durable = EventV2.registry.get(event.type)?.durable
if (durable === undefined || event.durable === undefined) return
const aggregateID = (event.data as Record<string, unknown>)[durable.aggregate]
if (typeof aggregateID !== "string") return
GlobalBus.emit("event", {
directory: event.location?.directory ?? ctx?.directory,
@ -57,8 +57,8 @@ export const layer = Layer.effect(
type: "sync",
syncEvent: {
id: event.id,
type: EventV2.versionedType(event.type, event.version),
seq: event.seq,
type: EventV2.versionedType(event.type, event.durable.version),
seq: event.durable.seq,
aggregateID,
data: event.data,
},

View File

@ -16,13 +16,13 @@ const GlobalHealth = Schema.Struct({
const SyncEventSchemas = EventV2.registry
.values()
.flatMap((definition) => {
if (!definition.sync) return []
if (!definition.durable) return []
return [
Schema.Struct({
type: Schema.Literal("sync"),
id: EventV2.ID,
syncEvent: Schema.Struct({
type: Schema.Literal(EventV2.versionedType(definition.type, definition.sync.version)),
type: Schema.Literal(EventV2.versionedType(definition.type, definition.durable.version)),
id: EventV2.ID,
seq: Schema.Finite,
aggregateID: Schema.String,