From f2cf607376689670b55adf62203fef0446c9fef1 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Sun, 14 Jun 2026 16:16:39 +0530 Subject: [PATCH] refactor(core): canonicalize pty service (#32182) --- packages/core/src/location.ts | 4 +- packages/core/src/pty.ts | 248 ++++++++++-------- packages/core/src/pty/input.ts | 24 -- packages/core/src/pty/protocol.ts | 37 +++ .../{opencode/src/shell => core/src}/shell.ts | 51 ++-- packages/core/test/pty/info-schema.test.ts | 5 + packages/core/test/pty/input.test.ts | 19 -- packages/core/test/pty/protocol.test.ts | 27 ++ .../test/pty/pty-output-isolation.test.ts | 110 -------- packages/core/test/pty/pty-session.test.ts | 168 +++++++++++- .../test/shell => core/test}/shell.test.ts | 15 +- packages/opencode/src/pty-preparation.ts | 30 --- .../routes/instance/httpapi/handlers/pty.ts | 162 ++++++------ .../server/routes/instance/httpapi/server.ts | 2 +- packages/opencode/src/server/server.ts | 2 +- packages/opencode/src/session/prompt.ts | 2 +- packages/opencode/src/tool/shell.ts | 2 +- packages/opencode/test/pty/pty-shell.test.ts | 102 ------- .../test/server/httpapi-exercise/index.ts | 38 +++ .../opencode/test/server/httpapi-pty.test.ts | 27 ++ .../test/server/httpapi-v2-pty.test.ts | 171 ++++++++++++ packages/opencode/test/session/prompt.test.ts | 2 +- packages/opencode/test/tool/shell.test.ts | 2 +- packages/server/src/api.ts | 2 + .../src/server => server/src}/cors.ts | 0 packages/server/src/errors.ts | 15 ++ packages/server/src/groups/pty.ts | 144 ++++++++++ packages/server/src/handlers.ts | 4 + packages/server/src/handlers/pty.ts | 221 ++++++++++++++++ .../server/src/middleware/authorization.ts | 4 + 30 files changed, 1134 insertions(+), 506 deletions(-) delete mode 100644 packages/core/src/pty/input.ts create mode 100644 packages/core/src/pty/protocol.ts rename packages/{opencode/src/shell => core/src}/shell.ts (82%) delete mode 100644 packages/core/test/pty/input.test.ts create mode 100644 packages/core/test/pty/protocol.test.ts delete mode 100644 packages/core/test/pty/pty-output-isolation.test.ts rename packages/{opencode/test/shell => core/test}/shell.test.ts (85%) delete mode 100644 packages/opencode/src/pty-preparation.ts delete mode 100644 packages/opencode/test/pty/pty-shell.test.ts create mode 100644 packages/opencode/test/server/httpapi-v2-pty.test.ts rename packages/{opencode/src/server => server/src}/cors.ts (100%) create mode 100644 packages/server/src/groups/pty.ts create mode 100644 packages/server/src/handlers/pty.ts diff --git a/packages/core/src/location.ts b/packages/core/src/location.ts index ebfb096f1..9225e4a01 100644 --- a/packages/core/src/location.ts +++ b/packages/core/src/location.ts @@ -1,6 +1,6 @@ import { Context, Effect, Layer, Schema } from "effect" import { Project } from "./project" -import { AbsolutePath } from "./schema" +import { AbsolutePath, optionalOmitUndefined } from "./schema" import { WorkspaceV2 } from "./workspace" export * as Location from "./location" @@ -12,7 +12,7 @@ export class Ref extends Schema.Class("Location.Ref")({ export class Info extends Schema.Class("Location.Info")({ directory: AbsolutePath, - workspaceID: WorkspaceV2.ID.pipe(Schema.optional), + workspaceID: optionalOmitUndefined(WorkspaceV2.ID), project: Schema.Struct({ id: Project.ID, directory: AbsolutePath, diff --git a/packages/core/src/pty.ts b/packages/core/src/pty.ts index 190cf2349..7765013a5 100644 --- a/packages/core/src/pty.ts +++ b/packages/core/src/pty.ts @@ -2,22 +2,27 @@ export * as Pty from "./pty" import type { Disp, Proc } from "#pty" import { Context, Effect, Layer, Schema, Types } from "effect" +import { Config } from "./config" import { EventV2 } from "./event" import { Location } from "./location" import { NonNegativeInt, PositiveInt } from "./schema" import { PtyID } from "./pty/schema" +import { Shell } from "./shell" import { lazy } from "./util/lazy" const BUFFER_LIMIT = 1024 * 1024 * 2 -const BUFFER_CHUNK = 64 * 1024 -const encoder = new TextEncoder() +// Exited sessions stay observable (status, exit code, retained output) until removed explicitly. +// Cap retention so abandoned terminals do not accumulate unbounded buffers. +const EXITED_LIMIT = 25 const pty = lazy(() => import("#pty")) -type Socket = { - readyState: number - data?: unknown - send: (data: string | Uint8Array | ArrayBuffer) => void - close: (code?: number, reason?: string) => void +type Subscriber = { + readonly onData: (chunk: string) => void + readonly onEnd: (event: { exitCode?: number }) => void + active: boolean + detached: boolean + pending: string[] + end?: { exitCode?: number } } type Active = { @@ -26,22 +31,10 @@ type Active = { buffer: string bufferCursor: number cursor: number - subscribers: Map + subscribers: Map listeners: Disp[] } -const sock = (ws: Socket) => (ws.data && typeof ws.data === "object" ? ws.data : ws) - -// WebSocket control frame: 0x00 + UTF-8 JSON. -const meta = (cursor: number) => { - const json = JSON.stringify({ cursor }) - const bytes = encoder.encode(json) - const out = new Uint8Array(bytes.length + 1) - out[0] = 0 - out.set(bytes, 1) - return out -} - export const Info = Schema.Struct({ id: PtyID, title: Schema.String, @@ -51,6 +44,8 @@ export const Info = Schema.Struct({ status: Schema.Literals(["running", "exited"]), // Windows ConPTY assigns the child pid asynchronously, so 0 is valid at spawn time. pid: NonNegativeInt, + // Present once status is "exited". + exitCode: Schema.optional(NonNegativeInt), }).annotate({ identifier: "Pty" }) export type Info = Types.DeepMutable @@ -65,14 +60,6 @@ export const CreateInput = Schema.Struct({ export type CreateInput = Types.DeepMutable -export type PreparedCreate = { - readonly command: string - readonly args: string[] - readonly cwd: string - readonly title?: string - readonly env: Record -} - export const UpdateInput = Schema.Struct({ title: Schema.optional(Schema.String), size: Schema.optional( @@ -85,10 +72,34 @@ export const UpdateInput = Schema.Struct({ export type UpdateInput = Types.DeepMutable +export type AttachInput = { + // Absolute output cursor to replay from. -1 tails from the current end; omitted replays the full retained buffer. + readonly cursor?: number + // Callbacks fire synchronously from the native PTY data path; keep them non-blocking. + readonly onData: (chunk: string) => void + // Fired once when the session stops producing output: process exit (exitCode set), removal, or service teardown. + readonly onEnd: (event: { exitCode?: number }) => void +} + +export type Attachment = { + // Retained output from the requested cursor to the current end. + readonly replay: string + // Absolute output cursor after replay. + readonly cursor: number + readonly write: (data: string) => void + // Starts live delivery after the caller has applied replay and cursor metadata. + readonly activate: () => void + readonly detach: () => void +} + export class NotFoundError extends Schema.TaggedErrorClass()("Pty.NotFoundError", { ptyID: PtyID, }) {} +export class ExitedError extends Schema.TaggedErrorClass()("Pty.ExitedError", { + ptyID: PtyID, +}) {} + export const Event = { Created: EventV2.define({ type: "pty.created", schema: { info: Info } }), Updated: EventV2.define({ type: "pty.updated", schema: { info: Info } }), @@ -99,19 +110,11 @@ export const Event = { export interface Interface { readonly list: () => Effect.Effect readonly get: (id: PtyID) => Effect.Effect - readonly create: (input: PreparedCreate) => Effect.Effect + readonly create: (input: CreateInput) => Effect.Effect readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect readonly remove: (id: PtyID) => Effect.Effect - readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect readonly write: (id: PtyID, data: string) => Effect.Effect - readonly connect: ( - id: PtyID, - ws: Socket, - cursor?: number, - ) => Effect.Effect< - { onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined, - NotFoundError - > + readonly attach: (id: PtyID, input: AttachInput) => Effect.Effect } export class Service extends Context.Service()("@opencode/v2/Pty") {} @@ -121,28 +124,41 @@ export const layer = Layer.effect( Effect.gen(function* () { const events = yield* EventV2.Service const location = yield* Location.Service + const config = yield* Config.Service const context = yield* Effect.context() const runFork = Effect.runForkWith(context) const sessions = new Map() + const exitOrder: PtyID[] = [] + + function notifyEnd(session: Active, event: { exitCode?: number }) { + for (const subscriber of session.subscribers.values()) { + if (!subscriber.active) { + subscriber.end = event + continue + } + try { + subscriber.onEnd(event) + } catch {} + } + session.subscribers.clear() + } function teardown(session: Active) { for (const listener of session.listeners) listener.dispose() session.listeners.length = 0 - try { - session.process.kill() - } catch {} - for (const [sub, ws] of session.subscribers.entries()) { + if (session.info.status === "running") { try { - if (sock(ws) === sub) ws.close() + session.process.kill() } catch {} } - session.subscribers.clear() + notifyEnd(session, {}) } yield* Effect.addFinalizer(() => Effect.sync(() => { for (const session of sessions.values()) teardown(session) sessions.clear() + exitOrder.length = 0 }), ) @@ -154,12 +170,13 @@ export const layer = Layer.effect( const removeSession = Effect.fnUntraced(function* (id: PtyID) { const session = sessions.get(id) - if (!session) return false + if (!session) return sessions.delete(id) + const index = exitOrder.indexOf(id) + if (index !== -1) exitOrder.splice(index, 1) yield* Effect.logInfo("removing session", { id }) teardown(session) yield* events.publish(Event.Deleted, { id: session.info.id }) - return true }) const remove = Effect.fn("Pty.remove")(function* (id: PtyID) { @@ -175,26 +192,36 @@ export const layer = Layer.effect( return (yield* requireSession(id)).info }) - const create = Effect.fn("Pty.create")(function* (input: PreparedCreate) { + const create = Effect.fn("Pty.create")(function* (input: CreateInput) { const id = PtyID.ascending() - yield* Effect.logInfo("creating session", { id, cmd: input.command, args: input.args, cwd: input.cwd }) + const command = input.command || Shell.preferred(Config.latest(yield* config.entries(), "shell")) + const args = Shell.login(command) ? [...(input.args ?? []), "-l"] : [...(input.args ?? [])] + const cwd = input.cwd || location.directory + // TODO: Apply plugin shell.env environment augmentation once V2 plugin hooks exist; legacy + // routes merge plugin-provided values into input.env at the boundary. + const env = { + ...process.env, + ...input.env, + TERM: "xterm-256color", + OPENCODE_TERMINAL: "1", + } as Record + if (process.platform === "win32") { + env.LC_ALL = "C.UTF-8" + env.LC_CTYPE = "C.UTF-8" + env.LANG = "C.UTF-8" + } + yield* Effect.logInfo("creating session", { id, cmd: command, args, cwd }) const { spawn } = yield* Effect.promise(() => pty()) - const proc = yield* Effect.sync(() => - spawn(input.command, input.args, { - name: "xterm-256color", - cwd: input.cwd, - env: input.env, - }), - ) - const info = { + const proc = yield* Effect.sync(() => spawn(command, args, { name: "xterm-256color", cwd, env })) + const info: Info = { id, title: input.title || `Terminal ${id.slice(-4)}`, - command: input.command, - args: input.args, - cwd: input.cwd, + command, + args, + cwd, status: "running", pid: proc.pid, - } as const + } const session: Active = { info, process: proc, @@ -208,15 +235,15 @@ export const layer = Layer.effect( session.listeners.push( proc.onData((chunk) => { session.cursor += chunk.length - for (const [key, ws] of session.subscribers.entries()) { - if (ws.readyState !== 1 || sock(ws) !== key) { - session.subscribers.delete(key) + for (const [token, subscriber] of session.subscribers.entries()) { + if (!subscriber.active) { + subscriber.pending.push(chunk) continue } try { - ws.send(chunk) + subscriber.onData(chunk) } catch { - session.subscribers.delete(key) + session.subscribers.delete(token) } } session.buffer += chunk @@ -227,12 +254,19 @@ export const layer = Layer.effect( }), proc.onExit(({ exitCode }) => { if (session.info.status === "exited") return + session.info.status = "exited" + session.info.exitCode = exitCode + notifyEnd(session, { exitCode }) + exitOrder.push(id) runFork( Effect.gen(function* () { yield* Effect.logInfo("session exited", { id, exitCode }) - session.info.status = "exited" yield* events.publish(Event.Exited, { id, exitCode }) - yield* removeSession(id) + while (exitOrder.length > EXITED_LIMIT) { + const oldest = exitOrder[0] + if (!oldest) break + yield* removeSession(oldest) + } }), ) }), @@ -244,66 +278,72 @@ export const layer = Layer.effect( const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) { const session = yield* requireSession(id) if (input.title) session.info.title = input.title - if (input.size) session.process.resize(input.size.cols, input.size.rows) + if (input.size && session.info.status === "running") + session.process.resize(input.size.cols, input.size.rows) yield* events.publish(Event.Updated, { info: session.info }) return session.info }) - const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) { - const session = yield* requireSession(id) - if (session.info.status === "running") session.process.resize(cols, rows) - }) - const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) { const session = yield* requireSession(id) if (session.info.status === "running") session.process.write(data) }) - const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) { - const session = yield* requireSession(id).pipe(Effect.tapError(() => Effect.sync(() => ws.close()))) - yield* Effect.logInfo("client connected to session", { id, directory: location.directory }) - const sub = sock(ws) - session.subscribers.delete(sub) - session.subscribers.set(sub, ws) - const cleanup = () => session.subscribers.delete(sub) + const attach = Effect.fn("Pty.attach")(function* (id: PtyID, input: AttachInput) { + const session = yield* requireSession(id) + if (session.info.status !== "running") return yield* new ExitedError({ ptyID: id }) + yield* Effect.logInfo("client attached to session", { id, directory: location.directory }) + const token = {} + const subscriber: Subscriber = { + onData: input.onData, + onEnd: input.onEnd, + active: false, + detached: false, + pending: [], + } + session.subscribers.set(token, subscriber) const start = session.bufferCursor const end = session.cursor const from = - cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 - const data = (() => { + input.cursor === -1 + ? end + : typeof input.cursor === "number" && Number.isSafeInteger(input.cursor) + ? Math.max(0, input.cursor) + : 0 + const replay = (() => { if (!session.buffer || from >= end) return "" const offset = Math.max(0, from - start) if (offset >= session.buffer.length) return "" return session.buffer.slice(offset) })() - if (data) { - try { - for (let i = 0; i < data.length; i += BUFFER_CHUNK) ws.send(data.slice(i, i + BUFFER_CHUNK)) - } catch { - cleanup() - ws.close() - return - } - } - try { - ws.send(meta(end)) - } catch { - cleanup() - ws.close() - return - } return { - onMessage: (message: string | ArrayBuffer) => { - session.process.write(typeof message === "string" ? message : new TextDecoder().decode(message)) + replay, + cursor: end, + write: (data: string) => { + if (session.info.status === "running") session.process.write(data) }, - onClose: () => { - cleanup() + activate: () => { + if (subscriber.active || subscriber.detached) return + subscriber.active = true + try { + for (const chunk of subscriber.pending) subscriber.onData(chunk) + subscriber.pending.length = 0 + if (subscriber.end) subscriber.onEnd(subscriber.end) + } catch { + session.subscribers.delete(token) + } + }, + detach: () => { + subscriber.detached = true + subscriber.pending.length = 0 + subscriber.end = undefined + session.subscribers.delete(token) }, } }) - return Service.of({ list, get, create, update, remove, resize, write, connect }) + return Service.of({ list, get, create, update, remove, write, attach }) }), ) -export const locationLayer = layer +export const locationLayer = layer.pipe(Layer.provide(Config.locationLayer)) diff --git a/packages/core/src/pty/input.ts b/packages/core/src/pty/input.ts deleted file mode 100644 index 0e4ea9a61..000000000 --- a/packages/core/src/pty/input.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Effect } from "effect" - -const inputDecoder = new TextDecoder("utf-8", { fatal: true }) - -export function handlePtyInput( - handler: { onMessage: (message: string | ArrayBuffer) => void }, - message: string | Uint8Array, -) { - if (typeof message === "string") { - handler.onMessage(message) - return Effect.void - } - return Effect.try({ - try: () => inputDecoder.decode(message), - catch: () => new Error("invalid PTY websocket input"), - }).pipe( - Effect.catch(() => Effect.succeed(undefined)), - Effect.flatMap((decoded) => { - if (decoded === undefined) return Effect.void - handler.onMessage(decoded) - return Effect.void - }), - ) -} diff --git a/packages/core/src/pty/protocol.ts b/packages/core/src/pty/protocol.ts new file mode 100644 index 000000000..21c6a89f7 --- /dev/null +++ b/packages/core/src/pty/protocol.ts @@ -0,0 +1,37 @@ +export * as PtyProtocol from "./protocol" + +// Wire protocol for PTY websocket transports. The PTY domain service is transport-free; server +// routes adapt Pty.attach to websockets with these helpers so every surface speaks one protocol. +// +// Outbound frames are raw UTF-8 terminal chunks. One control frame — a 0x00 byte followed by +// UTF-8 JSON — carries the absolute output cursor after replay so clients can resume later. + +const encoder = new TextEncoder() +const decoder = new TextDecoder("utf-8", { fatal: true }) + +// Replay can be megabytes; send it in bounded frames. +export const REPLAY_CHUNK = 64 * 1024 + +export function metaFrame(cursor: number) { + const bytes = encoder.encode(JSON.stringify({ cursor })) + const out = new Uint8Array(bytes.length + 1) + out[0] = 0 + out.set(bytes, 1) + return out +} + +export function chunks(data: string) { + const out: string[] = [] + for (let i = 0; i < data.length; i += REPLAY_CHUNK) out.push(data.slice(i, i + REPLAY_CHUNK)) + return out +} + +// Inbound client frames are UTF-8 text or binary; invalid UTF-8 input is dropped. +export function decodeInput(message: string | Uint8Array | ArrayBuffer) { + if (typeof message === "string") return message + try { + return decoder.decode(message instanceof ArrayBuffer ? new Uint8Array(message) : message) + } catch { + return undefined + } +} diff --git a/packages/opencode/src/shell/shell.ts b/packages/core/src/shell.ts similarity index 82% rename from packages/opencode/src/shell/shell.ts rename to packages/core/src/shell.ts index 1e501676c..29089106d 100644 --- a/packages/opencode/src/shell/shell.ts +++ b/packages/core/src/shell.ts @@ -1,10 +1,13 @@ -import { Flag } from "@opencode-ai/core/flag/flag" -import { lazy } from "@/util/lazy" -import { Filesystem } from "@/util/filesystem" -import { which } from "@opencode-ai/core/util/which" +export * as Shell from "./shell" + import path from "path" import { spawn, type ChildProcess } from "child_process" +import { readFile } from "fs/promises" +import { statSync } from "fs" import { setTimeout as sleep } from "node:timers/promises" +import { Flag } from "./flag/flag" +import { FSUtil } from "./fs-util" +import { which } from "./util/which" const SIGKILL_TIMEOUT_MS = 200 const META: Record = { @@ -47,7 +50,7 @@ export async function killTree(proc: ChildProcess, opts?: { exited?: () => boole if (!opts?.exited?.()) { process.kill(-pid, "SIGKILL") } - } catch (_e) { + } catch { proc.kill("SIGTERM") await sleep(SIGKILL_TIMEOUT_MS) if (!opts?.exited?.()) { @@ -56,9 +59,13 @@ export async function killTree(proc: ChildProcess, opts?: { exited?: () => boole } } +function stat(file: string) { + return statSync(file, { throwIfNoEntry: false }) ?? undefined +} + function full(file: string) { if (process.platform !== "win32") return file - const shell = Filesystem.windowsPath(file) + const shell = FSUtil.windowsPath(file) if (path.win32.dirname(shell) !== ".") { if (shell.startsWith("/") && name(shell) === "bash") return gitbash() || shell return shell @@ -76,13 +83,13 @@ function ok(file: string) { } function rooted(file: string) { - return path.isAbsolute(Filesystem.windowsPath(file)) + return path.isAbsolute(FSUtil.windowsPath(file)) } function resolve(file: string) { const shell = full(file) if (rooted(shell)) { - if (Filesystem.stat(shell)?.isFile()) return shell + if (stat(shell)?.isFile()) return shell return } return which(shell) ?? undefined @@ -99,7 +106,7 @@ function win() { } async function unix() { - const text = await Filesystem.readText("/etc/shells").catch(() => "") + const text = await readFile("/etc/shells", "utf8").catch(() => "") if (text) return Array.from(new Set(text.split("\n").filter((line) => line.trim() && !line.startsWith("#")))) return ["/bin/bash", "/bin/zsh", "/bin/sh"] } @@ -109,7 +116,7 @@ function select(file: string | undefined, opts?: { acceptable?: boolean }) { const shell = resolve(file) if (shell) return shell } - if (process.platform === "win32") return win()[0]! + if (process.platform === "win32") return win()[0] return fallback() } @@ -119,7 +126,7 @@ export function gitbash() { const git = which("git") if (!git) return const file = path.join(git, "..", "..", "bin", "bash.exe") - if (Filesystem.stat(file)?.size) return file + if (stat(file)?.size) return file } function fallback() { @@ -130,7 +137,7 @@ function fallback() { } export function name(file: string) { - if (process.platform === "win32") return path.win32.parse(Filesystem.windowsPath(file)).name.toLowerCase() + if (process.platform === "win32") return path.win32.parse(FSUtil.windowsPath(file)).name.toLowerCase() return path.basename(file).toLowerCase() } @@ -192,24 +199,28 @@ export function args(file: string, command: string, cwd: string) { return ["-c", command] } -const defaultPreferred = lazy(() => select(process.env.SHELL)) -const defaultAcceptable = lazy(() => select(process.env.SHELL, { acceptable: true })) +let defaultPreferred: string | undefined +let defaultAcceptable: string | undefined export function preferred(configShell?: string) { if (configShell) return select(configShell) - return defaultPreferred() + defaultPreferred ??= select(process.env.SHELL) + return defaultPreferred +} +preferred.reset = () => { + defaultPreferred = undefined } -preferred.reset = () => defaultPreferred.reset() export function acceptable(configShell?: string) { if (configShell) return select(configShell, { acceptable: true }) - return defaultAcceptable() + defaultAcceptable ??= select(process.env.SHELL, { acceptable: true }) + return defaultAcceptable +} +acceptable.reset = () => { + defaultAcceptable = undefined } -acceptable.reset = () => defaultAcceptable.reset() export async function list(): Promise { const shells = process.platform === "win32" ? win() : await unix() return shells.filter((s) => resolve(s)).map(info) } - -export * as Shell from "./shell" diff --git a/packages/core/test/pty/info-schema.test.ts b/packages/core/test/pty/info-schema.test.ts index 9f58c45c8..6cdf263a5 100644 --- a/packages/core/test/pty/info-schema.test.ts +++ b/packages/core/test/pty/info-schema.test.ts @@ -24,4 +24,9 @@ describe("Pty.Info", () => { test("rejects a negative pid", () => { expect(() => Schema.decodeUnknownSync(Pty.Info)(sample(-1))).toThrow() }) + + test("accepts an exit code for retained exited sessions", () => { + const info = Schema.decodeUnknownSync(Pty.Info)({ ...sample(48012), status: "exited", exitCode: 4 }) + expect(info.exitCode).toBe(4) + }) }) diff --git a/packages/core/test/pty/input.test.ts b/packages/core/test/pty/input.test.ts deleted file mode 100644 index 2cfe9756b..000000000 --- a/packages/core/test/pty/input.test.ts +++ /dev/null @@ -1,19 +0,0 @@ -import { describe, expect } from "bun:test" -import { Effect } from "effect" -import { handlePtyInput } from "@opencode-ai/core/pty/input" -import { it } from "../lib/effect" - -describe("pty websocket input", () => { - it.effect("does not forward invalid binary frames to the PTY handler", () => - Effect.gen(function* () { - const messages: Array = [] - const handler = { onMessage: (message: string | ArrayBuffer) => messages.push(message) } - - yield* handlePtyInput(handler, "ready") - yield* handlePtyInput(handler, new Uint8Array([0xff, 0xfe, 0xfd])) - yield* handlePtyInput(handler, new TextEncoder().encode("hello")) - - expect(messages).toEqual(["ready", "hello"]) - }), - ) -}) diff --git a/packages/core/test/pty/protocol.test.ts b/packages/core/test/pty/protocol.test.ts new file mode 100644 index 000000000..2bf961059 --- /dev/null +++ b/packages/core/test/pty/protocol.test.ts @@ -0,0 +1,27 @@ +import { describe, expect, test } from "bun:test" +import { PtyProtocol } from "@opencode-ai/core/pty/protocol" + +describe("pty protocol", () => { + test("drops invalid binary input frames and decodes valid ones", () => { + expect(PtyProtocol.decodeInput("ready")).toBe("ready") + expect(PtyProtocol.decodeInput(new Uint8Array([0xff, 0xfe, 0xfd]))).toBeUndefined() + expect(PtyProtocol.decodeInput(new TextEncoder().encode("hello"))).toBe("hello") + expect(PtyProtocol.decodeInput(new TextEncoder().encode("hello").buffer)).toBe("hello") + }) + + test("encodes the cursor as a 0x00-prefixed JSON control frame", () => { + const frame = PtyProtocol.metaFrame(42) + expect(frame[0]).toBe(0) + expect(JSON.parse(new TextDecoder().decode(frame.subarray(1)))).toEqual({ cursor: 42 }) + }) + + test("splits replay into bounded frames", () => { + expect(PtyProtocol.chunks("")).toEqual([]) + expect(PtyProtocol.chunks("abc")).toEqual(["abc"]) + const big = "x".repeat(PtyProtocol.REPLAY_CHUNK + 1) + const frames = PtyProtocol.chunks(big) + expect(frames.length).toBe(2) + expect(frames[0].length).toBe(PtyProtocol.REPLAY_CHUNK) + expect(frames.join("")).toBe(big) + }) +}) diff --git a/packages/core/test/pty/pty-output-isolation.test.ts b/packages/core/test/pty/pty-output-isolation.test.ts deleted file mode 100644 index b7a1fec12..000000000 --- a/packages/core/test/pty/pty-output-isolation.test.ts +++ /dev/null @@ -1,110 +0,0 @@ -import { describe, expect } from "bun:test" -import { Duration, Effect, Layer, Queue } from "effect" -import { EventV2 } from "@opencode-ai/core/event" -import { Location } from "@opencode-ai/core/location" -import { Pty } from "@opencode-ai/core/pty" -import { AbsolutePath } from "@opencode-ai/core/schema" -import { location } from "../fixture/location" -import { testEffect } from "../lib/effect" - -type Socket = Parameters[1] - -const locationLayer = Layer.succeed( - Location.Service, - Location.Service.of(location({ directory: AbsolutePath.make("/tmp") })), -) -const it = testEffect(Pty.layer.pipe(Layer.provideMerge(EventV2.defaultLayer), Layer.provideMerge(locationLayer))) -const ptyTest = process.platform === "win32" ? it.live.skip : it.live - -const createPty = Effect.fn("PtyOutputIsolationTest.createPty")(function* (command: string) { - const pty = yield* Pty.Service - return yield* Effect.acquireRelease( - pty.create({ command, args: [], cwd: "/tmp", env: { TERM: "xterm-256color", OPENCODE_TERMINAL: "1" } }), - (info) => pty.remove(info.id).pipe(Effect.ignore), - ) -}) - -const decodeOutput = (data: string | Uint8Array | ArrayBuffer) => - typeof data === "string" - ? data - : Buffer.from(data instanceof Uint8Array ? data : new Uint8Array(data)).toString("utf8") - -const makeSocket = Effect.fn("PtyOutputIsolationTest.makeSocket")(function* (data: unknown) { - const output = yield* Queue.unbounded() - const socket: Socket = { - readyState: 1, - data, - send: (data) => Queue.offerUnsafe(output, decodeOutput(data)), - close: () => {}, - } - return { socket, output } -}) - -const waitForOutput = (output: Queue.Queue, text: string, duration: Duration.Input = "5 seconds") => - Effect.gen(function* () { - let received = "" - while (!received.includes(text)) received += yield* Queue.take(output) - return received - }).pipe( - Effect.timeoutOrElse({ - duration, - orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)), - }), - ) - -describe("pty output isolation", () => { - ptyTest("does not leak output when websocket objects are reused", () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const a = yield* createPty("cat") - const b = yield* createPty("cat") - const shared = yield* makeSocket({ events: { connection: "a" } }) - const outB = yield* Queue.unbounded() - - yield* pty.connect(a.id, shared.socket) - shared.socket.data = { events: { connection: "b" } } - shared.socket.send = (data) => Queue.offerUnsafe(outB, decodeOutput(data)) - yield* pty.connect(b.id, shared.socket) - yield* pty.write(a.id, "AAA\n") - - const verify = yield* makeSocket({ events: { connection: "verify-a" } }) - yield* pty.connect(a.id, verify.socket) - expect(yield* waitForOutput(verify.output, "AAA")).toContain("AAA") - expect(yield* waitForOutput(outB, "AAA", "100 millis").pipe(Effect.option)).toMatchObject({ _tag: "None" }) - }), - ) - - ptyTest("does not leak output when Bun recycles websocket objects before re-connect", () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const info = yield* createPty("cat") - const first = yield* makeSocket({ events: { connection: "a" } }) - const recycled = yield* Queue.unbounded() - - yield* pty.connect(info.id, first.socket) - first.socket.data = { events: { connection: "b" } } - first.socket.send = (data) => Queue.offerUnsafe(recycled, decodeOutput(data)) - yield* pty.write(info.id, "AAA\n") - - const verify = yield* makeSocket({ events: { connection: "verify" } }) - yield* pty.connect(info.id, verify.socket) - expect(yield* waitForOutput(verify.output, "AAA")).toContain("AAA") - expect(yield* waitForOutput(recycled, "AAA", "100 millis").pipe(Effect.option)).toMatchObject({ _tag: "None" }) - }), - ) - - ptyTest("treats in-place socket data mutation as the same connection", () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const info = yield* createPty("cat") - const data = { connId: 1 } - const socket = yield* makeSocket(data) - - yield* pty.connect(info.id, socket.socket) - data.connId = 2 - yield* pty.write(info.id, "AAA\n") - - expect(yield* waitForOutput(socket.output, "AAA")).toContain("AAA") - }), - ) -}) diff --git a/packages/core/test/pty/pty-session.test.ts b/packages/core/test/pty/pty-session.test.ts index 5903db3f3..ab15e16c4 100644 --- a/packages/core/test/pty/pty-session.test.ts +++ b/packages/core/test/pty/pty-session.test.ts @@ -1,5 +1,6 @@ import { describe, expect } from "bun:test" -import { Cause, Effect, Exit, Layer, Queue } from "effect" +import { Cause, Deferred, Effect, Exit, Layer, Queue } from "effect" +import { Config } from "@opencode-ai/core/config" import { EventV2 } from "@opencode-ai/core/event" import { Location } from "@opencode-ai/core/location" import { Pty } from "@opencode-ai/core/pty" @@ -14,7 +15,14 @@ const locationLayer = Layer.succeed( Location.Service, Location.Service.of(location({ directory: AbsolutePath.make("/tmp") })), ) -const it = testEffect(Pty.layer.pipe(Layer.provideMerge(EventV2.defaultLayer), Layer.provideMerge(locationLayer))) +const configLayer = Layer.mock(Config.Service)({ entries: () => Effect.succeed([]) }) +const it = testEffect( + Pty.layer.pipe( + Layer.provide(configLayer), + Layer.provideMerge(EventV2.defaultLayer), + Layer.provideMerge(locationLayer), + ), +) const ptyTest = process.platform === "win32" ? it.live.skip : it.live const subscribePtyEvents = Effect.fn("PtySessionTest.subscribePtyEvents")(function* () { @@ -56,36 +64,176 @@ const waitForEvents = (events: Queue.Queue, id: PtyID, count: number) }), ) +const attachCollecting = Effect.fn("PtySessionTest.attachCollecting")(function* (id: PtyID, cursor?: number) { + const pty = yield* Pty.Service + const output = yield* Queue.unbounded() + const ended = yield* Deferred.make<{ exitCode?: number }>() + const attachment = yield* pty.attach(id, { + cursor, + onData: (chunk) => Queue.offerUnsafe(output, chunk), + onEnd: (event) => Deferred.doneUnsafe(ended, Effect.succeed(event)), + }) + attachment.activate() + return { attachment, output, ended } +}) + +const waitForOutput = (output: Queue.Queue, text: string) => + Effect.gen(function* () { + let received = "" + while (!received.includes(text)) received += yield* Queue.take(output) + return received + }).pipe( + Effect.timeoutOrElse({ + duration: "5 seconds", + orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)), + }), + ) + describe("pty", () => { it.live("returns typed not found errors for missing sessions", () => Effect.gen(function* () { const pty = yield* Pty.Service const id = "pty_missing" as PtyID - let closed = false - const socket = { readyState: 1, send: () => {}, close: () => void (closed = true) } for (const result of [ yield* pty.get(id).pipe(Effect.asVoid, Effect.exit), yield* pty.update(id, { title: "missing" }).pipe(Effect.asVoid, Effect.exit), yield* pty.remove(id).pipe(Effect.exit), - yield* pty.resize(id, 80, 24).pipe(Effect.exit), yield* pty.write(id, "input").pipe(Effect.exit), - yield* pty.connect(id, socket).pipe(Effect.asVoid, Effect.exit), + yield* pty.attach(id, { onData: () => {}, onEnd: () => {} }).pipe(Effect.asVoid, Effect.exit), ]) { expect(Exit.isFailure(result)).toBe(true) if (Exit.isFailure(result)) expect(Cause.squash(result.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) } - expect(closed).toBe(true) }), ) - ptyTest("publishes created, exited, deleted in order for a short-lived process", () => + ptyTest("retains exited sessions until removed", () => Effect.gen(function* () { + const pty = yield* Pty.Service const events = yield* subscribePtyEvents() - const info = yield* createPty("/usr/bin/env", ["sh", "-c", "sleep 0.1"]) + const info = yield* createPty("/usr/bin/env", ["sh", "-c", "exit 3"]) - expect(yield* waitForEvents(events, info.id, 3)).toEqual(["created", "exited", "deleted"]) + expect(yield* waitForEvents(events, info.id, 2)).toEqual(["created", "exited"]) + const exited = yield* pty.get(info.id) + expect(exited.status).toBe("exited") + expect(exited.exitCode).toBe(3) + + yield* pty.remove(info.id) + expect(yield* waitForEvents(events, info.id, 1)).toEqual(["deleted"]) + const missing = yield* pty.get(info.id).pipe(Effect.exit) + expect(Exit.isFailure(missing)).toBe(true) + }), + ) + + ptyTest("replays buffered output and streams live output to attachments", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const info = yield* createPty("cat") + yield* pty.write(info.id, "AAA\n") + + const first = yield* attachCollecting(info.id) + expect(yield* waitForOutput(first.output, "AAA")).toContain("AAA") + + first.attachment.write("BBB\n") + yield* waitForOutput(first.output, "BBB") + + // A later attachment replays everything already buffered. + const replayed = yield* attachCollecting(info.id) + expect(replayed.attachment.replay).toContain("AAA") + expect(replayed.attachment.replay).toContain("BBB") + expect(replayed.attachment.cursor).toBeGreaterThan(0) + + // Tail attachments skip the buffer and only see subsequent output. + const tail = yield* attachCollecting(info.id, -1) + expect(tail.attachment.replay).toBe("") + expect(tail.attachment.cursor).toBe(replayed.attachment.cursor) + }), + ) + + ptyTest("stops delivering output after detach", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const info = yield* createPty("cat") + const attached = yield* attachCollecting(info.id, -1) + + attached.attachment.detach() + yield* pty.write(info.id, "AAA\n") + + const verify = yield* attachCollecting(info.id) + yield* waitForOutput(verify.output, "AAA") + const leaked = yield* Queue.poll(attached.output) + expect(leaked._tag).toBe("None") + }), + ) + + ptyTest("isolates output between sessions", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* createPty("cat") + const b = yield* createPty("cat") + const attachedA = yield* attachCollecting(a.id) + const attachedB = yield* attachCollecting(b.id) + + yield* pty.write(a.id, "AAA\n") + yield* waitForOutput(attachedA.output, "AAA") + + const leaked = yield* Queue.poll(attachedB.output) + expect(leaked._tag).toBe("None") + }), + ) + + ptyTest("notifies attachments with the exit code and rejects attach after exit", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const events = yield* subscribePtyEvents() + const info = yield* createPty("cat") + const attached = yield* attachCollecting(info.id) + + yield* pty.write(info.id, "\u0004") + expect(yield* Deferred.await(attached.ended).pipe(Effect.timeout("5 seconds"))).toEqual({ exitCode: 0 }) + yield* waitForEvents(events, info.id, 2) + + const result = yield* pty.attach(info.id, { onData: () => {}, onEnd: () => {} }).pipe(Effect.exit) + expect(Exit.isFailure(result)).toBe(true) + if (Exit.isFailure(result)) + expect(Cause.squash(result.cause)).toMatchObject({ _tag: "Pty.ExitedError", ptyID: info.id }) + }), + ) +}) + +const configuredShell = process.platform === "win32" ? undefined : Bun.which("bash") +const configuredIt = testEffect( + Pty.layer.pipe( + Layer.provide( + Layer.mock(Config.Service)({ + entries: () => + Effect.succeed( + configuredShell + ? [new Config.Document({ type: "document", info: new Config.Info({ shell: configuredShell }) })] + : [], + ), + }), + ), + Layer.provideMerge(EventV2.defaultLayer), + Layer.provideMerge(locationLayer), + ), +) +const configuredTest = process.platform === "win32" ? configuredIt.live.skip : configuredIt.live + +describe("pty create defaults", () => { + configuredTest("defaults command, login args, and cwd from config and location", () => + Effect.gen(function* () { + if (!configuredShell) return + const pty = yield* Pty.Service + const info = yield* Effect.acquireRelease(pty.create({ title: "configured" }), (created) => + pty.remove(created.id).pipe(Effect.ignore), + ) + expect(info.command).toBe(configuredShell) + expect(info.args).toEqual(["-l"]) + expect(info.cwd).toBe("/tmp") + expect(info.title).toBe("configured") }), ) }) diff --git a/packages/opencode/test/shell/shell.test.ts b/packages/core/test/shell.test.ts similarity index 85% rename from packages/opencode/test/shell/shell.test.ts rename to packages/core/test/shell.test.ts index 1f76783ac..1cc47a79f 100644 --- a/packages/opencode/test/shell/shell.test.ts +++ b/packages/core/test/shell.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from "bun:test" import path from "path" -import { Shell } from "../../src/shell/shell" -import { Filesystem } from "@/util/filesystem" +import { Shell } from "@opencode-ai/core/shell" +import { FSUtil } from "@opencode-ai/core/fs-util" import { which } from "@opencode-ai/core/util/which" const withShell = async (shell: string | undefined, fn: () => void | Promise) => { @@ -54,6 +54,15 @@ describe("shell", () => { expect(Shell.name(Shell.acceptable("nu"))).not.toBe("nu") }) + test("builds command args per shell family", () => { + expect(Shell.args("/bin/sh", "echo hi", "/tmp")).toEqual(["-c", "echo hi"]) + expect(Shell.args("/usr/bin/fish", "echo hi", "/tmp")).toEqual(["-c", "echo hi"]) + const zsh = Shell.args("/bin/zsh", "echo hi", "/tmp") + expect(zsh[0]).toBe("-l") + expect(zsh[1]).toBe("-c") + expect(zsh.at(-1)).toBe("/tmp") + }) + if (process.platform === "win32") { test("rejects blacklisted shells case-insensitively", async () => { await withShell("NU.EXE", async () => { @@ -64,7 +73,7 @@ describe("shell", () => { test("normalizes Git Bash shell paths from env", async () => { const shell = "/cygdrive/c/Program Files/Git/bin/bash.exe" await withShell(shell, async () => { - expect(Shell.preferred()).toBe(Filesystem.windowsPath(shell)) + expect(Shell.preferred()).toBe(FSUtil.windowsPath(shell)) }) }) diff --git a/packages/opencode/src/pty-preparation.ts b/packages/opencode/src/pty-preparation.ts deleted file mode 100644 index 6362225f5..000000000 --- a/packages/opencode/src/pty-preparation.ts +++ /dev/null @@ -1,30 +0,0 @@ -export * as PtyPreparation from "./pty-preparation" - -import { Config } from "@/config/config" -import * as InstanceState from "@/effect/instance-state" -import { Plugin } from "@/plugin" -import { Shell } from "@/shell/shell" -import { Pty } from "@opencode-ai/core/pty" -import { Effect } from "effect" - -export const prepareCreate = Effect.fn("PtyPreparation.prepareCreate")(function* (input: Pty.CreateInput) { - const config = yield* Config.Service - const plugin = yield* Plugin.Service - const command = input.command || Shell.preferred((yield* config.get()).shell) - const args = Shell.login(command) ? [...(input.args ?? []), "-l"] : [...(input.args ?? [])] - const cwd = input.cwd || (yield* InstanceState.context).directory - const shell = yield* plugin.trigger("shell.env", { cwd }, { env: {} }) - const env = { - ...process.env, - ...input.env, - ...shell.env, - TERM: "xterm-256color", - OPENCODE_TERMINAL: "1", - } as Record - if (process.platform === "win32") { - env.LC_ALL = "C.UTF-8" - env.LC_CTYPE = "C.UTF-8" - env.LANG = "C.UTF-8" - } - return { command, args, cwd, title: input.title, env } -}) diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts index 0058c66b5..538920b00 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts @@ -1,23 +1,22 @@ import * as InstanceState from "@/effect/instance-state" import { registerDisposer } from "@/effect/instance-registry" import { InstanceRef, WorkspaceRef } from "@/effect/instance-ref" -import { PtyPreparation } from "@/pty-preparation" +import { Plugin } from "@/plugin" import { Pty } from "@opencode-ai/core/pty" -import { handlePtyInput } from "@opencode-ai/core/pty/input" +import { PtyProtocol } from "@opencode-ai/core/pty/protocol" import { PtyID } from "@opencode-ai/core/pty/schema" import { PtyTicket } from "@opencode-ai/core/pty/ticket" import { LocationServiceMap } from "@opencode-ai/core/location-layer" import { Location } from "@opencode-ai/core/location" import { AbsolutePath } from "@opencode-ai/core/schema" -import { Shell } from "@/shell/shell" -import { EffectBridge } from "@/effect/bridge" -import { CorsConfig, isAllowedRequestOrigin, type CorsOptions } from "@/server/cors" +import { Shell } from "@opencode-ai/core/shell" +import { CorsConfig, isAllowedRequestOrigin, type CorsOptions } from "@opencode-ai/server/cors" import { PTY_CONNECT_TICKET_QUERY, PTY_CONNECT_TOKEN_HEADER, PTY_CONNECT_TOKEN_HEADER_VALUE, } from "@/server/shared/pty-ticket" -import { Effect, Layer, Option, Schema } from "effect" +import { Effect, Layer, Option, Queue, Schema } from "effect" import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http" import { HttpApiBuilder } from "effect/unstable/httpapi" import * as Socket from "effect/unstable/socket/Socket" @@ -36,10 +35,14 @@ const ticketScope = Effect.gen(function* () { return { directory: instance?.directory, workspaceID } }) +// Legacy surface compatibility: before exited-session retention, sessions vanished the moment +// their process exited. These routes preserve that observable behavior — exited sessions are +// invisible here — while the canonical /api/pty surface exposes them until removal. export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handlers) => Effect.gen(function* () { const tickets = yield* PtyTicket.Service const cors = yield* CorsConfig + const plugin = yield* Plugin.Service const locations = yield* LocationServiceMap const unregister = registerDisposer((directory) => Effect.runPromise(locations.invalidate(Location.Ref.make({ directory: AbsolutePath.make(directory) }))), @@ -59,33 +62,42 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler }) const list = Effect.fn("PtyHttpApi.list")(function* () { - return yield* pty(Pty.Service.use((service) => service.list())) + const sessions = yield* pty(Pty.Service.use((service) => service.list())) + return sessions.filter((info) => info.status === "running") }) const create = Effect.fn("PtyHttpApi.create")(function* (ctx: { payload: typeof Pty.CreateInput.Type }) { + const cwd = ctx.payload.cwd || (yield* InstanceState.context).directory + const shell = yield* plugin.trigger("shell.env", { cwd }, { env: {} as Record }) return yield* pty( Pty.Service.use((service) => - Effect.flatMap( - PtyPreparation.prepareCreate({ - ...ctx.payload, - args: ctx.payload.args ? [...ctx.payload.args] : undefined, - env: ctx.payload.env ? { ...ctx.payload.env } : undefined, - }), - service.create, - ), + service.create({ + ...ctx.payload, + args: ctx.payload.args ? [...ctx.payload.args] : undefined, + cwd, + env: { ...ctx.payload.env, ...shell.env }, + }), ), ) }) const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) { return yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe( - Effect.catchTag("Pty.NotFoundError", (error) => - Effect.fail( + Effect.catchTag( + "Pty.NotFoundError", + (error) => new ApiError.PtyNotFoundError({ ptyID: error.ptyID, message: `PTY session not found: ${error.ptyID}`, }), - ), + ), + Effect.flatMap((info) => + info.status === "running" + ? Effect.succeed(info) + : new ApiError.PtyNotFoundError({ + ptyID: ctx.params.ptyID, + message: `PTY session not found: ${ctx.params.ptyID}`, + }), ), ) }) @@ -94,6 +106,7 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler params: { ptyID: PtyID } payload: typeof Pty.UpdateInput.Type }) { + yield* get(ctx) return yield* pty( Pty.Service.use((service) => service.update(ctx.params.ptyID, { @@ -102,26 +115,27 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler }), ), ).pipe( - Effect.catchTag("Pty.NotFoundError", (error) => - Effect.fail( + Effect.catchTag( + "Pty.NotFoundError", + (error) => new ApiError.PtyNotFoundError({ ptyID: error.ptyID, message: `PTY session not found: ${error.ptyID}`, }), - ), ), ) }) const remove = Effect.fn("PtyHttpApi.remove")(function* (ctx: { params: { ptyID: PtyID } }) { + yield* get(ctx) yield* pty(Pty.Service.use((service) => service.remove(ctx.params.ptyID))).pipe( - Effect.catchTag("Pty.NotFoundError", (error) => - Effect.fail( + Effect.catchTag( + "Pty.NotFoundError", + (error) => new ApiError.PtyNotFoundError({ ptyID: error.ptyID, message: `PTY session not found: ${error.ptyID}`, }), - ), ), ) return true @@ -131,16 +145,7 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler const request = yield* HttpServerRequest.HttpServerRequest if (request.headers[PTY_CONNECT_TOKEN_HEADER] !== PTY_CONNECT_TOKEN_HEADER_VALUE || !validOrigin(request, cors)) return yield* new ApiError.PtyForbiddenError({ message: "Invalid PTY connect token request" }) - yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe( - Effect.catchTag("Pty.NotFoundError", (error) => - Effect.fail( - new ApiError.PtyNotFoundError({ - ptyID: error.ptyID, - message: `PTY session not found: ${error.ptyID}`, - }), - ), - ), - ) + yield* get(ctx) return yield* tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* ticketScope) }) }) @@ -180,7 +185,7 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne request: HttpServerRequest.HttpServerRequest }) { const exists = yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe( - Effect.as(true), + Effect.map((info) => info.status === "running"), Effect.catchTag("Pty.NotFoundError", () => Effect.succeed(false)), ) if (!exists) return HttpServerResponse.empty({ status: 404 }) @@ -214,48 +219,53 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne yield* closeAccepted(WebSocketTracker.SERVER_CLOSING_EVENT()) return HttpServerResponse.empty() } - const bridge = yield* EffectBridge.make() - const writeScoped = (effect: Effect.Effect) => { - bridge.fork(effect.pipe(Effect.catch(() => Effect.void))) - } - let closed = false - const adapter = { - get readyState() { - return closed ? 3 : 1 - }, - send: (data: string | Uint8Array | ArrayBuffer) => { - if (closed) return - writeScoped(write(data instanceof ArrayBuffer ? new Uint8Array(data) : data)) - }, - close: (code?: number, reason?: string) => { - if (closed) return - closed = true - writeScoped(write(new Socket.CloseEvent(code, reason))) - }, - } - const handler = yield* pty( - Pty.Service.use((service) => service.connect(ctx.params.ptyID, adapter, cursor)), - ).pipe( - Effect.catchTag("Pty.NotFoundError", () => - closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)), - ), - ) - if (!handler) return HttpServerResponse.empty() - // The handshake runs inside `socket.runRaw`, after the input callback is - // registered, so the client cannot send frames before PTY input is wired. - yield* socket - .runRaw((message) => handlePtyInput(handler, message)) - .pipe( - Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void), - Effect.ensuring( - Effect.sync(() => { - closed = true - handler.onClose() - }), - ), - Effect.orDie, - ) + // Outbound frames flow through one queue drained by a single writer so replay, live + // output, and the close frame keep their order. + const outbox = yield* Queue.unbounded() + const attachment = yield* pty( + Pty.Service.use((service) => + service.attach(ctx.params.ptyID, { + cursor, + onData: (chunk) => Queue.offerUnsafe(outbox, chunk), + onEnd: () => Queue.offerUnsafe(outbox, new Socket.CloseEvent(1000)), + }), + ), + ).pipe( + Effect.catchTags({ + "Pty.NotFoundError": () => + closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)), + "Pty.ExitedError": () => + closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)), + }), + ) + if (!attachment) return HttpServerResponse.empty() + + for (const chunk of PtyProtocol.chunks(attachment.replay)) Queue.offerUnsafe(outbox, chunk) + Queue.offerUnsafe(outbox, PtyProtocol.metaFrame(attachment.cursor)) + attachment.activate() + + const drain = Effect.gen(function* () { + while (true) { + const item = yield* Queue.take(outbox) + yield* write(item) + if (item instanceof Socket.CloseEvent) return + } + }) + + // The reader runs concurrently with the writer; whichever finishes first ends the + // connection and the attachment is always released. + yield* Effect.race( + drain, + socket.runRaw((message) => { + const decoded = PtyProtocol.decodeInput(message) + if (decoded !== undefined) attachment.write(decoded) + }), + ).pipe( + Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void), + Effect.ensuring(Effect.sync(() => attachment.detach())), + Effect.orDie, + ) return HttpServerResponse.empty() }), ) diff --git a/packages/opencode/src/server/routes/instance/httpapi/server.ts b/packages/opencode/src/server/routes/instance/httpapi/server.ts index 8c5c0ad96..248c0da70 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/server.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/server.ts @@ -61,7 +61,7 @@ import { PtyTicket } from "@opencode-ai/core/pty/ticket" import { Ripgrep } from "@opencode-ai/core/ripgrep" import { SessionProjector } from "@opencode-ai/core/session/projector" import { lazy } from "@/util/lazy" -import { CorsConfig, isAllowedCorsOrigin, type CorsOptions } from "@/server/cors" +import { CorsConfig, isAllowedCorsOrigin, type CorsOptions } from "@opencode-ai/server/cors" import { serveUIEffect } from "@/server/shared/ui" import { ServerAuth } from "@/server/auth" import { InstanceHttpApi, RootHttpApi } from "./api" diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 93e452ced..e8958c7ba 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -10,7 +10,7 @@ import { HttpApiApp } from "./routes/instance/httpapi/server" import { disposeMiddleware } from "./routes/instance/httpapi/lifecycle" import { WebSocketTracker } from "./routes/instance/httpapi/websocket-tracker" import { PublicApi } from "./routes/instance/httpapi/public" -import type { CorsOptions } from "./cors" +import type { CorsOptions } from "@opencode-ai/server/cors" import { lazy } from "@/util/lazy" // @ts-ignore This global is needed to prevent ai-sdk from logging warnings to stdout https://github.com/vercel/ai/blob/2dc67e0ef538307f21368db32d5a12345d98831b/packages/ai/src/logger/log-warnings.ts#L85 diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 631b881ee..b3f85c813 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -35,7 +35,7 @@ import { Tool } from "@/tool/tool" import { Permission } from "@/permission" import { SessionStatus } from "./status" import { LLM } from "./llm" -import { Shell } from "@/shell/shell" +import { Shell } from "@opencode-ai/core/shell" import { ShellID } from "@/tool/shell/id" import { FSUtil } from "@opencode-ai/core/fs-util" import { Truncate } from "@/tool/truncate" diff --git a/packages/opencode/src/tool/shell.ts b/packages/opencode/src/tool/shell.ts index cb0bc9171..620378dc1 100644 --- a/packages/opencode/src/tool/shell.ts +++ b/packages/opencode/src/tool/shell.ts @@ -12,7 +12,7 @@ import { FSUtil } from "@opencode-ai/core/fs-util" import { fileURLToPath } from "url" import { Config } from "@/config/config" import { RuntimeFlags } from "@/effect/runtime-flags" -import { Shell } from "@/shell/shell" +import { Shell } from "@opencode-ai/core/shell" import { ShellID } from "./shell/id" import * as Truncate from "./truncate" diff --git a/packages/opencode/test/pty/pty-shell.test.ts b/packages/opencode/test/pty/pty-shell.test.ts deleted file mode 100644 index c3fbec66e..000000000 --- a/packages/opencode/test/pty/pty-shell.test.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { describe, expect } from "bun:test" -import { Effect, Layer } from "effect" -import { Config } from "../../src/config/config" -import { Plugin } from "../../src/plugin" -import { PtyPreparation } from "../../src/pty-preparation" -import { Pty } from "@opencode-ai/core/pty" -import { Shell } from "../../src/shell/shell" -import { testEffect } from "../lib/effect" - -Shell.preferred.reset() - -const it = testEffect(Layer.mergeAll(Config.defaultLayer, Plugin.defaultLayer)) -const preparationIt = testEffect( - Layer.mergeAll( - Layer.mock(Config.Service)({ get: () => Effect.succeed({}) }), - Layer.mock(Plugin.Service)({ - trigger: (_name: Name, _input: Input, output: Output) => - Effect.sync(() => { - const result = output as { env: Record } - result.env.INPUT = "plugin" - result.env.FROM_PLUGIN = "plugin" - result.env.TERM = "plugin" - return output - }), - list: () => Effect.succeed([]), - init: () => Effect.void, - }), - ), -) - -const preparePty = (input: Pty.CreateInput) => PtyPreparation.prepareCreate(input) - -describe("pty shell args", () => { - if (process.platform !== "win32") return - - const ps = Bun.which("pwsh") || Bun.which("powershell") - if (ps) { - it.instance( - "does not add login args to pwsh", - () => - Effect.gen(function* () { - const info = yield* preparePty({ command: ps, title: "pwsh" }) - expect(info.args).toEqual([]) - }), - { timeout: 30000 }, - ) - } - - const bash = (() => { - const shell = Shell.preferred() - if (Shell.name(shell) === "bash") return shell - return Shell.gitbash() - })() - if (bash) { - it.instance( - "adds login args to bash", - () => - Effect.gen(function* () { - const info = yield* preparePty({ command: bash, title: "bash" }) - expect(info.args).toEqual(["-l"]) - }), - { timeout: 30000 }, - ) - } -}) - -describe("pty configured shell", () => { - const configured = process.platform === "win32" ? Bun.which("pwsh") || Bun.which("powershell") : Bun.which("bash") - - it.instance( - "uses configured shell for default PTY command", - () => - Effect.gen(function* () { - if (!configured) return - - const info = yield* preparePty({ title: "configured" }) - if (process.platform === "win32") { - expect(info.command.toLowerCase()).toBe(configured.toLowerCase()) - } else { - expect(info.command).toBe(configured) - } - expect(info.args).toEqual(process.platform === "win32" ? [] : ["-l"]) - }), - configured ? { config: { shell: Shell.name(configured) } } : undefined, - { timeout: 30000 }, - ) -}) - -describe("pty environment preparation", () => { - preparationIt.instance("merges plugin environment before forced PTY values", () => - Effect.gen(function* () { - const input = { command: "/bin/sh", args: [] as string[], env: { INPUT: "caller" } } - const prepared = yield* preparePty(input) - - expect(input.args).toEqual([]) - expect(prepared.env.INPUT).toBe("plugin") - expect(prepared.env.FROM_PLUGIN).toBe("plugin") - expect(prepared.env.TERM).toBe("xterm-256color") - expect(prepared.env.OPENCODE_TERMINAL).toBe("1") - }), - ) -}) diff --git a/packages/opencode/test/server/httpapi-exercise/index.ts b/packages/opencode/test/server/httpapi-exercise/index.ts index 982f5cb97..3860d742d 100644 --- a/packages/opencode/test/server/httpapi-exercise/index.ts +++ b/packages/opencode/test/server/httpapi-exercise/index.ts @@ -757,6 +757,44 @@ const scenarios: Scenario[] = [ .seeded((ctx) => ctx.file("hello.txt", "hello\n")) .at((ctx) => ({ path: "/api/fs/find?query=hello&type=file", headers: ctx.headers() })) .json(200, locationData(array)), + http.protected.get("/api/pty", "v2.pty.list").json(200, locationData(array)), + http.protected + .post("/api/pty", "v2.pty.create") + .mutating() + .at((ctx) => ({ path: "/api/pty", headers: ctx.headers(), body: controlledPtyInput("HTTP API V2 PTY") })) + .json(200, locationData(object)), + http.protected + .get("/api/pty/{ptyID}", "v2.pty.get") + .at((ctx) => ({ path: route("/api/pty/{ptyID}", { ptyID: "pty_httpapi_missing" }), headers: ctx.headers() })) + .json(404, object, "status"), + http.protected + .put("/api/pty/{ptyID}", "v2.pty.update") + .mutating() + .at((ctx) => ({ + path: route("/api/pty/{ptyID}", { ptyID: "pty_httpapi_missing" }), + headers: ctx.headers(), + body: { title: "missing" }, + })) + .json(404, object, "status"), + http.protected + .delete("/api/pty/{ptyID}", "v2.pty.remove") + .mutating() + .at((ctx) => ({ path: route("/api/pty/{ptyID}", { ptyID: "pty_httpapi_missing" }), headers: ctx.headers() })) + .json(404, object, "status"), + http.protected + .post("/api/pty/{ptyID}/connect-token", "v2.pty.connectToken") + .at((ctx) => ({ + path: route("/api/pty/{ptyID}/connect-token", { ptyID: "pty_httpapi_missing" }), + headers: { ...ctx.headers(), "x-opencode-ticket": "1" }, + })) + .json(404, object, "status"), + http.protected + .get("/api/pty/{ptyID}/connect", "v2.pty.connect") + .at((ctx) => ({ + path: route("/api/pty/{ptyID}/connect", { ptyID: "pty_httpapi_missing" }), + headers: ctx.headers(), + })) + .status(404, undefined, "none"), http.protected.get("/api/reference", "v2.reference.list").json(200, object), http.protected .get("/api/provider/{providerID}", "v2.provider.get") diff --git a/packages/opencode/test/server/httpapi-pty.test.ts b/packages/opencode/test/server/httpapi-pty.test.ts index 28216484b..3eec0c968 100644 --- a/packages/opencode/test/server/httpapi-pty.test.ts +++ b/packages/opencode/test/server/httpapi-pty.test.ts @@ -136,6 +136,33 @@ describe("pty HttpApi bridge", () => { }) }) + testPty("hides exited sessions on the legacy surface", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const headers = { "x-opencode-directory": tmp.path } + const created = await app().request(PtyPaths.create, { + method: "POST", + headers: { ...headers, "content-type": "application/json" }, + body: JSON.stringify({ command: "/usr/bin/env", args: ["sh", "-c", "exit 0"] }), + }) + expect(created.status).toBe(200) + const info = await created.json() + + // Exited sessions are retained by core for the canonical surface, but the legacy + // routes preserve pre-retention behavior: exited sessions are invisible here. + const deadline = Date.now() + 5_000 + while (Date.now() < deadline) { + const found = await app().request(PtyPaths.get.replace(":ptyID", info.id), { headers }) + if (found.status === 404) break + await new Promise((resolve) => setTimeout(resolve, 50)) + } + const found = await app().request(PtyPaths.get.replace(":ptyID", info.id), { headers }) + expect(found.status).toBe(404) + + const list = await app().request(PtyPaths.list, { headers }) + expect(list.status).toBe(200) + expect(await list.json()).toEqual([]) + }) + testPty("disposes PTY sessions with their legacy instance", async () => { await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) const headers = { "x-opencode-directory": tmp.path } diff --git a/packages/opencode/test/server/httpapi-v2-pty.test.ts b/packages/opencode/test/server/httpapi-v2-pty.test.ts new file mode 100644 index 000000000..0a2aa8bcb --- /dev/null +++ b/packages/opencode/test/server/httpapi-v2-pty.test.ts @@ -0,0 +1,171 @@ +import { afterEach, describe, expect, test } from "bun:test" +import { Context, Config as EffectConfig, Effect, Layer, Queue, Schema } from "effect" +import { NodeHttpServer, NodeServices } from "@effect/platform-node" +import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http" +import * as Socket from "effect/unstable/socket/Socket" +import { Location } from "@opencode-ai/core/location" +import { Pty } from "@opencode-ai/core/pty" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" +import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server" +import { resetDatabase } from "../fixture/db" +import { disposeAllInstances, tmpdir, tmpdirScoped } from "../fixture/fixture" +import { testEffect } from "../lib/effect" + +const context = Context.empty() as Context.Context +const testPty = process.platform === "win32" ? test.skip : test + +function request(route: string, directory: string, init: RequestInit = {}) { + const headers = new Headers(init.headers) + headers.set("x-opencode-directory", directory) + return HttpApiApp.webHandler().handler( + new Request(`http://localhost${route}`, { + ...init, + headers, + }), + context, + ) +} + +const testStateLayer = Layer.effectDiscard( + Effect.gen(function* () { + yield* Effect.promise(() => resetDatabase()) + yield* Effect.addFinalizer(() => Effect.promise(() => resetDatabase())) + }), +) + +const servedRoutes: Layer.Layer = HttpRouter.serve( + HttpApiApp.routes, + { disableListenLog: true, disableLogger: true }, +) + +const effectIt = testEffect( + Layer.mergeAll( + testStateLayer, + Socket.layerWebSocketConstructorGlobal, + servedRoutes.pipe( + Layer.provide(Socket.layerWebSocketConstructorGlobal), + Layer.provideMerge(NodeHttpServer.layerTest), + Layer.provideMerge(NodeServices.layer), + ), + ), +) + +const directoryHeader = (dir: string) => HttpClientRequest.setHeader("x-opencode-directory", dir) + +const serverUrl = () => HttpServer.HttpServer.use((server) => Effect.succeed(HttpServer.formatAddress(server.address))) + +afterEach(async () => { + await disposeAllInstances() + await resetDatabase() +}) + +describe("v2 pty HttpApi", () => { + testPty("serves location-wrapped PTY routes and retains exited sessions", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + + const empty = await request("/api/pty", tmp.path) + expect(empty.status).toBe(200) + expect(Schema.decodeUnknownSync(Location.response(Schema.Array(Pty.Info)))(await empty.json()).data).toEqual([]) + + const created = await request("/api/pty", tmp.path, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ command: "/usr/bin/env", args: ["sh", "-c", "exit 4"], title: "v2" }), + }) + expect(created.status).toBe(200) + const body = Schema.decodeUnknownSync(Location.response(Pty.Info))(await created.json()) + expect(String(body.location.directory)).toBe(tmp.path) + expect(body.data.title).toBe("v2") + + // The canonical surface keeps exited sessions observable with their exit code. + const deadline = Date.now() + 5_000 + let info: { status: string; exitCode?: number } | undefined + while (Date.now() < deadline) { + const found = await request(`/api/pty/${body.data.id}`, tmp.path) + expect(found.status).toBe(200) + info = Schema.decodeUnknownSync(Location.response(Pty.Info))(await found.json()).data + if (info.status === "exited") break + await new Promise((resolve) => setTimeout(resolve, 50)) + } + expect(info).toMatchObject({ status: "exited", exitCode: 4 }) + + const removed = await request(`/api/pty/${body.data.id}`, tmp.path, { method: "DELETE" }) + expect(removed.status).toBe(204) + + const missing = await request(`/api/pty/${body.data.id}`, tmp.path) + expect(missing.status).toBe(404) + expect(await missing.json()).toMatchObject({ _tag: "PtyNotFoundError", ptyID: body.data.id }) + }) + + testPty("rejects connect tokens without the CSRF header and connects with a valid ticket", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const created = await request("/api/pty", tmp.path, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ command: "/usr/bin/env", args: ["sh", "-c", "sleep 5"] }), + }) + expect(created.status).toBe(200) + const info = Schema.decodeUnknownSync(Location.response(Pty.Info))(await created.json()).data + + try { + const forbidden = await request(`/api/pty/${info.id}/connect-token`, tmp.path, { method: "POST" }) + expect(forbidden.status).toBe(403) + expect(await forbidden.json()).toMatchObject({ _tag: "ForbiddenError" }) + + const token = await request(`/api/pty/${info.id}/connect-token`, tmp.path, { + method: "POST", + headers: { "x-opencode-ticket": "1" }, + }) + expect(token.status).toBe(200) + const ticket = Schema.decodeUnknownSync(Location.response(PtyTicket.ConnectToken))(await token.json()).data.ticket + expect(ticket).toBeTruthy() + + const invalid = await request(`/api/pty/${info.id}/connect?ticket=not-a-ticket`, tmp.path) + expect(invalid.status).toBe(403) + } finally { + await request(`/api/pty/${info.id}`, tmp.path, { method: "DELETE" }) + } + }) + ;(process.platform === "win32" ? effectIt.live.skip : effectIt.live)( + "serves PTY websocket output and input through the canonical route", + () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped({ git: true, config: { formatter: false, lsp: false } }) + const created = yield* HttpClientRequest.post("/api/pty").pipe( + directoryHeader(dir), + HttpClientRequest.bodyJson({ command: "/bin/cat", title: "v2-websocket" }), + Effect.flatMap(HttpClient.execute), + ) + expect(created.status).toBe(200) + const body = yield* Schema.decodeUnknownEffect(Location.response(Pty.Info))(yield* created.json) + const info = body.data + + const socket = yield* Socket.makeWebSocket( + `${(yield* serverUrl()).replace(/^http/, "ws")}/api/pty/${info.id}/connect?cursor=-1&location[directory]=${encodeURIComponent(dir)}`, + { closeCodeIsError: () => false }, + ) + const messages = yield* Queue.unbounded() + yield* socket + .runRaw((message) => + Queue.offer(messages, typeof message === "string" ? message : new TextDecoder().decode(message)), + ) + .pipe(Effect.catch(() => Effect.void)) + .pipe(Effect.forkScoped) + const write = yield* socket.writer + + const takeUntil = (expected: string, seen = ""): Effect.Effect => + Effect.gen(function* () { + const next = seen + (yield* Queue.take(messages).pipe(Effect.timeout("5 seconds"))) + if (next.includes(expected)) return next + return yield* takeUntil(expected, next) + }) + + yield* write("ping-v2\n") + expect(yield* takeUntil("ping-v2")).toContain("ping-v2") + yield* write(new Socket.CloseEvent(1000, "done")).pipe(Effect.catch(() => Effect.void)) + + const removed = yield* HttpClientRequest.delete(`/api/pty/${info.id}`).pipe(directoryHeader(dir), HttpClient.execute) + expect(removed.status).toBe(204) + }), + ) +}) diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 12fa11869..08828018a 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -43,7 +43,7 @@ import { SessionV2 } from "@opencode-ai/core/session" import { SessionExecution } from "@opencode-ai/core/session/execution" import { Skill } from "../../src/skill" import { SystemPrompt } from "../../src/session/system" -import { Shell } from "../../src/shell/shell" +import { Shell } from "@opencode-ai/core/shell" import { Snapshot } from "../../src/snapshot" import { ToolRegistry } from "@/tool/registry" import { Truncate } from "@/tool/truncate" diff --git a/packages/opencode/test/tool/shell.test.ts b/packages/opencode/test/tool/shell.test.ts index d679fda1a..2ea1d1458 100644 --- a/packages/opencode/test/tool/shell.test.ts +++ b/packages/opencode/test/tool/shell.test.ts @@ -5,7 +5,7 @@ import type * as Scope from "effect/Scope" import os from "os" import path from "path" import { Config } from "@/config/config" -import { Shell } from "../../src/shell/shell" +import { Shell } from "@opencode-ai/core/shell" import { ShellTool } from "../../src/tool/shell" import { Filesystem } from "@/util/filesystem" import { provideInstance, testInstanceStoreLayer, tmpdirScoped } from "../fixture/fixture" diff --git a/packages/server/src/api.ts b/packages/server/src/api.ts index 6853ad16c..42573e064 100644 --- a/packages/server/src/api.ts +++ b/packages/server/src/api.ts @@ -11,6 +11,7 @@ import { SkillGroup } from "./groups/skill" import { EventGroup } from "./groups/event" import { AgentGroup } from "./groups/agent" import { HealthGroup } from "./groups/health" +import { PtyGroup } from "./groups/pty" import { QuestionGroup } from "./groups/question" import { ReferenceGroup } from "./groups/reference" import { Authorization } from "./middleware/authorization" @@ -34,6 +35,7 @@ export const Api = HttpApi.make("server") .add(CommandGroup) .add(SkillGroup) .add(EventGroup) + .add(PtyGroup) .add(QuestionGroup) .add(ReferenceGroup) .add(ProjectCopyGroup) diff --git a/packages/opencode/src/server/cors.ts b/packages/server/src/cors.ts similarity index 100% rename from packages/opencode/src/server/cors.ts rename to packages/server/src/cors.ts diff --git a/packages/server/src/errors.ts b/packages/server/src/errors.ts index 2b1dcaf11..2cf1eea58 100644 --- a/packages/server/src/errors.ts +++ b/packages/server/src/errors.ts @@ -84,3 +84,18 @@ export class QuestionNotFoundError extends Schema.TaggedErrorClass()( + "ForbiddenError", + { message: Schema.String }, + { httpApiStatus: 403 }, +) {} + +export class PtyNotFoundError extends Schema.TaggedErrorClass()( + "PtyNotFoundError", + { + ptyID: Schema.String, + message: Schema.String, + }, + { httpApiStatus: 404 }, +) {} diff --git a/packages/server/src/groups/pty.ts b/packages/server/src/groups/pty.ts new file mode 100644 index 000000000..b5bbc7bf5 --- /dev/null +++ b/packages/server/src/groups/pty.ts @@ -0,0 +1,144 @@ +import { Pty } from "@opencode-ai/core/pty" +import { PtyID } from "@opencode-ai/core/pty/schema" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" +import { Location } from "@opencode-ai/core/location" +import { Schema } from "effect" +import { HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi" +import { ForbiddenError, PtyNotFoundError } from "../errors" +import { LocationQuery, locationQueryOpenApi, LocationMiddleware } from "./location" + +export const PTY_CONNECT_TICKET_QUERY = "ticket" +export const PTY_CONNECT_TOKEN_HEADER = "x-opencode-ticket" +export const PTY_CONNECT_TOKEN_HEADER_VALUE = "1" + +const PTY_CONNECT_PATH = /^\/api\/pty\/[^/]+\/connect$/ + +// Authorization middleware skips credential checks when this matches; the PTY connect handler +// is then responsible for consuming and validating the ticket. +export function hasPtyConnectTicketURL(url: URL) { + return PTY_CONNECT_PATH.test(url.pathname) && !!url.searchParams.get(PTY_CONNECT_TICKET_QUERY) +} + +export const PtyGroup = HttpApiGroup.make("server.pty") + .add( + HttpApiEndpoint.get("pty.list", "/api/pty", { + query: LocationQuery, + success: Location.response(Schema.Array(Pty.Info)), + }) + .annotateMerge(locationQueryOpenApi) + .annotateMerge( + OpenApi.annotations({ + identifier: "v2.pty.list", + summary: "List PTY sessions", + description: "List PTY sessions for a location, including exited sessions retained until removal.", + }), + ), + ) + .add( + HttpApiEndpoint.post("pty.create", "/api/pty", { + query: LocationQuery, + payload: Pty.CreateInput, + success: Location.response(Pty.Info), + }) + .annotateMerge(locationQueryOpenApi) + .annotateMerge( + OpenApi.annotations({ + identifier: "v2.pty.create", + summary: "Create PTY session", + description: "Create a pseudo-terminal session for a location.", + }), + ), + ) + .add( + HttpApiEndpoint.get("pty.get", "/api/pty/:ptyID", { + params: { ptyID: PtyID }, + query: LocationQuery, + success: Location.response(Pty.Info), + error: PtyNotFoundError, + }) + .annotateMerge(locationQueryOpenApi) + .annotateMerge( + OpenApi.annotations({ + identifier: "v2.pty.get", + summary: "Get PTY session", + description: "Get one PTY session, including its exit code once exited.", + }), + ), + ) + .add( + HttpApiEndpoint.put("pty.update", "/api/pty/:ptyID", { + params: { ptyID: PtyID }, + query: LocationQuery, + payload: Pty.UpdateInput, + success: Location.response(Pty.Info), + error: PtyNotFoundError, + }) + .annotateMerge(locationQueryOpenApi) + .annotateMerge( + OpenApi.annotations({ + identifier: "v2.pty.update", + summary: "Update PTY session", + description: "Update the title or viewport size of one PTY session.", + }), + ), + ) + .add( + HttpApiEndpoint.delete("pty.remove", "/api/pty/:ptyID", { + params: { ptyID: PtyID }, + query: LocationQuery, + success: HttpApiSchema.NoContent, + error: PtyNotFoundError, + }) + .annotateMerge(locationQueryOpenApi) + .annotateMerge( + OpenApi.annotations({ + identifier: "v2.pty.remove", + summary: "Remove PTY session", + description: "Terminate and remove one PTY session.", + }), + ), + ) + .add( + HttpApiEndpoint.post("pty.connectToken", "/api/pty/:ptyID/connect-token", { + params: { ptyID: PtyID }, + query: LocationQuery, + success: Location.response(PtyTicket.ConnectToken), + error: [ForbiddenError, PtyNotFoundError], + }) + .annotateMerge(locationQueryOpenApi) + .annotateMerge( + OpenApi.annotations({ + identifier: "v2.pty.connectToken", + summary: "Create PTY WebSocket token", + description: "Create a short-lived single-use ticket for opening a PTY WebSocket connection.", + }), + ), + ) + .add( + // Query fields are decoded in the raw handler after the existence check so a missing + // session responds with an empty 404 before any upgrade work. + HttpApiEndpoint.get("pty.connect", "/api/pty/:ptyID/connect", { + params: { ptyID: PtyID }, + success: Schema.Boolean, + error: [ForbiddenError, PtyNotFoundError], + }).annotateMerge( + OpenApi.annotations({ + identifier: "v2.pty.connect", + summary: "Connect to PTY session", + description: "Establish a WebSocket connection streaming PTY output and accepting terminal input.", + transform: (operation) => ({ + ...operation, + parameters: [ + ...(operation.parameters ?? []), + ...["location[directory]", "location[workspace]", "cursor", PTY_CONNECT_TICKET_QUERY].map((name) => ({ + in: "query", + name, + schema: { type: "string" }, + })), + ], + }), + }), + ), + ) + .annotateMerge(OpenApi.annotations({ title: "pty", description: "Experimental location-scoped PTY routes." })) + .middleware(LocationMiddleware) diff --git a/packages/server/src/handlers.ts b/packages/server/src/handlers.ts index 31eff1818..d26c1618f 100644 --- a/packages/server/src/handlers.ts +++ b/packages/server/src/handlers.ts @@ -1,6 +1,7 @@ import { SessionV2 } from "@opencode-ai/core/session" import { LocationServiceMap } from "@opencode-ai/core/location-layer" import { PermissionSaved } from "@opencode-ai/core/permission/saved" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" import { Layer } from "effect" import { layer as locationLayer } from "./groups/location" import { sessionLocationLayer } from "./middleware/session-location" @@ -15,6 +16,7 @@ import { SkillHandler } from "./handlers/skill" import { EventHandler } from "./handlers/event" import { AgentHandler } from "./handlers/agent" import { HealthHandler } from "./handlers/health" +import { PtyHandler } from "./handlers/pty" import { QuestionHandler } from "./handlers/question" import { ReferenceHandler } from "./handlers/reference" import * as SessionExecutionLocal from "@opencode-ai/core/session/execution/local" @@ -39,6 +41,7 @@ export const handlers = Layer.mergeAll( CommandHandler, SkillHandler, EventHandler, + PtyHandler, QuestionHandler, ReferenceHandler, ProjectCopyHandler, @@ -48,6 +51,7 @@ export const handlers = Layer.mergeAll( Layer.provide(SessionV2.defaultLayer), Layer.provide(SessionExecutionLocal.defaultLayer), Layer.provide(PermissionSaved.defaultLayer), + Layer.provide(PtyTicket.defaultLayer), Layer.provide(LocationServiceMap.layer), Layer.provide(Credential.defaultLayer), ) diff --git a/packages/server/src/handlers/pty.ts b/packages/server/src/handlers/pty.ts new file mode 100644 index 000000000..0ab4567a2 --- /dev/null +++ b/packages/server/src/handlers/pty.ts @@ -0,0 +1,221 @@ +import { Pty } from "@opencode-ai/core/pty" +import { PtyProtocol } from "@opencode-ai/core/pty/protocol" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" +import { Location } from "@opencode-ai/core/location" +import { Effect, Queue } from "effect" +import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http" +import { HttpApiBuilder, HttpApiSchema } from "effect/unstable/httpapi" +import * as Socket from "effect/unstable/socket/Socket" +import { Api } from "../api" +import { CorsConfig, isAllowedRequestOrigin } from "../cors" +import { ForbiddenError, PtyNotFoundError } from "../errors" +import { + PTY_CONNECT_TICKET_QUERY, + PTY_CONNECT_TOKEN_HEADER, + PTY_CONNECT_TOKEN_HEADER_VALUE, +} from "../groups/pty" +import { response } from "../groups/location" + +const ticketScope = Effect.gen(function* () { + const location = yield* Location.Service + return { directory: location.directory as string, workspaceID: location.workspaceID } +}) + +export const PtyHandler = HttpApiBuilder.group(Api, "server.pty", (handlers) => + Effect.gen(function* () { + const tickets = yield* PtyTicket.Service + const cors = yield* CorsConfig + + return handlers + .handle( + "pty.list", + Effect.fn(function* () { + return yield* response((yield* Pty.Service).list()) + }), + ) + .handle( + "pty.create", + Effect.fn(function* (ctx) { + const pty = yield* Pty.Service + return yield* response( + pty.create({ + ...ctx.payload, + args: ctx.payload.args ? [...ctx.payload.args] : undefined, + env: ctx.payload.env ? { ...ctx.payload.env } : undefined, + }), + ) + }), + ) + .handle( + "pty.get", + Effect.fn(function* (ctx) { + const pty = yield* Pty.Service + return yield* response( + pty + .get(ctx.params.ptyID) + .pipe( + Effect.catchTag( + "Pty.NotFoundError", + () => + new PtyNotFoundError({ + ptyID: ctx.params.ptyID, + message: `PTY session not found: ${ctx.params.ptyID}`, + }), + ), + ), + ) + }), + ) + .handle( + "pty.update", + Effect.fn(function* (ctx) { + const pty = yield* Pty.Service + return yield* response( + pty + .update(ctx.params.ptyID, { + ...ctx.payload, + size: ctx.payload.size ? { ...ctx.payload.size } : undefined, + }) + .pipe( + Effect.catchTag( + "Pty.NotFoundError", + () => + new PtyNotFoundError({ + ptyID: ctx.params.ptyID, + message: `PTY session not found: ${ctx.params.ptyID}`, + }), + ), + ), + ) + }), + ) + .handle( + "pty.remove", + Effect.fn(function* (ctx) { + const pty = yield* Pty.Service + yield* pty + .remove(ctx.params.ptyID) + .pipe( + Effect.catchTag( + "Pty.NotFoundError", + () => + new PtyNotFoundError({ + ptyID: ctx.params.ptyID, + message: `PTY session not found: ${ctx.params.ptyID}`, + }), + ), + ) + return HttpApiSchema.NoContent.make() + }), + ) + .handle( + "pty.connectToken", + Effect.fn(function* (ctx) { + const request = yield* HttpServerRequest.HttpServerRequest + // The custom header forces a CORS preflight, so cross-origin browser pages cannot + // mint tickets without passing the server's origin policy. + if ( + request.headers[PTY_CONNECT_TOKEN_HEADER] !== PTY_CONNECT_TOKEN_HEADER_VALUE || + !isAllowedRequestOrigin(request.headers.origin, request.headers.host, cors) + ) + return yield* new ForbiddenError({ message: "Invalid PTY connect token request" }) + const pty = yield* Pty.Service + yield* pty + .get(ctx.params.ptyID) + .pipe( + Effect.catchTag( + "Pty.NotFoundError", + () => + new PtyNotFoundError({ + ptyID: ctx.params.ptyID, + message: `PTY session not found: ${ctx.params.ptyID}`, + }), + ), + ) + return yield* response(tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* ticketScope) })) + }), + ) + .handleRaw( + "pty.connect", + Effect.fn("PtyHandler.connect")(function* (ctx) { + const pty = yield* Pty.Service + const exists = yield* pty.get(ctx.params.ptyID).pipe( + Effect.as(true), + Effect.catchTag("Pty.NotFoundError", () => Effect.succeed(false)), + ) + if (!exists) return HttpServerResponse.empty({ status: 404 }) + + const url = new URL(ctx.request.url, "http://localhost") + const ticket = url.searchParams.get(PTY_CONNECT_TICKET_QUERY) + if (ticket) { + const valid = isAllowedRequestOrigin(ctx.request.headers.origin, ctx.request.headers.host, cors) + ? yield* tickets.consume({ ticket, ptyID: ctx.params.ptyID, ...(yield* ticketScope) }) + : false + if (!valid) return HttpServerResponse.empty({ status: 403 }) + } + const parsedCursor = url.searchParams.get("cursor") + const cursorNumber = parsedCursor === null ? undefined : Number(parsedCursor) + const cursor = + cursorNumber !== undefined && Number.isSafeInteger(cursorNumber) && cursorNumber >= -1 + ? cursorNumber + : undefined + + const socket = yield* Effect.orDie(ctx.request.upgrade) + const write = yield* socket.writer + const closeAccepted = (event: Socket.CloseEvent) => + socket + .runRaw(() => Effect.void, { onOpen: write(event).pipe(Effect.catch(() => Effect.void)) }) + .pipe( + Effect.timeout("1 second"), + Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void), + Effect.catch(() => Effect.void), + ) + + // Outbound frames flow through one queue drained by a single writer so replay, live + // output, and the close frame keep their order. + // TODO: Integrate graceful-shutdown socket tracking before clients migrate to this route. + const outbox = yield* Queue.unbounded() + const attachment = yield* pty + .attach(ctx.params.ptyID, { + cursor, + onData: (chunk) => Queue.offerUnsafe(outbox, chunk), + onEnd: () => Queue.offerUnsafe(outbox, new Socket.CloseEvent(1000)), + }) + .pipe( + Effect.catchTags({ + "Pty.NotFoundError": () => + closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)), + "Pty.ExitedError": () => + closeAccepted(new Socket.CloseEvent(4404, "session exited")).pipe(Effect.as(undefined)), + }), + ) + if (!attachment) return HttpServerResponse.empty() + + for (const chunk of PtyProtocol.chunks(attachment.replay)) Queue.offerUnsafe(outbox, chunk) + Queue.offerUnsafe(outbox, PtyProtocol.metaFrame(attachment.cursor)) + attachment.activate() + + const drain = Effect.gen(function* () { + while (true) { + const item = yield* Queue.take(outbox) + yield* write(item) + if (item instanceof Socket.CloseEvent) return + } + }) + + yield* Effect.race( + drain, + socket.runRaw((message) => { + const decoded = PtyProtocol.decodeInput(message) + if (decoded !== undefined) attachment.write(decoded) + }), + ).pipe( + Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void), + Effect.ensuring(Effect.sync(() => attachment.detach())), + Effect.orDie, + ) + return HttpServerResponse.empty() + }), + ) + }), +) diff --git a/packages/server/src/middleware/authorization.ts b/packages/server/src/middleware/authorization.ts index edbc85bb4..5a8dae205 100644 --- a/packages/server/src/middleware/authorization.ts +++ b/packages/server/src/middleware/authorization.ts @@ -1,5 +1,6 @@ import { ServerAuth } from "../auth" import { UnauthorizedError } from "../errors" +import { hasPtyConnectTicketURL } from "../groups/pty" import { Effect, Encoding, Layer, Redacted } from "effect" import { HttpEffect, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" import { HttpApiMiddleware } from "effect/unstable/httpapi" @@ -45,6 +46,9 @@ export const authorizationLayer = Layer.effect( return Authorization.of((effect) => Effect.gen(function* () { const request = yield* HttpServerRequest.HttpServerRequest + // Browsers cannot set headers on WebSocket upgrades, so a ticketed PTY connect skips + // credential checks here; the connect handler consumes and validates the ticket. + if (hasPtyConnectTicketURL(new URL(request.url, "http://localhost"))) return yield* effect const credential = yield* credentialFromRequest(request) if (ServerAuth.authorized(credential, config)) return yield* effect yield* HttpEffect.appendPreResponseHandler((_request, response) =>