From 932fb6c9ecf780efbdb457a340cb3fa0ccfe1f99 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Wed, 3 Jun 2026 15:17:46 +0530 Subject: [PATCH] refactor(core): consolidate pty service (#30537) --- bun.lock | 4 +- package.json | 2 +- packages/core/package.json | 8 + .../{opencode => core}/script/fix-node-pty.ts | 0 packages/core/src/location-layer.ts | 2 + .../src/pty/index.ts => core/src/pty.ts} | 267 +++++++----------- packages/{opencode => core}/src/pty/input.ts | 0 .../{opencode => core}/src/pty/pty.bun.ts | 0 .../{opencode => core}/src/pty/pty.node.ts | 0 packages/{opencode => core}/src/pty/pty.ts | 0 packages/{opencode => core}/src/pty/schema.ts | 5 +- packages/{opencode => core}/src/pty/ticket.ts | 16 +- .../test/pty/info-schema.test.ts | 8 +- .../test/pty/input.test.ts} | 4 +- .../test/pty/pty-output-isolation.test.ts | 110 ++++++++ packages/core/test/pty/pty-session.test.ts | 91 ++++++ .../test/pty/ticket.test.ts | 4 +- packages/opencode/package.json | 8 - packages/opencode/src/effect/app-runtime.ts | 4 - packages/opencode/src/pty-preparation.ts | 30 ++ .../routes/instance/httpapi/groups/pty.ts | 6 +- .../routes/instance/httpapi/handlers/pty.ts | 125 +++++--- .../server/routes/instance/httpapi/server.ts | 4 +- .../test/pty/pty-output-isolation.test.ts | 162 ----------- .../opencode/test/pty/pty-session.test.ts | 140 --------- packages/opencode/test/pty/pty-shell.test.ts | 57 +++- .../httpapi-instance-route-auth.test.ts | 2 +- .../opencode/test/server/httpapi-pty.test.ts | 21 +- 28 files changed, 504 insertions(+), 576 deletions(-) rename packages/{opencode => core}/script/fix-node-pty.ts (100%) rename packages/{opencode/src/pty/index.ts => core/src/pty.ts} (53%) rename packages/{opencode => core}/src/pty/input.ts (100%) rename packages/{opencode => core}/src/pty/pty.bun.ts (100%) rename packages/{opencode => core}/src/pty/pty.node.ts (100%) rename packages/{opencode => core}/src/pty/pty.ts (100%) rename packages/{opencode => core}/src/pty/schema.ts (79%) rename packages/{opencode => core}/src/pty/ticket.ts (83%) rename packages/{opencode => core}/test/pty/info-schema.test.ts (57%) rename packages/{opencode/test/server/httpapi-pty-websocket.test.ts => core/test/pty/input.test.ts} (86%) create mode 100644 packages/core/test/pty/pty-output-isolation.test.ts create mode 100644 packages/core/test/pty/pty-session.test.ts rename packages/{opencode => core}/test/pty/ticket.test.ts (95%) create mode 100644 packages/opencode/src/pty-preparation.ts delete mode 100644 packages/opencode/test/pty/pty-output-isolation.test.ts delete mode 100644 packages/opencode/test/pty/pty-session.test.ts diff --git a/bun.lock b/bun.lock index 5e7bc100b..95190cbd8 100644 --- a/bun.lock +++ b/bun.lock @@ -260,6 +260,7 @@ "@effect/opentelemetry": "catalog:", "@effect/platform-node": "catalog:", "@effect/sql-sqlite-bun": "catalog:", + "@lydell/node-pty": "catalog:", "@npmcli/arborist": "9.4.0", "@npmcli/config": "10.8.1", "@opencode-ai/effect-drizzle-sqlite": "workspace:*", @@ -271,6 +272,7 @@ "@opentelemetry/sdk-trace-base": "2.6.1", "@parcel/watcher": "2.5.1", "ai-gateway-provider": "3.1.2", + "bun-pty": "0.4.8", "cross-spawn": "catalog:", "drizzle-orm": "catalog:", "effect": "catalog:", @@ -501,7 +503,6 @@ "@effect/opentelemetry": "catalog:", "@effect/platform-node": "catalog:", "@gitlab/opencode-gitlab-auth": "1.3.3", - "@lydell/node-pty": "catalog:", "@modelcontextprotocol/sdk": "1.27.1", "@octokit/graphql": "9.0.2", "@octokit/rest": "catalog:", @@ -531,7 +532,6 @@ "ai": "catalog:", "ai-gateway-provider": "3.1.2", "bonjour-service": "1.3.0", - "bun-pty": "0.4.8", "chokidar": "4.0.3", "clipboardy": "4.0.0", "cross-spawn": "catalog:", diff --git a/package.json b/package.json index 1e3062699..519112f49 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "lint": "oxlint", "typecheck": "bun turbo typecheck", "upgrade-opentui": "bun run script/upgrade-opentui.ts", - "postinstall": "bun run --cwd packages/opencode fix-node-pty", + "postinstall": "bun run --cwd packages/core fix-node-pty", "prepare": "husky", "random": "echo 'Random script'", "sso": "aws sso login --sso-session=opencode --no-browser", diff --git a/packages/core/package.json b/packages/core/package.json index a191be643..eb4148e55 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -8,6 +8,7 @@ "scripts": { "db": "bun drizzle-kit", "migration": "bun run script/migration.ts", + "fix-node-pty": "bun run script/fix-node-pty.ts", "test": "bun test", "test:ci": "mkdir -p .artifacts/unit && bun test --timeout 30000 --reporter=junit --reporter-outfile=.artifacts/unit/junit.xml", "typecheck": "tsgo --noEmit" @@ -23,6 +24,11 @@ "bun": "./src/database/sqlite.bun.ts", "node": "./src/database/sqlite.node.ts", "default": "./src/database/sqlite.bun.ts" + }, + "#pty": { + "bun": "./src/pty/pty.bun.ts", + "node": "./src/pty/pty.node.ts", + "default": "./src/pty/pty.bun.ts" } }, "devDependencies": { @@ -69,6 +75,7 @@ "@effect/opentelemetry": "catalog:", "@effect/platform-node": "catalog:", "@effect/sql-sqlite-bun": "catalog:", + "@lydell/node-pty": "catalog:", "@npmcli/arborist": "9.4.0", "@npmcli/config": "10.8.1", "@opencode-ai/effect-drizzle-sqlite": "workspace:*", @@ -80,6 +87,7 @@ "@parcel/watcher": "2.5.1", "@openrouter/ai-sdk-provider": "2.8.1", "ai-gateway-provider": "3.1.2", + "bun-pty": "0.4.8", "cross-spawn": "catalog:", "drizzle-orm": "catalog:", "effect": "catalog:", diff --git a/packages/opencode/script/fix-node-pty.ts b/packages/core/script/fix-node-pty.ts similarity index 100% rename from packages/opencode/script/fix-node-pty.ts rename to packages/core/script/fix-node-pty.ts diff --git a/packages/core/src/location-layer.ts b/packages/core/src/location-layer.ts index c0b072354..161961a1e 100644 --- a/packages/core/src/location-layer.ts +++ b/packages/core/src/location-layer.ts @@ -21,6 +21,7 @@ import { FileSystem } from "./filesystem" import { Watcher } from "./filesystem/watcher" import { ProjectReference } from "./project-reference" import { RepositoryCache } from "./repository-cache" +import { Pty } from "./pty" export class LocationServiceMap extends LayerMap.Service()("@opencode/example/LocationServiceMap", { lookup: (ref: Location.Ref) => { @@ -37,6 +38,7 @@ export class LocationServiceMap extends LayerMap.Service()(" PermissionV2.locationLayer, FileSystem.locationLayer, Watcher.locationLayer, + Pty.locationLayer, ).pipe(Layer.provideMerge(location), Layer.fresh) }, idleTimeToLive: "60 minutes", diff --git a/packages/opencode/src/pty/index.ts b/packages/core/src/pty.ts similarity index 53% rename from packages/opencode/src/pty/index.ts rename to packages/core/src/pty.ts index 9816faffe..db377df22 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/core/src/pty.ts @@ -1,22 +1,19 @@ -import { EventV2Bridge } from "@/event-v2-bridge" -import { EventV2 } from "@opencode-ai/core/event" -import { Config } from "@/config/config" -import { InstanceState } from "@/effect/instance-state" -import { EffectBridge } from "@/effect/bridge" -import { lazy } from "@opencode-ai/core/util/lazy" -import { Plugin } from "@/plugin" -import { Shell } from "@/shell/shell" -import type { Proc } from "#pty" -import * as Log from "@opencode-ai/core/util/log" -import { PtyID } from "./schema" -import { Effect, Layer, Context, Schema, Types } from "effect" -import { NonNegativeInt, PositiveInt } from "@opencode-ai/core/schema" +export * as Pty from "./pty" + +import type { Disp, Proc } from "#pty" +import { Context, Effect, Layer, Schema, Types } from "effect" +import { EventV2 } from "./event" +import { Location } from "./location" +import { NonNegativeInt, PositiveInt } from "./schema" +import { PtyID } from "./pty/schema" +import { lazy } from "./util/lazy" +import * as Log from "./util/log" const log = Log.create({ service: "pty" }) - const BUFFER_LIMIT = 1024 * 1024 * 2 const BUFFER_CHUNK = 64 * 1024 const encoder = new TextEncoder() +const pty = lazy(() => import("#pty")) type Socket = { readyState: number @@ -25,8 +22,6 @@ type Socket = { close: (code?: number, reason?: string) => void } -const sock = (ws: Socket) => (ws.data && typeof ws.data === "object" ? ws.data : ws) - type Active = { info: Info process: Proc @@ -34,12 +29,10 @@ type Active = { bufferCursor: number cursor: number subscribers: Map + listeners: Disp[] } -type State = { - dir: string - sessions: Map -} +const sock = (ws: Socket) => (ws.data && typeof ws.data === "object" ? ws.data : ws) // WebSocket control frame: 0x00 + UTF-8 JSON. const meta = (cursor: number) => { @@ -51,8 +44,6 @@ const meta = (cursor: number) => { return out } -const pty = lazy(() => import("#pty")) - export const Info = Schema.Struct({ id: PtyID, title: Schema.String, @@ -60,14 +51,11 @@ export const Info = Schema.Struct({ args: Schema.Array(Schema.String), cwd: Schema.String, status: Schema.Literals(["running", "exited"]), - // Windows ConPTY (@lydell/node-pty >= 1.2.0-beta.12) assigns the child pid - // asynchronously, so `proc.pid` is 0 at the synchronous spawn point and only - // resolves a tick later. `create` snapshots it immediately, so 0 is a valid - // "pid not yet assigned" value here. + // Windows ConPTY assigns the child pid asynchronously, so 0 is valid at spawn time. pid: NonNegativeInt, }).annotate({ identifier: "Pty" }) -export type Info = Types.DeepMutable> +export type Info = Types.DeepMutable export const CreateInput = Schema.Struct({ command: Schema.optional(Schema.String), @@ -77,7 +65,15 @@ export const CreateInput = Schema.Struct({ env: Schema.optional(Schema.Record(Schema.String, Schema.String)), }) -export type CreateInput = Types.DeepMutable> +export type CreateInput = Types.DeepMutable + +export type PreparedCreate = { + readonly command: string + readonly args: string[] + readonly cwd: string + readonly title?: string + readonly env: Record +} export const UpdateInput = Schema.Struct({ title: Schema.optional(Schema.String), @@ -89,7 +85,7 @@ export const UpdateInput = Schema.Struct({ ), }) -export type UpdateInput = Types.DeepMutable> +export type UpdateInput = Types.DeepMutable export class NotFoundError extends Schema.TaggedErrorClass()("Pty.NotFoundError", { ptyID: PtyID, @@ -105,7 +101,7 @@ export const Event = { export interface Interface { readonly list: () => Effect.Effect readonly get: (id: PtyID) => Effect.Effect - readonly create: (input: CreateInput) => Effect.Effect + readonly create: (input: PreparedCreate) => Effect.Effect readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect readonly remove: (id: PtyID) => Effect.Effect readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect @@ -120,16 +116,20 @@ export interface Interface { > } -export class Service extends Context.Service()("@opencode/Pty") {} +export class Service extends Context.Service()("@opencode/v2/Pty") {} export const layer = Layer.effect( Service, Effect.gen(function* () { - const config = yield* Config.Service - const events = yield* EventV2Bridge.Service - const plugin = yield* Plugin.Service + const events = yield* EventV2.Service + const location = yield* Location.Service + const context = yield* Effect.context() + const runFork = Effect.runForkWith(context) + const sessions = new Map() function teardown(session: Active) { + for (const listener of session.listeners) listener.dispose() + session.listeners.length = 0 try { session.process.kill() } catch {} @@ -141,93 +141,59 @@ export const layer = Layer.effect( session.subscribers.clear() } - const state = yield* InstanceState.make( - Effect.fn("Pty.state")(function* (ctx) { - const state = { - dir: ctx.directory, - sessions: new Map(), - } - - yield* Effect.addFinalizer(() => - Effect.sync(() => { - for (const session of state.sessions.values()) { - teardown(session) - } - state.sessions.clear() - }), - ) - - return state + yield* Effect.addFinalizer(() => + Effect.sync(() => { + for (const session of sessions.values()) teardown(session) + sessions.clear() }), ) const requireSession = Effect.fn("Pty.requireSession")(function* (id: PtyID) { - const session = (yield* InstanceState.get(state)).sessions.get(id) + const session = sessions.get(id) if (!session) return yield* new NotFoundError({ ptyID: id }) return session }) - const remove = Effect.fn("Pty.remove")(function* (id: PtyID) { - const s = yield* InstanceState.get(state) - const session = yield* requireSession(id) - s.sessions.delete(id) + const removeSession = Effect.fnUntraced(function* (id: PtyID) { + const session = sessions.get(id) + if (!session) return false + sessions.delete(id) log.info("removing session", { id }) teardown(session) yield* events.publish(Event.Deleted, { id: session.info.id }) + return true + }) + + const remove = Effect.fn("Pty.remove")(function* (id: PtyID) { + yield* requireSession(id) + yield* removeSession(id) }) const list = Effect.fn("Pty.list")(function* () { - const s = yield* InstanceState.get(state) - return Array.from(s.sessions.values()).map((session) => session.info) + return Array.from(sessions.values()).map((session) => session.info) }) const get = Effect.fn("Pty.get")(function* (id: PtyID) { return (yield* requireSession(id)).info }) - const create = Effect.fn("Pty.create")(function* (input: CreateInput) { - const s = yield* InstanceState.get(state) - const bridge = yield* EffectBridge.make() - const cfg = yield* config.get() + const create = Effect.fn("Pty.create")(function* (input: PreparedCreate) { const id = PtyID.ascending() - const command = input.command || Shell.preferred(cfg.shell) - const args = input.args || [] - if (Shell.login(command)) { - args.push("-l") - } - - const cwd = input.cwd || s.dir - const shell = yield* plugin.trigger("shell.env", { cwd }, { env: {} }) - const env = { - ...process.env, - ...input.env, - ...shell.env, - TERM: "xterm-256color", - OPENCODE_TERMINAL: "1", - } as Record - - if (process.platform === "win32") { - env.LC_ALL = "C.UTF-8" - env.LC_CTYPE = "C.UTF-8" - env.LANG = "C.UTF-8" - } - log.info("creating session", { id, cmd: command, args, cwd }) - + log.info("creating session", { id, cmd: input.command, args: input.args, cwd: input.cwd }) const { spawn } = yield* Effect.promise(() => pty()) const proc = yield* Effect.sync(() => - spawn(command, args, { + spawn(input.command, input.args, { name: "xterm-256color", - cwd, - env, + cwd: input.cwd, + env: input.env, }), ) - const info = { id, title: input.title || `Terminal ${id.slice(-4)}`, - command, - args, - cwd, + command: input.command, + args: input.args, + cwd: input.cwd, status: "running", pid: proc.pid, } as const @@ -238,113 +204,89 @@ export const layer = Layer.effect( bufferCursor: 0, cursor: 0, subscribers: new Map(), + listeners: [], } - s.sessions.set(id, session) - proc.onData((chunk) => { - session.cursor += chunk.length - - for (const [key, ws] of session.subscribers.entries()) { - if (ws.readyState !== 1) { - session.subscribers.delete(key) - continue + sessions.set(id, session) + session.listeners.push( + proc.onData((chunk) => { + session.cursor += chunk.length + for (const [key, ws] of session.subscribers.entries()) { + if (ws.readyState !== 1 || sock(ws) !== key) { + session.subscribers.delete(key) + continue + } + try { + ws.send(chunk) + } catch { + session.subscribers.delete(key) + } } - if (sock(ws) !== key) { - session.subscribers.delete(key) - continue - } - try { - ws.send(chunk) - } catch { - session.subscribers.delete(key) - } - } - - session.buffer += chunk - if (session.buffer.length <= BUFFER_LIMIT) return - const excess = session.buffer.length - BUFFER_LIMIT - session.buffer = session.buffer.slice(excess) - session.bufferCursor += excess - }) - proc.onExit(({ exitCode }) => { - if (session.info.status === "exited") return - log.info("session exited", { id, exitCode }) - session.info.status = "exited" - bridge.fork(events.publish(Event.Exited, { id, exitCode })) - bridge.fork(remove(id)) - }) + session.buffer += chunk + if (session.buffer.length <= BUFFER_LIMIT) return + const excess = session.buffer.length - BUFFER_LIMIT + session.buffer = session.buffer.slice(excess) + session.bufferCursor += excess + }), + proc.onExit(({ exitCode }) => { + if (session.info.status === "exited") return + runFork( + Effect.gen(function* () { + log.info("session exited", { id, exitCode }) + session.info.status = "exited" + yield* events.publish(Event.Exited, { id, exitCode }) + yield* removeSession(id) + }), + ) + }), + ) yield* events.publish(Event.Created, { info }) return info }) const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) { const session = yield* requireSession(id) - if (input.title) { - session.info.title = input.title - } - if (input.size) { - session.process.resize(input.size.cols, input.size.rows) - } + if (input.title) session.info.title = input.title + if (input.size) session.process.resize(input.size.cols, input.size.rows) yield* events.publish(Event.Updated, { info: session.info }) return session.info }) const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) { const session = yield* requireSession(id) - if (session.info.status === "running") { - session.process.resize(cols, rows) - } + if (session.info.status === "running") session.process.resize(cols, rows) }) const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) { const session = yield* requireSession(id) - if (session.info.status === "running") { - session.process.write(data) - } + if (session.info.status === "running") session.process.write(data) }) const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) { - const session = yield* requireSession(id).pipe( - Effect.tapError(() => - Effect.sync(() => { - ws.close() - }), - ), - ) - log.info("client connected to session", { id }) - + const session = yield* requireSession(id).pipe(Effect.tapError(() => Effect.sync(() => ws.close()))) + log.info("client connected to session", { id, directory: location.directory }) const sub = sock(ws) session.subscribers.delete(sub) session.subscribers.set(sub, ws) - - const cleanup = () => { - session.subscribers.delete(sub) - } - + const cleanup = () => session.subscribers.delete(sub) const start = session.bufferCursor const end = session.cursor const from = cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 - const data = (() => { - if (!session.buffer) return "" - if (from >= end) return "" + if (!session.buffer || from >= end) return "" const offset = Math.max(0, from - start) if (offset >= session.buffer.length) return "" return session.buffer.slice(offset) })() - if (data) { try { - for (let i = 0; i < data.length; i += BUFFER_CHUNK) { - ws.send(data.slice(i, i + BUFFER_CHUNK)) - } + for (let i = 0; i < data.length; i += BUFFER_CHUNK) ws.send(data.slice(i, i + BUFFER_CHUNK)) } catch { cleanup() ws.close() return } } - try { ws.send(meta(end)) } catch { @@ -352,7 +294,6 @@ export const layer = Layer.effect( ws.close() return } - return { onMessage: (message: string | ArrayBuffer) => { session.process.write(typeof message === "string" ? message : new TextDecoder().decode(message)) @@ -368,10 +309,4 @@ export const layer = Layer.effect( }), ) -export const defaultLayer = layer.pipe( - Layer.provide(EventV2Bridge.defaultLayer), - Layer.provide(Plugin.defaultLayer), - Layer.provide(Config.defaultLayer), -) - -export * as Pty from "." +export const locationLayer = layer diff --git a/packages/opencode/src/pty/input.ts b/packages/core/src/pty/input.ts similarity index 100% rename from packages/opencode/src/pty/input.ts rename to packages/core/src/pty/input.ts diff --git a/packages/opencode/src/pty/pty.bun.ts b/packages/core/src/pty/pty.bun.ts similarity index 100% rename from packages/opencode/src/pty/pty.bun.ts rename to packages/core/src/pty/pty.bun.ts diff --git a/packages/opencode/src/pty/pty.node.ts b/packages/core/src/pty/pty.node.ts similarity index 100% rename from packages/opencode/src/pty/pty.node.ts rename to packages/core/src/pty/pty.node.ts diff --git a/packages/opencode/src/pty/pty.ts b/packages/core/src/pty/pty.ts similarity index 100% rename from packages/opencode/src/pty/pty.ts rename to packages/core/src/pty/pty.ts diff --git a/packages/opencode/src/pty/schema.ts b/packages/core/src/pty/schema.ts similarity index 79% rename from packages/opencode/src/pty/schema.ts rename to packages/core/src/pty/schema.ts index c86ae8c73..b8c973862 100644 --- a/packages/opencode/src/pty/schema.ts +++ b/packages/core/src/pty/schema.ts @@ -1,7 +1,6 @@ import { Schema } from "effect" - -import { Identifier } from "@/id/id" -import { withStatics } from "@opencode-ai/core/schema" +import { Identifier } from "../id/id" +import { withStatics } from "../schema" const ptyIdSchema = Schema.String.check(Schema.isStartsWith("pty")).pipe(Schema.brand("PtyID")) diff --git a/packages/opencode/src/pty/ticket.ts b/packages/core/src/pty/ticket.ts similarity index 83% rename from packages/opencode/src/pty/ticket.ts rename to packages/core/src/pty/ticket.ts index cf6751fb1..1d2452cda 100644 --- a/packages/opencode/src/pty/ticket.ts +++ b/packages/core/src/pty/ticket.ts @@ -1,9 +1,8 @@ export * as PtyTicket from "./ticket" -import { WorkspaceV2 } from "@opencode-ai/core/workspace" -import { InstanceRef, WorkspaceRef } from "@/effect/instance-ref" -import { PtyID } from "@/pty/schema" -import { PositiveInt } from "@opencode-ai/core/schema" +import { WorkspaceV2 } from "../workspace" +import { PositiveInt } from "../schema" +import { PtyID } from "./schema" import { Cache, Context, Duration, Effect, Layer, Schema } from "effect" const DEFAULT_TTL = Duration.seconds(60) @@ -57,12 +56,3 @@ export const make = (ttl: Duration.Input = DEFAULT_TTL) => export const layer = Layer.effect(Service, make()) export const defaultLayer = layer - -export const scope = Effect.gen(function* () { - const instance = yield* InstanceRef - const workspaceID = yield* WorkspaceRef - return { - directory: instance?.directory, - workspaceID, - } -}) diff --git a/packages/opencode/test/pty/info-schema.test.ts b/packages/core/test/pty/info-schema.test.ts similarity index 57% rename from packages/opencode/test/pty/info-schema.test.ts rename to packages/core/test/pty/info-schema.test.ts index 429f29b00..9f58c45c8 100644 --- a/packages/opencode/test/pty/info-schema.test.ts +++ b/packages/core/test/pty/info-schema.test.ts @@ -1,13 +1,7 @@ import { describe, expect, test } from "bun:test" import { Schema } from "effect" -import { Pty } from "../../src/pty" +import { Pty } from "@opencode-ai/core/pty" -// Windows ConPTY (via @lydell/node-pty >= 1.2.0-beta.12) assigns the child pid -// asynchronously: `proc.pid` reads back as 0 at the synchronous spawn point and -// only resolves to the real pid a tick later. `Pty.create` snapshots `proc.pid` -// while building `Info`, so `Info.pid` legitimately carries 0 right after spawn. -// `Pty.Info` must be able to represent that, otherwise every `pty.create` on -// Windows fails to encode/decode and the terminal feature is unusable. const sample = (pid: number) => ({ id: "pty_01J5Y5H0AH4Q4NXJ6P4C3P5V2K", title: "demo", diff --git a/packages/opencode/test/server/httpapi-pty-websocket.test.ts b/packages/core/test/pty/input.test.ts similarity index 86% rename from packages/opencode/test/server/httpapi-pty-websocket.test.ts rename to packages/core/test/pty/input.test.ts index 19d97ef09..2cfe9756b 100644 --- a/packages/opencode/test/server/httpapi-pty-websocket.test.ts +++ b/packages/core/test/pty/input.test.ts @@ -1,9 +1,9 @@ import { describe, expect } from "bun:test" import { Effect } from "effect" -import { handlePtyInput } from "../../src/pty/input" +import { handlePtyInput } from "@opencode-ai/core/pty/input" import { it } from "../lib/effect" -describe("pty HttpApi websocket input", () => { +describe("pty websocket input", () => { it.effect("does not forward invalid binary frames to the PTY handler", () => Effect.gen(function* () { const messages: Array = [] diff --git a/packages/core/test/pty/pty-output-isolation.test.ts b/packages/core/test/pty/pty-output-isolation.test.ts new file mode 100644 index 000000000..b7a1fec12 --- /dev/null +++ b/packages/core/test/pty/pty-output-isolation.test.ts @@ -0,0 +1,110 @@ +import { describe, expect } from "bun:test" +import { Duration, Effect, Layer, Queue } from "effect" +import { EventV2 } from "@opencode-ai/core/event" +import { Location } from "@opencode-ai/core/location" +import { Pty } from "@opencode-ai/core/pty" +import { AbsolutePath } from "@opencode-ai/core/schema" +import { location } from "../fixture/location" +import { testEffect } from "../lib/effect" + +type Socket = Parameters[1] + +const locationLayer = Layer.succeed( + Location.Service, + Location.Service.of(location({ directory: AbsolutePath.make("/tmp") })), +) +const it = testEffect(Pty.layer.pipe(Layer.provideMerge(EventV2.defaultLayer), Layer.provideMerge(locationLayer))) +const ptyTest = process.platform === "win32" ? it.live.skip : it.live + +const createPty = Effect.fn("PtyOutputIsolationTest.createPty")(function* (command: string) { + const pty = yield* Pty.Service + return yield* Effect.acquireRelease( + pty.create({ command, args: [], cwd: "/tmp", env: { TERM: "xterm-256color", OPENCODE_TERMINAL: "1" } }), + (info) => pty.remove(info.id).pipe(Effect.ignore), + ) +}) + +const decodeOutput = (data: string | Uint8Array | ArrayBuffer) => + typeof data === "string" + ? data + : Buffer.from(data instanceof Uint8Array ? data : new Uint8Array(data)).toString("utf8") + +const makeSocket = Effect.fn("PtyOutputIsolationTest.makeSocket")(function* (data: unknown) { + const output = yield* Queue.unbounded() + const socket: Socket = { + readyState: 1, + data, + send: (data) => Queue.offerUnsafe(output, decodeOutput(data)), + close: () => {}, + } + return { socket, output } +}) + +const waitForOutput = (output: Queue.Queue, text: string, duration: Duration.Input = "5 seconds") => + Effect.gen(function* () { + let received = "" + while (!received.includes(text)) received += yield* Queue.take(output) + return received + }).pipe( + Effect.timeoutOrElse({ + duration, + orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)), + }), + ) + +describe("pty output isolation", () => { + ptyTest("does not leak output when websocket objects are reused", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* createPty("cat") + const b = yield* createPty("cat") + const shared = yield* makeSocket({ events: { connection: "a" } }) + const outB = yield* Queue.unbounded() + + yield* pty.connect(a.id, shared.socket) + shared.socket.data = { events: { connection: "b" } } + shared.socket.send = (data) => Queue.offerUnsafe(outB, decodeOutput(data)) + yield* pty.connect(b.id, shared.socket) + yield* pty.write(a.id, "AAA\n") + + const verify = yield* makeSocket({ events: { connection: "verify-a" } }) + yield* pty.connect(a.id, verify.socket) + expect(yield* waitForOutput(verify.output, "AAA")).toContain("AAA") + expect(yield* waitForOutput(outB, "AAA", "100 millis").pipe(Effect.option)).toMatchObject({ _tag: "None" }) + }), + ) + + ptyTest("does not leak output when Bun recycles websocket objects before re-connect", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const info = yield* createPty("cat") + const first = yield* makeSocket({ events: { connection: "a" } }) + const recycled = yield* Queue.unbounded() + + yield* pty.connect(info.id, first.socket) + first.socket.data = { events: { connection: "b" } } + first.socket.send = (data) => Queue.offerUnsafe(recycled, decodeOutput(data)) + yield* pty.write(info.id, "AAA\n") + + const verify = yield* makeSocket({ events: { connection: "verify" } }) + yield* pty.connect(info.id, verify.socket) + expect(yield* waitForOutput(verify.output, "AAA")).toContain("AAA") + expect(yield* waitForOutput(recycled, "AAA", "100 millis").pipe(Effect.option)).toMatchObject({ _tag: "None" }) + }), + ) + + ptyTest("treats in-place socket data mutation as the same connection", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const info = yield* createPty("cat") + const data = { connId: 1 } + const socket = yield* makeSocket(data) + + yield* pty.connect(info.id, socket.socket) + data.connId = 2 + yield* pty.write(info.id, "AAA\n") + + expect(yield* waitForOutput(socket.output, "AAA")).toContain("AAA") + }), + ) +}) diff --git a/packages/core/test/pty/pty-session.test.ts b/packages/core/test/pty/pty-session.test.ts new file mode 100644 index 000000000..5903db3f3 --- /dev/null +++ b/packages/core/test/pty/pty-session.test.ts @@ -0,0 +1,91 @@ +import { describe, expect } from "bun:test" +import { Cause, Effect, Exit, Layer, Queue } from "effect" +import { EventV2 } from "@opencode-ai/core/event" +import { Location } from "@opencode-ai/core/location" +import { Pty } from "@opencode-ai/core/pty" +import type { PtyID } from "@opencode-ai/core/pty/schema" +import { AbsolutePath } from "@opencode-ai/core/schema" +import { location } from "../fixture/location" +import { testEffect } from "../lib/effect" + +type PtyEvent = { type: "created" | "exited" | "deleted"; id: PtyID } + +const locationLayer = Layer.succeed( + Location.Service, + Location.Service.of(location({ directory: AbsolutePath.make("/tmp") })), +) +const it = testEffect(Pty.layer.pipe(Layer.provideMerge(EventV2.defaultLayer), Layer.provideMerge(locationLayer))) +const ptyTest = process.platform === "win32" ? it.live.skip : it.live + +const subscribePtyEvents = Effect.fn("PtySessionTest.subscribePtyEvents")(function* () { + const source = yield* EventV2.Service + const events = yield* Queue.unbounded() + const unsubscribe = yield* source.listen((event) => { + if (event.type === Pty.Event.Created.type) + Queue.offerUnsafe(events, { type: "created", id: (event.data as typeof Pty.Event.Created.data.Type).info.id }) + if (event.type === Pty.Event.Exited.type) + Queue.offerUnsafe(events, { type: "exited", id: (event.data as typeof Pty.Event.Exited.data.Type).id }) + if (event.type === Pty.Event.Deleted.type) + Queue.offerUnsafe(events, { type: "deleted", id: (event.data as typeof Pty.Event.Deleted.data.Type).id }) + return Effect.void + }) + yield* Effect.addFinalizer(() => unsubscribe) + return events +}) + +const createPty = Effect.fn("PtySessionTest.createPty")(function* (command: string, args: string[] = []) { + const pty = yield* Pty.Service + return yield* Effect.acquireRelease( + pty.create({ command, args, cwd: "/tmp", env: { TERM: "xterm-256color", OPENCODE_TERMINAL: "1" } }), + (info) => pty.remove(info.id).pipe(Effect.ignore), + ) +}) + +const waitForEvents = (events: Queue.Queue, id: PtyID, count: number) => + Effect.gen(function* () { + const picked: Array = [] + while (picked.length < count) { + const evt = yield* Queue.take(events) + if (evt.id === id) picked.push(evt.type) + } + return picked + }).pipe( + Effect.timeoutOrElse({ + duration: "5 seconds", + orElse: () => Effect.fail(new Error("timeout waiting for pty events")), + }), + ) + +describe("pty", () => { + it.live("returns typed not found errors for missing sessions", () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const id = "pty_missing" as PtyID + let closed = false + const socket = { readyState: 1, send: () => {}, close: () => void (closed = true) } + + for (const result of [ + yield* pty.get(id).pipe(Effect.asVoid, Effect.exit), + yield* pty.update(id, { title: "missing" }).pipe(Effect.asVoid, Effect.exit), + yield* pty.remove(id).pipe(Effect.exit), + yield* pty.resize(id, 80, 24).pipe(Effect.exit), + yield* pty.write(id, "input").pipe(Effect.exit), + yield* pty.connect(id, socket).pipe(Effect.asVoid, Effect.exit), + ]) { + expect(Exit.isFailure(result)).toBe(true) + if (Exit.isFailure(result)) + expect(Cause.squash(result.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) + } + expect(closed).toBe(true) + }), + ) + + ptyTest("publishes created, exited, deleted in order for a short-lived process", () => + Effect.gen(function* () { + const events = yield* subscribePtyEvents() + const info = yield* createPty("/usr/bin/env", ["sh", "-c", "sleep 0.1"]) + + expect(yield* waitForEvents(events, info.id, 3)).toEqual(["created", "exited", "deleted"]) + }), + ) +}) diff --git a/packages/opencode/test/pty/ticket.test.ts b/packages/core/test/pty/ticket.test.ts similarity index 95% rename from packages/opencode/test/pty/ticket.test.ts rename to packages/core/test/pty/ticket.test.ts index fa7f9277d..e36808fa2 100644 --- a/packages/opencode/test/pty/ticket.test.ts +++ b/packages/core/test/pty/ticket.test.ts @@ -1,8 +1,8 @@ import { describe, expect } from "bun:test" import { Effect, Layer } from "effect" +import { PtyID } from "@opencode-ai/core/pty/schema" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" import { WorkspaceV2 } from "@opencode-ai/core/workspace" -import { PtyID } from "../../src/pty/schema" -import { PtyTicket } from "../../src/pty/ticket" import { testEffect } from "../lib/effect" const it = testEffect(PtyTicket.layer) diff --git a/packages/opencode/package.json b/packages/opencode/package.json index aaae01cdf..9538218a6 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -13,7 +13,6 @@ "bench:test": "bun run script/bench-test-suite.ts", "profile:test": "bun run script/profile-test-files.ts", "build": "bun run script/build.ts", - "fix-node-pty": "bun run script/fix-node-pty.ts", "dev": "bun run --conditions=browser ./src/index.ts", "dev:temporary": "bun run --conditions=browser ./src/temporary.ts" }, @@ -28,11 +27,6 @@ "bun": "./src/storage/db.bun.ts", "node": "./src/storage/db.node.ts", "default": "./src/storage/db.bun.ts" - }, - "#pty": { - "bun": "./src/pty/pty.bun.ts", - "node": "./src/pty/pty.node.ts", - "default": "./src/pty/pty.bun.ts" } }, "devDependencies": { @@ -86,7 +80,6 @@ "@effect/opentelemetry": "catalog:", "@effect/platform-node": "catalog:", "@gitlab/opencode-gitlab-auth": "1.3.3", - "@lydell/node-pty": "catalog:", "@modelcontextprotocol/sdk": "1.27.1", "@octokit/graphql": "9.0.2", "@octokit/rest": "catalog:", @@ -116,7 +109,6 @@ "ai": "catalog:", "ai-gateway-provider": "3.1.2", "bonjour-service": "1.3.0", - "bun-pty": "0.4.8", "chokidar": "4.0.3", "clipboardy": "4.0.0", "cross-spawn": "catalog:", diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index 27dd18c7a..3b34bcc4d 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -44,8 +44,6 @@ import { Vcs } from "@/project/vcs" import { Reference } from "@/reference/reference" import { Workspace } from "@/control-plane/workspace" import { Worktree } from "@/worktree" -import { Pty } from "@/pty" -import { PtyTicket } from "@/pty/ticket" import { Installation } from "@/installation" import { ShareNext } from "@/share/share-next" import { SessionShare } from "@/share/session" @@ -101,8 +99,6 @@ export const AppLayer = Layer.mergeAll( Reference.defaultLayer, Workspace.defaultLayer, Worktree.appLayer, - Pty.defaultLayer, - PtyTicket.defaultLayer, Installation.defaultLayer, ShareNext.defaultLayer, SessionShare.defaultLayer, diff --git a/packages/opencode/src/pty-preparation.ts b/packages/opencode/src/pty-preparation.ts new file mode 100644 index 000000000..6362225f5 --- /dev/null +++ b/packages/opencode/src/pty-preparation.ts @@ -0,0 +1,30 @@ +export * as PtyPreparation from "./pty-preparation" + +import { Config } from "@/config/config" +import * as InstanceState from "@/effect/instance-state" +import { Plugin } from "@/plugin" +import { Shell } from "@/shell/shell" +import { Pty } from "@opencode-ai/core/pty" +import { Effect } from "effect" + +export const prepareCreate = Effect.fn("PtyPreparation.prepareCreate")(function* (input: Pty.CreateInput) { + const config = yield* Config.Service + const plugin = yield* Plugin.Service + const command = input.command || Shell.preferred((yield* config.get()).shell) + const args = Shell.login(command) ? [...(input.args ?? []), "-l"] : [...(input.args ?? [])] + const cwd = input.cwd || (yield* InstanceState.context).directory + const shell = yield* plugin.trigger("shell.env", { cwd }, { env: {} }) + const env = { + ...process.env, + ...input.env, + ...shell.env, + TERM: "xterm-256color", + OPENCODE_TERMINAL: "1", + } as Record + if (process.platform === "win32") { + env.LC_ALL = "C.UTF-8" + env.LC_CTYPE = "C.UTF-8" + env.LANG = "C.UTF-8" + } + return { command, args, cwd, title: input.title, env } +}) diff --git a/packages/opencode/src/server/routes/instance/httpapi/groups/pty.ts b/packages/opencode/src/server/routes/instance/httpapi/groups/pty.ts index cca9d0013..9d49009a5 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/groups/pty.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/groups/pty.ts @@ -1,6 +1,6 @@ -import { Pty } from "@/pty" -import { PtyTicket } from "@/pty/ticket" -import { PtyID } from "@/pty/schema" +import { Pty } from "@opencode-ai/core/pty" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" +import { PtyID } from "@opencode-ai/core/pty/schema" import { PTY_CONNECT_TICKET_QUERY } from "@/server/shared/pty-ticket" import { Schema } from "effect" import { HttpApi, HttpApiEndpoint, HttpApiError, HttpApiGroup, OpenApi } from "effect/unstable/httpapi" diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts index f9349464b..d8f9e55a0 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts @@ -1,7 +1,13 @@ -import { Pty } from "@/pty" -import { PtyID } from "@/pty/schema" -import { PtyTicket } from "@/pty/ticket" -import { handlePtyInput } from "@/pty/input" +import * as InstanceState from "@/effect/instance-state" +import { registerDisposer } from "@/effect/instance-registry" +import { InstanceRef, WorkspaceRef } from "@/effect/instance-ref" +import { PtyPreparation } from "@/pty-preparation" +import { Pty } from "@opencode-ai/core/pty" +import { handlePtyInput } from "@opencode-ai/core/pty/input" +import { PtyID } from "@opencode-ai/core/pty/schema" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" +import { LocationServiceMap } from "@opencode-ai/core/location-layer" +import { AbsolutePath } from "@opencode-ai/core/schema" import { Shell } from "@/shell/shell" import { EffectBridge } from "@/effect/bridge" import { CorsConfig, isAllowedRequestOrigin, type CorsOptions } from "@/server/cors" @@ -10,7 +16,7 @@ import { PTY_CONNECT_TOKEN_HEADER, PTY_CONNECT_TOKEN_HEADER_VALUE, } from "@/server/shared/pty-ticket" -import { Effect, Option, Schema } from "effect" +import { Effect, Layer, Option, Schema } from "effect" import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http" import { HttpApiBuilder } from "effect/unstable/httpapi" import * as Socket from "effect/unstable/socket/Socket" @@ -23,30 +29,53 @@ function validOrigin(request: HttpServerRequest.HttpServerRequest, opts: CorsOpt return isAllowedRequestOrigin(request.headers.origin, request.headers.host, opts) } +const ticketScope = Effect.gen(function* () { + const instance = yield* InstanceRef + const workspaceID = yield* WorkspaceRef + return { directory: instance?.directory, workspaceID } +}) + export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handlers) => Effect.gen(function* () { - const pty = yield* Pty.Service const tickets = yield* PtyTicket.Service const cors = yield* CorsConfig + const locations = yield* LocationServiceMap + const unregister = registerDisposer((directory) => + Effect.runPromise(locations.invalidate({ directory: AbsolutePath.make(directory) })), + ) + yield* Effect.addFinalizer(() => Effect.sync(unregister)) + + const pty = Effect.fnUntraced(function* (effect: Effect.Effect) { + return yield* effect.pipe( + Effect.provide(locations.get({ directory: AbsolutePath.make((yield* InstanceState.context).directory) })), + ) + }) const shells = Effect.fn("PtyHttpApi.shells")(function* () { return yield* Effect.promise(() => Shell.list()) }) const list = Effect.fn("PtyHttpApi.list")(function* () { - return yield* pty.list() + return yield* pty(Pty.Service.use((service) => service.list())) }) const create = Effect.fn("PtyHttpApi.create")(function* (ctx: { payload: typeof Pty.CreateInput.Type }) { - return yield* pty.create({ - ...ctx.payload, - args: ctx.payload.args ? [...ctx.payload.args] : undefined, - env: ctx.payload.env ? { ...ctx.payload.env } : undefined, - }) + return yield* pty( + Pty.Service.use((service) => + Effect.flatMap( + PtyPreparation.prepareCreate({ + ...ctx.payload, + args: ctx.payload.args ? [...ctx.payload.args] : undefined, + env: ctx.payload.env ? { ...ctx.payload.env } : undefined, + }), + service.create, + ), + ), + ) }) const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) { - return yield* pty.get(ctx.params.ptyID).pipe( + return yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe( Effect.catchTag("Pty.NotFoundError", (error) => Effect.fail( new ApiError.PtyNotFoundError({ @@ -62,25 +91,27 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler params: { ptyID: PtyID } payload: typeof Pty.UpdateInput.Type }) { - return yield* pty - .update(ctx.params.ptyID, { - ...ctx.payload, - size: ctx.payload.size ? { ...ctx.payload.size } : undefined, - }) - .pipe( - Effect.catchTag("Pty.NotFoundError", (error) => - Effect.fail( - new ApiError.PtyNotFoundError({ - ptyID: error.ptyID, - message: `PTY session not found: ${error.ptyID}`, - }), - ), + return yield* pty( + Pty.Service.use((service) => + service.update(ctx.params.ptyID, { + ...ctx.payload, + size: ctx.payload.size ? { ...ctx.payload.size } : undefined, + }), + ), + ).pipe( + Effect.catchTag("Pty.NotFoundError", (error) => + Effect.fail( + new ApiError.PtyNotFoundError({ + ptyID: error.ptyID, + message: `PTY session not found: ${error.ptyID}`, + }), ), - ) + ), + ) }) const remove = Effect.fn("PtyHttpApi.remove")(function* (ctx: { params: { ptyID: PtyID } }) { - yield* pty.remove(ctx.params.ptyID).pipe( + yield* pty(Pty.Service.use((service) => service.remove(ctx.params.ptyID))).pipe( Effect.catchTag("Pty.NotFoundError", (error) => Effect.fail( new ApiError.PtyNotFoundError({ @@ -97,7 +128,7 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler const request = yield* HttpServerRequest.HttpServerRequest if (request.headers[PTY_CONNECT_TOKEN_HEADER] !== PTY_CONNECT_TOKEN_HEADER_VALUE || !validOrigin(request, cors)) return yield* new ApiError.PtyForbiddenError({ message: "Invalid PTY connect token request" }) - yield* pty.get(ctx.params.ptyID).pipe( + yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe( Effect.catchTag("Pty.NotFoundError", (error) => Effect.fail( new ApiError.PtyNotFoundError({ @@ -107,7 +138,7 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler ), ), ) - return yield* tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* PtyTicket.scope) }) + return yield* tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* ticketScope) }) }) return handlers @@ -119,13 +150,23 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler .handle("remove", remove) .handle("connectToken", connectToken) }), -) +).pipe(Layer.provide(LocationServiceMap.layer)) export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-connect", (handlers) => Effect.gen(function* () { - const pty = yield* Pty.Service const tickets = yield* PtyTicket.Service const cors = yield* CorsConfig + const locations = yield* LocationServiceMap + const unregister = registerDisposer((directory) => + Effect.runPromise(locations.invalidate({ directory: AbsolutePath.make(directory) })), + ) + yield* Effect.addFinalizer(() => Effect.sync(unregister)) + + const pty = Effect.fnUntraced(function* (effect: Effect.Effect) { + return yield* effect.pipe( + Effect.provide(locations.get({ directory: AbsolutePath.make((yield* InstanceState.context).directory) })), + ) + }) return handlers.handleRaw( "connect", @@ -133,7 +174,7 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne params: { ptyID: PtyID } request: HttpServerRequest.HttpServerRequest }) { - const exists = yield* pty.get(ctx.params.ptyID).pipe( + const exists = yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe( Effect.as(true), Effect.catchTag("Pty.NotFoundError", () => Effect.succeed(false)), ) @@ -144,7 +185,7 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne const ticket = new URL(ctx.request.url, "http://localhost").searchParams.get(PTY_CONNECT_TICKET_QUERY) if (ticket) { const valid = validOrigin(ctx.request, cors) - ? yield* tickets.consume({ ticket, ptyID: ctx.params.ptyID, ...(yield* PtyTicket.scope) }) + ? yield* tickets.consume({ ticket, ptyID: ctx.params.ptyID, ...(yield* ticketScope) }) : false if (!valid) return HttpServerResponse.empty({ status: 403 }) } @@ -187,13 +228,13 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne writeScoped(write(new Socket.CloseEvent(code, reason))) }, } - const handler = yield* pty - .connect(ctx.params.ptyID, adapter, cursor) - .pipe( - Effect.catchTag("Pty.NotFoundError", () => - closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)), - ), - ) + const handler = yield* pty( + Pty.Service.use((service) => service.connect(ctx.params.ptyID, adapter, cursor)), + ).pipe( + Effect.catchTag("Pty.NotFoundError", () => + closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)), + ), + ) if (!handler) return HttpServerResponse.empty() // The handshake runs inside `socket.runRaw`, after the input callback is @@ -214,4 +255,4 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne }), ) }), -) +).pipe(Layer.provide(LocationServiceMap.layer)) diff --git a/packages/opencode/src/server/routes/instance/httpapi/server.ts b/packages/opencode/src/server/routes/instance/httpapi/server.ts index 7a0d6ab52..dfbb1a88b 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/server.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/server.ts @@ -31,8 +31,7 @@ import { ProjectCopy } from "@opencode-ai/core/project/copy" import { ProviderAuth } from "@/provider/auth" import { ModelsDev } from "@opencode-ai/core/models-dev" import { Provider } from "@/provider/provider" -import { Pty } from "@/pty" -import { PtyTicket } from "@/pty/ticket" +import { PtyTicket } from "@opencode-ai/core/pty/ticket" import { Question } from "@/question" import { Session } from "@/session/session" import { SessionCompaction } from "@/session/compaction" @@ -212,7 +211,6 @@ export function createRoutes( ProjectCopy.defaultLayer, ProviderAuth.defaultLayer, Provider.defaultLayer, - Pty.defaultLayer, PtyTicket.defaultLayer, Question.defaultLayer, Ripgrep.defaultLayer, diff --git a/packages/opencode/test/pty/pty-output-isolation.test.ts b/packages/opencode/test/pty/pty-output-isolation.test.ts deleted file mode 100644 index 20975d986..000000000 --- a/packages/opencode/test/pty/pty-output-isolation.test.ts +++ /dev/null @@ -1,162 +0,0 @@ -import { describe, expect } from "bun:test" -import { EventV2Bridge } from "../../src/event-v2-bridge" -import { Config } from "../../src/config/config" -import { Plugin } from "../../src/plugin" -import { Pty } from "../../src/pty" -import { Duration, Effect, Layer, Queue } from "effect" -import { testEffect } from "../lib/effect" - -type Socket = Parameters[1] - -const it = testEffect( - Pty.layer.pipe( - Layer.provideMerge(EventV2Bridge.defaultLayer), - Layer.provideMerge(Config.defaultLayer), - Layer.provideMerge(Plugin.defaultLayer), - ), -) -const ptyTest = process.platform === "win32" ? it.instance.skip : it.instance - -const createPty = Effect.fn("PtyOutputIsolationTest.createPty")(function* (input: Pty.CreateInput) { - const pty = yield* Pty.Service - return yield* Effect.acquireRelease(pty.create(input), (info) => pty.remove(info.id).pipe(Effect.ignore)) -}) - -const decodeOutput = (data: string | Uint8Array | ArrayBuffer) => - typeof data === "string" - ? data - : Buffer.from(data instanceof Uint8Array ? data : new Uint8Array(data)).toString("utf8") - -const makeSocket = Effect.fn("PtyOutputIsolationTest.makeSocket")(function* (data: unknown) { - const output = yield* Queue.unbounded() - const chunks: string[] = [] - const socket: Socket = { - readyState: 1, - data, - send: (data) => { - const text = decodeOutput(data) - chunks.push(text) - Queue.offerUnsafe(output, text) - }, - close: () => { - // no-op (simulate abrupt drop) - }, - } - - return { socket, output, chunks } -}) - -const waitForOutput = (output: Queue.Queue, text: string, duration: Duration.Input = "5 seconds") => - Effect.gen(function* () { - let received = "" - while (!received.includes(text)) { - received += yield* Queue.take(output) - } - return received - }).pipe( - Effect.timeoutOrElse({ - duration, - orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)), - }), - ) - -const waitForLeakedOutput = (output: Queue.Queue, text: string) => - Effect.gen(function* () { - let received = "" - while (!received.includes(text)) { - received += yield* Queue.take(output) - } - return received - }).pipe( - Effect.timeoutOrElse({ - duration: "100 millis", - orElse: () => Effect.succeed(undefined), - }), - ) - -describe("pty", () => { - ptyTest( - "does not leak output when websocket objects are reused", - () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const a = yield* createPty({ command: "cat", title: "a" }) - const b = yield* createPty({ command: "cat", title: "b" }) - const connectionA = yield* makeSocket({ events: { connection: "a" } }) - const connectionB = { events: { connection: "b" } } - - yield* pty.connect(a.id, connectionA.socket) - - const outBQueue = yield* Queue.unbounded() - const outB: string[] = [] - connectionA.socket.data = connectionB - connectionA.socket.send = (data) => { - const text = decodeOutput(data) - outB.push(text) - Queue.offerUnsafe(outBQueue, text) - } - yield* pty.connect(b.id, connectionA.socket) - - connectionA.chunks.length = 0 - outB.length = 0 - - yield* pty.write(a.id, "AAA\n") - const verifyA = yield* makeSocket({ events: { connection: "verify-a" } }) - yield* pty.connect(a.id, verifyA.socket) - yield* waitForOutput(verifyA.output, "AAA") - - expect(outB.join("")).not.toContain("AAA") - expect(yield* waitForLeakedOutput(outBQueue, "AAA")).toBeUndefined() - }), - { git: true }, - ) - - ptyTest( - "does not leak output when Bun recycles websocket objects before re-connect", - () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const a = yield* createPty({ command: "cat", title: "a" }) - const outA = yield* makeSocket({ events: { connection: "a" } }) - const outB = yield* Queue.unbounded() - - yield* pty.connect(a.id, outA.socket) - outA.chunks.length = 0 - - const connectionB = { events: { connection: "b" } } - outA.socket.data = connectionB - outA.socket.send = (data) => { - Queue.offerUnsafe(outB, decodeOutput(data)) - } - - yield* pty.write(a.id, "AAA\n") - const verifyA = yield* makeSocket({ events: { connection: "verify-a" } }) - yield* pty.connect(a.id, verifyA.socket) - yield* waitForOutput(verifyA.output, "AAA") - - expect(yield* waitForLeakedOutput(outB, "AAA")).toBeUndefined() - }), - { git: true }, - ) - - ptyTest( - "treats in-place socket data mutation as the same connection", - () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const a = yield* createPty({ command: "cat", title: "a" }) - const ctx = { connId: 1 } - const out = yield* makeSocket(ctx) - - yield* pty.connect(a.id, out.socket) - out.chunks.length = 0 - - ctx.connId = 2 - - yield* pty.write(a.id, "AAA\n") - - expect(yield* waitForOutput(out.output, "AAA")).toContain("AAA") - }), - { git: true }, - ) -}) diff --git a/packages/opencode/test/pty/pty-session.test.ts b/packages/opencode/test/pty/pty-session.test.ts deleted file mode 100644 index 74c9f70ec..000000000 --- a/packages/opencode/test/pty/pty-session.test.ts +++ /dev/null @@ -1,140 +0,0 @@ -import { describe, expect } from "bun:test" -import { EventV2Bridge } from "../../src/event-v2-bridge" -import { Config } from "../../src/config/config" -import { Plugin } from "../../src/plugin" -import { Pty } from "../../src/pty" -import type { PtyID } from "../../src/pty/schema" -import { Cause, Effect, Exit, Layer, Queue } from "effect" -import { testEffect } from "../lib/effect" - -type PtyEvent = { type: "created" | "exited" | "deleted"; id: PtyID } - -const it = testEffect( - Pty.layer.pipe( - Layer.provideMerge(EventV2Bridge.defaultLayer), - Layer.provideMerge(Config.defaultLayer), - Layer.provideMerge(Plugin.defaultLayer), - ), -) -const ptyTest = process.platform === "win32" ? it.instance.skip : it.instance - -const subscribePtyEvents = Effect.fn("PtySessionTest.subscribePtyEvents")(function* () { - const source = yield* EventV2Bridge.Service - const events = yield* Queue.unbounded() - - const unsubscribe = yield* source.listen((event) => { - if (event.type === Pty.Event.Created.type) - Queue.offerUnsafe(events, { type: "created", id: (event.data as typeof Pty.Event.Created.data.Type).info.id }) - if (event.type === Pty.Event.Exited.type) - Queue.offerUnsafe(events, { type: "exited", id: (event.data as typeof Pty.Event.Exited.data.Type).id }) - if (event.type === Pty.Event.Deleted.type) - Queue.offerUnsafe(events, { type: "deleted", id: (event.data as typeof Pty.Event.Deleted.data.Type).id }) - return Effect.void - }) - yield* Effect.addFinalizer(() => unsubscribe) - - return events -}) - -const createPty = Effect.fn("PtySessionTest.createPty")(function* (input: Pty.CreateInput) { - const pty = yield* Pty.Service - return yield* Effect.acquireRelease(pty.create(input), (info) => pty.remove(info.id).pipe(Effect.ignore)) -}) - -const waitForEvents = (events: Queue.Queue, id: PtyID, count: number) => { - return Effect.gen(function* () { - const picked: Array = [] - while (picked.length < count) { - const evt = yield* Queue.take(events) - if (evt.id === id) picked.push(evt.type) - } - return picked - }).pipe( - Effect.timeoutOrElse({ - duration: "5 seconds", - orElse: () => Effect.fail(new Error("timeout waiting for pty events")), - }), - ) -} - -describe("pty", () => { - it.instance( - "returns typed not found errors for missing sessions", - () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const id = "pty_missing" as PtyID - let closed = false - const socket = { - readyState: 1, - send: () => {}, - close: () => { - closed = true - }, - } - - const get = yield* pty.get(id).pipe(Effect.exit) - expect(Exit.isFailure(get)).toBe(true) - if (Exit.isFailure(get)) expect(Cause.squash(get.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) - - const update = yield* pty.update(id, { title: "missing" }).pipe(Effect.exit) - expect(Exit.isFailure(update)).toBe(true) - if (Exit.isFailure(update)) - expect(Cause.squash(update.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) - - const remove = yield* pty.remove(id).pipe(Effect.exit) - expect(Exit.isFailure(remove)).toBe(true) - if (Exit.isFailure(remove)) - expect(Cause.squash(remove.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) - - const resize = yield* pty.resize(id, 80, 24).pipe(Effect.exit) - expect(Exit.isFailure(resize)).toBe(true) - if (Exit.isFailure(resize)) - expect(Cause.squash(resize.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) - - const write = yield* pty.write(id, "input").pipe(Effect.exit) - expect(Exit.isFailure(write)).toBe(true) - if (Exit.isFailure(write)) - expect(Cause.squash(write.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) - - const connect = yield* pty.connect(id, socket).pipe(Effect.exit) - expect(Exit.isFailure(connect)).toBe(true) - if (Exit.isFailure(connect)) - expect(Cause.squash(connect.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id }) - expect(closed).toBe(true) - }), - { git: true }, - ) - - ptyTest( - "publishes created, exited, deleted in order for a short-lived process", - () => - Effect.gen(function* () { - const events = yield* subscribePtyEvents() - const info = yield* createPty({ - command: "/usr/bin/env", - args: ["sh", "-c", "sleep 0.1"], - title: "sleep", - }) - - expect(yield* waitForEvents(events, info.id, 3)).toEqual(["created", "exited", "deleted"]) - }), - { git: true }, - ) - - ptyTest( - "publishes created, exited, deleted in order for /bin/sh + remove", - () => - Effect.gen(function* () { - const pty = yield* Pty.Service - const events = yield* subscribePtyEvents() - const info = yield* createPty({ command: "/bin/sh", title: "sh" }) - - expect(yield* waitForEvents(events, info.id, 1)).toEqual(["created"]) - yield* pty.write(info.id, "exit\n") - expect(yield* waitForEvents(events, info.id, 2)).toEqual(["exited", "deleted"]) - yield* pty.remove(info.id).pipe(Effect.ignore) - }), - { git: true }, - ) -}) diff --git a/packages/opencode/test/pty/pty-shell.test.ts b/packages/opencode/test/pty/pty-shell.test.ts index e8132dec7..c3fbec66e 100644 --- a/packages/opencode/test/pty/pty-shell.test.ts +++ b/packages/opencode/test/pty/pty-shell.test.ts @@ -1,22 +1,34 @@ import { describe, expect } from "bun:test" -import { Effect } from "effect" -import { Pty } from "../../src/pty" +import { Effect, Layer } from "effect" +import { Config } from "../../src/config/config" +import { Plugin } from "../../src/plugin" +import { PtyPreparation } from "../../src/pty-preparation" +import { Pty } from "@opencode-ai/core/pty" import { Shell } from "../../src/shell/shell" import { testEffect } from "../lib/effect" Shell.preferred.reset() -const it = testEffect(Pty.defaultLayer) - -const createPty = (input: Pty.CreateInput) => - Effect.acquireRelease( - Effect.gen(function* () { - const pty = yield* Pty.Service - const info = yield* pty.create(input) - return { pty, info } +const it = testEffect(Layer.mergeAll(Config.defaultLayer, Plugin.defaultLayer)) +const preparationIt = testEffect( + Layer.mergeAll( + Layer.mock(Config.Service)({ get: () => Effect.succeed({}) }), + Layer.mock(Plugin.Service)({ + trigger: (_name: Name, _input: Input, output: Output) => + Effect.sync(() => { + const result = output as { env: Record } + result.env.INPUT = "plugin" + result.env.FROM_PLUGIN = "plugin" + result.env.TERM = "plugin" + return output + }), + list: () => Effect.succeed([]), + init: () => Effect.void, }), - ({ pty, info }) => pty.remove(info.id).pipe(Effect.ignore), - ).pipe(Effect.map(({ info }) => info)) + ), +) + +const preparePty = (input: Pty.CreateInput) => PtyPreparation.prepareCreate(input) describe("pty shell args", () => { if (process.platform !== "win32") return @@ -27,7 +39,7 @@ describe("pty shell args", () => { "does not add login args to pwsh", () => Effect.gen(function* () { - const info = yield* createPty({ command: ps, title: "pwsh" }) + const info = yield* preparePty({ command: ps, title: "pwsh" }) expect(info.args).toEqual([]) }), { timeout: 30000 }, @@ -44,7 +56,7 @@ describe("pty shell args", () => { "adds login args to bash", () => Effect.gen(function* () { - const info = yield* createPty({ command: bash, title: "bash" }) + const info = yield* preparePty({ command: bash, title: "bash" }) expect(info.args).toEqual(["-l"]) }), { timeout: 30000 }, @@ -61,7 +73,7 @@ describe("pty configured shell", () => { Effect.gen(function* () { if (!configured) return - const info = yield* createPty({ title: "configured" }) + const info = yield* preparePty({ title: "configured" }) if (process.platform === "win32") { expect(info.command.toLowerCase()).toBe(configured.toLowerCase()) } else { @@ -73,3 +85,18 @@ describe("pty configured shell", () => { { timeout: 30000 }, ) }) + +describe("pty environment preparation", () => { + preparationIt.instance("merges plugin environment before forced PTY values", () => + Effect.gen(function* () { + const input = { command: "/bin/sh", args: [] as string[], env: { INPUT: "caller" } } + const prepared = yield* preparePty(input) + + expect(input.args).toEqual([]) + expect(prepared.env.INPUT).toBe("plugin") + expect(prepared.env.FROM_PLUGIN).toBe("plugin") + expect(prepared.env.TERM).toBe("xterm-256color") + expect(prepared.env.OPENCODE_TERMINAL).toBe("1") + }), + ) +}) diff --git a/packages/opencode/test/server/httpapi-instance-route-auth.test.ts b/packages/opencode/test/server/httpapi-instance-route-auth.test.ts index 4713540b9..4a306f386 100644 --- a/packages/opencode/test/server/httpapi-instance-route-auth.test.ts +++ b/packages/opencode/test/server/httpapi-instance-route-auth.test.ts @@ -5,7 +5,7 @@ import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/even import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty" import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server" import { ServerAuth } from "../../src/server/auth" -import { PtyID } from "../../src/pty/schema" +import { PtyID } from "@opencode-ai/core/pty/schema" import { resetDatabase } from "../fixture/db" import { disposeAllInstances, tmpdir } from "../fixture/fixture" import * as Log from "@opencode-ai/core/util/log" diff --git a/packages/opencode/test/server/httpapi-pty.test.ts b/packages/opencode/test/server/httpapi-pty.test.ts index f26bc68e6..d0d0d1fd8 100644 --- a/packages/opencode/test/server/httpapi-pty.test.ts +++ b/packages/opencode/test/server/httpapi-pty.test.ts @@ -1,6 +1,6 @@ import { afterEach, describe, expect, test } from "bun:test" import { NodeHttpServer, NodeServices } from "@effect/platform-node" -import { PtyID } from "../../src/pty/schema" +import { PtyID } from "@opencode-ai/core/pty/schema" import { Server } from "../../src/server/server" import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty" import * as Log from "@opencode-ai/core/util/log" @@ -10,7 +10,7 @@ import { Config, Effect, Layer, Queue, Schema } from "effect" import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http" import * as Socket from "effect/unstable/socket/Socket" import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server" -import { Pty } from "../../src/pty" +import { Pty } from "@opencode-ai/core/pty" import { testEffect } from "../lib/effect" void Log.init({ print: false }) @@ -139,6 +139,23 @@ describe("pty HttpApi bridge", () => { }) }) + testPty("disposes PTY sessions with their legacy instance", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const headers = { "x-opencode-directory": tmp.path } + const created = await app().request(PtyPaths.create, { + method: "POST", + headers: { ...headers, "content-type": "application/json" }, + body: JSON.stringify({ command: "/usr/bin/env", args: ["sh", "-c", "sleep 5"] }), + }) + expect(created.status).toBe(200) + + await disposeAllInstances() + + const list = await app().request(PtyPaths.list, { headers }) + expect(list.status).toBe(200) + expect(await list.json()).toEqual([]) + }) + test("returns 404 for missing PTY websocket before upgrade", async () => { await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) const response = await app().request(PtyPaths.connect.replace(":ptyID", PtyID.ascending()), {