import { describe, expect } from "bun:test" import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect" import { EventV2 } from "@opencode-ai/core/event" import { Database } from "@opencode-ai/core/database/database" import { EventSequenceTable, EventTable } from "@opencode-ai/core/event/sql" import { Location } from "@opencode-ai/core/location" import { AbsolutePath } from "@opencode-ai/core/schema" import { WorkspaceV2 } from "@opencode-ai/core/workspace" import { V2Schema } from "@opencode-ai/core/v2-schema" import { eq } from "drizzle-orm" import { location } from "./fixture/location" import { testEffect } from "./lib/effect" const locationLayer = Layer.succeed( Location.Service, Location.Service.of( location({ directory: AbsolutePath.make("project"), workspaceID: WorkspaceV2.ID.make("wrk_test") }), ), ) const eventLayer = Layer.mergeAll(EventV2.defaultLayer, Database.defaultLayer) const it = testEffect(eventLayer.pipe(Layer.provideMerge(locationLayer))) const itWithoutLocation = testEffect(eventLayer) const Message = EventV2.define({ type: "test.message", schema: { text: Schema.String, }, }) const SyncMessage = EventV2.define({ type: "test.sync", sync: { version: 1, aggregate: "id", }, schema: { id: Schema.String, text: Schema.String, }, }) const SyncSent = EventV2.define({ type: "test.sent", sync: { version: 1, aggregate: "messageID", }, schema: { messageID: Schema.String, text: Schema.String, }, }) const GlobalMessage = EventV2.define({ type: "test.global", schema: { text: Schema.String, }, }) const VersionedMessage = EventV2.define({ type: "test.versioned", sync: { version: 2, aggregate: "id", }, schema: { id: Schema.String, text: Schema.String, }, }) const SyncTimestamp = EventV2.define({ type: "test.timestamp", sync: { version: 1, aggregate: "id", }, schema: { id: Schema.String, timestamp: V2Schema.DateTimeUtcFromMillis, }, }) describe("EventV2", () => { it.effect("derives stable namespaced external IDs", () => Effect.sync(() => { const input = { namespace: "opencord.agent-input", key: "input-1" } expect(EventV2.ID.fromExternal(input)).toBe(EventV2.ID.fromExternal(input)) expect(EventV2.ID.fromExternal(input)).toMatch(/^evt_[a-f0-9]{64}$/) expect(EventV2.ID.fromExternal({ ...input, namespace: "another-app" })).not.toBe(EventV2.ID.fromExternal(input)) expect(EventV2.ID.fromExternal({ namespace: "a:b", key: "c" })).not.toBe( EventV2.ID.fromExternal({ namespace: "a", key: "b:c" }), ) }), ) it.effect("publishes events with the current location", () => Effect.gen(function* () { const events = yield* EventV2.Service const fiber = yield* events.subscribe(Message).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow const event = yield* events.publish(Message, { text: "hello" }) const received = Array.from(yield* Fiber.join(fiber)) expect(received).toEqual([event]) expect(event.type).toBe("test.message") expect(event).not.toHaveProperty("version") expect(event.data).toEqual({ text: "hello" }) expect(event.location).toEqual({ directory: AbsolutePath.make("project"), workspaceID: WorkspaceV2.ID.make("wrk_test"), }) }), ) itWithoutLocation.effect("omits location when no location is available", () => Effect.gen(function* () { const events = yield* EventV2.Service const event = yield* events.publish(GlobalMessage, { text: "hello" }) expect(event).not.toHaveProperty("location") expect(event.type).toBe("test.global") }), ) it.effect("publishes definition version", () => Effect.gen(function* () { const events = yield* EventV2.Service const event = yield* events.publish(VersionedMessage, { id: "one", text: "hello" }) expect(event.type).toBe("test.versioned") expect(event.version).toBe(2) }), ) it.effect("stores definitions in the exported registry", () => Effect.sync(() => { expect(EventV2.registry.get(Message.type)).toBe(Message) }), ) it.effect("keeps the latest sync definition in the registry", () => Effect.sync(() => { const latest = EventV2.define({ type: "test.out-of-order", sync: { version: 2, aggregate: "id" }, schema: { id: Schema.String }, }) EventV2.define({ type: "test.out-of-order", sync: { version: 1, aggregate: "id" }, schema: { id: Schema.String }, }) expect(EventV2.registry.get("test.out-of-order")).toBe(latest) }), ) it.effect("publishes to typed and wildcard subscriptions", () => Effect.gen(function* () { const events = yield* EventV2.Service const typed = yield* events.subscribe(Message).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) const wildcard = yield* events.all().pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow const event = yield* events.publish(Message, { text: "hello" }) expect(Array.from(yield* Fiber.join(typed))).toEqual([event]) expect(Array.from(yield* Fiber.join(wildcard))).toEqual([event]) }), ) it.effect("runs projectors inline", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() yield* events.project(SyncMessage, (event) => Effect.sync(() => { received.push(event) }), ) const event = yield* events.publish(SyncMessage, { id: "one", text: "hello" }) yield* events.publish(SyncMessage, { id: "one", text: "after unsubscribe" }) expect(received[0]).toEqual(event) expect(received[1]?.data).toEqual({ id: "one", text: "after unsubscribe" }) }), ) it.effect("commits local operational state inside a new synchronized event transaction", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() const aggregateID = EventV2.ID.create() yield* events.project(SyncMessage, () => Effect.sync(() => received.push("projector"))) yield* events.publish( SyncMessage, { id: aggregateID, text: "hello" }, { commit: (seq) => Effect.sync(() => received.push(`commit:${seq}`)) }, ) expect(received).toEqual(["projector", "commit:0"]) }), ) it.effect("rolls back the synchronized event and projector when the local commit fails", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* db.run("CREATE TABLE IF NOT EXISTS event_commit_probe (value text NOT NULL)") yield* db.run("DELETE FROM event_commit_probe") yield* events.project(SyncMessage, () => db.run("INSERT INTO event_commit_probe (value) VALUES ('projected')").pipe(Effect.orDie, Effect.asVoid), ) const exit = yield* events .publish(SyncMessage, { id: aggregateID, text: "hello" }, { commit: () => Effect.die("commit failed") }) .pipe(Effect.exit) expect(String(exit)).toContain("commit failed") expect(yield* db.all("SELECT value FROM event_commit_probe")).toEqual([]) expect(yield* db.select().from(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).all()).toEqual([]) expect( yield* db.select().from(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).all(), ).toEqual([]) }), ) it.effect("rejects local commit hooks on live-only events", () => Effect.gen(function* () { const events = yield* EventV2.Service const exit = yield* events.publish(Message, { text: "hello" }, { commit: () => Effect.void }).pipe(Effect.exit) expect(String(exit)).toContain("Local commit hooks require a synchronized event") }), ) it.effect("runs projectors before publishing to streams", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() const fiber = yield* events.all().pipe( Stream.take(1), Stream.runForEach(() => Effect.sync(() => received.push("stream"))), Effect.forkScoped, ) yield* events.project(SyncMessage, (event) => Effect.sync(() => { received.push(event.type) }), ) yield* Effect.yieldNow yield* events.publish(SyncMessage, { id: "one", text: "hello" }) yield* Fiber.join(fiber) expect(received).toEqual([SyncMessage.type, "stream"]) }), ) it.effect("runs listeners inline after projectors", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() yield* events.project(SyncMessage, () => Effect.sync(() => { received.push("projector") }), ) const unsubscribe = yield* events.listen(() => Effect.sync(() => { received.push("listener") }), ) yield* events.publish(SyncMessage, { id: "one", text: "hello" }) yield* unsubscribe yield* events.publish(SyncMessage, { id: "one", text: "after unsubscribe" }) expect(received).toEqual(["projector", "listener", "projector"]) }), ) it.effect("isolates observer defects after durable events commit", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() yield* events.sync(() => Effect.die("sync defect")) yield* events.listen(() => { throw new Error("listener defect") }) yield* events.listen((event) => Effect.sync(() => { received.push(event.type) }), ) const event = yield* events.publish(SyncMessage, { id: "one", text: "hello" }) expect(received).toEqual([SyncMessage.type]) expect(event.seq).toBeNumber() }), ) it.effect("preserves observer interruption", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service yield* events.listen(() => Effect.interrupt) const exit = yield* events.publish(SyncMessage, { id: "interrupted", text: "hello" }).pipe(Effect.exit) const committed = yield* db .select({ id: EventTable.id }) .from(EventTable) .where(eq(EventTable.aggregate_id, "interrupted")) .get() .pipe(Effect.orDie) expect(Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause)).toBeTrue() expect(committed).toBeDefined() }), ) it.effect("keeps live-only listener defects fail-fast", () => Effect.gen(function* () { const events = yield* EventV2.Service const defect = new Error("listener defect") yield* events.listen(() => Effect.die(defect)) expect(yield* events.publish(Message, { text: "hello" }).pipe(Effect.catchDefect(Effect.succeed))).toBe(defect) }), ) it.effect("does not synchronize live-only events", () => Effect.gen(function* () { const events = yield* EventV2.Service const synchronized = new Array() const unsubscribe = yield* events.sync((event) => Effect.sync(() => { synchronized.push(event.type) }), ) yield* Effect.addFinalizer(() => unsubscribe) yield* events.publish(Message, { text: "live only" }) yield* events.publish(SyncMessage, { id: "one", text: "durable" }) expect(synchronized).toEqual([SyncMessage.type]) }), ) it.effect("synchronizes only after the durable event commits", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const synchronized = new Array() yield* events.sync((event) => db .select({ id: EventTable.id }) .from(EventTable) .where(eq(EventTable.id, event.id)) .get() .pipe( Effect.orDie, Effect.map((row) => synchronized.push(row !== undefined)), Effect.asVoid, ), ) yield* events.publish(SyncMessage, { id: EventV2.ID.create(), text: "durable" }) expect(synchronized).toEqual([true]) }), ) it.effect("inserts sync event rows on publish", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "first" }) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, aggregateID)) .all() .pipe(Effect.orDie) expect(rows).toHaveLength(1) expect(rows[0]?.type).toBe(EventV2.versionedType(SyncMessage.type, 1)) expect(rows[0]?.aggregate_id).toBe(aggregateID) }), ) it.effect("increments sync event seq per aggregate", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "first" }) yield* events.publish(SyncMessage, { id: aggregateID, text: "second" }) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, aggregateID)) .all() .pipe(Effect.orDie) expect(rows.map((row) => row.seq)).toEqual([0, 1]) }), ) it.effect("replays durable aggregate events after a cursor and tails new events", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" }) yield* events.publish(SyncMessage, { id: aggregateID, text: "one" }) const fiber = yield* events .aggregateEvents({ aggregateID, after: EventV2.Cursor.make(0) }) .pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow yield* events.publish(SyncMessage, { id: aggregateID, text: "two" }) expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual([ [EventV2.Cursor.make(1), { id: aggregateID, text: "one" }], [EventV2.Cursor.make(2), { id: aggregateID, text: "two" }], ]) }), ) it.effect("catches durable aggregate events published during replay handoff", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" }) const fiber = yield* events .aggregateEvents({ aggregateID }) .pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped) yield* events.publish(SyncMessage, { id: aggregateID, text: "one" }) expect( Array.from(yield* Fiber.join(fiber)).map((event) => [ event.cursor, (event.event.data as { text: string }).text, ]), ).toEqual([ [EventV2.Cursor.make(0), "zero"], [EventV2.Cursor.make(1), "one"], ]) }), ) it.effect("retains a durable wake committed while historical replay is paused", () => Effect.gen(function* () { const readStarted = yield* Deferred.make() const continueRead = yield* Deferred.make() let pause = true const database = Database.layerFromPath(":memory:") const eventLayer = EventV2.layerWith({ beforeAggregateRead: () => pause ? Deferred.succeed(readStarted, undefined).pipe(Effect.andThen(Deferred.await(continueRead))) : Effect.void, }).pipe(Layer.provide(database)) yield* Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const fiber = yield* events .aggregateEvents({ aggregateID }) .pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) yield* Deferred.await(readStarted) pause = false yield* events.publish(SyncMessage, { id: aggregateID, text: "during handoff" }) yield* Deferred.succeed(continueRead, undefined) expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual([ [EventV2.Cursor.make(0), { id: aggregateID, text: "during handoff" }], ]) }).pipe(Effect.provide(Layer.mergeAll(database, eventLayer))) }), ) it.effect("coalesces durable aggregate wakes while draining every committed event", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const count = 64 const fiber = yield* events .aggregateEvents({ aggregateID }) .pipe(Stream.take(count), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow for (let index = 0; index < count; index++) { yield* events.publish(SyncMessage, { id: aggregateID, text: String(index) }) } expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.cursor, event.event.data])).toEqual( Array.from({ length: count }, (_, index) => [ EventV2.Cursor.make(index), { id: aggregateID, text: String(index) }, ]), ) }), ) it.effect("omits live-only events from durable aggregate streams", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const fiber = yield* events .aggregateEvents({ aggregateID }) .pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow yield* events.publish(Message, { text: "live only" }) yield* events.publish(SyncMessage, { id: aggregateID, text: "durable" }) expect(Array.from(yield* Fiber.join(fiber)).map((event) => event.event.type)).toEqual([SyncMessage.type]) }), ) it.effect("uses custom sync aggregate field", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncSent, { messageID: aggregateID, text: "sent" }) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, aggregateID)) .all() .pipe(Effect.orDie) expect(rows).toHaveLength(1) expect(rows[0]?.aggregate_id).toBe(aggregateID) }), ) it.effect("replays sync events through projectors", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() yield* events.project(SyncMessage, (event) => Effect.sync(() => { received.push(event) }), ) const aggregateID = EventV2.ID.create() yield* events.replay({ id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "hello" }, }) expect(received[0]?.type).toBe(SyncMessage.type) expect(received[0]?.data).toEqual({ id: aggregateID, text: "hello" }) }), ) it.effect("replay inserts external event rows", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* events.replay({ id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "replayed" }, }) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, aggregateID)) .all() .pipe(Effect.orDie) expect(rows).toHaveLength(1) expect(rows[0]?.aggregate_id).toBe(aggregateID) }), ) it.effect( "replay rejects an envelope aggregate that differs from its payload without mutating the payload aggregate", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const envelopeAggregateID = EventV2.ID.create() const payloadAggregateID = EventV2.ID.create() const received = new Array() yield* events.publish(SyncMessage, { id: payloadAggregateID, text: "seed" }) yield* events.project(SyncMessage, (event) => Effect.sync(() => { received.push(event) }), ) const exit = yield* events .replay({ id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID: envelopeAggregateID, data: { id: payloadAggregateID, text: "replayed" }, }) .pipe(Effect.exit) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, payloadAggregateID)) .all() .pipe(Effect.orDie) const sequence = yield* db .select({ seq: EventSequenceTable.seq }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, payloadAggregateID)) .get() .pipe(Effect.orDie) expect(String(exit)).toContain("Aggregate mismatch") expect(received).toHaveLength(0) expect(rows).toHaveLength(1) expect(sequence).toEqual({ seq: 0 }) }), ) it.effect("replay defects on sequence mismatch", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() yield* events.replay({ id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "first" }, }) const exit = yield* events .replay({ id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 5, aggregateID, data: { id: aggregateID, text: "bad" }, }) .pipe(Effect.exit) expect(String(exit)).toContain("Sequence mismatch") }), ) it.effect("replay decodes synchronized transformed values before projection", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const received = new Array() yield* events.project(SyncTimestamp, (event) => Effect.sync(() => { received.push(event) }), ) yield* events.replay({ id: EventV2.ID.create(), type: EventV2.versionedType(SyncTimestamp.type, 1), seq: 0, aggregateID, data: { id: aggregateID, timestamp: 0 }, }) expect(received[0]?.data.timestamp).toEqual(DateTime.makeUnsafe(0)) }), ) it.effect("replay defects on unknown event type", () => Effect.gen(function* () { const events = yield* EventV2.Service const exit = yield* events .replay({ id: EventV2.ID.create(), type: "unknown.event.1", seq: 0, aggregateID: EventV2.ID.create(), data: {}, }) .pipe(Effect.exit) expect(String(exit)).toContain("Unknown sync event type") }), ) it.effect("replayAll validates contiguous aggregate events", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const source = yield* events.replayAll([ { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "one" }, }, { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID, data: { id: aggregateID, text: "two" }, }, ]) expect(source).toBe(aggregateID) }), ) it.effect("replayAll accepts later chunks after the first batch", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() const one = yield* events.replayAll([ { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "one" }, }, { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID, data: { id: aggregateID, text: "two" }, }, ]) const two = yield* events.replayAll([ { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 2, aggregateID, data: { id: aggregateID, text: "three" }, }, { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 3, aggregateID, data: { id: aggregateID, text: "four" }, }, ]) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, aggregateID)) .all() .pipe(Effect.orDie) expect(one).toBe(aggregateID) expect(two).toBe(aggregateID) expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3]) }), ) it.effect("claim fences replay owners", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "seed" }) yield* events.claim(aggregateID, "owner-a") yield* events.project(SyncMessage, (event) => Effect.sync(() => { received.push(event) }), ) yield* events.replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID, data: { id: aggregateID, text: "ignored" }, }, { ownerID: "owner-b" }, ) expect(received).toHaveLength(0) }), ) it.effect("strict owner fences exact replay", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const id = EventV2.ID.create() const replayed = { id, type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "owned" }, } yield* events.replay(replayed, { ownerID: "owner-a" }) const exit = yield* events.replay(replayed, { ownerID: "owner-b", strictOwner: true }).pipe(Effect.exit) expect(String(exit)).toContain("Replay owner mismatch") }), ) it.effect("exact replay claims an unowned aggregate", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() const published = yield* events.publish(SyncMessage, { id: aggregateID, text: "owned" }) const replayed = { id: published.id, type: EventV2.versionedType(SyncMessage.type, 1), seq: published.seq!, aggregateID, data: published.data, } yield* events.replay(replayed, { ownerID: "owner-a", strictOwner: true }) const row = yield* db .select({ ownerID: EventSequenceTable.owner_id }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, aggregateID)) .get() .pipe(Effect.orDie) expect(row?.ownerID).toBe("owner-a") const exit = yield* events .replay( { ...replayed, id: EventV2.ID.create(), seq: 1, data: { id: aggregateID, text: "conflict" } }, { ownerID: "owner-b", strictOwner: true }, ) .pipe(Effect.exit) expect(String(exit)).toContain("Replay owner mismatch") }), ) it.effect("replay with owner claims an unowned sequence", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* events.replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "owned" }, }, { ownerID: "owner-1" }, ) const row = yield* db .select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, aggregateID)) .get() .pipe(Effect.orDie) expect(row).toEqual({ seq: 0, ownerID: "owner-1" }) }), ) it.effect("replay claims an existing unowned sequence before fencing a different owner", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "local" }) yield* events.replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID, data: { id: aggregateID, text: "claimed" }, }, { ownerID: "owner-1" }, ) yield* events.replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 2, aggregateID, data: { id: aggregateID, text: "fenced" }, }, { ownerID: "owner-2" }, ) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, aggregateID)) .all() .pipe(Effect.orDie) const sequence = yield* db .select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, aggregateID)) .get() .pipe(Effect.orDie) expect(rows.map((row) => row.seq)).toEqual([0, 1]) expect(sequence).toEqual({ seq: 1, ownerID: "owner-1" }) }), ) it.effect("strict replay rejects an owner conflict instead of silently skipping it", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() yield* events.replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "claimed" }, }, { ownerID: "owner-1" }, ) const exit = yield* events .replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID, data: { id: aggregateID, text: "conflict" }, }, { ownerID: "owner-2", strictOwner: true }, ) .pipe(Effect.exit) expect(String(exit)).toContain("Replay owner mismatch") }), ) it.effect("publishes accepted replay with its durable sequence and suppresses stale replay", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() const aggregateID = EventV2.ID.create() yield* events.listen((event) => Effect.sync(() => received.push(event))) const replayed = { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "replayed" }, } yield* events.replay(replayed, { publish: true }) yield* events.replay(replayed, { publish: true }) expect(received).toMatchObject([{ id: replayed.id, seq: 0, data: replayed.data }]) }), ) it.effect("rejects divergent stale replay without publishing it", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() const aggregateID = EventV2.ID.create() const replayed = { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "original" }, } yield* events.listen((event) => Effect.sync(() => received.push(event))) yield* events.replay(replayed, { publish: true }) const exit = yield* events .replay({ ...replayed, data: { id: aggregateID, text: "divergent" } }, { publish: true }) .pipe(Effect.exit) expect(String(exit)).toContain("Replay diverged") expect(received).toHaveLength(1) }), ) it.effect("rejects an event ID reused at another aggregate position", () => Effect.gen(function* () { const events = yield* EventV2.Service const aggregateID = EventV2.ID.create() const id = EventV2.ID.create() yield* events.replay({ id, type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "first" }, }) const exit = yield* events .replay({ id, type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID, data: { id: aggregateID, text: "second" }, }) .pipe(Effect.exit) expect(String(exit)).toContain(`Event ${id} already exists`) }), ) it.effect("replay from a different owner leaves claimed sequence unchanged", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() const received = new Array() yield* events.listen((event) => Effect.sync(() => received.push(event))) yield* events.replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "first" }, }, { ownerID: "owner-1" }, ) yield* events.replay( { id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 1, aggregateID, data: { id: aggregateID, text: "ignored" }, }, { ownerID: "owner-2", publish: true }, ) const rows = yield* db .select() .from(EventTable) .where(eq(EventTable.aggregate_id, aggregateID)) .all() .pipe(Effect.orDie) const sequence = yield* db .select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, aggregateID)) .get() .pipe(Effect.orDie) expect(rows).toHaveLength(1) expect(sequence).toEqual({ seq: 0, ownerID: "owner-1" }) expect(received).toHaveLength(0) }), ) it.effect("claim updates the event sequence owner", () => Effect.gen(function* () { const events = yield* EventV2.Service const { db } = yield* Database.Service const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "claimed" }) yield* events.claim(aggregateID, "owner-1") yield* events.claim(aggregateID, "owner-2") const row = yield* db .select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, aggregateID)) .get() .pipe(Effect.orDie) expect(row).toEqual({ seq: 0, ownerID: "owner-2" }) }), ) it.effect("remove clears sync event sequence", () => Effect.gen(function* () { const events = yield* EventV2.Service const received = new Array() const aggregateID = EventV2.ID.create() yield* events.publish(SyncMessage, { id: aggregateID, text: "seed" }) yield* events.remove(aggregateID) yield* events.project(SyncMessage, (event) => Effect.sync(() => { received.push(event) }), ) yield* events.replay({ id: EventV2.ID.create(), type: EventV2.versionedType(SyncMessage.type, 1), seq: 0, aggregateID, data: { id: aggregateID, text: "replayed" }, }) expect(received[0]?.data).toEqual({ id: aggregateID, text: "replayed" }) }), ) })