import { $ } from "bun" import { describe, expect } from "bun:test" import fs from "fs/promises" import path from "path" import { ConfigProvider, Deferred, Duration, Effect, Fiber, Layer, Option, Stream } from "effect" import { Config } from "@opencode-ai/core/config" import { EventV2 } from "@opencode-ai/core/event" import { FSUtil } from "@opencode-ai/core/fs-util" import { Watcher } from "@opencode-ai/core/filesystem/watcher" import { Git } from "@opencode-ai/core/git" import { Location } from "@opencode-ai/core/location" import { AbsolutePath } from "@opencode-ai/core/schema" import { location } from "../fixture/location" import { tmpdir } from "../fixture/tmpdir" import { testEffect } from "../lib/effect" const describeWatcher = Watcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip type WatcherEvent = { file: string; event: "add" | "change" | "unlink" } const it = testEffect(Layer.mergeAll(FSUtil.defaultLayer, EventV2.defaultLayer)) const configLayer = Layer.succeed( Config.Service, Config.Service.of({ entries: () => Effect.succeed([]), }), ) const flagsLayer = ConfigProvider.layer( ConfigProvider.fromUnknown({ OPENCODE_EXPERIMENTAL_FILEWATCHER: "true", OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false", }), ) function provide(directory: string, vcs?: Location.Interface["vcs"]) { const locationLayer = Layer.succeed( Location.Service, Location.Service.of(location({ directory: AbsolutePath.make(directory) }, { vcs })), ) return Effect.provide( Watcher.layer.pipe( Layer.provide(configLayer), Layer.provide(Git.defaultLayer), Layer.provide(locationLayer), Layer.provide(flagsLayer), ), ) } function withTmp( f: (directory: string, vcs?: Location.Interface["vcs"]) => Effect.Effect, options?: { git?: boolean; init?: (directory: string) => Promise }, ) { return Effect.acquireRelease( Effect.promise(async () => { const tmp = await tmpdir() if (!options?.git) return { tmp, vcs: undefined } await $`git init`.cwd(tmp.path).quiet() await $`git config core.fsmonitor false`.cwd(tmp.path).quiet() await $`git config commit.gpgsign false`.cwd(tmp.path).quiet() await $`git config user.email test@opencode.test`.cwd(tmp.path).quiet() await $`git config user.name Test`.cwd(tmp.path).quiet() await $`git commit --allow-empty -m root`.cwd(tmp.path).quiet() await options.init?.(tmp.path) return { tmp, vcs: { type: "git" as const, store: AbsolutePath.make(path.join(tmp.path, ".git")) } } }), ({ tmp }) => Effect.promise(() => tmp[Symbol.asyncDispose]()), ).pipe(Effect.flatMap(({ tmp, vcs }) => f(tmp.path, vcs).pipe(provide(tmp.path, vcs)))) } function wait(check: (event: WatcherEvent) => boolean) { return Effect.gen(function* () { const events = yield* EventV2.Service const deferred = yield* Deferred.make() const fiber = yield* events.subscribe(Watcher.Event.Updated).pipe( Stream.runForEach((event) => { if (!check(event.data)) return Effect.void return Deferred.succeed(deferred, event.data).pipe(Effect.asVoid) }), Effect.forkScoped, ) yield* Effect.yieldNow return { deferred, fiber } }) } function maybeNextUpdate( check: (event: WatcherEvent) => boolean, trigger: Effect.Effect, timeout: Duration.Input = "5 seconds", ) { return Effect.acquireUseRelease( wait(check), ({ deferred }) => trigger.pipe(Effect.andThen(Deferred.await(deferred)), Effect.timeoutOption(timeout)), ({ fiber }) => Fiber.interrupt(fiber), ) } function nextUpdate(check: (event: WatcherEvent) => boolean, trigger: Effect.Effect) { return Effect.gen(function* () { const result = yield* maybeNextUpdate(check, trigger) if (Option.isSome(result)) return result.value return yield* Effect.fail(new Error("timed out waiting for file watcher update")) }) } function eventuallyUpdate(check: (event: WatcherEvent) => boolean, trigger: () => Effect.Effect) { return Effect.gen(function* () { while (true) { const result = yield* maybeNextUpdate(check, trigger(), "250 millis") if (Option.isSome(result)) return result.value } }).pipe( Effect.timeoutOrElse({ duration: "5 seconds", orElse: () => Effect.fail(new Error("timed out waiting for file watcher readiness")), }), ) } function noUpdate(check: (event: WatcherEvent) => boolean, trigger: Effect.Effect, timeout = 500) { return Effect.acquireUseRelease( wait(check), ({ deferred }) => trigger.pipe( Effect.andThen(Deferred.await(deferred)), Effect.timeoutOption(`${timeout} millis`), Effect.tap((result) => Effect.sync(() => expect(result).toEqual(Option.none()))), ), ({ fiber }) => Fiber.interrupt(fiber), ) } function ready(directory: string) { const file = path.join(directory, `.watcher-${Math.random().toString(36).slice(2)}`) return Effect.gen(function* () { const fs = yield* FSUtil.Service yield* eventuallyUpdate( (event) => event.file === file, () => fs.writeFileString(file, `ready-${Math.random()}`), ).pipe(Effect.ensuring(fs.remove(file, { force: true }).pipe(Effect.ignore)), Effect.asVoid) }) } describeWatcher("Watcher", () => { it.live("publishes root create, update, and delete events", () => withTmp( (directory) => Effect.gen(function* () { const fs = yield* FSUtil.Service const file = path.join(directory, "watch.txt") yield* ready(directory) for (const item of [ { event: "add" as const, trigger: fs.writeFileString(file, "a") }, { event: "change" as const, trigger: fs.writeFileString(file, "b") }, { event: "unlink" as const, trigger: fs.remove(file) }, ]) { expect( yield* nextUpdate((event) => event.file === file && event.event === item.event, item.trigger), ).toEqual({ file, event: item.event, }) } }), { git: true }, ), ) it.live("watches non-git roots", () => withTmp((directory) => Effect.gen(function* () { const fs = yield* FSUtil.Service const file = path.join(directory, "plain.txt") yield* ready(directory) expect(yield* nextUpdate((event) => event.file === file, fs.writeFileString(file, "plain"))).toEqual({ file, event: "add", }) }), ), ) it.live("cleanup stops publishing events", () => Effect.gen(function* () { const events = yield* EventV2.Service const fs = yield* FSUtil.Service const tmp = yield* Effect.acquireRelease( Effect.promise(() => tmpdir()), (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), ) yield* ready(tmp.path).pipe(provide(tmp.path), Effect.scoped) const file = path.join(tmp.path, "after-dispose.txt") yield* noUpdate((event) => event.file === file, fs.writeFileString(file, "gone")).pipe( Effect.provideService(EventV2.Service, events), ) }).pipe(Effect.provide(Layer.mergeAll(FSUtil.defaultLayer, EventV2.defaultLayer))), ) it.live("ignores .git/index changes", () => withTmp( (directory) => Effect.gen(function* () { const fs = yield* FSUtil.Service const index = path.join(directory, ".git", "index") yield* ready(directory) yield* noUpdate( (event) => event.file === index, fs .writeFileString(path.join(directory, "tracked.txt"), "a") .pipe(Effect.andThen(Effect.promise(() => $`git add .`.cwd(directory).quiet())), Effect.asVoid), ) }), { git: true }, ), ) it.live("publishes .git/HEAD events", () => withTmp( (directory) => Effect.gen(function* () { const fs = yield* FSUtil.Service const head = path.join(directory, ".git", "HEAD") const branch = `watch-${Math.random().toString(36).slice(2)}` yield* ready(directory) yield* Effect.promise(() => $`git branch ${branch}`.cwd(directory).quiet()) expect( yield* nextUpdate((event) => event.file === head, fs.writeFileString(head, `ref: refs/heads/${branch}\n`)), ).toEqual({ file: head, event: "change", }) }), { git: true }, ), ) const describeSymlink = process.platform !== "win32" ? describe : describe.skip describeSymlink("symlinked .git", () => { it.live("publishes .git/HEAD events through a symlinked .git directory", () => withTmp( (directory) => Effect.gen(function* () { const afs = yield* FSUtil.Service const actual = path.join(directory, "..", `actual_${path.basename(directory)}`) yield* Effect.addFinalizer(() => Effect.promise(() => fs.rm(actual, { recursive: true, force: true }))) yield* ready(directory) const head = path.join(directory, ".git", "HEAD") const branch = `watch-${Math.random().toString(36).slice(2)}` yield* Effect.promise(() => $`git branch ${branch}`.cwd(directory).quiet()) expect( yield* nextUpdate( (event) => event.file === path.join(actual, "HEAD"), afs.writeFileString(head, `ref: refs/heads/${branch}\n`), ), ).toEqual({ file: path.join(actual, "HEAD"), event: "change" }) }), { git: true, init: async (directory) => { const actual = path.join(directory, "..", `actual_${path.basename(directory)}`) await fs.rename(path.join(directory, ".git"), actual) await fs.symlink(actual, path.join(directory, ".git")) }, }, ), ) }) })