// 564 lines, 18 KiB, TypeScript
import { describe, expect } from "bun:test"
|
|
import { DateTime, Effect, Fiber, Layer, Stream } from "effect"
|
|
import { eq } from "drizzle-orm"
|
|
import { Database } from "@opencode-ai/core/database/database"
|
|
import { EventV2 } from "@opencode-ai/core/event"
|
|
import { EventTable } from "@opencode-ai/core/event/sql"
|
|
import { SessionEvent } from "@opencode-ai/core/session/event"
|
|
import { Project } from "@opencode-ai/core/project"
|
|
import { ProjectTable } from "@opencode-ai/core/project/sql"
|
|
import { AbsolutePath } from "@opencode-ai/core/schema"
|
|
import { SessionV2 } from "@opencode-ai/core/session"
|
|
import { Prompt } from "@opencode-ai/core/session/prompt"
|
|
import { SessionMessage } from "@opencode-ai/core/session/message"
|
|
import { SessionProjector } from "@opencode-ai/core/session/projector"
|
|
import { SessionExecution } from "@opencode-ai/core/session/execution"
|
|
import { SessionInput } from "@opencode-ai/core/session/input"
|
|
import { SessionInputTable, SessionMessageTable, SessionTable } from "@opencode-ai/core/session/sql"
|
|
import { SessionStore } from "@opencode-ai/core/session/store"
|
|
import { testEffect } from "./lib/effect"
|
|
|
|
// Call recorders for the stubbed SessionExecution service defined in this file. Each array collects
// the session IDs handed to one stub method (resume / interrupt / wake). Tests empty the arrays they
// inspect with `length = 0` before acting.
const executionCalls: Array<SessionV2.ID> = []
const interruptCalls: Array<SessionV2.ID> = []
const wakeCalls: Array<SessionV2.ID> = []
|
|
/**
 * Builds a stub handler: the returned function yields an effect that, when run, appends the
 * received session ID to `calls` and does nothing else.
 */
const recordInto = (calls: SessionV2.ID[]) => (sessionID: SessionV2.ID) =>
  Effect.sync(() => {
    calls.push(sessionID)
  })

/**
 * Stub SessionExecution layer. `resume`, `interrupt`, and `wake` only record the session ID they
 * were called with, so tests can assert which entry point the session service used.
 */
const execution = Layer.succeed(
  SessionExecution.Service,
  SessionExecution.Service.of({
    resume: recordInto(executionCalls),
    interrupt: recordInto(interruptCalls),
    wake: recordInto(wakeCalls),
  }),
)
|
|
// SessionV2 layer with its dependencies supplied in the same order as before: event, database,
// and store defaults first, then the project default and the recording `execution` stub.
const sessionsWithStorage = SessionV2.layer.pipe(
  Layer.provide(EventV2.defaultLayer),
  Layer.provide(Database.defaultLayer),
  Layer.provide(SessionStore.defaultLayer),
)
const sessions = sessionsWithStorage.pipe(Layer.provide(Project.defaultLayer), Layer.provide(execution))
|
|
// Everything a test in this file can yield: the default database, event, projector, and store
// layers, the recording execution stub, and the session service built on top of them.
const testLayer = Layer.mergeAll(
  Database.defaultLayer,
  EventV2.defaultLayer,
  SessionProjector.defaultLayer,
  SessionStore.defaultLayer,
  execution,
  sessions,
)

// Effect-aware test helper from ./lib/effect, bound to the merged layer above.
const it = testEffect(testLayer)
|
|
// Fixed session ID used by `setup` and by every test that needs a recorded session.
const sessionID = SessionV2.ID.make("ses_prompt_test")

// One message ID created at module load and reused by the tests that pass an explicit `id`.
const messageID = SessionMessage.ID.create()
|
|
|
|
/**
 * Seeds the rows the tests rely on: the global project and the `sessionID` session.
 * Both inserts use `onConflictDoNothing`, so running this more than once is harmless, and any
 * database failure is turned into a defect via `Effect.orDie`.
 */
