diff --git a/packages/opencode/src/background/job.ts b/packages/opencode/src/background/job.ts index 3ea228f04..8179c7328 100644 --- a/packages/opencode/src/background/job.ts +++ b/packages/opencode/src/background/job.ts @@ -1,6 +1,6 @@ import { InstanceState } from "@/effect/instance-state" import { Identifier } from "@/id/id" -import { Cause, Clock, Context, Deferred, Effect, Fiber, Layer, Scope, SynchronizedRef } from "effect" +import { Cause, Clock, Context, Deferred, Effect, Exit, Layer, Scope, SynchronizedRef } from "effect" export type Status = "running" | "completed" | "error" | "cancelled" @@ -19,7 +19,11 @@ export type Info = { type Active = { info: Info done: Deferred.Deferred - fiber?: Fiber.Fiber + scope: Scope.Closeable + token: object + pending: number + next: number + output?: { sequence: number; text: string } } type State = { @@ -30,6 +34,7 @@ type State = { type FinishResult = { info?: Info done?: Deferred.Deferred + scope?: Scope.Closeable } export type StartInput = { @@ -40,6 +45,11 @@ export type StartInput = { run: Effect.Effect } +export type ExtendInput = { + id: string + run: Effect.Effect +} + export type WaitInput = { id: string timeout?: number @@ -54,6 +64,7 @@ export interface Interface { readonly list: () => Effect.Effect readonly get: (id: string) => Effect.Effect readonly start: (input: StartInput) => Effect.Effect + readonly extend: (input: ExtendInput) => Effect.Effect readonly wait: (input: WaitInput) => Effect.Effect readonly cancel: (id: string) => Effect.Effect } @@ -84,36 +95,75 @@ export const layer = Layer.effect( }), ) - const finish = Effect.fn("BackgroundJob.finish")(function* ( + const settle = Effect.fn("BackgroundJob.settle")(function* ( id: string, - status: Exclude, - data?: { output?: string; error?: string }, + token: object, + sequence: number, + exit: Exit.Exit, ) { const completed_at = yield* Clock.currentTimeMillis + const s = yield* InstanceState.get(state) const result = yield* SynchronizedRef.modify( - (yield* InstanceState.get(state)).jobs, + s.jobs, (jobs): readonly [FinishResult, Map] => { const job = jobs.get(id) if (!job) return [{}, jobs] + if (job.token !== token) return [{}, jobs] if (job.info.status !== "running") return [{ info: snapshot(job) }, jobs] + const pending = job.pending - 1 + const output = Exit.isSuccess(exit) && (!job.output || sequence > job.output.sequence) + ? { sequence, text: exit.value } + : job.output + if (Exit.isSuccess(exit) && pending > 0) { + return [{}, new Map(jobs).set(id, { ...job, pending, output })] + } + const status: Exclude = Exit.isSuccess(exit) + ? "completed" + : Cause.hasInterruptsOnly(exit.cause) + ? "cancelled" + : "error" const next = { ...job, - fiber: undefined, + pending: 0, + output, info: { ...job.info, status, completed_at, - ...(data?.output !== undefined ? { output: data.output } : {}), - ...(data?.error !== undefined ? { error: data.error } : {}), + ...(output ? { output: output.text } : {}), + ...(Exit.isFailure(exit) ? { error: errorText(Cause.squash(exit.cause)) } : {}), }, } - return [{ info: snapshot(next), done: job.done }, new Map(jobs).set(id, next)] + return [ + { info: snapshot(next), done: job.done, scope: job.scope }, + new Map(jobs).set(id, next), + ] }, ) if (result.info && result.done) yield* Deferred.succeed(result.done, result.info).pipe(Effect.ignore) + if (result.scope) { + yield* Scope.close(result.scope, Exit.void).pipe(Effect.forkIn(s.scope, { startImmediately: true })) + } return result.info }) + const fork = Effect.fn("BackgroundJob.fork")(function* ( + scope: Scope.Scope, + id: string, + token: object, + sequence: number, + run: Effect.Effect, + ) { + return yield* run.pipe( + Effect.matchCauseEffect({ + onSuccess: (output) => settle(id, token, sequence, Exit.succeed(output)), + onFailure: (cause) => settle(id, token, sequence, Exit.failCause(cause)), + }), + Effect.asVoid, + Effect.forkIn(scope, { startImmediately: true }), + ) + }) + const list: Interface["list"] = Effect.fn("BackgroundJob.list")(function* () { return Array.from((yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).values()) .map(snapshot) @@ -138,17 +188,9 @@ export const layer = Layer.effect( Effect.fnUntraced(function* (jobs) { const existing = jobs.get(id) if (existing?.info.status === "running") return [snapshot(existing), jobs] as const - const fiber = yield* restore(input.run).pipe( - Effect.matchCauseEffect({ - onSuccess: (output) => finish(id, "completed", { output }), - onFailure: (cause) => - finish(id, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", { - error: errorText(Cause.squash(cause)), - }), - }), - Effect.asVoid, - Effect.forkIn(s.scope, { startImmediately: true }), - ) + const scope = yield* Scope.fork(s.scope, "parallel") + const token = {} + yield* fork(scope, id, token, 0, restore(input.run)) const job = { info: { id, @@ -159,7 +201,10 @@ export const layer = Layer.effect( metadata: input.metadata, }, done, - fiber, + scope, + token, + pending: 1, + next: 1, } return [snapshot(job), new Map(jobs).set(id, job)] as const }), @@ -168,6 +213,30 @@ export const layer = Layer.effect( ) }) + const extend: Interface["extend"] = Effect.fn("BackgroundJob.extend")(function* (input) { + return yield* Effect.uninterruptibleMask((restore) => + Effect.gen(function* () { + const s = yield* InstanceState.get(state) + return yield* SynchronizedRef.modifyEffect( + s.jobs, + Effect.fnUntraced(function* (jobs) { + const job = jobs.get(input.id) + if (!job || job.info.status !== "running") return [false, jobs] as const + yield* fork(job.scope, input.id, job.token, job.next, restore(input.run)) + return [ + true, + new Map(jobs).set(input.id, { + ...job, + pending: job.pending + 1, + next: job.next + 1, + }), + ] as const + }), + ) + }), + ) + }) + const wait: Interface["wait"] = Effect.fn("BackgroundJob.wait")(function* (input) { const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(input.id) if (!job) return { timedOut: false } @@ -180,18 +249,34 @@ export const layer = Layer.effect( }) const cancel: Interface["cancel"] = Effect.fn("BackgroundJob.cancel")(function* (id) { - const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id) - if (!job) return - if (job.info.status !== "running") return snapshot(job) - if (job.fiber) { - yield* Fiber.interrupt(job.fiber).pipe(Effect.ignore) - yield* Fiber.await(job.fiber).pipe(Effect.ignore) - } - const info = yield* finish(id, "cancelled") - return info + const completed_at = yield* Clock.currentTimeMillis + const result = yield* SynchronizedRef.modify( + (yield* InstanceState.get(state)).jobs, + (jobs): readonly [FinishResult, Map] => { + const job = jobs.get(id) + if (!job) return [{}, jobs] + if (job.info.status !== "running") return [{ info: snapshot(job) }, jobs] + const next = { + ...job, + pending: 0, + info: { + ...job.info, + status: "cancelled" as const, + completed_at, + }, + } + return [ + { info: snapshot(next), done: job.done, scope: job.scope }, + new Map(jobs).set(id, next), + ] + }, + ) + if (result.info && result.done) yield* Deferred.succeed(result.done, result.info).pipe(Effect.ignore) + if (result.scope) yield* Scope.close(result.scope, Exit.void) + return result.info }) - return Service.of({ list, get, start, wait, cancel }) + return Service.of({ list, get, start, extend, wait, cancel }) }), ) diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index d4affaec4..2324f79ff 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -10,7 +10,7 @@ import { Agent } from "../agent/agent" import { deriveSubagentSessionPermission } from "../agent/subagent-permissions" import type { SessionPrompt } from "../session/prompt" import { Config } from "@/config/config" -import { Cause, Effect, Exit, Schema, Scope } from "effect" +import { Effect, Exit, Schema, Scope } from "effect" import { EffectBridge } from "@/effect/bridge" import { RuntimeFlags } from "@/effect/runtime-flags" import { Database } from "@opencode-ai/core/database/database" @@ -69,6 +69,17 @@ function backgroundOutput(sessionID: SessionID) { ].join("\n") } +function backgroundUpdateOutput(sessionID: SessionID) { + return [ + ``, + "Background task updated", + "", + "Additional context sent to the background task.", + "", + "", + ].join("\n") +} + function backgroundMessage(input: { sessionID: SessionID description: string @@ -90,11 +101,6 @@ function backgroundMessage(input: { ].join("\n") } -function errorText(error: unknown) { - if (error instanceof Error) return error.message - return String(error) -} - export const TaskTool = Tool.define( id, Effect.gen(function* () { @@ -231,9 +237,16 @@ export const TaskTool = Tool.define( .pipe(Effect.ignore, Effect.forkIn(scope, { startImmediately: true })) }) - const existing = yield* background.get(nextSession.id) - if (existing?.status === "running") { - return yield* Effect.fail(new Error(`Task ${nextSession.id} is already running.`)) + if (yield* background.extend({ id: nextSession.id, run: runTask() })) { + return { + title: params.description, + metadata: { + ...metadata, + background: true, + jobId: nextSession.id, + }, + output: backgroundUpdateOutput(nextSession.id), + } } if (runInBackground) { @@ -242,16 +255,18 @@ export const TaskTool = Tool.define( type: id, title: params.description, metadata, - run: runTask().pipe( - Effect.tap((text) => inject("completed", text).pipe(Effect.ignore)), - Effect.catchCause((cause) => - (Cause.hasInterruptsOnly(cause) - ? Effect.void - : inject("error", errorText(Cause.squash(cause))).pipe(Effect.ignore) - ).pipe(Effect.andThen(Effect.failCause(cause))), - ), - ), + run: runTask(), }) + yield* background + .wait({ id: info.id }) + .pipe( + Effect.flatMap((result) => { + if (result.info?.status === "completed") return inject("completed", result.info.output ?? "") + if (result.info?.status === "error") return inject("error", result.info.error ?? "") + return Effect.void + }), + Effect.forkIn(scope, { startImmediately: true }), + ) return { title: params.description, diff --git a/packages/opencode/test/background/job.test.ts b/packages/opencode/test/background/job.test.ts index afc7260bb..667f9f5db 100644 --- a/packages/opencode/test/background/job.test.ts +++ b/packages/opencode/test/background/job.test.ts @@ -78,6 +78,38 @@ describe("background.job", () => { }), ) + it.instance("waits for extensions before completing a running job", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const first = yield* Deferred.make() + const second = yield* Deferred.make() + const job = yield* jobs.start({ + type: "test", + run: Deferred.await(first).pipe(Effect.as("first")), + }) + + expect(yield* jobs.extend({ id: job.id, run: Deferred.await(second).pipe(Effect.as("second")) })).toBe(true) + yield* Deferred.succeed(first, undefined) + expect((yield* jobs.get(job.id))?.status).toBe("running") + + yield* Deferred.succeed(second, undefined) + const done = yield* jobs.wait({ id: job.id }) + expect(done.info?.status).toBe("completed") + expect(done.info?.output).toBe("second") + }), + ) + + it.instance("rejects extensions after a job completes", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const job = yield* jobs.start({ type: "test", run: Effect.succeed("done") }) + yield* jobs.wait({ id: job.id }) + + expect(yield* jobs.extend({ id: job.id, run: Effect.succeed("late") })).toBe(false) + expect((yield* jobs.get(job.id))?.output).toBe("done") + }), + ) + it.instance("records failed jobs", () => Effect.gen(function* () { const jobs = yield* BackgroundJob.Service @@ -93,19 +125,56 @@ describe("background.job", () => { }), ) + it.instance("ignores stale settlements after restarting a failed job", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const fail = yield* Deferred.make() + const interrupted = yield* Deferred.make() + const release = yield* Deferred.make() + const id = "job_test" + yield* jobs.start({ + id, + type: "test", + run: Deferred.await(fail).pipe(Effect.andThen(Effect.fail(new Error("boom")))), + }) + yield* jobs.extend({ + id, + run: Effect.never.pipe( + Effect.ensuring(Deferred.succeed(interrupted, undefined).pipe(Effect.andThen(Deferred.await(release)))), + ), + }) + + yield* Deferred.succeed(fail, undefined) + expect((yield* jobs.wait({ id })).info?.status).toBe("error") + yield* Deferred.await(interrupted) + yield* jobs.start({ id, type: "test", run: Effect.never }) + + yield* Deferred.succeed(release, undefined) + yield* Effect.yieldNow + expect((yield* jobs.get(id))?.status).toBe("running") + yield* jobs.cancel(id) + }), + ) + it.instance("can cancel running jobs", () => Effect.gen(function* () { const jobs = yield* BackgroundJob.Service const interrupted = yield* Deferred.make() + const extendedInterrupted = yield* Deferred.make() const job = yield* jobs.start({ type: "test", run: Effect.never.pipe(Effect.ensuring(Deferred.succeed(interrupted, undefined))), }) + yield* jobs.extend({ + id: job.id, + run: Effect.never.pipe(Effect.ensuring(Deferred.succeed(extendedInterrupted, undefined))), + }) const cancelled = yield* jobs.cancel(job.id) expect(cancelled?.status).toBe("cancelled") yield* Deferred.await(interrupted).pipe(Effect.timeout("1 second")) + yield* Deferred.await(extendedInterrupted).pipe(Effect.timeout("1 second")) expect((yield* jobs.get(job.id))?.status).toBe("cancelled") }), ) diff --git a/packages/opencode/test/tool/task.test.ts b/packages/opencode/test/tool/task.test.ts index 3e268d586..b8d2accf9 100644 --- a/packages/opencode/test/tool/task.test.ts +++ b/packages/opencode/test/tool/task.test.ts @@ -518,6 +518,79 @@ describe("tool.task", () => { }), ) + background.instance("background task completion waits for running updates", () => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const { chat, assistant } = yield* seed() + const tool = yield* TaskTool + const def = yield* tool.init() + const first = defer() + const second = defer() + const updated = defer() + const injected = defer() + let prompts = 0 + const promptOps: TaskPromptOps = { + ...stubOps(), + prompt: (input) => { + if (input.sessionID === chat.id) { + injected.resolve(input) + return Effect.succeed(reply(input, "done")) + } + prompts++ + if (prompts === 1) return Effect.promise(() => first.promise).pipe(Effect.as(reply(input, "first done"))) + updated.resolve(input) + return Effect.promise(() => second.promise).pipe(Effect.as(reply(input, "second done"))) + }, + } + const context = { + sessionID: chat.id, + messageID: assistant.id, + agent: "build", + abort: new AbortController().signal, + extra: { promptOps }, + messages: [], + metadata: () => Effect.void, + ask: () => Effect.void, + } + + const started = yield* def.execute( + { + description: "inspect bug", + prompt: "look into the cache key path", + subagent_type: "general", + background: true, + }, + context, + ) + const result = yield* def.execute( + { + description: "add investigation scope", + prompt: "also inspect cancellation", + subagent_type: "general", + task_id: started.metadata.sessionId, + }, + context, + ) + + expect((yield* Effect.promise(() => updated.promise)).parts).toEqual([ + { type: "text", text: "also inspect cancellation" }, + ]) + expect(result.metadata.sessionId).toBe(started.metadata.sessionId) + expect(result.metadata.background).toBe(true) + expect(result.output).toContain("Background task updated") + first.resolve() + expect((yield* jobs.get(started.metadata.sessionId))?.status).toBe("running") + + second.resolve() + const waited = yield* jobs.wait({ id: started.metadata.sessionId, timeout: 1_000 }) + expect(waited.info?.status).toBe("completed") + expect(waited.info?.output).toBe("second done") + const notification = yield* Effect.promise(() => injected.promise) + expect(notification.parts[0]?.type).toBe("text") + if (notification.parts[0]?.type === "text") expect(notification.parts[0].text).toContain("second done") + }), + ) + background.instance("background tasks complete through the background job service", () => Effect.gen(function* () { const jobs = yield* BackgroundJob.Service