fix: task id passed to background job for continuation (#30485)
This commit is contained in:
parent
6003217eaa
commit
70cd4bf0ce
@ -1,6 +1,6 @@
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { Identifier } from "@/id/id"
|
||||
import { Cause, Clock, Context, Deferred, Effect, Fiber, Layer, Scope, SynchronizedRef } from "effect"
|
||||
import { Cause, Clock, Context, Deferred, Effect, Exit, Layer, Scope, SynchronizedRef } from "effect"
|
||||
|
||||
export type Status = "running" | "completed" | "error" | "cancelled"
|
||||
|
||||
@ -19,7 +19,11 @@ export type Info = {
|
||||
type Active = {
|
||||
info: Info
|
||||
done: Deferred.Deferred<Info>
|
||||
fiber?: Fiber.Fiber<void, unknown>
|
||||
scope: Scope.Closeable
|
||||
token: object
|
||||
pending: number
|
||||
next: number
|
||||
output?: { sequence: number; text: string }
|
||||
}
|
||||
|
||||
type State = {
|
||||
@ -30,6 +34,7 @@ type State = {
|
||||
type FinishResult = {
|
||||
info?: Info
|
||||
done?: Deferred.Deferred<Info>
|
||||
scope?: Scope.Closeable
|
||||
}
|
||||
|
||||
export type StartInput = {
|
||||
@ -40,6 +45,11 @@ export type StartInput = {
|
||||
run: Effect.Effect<string, unknown>
|
||||
}
|
||||
|
||||
export type ExtendInput = {
|
||||
id: string
|
||||
run: Effect.Effect<string, unknown>
|
||||
}
|
||||
|
||||
export type WaitInput = {
|
||||
id: string
|
||||
timeout?: number
|
||||
@ -54,6 +64,7 @@ export interface Interface {
|
||||
readonly list: () => Effect.Effect<Info[]>
|
||||
readonly get: (id: string) => Effect.Effect<Info | undefined>
|
||||
readonly start: (input: StartInput) => Effect.Effect<Info>
|
||||
readonly extend: (input: ExtendInput) => Effect.Effect<boolean>
|
||||
readonly wait: (input: WaitInput) => Effect.Effect<WaitResult>
|
||||
readonly cancel: (id: string) => Effect.Effect<Info | undefined>
|
||||
}
|
||||
@ -84,36 +95,75 @@ export const layer = Layer.effect(
|
||||
}),
|
||||
)
|
||||
|
||||
const finish = Effect.fn("BackgroundJob.finish")(function* (
|
||||
const settle = Effect.fn("BackgroundJob.settle")(function* (
|
||||
id: string,
|
||||
status: Exclude<Status, "running">,
|
||||
data?: { output?: string; error?: string },
|
||||
token: object,
|
||||
sequence: number,
|
||||
exit: Exit.Exit<string, unknown>,
|
||||
) {
|
||||
const completed_at = yield* Clock.currentTimeMillis
|
||||
const s = yield* InstanceState.get(state)
|
||||
const result = yield* SynchronizedRef.modify(
|
||||
(yield* InstanceState.get(state)).jobs,
|
||||
s.jobs,
|
||||
(jobs): readonly [FinishResult, Map<string, Active>] => {
|
||||
const job = jobs.get(id)
|
||||
if (!job) return [{}, jobs]
|
||||
if (job.token !== token) return [{}, jobs]
|
||||
if (job.info.status !== "running") return [{ info: snapshot(job) }, jobs]
|
||||
const pending = job.pending - 1
|
||||
const output = Exit.isSuccess(exit) && (!job.output || sequence > job.output.sequence)
|
||||
? { sequence, text: exit.value }
|
||||
: job.output
|
||||
if (Exit.isSuccess(exit) && pending > 0) {
|
||||
return [{}, new Map(jobs).set(id, { ...job, pending, output })]
|
||||
}
|
||||
const status: Exclude<Status, "running"> = Exit.isSuccess(exit)
|
||||
? "completed"
|
||||
: Cause.hasInterruptsOnly(exit.cause)
|
||||
? "cancelled"
|
||||
: "error"
|
||||
const next = {
|
||||
...job,
|
||||
fiber: undefined,
|
||||
pending: 0,
|
||||
output,
|
||||
info: {
|
||||
...job.info,
|
||||
status,
|
||||
completed_at,
|
||||
...(data?.output !== undefined ? { output: data.output } : {}),
|
||||
...(data?.error !== undefined ? { error: data.error } : {}),
|
||||
...(output ? { output: output.text } : {}),
|
||||
...(Exit.isFailure(exit) ? { error: errorText(Cause.squash(exit.cause)) } : {}),
|
||||
},
|
||||
}
|
||||
return [{ info: snapshot(next), done: job.done }, new Map(jobs).set(id, next)]
|
||||
return [
|
||||
{ info: snapshot(next), done: job.done, scope: job.scope },
|
||||
new Map(jobs).set(id, next),
|
||||
]
|
||||
},
|
||||
)
|
||||
if (result.info && result.done) yield* Deferred.succeed(result.done, result.info).pipe(Effect.ignore)
|
||||
if (result.scope) {
|
||||
yield* Scope.close(result.scope, Exit.void).pipe(Effect.forkIn(s.scope, { startImmediately: true }))
|
||||
}
|
||||
return result.info
|
||||
})
|
||||
|
||||
const fork = Effect.fn("BackgroundJob.fork")(function* (
|
||||
scope: Scope.Scope,
|
||||
id: string,
|
||||
token: object,
|
||||
sequence: number,
|
||||
run: Effect.Effect<string, unknown>,
|
||||
) {
|
||||
return yield* run.pipe(
|
||||
Effect.matchCauseEffect({
|
||||
onSuccess: (output) => settle(id, token, sequence, Exit.succeed(output)),
|
||||
onFailure: (cause) => settle(id, token, sequence, Exit.failCause(cause)),
|
||||
}),
|
||||
Effect.asVoid,
|
||||
Effect.forkIn(scope, { startImmediately: true }),
|
||||
)
|
||||
})
|
||||
|
||||
const list: Interface["list"] = Effect.fn("BackgroundJob.list")(function* () {
|
||||
return Array.from((yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).values())
|
||||
.map(snapshot)
|
||||
@ -138,17 +188,9 @@ export const layer = Layer.effect(
|
||||
Effect.fnUntraced(function* (jobs) {
|
||||
const existing = jobs.get(id)
|
||||
if (existing?.info.status === "running") return [snapshot(existing), jobs] as const
|
||||
const fiber = yield* restore(input.run).pipe(
|
||||
Effect.matchCauseEffect({
|
||||
onSuccess: (output) => finish(id, "completed", { output }),
|
||||
onFailure: (cause) =>
|
||||
finish(id, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", {
|
||||
error: errorText(Cause.squash(cause)),
|
||||
}),
|
||||
}),
|
||||
Effect.asVoid,
|
||||
Effect.forkIn(s.scope, { startImmediately: true }),
|
||||
)
|
||||
const scope = yield* Scope.fork(s.scope, "parallel")
|
||||
const token = {}
|
||||
yield* fork(scope, id, token, 0, restore(input.run))
|
||||
const job = {
|
||||
info: {
|
||||
id,
|
||||
@ -159,7 +201,10 @@ export const layer = Layer.effect(
|
||||
metadata: input.metadata,
|
||||
},
|
||||
done,
|
||||
fiber,
|
||||
scope,
|
||||
token,
|
||||
pending: 1,
|
||||
next: 1,
|
||||
}
|
||||
return [snapshot(job), new Map(jobs).set(id, job)] as const
|
||||
}),
|
||||
@ -168,6 +213,30 @@ export const layer = Layer.effect(
|
||||
)
|
||||
})
|
||||
|
||||
const extend: Interface["extend"] = Effect.fn("BackgroundJob.extend")(function* (input) {
|
||||
return yield* Effect.uninterruptibleMask((restore) =>
|
||||
Effect.gen(function* () {
|
||||
const s = yield* InstanceState.get(state)
|
||||
return yield* SynchronizedRef.modifyEffect(
|
||||
s.jobs,
|
||||
Effect.fnUntraced(function* (jobs) {
|
||||
const job = jobs.get(input.id)
|
||||
if (!job || job.info.status !== "running") return [false, jobs] as const
|
||||
yield* fork(job.scope, input.id, job.token, job.next, restore(input.run))
|
||||
return [
|
||||
true,
|
||||
new Map(jobs).set(input.id, {
|
||||
...job,
|
||||
pending: job.pending + 1,
|
||||
next: job.next + 1,
|
||||
}),
|
||||
] as const
|
||||
}),
|
||||
)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
const wait: Interface["wait"] = Effect.fn("BackgroundJob.wait")(function* (input) {
|
||||
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(input.id)
|
||||
if (!job) return { timedOut: false }
|
||||
@ -180,18 +249,34 @@ export const layer = Layer.effect(
|
||||
})
|
||||
|
||||
const cancel: Interface["cancel"] = Effect.fn("BackgroundJob.cancel")(function* (id) {
|
||||
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id)
|
||||
if (!job) return
|
||||
if (job.info.status !== "running") return snapshot(job)
|
||||
if (job.fiber) {
|
||||
yield* Fiber.interrupt(job.fiber).pipe(Effect.ignore)
|
||||
yield* Fiber.await(job.fiber).pipe(Effect.ignore)
|
||||
}
|
||||
const info = yield* finish(id, "cancelled")
|
||||
return info
|
||||
const completed_at = yield* Clock.currentTimeMillis
|
||||
const result = yield* SynchronizedRef.modify(
|
||||
(yield* InstanceState.get(state)).jobs,
|
||||
(jobs): readonly [FinishResult, Map<string, Active>] => {
|
||||
const job = jobs.get(id)
|
||||
if (!job) return [{}, jobs]
|
||||
if (job.info.status !== "running") return [{ info: snapshot(job) }, jobs]
|
||||
const next = {
|
||||
...job,
|
||||
pending: 0,
|
||||
info: {
|
||||
...job.info,
|
||||
status: "cancelled" as const,
|
||||
completed_at,
|
||||
},
|
||||
}
|
||||
return [
|
||||
{ info: snapshot(next), done: job.done, scope: job.scope },
|
||||
new Map(jobs).set(id, next),
|
||||
]
|
||||
},
|
||||
)
|
||||
if (result.info && result.done) yield* Deferred.succeed(result.done, result.info).pipe(Effect.ignore)
|
||||
if (result.scope) yield* Scope.close(result.scope, Exit.void)
|
||||
return result.info
|
||||
})
|
||||
|
||||
return Service.of({ list, get, start, wait, cancel })
|
||||
return Service.of({ list, get, start, extend, wait, cancel })
|
||||
}),
|
||||
)
|
||||
|
||||
|
||||
@ -10,7 +10,7 @@ import { Agent } from "../agent/agent"
|
||||
import { deriveSubagentSessionPermission } from "../agent/subagent-permissions"
|
||||
import type { SessionPrompt } from "../session/prompt"
|
||||
import { Config } from "@/config/config"
|
||||
import { Cause, Effect, Exit, Schema, Scope } from "effect"
|
||||
import { Effect, Exit, Schema, Scope } from "effect"
|
||||
import { EffectBridge } from "@/effect/bridge"
|
||||
import { RuntimeFlags } from "@/effect/runtime-flags"
|
||||
import { Database } from "@opencode-ai/core/database/database"
|
||||
@ -69,6 +69,17 @@ function backgroundOutput(sessionID: SessionID) {
|
||||
].join("\n")
|
||||
}
|
||||
|
||||
function backgroundUpdateOutput(sessionID: SessionID) {
|
||||
return [
|
||||
`<task id="${sessionID}" state="running">`,
|
||||
"<summary>Background task updated</summary>",
|
||||
"<task_result>",
|
||||
"Additional context sent to the background task.",
|
||||
"</task_result>",
|
||||
"</task>",
|
||||
].join("\n")
|
||||
}
|
||||
|
||||
function backgroundMessage(input: {
|
||||
sessionID: SessionID
|
||||
description: string
|
||||
@ -90,11 +101,6 @@ function backgroundMessage(input: {
|
||||
].join("\n")
|
||||
}
|
||||
|
||||
function errorText(error: unknown) {
|
||||
if (error instanceof Error) return error.message
|
||||
return String(error)
|
||||
}
|
||||
|
||||
export const TaskTool = Tool.define(
|
||||
id,
|
||||
Effect.gen(function* () {
|
||||
@ -231,9 +237,16 @@ export const TaskTool = Tool.define(
|
||||
.pipe(Effect.ignore, Effect.forkIn(scope, { startImmediately: true }))
|
||||
})
|
||||
|
||||
const existing = yield* background.get(nextSession.id)
|
||||
if (existing?.status === "running") {
|
||||
return yield* Effect.fail(new Error(`Task ${nextSession.id} is already running.`))
|
||||
if (yield* background.extend({ id: nextSession.id, run: runTask() })) {
|
||||
return {
|
||||
title: params.description,
|
||||
metadata: {
|
||||
...metadata,
|
||||
background: true,
|
||||
jobId: nextSession.id,
|
||||
},
|
||||
output: backgroundUpdateOutput(nextSession.id),
|
||||
}
|
||||
}
|
||||
|
||||
if (runInBackground) {
|
||||
@ -242,16 +255,18 @@ export const TaskTool = Tool.define(
|
||||
type: id,
|
||||
title: params.description,
|
||||
metadata,
|
||||
run: runTask().pipe(
|
||||
Effect.tap((text) => inject("completed", text).pipe(Effect.ignore)),
|
||||
Effect.catchCause((cause) =>
|
||||
(Cause.hasInterruptsOnly(cause)
|
||||
? Effect.void
|
||||
: inject("error", errorText(Cause.squash(cause))).pipe(Effect.ignore)
|
||||
).pipe(Effect.andThen(Effect.failCause(cause))),
|
||||
),
|
||||
),
|
||||
run: runTask(),
|
||||
})
|
||||
yield* background
|
||||
.wait({ id: info.id })
|
||||
.pipe(
|
||||
Effect.flatMap((result) => {
|
||||
if (result.info?.status === "completed") return inject("completed", result.info.output ?? "")
|
||||
if (result.info?.status === "error") return inject("error", result.info.error ?? "")
|
||||
return Effect.void
|
||||
}),
|
||||
Effect.forkIn(scope, { startImmediately: true }),
|
||||
)
|
||||
|
||||
return {
|
||||
title: params.description,
|
||||
|
||||
@ -78,6 +78,38 @@ describe("background.job", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.instance("waits for extensions before completing a running job", () =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* BackgroundJob.Service
|
||||
const first = yield* Deferred.make<void>()
|
||||
const second = yield* Deferred.make<void>()
|
||||
const job = yield* jobs.start({
|
||||
type: "test",
|
||||
run: Deferred.await(first).pipe(Effect.as("first")),
|
||||
})
|
||||
|
||||
expect(yield* jobs.extend({ id: job.id, run: Deferred.await(second).pipe(Effect.as("second")) })).toBe(true)
|
||||
yield* Deferred.succeed(first, undefined)
|
||||
expect((yield* jobs.get(job.id))?.status).toBe("running")
|
||||
|
||||
yield* Deferred.succeed(second, undefined)
|
||||
const done = yield* jobs.wait({ id: job.id })
|
||||
expect(done.info?.status).toBe("completed")
|
||||
expect(done.info?.output).toBe("second")
|
||||
}),
|
||||
)
|
||||
|
||||
it.instance("rejects extensions after a job completes", () =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* BackgroundJob.Service
|
||||
const job = yield* jobs.start({ type: "test", run: Effect.succeed("done") })
|
||||
yield* jobs.wait({ id: job.id })
|
||||
|
||||
expect(yield* jobs.extend({ id: job.id, run: Effect.succeed("late") })).toBe(false)
|
||||
expect((yield* jobs.get(job.id))?.output).toBe("done")
|
||||
}),
|
||||
)
|
||||
|
||||
it.instance("records failed jobs", () =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* BackgroundJob.Service
|
||||
@ -93,19 +125,56 @@ describe("background.job", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
it.instance("ignores stale settlements after restarting a failed job", () =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* BackgroundJob.Service
|
||||
const fail = yield* Deferred.make<void>()
|
||||
const interrupted = yield* Deferred.make<void>()
|
||||
const release = yield* Deferred.make<void>()
|
||||
const id = "job_test"
|
||||
yield* jobs.start({
|
||||
id,
|
||||
type: "test",
|
||||
run: Deferred.await(fail).pipe(Effect.andThen(Effect.fail(new Error("boom")))),
|
||||
})
|
||||
yield* jobs.extend({
|
||||
id,
|
||||
run: Effect.never.pipe(
|
||||
Effect.ensuring(Deferred.succeed(interrupted, undefined).pipe(Effect.andThen(Deferred.await(release)))),
|
||||
),
|
||||
})
|
||||
|
||||
yield* Deferred.succeed(fail, undefined)
|
||||
expect((yield* jobs.wait({ id })).info?.status).toBe("error")
|
||||
yield* Deferred.await(interrupted)
|
||||
yield* jobs.start({ id, type: "test", run: Effect.never })
|
||||
|
||||
yield* Deferred.succeed(release, undefined)
|
||||
yield* Effect.yieldNow
|
||||
expect((yield* jobs.get(id))?.status).toBe("running")
|
||||
yield* jobs.cancel(id)
|
||||
}),
|
||||
)
|
||||
|
||||
it.instance("can cancel running jobs", () =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* BackgroundJob.Service
|
||||
const interrupted = yield* Deferred.make<void>()
|
||||
const extendedInterrupted = yield* Deferred.make<void>()
|
||||
const job = yield* jobs.start({
|
||||
type: "test",
|
||||
run: Effect.never.pipe(Effect.ensuring(Deferred.succeed(interrupted, undefined))),
|
||||
})
|
||||
yield* jobs.extend({
|
||||
id: job.id,
|
||||
run: Effect.never.pipe(Effect.ensuring(Deferred.succeed(extendedInterrupted, undefined))),
|
||||
})
|
||||
|
||||
const cancelled = yield* jobs.cancel(job.id)
|
||||
|
||||
expect(cancelled?.status).toBe("cancelled")
|
||||
yield* Deferred.await(interrupted).pipe(Effect.timeout("1 second"))
|
||||
yield* Deferred.await(extendedInterrupted).pipe(Effect.timeout("1 second"))
|
||||
expect((yield* jobs.get(job.id))?.status).toBe("cancelled")
|
||||
}),
|
||||
)
|
||||
|
||||
@ -518,6 +518,79 @@ describe("tool.task", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
background.instance("background task completion waits for running updates", () =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* BackgroundJob.Service
|
||||
const { chat, assistant } = yield* seed()
|
||||
const tool = yield* TaskTool
|
||||
const def = yield* tool.init()
|
||||
const first = defer<void>()
|
||||
const second = defer<void>()
|
||||
const updated = defer<SessionPrompt.PromptInput>()
|
||||
const injected = defer<SessionPrompt.PromptInput>()
|
||||
let prompts = 0
|
||||
const promptOps: TaskPromptOps = {
|
||||
...stubOps(),
|
||||
prompt: (input) => {
|
||||
if (input.sessionID === chat.id) {
|
||||
injected.resolve(input)
|
||||
return Effect.succeed(reply(input, "done"))
|
||||
}
|
||||
prompts++
|
||||
if (prompts === 1) return Effect.promise(() => first.promise).pipe(Effect.as(reply(input, "first done")))
|
||||
updated.resolve(input)
|
||||
return Effect.promise(() => second.promise).pipe(Effect.as(reply(input, "second done")))
|
||||
},
|
||||
}
|
||||
const context = {
|
||||
sessionID: chat.id,
|
||||
messageID: assistant.id,
|
||||
agent: "build",
|
||||
abort: new AbortController().signal,
|
||||
extra: { promptOps },
|
||||
messages: [],
|
||||
metadata: () => Effect.void,
|
||||
ask: () => Effect.void,
|
||||
}
|
||||
|
||||
const started = yield* def.execute(
|
||||
{
|
||||
description: "inspect bug",
|
||||
prompt: "look into the cache key path",
|
||||
subagent_type: "general",
|
||||
background: true,
|
||||
},
|
||||
context,
|
||||
)
|
||||
const result = yield* def.execute(
|
||||
{
|
||||
description: "add investigation scope",
|
||||
prompt: "also inspect cancellation",
|
||||
subagent_type: "general",
|
||||
task_id: started.metadata.sessionId,
|
||||
},
|
||||
context,
|
||||
)
|
||||
|
||||
expect((yield* Effect.promise(() => updated.promise)).parts).toEqual([
|
||||
{ type: "text", text: "also inspect cancellation" },
|
||||
])
|
||||
expect(result.metadata.sessionId).toBe(started.metadata.sessionId)
|
||||
expect(result.metadata.background).toBe(true)
|
||||
expect(result.output).toContain("Background task updated")
|
||||
first.resolve()
|
||||
expect((yield* jobs.get(started.metadata.sessionId))?.status).toBe("running")
|
||||
|
||||
second.resolve()
|
||||
const waited = yield* jobs.wait({ id: started.metadata.sessionId, timeout: 1_000 })
|
||||
expect(waited.info?.status).toBe("completed")
|
||||
expect(waited.info?.output).toBe("second done")
|
||||
const notification = yield* Effect.promise(() => injected.promise)
|
||||
expect(notification.parts[0]?.type).toBe("text")
|
||||
if (notification.parts[0]?.type === "text") expect(notification.parts[0].text).toContain("second done")
|
||||
}),
|
||||
)
|
||||
|
||||
background.instance("background tasks complete through the background job service", () =>
|
||||
Effect.gen(function* () {
|
||||
const jobs = yield* BackgroundJob.Service
|
||||
|
||||
Loading…
Reference in New Issue
Block a user