const setup = Effect.gen(function* () {
  const { db } = yield* Database.Service

  // Project row first: the session row below references it through `project_id`.
  yield* Effect.orDie(
    db
      .insert(ProjectTable)
      .values({ id: Project.ID.global, worktree: AbsolutePath.make("/project"), sandboxes: [] })
      .onConflictDoNothing()
      .run(),
  )

  yield* Effect.orDie(
    db
      .insert(SessionTable)
      .values({
        id: sessionID,
        project_id: Project.ID.global,
        slug: "test",
        directory: "/project",
        title: "test",
        version: "test",
      })
      .onConflictDoNothing()
      .run(),
  )
})
|
|
|
|
/** Looks up the session input stored under `id` by calling `SessionInput.find` on the test database. */
const admitted = (id: SessionMessage.ID) =>
  Database.Service.use((database) => SessionInput.find(database.db, id))
|
|
/** Number of rows currently in `SessionInputTable` (all sessions); query failures become defects. */
const admittedCount = Database.Service.use(({ db }) => {
  const allInputs = db.select().from(SessionInputTable).all()
  return Effect.map(Effect.orDie(allInputs), (rows) => rows.length)
})
|
|
/**
 * Number of rows in `EventTable` whose `type` column equals `type`; query failures become defects.
 *
 * @param type Stored event type string to match exactly.
 */
const eventCount = (type: string) =>
  Database.Service.use(({ db }) => {
    const matching = db.select().from(EventTable).where(eq(EventTable.type, type)).all()
    return Effect.map(Effect.orDie(matching), (rows) => rows.length)
  })
