refactor(core): canonicalize pty service (#32182)

This commit is contained in:
Shoubhit Dash 2026-06-14 16:16:39 +05:30 committed by GitHub
parent 7efade2d53
commit f2cf607376
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 1134 additions and 506 deletions

View File

@ -1,6 +1,6 @@
import { Context, Effect, Layer, Schema } from "effect"
import { Project } from "./project"
import { AbsolutePath } from "./schema"
import { AbsolutePath, optionalOmitUndefined } from "./schema"
import { WorkspaceV2 } from "./workspace"
export * as Location from "./location"
@ -12,7 +12,7 @@ export class Ref extends Schema.Class<Ref>("Location.Ref")({
export class Info extends Schema.Class<Info>("Location.Info")({
directory: AbsolutePath,
workspaceID: WorkspaceV2.ID.pipe(Schema.optional),
workspaceID: optionalOmitUndefined(WorkspaceV2.ID),
project: Schema.Struct({
id: Project.ID,
directory: AbsolutePath,

View File

@ -2,22 +2,27 @@ export * as Pty from "./pty"
import type { Disp, Proc } from "#pty"
import { Context, Effect, Layer, Schema, Types } from "effect"
import { Config } from "./config"
import { EventV2 } from "./event"
import { Location } from "./location"
import { NonNegativeInt, PositiveInt } from "./schema"
import { PtyID } from "./pty/schema"
import { Shell } from "./shell"
import { lazy } from "./util/lazy"
const BUFFER_LIMIT = 1024 * 1024 * 2
const BUFFER_CHUNK = 64 * 1024
const encoder = new TextEncoder()
// Exited sessions stay observable (status, exit code, retained output) until removed explicitly.
// Cap retention so abandoned terminals do not accumulate unbounded buffers.
const EXITED_LIMIT = 25
const pty = lazy(() => import("#pty"))
type Socket = {
readyState: number
data?: unknown
send: (data: string | Uint8Array | ArrayBuffer) => void
close: (code?: number, reason?: string) => void
type Subscriber = {
readonly onData: (chunk: string) => void
readonly onEnd: (event: { exitCode?: number }) => void
active: boolean
detached: boolean
pending: string[]
end?: { exitCode?: number }
}
type Active = {
@ -26,22 +31,10 @@ type Active = {
buffer: string
bufferCursor: number
cursor: number
subscribers: Map<unknown, Socket>
subscribers: Map<object, Subscriber>
listeners: Disp[]
}
const sock = (ws: Socket) => (ws.data && typeof ws.data === "object" ? ws.data : ws)
// WebSocket control frame: 0x00 + UTF-8 JSON.
const meta = (cursor: number) => {
const json = JSON.stringify({ cursor })
const bytes = encoder.encode(json)
const out = new Uint8Array(bytes.length + 1)
out[0] = 0
out.set(bytes, 1)
return out
}
export const Info = Schema.Struct({
id: PtyID,
title: Schema.String,
@ -51,6 +44,8 @@ export const Info = Schema.Struct({
status: Schema.Literals(["running", "exited"]),
// Windows ConPTY assigns the child pid asynchronously, so 0 is valid at spawn time.
pid: NonNegativeInt,
// Present once status is "exited".
exitCode: Schema.optional(NonNegativeInt),
}).annotate({ identifier: "Pty" })
export type Info = Types.DeepMutable<typeof Info.Type>
@ -65,14 +60,6 @@ export const CreateInput = Schema.Struct({
export type CreateInput = Types.DeepMutable<typeof CreateInput.Type>
export type PreparedCreate = {
readonly command: string
readonly args: string[]
readonly cwd: string
readonly title?: string
readonly env: Record<string, string>
}
export const UpdateInput = Schema.Struct({
title: Schema.optional(Schema.String),
size: Schema.optional(
@ -85,10 +72,34 @@ export const UpdateInput = Schema.Struct({
export type UpdateInput = Types.DeepMutable<typeof UpdateInput.Type>
export type AttachInput = {
// Absolute output cursor to replay from. -1 tails from the current end; omitted replays the full retained buffer.
readonly cursor?: number
// Callbacks fire synchronously from the native PTY data path; keep them non-blocking.
readonly onData: (chunk: string) => void
// Fired once when the session stops producing output: process exit (exitCode set), removal, or service teardown.
readonly onEnd: (event: { exitCode?: number }) => void
}
export type Attachment = {
// Retained output from the requested cursor to the current end.
readonly replay: string
// Absolute output cursor after replay.
readonly cursor: number
readonly write: (data: string) => void
// Starts live delivery after the caller has applied replay and cursor metadata.
readonly activate: () => void
readonly detach: () => void
}
export class NotFoundError extends Schema.TaggedErrorClass<NotFoundError>()("Pty.NotFoundError", {
ptyID: PtyID,
}) {}
export class ExitedError extends Schema.TaggedErrorClass<ExitedError>()("Pty.ExitedError", {
ptyID: PtyID,
}) {}
export const Event = {
Created: EventV2.define({ type: "pty.created", schema: { info: Info } }),
Updated: EventV2.define({ type: "pty.updated", schema: { info: Info } }),
@ -99,19 +110,11 @@ export const Event = {
export interface Interface {
readonly list: () => Effect.Effect<Info[]>
readonly get: (id: PtyID) => Effect.Effect<Info, NotFoundError>
readonly create: (input: PreparedCreate) => Effect.Effect<Info>
readonly create: (input: CreateInput) => Effect.Effect<Info>
readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info, NotFoundError>
readonly remove: (id: PtyID) => Effect.Effect<void, NotFoundError>
readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void, NotFoundError>
readonly write: (id: PtyID, data: string) => Effect.Effect<void, NotFoundError>
readonly connect: (
id: PtyID,
ws: Socket,
cursor?: number,
) => Effect.Effect<
{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined,
NotFoundError
>
readonly attach: (id: PtyID, input: AttachInput) => Effect.Effect<Attachment, NotFoundError | ExitedError>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/Pty") {}
@ -121,28 +124,41 @@ export const layer = Layer.effect(
Effect.gen(function* () {
const events = yield* EventV2.Service
const location = yield* Location.Service
const config = yield* Config.Service
const context = yield* Effect.context()
const runFork = Effect.runForkWith(context)
const sessions = new Map<PtyID, Active>()
const exitOrder: PtyID[] = []
function notifyEnd(session: Active, event: { exitCode?: number }) {
for (const subscriber of session.subscribers.values()) {
if (!subscriber.active) {
subscriber.end = event
continue
}
try {
subscriber.onEnd(event)
} catch {}
}
session.subscribers.clear()
}
function teardown(session: Active) {
for (const listener of session.listeners) listener.dispose()
session.listeners.length = 0
try {
session.process.kill()
} catch {}
for (const [sub, ws] of session.subscribers.entries()) {
if (session.info.status === "running") {
try {
if (sock(ws) === sub) ws.close()
session.process.kill()
} catch {}
}
session.subscribers.clear()
notifyEnd(session, {})
}
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
for (const session of sessions.values()) teardown(session)
sessions.clear()
exitOrder.length = 0
}),
)
@ -154,12 +170,13 @@ export const layer = Layer.effect(
const removeSession = Effect.fnUntraced(function* (id: PtyID) {
const session = sessions.get(id)
if (!session) return false
if (!session) return
sessions.delete(id)
const index = exitOrder.indexOf(id)
if (index !== -1) exitOrder.splice(index, 1)
yield* Effect.logInfo("removing session", { id })
teardown(session)
yield* events.publish(Event.Deleted, { id: session.info.id })
return true
})
const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
@ -175,26 +192,36 @@ export const layer = Layer.effect(
return (yield* requireSession(id)).info
})
const create = Effect.fn("Pty.create")(function* (input: PreparedCreate) {
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
const id = PtyID.ascending()
yield* Effect.logInfo("creating session", { id, cmd: input.command, args: input.args, cwd: input.cwd })
const command = input.command || Shell.preferred(Config.latest(yield* config.entries(), "shell"))
const args = Shell.login(command) ? [...(input.args ?? []), "-l"] : [...(input.args ?? [])]
const cwd = input.cwd || location.directory
// TODO: Apply plugin shell.env environment augmentation once V2 plugin hooks exist; legacy
// routes merge plugin-provided values into input.env at the boundary.
const env = {
...process.env,
...input.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
yield* Effect.logInfo("creating session", { id, cmd: command, args, cwd })
const { spawn } = yield* Effect.promise(() => pty())
const proc = yield* Effect.sync(() =>
spawn(input.command, input.args, {
name: "xterm-256color",
cwd: input.cwd,
env: input.env,
}),
)
const info = {
const proc = yield* Effect.sync(() => spawn(command, args, { name: "xterm-256color", cwd, env }))
const info: Info = {
id,
title: input.title || `Terminal ${id.slice(-4)}`,
command: input.command,
args: input.args,
cwd: input.cwd,
command,
args,
cwd,
status: "running",
pid: proc.pid,
} as const
}
const session: Active = {
info,
process: proc,
@ -208,15 +235,15 @@ export const layer = Layer.effect(
session.listeners.push(
proc.onData((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1 || sock(ws) !== key) {
session.subscribers.delete(key)
for (const [token, subscriber] of session.subscribers.entries()) {
if (!subscriber.active) {
subscriber.pending.push(chunk)
continue
}
try {
ws.send(chunk)
subscriber.onData(chunk)
} catch {
session.subscribers.delete(key)
session.subscribers.delete(token)
}
}
session.buffer += chunk
@ -227,12 +254,19 @@ export const layer = Layer.effect(
}),
proc.onExit(({ exitCode }) => {
if (session.info.status === "exited") return
session.info.status = "exited"
session.info.exitCode = exitCode
notifyEnd(session, { exitCode })
exitOrder.push(id)
runFork(
Effect.gen(function* () {
yield* Effect.logInfo("session exited", { id, exitCode })
session.info.status = "exited"
yield* events.publish(Event.Exited, { id, exitCode })
yield* removeSession(id)
while (exitOrder.length > EXITED_LIMIT) {
const oldest = exitOrder[0]
if (!oldest) break
yield* removeSession(oldest)
}
}),
)
}),
@ -244,66 +278,72 @@ export const layer = Layer.effect(
const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
const session = yield* requireSession(id)
if (input.title) session.info.title = input.title
if (input.size) session.process.resize(input.size.cols, input.size.rows)
if (input.size && session.info.status === "running")
session.process.resize(input.size.cols, input.size.rows)
yield* events.publish(Event.Updated, { info: session.info })
return session.info
})
const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
const session = yield* requireSession(id)
if (session.info.status === "running") session.process.resize(cols, rows)
})
const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
const session = yield* requireSession(id)
if (session.info.status === "running") session.process.write(data)
})
const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
const session = yield* requireSession(id).pipe(Effect.tapError(() => Effect.sync(() => ws.close())))
yield* Effect.logInfo("client connected to session", { id, directory: location.directory })
const sub = sock(ws)
session.subscribers.delete(sub)
session.subscribers.set(sub, ws)
const cleanup = () => session.subscribers.delete(sub)
const attach = Effect.fn("Pty.attach")(function* (id: PtyID, input: AttachInput) {
const session = yield* requireSession(id)
if (session.info.status !== "running") return yield* new ExitedError({ ptyID: id })
yield* Effect.logInfo("client attached to session", { id, directory: location.directory })
const token = {}
const subscriber: Subscriber = {
onData: input.onData,
onEnd: input.onEnd,
active: false,
detached: false,
pending: [],
}
session.subscribers.set(token, subscriber)
const start = session.bufferCursor
const end = session.cursor
const from =
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
const data = (() => {
input.cursor === -1
? end
: typeof input.cursor === "number" && Number.isSafeInteger(input.cursor)
? Math.max(0, input.cursor)
: 0
const replay = (() => {
if (!session.buffer || from >= end) return ""
const offset = Math.max(0, from - start)
if (offset >= session.buffer.length) return ""
return session.buffer.slice(offset)
})()
if (data) {
try {
for (let i = 0; i < data.length; i += BUFFER_CHUNK) ws.send(data.slice(i, i + BUFFER_CHUNK))
} catch {
cleanup()
ws.close()
return
}
}
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(typeof message === "string" ? message : new TextDecoder().decode(message))
replay,
cursor: end,
write: (data: string) => {
if (session.info.status === "running") session.process.write(data)
},
onClose: () => {
cleanup()
activate: () => {
if (subscriber.active || subscriber.detached) return
subscriber.active = true
try {
for (const chunk of subscriber.pending) subscriber.onData(chunk)
subscriber.pending.length = 0
if (subscriber.end) subscriber.onEnd(subscriber.end)
} catch {
session.subscribers.delete(token)
}
},
detach: () => {
subscriber.detached = true
subscriber.pending.length = 0
subscriber.end = undefined
session.subscribers.delete(token)
},
}
})
return Service.of({ list, get, create, update, remove, resize, write, connect })
return Service.of({ list, get, create, update, remove, write, attach })
}),
)
export const locationLayer = layer
export const locationLayer = layer.pipe(Layer.provide(Config.locationLayer))

View File

@ -1,24 +0,0 @@
import { Effect } from "effect"
const inputDecoder = new TextDecoder("utf-8", { fatal: true })
export function handlePtyInput(
handler: { onMessage: (message: string | ArrayBuffer) => void },
message: string | Uint8Array,
) {
if (typeof message === "string") {
handler.onMessage(message)
return Effect.void
}
return Effect.try({
try: () => inputDecoder.decode(message),
catch: () => new Error("invalid PTY websocket input"),
}).pipe(
Effect.catch(() => Effect.succeed(undefined)),
Effect.flatMap((decoded) => {
if (decoded === undefined) return Effect.void
handler.onMessage(decoded)
return Effect.void
}),
)
}

View File

@ -0,0 +1,37 @@
export * as PtyProtocol from "./protocol"
// Wire protocol for PTY websocket transports. The PTY domain service is transport-free; server
// routes adapt Pty.attach to websockets with these helpers so every surface speaks one protocol.
//
// Outbound frames are raw UTF-8 terminal chunks. One control frame — a 0x00 byte followed by
// UTF-8 JSON — carries the absolute output cursor after replay so clients can resume later.
const encoder = new TextEncoder()
const decoder = new TextDecoder("utf-8", { fatal: true })
// Replay can be megabytes; send it in bounded frames.
export const REPLAY_CHUNK = 64 * 1024
export function metaFrame(cursor: number) {
const bytes = encoder.encode(JSON.stringify({ cursor }))
const out = new Uint8Array(bytes.length + 1)
out[0] = 0
out.set(bytes, 1)
return out
}
export function chunks(data: string) {
const out: string[] = []
for (let i = 0; i < data.length; i += REPLAY_CHUNK) out.push(data.slice(i, i + REPLAY_CHUNK))
return out
}
// Inbound client frames are UTF-8 text or binary; invalid UTF-8 input is dropped.
export function decodeInput(message: string | Uint8Array | ArrayBuffer) {
if (typeof message === "string") return message
try {
return decoder.decode(message instanceof ArrayBuffer ? new Uint8Array(message) : message)
} catch {
return undefined
}
}

View File

@ -1,10 +1,13 @@
import { Flag } from "@opencode-ai/core/flag/flag"
import { lazy } from "@/util/lazy"
import { Filesystem } from "@/util/filesystem"
import { which } from "@opencode-ai/core/util/which"
export * as Shell from "./shell"
import path from "path"
import { spawn, type ChildProcess } from "child_process"
import { readFile } from "fs/promises"
import { statSync } from "fs"
import { setTimeout as sleep } from "node:timers/promises"
import { Flag } from "./flag/flag"
import { FSUtil } from "./fs-util"
import { which } from "./util/which"
const SIGKILL_TIMEOUT_MS = 200
const META: Record<string, { deny?: boolean; login?: boolean; posix?: boolean; ps?: boolean }> = {
@ -47,7 +50,7 @@ export async function killTree(proc: ChildProcess, opts?: { exited?: () => boole
if (!opts?.exited?.()) {
process.kill(-pid, "SIGKILL")
}
} catch (_e) {
} catch {
proc.kill("SIGTERM")
await sleep(SIGKILL_TIMEOUT_MS)
if (!opts?.exited?.()) {
@ -56,9 +59,13 @@ export async function killTree(proc: ChildProcess, opts?: { exited?: () => boole
}
}
function stat(file: string) {
return statSync(file, { throwIfNoEntry: false }) ?? undefined
}
function full(file: string) {
if (process.platform !== "win32") return file
const shell = Filesystem.windowsPath(file)
const shell = FSUtil.windowsPath(file)
if (path.win32.dirname(shell) !== ".") {
if (shell.startsWith("/") && name(shell) === "bash") return gitbash() || shell
return shell
@ -76,13 +83,13 @@ function ok(file: string) {
}
function rooted(file: string) {
return path.isAbsolute(Filesystem.windowsPath(file))
return path.isAbsolute(FSUtil.windowsPath(file))
}
function resolve(file: string) {
const shell = full(file)
if (rooted(shell)) {
if (Filesystem.stat(shell)?.isFile()) return shell
if (stat(shell)?.isFile()) return shell
return
}
return which(shell) ?? undefined
@ -99,7 +106,7 @@ function win() {
}
async function unix() {
const text = await Filesystem.readText("/etc/shells").catch(() => "")
const text = await readFile("/etc/shells", "utf8").catch(() => "")
if (text) return Array.from(new Set(text.split("\n").filter((line) => line.trim() && !line.startsWith("#"))))
return ["/bin/bash", "/bin/zsh", "/bin/sh"]
}
@ -109,7 +116,7 @@ function select(file: string | undefined, opts?: { acceptable?: boolean }) {
const shell = resolve(file)
if (shell) return shell
}
if (process.platform === "win32") return win()[0]!
if (process.platform === "win32") return win()[0]
return fallback()
}
@ -119,7 +126,7 @@ export function gitbash() {
const git = which("git")
if (!git) return
const file = path.join(git, "..", "..", "bin", "bash.exe")
if (Filesystem.stat(file)?.size) return file
if (stat(file)?.size) return file
}
function fallback() {
@ -130,7 +137,7 @@ function fallback() {
}
export function name(file: string) {
if (process.platform === "win32") return path.win32.parse(Filesystem.windowsPath(file)).name.toLowerCase()
if (process.platform === "win32") return path.win32.parse(FSUtil.windowsPath(file)).name.toLowerCase()
return path.basename(file).toLowerCase()
}
@ -192,24 +199,28 @@ export function args(file: string, command: string, cwd: string) {
return ["-c", command]
}
const defaultPreferred = lazy(() => select(process.env.SHELL))
const defaultAcceptable = lazy(() => select(process.env.SHELL, { acceptable: true }))
let defaultPreferred: string | undefined
let defaultAcceptable: string | undefined
export function preferred(configShell?: string) {
if (configShell) return select(configShell)
return defaultPreferred()
defaultPreferred ??= select(process.env.SHELL)
return defaultPreferred
}
preferred.reset = () => {
defaultPreferred = undefined
}
preferred.reset = () => defaultPreferred.reset()
export function acceptable(configShell?: string) {
if (configShell) return select(configShell, { acceptable: true })
return defaultAcceptable()
defaultAcceptable ??= select(process.env.SHELL, { acceptable: true })
return defaultAcceptable
}
acceptable.reset = () => {
defaultAcceptable = undefined
}
acceptable.reset = () => defaultAcceptable.reset()
export async function list(): Promise<Item[]> {
const shells = process.platform === "win32" ? win() : await unix()
return shells.filter((s) => resolve(s)).map(info)
}
export * as Shell from "./shell"

View File

@ -24,4 +24,9 @@ describe("Pty.Info", () => {
test("rejects a negative pid", () => {
expect(() => Schema.decodeUnknownSync(Pty.Info)(sample(-1))).toThrow()
})
test("accepts an exit code for retained exited sessions", () => {
const info = Schema.decodeUnknownSync(Pty.Info)({ ...sample(48012), status: "exited", exitCode: 4 })
expect(info.exitCode).toBe(4)
})
})

View File

@ -1,19 +0,0 @@
import { describe, expect } from "bun:test"
import { Effect } from "effect"
import { handlePtyInput } from "@opencode-ai/core/pty/input"
import { it } from "../lib/effect"
describe("pty websocket input", () => {
it.effect("does not forward invalid binary frames to the PTY handler", () =>
Effect.gen(function* () {
const messages: Array<string | ArrayBuffer> = []
const handler = { onMessage: (message: string | ArrayBuffer) => messages.push(message) }
yield* handlePtyInput(handler, "ready")
yield* handlePtyInput(handler, new Uint8Array([0xff, 0xfe, 0xfd]))
yield* handlePtyInput(handler, new TextEncoder().encode("hello"))
expect(messages).toEqual(["ready", "hello"])
}),
)
})

View File

@ -0,0 +1,27 @@
import { describe, expect, test } from "bun:test"
import { PtyProtocol } from "@opencode-ai/core/pty/protocol"
describe("pty protocol", () => {
test("drops invalid binary input frames and decodes valid ones", () => {
expect(PtyProtocol.decodeInput("ready")).toBe("ready")
expect(PtyProtocol.decodeInput(new Uint8Array([0xff, 0xfe, 0xfd]))).toBeUndefined()
expect(PtyProtocol.decodeInput(new TextEncoder().encode("hello"))).toBe("hello")
expect(PtyProtocol.decodeInput(new TextEncoder().encode("hello").buffer)).toBe("hello")
})
test("encodes the cursor as a 0x00-prefixed JSON control frame", () => {
const frame = PtyProtocol.metaFrame(42)
expect(frame[0]).toBe(0)
expect(JSON.parse(new TextDecoder().decode(frame.subarray(1)))).toEqual({ cursor: 42 })
})
test("splits replay into bounded frames", () => {
expect(PtyProtocol.chunks("")).toEqual([])
expect(PtyProtocol.chunks("abc")).toEqual(["abc"])
const big = "x".repeat(PtyProtocol.REPLAY_CHUNK + 1)
const frames = PtyProtocol.chunks(big)
expect(frames.length).toBe(2)
expect(frames[0].length).toBe(PtyProtocol.REPLAY_CHUNK)
expect(frames.join("")).toBe(big)
})
})

View File

@ -1,110 +0,0 @@
import { describe, expect } from "bun:test"
import { Duration, Effect, Layer, Queue } from "effect"
import { EventV2 } from "@opencode-ai/core/event"
import { Location } from "@opencode-ai/core/location"
import { Pty } from "@opencode-ai/core/pty"
import { AbsolutePath } from "@opencode-ai/core/schema"
import { location } from "../fixture/location"
import { testEffect } from "../lib/effect"
type Socket = Parameters<Pty.Interface["connect"]>[1]
const locationLayer = Layer.succeed(
Location.Service,
Location.Service.of(location({ directory: AbsolutePath.make("/tmp") })),
)
const it = testEffect(Pty.layer.pipe(Layer.provideMerge(EventV2.defaultLayer), Layer.provideMerge(locationLayer)))
const ptyTest = process.platform === "win32" ? it.live.skip : it.live
const createPty = Effect.fn("PtyOutputIsolationTest.createPty")(function* (command: string) {
const pty = yield* Pty.Service
return yield* Effect.acquireRelease(
pty.create({ command, args: [], cwd: "/tmp", env: { TERM: "xterm-256color", OPENCODE_TERMINAL: "1" } }),
(info) => pty.remove(info.id).pipe(Effect.ignore),
)
})
const decodeOutput = (data: string | Uint8Array | ArrayBuffer) =>
typeof data === "string"
? data
: Buffer.from(data instanceof Uint8Array ? data : new Uint8Array(data)).toString("utf8")
const makeSocket = Effect.fn("PtyOutputIsolationTest.makeSocket")(function* (data: unknown) {
const output = yield* Queue.unbounded<string>()
const socket: Socket = {
readyState: 1,
data,
send: (data) => Queue.offerUnsafe(output, decodeOutput(data)),
close: () => {},
}
return { socket, output }
})
const waitForOutput = (output: Queue.Queue<string>, text: string, duration: Duration.Input = "5 seconds") =>
Effect.gen(function* () {
let received = ""
while (!received.includes(text)) received += yield* Queue.take(output)
return received
}).pipe(
Effect.timeoutOrElse({
duration,
orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)),
}),
)
describe("pty output isolation", () => {
ptyTest("does not leak output when websocket objects are reused", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const a = yield* createPty("cat")
const b = yield* createPty("cat")
const shared = yield* makeSocket({ events: { connection: "a" } })
const outB = yield* Queue.unbounded<string>()
yield* pty.connect(a.id, shared.socket)
shared.socket.data = { events: { connection: "b" } }
shared.socket.send = (data) => Queue.offerUnsafe(outB, decodeOutput(data))
yield* pty.connect(b.id, shared.socket)
yield* pty.write(a.id, "AAA\n")
const verify = yield* makeSocket({ events: { connection: "verify-a" } })
yield* pty.connect(a.id, verify.socket)
expect(yield* waitForOutput(verify.output, "AAA")).toContain("AAA")
expect(yield* waitForOutput(outB, "AAA", "100 millis").pipe(Effect.option)).toMatchObject({ _tag: "None" })
}),
)
ptyTest("does not leak output when Bun recycles websocket objects before re-connect", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const info = yield* createPty("cat")
const first = yield* makeSocket({ events: { connection: "a" } })
const recycled = yield* Queue.unbounded<string>()
yield* pty.connect(info.id, first.socket)
first.socket.data = { events: { connection: "b" } }
first.socket.send = (data) => Queue.offerUnsafe(recycled, decodeOutput(data))
yield* pty.write(info.id, "AAA\n")
const verify = yield* makeSocket({ events: { connection: "verify" } })
yield* pty.connect(info.id, verify.socket)
expect(yield* waitForOutput(verify.output, "AAA")).toContain("AAA")
expect(yield* waitForOutput(recycled, "AAA", "100 millis").pipe(Effect.option)).toMatchObject({ _tag: "None" })
}),
)
ptyTest("treats in-place socket data mutation as the same connection", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const info = yield* createPty("cat")
const data = { connId: 1 }
const socket = yield* makeSocket(data)
yield* pty.connect(info.id, socket.socket)
data.connId = 2
yield* pty.write(info.id, "AAA\n")
expect(yield* waitForOutput(socket.output, "AAA")).toContain("AAA")
}),
)
})

View File

@ -1,5 +1,6 @@
import { describe, expect } from "bun:test"
import { Cause, Effect, Exit, Layer, Queue } from "effect"
import { Cause, Deferred, Effect, Exit, Layer, Queue } from "effect"
import { Config } from "@opencode-ai/core/config"
import { EventV2 } from "@opencode-ai/core/event"
import { Location } from "@opencode-ai/core/location"
import { Pty } from "@opencode-ai/core/pty"
@ -14,7 +15,14 @@ const locationLayer = Layer.succeed(
Location.Service,
Location.Service.of(location({ directory: AbsolutePath.make("/tmp") })),
)
const it = testEffect(Pty.layer.pipe(Layer.provideMerge(EventV2.defaultLayer), Layer.provideMerge(locationLayer)))
const configLayer = Layer.mock(Config.Service)({ entries: () => Effect.succeed([]) })
const it = testEffect(
Pty.layer.pipe(
Layer.provide(configLayer),
Layer.provideMerge(EventV2.defaultLayer),
Layer.provideMerge(locationLayer),
),
)
const ptyTest = process.platform === "win32" ? it.live.skip : it.live
const subscribePtyEvents = Effect.fn("PtySessionTest.subscribePtyEvents")(function* () {
@ -56,36 +64,176 @@ const waitForEvents = (events: Queue.Queue<PtyEvent>, id: PtyID, count: number)
}),
)
const attachCollecting = Effect.fn("PtySessionTest.attachCollecting")(function* (id: PtyID, cursor?: number) {
const pty = yield* Pty.Service
const output = yield* Queue.unbounded<string>()
const ended = yield* Deferred.make<{ exitCode?: number }>()
const attachment = yield* pty.attach(id, {
cursor,
onData: (chunk) => Queue.offerUnsafe(output, chunk),
onEnd: (event) => Deferred.doneUnsafe(ended, Effect.succeed(event)),
})
attachment.activate()
return { attachment, output, ended }
})
const waitForOutput = (output: Queue.Queue<string>, text: string) =>
Effect.gen(function* () {
let received = ""
while (!received.includes(text)) received += yield* Queue.take(output)
return received
}).pipe(
Effect.timeoutOrElse({
duration: "5 seconds",
orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)),
}),
)
describe("pty", () => {
it.live("returns typed not found errors for missing sessions", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const id = "pty_missing" as PtyID
let closed = false
const socket = { readyState: 1, send: () => {}, close: () => void (closed = true) }
for (const result of [
yield* pty.get(id).pipe(Effect.asVoid, Effect.exit),
yield* pty.update(id, { title: "missing" }).pipe(Effect.asVoid, Effect.exit),
yield* pty.remove(id).pipe(Effect.exit),
yield* pty.resize(id, 80, 24).pipe(Effect.exit),
yield* pty.write(id, "input").pipe(Effect.exit),
yield* pty.connect(id, socket).pipe(Effect.asVoid, Effect.exit),
yield* pty.attach(id, { onData: () => {}, onEnd: () => {} }).pipe(Effect.asVoid, Effect.exit),
]) {
expect(Exit.isFailure(result)).toBe(true)
if (Exit.isFailure(result))
expect(Cause.squash(result.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id })
}
expect(closed).toBe(true)
}),
)
ptyTest("publishes created, exited, deleted in order for a short-lived process", () =>
ptyTest("retains exited sessions until removed", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const events = yield* subscribePtyEvents()
const info = yield* createPty("/usr/bin/env", ["sh", "-c", "sleep 0.1"])
const info = yield* createPty("/usr/bin/env", ["sh", "-c", "exit 3"])
expect(yield* waitForEvents(events, info.id, 3)).toEqual(["created", "exited", "deleted"])
expect(yield* waitForEvents(events, info.id, 2)).toEqual(["created", "exited"])
const exited = yield* pty.get(info.id)
expect(exited.status).toBe("exited")
expect(exited.exitCode).toBe(3)
yield* pty.remove(info.id)
expect(yield* waitForEvents(events, info.id, 1)).toEqual(["deleted"])
const missing = yield* pty.get(info.id).pipe(Effect.exit)
expect(Exit.isFailure(missing)).toBe(true)
}),
)
ptyTest("replays buffered output and streams live output to attachments", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const info = yield* createPty("cat")
yield* pty.write(info.id, "AAA\n")
const first = yield* attachCollecting(info.id)
expect(yield* waitForOutput(first.output, "AAA")).toContain("AAA")
first.attachment.write("BBB\n")
yield* waitForOutput(first.output, "BBB")
// A later attachment replays everything already buffered.
const replayed = yield* attachCollecting(info.id)
expect(replayed.attachment.replay).toContain("AAA")
expect(replayed.attachment.replay).toContain("BBB")
expect(replayed.attachment.cursor).toBeGreaterThan(0)
// Tail attachments skip the buffer and only see subsequent output.
const tail = yield* attachCollecting(info.id, -1)
expect(tail.attachment.replay).toBe("")
expect(tail.attachment.cursor).toBe(replayed.attachment.cursor)
}),
)
ptyTest("stops delivering output after detach", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const info = yield* createPty("cat")
const attached = yield* attachCollecting(info.id, -1)
attached.attachment.detach()
yield* pty.write(info.id, "AAA\n")
const verify = yield* attachCollecting(info.id)
yield* waitForOutput(verify.output, "AAA")
const leaked = yield* Queue.poll(attached.output)
expect(leaked._tag).toBe("None")
}),
)
ptyTest("isolates output between sessions", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const a = yield* createPty("cat")
const b = yield* createPty("cat")
const attachedA = yield* attachCollecting(a.id)
const attachedB = yield* attachCollecting(b.id)
yield* pty.write(a.id, "AAA\n")
yield* waitForOutput(attachedA.output, "AAA")
const leaked = yield* Queue.poll(attachedB.output)
expect(leaked._tag).toBe("None")
}),
)
ptyTest("notifies attachments with the exit code and rejects attach after exit", () =>
Effect.gen(function* () {
const pty = yield* Pty.Service
const events = yield* subscribePtyEvents()
const info = yield* createPty("cat")
const attached = yield* attachCollecting(info.id)
yield* pty.write(info.id, "\u0004")
expect(yield* Deferred.await(attached.ended).pipe(Effect.timeout("5 seconds"))).toEqual({ exitCode: 0 })
yield* waitForEvents(events, info.id, 2)
const result = yield* pty.attach(info.id, { onData: () => {}, onEnd: () => {} }).pipe(Effect.exit)
expect(Exit.isFailure(result)).toBe(true)
if (Exit.isFailure(result))
expect(Cause.squash(result.cause)).toMatchObject({ _tag: "Pty.ExitedError", ptyID: info.id })
}),
)
})
const configuredShell = process.platform === "win32" ? undefined : Bun.which("bash")
const configuredIt = testEffect(
Pty.layer.pipe(
Layer.provide(
Layer.mock(Config.Service)({
entries: () =>
Effect.succeed(
configuredShell
? [new Config.Document({ type: "document", info: new Config.Info({ shell: configuredShell }) })]
: [],
),
}),
),
Layer.provideMerge(EventV2.defaultLayer),
Layer.provideMerge(locationLayer),
),
)
const configuredTest = process.platform === "win32" ? configuredIt.live.skip : configuredIt.live
describe("pty create defaults", () => {
configuredTest("defaults command, login args, and cwd from config and location", () =>
Effect.gen(function* () {
if (!configuredShell) return
const pty = yield* Pty.Service
const info = yield* Effect.acquireRelease(pty.create({ title: "configured" }), (created) =>
pty.remove(created.id).pipe(Effect.ignore),
)
expect(info.command).toBe(configuredShell)
expect(info.args).toEqual(["-l"])
expect(info.cwd).toBe("/tmp")
expect(info.title).toBe("configured")
}),
)
})

View File

@ -1,7 +1,7 @@
import { describe, expect, test } from "bun:test"
import path from "path"
import { Shell } from "../../src/shell/shell"
import { Filesystem } from "@/util/filesystem"
import { Shell } from "@opencode-ai/core/shell"
import { FSUtil } from "@opencode-ai/core/fs-util"
import { which } from "@opencode-ai/core/util/which"
const withShell = async (shell: string | undefined, fn: () => void | Promise<void>) => {
@ -54,6 +54,15 @@ describe("shell", () => {
expect(Shell.name(Shell.acceptable("nu"))).not.toBe("nu")
})
test("builds command args per shell family", () => {
expect(Shell.args("/bin/sh", "echo hi", "/tmp")).toEqual(["-c", "echo hi"])
expect(Shell.args("/usr/bin/fish", "echo hi", "/tmp")).toEqual(["-c", "echo hi"])
const zsh = Shell.args("/bin/zsh", "echo hi", "/tmp")
expect(zsh[0]).toBe("-l")
expect(zsh[1]).toBe("-c")
expect(zsh.at(-1)).toBe("/tmp")
})
if (process.platform === "win32") {
test("rejects blacklisted shells case-insensitively", async () => {
await withShell("NU.EXE", async () => {
@ -64,7 +73,7 @@ describe("shell", () => {
test("normalizes Git Bash shell paths from env", async () => {
const shell = "/cygdrive/c/Program Files/Git/bin/bash.exe"
await withShell(shell, async () => {
expect(Shell.preferred()).toBe(Filesystem.windowsPath(shell))
expect(Shell.preferred()).toBe(FSUtil.windowsPath(shell))
})
})

View File

@ -1,30 +0,0 @@
export * as PtyPreparation from "./pty-preparation"
import { Config } from "@/config/config"
import * as InstanceState from "@/effect/instance-state"
import { Plugin } from "@/plugin"
import { Shell } from "@/shell/shell"
import { Pty } from "@opencode-ai/core/pty"
import { Effect } from "effect"
export const prepareCreate = Effect.fn("PtyPreparation.prepareCreate")(function* (input: Pty.CreateInput) {
const config = yield* Config.Service
const plugin = yield* Plugin.Service
const command = input.command || Shell.preferred((yield* config.get()).shell)
const args = Shell.login(command) ? [...(input.args ?? []), "-l"] : [...(input.args ?? [])]
const cwd = input.cwd || (yield* InstanceState.context).directory
const shell = yield* plugin.trigger("shell.env", { cwd }, { env: {} })
const env = {
...process.env,
...input.env,
...shell.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
return { command, args, cwd, title: input.title, env }
})

View File

@ -1,23 +1,22 @@
import * as InstanceState from "@/effect/instance-state"
import { registerDisposer } from "@/effect/instance-registry"
import { InstanceRef, WorkspaceRef } from "@/effect/instance-ref"
import { PtyPreparation } from "@/pty-preparation"
import { Plugin } from "@/plugin"
import { Pty } from "@opencode-ai/core/pty"
import { handlePtyInput } from "@opencode-ai/core/pty/input"
import { PtyProtocol } from "@opencode-ai/core/pty/protocol"
import { PtyID } from "@opencode-ai/core/pty/schema"
import { PtyTicket } from "@opencode-ai/core/pty/ticket"
import { LocationServiceMap } from "@opencode-ai/core/location-layer"
import { Location } from "@opencode-ai/core/location"
import { AbsolutePath } from "@opencode-ai/core/schema"
import { Shell } from "@/shell/shell"
import { EffectBridge } from "@/effect/bridge"
import { CorsConfig, isAllowedRequestOrigin, type CorsOptions } from "@/server/cors"
import { Shell } from "@opencode-ai/core/shell"
import { CorsConfig, isAllowedRequestOrigin, type CorsOptions } from "@opencode-ai/server/cors"
import {
PTY_CONNECT_TICKET_QUERY,
PTY_CONNECT_TOKEN_HEADER,
PTY_CONNECT_TOKEN_HEADER_VALUE,
} from "@/server/shared/pty-ticket"
import { Effect, Layer, Option, Schema } from "effect"
import { Effect, Layer, Option, Queue, Schema } from "effect"
import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import { HttpApiBuilder } from "effect/unstable/httpapi"
import * as Socket from "effect/unstable/socket/Socket"
@ -36,10 +35,14 @@ const ticketScope = Effect.gen(function* () {
return { directory: instance?.directory, workspaceID }
})
// Legacy surface compatibility: before exited-session retention, sessions vanished the moment
// their process exited. These routes preserve that observable behavior — exited sessions are
// invisible here — while the canonical /api/pty surface exposes them until removal.
export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handlers) =>
Effect.gen(function* () {
const tickets = yield* PtyTicket.Service
const cors = yield* CorsConfig
const plugin = yield* Plugin.Service
const locations = yield* LocationServiceMap
const unregister = registerDisposer((directory) =>
Effect.runPromise(locations.invalidate(Location.Ref.make({ directory: AbsolutePath.make(directory) }))),
@ -59,33 +62,42 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
})
const list = Effect.fn("PtyHttpApi.list")(function* () {
return yield* pty(Pty.Service.use((service) => service.list()))
const sessions = yield* pty(Pty.Service.use((service) => service.list()))
return sessions.filter((info) => info.status === "running")
})
const create = Effect.fn("PtyHttpApi.create")(function* (ctx: { payload: typeof Pty.CreateInput.Type }) {
const cwd = ctx.payload.cwd || (yield* InstanceState.context).directory
const shell = yield* plugin.trigger("shell.env", { cwd }, { env: {} as Record<string, string> })
return yield* pty(
Pty.Service.use((service) =>
Effect.flatMap(
PtyPreparation.prepareCreate({
...ctx.payload,
args: ctx.payload.args ? [...ctx.payload.args] : undefined,
env: ctx.payload.env ? { ...ctx.payload.env } : undefined,
}),
service.create,
),
service.create({
...ctx.payload,
args: ctx.payload.args ? [...ctx.payload.args] : undefined,
cwd,
env: { ...ctx.payload.env, ...shell.env },
}),
),
)
})
const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) {
return yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe(
Effect.catchTag("Pty.NotFoundError", (error) =>
Effect.fail(
Effect.catchTag(
"Pty.NotFoundError",
(error) =>
new ApiError.PtyNotFoundError({
ptyID: error.ptyID,
message: `PTY session not found: ${error.ptyID}`,
}),
),
),
Effect.flatMap((info) =>
info.status === "running"
? Effect.succeed(info)
: new ApiError.PtyNotFoundError({
ptyID: ctx.params.ptyID,
message: `PTY session not found: ${ctx.params.ptyID}`,
}),
),
)
})
@ -94,6 +106,7 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
params: { ptyID: PtyID }
payload: typeof Pty.UpdateInput.Type
}) {
yield* get(ctx)
return yield* pty(
Pty.Service.use((service) =>
service.update(ctx.params.ptyID, {
@ -102,26 +115,27 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
}),
),
).pipe(
Effect.catchTag("Pty.NotFoundError", (error) =>
Effect.fail(
Effect.catchTag(
"Pty.NotFoundError",
(error) =>
new ApiError.PtyNotFoundError({
ptyID: error.ptyID,
message: `PTY session not found: ${error.ptyID}`,
}),
),
),
)
})
const remove = Effect.fn("PtyHttpApi.remove")(function* (ctx: { params: { ptyID: PtyID } }) {
yield* get(ctx)
yield* pty(Pty.Service.use((service) => service.remove(ctx.params.ptyID))).pipe(
Effect.catchTag("Pty.NotFoundError", (error) =>
Effect.fail(
Effect.catchTag(
"Pty.NotFoundError",
(error) =>
new ApiError.PtyNotFoundError({
ptyID: error.ptyID,
message: `PTY session not found: ${error.ptyID}`,
}),
),
),
)
return true
@ -131,16 +145,7 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
const request = yield* HttpServerRequest.HttpServerRequest
if (request.headers[PTY_CONNECT_TOKEN_HEADER] !== PTY_CONNECT_TOKEN_HEADER_VALUE || !validOrigin(request, cors))
return yield* new ApiError.PtyForbiddenError({ message: "Invalid PTY connect token request" })
yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe(
Effect.catchTag("Pty.NotFoundError", (error) =>
Effect.fail(
new ApiError.PtyNotFoundError({
ptyID: error.ptyID,
message: `PTY session not found: ${error.ptyID}`,
}),
),
),
)
yield* get(ctx)
return yield* tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* ticketScope) })
})
@ -180,7 +185,7 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne
request: HttpServerRequest.HttpServerRequest
}) {
const exists = yield* pty(Pty.Service.use((service) => service.get(ctx.params.ptyID))).pipe(
Effect.as(true),
Effect.map((info) => info.status === "running"),
Effect.catchTag("Pty.NotFoundError", () => Effect.succeed(false)),
)
if (!exists) return HttpServerResponse.empty({ status: 404 })
@ -214,48 +219,53 @@ export const ptyConnectHandlers = HttpApiBuilder.group(PtyConnectApi, "pty-conne
yield* closeAccepted(WebSocketTracker.SERVER_CLOSING_EVENT())
return HttpServerResponse.empty()
}
const bridge = yield* EffectBridge.make()
const writeScoped = (effect: Effect.Effect<void, unknown>) => {
bridge.fork(effect.pipe(Effect.catch(() => Effect.void)))
}
let closed = false
const adapter = {
get readyState() {
return closed ? 3 : 1
},
send: (data: string | Uint8Array | ArrayBuffer) => {
if (closed) return
writeScoped(write(data instanceof ArrayBuffer ? new Uint8Array(data) : data))
},
close: (code?: number, reason?: string) => {
if (closed) return
closed = true
writeScoped(write(new Socket.CloseEvent(code, reason)))
},
}
const handler = yield* pty(
Pty.Service.use((service) => service.connect(ctx.params.ptyID, adapter, cursor)),
).pipe(
Effect.catchTag("Pty.NotFoundError", () =>
closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)),
),
)
if (!handler) return HttpServerResponse.empty()
// The handshake runs inside `socket.runRaw`, after the input callback is
// registered, so the client cannot send frames before PTY input is wired.
yield* socket
.runRaw((message) => handlePtyInput(handler, message))
.pipe(
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.ensuring(
Effect.sync(() => {
closed = true
handler.onClose()
}),
),
Effect.orDie,
)
// Outbound frames flow through one queue drained by a single writer so replay, live
// output, and the close frame keep their order.
const outbox = yield* Queue.unbounded<string | Uint8Array | Socket.CloseEvent>()
const attachment = yield* pty(
Pty.Service.use((service) =>
service.attach(ctx.params.ptyID, {
cursor,
onData: (chunk) => Queue.offerUnsafe(outbox, chunk),
onEnd: () => Queue.offerUnsafe(outbox, new Socket.CloseEvent(1000)),
}),
),
).pipe(
Effect.catchTags({
"Pty.NotFoundError": () =>
closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)),
"Pty.ExitedError": () =>
closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)),
}),
)
if (!attachment) return HttpServerResponse.empty()
for (const chunk of PtyProtocol.chunks(attachment.replay)) Queue.offerUnsafe(outbox, chunk)
Queue.offerUnsafe(outbox, PtyProtocol.metaFrame(attachment.cursor))
attachment.activate()
const drain = Effect.gen(function* () {
while (true) {
const item = yield* Queue.take(outbox)
yield* write(item)
if (item instanceof Socket.CloseEvent) return
}
})
// The reader runs concurrently with the writer; whichever finishes first ends the
// connection and the attachment is always released.
yield* Effect.race(
drain,
socket.runRaw((message) => {
const decoded = PtyProtocol.decodeInput(message)
if (decoded !== undefined) attachment.write(decoded)
}),
).pipe(
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.ensuring(Effect.sync(() => attachment.detach())),
Effect.orDie,
)
return HttpServerResponse.empty()
}),
)

View File

@ -61,7 +61,7 @@ import { PtyTicket } from "@opencode-ai/core/pty/ticket"
import { Ripgrep } from "@opencode-ai/core/ripgrep"
import { SessionProjector } from "@opencode-ai/core/session/projector"
import { lazy } from "@/util/lazy"
import { CorsConfig, isAllowedCorsOrigin, type CorsOptions } from "@/server/cors"
import { CorsConfig, isAllowedCorsOrigin, type CorsOptions } from "@opencode-ai/server/cors"
import { serveUIEffect } from "@/server/shared/ui"
import { ServerAuth } from "@/server/auth"
import { InstanceHttpApi, RootHttpApi } from "./api"

View File

@ -10,7 +10,7 @@ import { HttpApiApp } from "./routes/instance/httpapi/server"
import { disposeMiddleware } from "./routes/instance/httpapi/lifecycle"
import { WebSocketTracker } from "./routes/instance/httpapi/websocket-tracker"
import { PublicApi } from "./routes/instance/httpapi/public"
import type { CorsOptions } from "./cors"
import type { CorsOptions } from "@opencode-ai/server/cors"
import { lazy } from "@/util/lazy"
// @ts-ignore This global is needed to prevent ai-sdk from logging warnings to stdout https://github.com/vercel/ai/blob/2dc67e0ef538307f21368db32d5a12345d98831b/packages/ai/src/logger/log-warnings.ts#L85

View File

@ -35,7 +35,7 @@ import { Tool } from "@/tool/tool"
import { Permission } from "@/permission"
import { SessionStatus } from "./status"
import { LLM } from "./llm"
import { Shell } from "@/shell/shell"
import { Shell } from "@opencode-ai/core/shell"
import { ShellID } from "@/tool/shell/id"
import { FSUtil } from "@opencode-ai/core/fs-util"
import { Truncate } from "@/tool/truncate"

View File

@ -12,7 +12,7 @@ import { FSUtil } from "@opencode-ai/core/fs-util"
import { fileURLToPath } from "url"
import { Config } from "@/config/config"
import { RuntimeFlags } from "@/effect/runtime-flags"
import { Shell } from "@/shell/shell"
import { Shell } from "@opencode-ai/core/shell"
import { ShellID } from "./shell/id"
import * as Truncate from "./truncate"

View File

@ -1,102 +0,0 @@
import { describe, expect } from "bun:test"
import { Effect, Layer } from "effect"
import { Config } from "../../src/config/config"
import { Plugin } from "../../src/plugin"
import { PtyPreparation } from "../../src/pty-preparation"
import { Pty } from "@opencode-ai/core/pty"
import { Shell } from "../../src/shell/shell"
import { testEffect } from "../lib/effect"
Shell.preferred.reset()
const it = testEffect(Layer.mergeAll(Config.defaultLayer, Plugin.defaultLayer))
const preparationIt = testEffect(
Layer.mergeAll(
Layer.mock(Config.Service)({ get: () => Effect.succeed({}) }),
Layer.mock(Plugin.Service)({
trigger: <Name extends string, Input, Output>(_name: Name, _input: Input, output: Output) =>
Effect.sync(() => {
const result = output as { env: Record<string, string> }
result.env.INPUT = "plugin"
result.env.FROM_PLUGIN = "plugin"
result.env.TERM = "plugin"
return output
}),
list: () => Effect.succeed([]),
init: () => Effect.void,
}),
),
)
const preparePty = (input: Pty.CreateInput) => PtyPreparation.prepareCreate(input)
describe("pty shell args", () => {
if (process.platform !== "win32") return
const ps = Bun.which("pwsh") || Bun.which("powershell")
if (ps) {
it.instance(
"does not add login args to pwsh",
() =>
Effect.gen(function* () {
const info = yield* preparePty({ command: ps, title: "pwsh" })
expect(info.args).toEqual([])
}),
{ timeout: 30000 },
)
}
const bash = (() => {
const shell = Shell.preferred()
if (Shell.name(shell) === "bash") return shell
return Shell.gitbash()
})()
if (bash) {
it.instance(
"adds login args to bash",
() =>
Effect.gen(function* () {
const info = yield* preparePty({ command: bash, title: "bash" })
expect(info.args).toEqual(["-l"])
}),
{ timeout: 30000 },
)
}
})
describe("pty configured shell", () => {
const configured = process.platform === "win32" ? Bun.which("pwsh") || Bun.which("powershell") : Bun.which("bash")
it.instance(
"uses configured shell for default PTY command",
() =>
Effect.gen(function* () {
if (!configured) return
const info = yield* preparePty({ title: "configured" })
if (process.platform === "win32") {
expect(info.command.toLowerCase()).toBe(configured.toLowerCase())
} else {
expect(info.command).toBe(configured)
}
expect(info.args).toEqual(process.platform === "win32" ? [] : ["-l"])
}),
configured ? { config: { shell: Shell.name(configured) } } : undefined,
{ timeout: 30000 },
)
})
describe("pty environment preparation", () => {
preparationIt.instance("merges plugin environment before forced PTY values", () =>
Effect.gen(function* () {
const input = { command: "/bin/sh", args: [] as string[], env: { INPUT: "caller" } }
const prepared = yield* preparePty(input)
expect(input.args).toEqual([])
expect(prepared.env.INPUT).toBe("plugin")
expect(prepared.env.FROM_PLUGIN).toBe("plugin")
expect(prepared.env.TERM).toBe("xterm-256color")
expect(prepared.env.OPENCODE_TERMINAL).toBe("1")
}),
)
})

View File

@ -757,6 +757,44 @@ const scenarios: Scenario[] = [
.seeded((ctx) => ctx.file("hello.txt", "hello\n"))
.at((ctx) => ({ path: "/api/fs/find?query=hello&type=file", headers: ctx.headers() }))
.json(200, locationData(array)),
http.protected.get("/api/pty", "v2.pty.list").json(200, locationData(array)),
http.protected
.post("/api/pty", "v2.pty.create")
.mutating()
.at((ctx) => ({ path: "/api/pty", headers: ctx.headers(), body: controlledPtyInput("HTTP API V2 PTY") }))
.json(200, locationData(object)),
http.protected
.get("/api/pty/{ptyID}", "v2.pty.get")
.at((ctx) => ({ path: route("/api/pty/{ptyID}", { ptyID: "pty_httpapi_missing" }), headers: ctx.headers() }))
.json(404, object, "status"),
http.protected
.put("/api/pty/{ptyID}", "v2.pty.update")
.mutating()
.at((ctx) => ({
path: route("/api/pty/{ptyID}", { ptyID: "pty_httpapi_missing" }),
headers: ctx.headers(),
body: { title: "missing" },
}))
.json(404, object, "status"),
http.protected
.delete("/api/pty/{ptyID}", "v2.pty.remove")
.mutating()
.at((ctx) => ({ path: route("/api/pty/{ptyID}", { ptyID: "pty_httpapi_missing" }), headers: ctx.headers() }))
.json(404, object, "status"),
http.protected
.post("/api/pty/{ptyID}/connect-token", "v2.pty.connectToken")
.at((ctx) => ({
path: route("/api/pty/{ptyID}/connect-token", { ptyID: "pty_httpapi_missing" }),
headers: { ...ctx.headers(), "x-opencode-ticket": "1" },
}))
.json(404, object, "status"),
http.protected
.get("/api/pty/{ptyID}/connect", "v2.pty.connect")
.at((ctx) => ({
path: route("/api/pty/{ptyID}/connect", { ptyID: "pty_httpapi_missing" }),
headers: ctx.headers(),
}))
.status(404, undefined, "none"),
http.protected.get("/api/reference", "v2.reference.list").json(200, object),
http.protected
.get("/api/provider/{providerID}", "v2.provider.get")

View File

@ -136,6 +136,33 @@ describe("pty HttpApi bridge", () => {
})
})
testPty("hides exited sessions on the legacy surface", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const headers = { "x-opencode-directory": tmp.path }
const created = await app().request(PtyPaths.create, {
method: "POST",
headers: { ...headers, "content-type": "application/json" },
body: JSON.stringify({ command: "/usr/bin/env", args: ["sh", "-c", "exit 0"] }),
})
expect(created.status).toBe(200)
const info = await created.json()
// Exited sessions are retained by core for the canonical surface, but the legacy
// routes preserve pre-retention behavior: exited sessions are invisible here.
const deadline = Date.now() + 5_000
while (Date.now() < deadline) {
const found = await app().request(PtyPaths.get.replace(":ptyID", info.id), { headers })
if (found.status === 404) break
await new Promise((resolve) => setTimeout(resolve, 50))
}
const found = await app().request(PtyPaths.get.replace(":ptyID", info.id), { headers })
expect(found.status).toBe(404)
const list = await app().request(PtyPaths.list, { headers })
expect(list.status).toBe(200)
expect(await list.json()).toEqual([])
})
testPty("disposes PTY sessions with their legacy instance", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const headers = { "x-opencode-directory": tmp.path }

View File

@ -0,0 +1,171 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Context, Config as EffectConfig, Effect, Layer, Queue, Schema } from "effect"
import { NodeHttpServer, NodeServices } from "@effect/platform-node"
import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http"
import * as Socket from "effect/unstable/socket/Socket"
import { Location } from "@opencode-ai/core/location"
import { Pty } from "@opencode-ai/core/pty"
import { PtyTicket } from "@opencode-ai/core/pty/ticket"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, tmpdir, tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
const context = Context.empty() as Context.Context<unknown>
const testPty = process.platform === "win32" ? test.skip : test
function request(route: string, directory: string, init: RequestInit = {}) {
const headers = new Headers(init.headers)
headers.set("x-opencode-directory", directory)
return HttpApiApp.webHandler().handler(
new Request(`http://localhost${route}`, {
...init,
headers,
}),
context,
)
}
const testStateLayer = Layer.effectDiscard(
Effect.gen(function* () {
yield* Effect.promise(() => resetDatabase())
yield* Effect.addFinalizer(() => Effect.promise(() => resetDatabase()))
}),
)
const servedRoutes: Layer.Layer<never, EffectConfig.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
HttpApiApp.routes,
{ disableListenLog: true, disableLogger: true },
)
const effectIt = testEffect(
Layer.mergeAll(
testStateLayer,
Socket.layerWebSocketConstructorGlobal,
servedRoutes.pipe(
Layer.provide(Socket.layerWebSocketConstructorGlobal),
Layer.provideMerge(NodeHttpServer.layerTest),
Layer.provideMerge(NodeServices.layer),
),
),
)
const directoryHeader = (dir: string) => HttpClientRequest.setHeader("x-opencode-directory", dir)
const serverUrl = () => HttpServer.HttpServer.use((server) => Effect.succeed(HttpServer.formatAddress(server.address)))
afterEach(async () => {
await disposeAllInstances()
await resetDatabase()
})
describe("v2 pty HttpApi", () => {
testPty("serves location-wrapped PTY routes and retains exited sessions", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const empty = await request("/api/pty", tmp.path)
expect(empty.status).toBe(200)
expect(Schema.decodeUnknownSync(Location.response(Schema.Array(Pty.Info)))(await empty.json()).data).toEqual([])
const created = await request("/api/pty", tmp.path, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ command: "/usr/bin/env", args: ["sh", "-c", "exit 4"], title: "v2" }),
})
expect(created.status).toBe(200)
const body = Schema.decodeUnknownSync(Location.response(Pty.Info))(await created.json())
expect(String(body.location.directory)).toBe(tmp.path)
expect(body.data.title).toBe("v2")
// The canonical surface keeps exited sessions observable with their exit code.
const deadline = Date.now() + 5_000
let info: { status: string; exitCode?: number } | undefined
while (Date.now() < deadline) {
const found = await request(`/api/pty/${body.data.id}`, tmp.path)
expect(found.status).toBe(200)
info = Schema.decodeUnknownSync(Location.response(Pty.Info))(await found.json()).data
if (info.status === "exited") break
await new Promise((resolve) => setTimeout(resolve, 50))
}
expect(info).toMatchObject({ status: "exited", exitCode: 4 })
const removed = await request(`/api/pty/${body.data.id}`, tmp.path, { method: "DELETE" })
expect(removed.status).toBe(204)
const missing = await request(`/api/pty/${body.data.id}`, tmp.path)
expect(missing.status).toBe(404)
expect(await missing.json()).toMatchObject({ _tag: "PtyNotFoundError", ptyID: body.data.id })
})
testPty("rejects connect tokens without the CSRF header and connects with a valid ticket", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const created = await request("/api/pty", tmp.path, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ command: "/usr/bin/env", args: ["sh", "-c", "sleep 5"] }),
})
expect(created.status).toBe(200)
const info = Schema.decodeUnknownSync(Location.response(Pty.Info))(await created.json()).data
try {
const forbidden = await request(`/api/pty/${info.id}/connect-token`, tmp.path, { method: "POST" })
expect(forbidden.status).toBe(403)
expect(await forbidden.json()).toMatchObject({ _tag: "ForbiddenError" })
const token = await request(`/api/pty/${info.id}/connect-token`, tmp.path, {
method: "POST",
headers: { "x-opencode-ticket": "1" },
})
expect(token.status).toBe(200)
const ticket = Schema.decodeUnknownSync(Location.response(PtyTicket.ConnectToken))(await token.json()).data.ticket
expect(ticket).toBeTruthy()
const invalid = await request(`/api/pty/${info.id}/connect?ticket=not-a-ticket`, tmp.path)
expect(invalid.status).toBe(403)
} finally {
await request(`/api/pty/${info.id}`, tmp.path, { method: "DELETE" })
}
})
;(process.platform === "win32" ? effectIt.live.skip : effectIt.live)(
"serves PTY websocket output and input through the canonical route",
() =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true, config: { formatter: false, lsp: false } })
const created = yield* HttpClientRequest.post("/api/pty").pipe(
directoryHeader(dir),
HttpClientRequest.bodyJson({ command: "/bin/cat", title: "v2-websocket" }),
Effect.flatMap(HttpClient.execute),
)
expect(created.status).toBe(200)
const body = yield* Schema.decodeUnknownEffect(Location.response(Pty.Info))(yield* created.json)
const info = body.data
const socket = yield* Socket.makeWebSocket(
`${(yield* serverUrl()).replace(/^http/, "ws")}/api/pty/${info.id}/connect?cursor=-1&location[directory]=${encodeURIComponent(dir)}`,
{ closeCodeIsError: () => false },
)
const messages = yield* Queue.unbounded<string>()
yield* socket
.runRaw((message) =>
Queue.offer(messages, typeof message === "string" ? message : new TextDecoder().decode(message)),
)
.pipe(Effect.catch(() => Effect.void))
.pipe(Effect.forkScoped)
const write = yield* socket.writer
const takeUntil = (expected: string, seen = ""): Effect.Effect<string, unknown> =>
Effect.gen(function* () {
const next = seen + (yield* Queue.take(messages).pipe(Effect.timeout("5 seconds")))
if (next.includes(expected)) return next
return yield* takeUntil(expected, next)
})
yield* write("ping-v2\n")
expect(yield* takeUntil("ping-v2")).toContain("ping-v2")
yield* write(new Socket.CloseEvent(1000, "done")).pipe(Effect.catch(() => Effect.void))
const removed = yield* HttpClientRequest.delete(`/api/pty/${info.id}`).pipe(directoryHeader(dir), HttpClient.execute)
expect(removed.status).toBe(204)
}),
)
})

