From 6a3b2f339a656c5dbbb6dfae43b26388b2398053 Mon Sep 17 00:00:00 2001 From: Simon Klee Date: Mon, 1 Jun 2026 15:11:45 +0200 Subject: [PATCH] add run --replay mode (#30239) --- bun.lock | 30 +- package.json | 6 +- packages/opencode/src/cli/cmd/run.ts | 2 +- packages/opencode/src/cli/cmd/run/footer.ts | 40 +- .../src/cli/cmd/run/runtime.lifecycle.ts | 44 +- .../opencode/src/cli/cmd/run/runtime.queue.ts | 23 +- packages/opencode/src/cli/cmd/run/runtime.ts | 81 ++- .../opencode/src/cli/cmd/run/session-data.ts | 5 + .../src/cli/cmd/run/session-replay.ts | 111 ++++- .../src/cli/cmd/run/stream.transport.ts | 232 ++++++++- packages/opencode/src/cli/cmd/run/types.ts | 15 + .../__snapshots__/help-snapshots.test.ts.snap | 2 +- .../test/cli/run/runtime.queue.test.ts | 5 +- .../test/cli/run/session-replay.test.ts | 284 ++++++++++- .../test/cli/run/stream.transport.test.ts | 464 +++++++++++++++++- packages/plugin/package.json | 6 +- 16 files changed, 1292 insertions(+), 58 deletions(-) diff --git a/bun.lock b/bun.lock index 74b6df0fc..e9f71f338 100644 --- a/bun.lock +++ b/bun.lock @@ -611,9 +611,9 @@ "typescript": "catalog:", }, "peerDependencies": { - "@opentui/core": ">=0.3.0", - "@opentui/keymap": ">=0.3.0", - "@opentui/solid": ">=0.3.0", + "@opentui/core": ">=0.3.1", + "@opentui/keymap": ">=0.3.1", + "@opentui/solid": ">=0.3.1", }, "optionalPeers": [ "@opentui/core", @@ -862,9 +862,9 @@ "@npmcli/arborist": "9.4.0", "@octokit/rest": "22.0.0", "@openauthjs/openauth": "0.0.0-20250322224806", - "@opentui/core": "0.3.0", - "@opentui/keymap": "0.3.0", - "@opentui/solid": "0.3.0", + "@opentui/core": "0.3.1", + "@opentui/keymap": "0.3.1", + "@opentui/solid": "0.3.1", "@pierre/diffs": "1.1.0-beta.18", "@playwright/test": "1.59.1", "@sentry/solid": "10.36.0", @@ -1758,23 +1758,23 @@ "@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.40.0", "", {}, "sha512-cifvXDhcqMwwTlTK04GBNeIe7yyo28Mfby85QXFe1Yk8nmi36Ab/5UQwptOx84SsoGNRg+EVSjwzfSZMy6pmlw=="], - "@opentui/core": ["@opentui/core@0.3.0", "", { "dependencies": { "bun-ffi-structs": "0.2.2", "diff": "9.0.0", "marked": "17.0.1", "string-width": "7.2.0", "strip-ansi": "7.1.2", "yoga-layout": "3.2.1" }, "optionalDependencies": { "@opentui/core-darwin-arm64": "0.3.0", "@opentui/core-darwin-x64": "0.3.0", "@opentui/core-linux-arm64": "0.3.0", "@opentui/core-linux-x64": "0.3.0", "@opentui/core-win32-arm64": "0.3.0", "@opentui/core-win32-x64": "0.3.0" }, "peerDependencies": { "web-tree-sitter": "0.25.10" } }, "sha512-wvNESYGYGRLuvarZ3QY4CTB+BziZ/j6Snd9qRKD4fQ7SF6G4UpYElLTFrg7uzRo1v7WJTqbquymcTvWEHMnpYA=="], + "@opentui/core": ["@opentui/core@0.3.1", "", { "dependencies": { "bun-ffi-structs": "0.2.2", "diff": "9.0.0", "marked": "17.0.1", "string-width": "7.2.0", "strip-ansi": "7.1.2", "yoga-layout": "3.2.1" }, "optionalDependencies": { "@opentui/core-darwin-arm64": "0.3.1", "@opentui/core-darwin-x64": "0.3.1", "@opentui/core-linux-arm64": "0.3.1", "@opentui/core-linux-x64": "0.3.1", "@opentui/core-win32-arm64": "0.3.1", "@opentui/core-win32-x64": "0.3.1" }, "peerDependencies": { "web-tree-sitter": "0.25.10" } }, "sha512-kQFSsSCgtlasSqTigCgKmM67xaquGvTg+vwimDnFSZtcBEt4E3dz7qLrbeh5FVvTA+RMbwe+Bozq03PW+SgjXw=="], - "@opentui/core-darwin-arm64": ["@opentui/core-darwin-arm64@0.3.0", "", { "os": "darwin", "cpu": "arm64" }, "sha512-/eDfAcutAHJqR9spwHMLuo6LMqngymev/m+i6uqlk98gX1EJiJe2pJ16sKbp3RctgH/Gz/8TYOhVHpPGYJl7yQ=="], + "@opentui/core-darwin-arm64": ["@opentui/core-darwin-arm64@0.3.1", "", { "os": "darwin", "cpu": "arm64" }, "sha512-krvVfiBpeBY+727R8yogdqIcxkK3RUVcI97bqjl8jTeDMcWOkFFfHezssRMPmbR5x++1tX669Fz3fuxoe7XUIg=="], - "@opentui/core-darwin-x64": ["@opentui/core-darwin-x64@0.3.0", "", { "os": "darwin", "cpu": "x64" }, "sha512-/j6EWAvdwhz1wU/mWfXepAf3+NuMYz2Ic5ozaid5LdwIpPomIkM9yCUDm76mQhRBbjsAl/7UeSeUA0qSCMSZBg=="], + "@opentui/core-darwin-x64": ["@opentui/core-darwin-x64@0.3.1", "", { "os": "darwin", "cpu": "x64" }, "sha512-D/6ec5H8SPpSBMr01/sqgSddIl1Qc1QMKsDl/wV5MpbxYc7Qvie9qlNvvoSsWNfAXAbafLRb1jQBzouk41cp1w=="], - "@opentui/core-linux-arm64": ["@opentui/core-linux-arm64@0.3.0", "", { "os": "linux", "cpu": "arm64" }, "sha512-uUFVT3V35KkM1m8gaLmRcTV9dsJzXnxwM+dv6+NjScx0W/Y0CJKbW9wDYwnLyPnBNgaFUi171zmJra5gTtFTsw=="], + "@opentui/core-linux-arm64": ["@opentui/core-linux-arm64@0.3.1", "", { "os": "linux", "cpu": "arm64" }, "sha512-E/FFBoAsWJyS/EO/cF7h7DuEENYa9nAdSv1W/TIyKXpBisN6K3U1Xgbk528TkfWjrwJjhGs+9OMYdXuAHd5LTw=="], - "@opentui/core-linux-x64": ["@opentui/core-linux-x64@0.3.0", "", { "os": "linux", "cpu": "x64" }, "sha512-73bNNNU2OaqZQLIlvzDOdAzQmzBAqf+cSilmJ+Y9JnybrBn1d6VShC66+V4xxIgonq1swk7BD+SUHYbwwGilQA=="], + "@opentui/core-linux-x64": ["@opentui/core-linux-x64@0.3.1", "", { "os": "linux", "cpu": "x64" }, "sha512-Btb7Q4BOC55Aj2qCs0VoxGuj87DNfUEaSx0z89oeU4npTN+6SpJApyGZTCNNeSe2sdmOGeh/8eAR4X96ORjcKg=="], - "@opentui/core-win32-arm64": ["@opentui/core-win32-arm64@0.3.0", "", { "os": "win32", "cpu": "arm64" }, "sha512-jg5KrV/4mVQ0mdkcL9CtQVtBk0NAtQ+2rCKoZ/jNHB6GxGK0ot9vDV6P3X68hZVkvpb2pdXfg6GRsZJ+Np4hZA=="], + "@opentui/core-win32-arm64": ["@opentui/core-win32-arm64@0.3.1", "", { "os": "win32", "cpu": "arm64" }, "sha512-+lt24u3KwEPG69oXDOLz9N484wPcAHvrPbDNU77OT6DvWew+StAjh40eY+Zeu0TkTNDWfj7qnQKV0GKWtFA3cw=="], - "@opentui/core-win32-x64": ["@opentui/core-win32-x64@0.3.0", "", { "os": "win32", "cpu": "x64" }, "sha512-kiM3C5bwQBTfrJKAOfb+L3U6MMkPSQlMhAERlLMjqSurc+llcyqygr/wbXSvfAqJtKlIpf3MKJRnVFTyfRIdng=="], + "@opentui/core-win32-x64": ["@opentui/core-win32-x64@0.3.1", "", { "os": "win32", "cpu": "x64" }, "sha512-eVkKMYirYgpn92lI0YT/GKru4J+UiXjzwyzNRFX+P59OHXvL3GFdqJMcJmX4/zvyjg4c8HDnU79YLnyG+TlXLw=="], - "@opentui/keymap": ["@opentui/keymap@0.3.0", "", { "dependencies": { "@opentui/core": "0.3.0" }, "peerDependencies": { "@opentui/react": "0.3.0", "@opentui/solid": "0.3.0", "react": ">=19.2.0", "solid-js": "1.9.12" }, "optionalPeers": ["@opentui/react", "@opentui/solid", "react", "solid-js"] }, "sha512-lJN57DanKujy3u0IhfSMCShvXIobRjhprdkrdM3brQoX6wxk7gTFE8fTCCz9z1nINkXNsKHQ6grZO1dsT/0mzA=="], + "@opentui/keymap": ["@opentui/keymap@0.3.1", "", { "dependencies": { "@opentui/core": "0.3.1" }, "peerDependencies": { "@opentui/react": "0.3.1", "@opentui/solid": "0.3.1", "react": ">=19.2.0", "solid-js": "1.9.12" }, "optionalPeers": ["@opentui/react", "@opentui/solid", "react", "solid-js"] }, "sha512-BTj+ggsarO2uyvd6CWzvgfsekA8c4aEclbAPKPZGVjBI3Fo5+KAHUrXvteFO5qpGMANfEJTtVHoRu5cic1Nlaw=="], - "@opentui/solid": ["@opentui/solid@0.3.0", "", { "dependencies": { "@babel/core": "7.28.0", "@babel/preset-typescript": "7.27.1", "@opentui/core": "0.3.0", "babel-plugin-module-resolver": "5.0.2", "babel-preset-solid": "1.9.12", "entities": "7.0.1", "s-js": "^0.4.9" }, "peerDependencies": { "solid-js": "1.9.12" } }, "sha512-AUtNzvgkdW81Ftl0sahAy3tY1LIPSMzBw3APBC8jiDAzzPv4kYVdyWXryTxLbU2q+Pgtr57VwKwHgc5wsNrd2w=="], + "@opentui/solid": ["@opentui/solid@0.3.1", "", { "dependencies": { "@babel/core": "7.28.0", "@babel/preset-typescript": "7.27.1", "@opentui/core": "0.3.1", "babel-plugin-module-resolver": "5.0.2", "babel-preset-solid": "1.9.12", "entities": "7.0.1", "s-js": "^0.4.9" }, "peerDependencies": { "solid-js": "1.9.12" } }, "sha512-2R6wEijfMub9COTBCm8IKVj2y7+Sc4fZZjJawxk8sE6+++mzeUaokKNJTlYhZXpMju4LKMv6j9CjWkG8JYfbcg=="], "@oslojs/asn1": ["@oslojs/asn1@1.0.0", "", { "dependencies": { "@oslojs/binary": "1.0.0" } }, "sha512-zw/wn0sj0j0QKbIXfIlnEcTviaCzYOY3V5rAyjR6YtOByFtJiT574+8p9Wlach0lZH9fddD4yb9laEAIl4vXQA=="], diff --git a/package.json b/package.json index 0b5119d94..e1aed6e8b 100644 --- a/package.json +++ b/package.json @@ -38,9 +38,9 @@ "@types/cross-spawn": "6.0.6", "@octokit/rest": "22.0.0", "@hono/zod-validator": "0.4.2", - "@opentui/core": "0.3.0", - "@opentui/keymap": "0.3.0", - "@opentui/solid": "0.3.0", + "@opentui/core": "0.3.1", + "@opentui/keymap": "0.3.1", + "@opentui/solid": "0.3.1", "ulid": "3.0.1", "@kobalte/core": "0.13.11", "@types/luxon": "3.7.1", diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index b80a2389e..cdbf4562d 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -221,7 +221,7 @@ export const RunCommand = effectCmd({ .option("replay", { type: "boolean", default: false, - describe: "replay visible session history on interactive resume", + describe: "replay interactive session history on resume and after resize", }) .option("replay-limit", { type: "number", diff --git a/packages/opencode/src/cli/cmd/run/footer.ts b/packages/opencode/src/cli/cmd/run/footer.ts index 16c6b2420..90ca009e3 100644 --- a/packages/opencode/src/cli/cmd/run/footer.ts +++ b/packages/opencode/src/cli/cmd/run/footer.ts @@ -171,6 +171,7 @@ export class RunFooter implements FooterApi { private queue: StreamCommit[] = [] private pending = false private flushing: Promise = Promise.resolve() + private flushError: unknown // Fixed portion of footer height above the textarea. private base: number private rows = TEXTAREA_MIN_ROWS @@ -204,6 +205,15 @@ export class RunFooter implements FooterApi { private requestExitHandler: (() => boolean) | undefined private scrollback: RunScrollbackStream + private createScrollback(wrote: boolean): RunScrollbackStream { + return new RunScrollbackStream(this.renderer, this.options.theme, { + diffStyle: this.options.diffStyle, + wrote, + sessionID: this.options.sessionID, + treeSitterClient: this.options.treeSitterClient, + }) + } + constructor( private renderer: CliRenderer, private options: RunFooterOptions, @@ -257,12 +267,7 @@ export class RunFooter implements FooterApi { this.queuedPrompts = queuedPrompts this.setQueuedPrompts = setQueuedPrompts this.base = Math.max(1, renderer.footerHeight - TEXTAREA_MIN_ROWS) - this.scrollback = new RunScrollbackStream(renderer, options.theme, { - diffStyle: options.diffStyle, - wrote: options.wrote, - sessionID: options.sessionID, - treeSitterClient: options.treeSitterClient, - }) + this.scrollback = this.createScrollback(options.wrote ?? false) this.renderer.on(CliRenderEvents.DESTROY, this.handleDestroy) @@ -465,7 +470,9 @@ export class RunFooter implements FooterApi { }, ), ) - .catch(() => {}) + .catch((error) => { + this.flushError = error + }) } private present(view: FooterView): void { @@ -523,6 +530,12 @@ export class RunFooter implements FooterApi { } return this.flushing.then(async () => { + if (this.flushError !== undefined) { + const error = this.flushError + this.flushError = undefined + throw error + } + if (this.isGone) { return } @@ -535,6 +548,15 @@ export class RunFooter implements FooterApi { }) } + public resetForReplay(wrote: boolean): void { + if (this.isGone) { + return + } + + this.scrollback.destroy() + this.scrollback = this.createScrollback(wrote) + } + public close(): void { if (this.closed) { return @@ -936,6 +958,8 @@ export class RunFooter implements FooterApi { }, ), ) - .catch(() => {}) + .catch((error) => { + this.flushError = error + }) } } diff --git a/packages/opencode/src/cli/cmd/run/runtime.lifecycle.ts b/packages/opencode/src/cli/cmd/run/runtime.lifecycle.ts index bc44aafa9..389f868ee 100644 --- a/packages/opencode/src/cli/cmd/run/runtime.lifecycle.ts +++ b/packages/opencode/src/cli/cmd/run/runtime.lifecycle.ts @@ -8,7 +8,7 @@ // // Also wires SIGINT so Ctrl-c clears a live prompt draft first, then falls // back to the usual two-press exit sequence through RunFooter.requestExit(). -import { createCliRenderer, type CliRenderer, type ScrollbackWriter } from "@opentui/core" +import { CliRenderEvents, createCliRenderer, type CliRenderer, type ScrollbackWriter } from "@opentui/core" import { createDefaultOpenTuiKeymap } from "@opentui/keymap/opentui" import { Session as SessionApi } from "@/session/session" import { registerOpencodeKeymap } from "@/cli/cmd/tui/keymap" @@ -75,6 +75,8 @@ export type LifecycleInput = { export type Lifecycle = { footer: FooterApi + onResize(fn: () => void): () => void + resetForReplay(input: { sessionTitle?: string; sessionID?: string; history: RunPrompt[] }): Promise close(input: { showExit: boolean; sessionTitle?: string; sessionID?: string; history?: RunPrompt[] }): Promise } @@ -307,6 +309,46 @@ export async function createRuntimeLifecycle(input: LifecycleInput): Promise { + if (width === renderer.terminalWidth && height === renderer.terminalHeight) { + return + } + + width = renderer.terminalWidth + height = renderer.terminalHeight + fn() + } + renderer.on(CliRenderEvents.RESIZE, resize) + return () => renderer.off(CliRenderEvents.RESIZE, resize) + }, + async resetForReplay(next) { + if (closed || renderer.isDestroyed || footer.isClosed) { + throw new Error("runtime closed") + } + + await footer.idle() + if (closed || renderer.isDestroyed || footer.isClosed) { + throw new Error("runtime closed") + } + + footer.resetForReplay(true) + renderer.resetSplitFooterForReplay({ clearSavedLines: true }) + const splash = splashInfo(next.sessionTitle ?? input.sessionTitle, next.history) + renderer.writeToScrollback( + entrySplash({ + ...splashMeta({ + title: splash.title, + session_id: next.sessionID ?? input.getSessionID?.() ?? input.sessionID, + }), + theme: theme.splash, + showSession: splash.showSession, + }), + ) + renderer.requestRender() + }, close, } } catch (error) { diff --git a/packages/opencode/src/cli/cmd/run/runtime.queue.ts b/packages/opencode/src/cli/cmd/run/runtime.queue.ts index 64172e828..e575647af 100644 --- a/packages/opencode/src/cli/cmd/run/runtime.queue.ts +++ b/packages/opencode/src/cli/cmd/run/runtime.queue.ts @@ -162,7 +162,14 @@ export async function runPromptQueue(input: QueueInput): Promise { continue } - state.active = prompt + const sent = + prompt.mode === "shell" + ? prompt + : { + ...prompt, + messageID: prompt.messageID ?? queued?.messageID ?? MessageID.ascending(), + } + state.active = sent emit( { @@ -185,18 +192,24 @@ export async function runPromptQueue(input: QueueInput): Promise { break } - if (prompt.mode !== "shell") { - const commit = { kind: "user", text: prompt.text, phase: "start", source: "system" } as const + if (sent.mode !== "shell") { + const commit = { + kind: "user", + text: sent.text, + phase: "start", + source: "system", + messageID: sent.messageID, + } as const input.trace?.write("ui.commit", commit) input.footer.append(commit) } - input.onSend?.(prompt) + input.onSend?.(sent) if (state.closed) { break } - const task = input.run(prompt, ctrl.signal).then( + const task = input.run(sent, ctrl.signal).then( () => ({ type: "done" as const }), (error) => ({ type: "error" as const, error }), ) diff --git a/packages/opencode/src/cli/cmd/run/runtime.ts b/packages/opencode/src/cli/cmd/run/runtime.ts index 2e2b07398..01665db1c 100644 --- a/packages/opencode/src/cli/cmd/run/runtime.ts +++ b/packages/opencode/src/cli/cmd/run/runtime.ts @@ -14,13 +14,14 @@ // 4. runs the prompt queue until the footer closes. import { createOpencodeClient } from "@opencode-ai/sdk/v2" import { Flag } from "@opencode-ai/core/flag/flag" +import { MessageID } from "@/session/schema" import { createRunDemo } from "./demo" import { resolveModelInfo, resolveRunTuiConfig, resolveSessionInfo } from "./runtime.boot" import { createRuntimeLifecycle } from "./runtime.lifecycle" import { recordRunSpanError, setRunSpanAttributes, withRunSpan } from "./otel" import { trace } from "./trace" import { cycleVariant, formatModelLabel, resolveSavedVariant, resolveVariant, saveVariant } from "./variant.shared" -import type { RunInput, RunPrompt, RunProvider } from "./types" +import type { LocalReplayAnchor, LocalReplayRow, RunInput, RunPrompt, RunProvider, StreamCommit } from "./types" /** @internal Exported for testing */ export { pickVariant, resolveVariant } from "./variant.shared" @@ -114,6 +115,7 @@ type RuntimeState = { activeVariant: string | undefined sessionID: string history: RunPrompt[] + localRows: LocalReplayRow[] sessionTitle?: string agent: string | undefined switching?: Promise @@ -139,6 +141,9 @@ function variantsFor(providers: RunProvider[], model: RunInput["model"]) { return Object.keys(providers.find((item) => item.id === model.providerID)?.models?.[model.modelID]?.variants ?? {}) } +const REPLAY_RESIZE_DELAY = 250 +const LOCAL_REPLAY_ROW_LIMIT = 100 + async function resolveExitTitle( ctx: BootContext, input: RunRuntimeInput, @@ -196,6 +201,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { activeVariant: resolveVariant(ctx.variant, session.variant, savedVariant, []), sessionID: ctx.sessionID, history: [...session.history], + localRows: [], sessionTitle: ctx.sessionTitle, agent: ctx.agent, } @@ -374,6 +380,9 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { }, }) const footer = shell.footer + const rememberLocal = (commit: StreamCommit, after?: LocalReplayAnchor) => { + state.localRows = [...state.localRows, { commit, after }].slice(-LOCAL_REPLAY_ROW_LIMIT) + } const loadCatalog = async (): Promise => { if (footer.isClosed) { @@ -510,6 +519,36 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { return next } + let replayResizeTimer: ReturnType | undefined + const offResize = input.replay + ? shell.onResize(() => { + if (replayResizeTimer) { + clearTimeout(replayResizeTimer) + } + + replayResizeTimer = setTimeout(() => { + replayResizeTimer = undefined + if (footer.isClosed || !state.stream) { + return + } + + void state.stream + .then((item) => + item.handle.replayOnResize({ + localRows: () => state.localRows, + reset: () => + shell.resetForReplay({ + sessionTitle: state.sessionTitle, + sessionID: state.sessionID, + history: state.history, + }), + }), + ) + .catch(() => {}) + }, REPLAY_RESIZE_DELAY) + }) + : () => {} + const runQueue = async () => { let includeFiles = true if (state.demo) { @@ -525,6 +564,15 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { onSend: (prompt) => { state.shown = true state.history.push(prompt) + if (prompt.mode !== "shell") { + rememberLocal({ + kind: "user", + text: prompt.text, + phase: "start", + source: "system", + messageID: prompt.messageID, + }) + } }, onNewSession: createSession ? async () => { @@ -545,6 +593,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { state.sessionTitle = created.sessionTitle state.agent = created.agent ?? state.agent state.history = [] + state.localRows = [] includeFiles = true state.demo = input.demo ? createRunDemo({ @@ -598,12 +647,15 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { status: "failed to start new session", }, }) - footer.append({ + const commit = { kind: "error", text: error instanceof Error ? error.message : String(error), phase: "start", source: "system", - }) + messageID: MessageID.ascending(), + } as const + rememberLocal(commit) + footer.append(commit) } } : undefined, @@ -614,6 +666,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { await state.switching?.catch(() => {}) + let outputAnchor: LocalReplayAnchor | undefined return withRunSpan( "RunInteractive.turn", { @@ -644,8 +697,16 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { prompt, files: input.files, includeFiles, + onVisibleOutput: (anchor) => { + outputAnchor = anchor + }, signal, }) + if (prompt.messageID) { + state.localRows = state.localRows.filter( + (row) => row.commit.kind !== "user" || row.commit.messageID !== prompt.messageID, + ) + } includeFiles = false } catch (error) { if (signal.aborted || footer.isClosed) { @@ -656,7 +717,15 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { const text = (await state.stream?.then((item) => item.mod).catch(() => undefined))?.formatUnknownError(error) ?? (error instanceof Error ? error.message : String(error)) - footer.append({ kind: "error", text, phase: "start", source: "system" }) + const commit = { + kind: "error", + text, + phase: "start", + source: "system", + messageID: prompt.messageID, + } as const + rememberLocal(commit, outputAnchor) + footer.append(commit) } }, ) @@ -683,6 +752,10 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { try { await runQueue() } finally { + if (replayResizeTimer) { + clearTimeout(replayResizeTimer) + } + offResize() await state.stream?.then((item) => item.handle.close()).catch(() => {}) } } finally { diff --git a/packages/opencode/src/cli/cmd/run/session-data.ts b/packages/opencode/src/cli/cmd/run/session-data.ts index 4a3a49fb8..03951ec4c 100644 --- a/packages/opencode/src/cli/cmd/run/session-data.ts +++ b/packages/opencode/src/cli/cmd/run/session-data.ts @@ -60,6 +60,7 @@ type SessionCommit = StreamCommit // - part: part ID → "assistant" | "reasoning" (text parts only) // - text: part ID → full accumulated text so far // - sent: part ID → byte offset of last flushed text (for incremental output) +// - visible: part ID → rendered text for an active part after display transforms // - end: part IDs whose time.end has arrived (part is finished) // - shell: shell call ID → chosen transcript source for direct shell calls // - echo: message ID → bash outputs to strip from the next assistant chunk @@ -82,6 +83,7 @@ export type SessionData = { part: Map text: Map sent: Map + visible: Map end: Set echo: Map> } @@ -119,6 +121,7 @@ export function createSessionData( part: new Map(), text: new Map(), sent: new Map(), + visible: new Map(), end: new Set(), echo: new Map(), } @@ -538,6 +541,7 @@ function flushPart(data: SessionData, commits: SessionCommit[], partID: string, if (chunk) { data.sent.set(partID, text.length) + data.visible.set(partID, (data.visible.get(partID) ?? "") + chunk) commits.push({ kind, text: chunk, @@ -567,6 +571,7 @@ function drop(data: SessionData, partID: string) { data.part.delete(partID) data.text.delete(partID) data.sent.delete(partID) + data.visible.delete(partID) data.msg.delete(partID) data.end.delete(partID) } diff --git a/packages/opencode/src/cli/cmd/run/session-replay.ts b/packages/opencode/src/cli/cmd/run/session-replay.ts index f43bff5be..dde41f324 100644 --- a/packages/opencode/src/cli/cmd/run/session-replay.ts +++ b/packages/opencode/src/cli/cmd/run/session-replay.ts @@ -1,7 +1,7 @@ import type { Event, PermissionRequest, QuestionRequest } from "@opencode-ai/sdk/v2" import { bootstrapSessionData, createSessionData, reduceSessionData, type SessionData } from "./session-data" import { messagePrompt, type SessionMessages } from "./session.shared" -import type { FooterPatch, StreamCommit } from "./types" +import type { FooterPatch, LocalReplayRow, StreamCommit } from "./types" type ReplayInput = { messages: SessionMessages @@ -186,3 +186,112 @@ export function replaySession(input: ReplayInput): SessionReplay { patch: replayPatch(data, patch), } } + +export function replayLocalRows(messages: SessionMessages, commits: StreamCommit[], rows: LocalReplayRow[]): StreamCommit[] { + const persisted = new Set(messages.map((message) => message.info.id)) + return rows.reduce((out, local) => { + const row = local.commit + if (row.kind === "user" && row.messageID && persisted.has(row.messageID)) { + return out + } + + if (!row.messageID) { + return [...out, row] + } + + const exact = local.after + ? out.findIndex( + (commit) => + commit.kind === local.after?.kind && + commit.text === local.after.text && + commit.phase === local.after.phase && + commit.toolState === local.after.toolState && + (local.after.partID ? commit.partID === local.after.partID : commit.messageID === local.after.messageID), + ) + : -1 + const anchored = + exact !== -1 + ? exact + : local.after + ? out.findLastIndex((commit) => + local.after?.partID + ? commit.partID === local.after.partID + : commit.kind === local.after?.kind && commit.messageID === local.after.messageID, + ) + : -1 + if (anchored !== -1) { + const commit = out[anchored] + const visible = local.after?.visible + if (commit && visible && commit.text.startsWith(visible) && commit.text.length > visible.length) { + return [ + ...out.slice(0, anchored), + { ...commit, text: visible }, + row, + { ...commit, text: commit.text.slice(visible.length) }, + ...out.slice(anchored + 1), + ] + } + + return [...out.slice(0, anchored + 1), row, ...out.slice(anchored + 1)] + } + + const after = out.findIndex((commit) => commit.kind === "user" && commit.messageID === row.messageID) + if (after !== -1) { + return [...out.slice(0, after + 1), row, ...out.slice(after + 1)] + } + + const before = out.findIndex((commit) => commit.messageID && row.messageID! < commit.messageID) + if (before === -1) { + return [...out, row] + } + + return [...out.slice(0, before), row, ...out.slice(before)] + }, commits) +} + +export function replayActiveText(data: SessionData, current: SessionData): StreamCommit[] { + return [...current.part.entries()].flatMap(([partID, kind]) => { + if (kind === "user" || current.end.has(partID) || data.ids.has(partID)) { + return [] + } + + const text = current.text.get(partID) ?? "" + const existing = data.text.get(partID) ?? "" + const sent = current.sent.get(partID) ?? 0 + const existingSent = data.sent.get(partID) ?? 0 + const visible = current.visible.get(partID) ?? "" + const existingVisible = data.visible.get(partID) ?? "" + if (!text.startsWith(existing) || existingSent > sent || !visible.startsWith(existingVisible)) { + return [] + } + + data.part.set(partID, kind) + data.text.set(partID, text) + data.sent.set(partID, sent) + data.visible.set(partID, visible) + const messageID = current.msg.get(partID) + if (messageID) { + data.msg.set(partID, messageID) + const role = current.role.get(messageID) + if (role) { + data.role.set(messageID, role) + } + } + + const chunk = visible.slice(existingVisible.length) + if (!chunk) { + return [] + } + + return [ + { + kind, + text: chunk, + phase: "progress", + source: kind, + ...(messageID ? { messageID } : {}), + partID, + }, + ] satisfies StreamCommit[] + }) +} diff --git a/packages/opencode/src/cli/cmd/run/stream.transport.ts b/packages/opencode/src/cli/cmd/run/stream.transport.ts index ca6a55d1d..5641b8f07 100644 --- a/packages/opencode/src/cli/cmd/run/stream.transport.ts +++ b/packages/opencode/src/cli/cmd/run/stream.transport.ts @@ -27,7 +27,7 @@ import { reduceSessionData, type SessionData, } from "./session-data" -import { replaySession } from "./session-replay" +import { replayActiveText, replayLocalRows, replaySession } from "./session-replay" import { bootstrapSubagentCalls, bootstrapSubagentData, @@ -51,6 +51,8 @@ import type { FooterSubagentState, FooterSubagentTab, FooterView, + LocalReplayAnchor, + LocalReplayRow, RunFilePart, RunInput, RunPrompt, @@ -81,6 +83,7 @@ type Wait = { tick: number armed: boolean live: boolean + onVisibleOutput?: (anchor: LocalReplayAnchor) => void done: Deferred.Deferred } @@ -91,15 +94,22 @@ export type SessionTurnInput = { prompt: RunPrompt files: RunFilePart[] includeFiles: boolean + onVisibleOutput?: (anchor: LocalReplayAnchor) => void signal?: AbortSignal } export type SessionTransport = { runPromptTurn(input: SessionTurnInput): Promise selectSubagent(sessionID: string | undefined): void + replayOnResize(input: SessionResizeReplayInput): Promise close(): Promise } +export type SessionResizeReplayInput = { + localRows: () => LocalReplayRow[] + reset: () => Promise +} + type State = { data: SessionData subagent: SubagentData @@ -115,6 +125,7 @@ type State = { type TransportService = { readonly runPromptTurn: (input: SessionTurnInput) => Effect.Effect readonly selectSubagent: (sessionID: string | undefined) => Effect.Effect + readonly replayOnResize: (input: SessionResizeReplayInput) => Effect.Effect readonly close: () => Effect.Effect } @@ -440,6 +451,9 @@ function createLayer(input: StreamInput) { blockers: new Map(), } let booting = true + let replaying = false + let replayDisabled = false + let replayPending: SessionResizeReplayInput | undefined const buffered: Event[] = [] const replayedParts = new Set() const recovering = new Set() @@ -594,6 +608,38 @@ function createLayer(input: StreamInput) { Effect.orElseSucceed(() => []), ) + const replayMessages = () => + Effect.promise(() => + input.sdk.session.messages({ + sessionID: input.sessionID, + ...(input.replayLimit === undefined + ? {} + : { limit: Math.max(input.replayLimit, SUBAGENT_BOOTSTRAP_LIMIT) }), + }), + ).pipe(Effect.flatMap((item) => (item.error ? Effect.fail(item.error) : Effect.succeed(item.data ?? [])))) + + const replayRequests = () => + Effect.all( + [ + Effect.promise(() => input.sdk.permission.list()).pipe( + Effect.flatMap((item) => (item.error ? Effect.fail(item.error) : Effect.succeed(item.data ?? []))), + ), + Effect.promise(() => input.sdk.question.list()).pipe( + Effect.flatMap((item) => (item.error ? Effect.fail(item.error) : Effect.succeed(item.data ?? []))), + ), + ], + { concurrency: "unbounded" }, + ) + + const markReplayedParts = (data: SessionData) => { + replayedParts.clear() + for (const [partID] of data.text) { + if (data.part.has(partID)) { + replayedParts.add(partID) + } + } + } + const bootstrapSubagentHistory = Effect.fn("RunStreamTransport.bootstrapSubagentHistory")(function* ( sessions: string[], ) { @@ -681,7 +727,6 @@ function createLayer(input: StreamInput) { }) : history - replayedParts.clear() if (history) { state.data = history.data } @@ -695,14 +740,8 @@ function createLayer(input: StreamInput) { }) } - if (replay) { - for (const [partID] of replay.data.text) { - if (!replay.data.part.has(partID)) { - continue - } - - replayedParts.add(partID) - } + if (history) { + markReplayedParts(history.data) } bootstrapSubagentData({ @@ -862,6 +901,20 @@ function createLayer(input: StreamInput) { limits: input.limits(), }) state.data = next.data + const visible = next.commits.at(-1) + if (visible) { + state.wait?.onVisibleOutput?.({ + kind: visible.kind, + text: visible.text, + phase: visible.phase, + messageID: visible.messageID, + partID: visible.partID, + toolState: visible.toolState, + ...(visible.partID && state.data.visible.has(visible.partID) + ? { visible: state.data.visible.get(visible.partID) } + : {}), + }) + } if ( event.type === "message.part.updated" && @@ -910,15 +963,163 @@ function createLayer(input: StreamInput) { yield* applyEvent(event) } - if (!changed) { + const arrived = buffered.splice(0) + if (!changed && arrived.length === 0) { buffered.push(...next) return } - pending = next + pending = [...next, ...arrived] } }) + const replayOnResize: (next: SessionResizeReplayInput) => Effect.Effect = Effect.fn( + "RunStreamTransport.replayOnResize", + )(function* (next: SessionResizeReplayInput) { + if (!input.replay || replayDisabled || booting || closed || input.footer.isClosed) { + return false + } + + if (replaying) { + replayPending = next + return false + } + + const finish: () => Effect.Effect = Effect.fnUntraced(function* () { + yield* drainBuffered() + const pending = replayPending + replayPending = undefined + if (!pending || replayDisabled || closed || input.footer.isClosed) { + replaying = false + return + } + + replaying = false + yield* replayOnResize(pending).pipe(Effect.asVoid) + }) + + replayedParts.clear() + replaying = true + input.trace?.write("replay.resize.start", { + sessionID: input.sessionID, + }) + const source = yield* Effect.all([replayMessages(), replayRequests()], { concurrency: "unbounded" }).pipe( + Effect.exit, + ) + if (Exit.isFailure(source)) { + input.trace?.write("replay.resize.abort", { + sessionID: input.sessionID, + phase: "snapshot", + }) + yield* finish() + return false + } + + const [messagesList, [permissions, questions]] = source.value + const sessionPermissions = permissions.filter((item) => item.sessionID === input.sessionID) + const sessionQuestions = questions.filter((item) => item.sessionID === input.sessionID) + const snapshot = yield* Effect.try({ + try: () => { + const history = replaySession({ + messages: messagesList, + permissions: sessionPermissions, + questions: sessionQuestions, + thinking: input.thinking, + limits: input.limits(), + }) + const activeCommits = replayActiveText(history.data, state.data) + return { + history, + activeCommits, + patch: + history.data.part.size > 0 || history.data.tools.size > 0 + ? { ...history.patch, phase: "running" as const } + : history.patch, + visible: + input.replayLimit !== undefined && messagesList.length > input.replayLimit + ? replaySession({ + messages: messagesList.slice(-input.replayLimit), + permissions: sessionPermissions, + questions: sessionQuestions, + thinking: input.thinking, + limits: input.limits(), + }) + : history, + } + }, + catch: (error) => error, + }).pipe(Effect.exit) + if (Exit.isFailure(snapshot)) { + input.trace?.write("replay.resize.abort", { + sessionID: input.sessionID, + phase: "snapshot", + }) + yield* finish() + return false + } + + const idle = yield* Effect.promise(() => input.footer.idle()).pipe(Effect.exit) + if (Exit.isFailure(idle) || closed || input.footer.isClosed) { + yield* finish() + return false + } + + const reset = yield* Effect.promise(() => next.reset()).pipe(Effect.exit) + if (Exit.isFailure(reset)) { + replayDisabled = true + input.trace?.write("replay.resize.disable", { + sessionID: input.sessionID, + phase: "reset", + }) + input.footer.append({ + kind: "error", + text: "resize replay failed; disabled for this session", + phase: "start", + source: "system", + }) + yield* finish() + return false + } + + state.data = snapshot.value.history.data + for (const request of [...state.data.permissions, ...state.data.questions]) { + seedBlocker(request.id) + } + + for (const commit of replayLocalRows( + messagesList, + [...snapshot.value.visible.commits, ...snapshot.value.activeCommits], + next.localRows(), + )) { + input.trace?.write("ui.commit", commit) + input.footer.append(commit) + } + + syncFooter([], snapshot.value.patch, currentSubagentState()) + const rebuilt = yield* Effect.promise(() => input.footer.idle()).pipe(Effect.exit) + if (Exit.isFailure(rebuilt)) { + replayDisabled = true + input.trace?.write("replay.resize.disable", { + sessionID: input.sessionID, + phase: "rebuild", + }) + input.footer.append({ + kind: "error", + text: "resize replay failed; disabled for this session", + phase: "start", + source: "system", + }) + yield* finish() + return false + } + + input.trace?.write("replay.resize.complete", { + sessionID: input.sessionID, + }) + yield* finish() + return true + }) + const watch = Effect.fn("RunStreamTransport.watch")(() => Stream.fromAsyncIterable(events.stream, (error) => error instanceof Error ? error : new Error(String(error)), @@ -943,7 +1144,7 @@ function createLayer(input: StreamInput) { } const sessionID = sid(event) - if (booting) { + if (booting || replaying) { if (sessionID) { input.trace?.write("recv.event", event) buffered.push(event) @@ -1005,6 +1206,7 @@ function createLayer(input: StreamInput) { tick: state.tick, armed: false, live: false, + onVisibleOutput: next.onVisibleOutput, done: yield* Deferred.make(), } state.wait = item @@ -1020,6 +1222,7 @@ function createLayer(input: StreamInput) { const req = { sessionID: input.sessionID, + messageID: next.prompt.messageID, agent: next.agent, model: next.model, variant: next.variant, @@ -1081,6 +1284,7 @@ function createLayer(input: StreamInput) { input.sdk.session.command( { sessionID: input.sessionID, + messageID: next.prompt.messageID, agent: next.agent, model: next.model ? `${next.model.providerID}/${next.model.modelID}` : undefined, variant: next.variant, @@ -1231,6 +1435,7 @@ function createLayer(input: StreamInput) { return Service.of({ runPromptTurn, selectSubagent, + replayOnResize, close, }) }), @@ -1254,6 +1459,7 @@ export async function createSessionTransport(input: StreamInput): Promise runtime.runPromise((svc) => svc.runPromptTurn(next)), selectSubagent: (sessionID) => runtime.runSync((svc) => svc.selectSubagent(sessionID)), + replayOnResize: (next) => runtime.runPromise((svc) => svc.replayOnResize(next)), close: () => runtime.runPromise((svc) => svc.close()), } } diff --git a/packages/opencode/src/cli/cmd/run/types.ts b/packages/opencode/src/cli/cmd/run/types.ts index 8a88bb7ad..65099394c 100644 --- a/packages/opencode/src/cli/cmd/run/types.ts +++ b/packages/opencode/src/cli/cmd/run/types.ts @@ -309,6 +309,21 @@ export type StreamCommit = { } } +export type LocalReplayAnchor = { + kind: EntryKind + text: string + phase: StreamPhase + messageID?: string + partID?: string + toolState?: StreamToolState + visible?: string +} + +export type LocalReplayRow = { + commit: StreamCommit + after?: LocalReplayAnchor +} + // The public contract between the stream transport / prompt queue and // the footer. RunFooter implements this. The transport and queue never // touch the renderer directly -- they go through this interface. diff --git a/packages/opencode/test/cli/help/__snapshots__/help-snapshots.test.ts.snap b/packages/opencode/test/cli/help/__snapshots__/help-snapshots.test.ts.snap index 14882e264..b7148ebce 100644 --- a/packages/opencode/test/cli/help/__snapshots__/help-snapshots.test.ts.snap +++ b/packages/opencode/test/cli/help/__snapshots__/help-snapshots.test.ts.snap @@ -103,7 +103,7 @@ Options: --variant model variant (provider-specific reasoning effort, e.g., high, max, minimal) [string] --thinking show thinking blocks [boolean] - --replay replay visible session history on interactive resume + --replay replay interactive session history on resume and after resize [boolean] [default: false] --replay-limit cap visible interactive replay to the newest N messages [number] diff --git a/packages/opencode/test/cli/run/runtime.queue.test.ts b/packages/opencode/test/cli/run/runtime.queue.test.ts index 728e18fcf..7eba8bb25 100644 --- a/packages/opencode/test/cli/run/runtime.queue.test.ts +++ b/packages/opencode/test/cli/run/runtime.queue.test.ts @@ -143,6 +143,7 @@ describe("run runtime queue", () => { text: "hello", phase: "start", source: "system", + messageID: expect.any(String), }, ]) }) @@ -225,6 +226,7 @@ describe("run runtime queue", () => { text: " hello ", phase: "start", source: "system", + messageID: expect.any(String), }, ]) }) @@ -260,6 +262,7 @@ describe("run runtime queue", () => { text: "/fmt bash", phase: "start", source: "system", + messageID: expect.any(String), }, ]) ui.api.close() @@ -321,7 +324,7 @@ describe("run runtime queue", () => { await Promise.resolve() expect(turns.map((item) => item.text)).toEqual(["one"]) - expect(turns[0]?.messageID).toBeUndefined() + expect(turns[0]?.messageID).toEqual(expect.any(String)) expect(ui.commits.map((item) => item.text)).toEqual(["one"]) const first = ui.events.find((item) => item.type === "queued.prompts") const event = ui.events.findLast((item) => item.type === "queued.prompts") diff --git a/packages/opencode/test/cli/run/session-replay.test.ts b/packages/opencode/test/cli/run/session-replay.test.ts index 36d25c6b4..a6642c4bb 100644 --- a/packages/opencode/test/cli/run/session-replay.test.ts +++ b/packages/opencode/test/cli/run/session-replay.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test } from "bun:test" -import { replaySession } from "@/cli/cmd/run/session-replay" +import { replayLocalRows, replaySession } from "@/cli/cmd/run/session-replay" import type { SessionMessages } from "@/cli/cmd/run/session.shared" function userMessage(id: string, text: string): SessionMessages[number] { @@ -156,4 +156,286 @@ describe("run session replay", () => { }), ) }) + + test("merges failed local rows ahead of later persisted prompts", () => { + const persisted = { + kind: "user", + text: "successful", + phase: "start", + source: "system", + messageID: "msg-user-2", + } as const + const failed = { + kind: "user", + text: "failed", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const error = { + kind: "error", + text: "network unavailable", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + + expect(replayLocalRows([userMessage("msg-user-2", "successful")], [persisted], [{ commit: failed }, { commit: error }])).toEqual([ + failed, + error, + persisted, + ]) + }) + + test("retains local errors but not duplicate local prompts once a prompt persists", () => { + const persisted = { + kind: "user", + text: "failed after persistence", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const error = { + kind: "error", + text: "connection closed", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + + expect(replayLocalRows([userMessage("msg-user-1", "failed after persistence")], [persisted], [{ commit: persisted }, { commit: error }])).toEqual([ + persisted, + error, + ]) + }) + + test("keeps a local turn failure below assistant output already visible for that turn", () => { + const first = { + kind: "user", + text: "start", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const answer = { + kind: "assistant", + text: "partial answer", + phase: "progress", + source: "assistant", + messageID: "msg-assistant-1", + } as const + const error = { + kind: "error", + text: "stream failed", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const second = { + kind: "user", + text: "retry", + phase: "start", + source: "system", + messageID: "msg-user-2", + } as const + + expect( + replayLocalRows( + [userMessage("msg-user-1", "start"), userMessage("msg-user-2", "retry")], + [first, answer, second], + [ + { + commit: error, + after: { kind: "assistant", text: "partial answer", phase: "progress", messageID: "msg-assistant-1" }, + }, + ], + ), + ).toEqual([first, answer, error, second]) + }) + + test("keeps a local failure above assistant output received after the failure", () => { + const first = { + kind: "user", + text: "start", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const error = { + kind: "error", + text: "request failed", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const late = { + kind: "assistant", + text: "late answer", + phase: "progress", + source: "assistant", + messageID: "msg-assistant-1", + } as const + + expect(replayLocalRows([userMessage("msg-user-1", "start")], [first, late], [{ commit: error }])).toEqual([ + first, + error, + late, + ]) + }) + + test("inserts a local failure between persisted output chunks spanning that failure", () => { + const first = { + kind: "user", + text: "start", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const complete = { + kind: "assistant", + text: "before after", + phase: "progress", + source: "assistant", + messageID: "msg-assistant-1", + partID: "part-1", + } as const + const error = { + kind: "error", + text: "stream failed", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + + expect( + replayLocalRows([userMessage("msg-user-1", "start")], [first, complete], [ + { + commit: error, + after: { + kind: "assistant", + text: "before ", + phase: "progress", + messageID: "msg-assistant-1", + partID: "part-1", + visible: "before ", + }, + }, + ]), + ).toEqual([first, { ...complete, text: "before " }, error, { ...complete, text: "after" }]) + }) + + test("places an unpersisted failed prompt before live output from that turn", () => { + const prompt = { + kind: "user", + text: "start", + phase: "start", + source: "system", + messageID: "msg-1", + } as const + const answer = { + kind: "assistant", + text: "partial answer", + phase: "progress", + source: "assistant", + messageID: "msg-2", + } as const + const error = { + kind: "error", + text: "stream failed", + phase: "start", + source: "system", + messageID: "msg-1", + } as const + + expect( + replayLocalRows([], [answer], [ + { commit: prompt }, + { + commit: error, + after: { kind: "assistant", text: "partial answer", phase: "progress", messageID: "msg-2" }, + }, + ]), + ).toEqual([prompt, answer, error]) + }) + + test("anchors a failure after the visible start of a tool that later completes", () => { + const prompt = { + kind: "user", + text: "run ls", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const running = { + kind: "tool", + text: "running bash", + phase: "start", + source: "tool", + messageID: "msg-assistant-1", + partID: "part-tool-1", + toolState: "running", + } as const + const completed = { + kind: "tool", + text: "file.txt", + phase: "final", + source: "tool", + messageID: "msg-assistant-1", + partID: "part-tool-1", + toolState: "completed", + } as const + const error = { + kind: "error", + text: "connection lost", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + + expect( + replayLocalRows([userMessage("msg-user-1", "run ls")], [prompt, running, completed], [ + { + commit: error, + after: { + kind: "tool", + text: "running bash", + phase: "start", + messageID: "msg-assistant-1", + partID: "part-tool-1", + toolState: "running", + }, + }, + ]), + ).toEqual([prompt, running, error, completed]) + }) + + test("retains an unpersisted local diagnostic before later persisted prompts", () => { + const first = { + kind: "user", + text: "before", + phase: "start", + source: "system", + messageID: "msg-user-1", + } as const + const error = { + kind: "error", + text: "failed to start new session", + phase: "start", + source: "system", + messageID: "msg-user-2", + } as const + const second = { + kind: "user", + text: "after", + phase: "start", + source: "system", + messageID: "msg-user-3", + } as const + + expect( + replayLocalRows([userMessage("msg-user-1", "before"), userMessage("msg-user-3", "after")], [first, second], [ + { commit: error }, + ]), + ).toEqual([first, error, second]) + }) }) diff --git a/packages/opencode/test/cli/run/stream.transport.test.ts b/packages/opencode/test/cli/run/stream.transport.test.ts index 74fb7ec02..2dcfc3c4c 100644 --- a/packages/opencode/test/cli/run/stream.transport.test.ts +++ b/packages/opencode/test/cli/run/stream.transport.test.ts @@ -1,7 +1,7 @@ import { afterEach, describe, expect, mock, spyOn, test } from "bun:test" import { OpencodeClient, type GlobalEvent } from "@opencode-ai/sdk/v2" import { createSessionTransport } from "@/cli/cmd/run/stream.transport" -import type { FooterApi, FooterEvent, RunFilePart, StreamCommit } from "@/cli/cmd/run/types" +import type { FooterApi, FooterEvent, LocalReplayRow, RunFilePart, StreamCommit } from "@/cli/cmd/run/types" type EventStream = Awaited>["stream"] type GlobalEventStream = Awaited>["stream"] @@ -11,6 +11,7 @@ type SessionChild = NonNullable type SessionStatusMap = NonNullable>["data"]> type TextPart = Extract +type ReasoningPart = Extract afterEach(() => { mock.restore() @@ -298,6 +299,29 @@ function textUpdated(part: TextPart): SdkEvent { } } +function reasoningPart(id: string, messageID: string, text: string): ReasoningPart { + return { + id, + sessionID: "session-1", + messageID, + type: "reasoning", + text, + time: { start: 1 }, + } +} + +function reasoningUpdated(part: ReasoningPart): SdkEvent { + return { + id: `evt-${part.id}-updated`, + type: "message.part.updated", + properties: { + sessionID: part.sessionID, + part, + time: 1, + }, + } +} + function toolUpdated(part: SessionToolPart): SdkEvent { return { id: `evt-${part.id}-updated`, @@ -721,6 +745,444 @@ describe("run stream transport", () => { } }) + test("rebuilds session output on resize and continues live deltas from replayed state", async () => { + const src = eventFeed() + const ui = footer() + let calls = 0 + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async () => { + calls += 1 + if (calls === 1) { + return ok([]) + } + + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [textPart("text-1", "msg-1", "Hello")], + }), + ]) + }, + }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + const localRows: LocalReplayRow[] = [ + { commit: { kind: "user", text: "pending prompt", phase: "start", source: "system", messageID: "msg-pending" } }, + ] + const reset = mock(() => { + localRows.push({ + commit: { + kind: "user", + text: "sent during reset", + phase: "start", + source: "system", + messageID: "msg-during-reset", + }, + }) + return Promise.resolve() + }) + + try { + expect( + await transport.replayOnResize({ + localRows: () => localRows, + reset, + }), + ).toBe(true) + expect(reset).toHaveBeenCalledTimes(1) + expect(ui.commits).toEqual( + expect.arrayContaining([ + expect.objectContaining({ kind: "assistant", text: "Hello" }), + expect.objectContaining({ kind: "user", text: "sent during reset", messageID: "msg-during-reset" }), + ]), + ) + + src.push(textUpdated(textPart("text-1", "msg-1", "Hello world"))) + await waitFor(() => ui.commits.find((commit) => commit.kind === "assistant" && commit.text === " world")) + expect(ui.commits.filter((commit) => commit.kind === "assistant").map((commit) => commit.text)).toEqual([ + "Hello", + " world", + ]) + } finally { + src.close() + await transport.close() + } + }) + + test("coalesces active resize requests into one trailing replay", async () => { + const src = eventFeed() + const ui = footer() + const firstReset = defer() + const resetA = mock(() => firstReset.promise) + const resetB = mock(() => Promise.resolve()) + const resetC = mock(() => Promise.resolve()) + const transport = await createSessionTransport({ + sdk: sdk({ stream: src.stream }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + const active = transport.replayOnResize({ localRows: () => [], reset: resetA }) + await waitFor(() => (resetA.mock.calls.length === 1 ? true : undefined)) + + expect(await transport.replayOnResize({ localRows: () => [], reset: resetB })).toBe(false) + expect(await transport.replayOnResize({ localRows: () => [], reset: resetC })).toBe(false) + expect(resetB).not.toHaveBeenCalled() + + firstReset.resolve() + expect(await active).toBe(true) + expect(resetA).toHaveBeenCalledTimes(1) + expect(resetB).not.toHaveBeenCalled() + expect(resetC).toHaveBeenCalledTimes(1) + } finally { + src.close() + await transport.close() + } + }) + + test("keeps coalescing resize requests while buffered events drain", async () => { + const src = eventFeed() + const ui = footer() + const firstReset = defer() + const statusGate = defer() + const statusStarted = defer() + let blockStatus = false + const trace = mock((_type: string, _data?: unknown) => {}) + const resetA = mock(() => firstReset.promise) + const resetB = mock(() => Promise.resolve()) + const resetC = mock(() => Promise.resolve()) + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + status: async () => { + if (blockStatus) { + statusStarted.resolve() + await statusGate.promise + } + return ok(statusMap(true)) + }, + }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + trace: { write: trace }, + }) + const turn = transport.runPromptTurn({ + agent: undefined, + model: undefined, + variant: undefined, + prompt: { text: "active", parts: [] }, + files: [], + includeFiles: false, + }) + + try { + await waitFor(() => ui.events.find((event) => event.type === "turn.wait")) + const active = transport.replayOnResize({ localRows: () => [], reset: resetA }) + await waitFor(() => (resetA.mock.calls.length === 1 ? true : undefined)) + blockStatus = true + src.push(busy()) + src.push(idle()) + await waitFor(() => (trace.mock.calls.filter((call) => call[0] === "recv.event").length >= 2 ? true : undefined)) + + expect(await transport.replayOnResize({ localRows: () => [], reset: resetB })).toBe(false) + firstReset.resolve() + await Promise.race([ + statusStarted.promise, + Bun.sleep(1_000).then(() => { + throw new Error("timed out waiting for buffered status drain") + }), + ]) + + expect(await transport.replayOnResize({ localRows: () => [], reset: resetC })).toBe(false) + expect(resetC).not.toHaveBeenCalled() + blockStatus = false + statusGate.resolve() + + expect( + await Promise.race([ + active, + Bun.sleep(1_000).then(() => { + throw new Error("timed out waiting for trailing resize replay") + }), + ]), + ).toBe(true) + expect(resetB).not.toHaveBeenCalled() + expect(resetC).toHaveBeenCalledTimes(1) + } finally { + src.close() + await transport.close() + await turn + } + }) + + test("preserves assistant deltas not yet persisted when replaying during a live stream", async () => { + const src = eventFeed() + const ui = footer() + let calls = 0 + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async () => { + calls += 1 + if (calls === 1) { + return ok([]) + } + + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-live", + parts: [textPart("text-live", "msg-live", "")], + }), + ]) + }, + }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + src.push(assistant("msg-live")) + src.push(textUpdated(textPart("text-live", "msg-live", ""))) + src.push(textDelta("msg-live", "text-live", "Hello")) + await waitFor(() => ui.commits.find((commit) => commit.kind === "assistant" && commit.text === "Hello")) + ui.commits.length = 0 + + expect(await transport.replayOnResize({ localRows: () => [], reset: () => Promise.resolve() })).toBe(true) + src.push(textDelta("msg-live", "text-live", "Hello")) + src.push( + textUpdated({ + ...textPart("text-live", "msg-live", "HelloHello"), + time: { start: 1, end: 2 }, + }), + ) + + await waitFor(() => + ui.commits.filter((commit) => commit.kind === "assistant" && commit.text === "Hello").length === 2 + ? true + : undefined, + ) + expect( + ui.commits.filter((commit) => commit.kind === "assistant" && commit.text).map((commit) => commit.text), + ).toEqual(["Hello", "Hello"]) + } finally { + src.close() + await transport.close() + } + }) + + test("preserves the display prefix for active reasoning restored during replay", async () => { + const src = eventFeed() + const ui = footer() + let calls = 0 + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async () => { + calls += 1 + if (calls === 1) { + return ok([]) + } + + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-thinking", + parts: [reasoningPart("thinking-1", "msg-thinking", "")], + }), + ]) + }, + }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + src.push(assistant("msg-thinking")) + src.push(reasoningUpdated(reasoningPart("thinking-1", "msg-thinking", ""))) + src.push(textDelta("msg-thinking", "thinking-1", "plan")) + await waitFor(() => ui.commits.find((commit) => commit.kind === "reasoning" && commit.text === "Thinking: plan")) + ui.commits.length = 0 + + expect(await transport.replayOnResize({ localRows: () => [], reset: () => Promise.resolve() })).toBe(true) + expect(ui.commits.filter((commit) => commit.kind === "reasoning").map((commit) => commit.text)).toEqual([ + "Thinking: plan", + ]) + } finally { + src.close() + await transport.close() + } + }) + + test("does not overlay stale active text when persistence completes during replay", async () => { + const src = eventFeed() + const ui = footer() + let calls = 0 + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async () => { + calls += 1 + if (calls === 1) { + return ok([]) + } + + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-finished", + parts: [ + { + ...textPart("text-finished", "msg-finished", "Hello"), + time: { start: 1, end: 2 }, + }, + ], + }), + ]) + }, + }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + src.push(assistant("msg-finished")) + src.push(textUpdated(textPart("text-finished", "msg-finished", ""))) + src.push(textDelta("msg-finished", "text-finished", "Hello")) + await waitFor(() => ui.commits.find((commit) => commit.kind === "assistant" && commit.text === "Hello")) + ui.commits.length = 0 + + expect(await transport.replayOnResize({ localRows: () => [], reset: () => Promise.resolve() })).toBe(true) + expect( + ui.commits.filter((commit) => commit.kind === "assistant" && commit.text).map((commit) => commit.text), + ).toEqual(["Hello"]) + } finally { + src.close() + await transport.close() + } + }) + + test("does not clear the terminal when resize replay snapshot fetch fails", async () => { + const src = eventFeed() + const ui = footer() + let calls = 0 + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async () => { + calls += 1 + if (calls === 1) { + return ok([]) + } + + throw new Error("snapshot failed") + }, + }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + const reset = mock(() => Promise.resolve()) + + try { + expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false) + expect(reset).not.toHaveBeenCalled() + expect(ui.commits).toEqual([]) + } finally { + src.close() + await transport.close() + } + }) + + test("disables resize replay for the session after terminal reset fails", async () => { + const src = eventFeed() + const ui = footer() + const transport = await createSessionTransport({ + sdk: sdk({ stream: src.stream }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + const reset = mock(() => Promise.reject(new Error("clear failed"))) + + try { + expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false) + expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false) + expect(reset).toHaveBeenCalledTimes(1) + expect(ui.commits).toContainEqual({ + kind: "error", + text: "resize replay failed; disabled for this session", + phase: "start", + source: "system", + }) + } finally { + src.close() + await transport.close() + } + }) + + test("disables resize replay when rebuilding scrollback fails after terminal reset", async () => { + const src = eventFeed() + const ui = footer() + let cleared = false + const idle = ui.api.idle + ui.api.idle = () => (cleared ? Promise.reject(new Error("render failed")) : idle()) + const transport = await createSessionTransport({ + sdk: sdk({ stream: src.stream }), + sessionID: "session-1", + thinking: true, + replay: true, + limits: () => ({}), + footer: ui.api, + }) + const reset = mock(() => { + cleared = true + return Promise.resolve() + }) + + try { + expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false) + expect(await transport.replayOnResize({ localRows: () => [], reset })).toBe(false) + expect(reset).toHaveBeenCalledTimes(1) + expect(ui.commits).toContainEqual({ + kind: "error", + text: "resize replay failed; disabled for this session", + phase: "start", + source: "system", + }) + } finally { + src.close() + await transport.close() + } + }) + test("drops completed historical subagent tabs during bootstrap", async () => { const src = eventFeed() const ui = footer() diff --git a/packages/plugin/package.json b/packages/plugin/package.json index bcbf22459..5cd7d82b2 100644 --- a/packages/plugin/package.json +++ b/packages/plugin/package.json @@ -22,9 +22,9 @@ "zod": "catalog:" }, "peerDependencies": { - "@opentui/core": ">=0.3.0", - "@opentui/keymap": ">=0.3.0", - "@opentui/solid": ">=0.3.0" + "@opentui/core": ">=0.3.1", + "@opentui/keymap": ">=0.3.1", + "@opentui/solid": ">=0.3.1" }, "peerDependenciesMeta": { "@opentui/core": {