chore: generate
This commit is contained in:
parent
fb43c15f88
commit
82d9cab48d
@ -100,8 +100,7 @@ export function define<const Type extends string, Fields extends Schema.Struct.F
|
||||
) {
|
||||
registry.set(input.type, definition)
|
||||
}
|
||||
if (input.durable)
|
||||
durableRegistry.set(versionedType(input.type, input.durable.version), definition)
|
||||
if (input.durable) durableRegistry.set(versionedType(input.type, input.durable.version), definition)
|
||||
return definition as Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> &
|
||||
Definition<Type, Schema.Struct<Fields>>
|
||||
}
|
||||
@ -126,10 +125,7 @@ export interface Interface {
|
||||
) => Effect.Effect<Payload<D>>
|
||||
readonly subscribe: <D extends Definition>(definition: D) => Stream.Stream<Payload<D>>
|
||||
readonly all: () => Stream.Stream<Payload>
|
||||
readonly durable: (input: {
|
||||
readonly aggregateID: string
|
||||
readonly after?: number
|
||||
}) => Stream.Stream<Payload>
|
||||
readonly durable: (input: { readonly aggregateID: string; readonly after?: number }) => Stream.Stream<Payload>
|
||||
/** @deprecated Use `all()` and consume the returned stream. */
|
||||
readonly listen: (listener: Subscriber) => Effect.Effect<Unsubscribe>
|
||||
readonly project: <D extends Definition>(definition: D, projector: Subscriber<D>) => Effect.Effect<void>
|
||||
@ -382,8 +378,7 @@ export const layerWith = (options?: LayerOptions) =>
|
||||
Effect.suspend(() => observer(event)).pipe(
|
||||
Effect.catchCauseIf(
|
||||
(cause) => !Cause.hasInterrupts(cause),
|
||||
(cause) =>
|
||||
Effect.logError("Event listener failed", { eventID: event.id, eventType: event.type, cause }),
|
||||
(cause) => Effect.logError("Event listener failed", { eventID: event.id, eventType: event.type, cause }),
|
||||
),
|
||||
)
|
||||
|
||||
@ -435,9 +430,9 @@ export const layerWith = (options?: LayerOptions) =>
|
||||
const payload = {
|
||||
id: event.id,
|
||||
type: definition.type,
|
||||
data: Schema.decodeUnknownSync(
|
||||
definition.data as Schema.Codec<unknown, unknown, never, never>,
|
||||
)(event.data),
|
||||
data: Schema.decodeUnknownSync(definition.data as Schema.Codec<unknown, unknown, never, never>)(
|
||||
event.data,
|
||||
),
|
||||
} as Payload
|
||||
const committed = yield* commitDurableEvent(payload, {
|
||||
seq: event.seq,
|
||||
@ -580,10 +575,7 @@ export const layerWith = (options?: LayerOptions) =>
|
||||
return subscription
|
||||
})
|
||||
|
||||
const durable = (input: {
|
||||
readonly aggregateID: string
|
||||
readonly after?: number
|
||||
}): Stream.Stream<Payload> =>
|
||||
const durable = (input: { readonly aggregateID: string; readonly after?: number }): Stream.Stream<Payload> =>
|
||||
Stream.unwrap(
|
||||
Effect.gen(function* () {
|
||||
const wakes = yield* subscribeDurable(input.aggregateID)
|
||||
|
||||
@ -340,9 +340,7 @@ export const layer = Layer.effect(
|
||||
result
|
||||
.get(input.sessionID)
|
||||
.pipe(Effect.as(events.durable({ aggregateID: input.sessionID, after: input.after }))),
|
||||
).pipe(
|
||||
Stream.filter((event): event is SessionEvent.DurableEvent => isDurableSessionEvent(event)),
|
||||
),
|
||||
).pipe(Stream.filter((event): event is SessionEvent.DurableEvent => isDurableSessionEvent(event))),
|
||||
prompt: Effect.fn("V2Session.prompt")((input) =>
|
||||
Effect.uninterruptible(
|
||||
Effect.gen(function* () {
|
||||
|
||||
@ -351,8 +351,7 @@ export const layer = Layer.effectDiscard(
|
||||
.run()
|
||||
.pipe(Effect.orDie)
|
||||
yield* run(db, event)
|
||||
if (event.durable === undefined)
|
||||
return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
yield* SessionContextEpoch.requestReplacement(db, event.data.sessionID, event.durable.seq)
|
||||
}),
|
||||
)
|
||||
@ -367,8 +366,7 @@ export const layer = Layer.effectDiscard(
|
||||
.pipe(Effect.orDie)
|
||||
if (existing) return yield* Effect.die(new PromptAlreadyProjected())
|
||||
yield* run(db, event)
|
||||
if (event.durable === undefined)
|
||||
return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
yield* SessionInput.projectLegacyPrompted(db, {
|
||||
id: messageID,
|
||||
sessionID: event.data.sessionID,
|
||||
@ -381,8 +379,7 @@ export const layer = Layer.effectDiscard(
|
||||
)
|
||||
yield* events.project(SessionEvent.PromptLifecycle.Admitted, (event) =>
|
||||
Effect.gen(function* () {
|
||||
if (event.durable === undefined)
|
||||
return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
yield* SessionInput.projectAdmitted(db, {
|
||||
admittedSeq: event.durable.seq,
|
||||
id: event.data.messageID,
|
||||
@ -395,8 +392,7 @@ export const layer = Layer.effectDiscard(
|
||||
)
|
||||
yield* events.project(SessionEvent.PromptLifecycle.Promoted, (event) =>
|
||||
Effect.gen(function* () {
|
||||
if (event.durable === undefined)
|
||||
return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
|
||||
yield* insertMessage(
|
||||
db,
|
||||
event,
|
||||
|
||||
@ -399,9 +399,7 @@ describe("EventV2", () => {
|
||||
const events = yield* EventV2.Service
|
||||
const aggregateID = EventV2.ID.create()
|
||||
yield* events.publish(SyncMessage, { id: aggregateID, text: "zero" })
|
||||
const fiber = yield* events
|
||||
.durable({ aggregateID })
|
||||
.pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped)
|
||||
const fiber = yield* events.durable({ aggregateID }).pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped)
|
||||
|
||||
yield* events.publish(SyncMessage, { id: aggregateID, text: "one" })
|
||||
|
||||
@ -433,9 +431,7 @@ describe("EventV2", () => {
|
||||
yield* Effect.gen(function* () {
|
||||
const events = yield* EventV2.Service
|
||||
const aggregateID = EventV2.ID.create()
|
||||
const fiber = yield* events
|
||||
.durable({ aggregateID })
|
||||
.pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
||||
const fiber = yield* events.durable({ aggregateID }).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
||||
yield* Deferred.await(readStarted)
|
||||
|
||||
pause = false
|
||||
@ -464,10 +460,7 @@ describe("EventV2", () => {
|
||||
}
|
||||
|
||||
expect(Array.from(yield* Fiber.join(fiber)).map((event) => [event.durable?.seq, event.data])).toEqual(
|
||||
Array.from({ length: count }, (_, index) => [
|
||||
index,
|
||||
{ id: aggregateID, text: String(index) },
|
||||
]),
|
||||
Array.from({ length: count }, (_, index) => [index, { id: aggregateID, text: String(index) }]),
|
||||
)
|
||||
}),
|
||||
)
|
||||
@ -476,9 +469,7 @@ describe("EventV2", () => {
|
||||
Effect.gen(function* () {
|
||||
const events = yield* EventV2.Service
|
||||
const aggregateID = EventV2.ID.create()
|
||||
const fiber = yield* events
|
||||
.durable({ aggregateID })
|
||||
.pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
||||
const fiber = yield* events.durable({ aggregateID }).pipe(Stream.take(1), Stream.runCollect, Effect.forkScoped)
|
||||
yield* Effect.yieldNow
|
||||
|
||||
yield* events.publish(Message, { text: "live only" })
|
||||
|
||||
@ -199,7 +199,9 @@ describe("SessionV2.prompt", () => {
|
||||
])
|
||||
expect(
|
||||
Array.from(
|
||||
yield* session.events({ sessionID, after: streamed[0]!.durable?.seq }).pipe(Stream.take(1), Stream.runCollect),
|
||||
yield* session
|
||||
.events({ sessionID, after: streamed[0]!.durable?.seq })
|
||||
.pipe(Stream.take(1), Stream.runCollect),
|
||||
).map((event) => [event.durable?.seq, event.type]),
|
||||
).toEqual([[1, "session.next.prompt.admitted"]])
|
||||
}),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user