View File

@ -43,7 +43,7 @@ import { SessionV2 } from "@opencode-ai/core/session"
import { SessionExecution } from "@opencode-ai/core/session/execution"
import { Skill } from "../../src/skill"
import { SystemPrompt } from "../../src/session/system"
import { Shell } from "../../src/shell/shell"
import { Shell } from "@opencode-ai/core/shell"
import { Snapshot } from "../../src/snapshot"
import { ToolRegistry } from "@/tool/registry"
import { Truncate } from "@/tool/truncate"

View File

@ -5,7 +5,7 @@ import type * as Scope from "effect/Scope"
import os from "os"
import path from "path"
import { Config } from "@/config/config"
import { Shell } from "../../src/shell/shell"
import { Shell } from "@opencode-ai/core/shell"
import { ShellTool } from "../../src/tool/shell"
import { Filesystem } from "@/util/filesystem"
import { provideInstance, testInstanceStoreLayer, tmpdirScoped } from "../fixture/fixture"

View File

@ -11,6 +11,7 @@ import { SkillGroup } from "./groups/skill"
import { EventGroup } from "./groups/event"
import { AgentGroup } from "./groups/agent"
import { HealthGroup } from "./groups/health"
import { PtyGroup } from "./groups/pty"
import { QuestionGroup } from "./groups/question"
import { ReferenceGroup } from "./groups/reference"
import { Authorization } from "./middleware/authorization"
@ -34,6 +35,7 @@ export const Api = HttpApi.make("server")
.add(CommandGroup)
.add(SkillGroup)
.add(EventGroup)
.add(PtyGroup)
.add(QuestionGroup)
.add(ReferenceGroup)
.add(ProjectCopyGroup)