|
|
|
|
describe("SessionV2.prompt", () => {
|
|
// `resume` on the session service must reach the execution stub's `resume` and not its `wake`.
it.effect("delegates execution continuation through SessionExecution", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service

    // Start from empty recorders so earlier activity cannot satisfy the assertions.
    wakeCalls.length = 0
    executionCalls.length = 0

    yield* service.resume(sessionID)

    expect(executionCalls).toEqual([sessionID])
    expect(wakeCalls).toEqual([])
  }),
)
|
|
|
|
// `interrupt` must reach the execution stub and leave the session's message list empty.
it.effect("delegates process-local interruption through SessionExecution", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    interruptCalls.length = 0

    yield* service.interrupt(sessionID)

    expect(interruptCalls).toEqual([sessionID])
    const transcript = yield* service.messages({ sessionID })
    expect(transcript).toEqual([])
  }),
)
|
|
|
|
// No `setup` here on purpose: interrupting an ID with no session row must still be forwarded.
it.effect("delegates interruption without requiring a recorded Session", () =>
  Effect.gen(function* () {
    const service = yield* SessionV2.Service
    interruptCalls.length = 0
    const missing = SessionV2.ID.make("ses_missing")

    yield* service.interrupt(missing)

    expect(interruptCalls).toEqual([missing])
  }),
)
|
|
|
|
// A prompt with `resume: false` is stored as an admitted input with "steer" delivery, while the
// session's message list stays empty.
it.effect("durably admits one user message before transcript promotion", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const text = "Fix the failing tests"

    const message = yield* service.prompt({ sessionID, prompt: Prompt.make({ text }), resume: false })

    expect(message.prompt.text).toBe(text)
    expect(yield* service.messages({ sessionID })).toEqual([])

    const stored = yield* admitted(message.id)
    expect(stored).toMatchObject({
      id: message.id,
      sessionID,
      prompt: { text },
      delivery: "steer",
    })
  }),
)
|
|
|
|
// Subscribes to the session's event stream, admits two prompts, promotes them, and checks the
// (seq, type) pairs seen live. Then re-reads the stream with `after` set to the first seq and
// expects the next event to be the second admission.
it.effect("streams durable Session events after an aggregate sequence", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const events = yield* EventV2.Service
    const { db } = yield* Database.Service

    // Fork the subscriber first, then yield once before publishing anything.
    const subscriber = yield* service
      .events({ sessionID })
      .pipe(Stream.take(4), Stream.runCollect, Effect.forkScoped)
    yield* Effect.yieldNow

    yield* service.prompt({ sessionID, prompt: Prompt.make({ text: "First" }), resume: false })
    yield* service.prompt({ sessionID, prompt: Prompt.make({ text: "Second" }), resume: false })
    yield* SessionInput.promoteSteers(db, events, sessionID, Number.MAX_SAFE_INTEGER)

    const streamed = Array.from(yield* Fiber.join(subscriber))
    const liveSummary = streamed.map((event) => [event.durable?.seq, event.type])
    expect(liveSummary).toEqual([
      [0, "session.next.prompt.admitted"],
      [1, "session.next.prompt.admitted"],
      [2, "session.next.prompted"],
      [3, "session.next.prompted"],
    ])

    const resumed = yield* service
      .events({ sessionID, after: streamed[0]!.durable?.seq })
      .pipe(Stream.take(1), Stream.runCollect)
    const resumedSummary = Array.from(resumed).map((event) => [event.durable?.seq, event.type])
    expect(resumedSummary).toEqual([[1, "session.next.prompt.admitted"]])
  }),
)
|
|
|
|
// After an admitted (not yet promoted) prompt, `resume` must only call the execution stub's
// `resume`: no message appears, the input stays unpromoted, and `wake` is not used.
it.effect("resumes through a recorded message without appending another prompt", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const message = yield* service.prompt({
      sessionID,
      prompt: Prompt.make({ text: "Fix the failing tests" }),
      resume: false,
    })

    // Reset after the prompt so only the `resume` call below is observed.
    wakeCalls.length = 0
    executionCalls.length = 0

    yield* service.resume(sessionID)

    expect(yield* service.messages({ sessionID })).toEqual([])
    const stored = yield* admitted(message.id)
    expect(stored).not.toHaveProperty("promotedSeq")
    expect(executionCalls).toEqual([sessionID])
    expect(wakeCalls).toEqual([])
  }),
)
|
|
|
|
// Two identical prompts without an explicit `id` must be stored as two separate inputs.
it.effect("records distinct messages when the ID is omitted", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const request = { sessionID, prompt: Prompt.make({ text: "Fix the failing tests" }), resume: false }

    const first = yield* service.prompt(request)
    const second = yield* service.prompt(request)

    expect(second.id).not.toBe(first.id)
    expect(yield* service.messages({ sessionID })).toEqual([])
    const stored = yield* admittedCount
    expect(stored).toBe(2)
  }),
)
|
|
|
|
// Repeating the exact same request with an explicit `id` returns the first result and does not
// store a second input.
it.effect("returns the original recorded message when the ID is retried", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const request = {
      sessionID,
      id: messageID,
      prompt: Prompt.make({ text: "Fix the failing tests" }),
      resume: false,
    }

    const original = yield* service.prompt(request)
    const retried = yield* service.prompt(request)

    expect(retried).toEqual(original)
    expect(yield* service.messages({ sessionID })).toEqual([])
    const stored = yield* admittedCount
    expect(stored).toBe(1)
  }),
)
|
|
|
|
// The first call stores the prompt without resuming. Retrying the same request with
// `resume: true` must return the same message and record exactly one wake.
it.effect("wakes execution when an exact prompt retry recovers a committed message", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const request = {
      sessionID,
      id: messageID,
      prompt: Prompt.make({ text: "Recover committed prompt" }),
      resume: false,
    }
    const original = yield* service.prompt(request)
    // Only the retry below should be visible in the recorder.
    wakeCalls.length = 0

    const retried = yield* service.prompt({ ...request, resume: true })

    expect(retried).toEqual(original)
    expect(wakeCalls).toEqual([sessionID])
  }),
)
|
|
|
|
// Reusing a message ID with different prompt text must fail with a prompt conflict and must not
// store a second input. Both calls pass `resume: false`, like the other conflict tests in this
// file, so this test never schedules execution or writes to the shared `wakeCalls` recorder.
it.effect("rejects reuse of one ID with a different prompt", () =>
  Effect.gen(function* () {
    yield* setup
    const session = yield* SessionV2.Service

    yield* session.prompt({
      sessionID,
      id: messageID,
      prompt: Prompt.make({ text: "Fix the failing tests" }),
      resume: false,
    })
    // `Effect.flip` moves the expected failure into the success channel so it can be inspected.
    const failure = yield* session
      .prompt({
        sessionID,
        id: messageID,
        prompt: Prompt.make({ text: "Delete the failing tests" }),
        resume: false,
      })
      .pipe(Effect.flip)

    expect(failure._tag).toBe("Session.PromptConflictError")
    expect(yield* session.messages({ sessionID })).toEqual([])
    expect(yield* admittedCount).toBe(1)
  }),
)
|
|
|
|
// Same ID and same text, but the second request asks for "queue" delivery: expected to fail
// with a prompt conflict.
it.effect("rejects reuse of one ID with a different delivery mode", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const text = "Fix the failing tests"

    yield* service.prompt({ id: messageID, sessionID, prompt: Prompt.make({ text }), resume: false })

    // `Effect.flip` moves the expected failure into the success channel so it can be inspected.
    const conflicting = service.prompt({
      id: messageID,
      sessionID,
      prompt: Prompt.make({ text }),
      delivery: "queue",
      resume: false,
    })
    const failure = yield* Effect.flip(conflicting)

    expect(failure._tag).toBe("Session.PromptConflictError")
  }),
)
|
|
|
|
// Two identical requests run concurrently must resolve to the same message, with one stored
// input and one PromptAdmitted event.
it.effect("returns one recorded message to concurrent exact retries", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const request = {
      sessionID,
      id: messageID,
      prompt: Prompt.make({ text: "Fix the failing tests" }),
      resume: false,
    }

    const [first, second] = yield* Effect.all([service.prompt(request), service.prompt(request)], {
      concurrency: "unbounded",
    })

    expect(second).toEqual(first)
    expect(yield* service.messages({ sessionID })).toEqual([])
    expect(yield* admittedCount).toBe(1)
    const admittedEvents = yield* eventCount(EventV2.versionedType(SessionEvent.PromptAdmitted.type, 1))
    expect(admittedEvents).toBe(1)
  }),
)
|
|
|
|
// Two concurrent `promoteSteers` calls over one admitted prompt must yield one Prompted event,
// `promotedSeq: 1` on the input, and one user message in the session.
it.effect("promotes one message once under concurrent promotion attempts", () =>
  Effect.gen(function* () {
    yield* setup
    const { db } = yield* Database.Service
    const service = yield* SessionV2.Service
    const events = yield* EventV2.Service
    const text = "Promote once"
    yield* service.prompt({ id: messageID, sessionID, prompt: Prompt.make({ text }), resume: false })

    const promote = () => SessionInput.promoteSteers(db, events, sessionID, Number.MAX_SAFE_INTEGER)
    yield* Effect.all([promote(), promote()], { concurrency: "unbounded" })

    const promotedEvents = yield* eventCount(EventV2.versionedType(SessionEvent.Prompted.type, 1))
    expect(promotedEvents).toBe(1)
    expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 1 })
    expect(yield* service.messages({ sessionID })).toMatchObject([{ id: messageID, type: "user", text }])
  }),
)
|
|
|
|
// Promotion is called with the first prompt's `admittedSeq` as the cutoff: the first input must
// gain `promotedSeq`, and the one admitted afterwards must not.
it.effect("promotes steers only through the captured inbox cutoff", () =>
  Effect.gen(function* () {
    yield* setup
    const { db } = yield* Database.Service
    const service = yield* SessionV2.Service
    const events = yield* EventV2.Service

    const before = yield* service.prompt({
      sessionID,
      prompt: Prompt.make({ text: "Before cutoff" }),
      resume: false,
    })
    const cutoff = before.admittedSeq
    const after = yield* service.prompt({
      sessionID,
      prompt: Prompt.make({ text: "After cutoff" }),
      resume: false,
    })

    yield* SessionInput.promoteSteers(db, events, sessionID, cutoff)

    expect(yield* admitted(before.id)).toHaveProperty("promotedSeq")
    expect(yield* admitted(after.id)).not.toHaveProperty("promotedSeq")
  }),
)
|
|
|
|
// Admits a prompt, snapshots the session's event rows, then calls `events.remove` and deletes
// the session's input and message rows. Replaying the snapshot must bring the admitted input
// back without producing messages or recording a wake.
it.effect("reprojects pending inbox input without scheduling execution", () =>
  Effect.gen(function* () {
    yield* setup
    const { db } = yield* Database.Service
    const service = yield* SessionV2.Service
    const events = yield* EventV2.Service
    wakeCalls.length = 0

    const text = "Replay pending"
    yield* service.prompt({ id: messageID, sessionID, prompt: Prompt.make({ text }), resume: false })

    // Snapshot the event rows before anything is removed.
    const recorded = yield* Effect.orDie(
      db.select().from(EventTable).where(eq(EventTable.aggregate_id, sessionID)).all(),
    )

    yield* events.remove(sessionID)
    yield* Effect.orDie(db.delete(SessionInputTable).where(eq(SessionInputTable.session_id, sessionID)).run())
    yield* Effect.orDie(
      db.delete(SessionMessageTable).where(eq(SessionMessageTable.session_id, sessionID)).run(),
    )

    // Rows use `aggregate_id`; the replay input uses `aggregateID`.
    yield* events.replayAll(
      recorded.map(({ id, aggregate_id, seq, type, data }) => ({
        id,
        aggregateID: aggregate_id,
        seq,
        type,
        data,
      })),
    )

    expect(yield* admitted(messageID)).toMatchObject({ id: messageID, prompt: { text } })
    expect(yield* service.messages({ sessionID })).toEqual([])
    expect(wakeCalls).toEqual([])
  }),
)
|
|
|
|
// A Prompted event is published directly, with no preceding `prompt` call. Prompting afterwards
// with the same ID and prompt must return that message, and the stored input must carry
// `promotedSeq`.
it.effect("returns an exact retry of a legacy projected prompt", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const events = yield* EventV2.Service
    const prompt = Prompt.make({ text: "Historical prompt" })
    const timestamp = yield* DateTime.now
    yield* events.publish(SessionEvent.Prompted, { sessionID, messageID, timestamp, prompt, delivery: "steer" })

    const retried = yield* service.prompt({ id: messageID, sessionID, prompt, resume: false })

    expect(retried).toMatchObject({ id: messageID, prompt: { text: "Historical prompt" } })
    const stored = yield* admitted(messageID)
    expect(stored).toHaveProperty("promotedSeq")
  }),
)
|
|
|
|
// Queue-delivery variant of the legacy retry case: the directly published Prompted event and the
// later prompt both use "queue", and the stored input must report that delivery.
it.effect("returns an exact retry of a legacy projected queued prompt", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const events = yield* EventV2.Service
    const prompt = Prompt.make({ text: "Historical queued prompt" })
    const timestamp = yield* DateTime.now
    yield* events.publish(SessionEvent.Prompted, { sessionID, messageID, timestamp, prompt, delivery: "queue" })

    const retried = yield* service.prompt({
      id: messageID,
      sessionID,
      prompt,
      delivery: "queue",
      resume: false,
    })

    expect(retried).toMatchObject({ id: messageID, prompt: { text: "Historical queued prompt" } })
    const stored = yield* admitted(messageID)
    expect(stored).toMatchObject({ delivery: "queue" })
  }),
)
|
|
|
|
// Adds a second session row, admits a prompt in the first session, then reuses the same message
// ID in the second. The failure must be a prompt conflict naming the second session and the ID.
it.effect("rejects reuse of one globally unique message ID across sessions", () =>
  Effect.gen(function* () {
    yield* setup
    const { db } = yield* Database.Service
    const service = yield* SessionV2.Service
    const other = SessionV2.ID.make("ses_prompt_other")
    yield* Effect.orDie(
      db
        .insert(SessionTable)
        .values({
          id: other,
          project_id: Project.ID.global,
          slug: "other",
          directory: "/project",
          title: "other",
          version: "test",
        })
        .onConflictDoNothing()
        .run(),
    )
    const prompt = Prompt.make({ text: "Fix the failing tests" })

    yield* service.prompt({ id: messageID, sessionID, prompt, resume: false })
    // `Effect.flip` moves the expected failure into the success channel so it can be inspected.
    const failure = yield* Effect.flip(
      service.prompt({ id: messageID, sessionID: other, prompt, resume: false }),
    )

    expect(failure).toMatchObject({ _tag: "Session.PromptConflictError", sessionID: other, messageID })
  }),
)
|
|
|
|
// A Synthetic event is published under `messageID` first. Prompting with that ID must then fail
// with a prompt conflict and leave no admitted input behind.
it.effect("rejects a prompt ID already used by visible Session history", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    const events = yield* EventV2.Service
    const timestamp = yield* DateTime.now
    yield* events.publish(SessionEvent.Synthetic, { sessionID, messageID, timestamp, text: "Existing history" })

    // `Effect.flip` moves the expected failure into the success channel so it can be inspected.
    const failure = yield* Effect.flip(
      service.prompt({
        id: messageID,
        sessionID,
        prompt: Prompt.make({ text: "Conflicting prompt" }),
        resume: false,
      }),
    )

    expect(failure).toMatchObject({ _tag: "Session.PromptConflictError", sessionID, messageID })
    const stored = yield* admitted(messageID)
    expect(stored).toBeUndefined()
  }),
)
|
|
|
|
// With `resume` omitted, the prompt must record a wake on the execution stub and no `resume`.
it.effect("starts execution by default after recording the prompt", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    wakeCalls.length = 0
    executionCalls.length = 0

    const prompt = Prompt.make({ text: "Run by default" })
    yield* service.prompt({ sessionID, prompt })

    expect(executionCalls).toEqual([])
    expect(wakeCalls).toEqual([sessionID])
  }),
)
|
|
|
|
// With `resume: true`, the prompt must record a wake on the execution stub and no `resume`.
it.effect("starts execution when resume is explicitly true", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    wakeCalls.length = 0
    executionCalls.length = 0

    const prompt = Prompt.make({ text: "Run explicitly" })
    yield* service.prompt({ sessionID, prompt, resume: true })

    expect(executionCalls).toEqual([])
    expect(wakeCalls).toEqual([sessionID])
  }),
)
|
|
|
|
// With `resume: false`, the prompt must not touch the execution stub: no `resume`, no `wake`.
it.effect("only records the prompt when resume is false", () =>
  Effect.gen(function* () {
    yield* setup
    const service = yield* SessionV2.Service
    wakeCalls.length = 0
    executionCalls.length = 0

    const prompt = Prompt.make({ text: "Do not run" })
    yield* service.prompt({ sessionID, prompt, resume: false })

    expect(executionCalls).toEqual([])
    expect(wakeCalls).toEqual([])
  }),
)
|
|
})
|