1086 lines
35 KiB
TypeScript
1086 lines
35 KiB
TypeScript
import { describe, expect } from "bun:test"
|
|
import { Cause, DateTime, Deferred, Effect, Exit, Fiber, Layer, Schema, Stream } from "effect"
|
|
import { EventV2 } from "@opencode-ai/core/event"
|
|
import { Database } from "@opencode-ai/core/database/database"
|
|
import { EventSequenceTable, EventTable } from "@opencode-ai/core/event/sql"
|
|
import { Location } from "@opencode-ai/core/location"
|
|
import { AbsolutePath } from "@opencode-ai/core/schema"
|
|
import { WorkspaceV2 } from "@opencode-ai/core/workspace"
|
|
import { V2Schema } from "@opencode-ai/core/v2-schema"
|
|
import { eq } from "drizzle-orm"
|
|
import { location } from "./fixture/location"
|
|
import { testEffect } from "./lib/effect"
|
|
|
|
const locationLayer = Layer.succeed(
|
|
Location.Service,
|
|
Location.Service.of(
|
|
location({ directory: AbsolutePath.make("project"), workspaceID: WorkspaceV2.ID.make("wrk_test") }),
|
|
),
|
|
)
|
|
const eventLayer = Layer.mergeAll(EventV2.defaultLayer, Database.defaultLayer)
|
|
const it = testEffect(eventLayer.pipe(Layer.provideMerge(locationLayer)))
|
|
const itWithoutLocation = testEffect(eventLayer)
|
|
|
|
const Message = EventV2.define({
|
|
type: "test.message",
|
|
schema: {
|
|
text: Schema.String,
|
|
},
|
|
})
|
|
|
|
const SyncMessage = EventV2.define({
|
|
type: "test.sync",
|
|
durable: {
|
|
version: 1,
|
|
aggregate: "id",
|
|
},
|
|
schema: {
|
|
id: Schema.String,
|
|
text: Schema.String,
|
|
},
|
|
})
|
|
|
|
const SyncSent = EventV2.define({
|
|
type: "test.sent",
|
|
durable: {
|
|
version: 1,
|
|
aggregate: "messageID",
|
|
},
|
|
schema: {
|
|
messageID: Schema.String,
|
|
text: Schema.String,
|
|
},
|
|
})
|
|
|
|
const GlobalMessage = EventV2.define({
|
|
type: "test.global",
|
|
schema: {
|
|
text: Schema.String,
|
|
},
|
|
})
|
|
|
|
const VersionedMessage = EventV2.define({
|
|
type: "test.versioned",
|
|
durable: {
|
|
version: 2,
|
|
aggregate: "id",
|
|
},
|
|
schema: {
|
|
id: Schema.String,
|
|
text: Schema.String,
|
|
},
|
|
})
|
|
|
|
const SyncTimestamp = EventV2.define({
|
|
type: "test.timestamp",
|
|
durable: {
|
|
version: 1,
|
|
aggregate: "id",
|
|
},
|
|
schema: {
|
|
id: Schema.String,
|
|
timestamp: V2Schema.DateTimeUtcFromMillis,
|
|
},
|
|
})
|
|
|
|
describe("EventV2", () => {
|
|
it.effect("derives stable namespaced external IDs", () =>
|
|
Effect.sync(() => {
|
|
const input = { namespace: "opencord.agent-input", key: "input-1" }
|
|
|
|
expect(EventV2.ID.fromExternal(input)).toBe(EventV2.ID.fromExternal(input))
|
|
expect(EventV2.ID.fromExternal(input)).toMatch(/^evt_[a-f0-9]{64}$/)
|
|
expect(EventV2.ID.fromExternal({ ...input, namespace: "another-app" })).not.toBe(EventV2.ID.fromExternal(input))
|
|
expect(EventV2.ID.fromExternal({ namespace: "a:b", key: "c" })).not.toBe(
|
|
EventV2.ID.fromExternal({ namespace: "a", key: "b:c" }),
|
|
)
|
|
}),
|
|
)
|
|
|
|
it.effect("publishes events with the current location", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const fiber = yield* events.subscribe(Message).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
|
yield* Effect.yieldNow
|
|
const event = yield* events.publish(Message, { text: "hello" })
|
|
const received = Array.from(yield* Fiber.join(fiber))
|
|
|
|
expect(received).toEqual([event])
|
|
expect(event.type).toBe("test.message")
|
|
expect(event).not.toHaveProperty("version")
|
|
expect(event.data).toEqual({ text: "hello" })
|
|
expect(event.location).toEqual({
|
|
directory: AbsolutePath.make("project"),
|
|
workspaceID: WorkspaceV2.ID.make("wrk_test"),
|
|
})
|
|
}),
|
|
)
|
|
|
|
itWithoutLocation.effect("omits location when no location is available", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const event = yield* events.publish(GlobalMessage, { text: "hello" })
|
|
|
|
expect(event).not.toHaveProperty("location")
|
|
expect(event.type).toBe("test.global")
|
|
}),
|
|
)
|
|
|
|
it.effect("publishes definition version", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const event = yield* events.publish(VersionedMessage, { id: "one", text: "hello" })
|
|
|
|
expect(event.type).toBe("test.versioned")
|
|
expect(event.durable?.version).toBe(2)
|
|
}),
|
|
)
|
|
|
|
it.effect("stores definitions in the exported registry", () =>
|
|
Effect.sync(() => {
|
|
expect(EventV2.registry.get(Message.type)).toBe(Message)
|
|
}),
|
|
)
|
|
|
|
it.effect("keeps the latest sync definition in the registry", () =>
|
|
Effect.sync(() => {
|
|
const latest = EventV2.define({
|
|
type: "test.out-of-order",
|
|
durable: { version: 2, aggregate: "id" },
|
|
schema: { id: Schema.String },
|
|
})
|
|
EventV2.define({
|
|
type: "test.out-of-order",
|
|
durable: { version: 1, aggregate: "id" },
|
|
schema: { id: Schema.String },
|
|
})
|
|
|
|
expect(EventV2.registry.get("test.out-of-order")).toBe(latest)
|
|
}),
|
|
)
|
|
|
|
it.effect("publishes to typed and wildcard subscriptions", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const typed = yield* events.subscribe(Message).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
|
const wildcard = yield* events.all().pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
|
yield* Effect.yieldNow
|
|
const event = yield* events.publish(Message, { text: "hello" })
|
|
|
|
expect(Array.from(yield* Fiber.join(typed))).toEqual([event])
|
|
expect(Array.from(yield* Fiber.join(wildcard))).toEqual([event])
|
|
}),
|
|
)
|
|
|
|
it.effect("runs projectors inline", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<EventV2.Payload>()
|
|
yield* events.project(SyncMessage, (event) =>
|
|
Effect.sync(() => {
|
|
received.push(event)
|
|
}),
|
|
)
|
|
|
|
const event = yield* events.publish(SyncMessage, { id: "one", text: "hello" })
|
|
yield* events.publish(SyncMessage, { id: "one", text: "after unsubscribe" })
|
|
|
|
expect(received[0]).toEqual(event)
|
|
expect(received[1]?.data).toEqual({ id: "one", text: "after unsubscribe" })
|
|
}),
|
|
)
|
|
|
|
it.effect("commits local operational state inside a new durable event transaction", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<string>()
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.project(SyncMessage, () => Effect.sync(() => received.push("projector")))
|
|
|
|
yield* events.publish(
|
|
SyncMessage,
|
|
{ id: aggregateID, text: "hello" },
|
|
{ commit: (seq) => Effect.sync(() => received.push(`commit:${seq}`)) },
|
|
)
|
|
|
|
expect(received).toEqual(["projector", "commit:0"])
|
|
}),
|
|
)
|
|
|
|
it.effect("rolls back the durable event and projector when the local commit fails", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* db.run("CREATE TABLE IF NOT EXISTS event_commit_probe (value text NOT NULL)")
|
|
yield* db.run("DELETE FROM event_commit_probe")
|
|
yield* events.project(SyncMessage, () =>
|
|
db.run("INSERT INTO event_commit_probe (value) VALUES ('projected')").pipe(Effect.orDie, Effect.asVoid),
|
|
)
|
|
|
|
const exit = yield* events
|
|
.publish(SyncMessage, { id: aggregateID, text: "hello" }, { commit: () => Effect.die("commit failed") })
|
|
.pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain("commit failed")
|
|
expect(yield* db.all("SELECT value FROM event_commit_probe")).toEqual([])
|
|
expect(yield* db.select().from(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).all()).toEqual([])
|
|
expect(
|
|
yield* db.select().from(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).all(),
|
|
).toEqual([])
|
|
}),
|
|
)
|
|
|
|
it.effect("rejects local commit hooks on live-only events", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const exit = yield* events.publish(Message, { text: "hello" }, { commit: () => Effect.void }).pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain("Local commit hooks require a durable event")
|
|
}),
|
|
)
|
|
|
|
it.effect("runs projectors before publishing to streams", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<string>()
|
|
const fiber = yield* events.all().pipe(
|
|
Stream.take(1),
|
|
Stream.runForEach(() => Effect.sync(() => received.push("stream"))),
|
|
Effect.forkScoped,
|
|
)
|
|
yield* events.project(SyncMessage, (event) =>
|
|
Effect.sync(() => {
|
|
received.push(event.type)
|
|
}),
|
|
)
|
|
|
|
yield* Effect.yieldNow
|
|
yield* events.publish(SyncMessage, { id: "one", text: "hello" })
|
|
yield* Fiber.join(fiber)
|
|
|
|
expect(received).toEqual([SyncMessage.type, "stream"])
|
|
}),
|
|
)
|
|
|
|
it.effect("runs listeners inline after projectors", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<string>()
|
|
yield* events.project(SyncMessage, () =>
|
|
Effect.sync(() => {
|
|
received.push("projector")
|
|
}),
|
|
)
|
|
const unsubscribe = yield* events.listen(() =>
|
|
Effect.sync(() => {
|
|
received.push("listener")
|
|
}),
|
|
)
|
|
|
|
yield* events.publish(SyncMessage, { id: "one", text: "hello" })
|
|
yield* unsubscribe
|
|
yield* events.publish(SyncMessage, { id: "one", text: "after unsubscribe" })
|
|
|
|
expect(received).toEqual(["projector", "listener", "projector"])
|
|
}),
|
|
)
|
|
|
|
it.effect("isolates observer defects after durable events commit", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<string>()
|
|
yield* events.listen(() => {
|
|
throw new Error("listener defect")
|
|
})
|
|
yield* events.listen((event) =>
|
|
Effect.sync(() => {
|
|
received.push(event.type)
|
|
}),
|
|
)
|
|
|
|
const event = yield* events.publish(SyncMessage, { id: "one", text: "hello" })
|
|
|
|
expect(received).toEqual([SyncMessage.type])
|
|
expect(event.durable?.seq).toBeNumber()
|
|
}),
|
|
)
|
|
|
|
it.effect("preserves observer interruption", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
yield* events.listen(() => Effect.interrupt)
|
|
|
|
const exit = yield* events.publish(SyncMessage, { id: "interrupted", text: "hello" }).pipe(Effect.exit)
|
|
const committed = yield* db
|
|
.select({ id: EventTable.id })
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, "interrupted"))
|
|
.get()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(Exit.isFailure(exit) && Cause.hasInterrupts(exit.cause)).toBeTrue()
|
|
expect(committed).toBeDefined()
|
|
}),
|
|
)
|
|
|
|
it.effect("keeps live-only listener defects fail-fast", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const defect = new Error("listener defect")
|
|
yield* events.listen(() => Effect.die(defect))
|
|
|
|
expect(yield* events.publish(Message, { text: "hello" }).pipe(Effect.catchDefect(Effect.succeed))).toBe(defect)
|
|
}),
|
|
)
|
|
|
|
it.effect("inserts durable event rows on publish", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "first" })
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, aggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(rows).toHaveLength(1)
|
|
expect(rows[0]?.type).toBe(EventV2.versionedType(SyncMessage.type, 1))
|
|
expect(rows[0]?.aggregate_id).toBe(aggregateID)
|
|
}),
|
|
)
|
|
|
|
it.effect("increments durable event seq per aggregate", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "first" })
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "second" })
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, aggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(rows.map((row) => row.seq)).toEqual([0, 1])
|
|
}),
|
|
)
|
|
|
|
it.effect("replays durable aggregate events after a sequence and tails new events", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" })
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "one" })
|
|
const fiber = yield* events
|
|
.durable({ aggregateID, after: 0 })
|
|
.pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped)
|
|
yield* Effect.yieldNow
|
|
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "two" })
|
|
|
|
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual([
|
|
[1, { id: aggregateID, text: "one" }],
|
|
[2, { id: aggregateID, text: "two" }],
|
|
])
|
|
}),
|
|
)
|
|
|
|
it.effect("catches durable aggregate events published during replay handoff", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" })
|
|
const fiber = yield* events.durable({ aggregateID }).pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped)
|
|
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "one" })
|
|
|
|
expect(
|
|
Array.from(yield* Fiber.join(fiber)).map((event) => [
|
|
event.durable?.seq,
|
|
(event.data as { text: string }).text,
|
|
]),
|
|
).toEqual([
|
|
[0, "zero"],
|
|
[1, "one"],
|
|
])
|
|
}),
|
|
)
|
|
|
|
it.effect("retains a durable wake committed while historical replay is paused", () =>
|
|
Effect.gen(function* () {
|
|
const readStarted = yield* Deferred.make<void>()
|
|
const continueRead = yield* Deferred.make<void>()
|
|
let pause = true
|
|
const database = Database.layerFromPath(":memory:")
|
|
const eventLayer = EventV2.layerWith({
|
|
beforeAggregateRead: () =>
|
|
pause
|
|
? Deferred.succeed(readStarted, undefined).pipe(Effect.andThen(Deferred.await(continueRead)))
|
|
: Effect.void,
|
|
}).pipe(Layer.provide(database))
|
|
|
|
yield* Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const fiber = yield* events.durable({ aggregateID }).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
|
yield* Deferred.await(readStarted)
|
|
|
|
pause = false
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "during handoff" })
|
|
yield* Deferred.succeed(continueRead, undefined)
|
|
|
|
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual([
|
|
[0, { id: aggregateID, text: "during handoff" }],
|
|
])
|
|
}).pipe(Effect.provide(Layer.mergeAll(database, eventLayer)))
|
|
}),
|
|
)
|
|
|
|
it.effect("coalesces durable aggregate wakes while draining every committed event", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const count = 64
|
|
const fiber = yield* events
|
|
.durable({ aggregateID })
|
|
.pipe(Stream.take(count), Stream.runCollect, Effect.forkScoped)
|
|
yield* Effect.yieldNow
|
|
|
|
for (let index = 0; index < count; index++) {
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: String(index) })
|
|
}
|
|
|
|
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual(
|
|
Array.from({ length: count }, (_, index) => [index, { id: aggregateID, text: String(index) }]),
|
|
)
|
|
}),
|
|
)
|
|
|
|
it.effect("omits live-only events from durable aggregate streams", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const fiber = yield* events.durable({ aggregateID }).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
|
yield* Effect.yieldNow
|
|
|
|
yield* events.publish(Message, { text: "live only" })
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "durable" })
|
|
|
|
expect(Array.from(yield* Fiber.join(fiber)).map((event) => event.type)).toEqual([SyncMessage.type])
|
|
}),
|
|
)
|
|
|
|
it.effect("uses custom sync aggregate field", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.publish(SyncSent, { messageID: aggregateID, text: "sent" })
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, aggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(rows).toHaveLength(1)
|
|
expect(rows[0]?.aggregate_id).toBe(aggregateID)
|
|
}),
|
|
)
|
|
|
|
it.effect("replays durable events through projectors", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<EventV2.Payload>()
|
|
yield* events.project(SyncMessage, (event) =>
|
|
Effect.sync(() => {
|
|
received.push(event)
|
|
}),
|
|
)
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.replay({
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "hello" },
|
|
})
|
|
|
|
expect(received[0]?.type).toBe(SyncMessage.type)
|
|
expect(received[0]?.data).toEqual({ id: aggregateID, text: "hello" })
|
|
}),
|
|
)
|
|
|
|
it.effect("replay inserts external event rows", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.replay({
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "replayed" },
|
|
})
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, aggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(rows).toHaveLength(1)
|
|
expect(rows[0]?.aggregate_id).toBe(aggregateID)
|
|
}),
|
|
)
|
|
|
|
it.effect(
|
|
"replay rejects an envelope aggregate that differs from its payload without mutating the payload aggregate",
|
|
() =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const envelopeAggregateID = EventV2.ID.create()
|
|
const payloadAggregateID = EventV2.ID.create()
|
|
const received = new Array<EventV2.Payload>()
|
|
yield* events.publish(SyncMessage, { id: payloadAggregateID, text: "seed" })
|
|
yield* events.project(SyncMessage, (event) =>
|
|
Effect.sync(() => {
|
|
received.push(event)
|
|
}),
|
|
)
|
|
|
|
const exit = yield* events
|
|
.replay({
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID: envelopeAggregateID,
|
|
data: { id: payloadAggregateID, text: "replayed" },
|
|
})
|
|
.pipe(Effect.exit)
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, payloadAggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
const sequence = yield* db
|
|
.select({ seq: EventSequenceTable.seq })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, payloadAggregateID))
|
|
.get()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(String(exit)).toContain("Aggregate mismatch")
|
|
expect(received).toHaveLength(0)
|
|
expect(rows).toHaveLength(1)
|
|
expect(sequence).toEqual({ seq: 0 })
|
|
}),
|
|
)
|
|
|
|
it.effect("replay defects on sequence mismatch", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.replay({
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "first" },
|
|
})
|
|
const exit = yield* events
|
|
.replay({
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 5,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "bad" },
|
|
})
|
|
.pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain("Sequence mismatch")
|
|
}),
|
|
)
|
|
|
|
it.effect("replay decodes synchronized transformed values before projection", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const received = new Array<typeof SyncTimestamp.Type>()
|
|
yield* events.project(SyncTimestamp, (event) =>
|
|
Effect.sync(() => {
|
|
received.push(event)
|
|
}),
|
|
)
|
|
|
|
yield* events.replay({
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncTimestamp.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, timestamp: 0 },
|
|
})
|
|
|
|
expect(received[0]?.data.timestamp).toEqual(DateTime.makeUnsafe(0))
|
|
}),
|
|
)
|
|
|
|
it.effect("replay defects on unknown event type", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const exit = yield* events
|
|
.replay({
|
|
id: EventV2.ID.create(),
|
|
type: "unknown.event.1",
|
|
seq: 0,
|
|
aggregateID: EventV2.ID.create(),
|
|
data: {},
|
|
})
|
|
.pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain("Unknown durable event type")
|
|
}),
|
|
)
|
|
|
|
it.effect("replayAll validates contiguous aggregate events", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const source = yield* events.replayAll([
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "one" },
|
|
},
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "two" },
|
|
},
|
|
])
|
|
|
|
expect(source).toBe(aggregateID)
|
|
}),
|
|
)
|
|
|
|
it.effect("replayAll accepts later chunks after the first batch", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
const one = yield* events.replayAll([
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "one" },
|
|
},
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "two" },
|
|
},
|
|
])
|
|
const two = yield* events.replayAll([
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 2,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "three" },
|
|
},
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 3,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "four" },
|
|
},
|
|
])
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, aggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(one).toBe(aggregateID)
|
|
expect(two).toBe(aggregateID)
|
|
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
|
|
}),
|
|
)
|
|
|
|
it.effect("claim fences replay owners", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<EventV2.Payload>()
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "seed" })
|
|
yield* events.claim(aggregateID, "owner-a")
|
|
yield* events.project(SyncMessage, (event) =>
|
|
Effect.sync(() => {
|
|
received.push(event)
|
|
}),
|
|
)
|
|
|
|
yield* events.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "ignored" },
|
|
},
|
|
{ ownerID: "owner-b" },
|
|
)
|
|
|
|
expect(received).toHaveLength(0)
|
|
}),
|
|
)
|
|
|
|
it.effect("strict owner fences exact replay", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const id = EventV2.ID.create()
|
|
const replayed = {
|
|
id,
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "owned" },
|
|
}
|
|
yield* events.replay(replayed, { ownerID: "owner-a" })
|
|
|
|
const exit = yield* events.replay(replayed, { ownerID: "owner-b", strictOwner: true }).pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain("Replay owner mismatch")
|
|
}),
|
|
)
|
|
|
|
it.effect("exact replay claims an unowned aggregate", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const published = yield* events.publish(SyncMessage, { id: aggregateID, text: "owned" })
|
|
const replayed = {
|
|
id: published.id,
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: published.durable!.seq,
|
|
aggregateID,
|
|
data: published.data,
|
|
}
|
|
|
|
yield* events.replay(replayed, { ownerID: "owner-a", strictOwner: true })
|
|
const row = yield* db
|
|
.select({ ownerID: EventSequenceTable.owner_id })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
|
|
.get()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(row?.ownerID).toBe("owner-a")
|
|
const exit = yield* events
|
|
.replay(
|
|
{ ...replayed, id: EventV2.ID.create(), seq: 1, data: { id: aggregateID, text: "conflict" } },
|
|
{ ownerID: "owner-b", strictOwner: true },
|
|
)
|
|
.pipe(Effect.exit)
|
|
expect(String(exit)).toContain("Replay owner mismatch")
|
|
}),
|
|
)
|
|
|
|
it.effect("replay with owner claims an unowned sequence", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "owned" },
|
|
},
|
|
{ ownerID: "owner-1" },
|
|
)
|
|
const row = yield* db
|
|
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
|
|
.get()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(row).toEqual({ seq: 0, ownerID: "owner-1" })
|
|
}),
|
|
)
|
|
|
|
it.effect("replay claims an existing unowned sequence before fencing a different owner", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "local" })
|
|
|
|
yield* events.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "claimed" },
|
|
},
|
|
{ ownerID: "owner-1" },
|
|
)
|
|
yield* events.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 2,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "fenced" },
|
|
},
|
|
{ ownerID: "owner-2" },
|
|
)
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, aggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
const sequence = yield* db
|
|
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
|
|
.get()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(rows.map((row) => row.seq)).toEqual([0, 1])
|
|
expect(sequence).toEqual({ seq: 1, ownerID: "owner-1" })
|
|
}),
|
|
)
|
|
|
|
it.effect("strict replay rejects an owner conflict instead of silently skipping it", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "claimed" },
|
|
},
|
|
{ ownerID: "owner-1" },
|
|
)
|
|
|
|
const exit = yield* events
|
|
.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "conflict" },
|
|
},
|
|
{ ownerID: "owner-2", strictOwner: true },
|
|
)
|
|
.pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain("Replay owner mismatch")
|
|
}),
|
|
)
|
|
|
|
it.effect("publishes accepted replay with its durable sequence and suppresses stale replay", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<EventV2.Payload>()
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.listen((event) => Effect.sync(() => received.push(event)))
|
|
const replayed = {
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "replayed" },
|
|
}
|
|
|
|
yield* events.replay(replayed, { publish: true })
|
|
yield* events.replay(replayed, { publish: true })
|
|
|
|
expect(received).toMatchObject([{ id: replayed.id, durable: { seq: 0, version: 1 }, data: replayed.data }])
|
|
}),
|
|
)
|
|
|
|
it.effect("rejects divergent stale replay without publishing it", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<EventV2.Payload>()
|
|
const aggregateID = EventV2.ID.create()
|
|
const replayed = {
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "original" },
|
|
}
|
|
yield* events.listen((event) => Effect.sync(() => received.push(event)))
|
|
yield* events.replay(replayed, { publish: true })
|
|
|
|
const exit = yield* events
|
|
.replay({ ...replayed, data: { id: aggregateID, text: "divergent" } }, { publish: true })
|
|
.pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain("Replay diverged")
|
|
expect(received).toHaveLength(1)
|
|
}),
|
|
)
|
|
|
|
it.effect("rejects an event ID reused at another aggregate position", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const id = EventV2.ID.create()
|
|
yield* events.replay({
|
|
id,
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "first" },
|
|
})
|
|
|
|
const exit = yield* events
|
|
.replay({
|
|
id,
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "second" },
|
|
})
|
|
.pipe(Effect.exit)
|
|
|
|
expect(String(exit)).toContain(`Event ${id} already exists`)
|
|
}),
|
|
)
|
|
|
|
it.effect("replay from a different owner leaves claimed sequence unchanged", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
const received = new Array<EventV2.Payload>()
|
|
yield* events.listen((event) => Effect.sync(() => received.push(event)))
|
|
|
|
yield* events.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "first" },
|
|
},
|
|
{ ownerID: "owner-1" },
|
|
)
|
|
yield* events.replay(
|
|
{
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 1,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "ignored" },
|
|
},
|
|
{ ownerID: "owner-2", publish: true },
|
|
)
|
|
const rows = yield* db
|
|
.select()
|
|
.from(EventTable)
|
|
.where(eq(EventTable.aggregate_id, aggregateID))
|
|
.all()
|
|
.pipe(Effect.orDie)
|
|
const sequence = yield* db
|
|
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
|
|
.get()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(rows).toHaveLength(1)
|
|
expect(sequence).toEqual({ seq: 0, ownerID: "owner-1" })
|
|
expect(received).toHaveLength(0)
|
|
}),
|
|
)
|
|
|
|
it.effect("claim updates the event sequence owner", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const { db } = yield* Database.Service
|
|
const aggregateID = EventV2.ID.create()
|
|
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "claimed" })
|
|
yield* events.claim(aggregateID, "owner-1")
|
|
yield* events.claim(aggregateID, "owner-2")
|
|
const row = yield* db
|
|
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
|
|
.from(EventSequenceTable)
|
|
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
|
|
.get()
|
|
.pipe(Effect.orDie)
|
|
|
|
expect(row).toEqual({ seq: 0, ownerID: "owner-2" })
|
|
}),
|
|
)
|
|
|
|
it.effect("remove clears durable event sequence", () =>
|
|
Effect.gen(function* () {
|
|
const events = yield* EventV2.Service
|
|
const received = new Array<EventV2.Payload>()
|
|
const aggregateID = EventV2.ID.create()
|
|
yield* events.publish(SyncMessage, { id: aggregateID, text: "seed" })
|
|
yield* events.remove(aggregateID)
|
|
yield* events.project(SyncMessage, (event) =>
|
|
Effect.sync(() => {
|
|
received.push(event)
|
|
}),
|
|
)
|
|
|
|
yield* events.replay({
|
|
id: EventV2.ID.create(),
|
|
type: EventV2.versionedType(SyncMessage.type, 1),
|
|
seq: 0,
|
|
aggregateID,
|
|
data: { id: aggregateID, text: "replayed" },
|
|
})
|
|
|
|
expect(received[0]?.data).toEqual({ id: aggregateID, text: "replayed" })
|
|
}),
|
|
)
|
|
})
|