View File

@ -84,3 +84,18 @@ export class QuestionNotFoundError extends Schema.TaggedErrorClass<QuestionNotFo
},
{ httpApiStatus: 404 },
) {}
export class ForbiddenError extends Schema.TaggedErrorClass<ForbiddenError>()(
"ForbiddenError",
{ message: Schema.String },
{ httpApiStatus: 403 },
) {}
export class PtyNotFoundError extends Schema.TaggedErrorClass<PtyNotFoundError>()(
"PtyNotFoundError",
{
ptyID: Schema.String,
message: Schema.String,
},
{ httpApiStatus: 404 },
) {}

View File

@ -0,0 +1,144 @@
import { Pty } from "@opencode-ai/core/pty"
import { PtyID } from "@opencode-ai/core/pty/schema"
import { PtyTicket } from "@opencode-ai/core/pty/ticket"
import { Location } from "@opencode-ai/core/location"
import { Schema } from "effect"
import { HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi"
import { ForbiddenError, PtyNotFoundError } from "../errors"
import { LocationQuery, locationQueryOpenApi, LocationMiddleware } from "./location"
export const PTY_CONNECT_TICKET_QUERY = "ticket"
export const PTY_CONNECT_TOKEN_HEADER = "x-opencode-ticket"
export const PTY_CONNECT_TOKEN_HEADER_VALUE = "1"
const PTY_CONNECT_PATH = /^\/api\/pty\/[^/]+\/connect$/
// Authorization middleware skips credential checks when this matches; the PTY connect handler
// is then responsible for consuming and validating the ticket.
export function hasPtyConnectTicketURL(url: URL) {
return PTY_CONNECT_PATH.test(url.pathname) && !!url.searchParams.get(PTY_CONNECT_TICKET_QUERY)
}
export const PtyGroup = HttpApiGroup.make("server.pty")
.add(
HttpApiEndpoint.get("pty.list", "/api/pty", {
query: LocationQuery,
success: Location.response(Schema.Array(Pty.Info)),
})
.annotateMerge(locationQueryOpenApi)
.annotateMerge(
OpenApi.annotations({
identifier: "v2.pty.list",
summary: "List PTY sessions",
description: "List PTY sessions for a location, including exited sessions retained until removal.",
}),
),
)
.add(
HttpApiEndpoint.post("pty.create", "/api/pty", {
query: LocationQuery,
payload: Pty.CreateInput,
success: Location.response(Pty.Info),
})
.annotateMerge(locationQueryOpenApi)
.annotateMerge(
OpenApi.annotations({
identifier: "v2.pty.create",
summary: "Create PTY session",
description: "Create a pseudo-terminal session for a location.",
}),
),
)
.add(
HttpApiEndpoint.get("pty.get", "/api/pty/:ptyID", {
params: { ptyID: PtyID },
query: LocationQuery,
success: Location.response(Pty.Info),
error: PtyNotFoundError,
})
.annotateMerge(locationQueryOpenApi)
.annotateMerge(
OpenApi.annotations({
identifier: "v2.pty.get",
summary: "Get PTY session",
description: "Get one PTY session, including its exit code once exited.",
}),
),
)
.add(
HttpApiEndpoint.put("pty.update", "/api/pty/:ptyID", {
params: { ptyID: PtyID },
query: LocationQuery,
payload: Pty.UpdateInput,
success: Location.response(Pty.Info),
error: PtyNotFoundError,
})
.annotateMerge(locationQueryOpenApi)
.annotateMerge(
OpenApi.annotations({
identifier: "v2.pty.update",
summary: "Update PTY session",
description: "Update the title or viewport size of one PTY session.",
}),
),
)
.add(
HttpApiEndpoint.delete("pty.remove", "/api/pty/:ptyID", {
params: { ptyID: PtyID },
query: LocationQuery,
success: HttpApiSchema.NoContent,
error: PtyNotFoundError,
})
.annotateMerge(locationQueryOpenApi)
.annotateMerge(
OpenApi.annotations({
identifier: "v2.pty.remove",
summary: "Remove PTY session",
description: "Terminate and remove one PTY session.",
}),
),
)
.add(
HttpApiEndpoint.post("pty.connectToken", "/api/pty/:ptyID/connect-token", {
params: { ptyID: PtyID },
query: LocationQuery,
success: Location.response(PtyTicket.ConnectToken),
error: [ForbiddenError, PtyNotFoundError],
})
.annotateMerge(locationQueryOpenApi)
.annotateMerge(
OpenApi.annotations({
identifier: "v2.pty.connectToken",
summary: "Create PTY WebSocket token",
description: "Create a short-lived single-use ticket for opening a PTY WebSocket connection.",
}),
),
)
.add(
// Query fields are decoded in the raw handler after the existence check so a missing
// session responds with an empty 404 before any upgrade work.
HttpApiEndpoint.get("pty.connect", "/api/pty/:ptyID/connect", {
params: { ptyID: PtyID },
success: Schema.Boolean,
error: [ForbiddenError, PtyNotFoundError],
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.pty.connect",
summary: "Connect to PTY session",
description: "Establish a WebSocket connection streaming PTY output and accepting terminal input.",
transform: (operation) => ({
...operation,
parameters: [
...(operation.parameters ?? []),
...["location[directory]", "location[workspace]", "cursor", PTY_CONNECT_TICKET_QUERY].map((name) => ({
in: "query",
name,
schema: { type: "string" },
})),
],
}),
}),
),
)
.annotateMerge(OpenApi.annotations({ title: "pty", description: "Experimental location-scoped PTY routes." }))
.middleware(LocationMiddleware)

