import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test" import { $ } from "bun" import fs from "node:fs/promises" import Http from "node:http" import path from "node:path" import { setTimeout as delay } from "node:timers/promises" import { NodeHttpServer } from "@effect/platform-node" import { Effect, Layer } from "effect" import { HttpServer, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" import { eq } from "drizzle-orm" import * as Log from "@opencode-ai/core/util/log" import { Flag } from "@opencode-ai/core/flag/flag" import { GlobalBus, type GlobalEvent } from "@/bus/global" import { Database } from "@/storage/db" import { ProjectID } from "@/project/schema" import { ProjectTable } from "@/project/project.sql" import { Instance } from "@/project/instance" import { WithInstance } from "../../src/project/with-instance" import { Session as SessionNs } from "@/session/session" import { SessionID } from "@/session/schema" import { SessionTable } from "@/session/session.sql" import { SyncEvent } from "@/sync" import { EventSequenceTable } from "@/sync/event.sql" import { resetDatabase } from "../fixture/db" import { disposeAllInstances, provideTmpdirInstance, tmpdir } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { registerAdapter } from "../../src/control-plane/adapters" import { WorkspaceID } from "../../src/control-plane/schema" import { WorkspaceTable } from "../../src/control-plane/workspace.sql" import type { Target, WorkspaceAdapter, WorkspaceInfo } from "../../src/control-plane/types" import * as Workspace from "../../src/control-plane/workspace" import { AppRuntime } from "@/effect/app-runtime" import { InstanceStore } from "@/project/instance-store" import { InstanceBootstrap } from "@/project/bootstrap" void Log.init({ print: false }) const testServerLayer = Layer.mergeAll( NodeHttpServer.layer(Http.createServer, { host: "127.0.0.1", port: 0 }), Workspace.defaultLayer.pipe(Layer.provide(InstanceStore.defaultLayer), Layer.provide(InstanceBootstrap.defaultLayer)), SessionNs.defaultLayer, ) const it = testEffect(testServerLayer) const originalWorkspacesFlag = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES const originalEnv = { OPENCODE_AUTH_CONTENT: process.env.OPENCODE_AUTH_CONTENT, OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES, } type RecordedCreate = { info: WorkspaceInfo env: Record from?: WorkspaceInfo } type RecordedAdapter = { adapter: WorkspaceAdapter calls: { configure: WorkspaceInfo[] create: RecordedCreate[] list: number remove: WorkspaceInfo[] target: WorkspaceInfo[] } } type FetchCall = { url: URL method: string headers: Headers bodyText?: string json?: unknown } function unique(prefix: string) { return `${prefix}-${Math.random().toString(36).slice(2)}` } function restoreEnv() { Object.entries(originalEnv).forEach(([key, value]) => { if (value === undefined) { delete process.env[key] return } process.env[key] = value }) } beforeEach(() => { Database.close() Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true restoreEnv() }) afterEach(async () => { mock.restore() await disposeAllInstances() Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = originalWorkspacesFlag restoreEnv() await resetDatabase() }) async function withInstance(fn: (dir: string) => T | Promise) { await using tmp = await tmpdir({ git: true }) return WithInstance.provide({ directory: tmp.path, fn: () => fn(tmp.path), }) } async function initGitRepo(dir: string) { await fs.mkdir(dir, { recursive: true }) await $`git init`.cwd(dir).quiet() await $`git config core.fsmonitor false`.cwd(dir).quiet() await $`git config commit.gpgsign false`.cwd(dir).quiet() await $`git config user.email "test@opencode.test"`.cwd(dir).quiet() await $`git config user.name "Test"`.cwd(dir).quiet() await fs.writeFile(path.join(dir, "tracked.txt"), "base\n") await $`git add tracked.txt`.cwd(dir).quiet() await $`git commit -m "base"`.cwd(dir).quiet() } const runWorkspace = (effect: Effect.Effect) => AppRuntime.runPromise(effect) const createWorkspace = (input: Workspace.CreateInput) => runWorkspace(Workspace.Service.use((workspace) => workspace.create(input))) const warpWorkspaceSession = (input: Workspace.SessionWarpInput) => runWorkspace(Workspace.Service.use((workspace) => workspace.sessionWarp(input))) const listWorkspaces = (project: Parameters[0]) => runWorkspace(Workspace.Service.use((workspace) => workspace.list(project))) const syncListWorkspaces = (project: Parameters[0]) => runWorkspace(Workspace.Service.use((workspace) => workspace.syncList(project))) const getWorkspace = (id: WorkspaceID) => runWorkspace(Workspace.Service.use((workspace) => workspace.get(id))) const removeWorkspace = (id: WorkspaceID) => runWorkspace(Workspace.Service.use((workspace) => workspace.remove(id))) const workspaceStatus = () => runWorkspace(Workspace.Service.use((workspace) => workspace.status())) const isWorkspaceSyncing = (id: WorkspaceID) => runWorkspace(Workspace.Service.use((workspace) => workspace.isSyncing(id))) const startWorkspaceSyncing = (projectID: ProjectID) => { void runWorkspace(Workspace.Service.use((workspace) => workspace.startWorkspaceSyncing(projectID))) } const waitForWorkspaceSync = (workspaceID: WorkspaceID, state: Record, signal?: AbortSignal) => runWorkspace(Workspace.Service.use((workspace) => workspace.waitForSync(workspaceID, state, signal))) function captureGlobalEvents() { const events: GlobalEvent[] = [] const handler = (event: GlobalEvent) => events.push(event) GlobalBus.on("event", handler) return { events, dispose() { GlobalBus.off("event", handler) }, } } async function eventually(fn: () => T | Promise, timeout = 1500) { const started = Date.now() let last: unknown while (Date.now() - started < timeout) { try { return await fn() } catch (err) { last = err await delay(10) } } throw last ?? new Error("Timed out waiting for condition") } function eventuallyEffect(effect: Effect.Effect, timeout = 1500) { return Effect.gen(function* () { const started = Date.now() let last: unknown while (Date.now() - started < timeout) { const exit = yield* Effect.exit(effect) if (exit._tag === "Success") return last = exit.cause yield* Effect.sleep("10 millis") } throw last ?? new Error("Timed out waiting for condition") }) } function recordedAdapter(input: { target: (info: WorkspaceInfo) => Target | Promise configure?: (info: WorkspaceInfo) => WorkspaceInfo | Promise create?: (info: WorkspaceInfo, env: Record, from?: WorkspaceInfo) => Promise list?: () => Omit[] | Promise[]> remove?: (info: WorkspaceInfo) => Promise }): RecordedAdapter { const calls: RecordedAdapter["calls"] = { configure: [], create: [], list: 0, remove: [], target: [], } return { calls, adapter: { name: "recorded", description: "recorded", configure(info) { calls.configure.push(structuredClone(info)) return input.configure?.(info) ?? info }, async create(info, env, from) { calls.create.push({ info: structuredClone(info), env: { ...env }, from: from ? structuredClone(from) : undefined, }) await input.create?.(info, env, from) }, ...(input.list ? { async list() { calls.list += 1 return input.list?.() ?? [] }, } : {}), async remove(info) { calls.remove.push(structuredClone(info)) await input.remove?.(info) }, target(info) { calls.target.push(structuredClone(info)) return input.target(info) }, }, } } function localAdapter(dir: string, input?: { createDir?: boolean; remove?: (info: WorkspaceInfo) => Promise }) { return recordedAdapter({ configure(info) { return { ...info, directory: dir } }, async create() { if (input?.createDir === false) return await fs.mkdir(dir, { recursive: true }) }, remove: input?.remove, target() { return { type: "local", directory: dir } }, }) } function remoteAdapter(url: string, input?: { directory?: string | null; headers?: HeadersInit }) { return recordedAdapter({ configure(info) { return { ...info, directory: input?.directory ?? info.directory } }, target() { return { type: "remote", url, headers: input?.headers } }, }) } function eventStreamResponse(events: unknown[] = [], keepOpen = true) { const encoder = new TextEncoder() return new Response( new ReadableStream({ start(controller) { if (keepOpen) controller.enqueue(encoder.encode(":\n\n")) events.forEach((event) => controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`))) if (!keepOpen) controller.close() }, }), { status: 200, headers: { "content-type": "text/event-stream" } }, ) } function serverUrl() { return Effect.gen(function* () { return HttpServer.formatAddress((yield* HttpServer.HttpServer).address) }) } function workspaceInfo(projectID: ProjectID, type: string, input?: Partial): Workspace.Info { return { id: input?.id ?? WorkspaceID.ascending(), type, name: input?.name ?? unique("workspace"), branch: input?.branch ?? null, directory: input?.directory ?? null, extra: input?.extra ?? null, projectID, timeUsed: input?.timeUsed ?? Date.now(), } } function insertWorkspace(info: Workspace.Info) { Database.use((db) => db .insert(WorkspaceTable) .values({ id: info.id, type: info.type, branch: info.branch, name: info.name, directory: info.directory, extra: info.extra, project_id: info.projectID, time_used: info.timeUsed, }) .run(), ) } function insertProject(id: ProjectID, worktree: string) { Database.use((db) => db .insert(ProjectTable) .values({ id, worktree, vcs: null, name: null, time_created: Date.now(), time_updated: Date.now(), sandboxes: [], }) .run(), ) } function attachSessionToWorkspace(sessionID: SessionID, workspaceID: WorkspaceID) { Database.use((db) => db.update(SessionTable).set({ workspace_id: workspaceID }).where(eq(SessionTable.id, sessionID)).run(), ) } function sessionSequence(sessionID: SessionID) { return Database.use((db) => db .select({ seq: EventSequenceTable.seq }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, sessionID)) .get(), )?.seq } function sessionSequenceOwner(sessionID: SessionID) { return Database.use((db) => db .select({ ownerID: EventSequenceTable.owner_id }) .from(EventSequenceTable) .where(eq(EventSequenceTable.aggregate_id, sessionID)) .get(), )?.ownerID } function sessionUpdatedType() { return SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version) } describe("workspace schemas and exports", () => { test("keeps the historical event type names", () => { expect(Workspace.Event.Ready.type).toBe("workspace.ready") expect(Workspace.Event.Failed.type).toBe("workspace.failed") expect(Workspace.Event.Status.type).toBe("workspace.status") }) test("validates create input with workspace id, project id, branch, type, and extra", () => { const input = { id: WorkspaceID.ascending("wrk_schema_create"), type: "worktree", branch: "feature/schema", projectID: ProjectID.make("project-schema"), extra: { nested: true }, } expect(Workspace.CreateInput.zod.parse(input)).toEqual(input) expect(() => Workspace.CreateInput.zod.parse({ ...input, id: "bad" })).toThrow() expect(() => Workspace.CreateInput.zod.parse({ ...input, branch: 1 })).toThrow() }) }) describe("workspace CRUD", () => { test("get returns undefined for a missing workspace", async () => { await withInstance(async () => { expect(await getWorkspace(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined() }) }) test("list maps database rows, filters by project, and sorts by id", async () => { await withInstance(async () => { const otherProjectID = ProjectID.make("project-other") insertProject(otherProjectID, "/tmp/other") const a = workspaceInfo(Instance.project.id, "manual", { id: WorkspaceID.ascending("wrk_a_list"), branch: "a", directory: "/a", extra: { a: true }, }) const b = workspaceInfo(Instance.project.id, "manual", { id: WorkspaceID.ascending("wrk_b_list"), branch: "b", directory: "/b", extra: ["b"], }) const other = workspaceInfo(otherProjectID, "manual", { id: WorkspaceID.ascending("wrk_c_list") }) insertWorkspace(b) insertWorkspace(other) insertWorkspace(a) expect(await listWorkspaces(Instance.project)).toEqual([a, b]) }) }) test("create configures, persists, creates, starts local sync, and passes environment", async () => { await withInstance(async (dir) => { process.env.OPENCODE_AUTH_CONTENT = JSON.stringify({ test: { type: "api", key: "secret" } }) process.env.OTEL_EXPORTER_OTLP_HEADERS = "authorization=otel" process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "https://otel.test" process.env.OTEL_RESOURCE_ATTRIBUTES = "service.name=opencode-test" const workspaceID = WorkspaceID.ascending("wrk_create_local") const type = unique("create-local") const targetDir = path.join(dir, "created-local") const recorded = recordedAdapter({ configure(info) { return { ...info, branch: "configured-branch", name: "Configured Name", directory: targetDir, extra: { configured: true }, } }, async create() { await fs.mkdir(targetDir, { recursive: true }) }, target() { return { type: "local", directory: targetDir } }, }) registerAdapter(Instance.project.id, type, recorded.adapter) const info = await createWorkspace({ id: workspaceID, type, branch: null, projectID: Instance.project.id, extra: null, }) expect(info).toEqual({ id: workspaceID, type, branch: "configured-branch", name: "Configured Name", directory: targetDir, extra: { configured: true }, projectID: Instance.project.id, timeUsed: info.timeUsed, }) expect(await getWorkspace(workspaceID)).toEqual(info) expect(await listWorkspaces(Instance.project)).toEqual([info]) expect(recorded.calls.configure).toHaveLength(1) expect(recorded.calls.configure[0]).toMatchObject({ id: workspaceID, type, directory: null }) expect(recorded.calls.create).toHaveLength(1) expect(recorded.calls.create[0].info).toEqual({ id: workspaceID, type, branch: "configured-branch", name: "Configured Name", directory: targetDir, extra: { configured: true }, projectID: Instance.project.id, }) expect(JSON.parse(recorded.calls.create[0].env.OPENCODE_AUTH_CONTENT ?? "{}")).toEqual({ test: { type: "api", key: "secret" }, }) expect(recorded.calls.create[0].env.OPENCODE_WORKSPACE_ID).toBe(workspaceID) expect(recorded.calls.create[0].env.OPENCODE_EXPERIMENTAL_WORKSPACES).toBe("true") expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_HEADERS).toBe("authorization=otel") expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_ENDPOINT).toBe("https://otel.test") expect(recorded.calls.create[0].env.OTEL_RESOURCE_ATTRIBUTES).toBe("service.name=opencode-test") expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBe("connected") await removeWorkspace(workspaceID) expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined() }) }) test("create propagates configure failures and does not insert a workspace", async () => { await withInstance(async () => { const type = unique("configure-failure") registerAdapter( Instance.project.id, type, recordedAdapter({ configure() { throw new Error("configure exploded") }, target() { return { type: "local", directory: "/unused" } }, }).adapter, ) await expect( createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null }), ).rejects.toThrow("configure exploded") expect(await listWorkspaces(Instance.project)).toEqual([]) }) }) test("create leaves the inserted row when adapter create fails", async () => { await withInstance(async () => { const type = unique("create-failure") const recorded = recordedAdapter({ async create() { throw new Error("create exploded") }, target() { return { type: "local", directory: "/unused" } }, }) registerAdapter(Instance.project.id, type, recorded.adapter) await expect( createWorkspace({ type, branch: "branch", projectID: Instance.project.id, extra: { x: 1 } }), ).rejects.toThrow("create exploded") const rows = await listWorkspaces(Instance.project) expect(rows).toHaveLength(1) expect(rows[0]).toMatchObject({ type, branch: "branch", extra: { x: 1 } }) expect(recorded.calls.target).toHaveLength(0) await removeWorkspace(rows[0].id) }) }) test("create returns after a local workspace reports error", async () => { await withInstance(async (dir) => { const type = unique("local-error") const missing = path.join(dir, "missing-local-target") const recorded = localAdapter(missing, { createDir: false }) registerAdapter(Instance.project.id, type, recorded.adapter) const info = await createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null }) expect(info.directory).toBe(missing) expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBe("error") await removeWorkspace(info.id) }) }) test("syncList registers adapter-listed workspaces that are missing by name", async () => { await withInstance(async (dir) => { const type = unique("list-sync") const existing = workspaceInfo(Instance.project.id, type, { id: WorkspaceID.ascending("wrk_list_sync_existing"), name: "existing", directory: path.join(dir, "existing"), }) insertWorkspace(existing) const discovered = { type, name: "discovered", branch: "feature/discovered", directory: path.join(dir, "discovered"), extra: { source: "adapter" }, projectID: Instance.project.id, } const recorded = recordedAdapter({ list() { return [ { type, name: existing.name, branch: "ignored", directory: path.join(dir, "ignored"), extra: null, projectID: Instance.project.id, }, discovered, ] }, target(info) { return { type: "local", directory: info.directory ?? dir } }, }) registerAdapter(Instance.project.id, type, recorded.adapter) await syncListWorkspaces(Instance.project) const synced = (await listWorkspaces(Instance.project)).filter((item) => item.name === discovered.name) expect(synced).toHaveLength(1) expect(synced[0]).toMatchObject(discovered) expect(synced[0]?.id).toStartWith("wrk_") expect(await listWorkspaces(Instance.project)).toEqual(expect.arrayContaining([existing, synced[0]])) expect(recorded.calls.list).toBe(1) expect(recorded.calls.configure).toHaveLength(0) expect(recorded.calls.create).toHaveLength(0) expect(recorded.calls.target).toHaveLength(1) }) }) test("syncList calls every registered adapter with a list method", async () => { await withInstance(async (dir) => { const typeA = unique("list-sync-a") const typeB = unique("list-sync-b") const adapterA = recordedAdapter({ list() { return [ { type: typeA, name: "adapter-a", branch: null, directory: path.join(dir, "adapter-a"), extra: null, projectID: Instance.project.id, }, ] }, target(info) { return { type: "local", directory: info.directory ?? dir } }, }) const adapterB = recordedAdapter({ list() { return [ { type: typeB, name: "adapter-b", branch: null, directory: path.join(dir, "adapter-b"), extra: null, projectID: Instance.project.id, }, ] }, target(info) { return { type: "local", directory: info.directory ?? dir } }, }) const noList = recordedAdapter({ target() { return { type: "local", directory: dir } }, }) registerAdapter(Instance.project.id, typeA, adapterA.adapter) registerAdapter(Instance.project.id, typeB, adapterB.adapter) registerAdapter(Instance.project.id, unique("list-sync-none"), noList.adapter) await syncListWorkspaces(Instance.project) const synced = await listWorkspaces(Instance.project) expect( synced .filter((item) => item.type === typeA || item.type === typeB) .map((item) => item.name) .toSorted(), ).toEqual(["adapter-a", "adapter-b"]) expect(adapterA.calls.list).toBe(1) expect(adapterB.calls.list).toBe(1) expect(noList.calls.list).toBe(0) }) }) it.live("remote create connects to routed event and history endpoints", () => { const calls: FetchCall[] = [] return Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest const bodyText = yield* req.text const call = { url: new URL(req.url, "http://localhost"), method: req.method, headers: new Headers(req.headers), bodyText, json: bodyText ? JSON.parse(bodyText) : undefined, } calls.push(call) if (call.url.pathname === "/base/global/event") return HttpServerResponse.fromWeb(eventStreamResponse([], false)) if (call.url.pathname === "/base/sync/history") return yield* HttpServerResponse.json([]) return HttpServerResponse.text("unexpected", { status: 500 }) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( (dir) => Effect.gen(function* () { const workspace = yield* Workspace.Service const type = unique("remote-create") const recorded = remoteAdapter(`${url}/base/?ignored=1#hash`, { directory: dir }) registerAdapter(Instance.project.id, type, recorded.adapter) const info = yield* workspace.create({ type, branch: null, projectID: Instance.project.id, extra: null }) expect( calls.map((call) => `${call.method} ${call.url.pathname}${call.url.search}${call.url.hash}`), ).toEqual(["GET /base/global/event", "POST /base/sync/history"]) expect(calls[1].json).toEqual({}) expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("connected") expect(yield* workspace.isSyncing(info.id)).toBe(true) yield* workspace.remove(info.id) expect(yield* workspace.isSyncing(info.id)).toBe(false) expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined() }), { git: true }, ) }) }) test("remove returns undefined for a missing workspace", async () => { await withInstance(async () => { expect(await removeWorkspace(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined() }) }) test("remove deletes the workspace, associated sessions, adapter resources, and status", async () => { await withInstance(async (dir) => { const type = unique("remove-local") const recorded = localAdapter(path.join(dir, "remove-local")) registerAdapter(Instance.project.id, type, recorded.adapter) const info = await createWorkspace({ type, branch: null, projectID: Instance.project.id, extra: null }) const one = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) const two = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) attachSessionToWorkspace(one.id, info.id) attachSessionToWorkspace(two.id, info.id) const removed = await removeWorkspace(info.id) expect(removed).toEqual(info) expect(await getWorkspace(info.id)).toBeUndefined() expect(recorded.calls.remove).toEqual([info]) expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined() expect( Database.use((db) => db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, info.id)).all(), ), ).toEqual([]) }) }) test("remove still deletes the row when the adapter cannot remove resources", async () => { await withInstance(async () => { const type = unique("remove-throws") const info = workspaceInfo(Instance.project.id, type, { id: WorkspaceID.ascending("wrk_remove_throws") }) registerAdapter( Instance.project.id, type, recordedAdapter({ async remove() { throw new Error("remove exploded") }, target() { return { type: "local", directory: "/unused" } }, }).adapter, ) insertWorkspace(info) expect(await removeWorkspace(info.id)).toEqual(info) expect(await getWorkspace(info.id)).toBeUndefined() }) }) test("sessionWarp moves a session into a local workspace and claims ownership", async () => { await withInstance(async (dir) => { const previousType = unique("warp-prev-local") const targetType = unique("warp-target-local") const previous = workspaceInfo(Instance.project.id, previousType) const target = workspaceInfo(Instance.project.id, targetType) insertWorkspace(previous) insertWorkspace(target) registerAdapter(Instance.project.id, previousType, localAdapter(path.join(dir, "warp-prev-local")).adapter) registerAdapter(Instance.project.id, targetType, localAdapter(path.join(dir, "warp-target-local")).adapter) const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) attachSessionToWorkspace(session.id, previous.id) await warpWorkspaceSession({ workspaceID: target.id, sessionID: session.id }) expect( Database.use((db) => db .select({ workspaceID: SessionTable.workspace_id }) .from(SessionTable) .where(eq(SessionTable.id, session.id)) .get(), )?.workspaceID, ).toBe(target.id) expect(sessionSequenceOwner(session.id)).toBe(target.id) }) }) test("sessionWarp applies source workspace patch to local target workspace", async () => { await withInstance(async (dir) => { const previousType = unique("warp-patch-prev-local") const targetType = unique("warp-patch-target-local") const previousDir = path.join(dir, "warp-patch-prev-local") const targetDir = path.join(dir, "warp-patch-target-local") await initGitRepo(previousDir) await initGitRepo(targetDir) await fs.writeFile(path.join(previousDir, "tracked.txt"), "changed\n") await fs.writeFile(path.join(previousDir, "new.txt"), "new\n") const previous = workspaceInfo(Instance.project.id, previousType) const target = workspaceInfo(Instance.project.id, targetType) insertWorkspace(previous) insertWorkspace(target) registerAdapter(Instance.project.id, previousType, localAdapter(previousDir, { createDir: false }).adapter) registerAdapter(Instance.project.id, targetType, localAdapter(targetDir, { createDir: false }).adapter) const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) attachSessionToWorkspace(session.id, previous.id) await warpWorkspaceSession({ workspaceID: target.id, sessionID: session.id, copyChanges: true }) expect(await fs.readFile(path.join(targetDir, "tracked.txt"), "utf8")).toBe("changed\n") expect(await fs.readFile(path.join(targetDir, "new.txt"), "utf8")).toBe("new\n") }) }) test("sessionWarp detaches a session to the local project and claims project ownership", async () => { await withInstance(async (dir) => { const previousType = unique("warp-detach-local") const previous = workspaceInfo(Instance.project.id, previousType) insertWorkspace(previous) registerAdapter(Instance.project.id, previousType, localAdapter(path.join(dir, "warp-detach-local")).adapter) const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) attachSessionToWorkspace(session.id, previous.id) await warpWorkspaceSession({ workspaceID: null, sessionID: session.id }) expect( Database.use((db) => db .select({ workspaceID: SessionTable.workspace_id }) .from(SessionTable) .where(eq(SessionTable.id, session.id)) .get(), )?.workspaceID, ).toBeNull() expect(sessionSequenceOwner(session.id)).toBe(Instance.project.id) }) }) it.live("sessionWarp syncs previous remote history, replays it, steals, and claims the sequence", () => { const calls: FetchCall[] = [] let historySessionID: SessionID | undefined let historyNextSeq = 0 return Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest const bodyText = yield* req.text const call = { url: new URL(req.url, "http://localhost"), method: req.method, headers: new Headers(req.headers), bodyText, json: bodyText ? JSON.parse(bodyText) : undefined, } calls.push(call) if (call.url.pathname === "/warp-source/sync/history") { return yield* HttpServerResponse.json([ { id: `evt_${unique("warp-source-history")}`, aggregate_id: historySessionID!, seq: historyNextSeq, type: sessionUpdatedType(), data: { sessionID: historySessionID!, info: { title: "from source history" } }, }, ]) } if (call.url.pathname === "/warp-source/vcs/diff/raw") return HttpServerResponse.text("remote patch") if (call.url.pathname === "/warp-target/sync/replay") return yield* HttpServerResponse.json({ sessionID: "ok" }) if (call.url.pathname === "/warp-target/sync/steal") return yield* HttpServerResponse.json({ sessionID: "ok" }) if (call.url.pathname === "/warp-target/vcs/apply") return yield* HttpServerResponse.json({ applied: true }) return HttpServerResponse.text("unexpected", { status: 500 }) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( () => Effect.gen(function* () { const workspace = yield* Workspace.Service const sessionSvc = yield* SessionNs.Service const previousType = unique("warp-remote-source") const targetType = unique("warp-remote-target") const previous = workspaceInfo(Instance.project.id, previousType) const target = workspaceInfo(Instance.project.id, targetType, { directory: "remote-target-dir" }) insertWorkspace(previous) insertWorkspace(target) registerAdapter(Instance.project.id, previousType, remoteAdapter(`${url}/warp-source`).adapter) registerAdapter(Instance.project.id, targetType, remoteAdapter(`${url}/warp-target`).adapter) const session = yield* sessionSvc.create({}) attachSessionToWorkspace(session.id, previous.id) historySessionID = session.id historyNextSeq = (sessionSequence(session.id) ?? -1) + 1 yield* workspace.sessionWarp({ workspaceID: target.id, sessionID: session.id, copyChanges: true }) expect(calls.map((call) => `${call.method} ${call.url.pathname}`)).toEqual([ "POST /warp-source/sync/history", "GET /warp-source/vcs/diff/raw", "POST /warp-target/vcs/apply", "POST /warp-target/sync/replay", "POST /warp-target/sync/steal", ]) expect(calls[0].json).toEqual({ [session.id]: historyNextSeq - 1 }) expect(calls[2].json).toEqual({ patch: "remote patch" }) expect(calls[3].json).toMatchObject({ directory: "remote-target-dir", events: [ { aggregateID: session.id, seq: 0, type: SyncEvent.versionedType(SessionNs.Event.Created.type, SessionNs.Event.Created.version), }, { aggregateID: session.id, seq: historyNextSeq, type: sessionUpdatedType(), }, ], }) expect(calls[4].json).toEqual({ sessionID: session.id }) expect((yield* sessionSvc.get(session.id)).title).toBe("from source history") expect(sessionSequenceOwner(session.id)).toBe(target.id) }), { git: true }, ) }) }) }) describe("workspace sync state", () => { test("startWorkspaceSyncing is disabled by the experimental workspace flag", async () => { await withInstance(async (dir) => { Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false const type = unique("flag-disabled") const info = workspaceInfo(Instance.project.id, type) const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({}))) attachSessionToWorkspace(session.id, info.id) insertWorkspace(info) registerAdapter(Instance.project.id, type, localAdapter(path.join(dir, "flag-disabled")).adapter) startWorkspaceSyncing(Instance.project.id) await delay(25) expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined() }) }) test("startWorkspaceSyncing starts all workspaces", async () => { await withInstance(async (dir) => { const firstType = unique("first") const secondType = unique("second") const first = workspaceInfo(Instance.project.id, firstType) const second = workspaceInfo(Instance.project.id, secondType) await fs.mkdir(path.join(dir, "first"), { recursive: true }) await fs.mkdir(path.join(dir, "second"), { recursive: true }) insertWorkspace(first) insertWorkspace(second) registerAdapter(Instance.project.id, firstType, localAdapter(path.join(dir, "first")).adapter) registerAdapter(Instance.project.id, secondType, localAdapter(path.join(dir, "second")).adapter) startWorkspaceSyncing(Instance.project.id) await eventually(() => workspaceStatus().then((status) => { expect(status.find((item) => item.workspaceID === first.id)?.status).toBe("connected") expect(status.find((item) => item.workspaceID === second.id)?.status).toBe("connected") }), ) await removeWorkspace(first.id) await removeWorkspace(second.id) }) }) test("local start reports error when the target directory is missing", async () => { await withInstance(async (dir) => { const type = unique("missing-local") const info = workspaceInfo(Instance.project.id, type) insertWorkspace(info) registerAdapter( Instance.project.id, type, localAdapter(path.join(dir, "missing-target"), { createDir: false }).adapter, ) attachSessionToWorkspace( (await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, info.id, ) startWorkspaceSyncing(Instance.project.id) await eventually(() => workspaceStatus().then((status) => expect(status.find((item) => item.workspaceID === info.id)?.status).toBe("error"), ), ) expect(await isWorkspaceSyncing(info.id)).toBe(false) await removeWorkspace(info.id) }) }) test("duplicate local status updates are suppressed", async () => { await withInstance(async (dir) => { const captured = captureGlobalEvents() try { const type = unique("dedupe-local") const info = workspaceInfo(Instance.project.id, type) const target = path.join(dir, "dedupe-local") await fs.mkdir(target, { recursive: true }) insertWorkspace(info) registerAdapter(Instance.project.id, type, localAdapter(target).adapter) attachSessionToWorkspace( (await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, info.id, ) startWorkspaceSyncing(Instance.project.id) startWorkspaceSyncing(Instance.project.id) await eventually(() => workspaceStatus().then((status) => expect(status.find((item) => item.workspaceID === info.id)?.status).toBe("connected"), ), ) expect( captured.events.filter( (event) => event.workspace === info.id && event.payload.type === Workspace.Event.Status.type, ), ).toHaveLength(1) await removeWorkspace(info.id) } finally { captured.dispose() } }) }) it.live("remote start emits disconnected, connecting, and connected then refuses duplicate listeners", () => { const calls: FetchCall[] = [] return Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest const bodyText = yield* req.text const call = { url: new URL(req.url, "http://localhost"), method: req.method, headers: new Headers(req.headers), bodyText, json: bodyText ? JSON.parse(bodyText) : undefined, } calls.push(call) if (call.url.pathname === "/sync/global/event") return HttpServerResponse.fromWeb(eventStreamResponse()) if (call.url.pathname === "/sync/sync/history") return HttpServerResponse.fromWeb(Response.json([])) return HttpServerResponse.text("unexpected", { status: 500 }) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( () => Effect.gen(function* () { const workspace = yield* Workspace.Service const sessionSvc = yield* SessionNs.Service const captured = captureGlobalEvents() try { const type = unique("remote-start") const info = workspaceInfo(Instance.project.id, type) insertWorkspace(info) registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/sync`).adapter) attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) yield* workspace.startWorkspaceSyncing(Instance.project.id) yield* eventuallyEffect( Effect.gen(function* () { expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe( "connected", ) }), ) yield* workspace.startWorkspaceSyncing(Instance.project.id) yield* Effect.sleep("25 millis") expect( captured.events .filter((event) => event.workspace === info.id && event.payload.type === Workspace.Event.Status.type) .map((event) => event.payload.properties.status), ).toEqual(["disconnected", "connecting", "connected"]) expect(calls.filter((call) => call.url.pathname === "/sync/global/event")).toHaveLength(1) expect(calls.filter((call) => call.url.pathname === "/sync/sync/history")).toHaveLength(1) expect(yield* workspace.isSyncing(info.id)).toBe(true) yield* workspace.remove(info.id) expect(yield* workspace.isSyncing(info.id)).toBe(false) } finally { captured.dispose() } }), { git: true }, ) }) }) it.live("remote connection HTTP failures set error and clear syncing", () => Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest if (new URL(req.url, "http://localhost").pathname === "/failed/global/event") return HttpServerResponse.text("nope", { status: 503 }) return HttpServerResponse.fromWeb(Response.json([])) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( () => Effect.gen(function* () { const workspace = yield* Workspace.Service const sessionSvc = yield* SessionNs.Service const type = unique("remote-connect-fail") const info = workspaceInfo(Instance.project.id, type) insertWorkspace(info) registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/failed`).adapter) attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) yield* workspace.startWorkspaceSyncing(Instance.project.id) yield* eventuallyEffect( Effect.gen(function* () { expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error") }), ) expect(yield* workspace.isSyncing(info.id)).toBe(false) yield* workspace.remove(info.id) }), { git: true }, ) }), ) it.live("remote history HTTP failures set error", () => Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest const url = new URL(req.url, "http://localhost") if (url.pathname === "/history-failed/global/event") return HttpServerResponse.fromWeb(eventStreamResponse([], false)) if (url.pathname === "/history-failed/sync/history") return HttpServerResponse.text("history failed", { status: 500 }) return HttpServerResponse.fromWeb(Response.json([])) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( () => Effect.gen(function* () { const workspace = yield* Workspace.Service const sessionSvc = yield* SessionNs.Service const type = unique("remote-history-fail") const info = workspaceInfo(Instance.project.id, type) insertWorkspace(info) registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/history-failed`).adapter) attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) yield* workspace.startWorkspaceSyncing(Instance.project.id) yield* eventuallyEffect( Effect.gen(function* () { expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error") }), ) expect(yield* workspace.isSyncing(info.id)).toBe(false) yield* workspace.remove(info.id) }), { git: true }, ) }), ) it.live("sync history sends the local sequence fence and replays returned events in workspace context", () => { const historyBodies: unknown[] = [] let historySessionID: SessionID | undefined let historyNextSeq = 0 return Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest const bodyText = yield* req.text const url = new URL(req.url, "http://localhost") if (url.pathname === "/history/global/event") return HttpServerResponse.fromWeb(eventStreamResponse()) if (url.pathname === "/history/sync/history") { historyBodies.push(bodyText ? JSON.parse(bodyText) : undefined) return HttpServerResponse.fromWeb( Response.json([ { id: `evt_${unique("history")}`, aggregate_id: historySessionID!, seq: historyNextSeq, type: sessionUpdatedType(), data: { sessionID: historySessionID!, info: { title: "from history" } }, }, ]), ) } return HttpServerResponse.text("unexpected", { status: 500 }) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( () => Effect.gen(function* () { const workspace = yield* Workspace.Service const sessionSvc = yield* SessionNs.Service const captured = captureGlobalEvents() try { const type = unique("history-replay") const info = workspaceInfo(Instance.project.id, type) insertWorkspace(info) registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/history`).adapter) const session = yield* sessionSvc.create({ title: "before history" }) attachSessionToWorkspace(session.id, info.id) historySessionID = session.id historyNextSeq = (sessionSequence(session.id) ?? -1) + 1 yield* workspace.startWorkspaceSyncing(Instance.project.id) yield* eventuallyEffect( Effect.gen(function* () { expect((yield* sessionSvc.get(session.id).pipe(Effect.orDie)).title).toBe("from history") }), ) expect(historyBodies).toEqual([{ [session.id]: historyNextSeq - 1 }]) expect( captured.events.some( (event) => event.workspace === info.id && event.payload.type === "sync" && event.payload.syncEvent.seq === historyNextSeq, ), ).toBe(true) yield* workspace.remove(info.id) } finally { captured.dispose() } }), { git: true }, ) }) }) it.live("SSE forwards non-heartbeat events and ignores heartbeats", () => Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest const url = new URL(req.url, "http://localhost") if (url.pathname === "/sse-forward/global/event") return HttpServerResponse.fromWeb( eventStreamResponse( [ { directory: "remote-dir", project: "remote-project", payload: { type: "server.heartbeat" } }, { directory: "remote-dir", project: "remote-project", payload: { type: "custom.remote", properties: { ok: true } }, }, ], false, ), ) if (url.pathname === "/sse-forward/sync/history") return HttpServerResponse.fromWeb(Response.json([])) return HttpServerResponse.text("unexpected", { status: 500 }) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( () => Effect.gen(function* () { const workspace = yield* Workspace.Service const sessionSvc = yield* SessionNs.Service const captured = captureGlobalEvents() try { const type = unique("sse-forward") const info = workspaceInfo(Instance.project.id, type) insertWorkspace(info) registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/sse-forward`).adapter) attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id) yield* workspace.startWorkspaceSyncing(Instance.project.id) yield* eventuallyEffect( Effect.sync(() => expect( captured.events.some( (event) => event.workspace === info.id && event.payload.type === "custom.remote", ), ).toBe(true), ), ) expect( captured.events.some( (event) => event.workspace === info.id && event.payload.type === "server.heartbeat", ), ).toBe(false) expect( captured.events.find((event) => event.workspace === info.id && event.payload.type === "custom.remote"), ).toMatchObject({ directory: "remote-dir", project: "remote-project", payload: { properties: { ok: true } }, }) yield* workspace.remove(info.id) } finally { captured.dispose() } }), { git: true }, ) }), ) it.live("SSE sync events are replayed and forwarded", () => { let sseSessionID: SessionID | undefined let sseNextSeq = 0 return Effect.gen(function* () { yield* HttpServer.serveEffect()( Effect.gen(function* () { const req = yield* HttpServerRequest.HttpServerRequest const url = new URL(req.url, "http://localhost") if (url.pathname === "/sse-sync/global/event") return HttpServerResponse.fromWeb( eventStreamResponse( [ { directory: "remote-dir", project: "remote-project", payload: { type: "sync", syncEvent: { id: `evt_${unique("sse")}`, aggregateID: sseSessionID!, seq: sseNextSeq, type: sessionUpdatedType(), data: { sessionID: sseSessionID!, info: { title: "from sse" } }, }, }, }, ], false, ), ) if (url.pathname === "/sse-sync/sync/history") return HttpServerResponse.fromWeb(Response.json([])) return HttpServerResponse.text("unexpected", { status: 500 }) }), ) const url = yield* serverUrl() yield* provideTmpdirInstance( () => Effect.gen(function* () { const workspace = yield* Workspace.Service const sessionSvc = yield* SessionNs.Service const captured = captureGlobalEvents() try { const type = unique("sse-sync") const info = workspaceInfo(Instance.project.id, type) insertWorkspace(info) registerAdapter(Instance.project.id, type, remoteAdapter(`${url}/sse-sync`).adapter) const session = yield* sessionSvc.create({ title: "before sse" }) attachSessionToWorkspace(session.id, info.id) sseSessionID = session.id sseNextSeq = (sessionSequence(session.id) ?? -1) + 1 yield* workspace.startWorkspaceSyncing(Instance.project.id) yield* eventuallyEffect( Effect.gen(function* () { expect((yield* sessionSvc.get(session.id).pipe(Effect.orDie)).title).toBe("from sse") }), ) expect( captured.events.some( (event) => event.workspace === info.id && event.payload.type === "sync" && event.payload.syncEvent.seq === sseNextSeq, ), ).toBe(true) yield* workspace.remove(info.id) } finally { captured.dispose() } }), { git: true }, ) }) }) }) describe("workspace waitForSync", () => { test("returns immediately for an empty fence", async () => { await withInstance(async () => { await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_empty"), {})).resolves.toBeUndefined() }) }) test("returns immediately when the stored sequence already satisfies the fence", async () => { await withInstance(async () => { const sessionID = SessionID.descending("ses_wait_done") Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 4 }).run()) await expect( waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 }), ).resolves.toBeUndefined() await expect( waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 }), ).resolves.toBeUndefined() }) }) test("waits until the database reaches the requested sequence and a workspace event arrives", async () => { await withInstance(async () => { const workspaceID = WorkspaceID.ascending("wrk_wait_event") const sessionID = SessionID.descending("ses_wait_event") Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 1 }).run()) const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 2 }) await delay(10) Database.use((db) => db.update(EventSequenceTable).set({ seq: 2 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), ) GlobalBus.emit("event", { workspace: workspaceID, payload: { type: "anything" } }) await expect(waited).resolves.toBeUndefined() }) }) test("a sync event for a different workspace can also release the fence", async () => { await withInstance(async () => { const workspaceID = WorkspaceID.ascending("wrk_wait_sync_any") const sessionID = SessionID.descending("ses_wait_sync_any") Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 0 }).run()) const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 1 }) await delay(10) Database.use((db) => db.update(EventSequenceTable).set({ seq: 1 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), ) GlobalBus.emit("event", { workspace: WorkspaceID.ascending("wrk_other_workspace"), payload: { type: "sync" }, }) await expect(waited).resolves.toBeUndefined() }) }) test("rejects with the abort reason when aborted", async () => { await withInstance(async () => { const abort = new AbortController() const reason = new Error("caller aborted") const waited = waitForWorkspaceSync( WorkspaceID.ascending("wrk_wait_abort"), { [SessionID.descending("ses_wait_abort")]: 1 }, abort.signal, ) abort.abort(reason) await expect(waited).rejects.toMatchObject({ _tag: "WorkspaceSyncAbortedError", message: reason.message, cause: reason, }) }) }) test("times out with the requested fence in the error message", async () => { await withInstance(async () => { const sessionID = SessionID.descending("ses_wait_timeout") await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 })).rejects.toThrow( `Timed out waiting for sync fence: {"${sessionID}":1}`, ) }) }, 7000) })