fix(core): await plugin readiness
This commit is contained in:
parent
d2c866bf70
commit
4a710e4679
@ -1,6 +1,6 @@
|
||||
export * as PluginV2 from "./plugin"
|
||||
|
||||
import { Context, Effect, Exit, Layer, Schema, Scope } from "effect"
|
||||
import { Context, Deferred, Effect, Exit, Layer, Schema, Scope } from "effect"
|
||||
import type { Plugin } from "@opencode-ai/plugin/v2/effect"
|
||||
import { AgentV2 } from "./agent"
|
||||
import { AISDK } from "./aisdk"
|
||||
@ -29,6 +29,7 @@ export const Event = {
|
||||
export interface Interface {
|
||||
readonly add: (id: ID, effect: Plugin["effect"]) => Effect.Effect<void>
|
||||
readonly remove: (id: ID) => Effect.Effect<void>
|
||||
readonly wait: (id: ID) => Effect.Effect<void>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/Plugin") {}
|
||||
@ -41,13 +42,18 @@ export const layer = Layer.effect(
|
||||
const scope = yield* Scope.make()
|
||||
const active = new Map<ID, Scope.Closeable>()
|
||||
const loading = new Set<ID>()
|
||||
const waiters = new Map<ID, Set<Deferred.Deferred<void>>>()
|
||||
const failures = new Map<ID, Exit.Exit<void, never>>()
|
||||
let host: Parameters<Plugin["effect"]>[0]
|
||||
|
||||
const add = Effect.fn("Plugin.add")(function* (id: ID, effect: Plugin["effect"]) {
|
||||
if (loading.has(id)) return yield* Effect.die(`Plugin load cycle detected for ${id}`)
|
||||
|
||||
yield* locks.withLock(id)(
|
||||
Effect.sync(() => loading.add(id)).pipe(
|
||||
Effect.sync(() => {
|
||||
loading.add(id)
|
||||
failures.delete(id)
|
||||
}).pipe(
|
||||
Effect.andThen(
|
||||
State.batch(
|
||||
Effect.gen(function* () {
|
||||
@ -61,11 +67,22 @@ export const layer = Layer.effect(
|
||||
Effect.withSpan("Plugin.load", { attributes: { "plugin.id": id } }),
|
||||
Effect.onExit((exit) => (Exit.isFailure(exit) ? Scope.close(child, exit) : Effect.void)),
|
||||
)
|
||||
active.set(id, child)
|
||||
yield* events.publish(Event.Added, { id })
|
||||
active.set(id, child)
|
||||
yield* Effect.forEach(waiters.get(id) ?? [], (waiter) => Deferred.succeed(waiter, undefined), {
|
||||
discard: true,
|
||||
})
|
||||
waiters.delete(id)
|
||||
}),
|
||||
),
|
||||
),
|
||||
Effect.onExit((exit) => {
|
||||
if (Exit.isSuccess(exit)) return Effect.void
|
||||
failures.set(id, exit)
|
||||
return Effect.forEach(waiters.get(id) ?? [], (waiter) => Deferred.done(waiter, exit), {
|
||||
discard: true,
|
||||
}).pipe(Effect.ensuring(Effect.sync(() => waiters.delete(id))))
|
||||
}),
|
||||
Effect.ensuring(Effect.sync(() => loading.delete(id))),
|
||||
),
|
||||
)
|
||||
@ -79,12 +96,41 @@ export const layer = Layer.effect(
|
||||
Effect.gen(function* () {
|
||||
const current = active.get(id)
|
||||
active.delete(id)
|
||||
failures.delete(id)
|
||||
if (current) yield* Scope.close(current, Exit.void).pipe(Effect.ignore)
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
const wait = Effect.fn("Plugin.wait")(function* (id: ID) {
|
||||
const waiter = yield* Deferred.make<void>()
|
||||
const pending = yield* locks.withLock(id)(
|
||||
Effect.sync(() => {
|
||||
if (active.has(id)) return false
|
||||
const failure = failures.get(id)
|
||||
if (failure) return failure
|
||||
const current = waiters.get(id) ?? new Set()
|
||||
current.add(waiter)
|
||||
waiters.set(id, current)
|
||||
return true
|
||||
}),
|
||||
)
|
||||
if (!pending) return
|
||||
if (typeof pending !== "boolean") return yield* pending
|
||||
yield* Deferred.await(waiter).pipe(
|
||||
Effect.ensuring(
|
||||
locks.withLock(id)(
|
||||
Effect.sync(() => {
|
||||
const current = waiters.get(id)
|
||||
current?.delete(waiter)
|
||||
if (current?.size === 0) waiters.delete(id)
|
||||
}),
|
||||
),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
yield* Effect.addFinalizer((exit) =>
|
||||
Effect.gen(function* () {
|
||||
active.clear()
|
||||
@ -95,6 +141,7 @@ export const layer = Layer.effect(
|
||||
const service = Service.of({
|
||||
add,
|
||||
remove,
|
||||
wait,
|
||||
})
|
||||
host = yield* PluginHost.make(service)
|
||||
return service
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { describe, expect } from "bun:test"
|
||||
import { Effect } from "effect"
|
||||
import { Effect, Exit, Fiber } from "effect"
|
||||
import { define } from "@opencode-ai/plugin/v2/effect"
|
||||
import { AgentV2 } from "@opencode-ai/core/agent"
|
||||
import { PluginV2 } from "@opencode-ai/core/plugin"
|
||||
@ -9,6 +9,34 @@ import { PluginTestLayer } from "./plugin/fixture"
|
||||
const it = testEffect(PluginTestLayer)
|
||||
|
||||
describe("PluginV2", () => {
|
||||
it.effect("waits for a plugin and returns immediately once active", () =>
|
||||
Effect.gen(function* () {
|
||||
const plugins = yield* PluginV2.Service
|
||||
const id = PluginV2.ID.make("waited")
|
||||
const waiting = yield* plugins.wait(id).pipe(Effect.forkChild)
|
||||
|
||||
yield* plugins.add(id, () => Effect.void)
|
||||
yield* Fiber.join(waiting)
|
||||
yield* plugins.wait(id)
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("propagates plugin activation defects to waiters", () =>
|
||||
Effect.gen(function* () {
|
||||
const plugins = yield* PluginV2.Service
|
||||
const id = PluginV2.ID.make("failed")
|
||||
const waiting = yield* plugins.wait(id).pipe(Effect.exit, Effect.forkChild)
|
||||
|
||||
const added = yield* plugins.add(id, () => Effect.die("boom")).pipe(Effect.exit)
|
||||
const pending = yield* Fiber.join(waiting)
|
||||
const later = yield* plugins.wait(id).pipe(Effect.exit)
|
||||
|
||||
expect(Exit.isFailure(added)).toBe(true)
|
||||
expect(Exit.isFailure(pending)).toBe(true)
|
||||
expect(Exit.isFailure(later)).toBe(true)
|
||||
}),
|
||||
)
|
||||
|
||||
it.effect("adds, replaces, and removes plugins", () =>
|
||||
Effect.gen(function* () {
|
||||
const plugins = yield* PluginV2.Service
|
||||
|
||||
@ -30,6 +30,7 @@ import { ModelV2 } from "@opencode-ai/core/model"
|
||||
import { LocationServiceMap } from "@opencode-ai/core/location-layer"
|
||||
import { Reference } from "@opencode-ai/core/reference"
|
||||
import { Location } from "@opencode-ai/core/location"
|
||||
import { PluginV2 } from "@opencode-ai/core/plugin"
|
||||
|
||||
export const Info = Schema.Struct({
|
||||
name: Schema.String,
|
||||
@ -99,6 +100,7 @@ export const layer = Layer.effect(
|
||||
const cfg = yield* config.get()
|
||||
const skillDirs = yield* skill.dirs()
|
||||
const referenceDirs = yield* Effect.gen(function* () {
|
||||
yield* (yield* PluginV2.Service).wait(PluginV2.ID.make("core/config-reference"))
|
||||
return (yield* (yield* Reference.Service).list()).map((reference) => reference.path)
|
||||
}).pipe(Effect.provide(locations.get(Location.Ref.make({ directory: AbsolutePath.make(ctx.directory) }))))
|
||||
const whitelistedDirs = [
|
||||
|
||||
@ -20,6 +20,7 @@ import { AbsolutePath } from "@opencode-ai/core/schema"
|
||||
import { Location } from "@opencode-ai/core/location"
|
||||
import { LocationServiceMap } from "@opencode-ai/core/location-layer"
|
||||
import { Reference } from "@opencode-ai/core/reference"
|
||||
import { PluginV2 } from "@opencode-ai/core/plugin"
|
||||
|
||||
export function provider(model: Provider.Model) {
|
||||
if (model.api.id.includes("gpt-4") || model.api.id.includes("o1") || model.api.id.includes("o3"))
|
||||
@ -54,6 +55,7 @@ export const layer = Layer.effect(
|
||||
environment: Effect.fn("SystemPrompt.environment")(function* (model: Provider.Model) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const references = yield* Effect.gen(function* () {
|
||||
yield* (yield* PluginV2.Service).wait(PluginV2.ID.make("core/config-reference"))
|
||||
return (yield* (yield* Reference.Service).list()).filter((reference) => reference.description !== undefined)
|
||||
}).pipe(Effect.provide(locations.get(Location.Ref.make({ directory: AbsolutePath.make(ctx.directory) }))))
|
||||
return [
|
||||
|
||||
@ -4,6 +4,8 @@ import { Server } from "../../src/server/server"
|
||||
import { Global } from "@opencode-ai/core/global"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
|
||||
import { Effect } from "effect"
|
||||
import { pollWithTimeout } from "../lib/effect"
|
||||
|
||||
afterEach(async () => {
|
||||
await disposeAllInstances()
|
||||
@ -24,12 +26,19 @@ describe("reference HttpApi", () => {
|
||||
},
|
||||
})
|
||||
|
||||
const response = await Server.Default().app.request("/api/reference", {
|
||||
headers: { "x-opencode-directory": tmp.path },
|
||||
})
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
const body = await response.json()
|
||||
const body = await Effect.runPromise(
|
||||
pollWithTimeout(
|
||||
Effect.promise(async () => {
|
||||
const response = await Server.Default().app.request("/api/reference", {
|
||||
headers: { "x-opencode-directory": tmp.path },
|
||||
})
|
||||
expect(response.status).toBe(200)
|
||||
const body = await response.json()
|
||||
return body.data.length === 0 ? undefined : body
|
||||
}),
|
||||
"references were not loaded",
|
||||
),
|
||||
)
|
||||
expect(body).toMatchObject({ location: { directory: tmp.path } })
|
||||
expect(body.data).toEqual([
|
||||
{
|
||||
|
||||
Loading…
Reference in New Issue
Block a user