View File

@ -1,6 +1,7 @@
import { SessionV2 } from "@opencode-ai/core/session"
import { LocationServiceMap } from "@opencode-ai/core/location-layer"
import { PermissionSaved } from "@opencode-ai/core/permission/saved"
import { PtyTicket } from "@opencode-ai/core/pty/ticket"
import { Layer } from "effect"
import { layer as locationLayer } from "./groups/location"
import { sessionLocationLayer } from "./middleware/session-location"
@ -15,6 +16,7 @@ import { SkillHandler } from "./handlers/skill"
import { EventHandler } from "./handlers/event"
import { AgentHandler } from "./handlers/agent"
import { HealthHandler } from "./handlers/health"
import { PtyHandler } from "./handlers/pty"
import { QuestionHandler } from "./handlers/question"
import { ReferenceHandler } from "./handlers/reference"
import * as SessionExecutionLocal from "@opencode-ai/core/session/execution/local"
@ -39,6 +41,7 @@ export const handlers = Layer.mergeAll(
CommandHandler,
SkillHandler,
EventHandler,
PtyHandler,
QuestionHandler,
ReferenceHandler,
ProjectCopyHandler,
@ -48,6 +51,7 @@ export const handlers = Layer.mergeAll(
Layer.provide(SessionV2.defaultLayer),
Layer.provide(SessionExecutionLocal.defaultLayer),
Layer.provide(PermissionSaved.defaultLayer),
Layer.provide(PtyTicket.defaultLayer),
Layer.provide(LocationServiceMap.layer),
Layer.provide(Credential.defaultLayer),
)

View File

@ -0,0 +1,221 @@
import { Pty } from "@opencode-ai/core/pty"
import { PtyProtocol } from "@opencode-ai/core/pty/protocol"
import { PtyTicket } from "@opencode-ai/core/pty/ticket"
import { Location } from "@opencode-ai/core/location"
import { Effect, Queue } from "effect"
import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import { HttpApiBuilder, HttpApiSchema } from "effect/unstable/httpapi"
import * as Socket from "effect/unstable/socket/Socket"
import { Api } from "../api"
import { CorsConfig, isAllowedRequestOrigin } from "../cors"
import { ForbiddenError, PtyNotFoundError } from "../errors"
import {
PTY_CONNECT_TICKET_QUERY,
PTY_CONNECT_TOKEN_HEADER,
PTY_CONNECT_TOKEN_HEADER_VALUE,
} from "../groups/pty"
import { response } from "../groups/location"
const ticketScope = Effect.gen(function* () {
const location = yield* Location.Service
return { directory: location.directory as string, workspaceID: location.workspaceID }
})
export const PtyHandler = HttpApiBuilder.group(Api, "server.pty", (handlers) =>
Effect.gen(function* () {
const tickets = yield* PtyTicket.Service
const cors = yield* CorsConfig
return handlers
.handle(
"pty.list",
Effect.fn(function* () {
return yield* response((yield* Pty.Service).list())
}),
)
.handle(
"pty.create",
Effect.fn(function* (ctx) {
const pty = yield* Pty.Service
return yield* response(
pty.create({
...ctx.payload,
args: ctx.payload.args ? [...ctx.payload.args] : undefined,
env: ctx.payload.env ? { ...ctx.payload.env } : undefined,
}),
)
}),
)
.handle(
"pty.get",
Effect.fn(function* (ctx) {
const pty = yield* Pty.Service
return yield* response(
pty
.get(ctx.params.ptyID)
.pipe(
Effect.catchTag(
"Pty.NotFoundError",
() =>
new PtyNotFoundError({
ptyID: ctx.params.ptyID,
message: `PTY session not found: ${ctx.params.ptyID}`,
}),
),
),
)
}),
)
.handle(
"pty.update",
Effect.fn(function* (ctx) {
const pty = yield* Pty.Service
return yield* response(
pty
.update(ctx.params.ptyID, {
...ctx.payload,
size: ctx.payload.size ? { ...ctx.payload.size } : undefined,
})
.pipe(
Effect.catchTag(
"Pty.NotFoundError",
() =>
new PtyNotFoundError({
ptyID: ctx.params.ptyID,
message: `PTY session not found: ${ctx.params.ptyID}`,
}),
),
),
)
}),
)
.handle(
"pty.remove",
Effect.fn(function* (ctx) {
const pty = yield* Pty.Service
yield* pty
.remove(ctx.params.ptyID)
.pipe(
Effect.catchTag(
"Pty.NotFoundError",
() =>
new PtyNotFoundError({
ptyID: ctx.params.ptyID,
message: `PTY session not found: ${ctx.params.ptyID}`,
}),
),
)
return HttpApiSchema.NoContent.make()
}),
)
.handle(
"pty.connectToken",
Effect.fn(function* (ctx) {
const request = yield* HttpServerRequest.HttpServerRequest
// The custom header forces a CORS preflight, so cross-origin browser pages cannot
// mint tickets without passing the server's origin policy.
if (
request.headers[PTY_CONNECT_TOKEN_HEADER] !== PTY_CONNECT_TOKEN_HEADER_VALUE ||
!isAllowedRequestOrigin(request.headers.origin, request.headers.host, cors)
)
return yield* new ForbiddenError({ message: "Invalid PTY connect token request" })
const pty = yield* Pty.Service
yield* pty
.get(ctx.params.ptyID)
.pipe(
Effect.catchTag(
"Pty.NotFoundError",
() =>
new PtyNotFoundError({
ptyID: ctx.params.ptyID,
message: `PTY session not found: ${ctx.params.ptyID}`,
}),
),
)
return yield* response(tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* ticketScope) }))
}),
)
.handleRaw(
"pty.connect",
Effect.fn("PtyHandler.connect")(function* (ctx) {
const pty = yield* Pty.Service
const exists = yield* pty.get(ctx.params.ptyID).pipe(
Effect.as(true),
Effect.catchTag("Pty.NotFoundError", () => Effect.succeed(false)),
)
if (!exists) return HttpServerResponse.empty({ status: 404 })
const url = new URL(ctx.request.url, "http://localhost")
const ticket = url.searchParams.get(PTY_CONNECT_TICKET_QUERY)
if (ticket) {
const valid = isAllowedRequestOrigin(ctx.request.headers.origin, ctx.request.headers.host, cors)
? yield* tickets.consume({ ticket, ptyID: ctx.params.ptyID, ...(yield* ticketScope) })
: false
if (!valid) return HttpServerResponse.empty({ status: 403 })
}
const parsedCursor = url.searchParams.get("cursor")
const cursorNumber = parsedCursor === null ? undefined : Number(parsedCursor)
const cursor =
cursorNumber !== undefined && Number.isSafeInteger(cursorNumber) && cursorNumber >= -1
? cursorNumber
: undefined
const socket = yield* Effect.orDie(ctx.request.upgrade)
const write = yield* socket.writer
const closeAccepted = (event: Socket.CloseEvent) =>
socket
.runRaw(() => Effect.void, { onOpen: write(event).pipe(Effect.catch(() => Effect.void)) })
.pipe(
Effect.timeout("1 second"),
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.catch(() => Effect.void),
)
// Outbound frames flow through one queue drained by a single writer so replay, live
// output, and the close frame keep their order.
// TODO: Integrate graceful-shutdown socket tracking before clients migrate to this route.
const outbox = yield* Queue.unbounded<string | Uint8Array | Socket.CloseEvent>()
const attachment = yield* pty
.attach(ctx.params.ptyID, {
cursor,
onData: (chunk) => Queue.offerUnsafe(outbox, chunk),
onEnd: () => Queue.offerUnsafe(outbox, new Socket.CloseEvent(1000)),
})
.pipe(
Effect.catchTags({
"Pty.NotFoundError": () =>
closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)),
"Pty.ExitedError": () =>
closeAccepted(new Socket.CloseEvent(4404, "session exited")).pipe(Effect.as(undefined)),
}),
)
if (!attachment) return HttpServerResponse.empty()
for (const chunk of PtyProtocol.chunks(attachment.replay)) Queue.offerUnsafe(outbox, chunk)
Queue.offerUnsafe(outbox, PtyProtocol.metaFrame(attachment.cursor))
attachment.activate()
const drain = Effect.gen(function* () {
while (true) {
const item = yield* Queue.take(outbox)
yield* write(item)
if (item instanceof Socket.CloseEvent) return
}
})
yield* Effect.race(
drain,
socket.runRaw((message) => {
const decoded = PtyProtocol.decodeInput(message)
if (decoded !== undefined) attachment.write(decoded)
}),
).pipe(
Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
Effect.ensuring(Effect.sync(() => attachment.detach())),
Effect.orDie,
)
return HttpServerResponse.empty()
}),
)
}),
)

View File

@ -1,5 +1,6 @@
import { ServerAuth } from "../auth"
import { UnauthorizedError } from "../errors"
import { hasPtyConnectTicketURL } from "../groups/pty"
import { Effect, Encoding, Layer, Redacted } from "effect"
import { HttpEffect, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
import { HttpApiMiddleware } from "effect/unstable/httpapi"
@ -45,6 +46,9 @@ export const authorizationLayer = Layer.effect(
return Authorization.of((effect) =>
Effect.gen(function* () {
const request = yield* HttpServerRequest.HttpServerRequest
// Browsers cannot set headers on WebSocket upgrades, so a ticketed PTY connect skips
// credential checks here; the connect handler consumes and validates the ticket.
if (hasPtyConnectTicketURL(new URL(request.url, "http://localhost"))) return yield* effect
const credential = yield* credentialFromRequest(request)
if (ServerAuth.authorized(credential, config)) return yield* effect
yield* HttpEffect.appendPreResponseHandler((_request, response) =>