import { describe, expect } from "bun:test"
import { DateTime, Effect, Layer, Schema } from "effect"
import { asc, eq } from "drizzle-orm"
import { Database } from "@opencode-ai/core/database/database"
import { EventV2 } from "@opencode-ai/core/event"
import { EventTable } from "@opencode-ai/core/event/sql"
import { ModelV2 } from "@opencode-ai/core/model"
import { Project } from "@opencode-ai/core/project"
import { ProjectTable } from "@opencode-ai/core/project/sql"
import { ProviderV2 } from "@opencode-ai/core/provider"
import { AbsolutePath } from "@opencode-ai/core/schema"
import { SessionV2 } from "@opencode-ai/core/session"
import { SessionEvent } from "@opencode-ai/core/session/event"
import { SessionMessage } from "@opencode-ai/core/session/message"
import { Prompt } from "@opencode-ai/core/session/prompt"
import { SessionMessageUpdater } from "@opencode-ai/core/session/message-updater"
import { SessionProjector } from "@opencode-ai/core/session/projector"
import { SessionExecution } from "@opencode-ai/core/session/execution"
import { SessionInput } from "@opencode-ai/core/session/input"
import { SessionStore } from "@opencode-ai/core/session/store"
import { SessionInputTable, SessionMessageTable, SessionTable } from "@opencode-ai/core/session/sql"
import { testEffect } from "./lib/effect"

// Test harness built from the database, event, and session-projector default layers.
const it = testEffect(Layer.mergeAll(Database.defaultLayer, EventV2.defaultLayer, SessionProjector.defaultLayer))

// Fixtures shared by every test in this file.
const sessionID = SessionV2.ID.make("ses_projector_test")
const created = DateTime.makeUnsafe(0)
const model = { id: ModelV2.ID.make("model"), providerID: ProviderV2.ID.make("provider") }

// Schema codecs for `SessionMessage.Message`, built once instead of per row.
const encodeMessage = Schema.encodeSync(SessionMessage.Message)
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)

/**
 * Builds a `SessionMessageTable` insert row for an assistant message in the test session.
 *
 * The message is encoded through the message schema; `id` and `type` are lifted out of the
 * encoded payload into their own columns and the remaining fields are stored under `data`.
 *
 * @param id - ID of the assistant message.
 * @param seq - Value for the row's `seq` column.
 * @param time - Message timestamps; defaults to an incomplete message created at `created`.
 */
const assistantRow = (
  id: SessionMessage.ID,
  seq: number,
  time: { created: DateTime.Utc; completed?: DateTime.Utc } = { created },
) => {
  const { id: _, type, ...data } = encodeMessage(
    SessionMessage.Assistant.make({ id, type: "assistant", agent: "build", model, content: [], time }),
  )
  return { id, session_id: sessionID, type, seq, time_created: DateTime.toEpochMillis(time.created), data }
}

/**
 * Inserts the global project row and the test session row, then yields the database handle.
 *
 * Shared by every test that needs a persisted session; insert failures are turned into defects
 * via `Effect.orDie`.
 */
const seedSession = Effect.gen(function* () {
  const { db } = yield* Database.Service
  yield* db
    .insert(ProjectTable)
    .values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] })
    .run()
    .pipe(Effect.orDie)
  yield* db
    .insert(SessionTable)
    .values({
      id: sessionID,
      project_id: Project.ID.global,
      slug: "test",
      directory: "/project",
      title: "test",
      version: "test",
    })
    .run()
    .pipe(Effect.orDie)
  return db
})

