fix(core): settle interrupted assistant steps (#33266)
This commit is contained in:
parent
ff837fe949
commit
49593c1ec4
@ -305,14 +305,7 @@ export const layer = Layer.effect(
|
||||
const llmFailure = failure instanceof LLMError ? failure : undefined
|
||||
if (llmFailure && !publisher.hasProviderError()) {
|
||||
yield* withPublication(publisher.failUnsettledTools("Provider did not return a tool result", true))
|
||||
yield* withPublication(
|
||||
events.publish(SessionEvent.Step.Failed, {
|
||||
sessionID: session.id,
|
||||
timestamp: yield* DateTime.now,
|
||||
assistantMessageID: yield* publisher.startAssistant(),
|
||||
error: { type: "unknown", message: llmFailure.reason.message },
|
||||
}),
|
||||
)
|
||||
yield* withPublication(publisher.failAssistant(llmFailure.reason.message))
|
||||
}
|
||||
if (stream._tag === "Failure" && Cause.hasInterrupts(stream.cause)) yield* FiberSet.clear(toolFibers)
|
||||
const settled = yield* restore(awaitToolFibers(toolFibers)).pipe(Effect.exit)
|
||||
@ -327,6 +320,8 @@ export const layer = Layer.effect(
|
||||
) {
|
||||
yield* FiberSet.clear(toolFibers)
|
||||
yield* withPublication(publisher.failUnsettledTools("Tool execution interrupted"))
|
||||
if (publisher.hasActiveAssistant())
|
||||
yield* withPublication(publisher.failAssistant("Provider turn interrupted"))
|
||||
}
|
||||
if (settled._tag === "Failure" && !Cause.hasInterrupts(settled.cause)) {
|
||||
const failure = Cause.squash(settled.cause)
|
||||
|
||||
@ -65,11 +65,14 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
>()
|
||||
const timestamp = DateTime.now
|
||||
let assistantMessageID: SessionMessage.ID | undefined
|
||||
let assistantActive = false
|
||||
let assistantFailed = false
|
||||
let providerFailed = false
|
||||
|
||||
const startAssistant = Effect.fnUntraced(function* () {
|
||||
if (assistantMessageID !== undefined) return assistantMessageID
|
||||
assistantMessageID = SessionMessage.ID.create()
|
||||
assistantActive = true
|
||||
yield* events.publish(SessionEvent.Step.Started, {
|
||||
...input,
|
||||
assistantMessageID,
|
||||
@ -190,6 +193,20 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
yield* flushFragments()
|
||||
})
|
||||
|
||||
const failAssistant = Effect.fnUntraced(function* (message: string) {
|
||||
if (assistantFailed) return
|
||||
yield* flush()
|
||||
const assistantMessageID = yield* startAssistant()
|
||||
assistantActive = false
|
||||
assistantFailed = true
|
||||
yield* events.publish(SessionEvent.Step.Failed, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: yield* timestamp,
|
||||
assistantMessageID,
|
||||
error: { type: "unknown", message },
|
||||
})
|
||||
})
|
||||
|
||||
const failUnsettledTools = Effect.fn("SessionRunner.failUnsettledTools")(function* (
|
||||
message: string,
|
||||
hostedOnly = false,
|
||||
@ -375,6 +392,7 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
}
|
||||
case "step-finish":
|
||||
yield* flush()
|
||||
assistantActive = false
|
||||
yield* events.publish(SessionEvent.Step.Ended, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: yield* timestamp,
|
||||
@ -388,13 +406,7 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
return
|
||||
case "provider-error":
|
||||
providerFailed = true
|
||||
yield* flush()
|
||||
yield* events.publish(SessionEvent.Step.Failed, {
|
||||
sessionID: input.sessionID,
|
||||
timestamp: yield* timestamp,
|
||||
assistantMessageID: yield* startAssistant(),
|
||||
error: { type: "unknown", message: event.message },
|
||||
})
|
||||
yield* failAssistant(event.message)
|
||||
return
|
||||
}
|
||||
})
|
||||
@ -402,10 +414,11 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
|
||||
return {
|
||||
publish,
|
||||
flush,
|
||||
failAssistant,
|
||||
failUnsettledTools,
|
||||
hasActiveAssistant: () => assistantActive,
|
||||
hasAssistantStarted: () => assistantMessageID !== undefined,
|
||||
hasProviderError: () => providerFailed,
|
||||
startAssistant,
|
||||
assistantMessageID: assistantMessageIDForTool,
|
||||
}
|
||||
}
|
||||
|
||||
@ -82,12 +82,21 @@ const assistant = (message: SessionMessage.Assistant, model: Model) => {
|
||||
const result = toolResult(item, sameModel ? (item.provider?.resultMetadata ?? item.provider?.metadata) : undefined)
|
||||
return item.provider?.executed === true && result ? [call, result] : [call]
|
||||
})
|
||||
const meaningful = content.filter((part) => {
|
||||
if (part.type === "text") return part.text !== ""
|
||||
if (part.type !== "reasoning") return true
|
||||
return part.text !== "" || (part.providerMetadata !== undefined && Object.keys(part.providerMetadata).length > 0)
|
||||
})
|
||||
const results = message.content
|
||||
.filter((item): item is SessionMessage.AssistantTool => item.type === "tool" && item.provider?.executed !== true)
|
||||
.map((item) => toolResult(item, sameModel ? (item.provider?.resultMetadata ?? item.provider?.metadata) : undefined))
|
||||
.filter((message) => message !== undefined)
|
||||
.map(Message.tool)
|
||||
return [Message.make({ id: message.id, role: "assistant", content, metadata: message.metadata }), ...results]
|
||||
if (meaningful.length === 0) return results
|
||||
return [
|
||||
Message.make({ id: message.id, role: "assistant", content: meaningful, metadata: message.metadata }),
|
||||
...results,
|
||||
]
|
||||
}
|
||||
|
||||
function toLLMMessage(message: SessionMessage.Message, model: Model): Message[] {
|
||||
|
||||
@ -14,6 +14,39 @@ const id = (value: string) => SessionMessage.ID.make(`msg_${value}`)
|
||||
const model = Model.make({ id: "model", provider: "provider", route: OpenAIChat.route })
|
||||
|
||||
describe("toLLMMessages", () => {
|
||||
test("omits empty assistant turns", () => {
|
||||
const assistant = (value: string, content: SessionMessage.Assistant["content"]) =>
|
||||
new SessionMessage.Assistant({
|
||||
id: id(value),
|
||||
type: "assistant",
|
||||
agent: "build",
|
||||
model: { id: ModelV2.ID.make("model"), providerID: ProviderV2.ID.make("provider") },
|
||||
content,
|
||||
time: { created, completed: created },
|
||||
})
|
||||
const messages = toLLMMessages(
|
||||
[
|
||||
assistant("empty", []),
|
||||
assistant("empty-text", [new SessionMessage.AssistantText({ type: "text", id: "empty", text: "" })]),
|
||||
assistant("empty-reasoning", [
|
||||
new SessionMessage.AssistantReasoning({ type: "reasoning", id: "empty-reasoning", text: "" }),
|
||||
]),
|
||||
assistant("text", [new SessionMessage.AssistantText({ type: "text", id: "text", text: "Partial" })]),
|
||||
assistant("reasoning", [
|
||||
new SessionMessage.AssistantReasoning({
|
||||
type: "reasoning",
|
||||
id: "reasoning",
|
||||
text: "",
|
||||
providerMetadata: { anthropic: { signature: "sig_1" } },
|
||||
}),
|
||||
]),
|
||||
],
|
||||
model,
|
||||
)
|
||||
|
||||
expect(messages.map((message) => message.id)).toEqual([id("text"), id("reasoning")])
|
||||
})
|
||||
|
||||
test("maps every top-level V2 Session message type", () => {
|
||||
const file = new FileAttachment({ uri: "data:image/png;base64,aGVsbG8=", mime: "image/png", name: "hello.png" })
|
||||
const messages = toLLMMessages(
|
||||
|
||||
@ -547,6 +547,8 @@ const verifyPartialFlushOnInterruption = (kind: FragmentKind) =>
|
||||
{ type: "user", text: prompt },
|
||||
{
|
||||
type: "assistant",
|
||||
finish: "error",
|
||||
error: { type: "unknown", message: "Provider turn interrupted" },
|
||||
content: [
|
||||
kind === "tool input"
|
||||
? { type: "tool", id: fragmentID(kind, "interrupted"), state: { status: "error" } }
|
||||
|
||||
Loading…
Reference in New Issue
Block a user