describe("SessionProjector", () => {
  it.effect("orders projected messages and context by durable aggregate sequence", () =>
    Effect.gen(function* () {
      yield* seedSession
      const events = yield* EventV2.Service

      // The first event's ID ("evt_z") sorts after the second's ("evt_a"), so the expected
      // order below follows publish order rather than event-ID order.
      yield* events.publish(
        SessionEvent.Prompted,
        {
          sessionID,
          messageID: SessionMessage.ID.make("msg_first"),
          timestamp: created,
          prompt: Prompt.make({ text: "first" }),
          delivery: "steer",
        },
        { id: EventV2.ID.make("evt_z") },
      )
      yield* events.publish(
        SessionEvent.Prompted,
        {
          sessionID,
          messageID: SessionMessage.ID.make("msg_second"),
          timestamp: created,
          prompt: Prompt.make({ text: "second" }),
          delivery: "steer",
        },
        { id: EventV2.ID.make("evt_a") },
      )

      const sessions = yield* SessionV2.Service

      // Page forward one message at a time.
      const firstPage = yield* sessions.messages({ sessionID, limit: 1, order: "asc" })
      expect(firstPage.map((message) => (message.type === "user" ? message.text : message.type))).toEqual(["first"])

      const secondPage = yield* sessions.messages({
        sessionID,
        limit: 1,
        order: "asc",
        cursor: { id: firstPage[0]!.id, direction: "next" },
      })
      expect(secondPage.map((message) => (message.type === "user" ? message.text : message.type))).toEqual(["second"])

      // Page backward from the second message.
      const previousPage = yield* sessions.messages({
        sessionID,
        limit: 1,
        order: "asc",
        cursor: { id: secondPage[0]!.id, direction: "previous" },
      })
      expect(previousPage.map((message) => (message.type === "user" ? message.text : message.type))).toEqual(["first"])

      const context = yield* sessions.context(sessionID)
      expect(context.map((message) => (message.type === "user" ? message.text : message.type))).toEqual([
        "first",
        "second",
      ])
    }).pipe(
      // This test additionally needs the session service, which the shared harness layer omits.
      Effect.provide(
        SessionV2.layer.pipe(
          Layer.provide(EventV2.defaultLayer),
          Layer.provide(Database.defaultLayer),
          Layer.provide(Project.defaultLayer),
          Layer.provide(SessionStore.defaultLayer),
          Layer.provide(SessionExecution.noopLayer),
        ),
      ),
    ),
  )

  it.effect("marks an inbox row promoted with the Prompted event sequence", () =>
    Effect.gen(function* () {
      const db = yield* seedSession
      const events = yield* EventV2.Service
      const id = SessionMessage.ID.make("msg_admitted")

      const admitted = yield* SessionInput.admit(db, events, {
        id,
        sessionID,
        prompt: Prompt.make({ text: "promote me" }),
        delivery: "steer",
      })
      if (!admitted) return yield* Effect.die("Prompt admission failed")

      const event = yield* events.publish(SessionEvent.Prompted, {
        sessionID,
        timestamp: admitted.timeCreated,
        messageID: id,
        prompt: Prompt.make({ text: "promote me" }),
        delivery: "steer",
      })

      const inboxRow = yield* db
        .select()
        .from(SessionInputTable)
        .where(eq(SessionInputTable.id, id))
        .get()
        .pipe(Effect.orDie)
      expect(inboxRow).toMatchObject({ promoted_seq: event.durable?.seq })
    }),
  )

  it.effect("projects durable context messages supported by the updater", () =>
    Effect.gen(function* () {
      const db = yield* seedSession
      const events = yield* EventV2.Service

      yield* events.publish(SessionEvent.AgentSwitched, {
        sessionID,
        messageID: SessionMessage.ID.create(),
        timestamp: created,
        agent: "build",
      })
      yield* events.publish(SessionEvent.ModelSwitched, {
        sessionID,
        messageID: SessionMessage.ID.create(),
        timestamp: created,
        model,
      })
      yield* events.publish(SessionEvent.Synthetic, {
        sessionID,
        messageID: SessionMessage.ID.create(),
        timestamp: created,
        text: "synthetic context",
      })
      yield* events.publish(SessionEvent.Shell.Started, {
        sessionID,
        messageID: SessionMessage.ID.create(),
        timestamp: created,
        callID: "shell-1",
        command: "pwd",
      })
      yield* events.publish(SessionEvent.Shell.Ended, {
        sessionID,
        timestamp: DateTime.makeUnsafe(1),
        callID: "shell-1",
        output: "/project",
      })

      const compactionID = SessionMessage.ID.create()
      yield* events.publish(SessionEvent.Compaction.Started, {
        sessionID,
        messageID: compactionID,
        timestamp: created,
        reason: "manual",
      })
      yield* events.publish(SessionEvent.Compaction.Delta, {
        sessionID,
        messageID: compactionID,
        timestamp: created,
        text: "partial",
      })

      // Before `Compaction.Ended`: no delta event row and no compaction message row exist.
      const deltaEvents = yield* db
        .select({ id: EventTable.id })
        .from(EventTable)
        .where(eq(EventTable.type, SessionEvent.Compaction.Delta.type))
        .all()
        .pipe(Effect.orDie)
      expect(deltaEvents).toEqual([])

      const pendingCompactions = yield* db
        .select({ id: SessionMessageTable.id })
        .from(SessionMessageTable)
        .where(eq(SessionMessageTable.type, "compaction"))
        .all()
        .pipe(Effect.orDie)
      expect(pendingCompactions).toEqual([])

      yield* events.publish(SessionEvent.Compaction.Ended, {
        sessionID,
        messageID: compactionID,
        timestamp: DateTime.makeUnsafe(1),
        reason: "manual",
        text: "summary",
        recent: "recent context",
      })

      const rows = yield* db
        .select()
        .from(SessionMessageTable)
        .where(eq(SessionMessageTable.session_id, sessionID))
        .orderBy(asc(SessionMessageTable.seq))
        .all()
        .pipe(Effect.orDie)
      const messages = rows.map((row) => decodeMessage({ ...row.data, id: row.id, type: row.type }))

      expect(messages.map((message) => message.type)).toEqual([
        "agent-switched",
        "model-switched",
        "synthetic",
        "shell",
        "compaction",
      ])
      expect(messages.find((message) => message.type === "shell")).toMatchObject({
        output: "/project",
        time: { completed: DateTime.makeUnsafe(1) },
      })
      expect(messages.find((message) => message.type === "compaction")).toMatchObject({
        summary: "summary",
        recent: "recent context",
      })

      const sessionRow = yield* db
        .select()
        .from(SessionTable)
        .where(eq(SessionTable.id, sessionID))
        .get()
        .pipe(Effect.orDie)
      expect(sessionRow).toMatchObject({
        agent: "build",
        model,
        time_updated: DateTime.toEpochMillis(created),
      })
    }),
  )

  it.effect("rejects distinct creator events that reuse one projected message ID", () =>
    Effect.gen(function* () {
      const db = yield* seedSession
      const events = yield* EventV2.Service
      const id = SessionMessage.ID.make("msg_creator_collision")

      yield* events.publish(SessionEvent.Synthetic, { sessionID, messageID: id, timestamp: created, text: "keep me" })

      // A second event of a different kind reuses the same message ID; capture the exit
      // instead of letting the failure abort the test.
      const exit = yield* events
        .publish(SessionEvent.Step.Started, {
          sessionID,
          assistantMessageID: id,
          timestamp: created,
          agent: "build",
          model,
        })
        .pipe(Effect.exit)
      expect(exit._tag).toBe("Failure")

      // The originally projected synthetic message must still be in place.
      const survivor = yield* db
        .select()
        .from(SessionMessageTable)
        .where(eq(SessionMessageTable.id, id))
        .get()
        .pipe(Effect.orDie)
      expect(survivor).toMatchObject({ type: "synthetic" })
    }),
  )

  it.effect("does not revive a stale incomplete in-memory assistant projection", () =>
    Effect.gen(function* () {
      const stale = SessionMessage.Assistant.make({
        id: SessionMessage.ID.make("msg_assistant_stale"),
        type: "assistant",
        agent: "build",
        model,
        content: [],
        time: { created },
      })
      const completed = SessionMessage.Assistant.make({
        id: SessionMessage.ID.make("msg_assistant_completed"),
        type: "assistant",
        agent: "build",
        model,
        content: [],
        time: { created: DateTime.makeUnsafe(1), completed: DateTime.makeUnsafe(2) },
      })

      // The most recent assistant message is completed, so no current assistant is expected
      // even though an older incomplete one precedes it.
      const current = yield* SessionMessageUpdater.memory({ messages: [stale, completed] }).getCurrentAssistant()
      expect(current).toBeUndefined()
    }),
  )

  it.effect("updates only the newest incomplete assistant projection", () =>
    Effect.gen(function* () {
      const db = yield* seedSession
      yield* db
        .insert(SessionMessageTable)
        .values([
          assistantRow(SessionMessage.ID.make("msg_assistant_1"), 0),
          assistantRow(SessionMessage.ID.make("msg_assistant_2"), 1),
        ])
        .run()
        .pipe(Effect.orDie)

      const service = yield* EventV2.Service
      yield* service.publish(SessionEvent.Step.Ended, {
        sessionID,
        timestamp: DateTime.makeUnsafe(1),
        assistantMessageID: SessionMessage.ID.make("msg_assistant_2"),
        finish: "stop",
        cost: 0,
        tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
      })

      // Ordered by ID, so index 0 is "msg_assistant_1" and index 1 is "msg_assistant_2".
      const rows = yield* db
        .select()
        .from(SessionMessageTable)
        .where(eq(SessionMessageTable.session_id, sessionID))
        .orderBy(asc(SessionMessageTable.id))
        .all()
        .pipe(Effect.orDie)
      const messages = rows.map((row) => decodeMessage({ ...row.data, id: row.id, type: row.type }))

      expect(messages[0]).not.toHaveProperty("time.completed")
      expect(messages[1]).toMatchObject({
        type: "assistant",
        finish: "stop",
        time: { completed: DateTime.makeUnsafe(1) },
      })
    }),
  )

  it.effect("does not revive a stale incomplete assistant projection", () =>
    Effect.gen(function* () {
      const db = yield* seedSession
      yield* db
        .insert(SessionMessageTable)
        .values([
          assistantRow(SessionMessage.ID.make("msg_assistant_stale"), 0),
          assistantRow(SessionMessage.ID.make("msg_assistant_completed"), 1, {
            created: DateTime.makeUnsafe(1),
            completed: DateTime.makeUnsafe(2),
          }),
        ])
        .run()
        .pipe(Effect.orDie)

      const service = yield* EventV2.Service
      yield* service.publish(SessionEvent.Text.Started, {
        sessionID,
        assistantMessageID: SessionMessage.ID.make("msg_assistant_completed"),
        timestamp: DateTime.makeUnsafe(3),
        textID: "text-stale",
      })

      // Ordered by ID, so "msg_assistant_completed" precedes "msg_assistant_stale".
      const rows = yield* db
        .select()
        .from(SessionMessageTable)
        .where(eq(SessionMessageTable.session_id, sessionID))
        .orderBy(asc(SessionMessageTable.id))
        .all()
        .pipe(Effect.orDie)
      const messages = rows.map((row) => decodeMessage({ ...row.data, id: row.id, type: row.type }))

      // The text part lands on the message the event addressed; the stale row is untouched.
      expect(messages).toEqual([
        SessionMessage.Assistant.make({
          id: SessionMessage.ID.make("msg_assistant_completed"),
          type: "assistant",
          agent: "build",
          model,
          content: [SessionMessage.AssistantText.make({ type: "text", id: "text-stale", text: "" })],
          time: { created: DateTime.makeUnsafe(1), completed: DateTime.makeUnsafe(2) },
        }),
        SessionMessage.Assistant.make({
          id: SessionMessage.ID.make("msg_assistant_stale"),
          type: "assistant",
          agent: "build",
          model,
          content: [],
          time: { created },
        }),
      ])
    }),
  )
})