diff --git a/packages/core/src/event.ts b/packages/core/src/event.ts index 32aaeae69..dedafbf03 100644 --- a/packages/core/src/event.ts +++ b/packages/core/src/event.ts @@ -30,6 +30,15 @@ export type Definition = Schema.Schema.Type +export const Payload = Schema.Struct({ + id: ID, + metadata: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)), + type: Schema.String, + durable: Schema.optional(Schema.Struct({ aggregateID: Schema.String, seq: Schema.Int, version: Schema.Int })), + location: Schema.optional(Location.Ref), + data: Schema.Unknown, +}) + export type Payload = { readonly id: ID readonly type: D["type"] @@ -78,16 +87,13 @@ export function define>>> & Definition> { const Data = Schema.Struct(input.schema) - const Payload = Schema.Struct({ - id: ID, - metadata: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)), + const Event = Schema.Struct({ + ...Payload.fields, type: Schema.Literal(input.type), - durable: Schema.optional(Schema.Struct({ aggregateID: Schema.String, seq: Schema.Number, version: Schema.Number })), - location: Schema.optional(Location.Ref), data: Data, }).annotate({ identifier: input.type }) - const definition = Object.assign(Payload, { + const definition = Object.assign(Event, { type: input.type, ...(input.durable === undefined ? {} : { durable: input.durable }), data: Data, diff --git a/packages/opencode/src/server/routes/instance/httpapi/public.ts b/packages/opencode/src/server/routes/instance/httpapi/public.ts index 8517da276..2a7266c51 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/public.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/public.ts @@ -152,7 +152,7 @@ function matchLegacyOpenApi(input: Record) { normalizeLegacyErrorResponses(operation) } normalizeLegacyOperation(operation, path, method) - if ((path === "/event" || path === "/global/event") && method === "get") { + if ((path === "/event" || path === "/global/event" || path === "/api/event") && method === "get") { // HttpApi has no first-class SSE response schema, and these handlers are // raw/streaming routes. Document the actual wire protocol explicitly. operation.responses!["200"] = { @@ -162,7 +162,9 @@ function matchLegacyOpenApi(input: Record) { schema: path === "/event" ? { $ref: "#/components/schemas/Event" } - : { $ref: "#/components/schemas/GlobalEvent" }, + : path === "/global/event" + ? { $ref: "#/components/schemas/GlobalEvent" } + : { $ref: "#/components/schemas/V2Event" }, }, }, } diff --git a/packages/opencode/test/server/httpapi-exercise/index.ts b/packages/opencode/test/server/httpapi-exercise/index.ts index 5febf9cb2..728810e6e 100644 --- a/packages/opencode/test/server/httpapi-exercise/index.ts +++ b/packages/opencode/test/server/httpapi-exercise/index.ts @@ -734,11 +734,11 @@ const scenarios: Scenario[] = [ .stream() .status( 200, - (ctx, result) => + (_ctx, result) => Effect.sync(() => { check(result.contentType.includes("text/event-stream"), "v2 event should be an SSE stream") check(result.text.includes("server.connected"), "v2 event should emit initial connection event") - check(!!ctx.directory && result.text.includes(ctx.directory), "v2 event should include the resolved location") + check(!result.text.includes('"location"'), "v2 connection event should not be scoped to a location") }), "status", ), diff --git a/packages/opencode/test/server/httpapi-v2-location.test.ts b/packages/opencode/test/server/httpapi-v2-location.test.ts index 9282db32c..503ecb78e 100644 --- a/packages/opencode/test/server/httpapi-v2-location.test.ts +++ b/packages/opencode/test/server/httpapi-v2-location.test.ts @@ -21,10 +21,12 @@ function request(route: string, directory: string, init: RequestInit = {}) { const Event = Schema.Struct({ id: Schema.String, type: Schema.String, - location: Schema.Struct({ - directory: Schema.String, - project: Schema.Struct({ id: Schema.String, directory: Schema.String }), - }), + location: Schema.optional( + Schema.Struct({ + directory: Schema.String, + project: Schema.Struct({ id: Schema.String, directory: Schema.String }), + }), + ), data: Schema.Unknown, }) @@ -64,17 +66,20 @@ describe("v2 location HttpApi", () => { } }) - test("streams native EventV2 payloads with resolved locations", async () => { - await using tmp = await tmpdir({ git: true }) - const response = await request("/api/event", tmp.path) + test("streams native EventV2 payloads across locations", async () => { + await using subscriber = await tmpdir({ git: true }) + await using publisher = await tmpdir({ git: true }) + const response = await request("/api/event", subscriber.path) const reader = response.body!.getReader() - expect((await readEvent(reader)).type).toBe("server.connected") + const connected = await readEvent(reader) + expect(connected.type).toBe("server.connected") + expect(connected.location).toBeUndefined() - const created = await request("/session", tmp.path, { method: "POST" }) + const created = await request("/session", publisher.path, { method: "POST" }) expect(created.status).toBe(200) expect(await readEventType(reader, "session.created")).toMatchObject({ type: "session.created", - location: { directory: tmp.path, project: { directory: tmp.path } }, + location: { directory: publisher.path, project: { directory: publisher.path } }, data: { sessionID: expect.any(String) }, }) await reader.cancel() diff --git a/packages/sdk/js/src/v2/gen/sdk.gen.ts b/packages/sdk/js/src/v2/gen/sdk.gen.ts index 7bf19806e..e6bec85f9 100644 --- a/packages/sdk/js/src/v2/gen/sdk.gen.ts +++ b/packages/sdk/js/src/v2/gen/sdk.gen.ts @@ -6243,22 +6243,12 @@ export class Event2 extends HeyApiClient { /** * Subscribe to events * - * Subscribe to native event payloads for a location. + * Subscribe to native event payloads for the server. */ - public subscribe( - parameters?: { - location?: { - directory?: string - workspace?: string - } - }, - options?: Options, - ) { - const params = buildClientParams([parameters], [{ args: [{ in: "query", key: "location" }] }]) + public subscribe(options?: Options) { return (options?.client ?? this.client).sse.get({ url: "/api/event", ...options, - ...params, }) } } diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index d2c9e2989..abb9668f6 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -2764,6 +2764,94 @@ export type ProviderNotFoundError = { message: string } +export type V2Event = + | V2EventModelsDevRefreshed + | V2EventIntegrationUpdated + | V2EventCatalogUpdated + | V2EventSessionCreated + | V2EventSessionUpdated + | V2EventSessionDeleted + | V2EventMessageUpdated + | V2EventMessageRemoved + | V2EventMessagePartUpdated + | V2EventMessagePartRemoved + | V2EventSessionNextAgentSwitched + | V2EventSessionNextModelSwitched + | V2EventSessionNextMoved + | V2EventSessionNextPrompted + | V2EventSessionNextPromptAdmitted + | V2EventSessionNextPromptPromoted + | V2EventSessionNextInterruptRequested + | V2EventSessionNextContextUpdated + | V2EventSessionNextSynthetic + | V2EventSessionNextShellStarted + | V2EventSessionNextShellEnded + | V2EventSessionNextStepStarted + | V2EventSessionNextStepEnded + | V2EventSessionNextStepFailed + | V2EventSessionNextTextStarted + | V2EventSessionNextTextDelta + | V2EventSessionNextTextEnded + | V2EventSessionNextReasoningStarted + | V2EventSessionNextReasoningDelta + | V2EventSessionNextReasoningEnded + | V2EventSessionNextToolInputStarted + | V2EventSessionNextToolInputDelta + | V2EventSessionNextToolInputEnded + | V2EventSessionNextToolCalled + | V2EventSessionNextToolProgress + | V2EventSessionNextToolSuccess + | V2EventSessionNextToolFailed + | V2EventSessionNextRetried + | V2EventSessionNextCompactionStarted + | V2EventSessionNextCompactionDelta + | V2EventSessionNextCompactionEnded + | V2EventMessagePartDelta + | V2EventSessionDiff + | V2EventSessionError + | V2EventInstallationUpdated + | V2EventInstallationUpdateAvailable + | V2EventFileEdited + | V2EventPluginAdded + | V2EventPermissionV2Asked + | V2EventPermissionV2Replied + | V2EventReferenceUpdated + | V2EventProjectDirectoriesUpdated + | V2EventFileWatcherUpdated + | V2EventPtyCreated + | V2EventPtyUpdated + | V2EventPtyExited + | V2EventPtyDeleted + | V2EventQuestionV2Asked + | V2EventQuestionV2Replied + | V2EventQuestionV2Rejected + | V2EventTodoUpdated + | V2EventLspUpdated + | V2EventPermissionAsked + | V2EventPermissionReplied + | V2EventTuiPromptAppend + | V2EventTuiCommandExecute + | V2EventTuiToastShow + | V2EventTuiSessionSelect + | V2EventMcpToolsChanged + | V2EventMcpBrowserOpenFailed + | V2EventCommandExecuted + | V2EventProjectUpdated + | V2EventSessionStatus + | V2EventSessionIdle + | V2EventQuestionAsked + | V2EventQuestionReplied + | V2EventQuestionRejected + | V2EventSessionCompacted + | V2EventVcsBranchUpdated + | V2EventWorkspaceReady + | V2EventWorkspaceFailed + | V2EventWorkspaceStatus + | V2EventWorktreeReady + | V2EventWorktreeFailed + | V2EventServerConnected + | V2EventGlobalDisposed + export type ForbiddenError = { _tag: "ForbiddenError" message: string @@ -4176,6 +4264,1760 @@ export type SkillV2Info = { content: string } +export type V2EventModelsDevRefreshed = { + id: string + metadata?: { + [key: string]: unknown + } + type: "models-dev.refreshed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + [key: string]: unknown + } +} + +export type V2EventIntegrationUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "integration.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + [key: string]: unknown + } +} + +export type V2EventCatalogUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "catalog.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + [key: string]: unknown + } +} + +export type V2EventSessionCreated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.created" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + info: Session + } +} + +export type V2EventSessionUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + info: Session + } +} + +export type V2EventSessionDeleted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.deleted" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + info: Session + } +} + +export type V2EventMessageUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "message.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + info: Message + } +} + +export type V2EventMessageRemoved = { + id: string + metadata?: { + [key: string]: unknown + } + type: "message.removed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + messageID: string + } +} + +export type V2EventMessagePartUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "message.part.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + part: Part + time: number + } +} + +export type V2EventMessagePartRemoved = { + id: string + metadata?: { + [key: string]: unknown + } + type: "message.part.removed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + messageID: string + partID: string + } +} + +export type V2EventSessionNextAgentSwitched = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.agent.switched" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + agent: string + } +} + +export type V2EventSessionNextModelSwitched = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.model.switched" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + model: { + id: string + providerID: string + variant?: string + } + } +} + +export type V2EventSessionNextMoved = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.moved" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + location: LocationRef + subdirectory?: string + } +} + +export type V2EventSessionNextPrompted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.prompted" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + prompt: Prompt + delivery: "steer" | "queue" + } +} + +export type V2EventSessionNextPromptAdmitted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.prompt.admitted" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + prompt: Prompt + delivery: "steer" | "queue" + } +} + +export type V2EventSessionNextPromptPromoted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.prompt.promoted" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + prompt: Prompt + timeCreated: number + } +} + +export type V2EventSessionNextInterruptRequested = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.interrupt.requested" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + } +} + +export type V2EventSessionNextContextUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.context.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + text: string + } +} + +export type V2EventSessionNextSynthetic = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.synthetic" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + text: string + } +} + +export type V2EventSessionNextShellStarted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.shell.started" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + callID: string + command: string + } +} + +export type V2EventSessionNextShellEnded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.shell.ended" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + callID: string + output: string + } +} + +export type V2EventSessionNextStepStarted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.step.started" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + agent: string + model: { + id: string + providerID: string + variant?: string + } + snapshot?: string + } +} + +export type V2EventSessionNextStepEnded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.step.ended" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + finish: string + cost: number + tokens: { + input: number + output: number + reasoning: number + cache: { + read: number + write: number + } + } + snapshot?: string + } +} + +export type V2EventSessionNextStepFailed = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.step.failed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + error: SessionErrorUnknown + } +} + +export type V2EventSessionNextTextStarted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.text.started" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + textID: string + } +} + +export type V2EventSessionNextTextDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.text.delta" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + textID: string + delta: string + } +} + +export type V2EventSessionNextTextEnded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.text.ended" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + textID: string + text: string + } +} + +export type V2EventSessionNextReasoningStarted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.reasoning.started" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + reasoningID: string + providerMetadata?: { + [key: string]: { + [key: string]: unknown + } + } + } +} + +export type V2EventSessionNextReasoningDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.reasoning.delta" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + reasoningID: string + delta: string + } +} + +export type V2EventSessionNextReasoningEnded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.reasoning.ended" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + reasoningID: string + text: string + providerMetadata?: { + [key: string]: { + [key: string]: unknown + } + } + } +} + +export type V2EventSessionNextToolInputStarted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.input.started" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + name: string + } +} + +export type V2EventSessionNextToolInputDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.input.delta" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + delta: string + } +} + +export type V2EventSessionNextToolInputEnded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.input.ended" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + text: string + } +} + +export type V2EventSessionNextToolCalled = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.called" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + tool: string + input: { + [key: string]: unknown + } + provider: { + executed: boolean + metadata?: { + [key: string]: { + [key: string]: unknown + } + } + } + } +} + +export type V2EventSessionNextToolProgress = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.progress" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + structured: { + [key: string]: unknown + } + content: Array + } +} + +export type V2EventSessionNextToolSuccess = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.success" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + structured: { + [key: string]: unknown + } + content: Array + outputPaths?: Array + result?: unknown + provider: { + executed: boolean + metadata?: { + [key: string]: { + [key: string]: unknown + } + } + } + } +} + +export type V2EventSessionNextToolFailed = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.tool.failed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + assistantMessageID: string + callID: string + error: SessionErrorUnknown + result?: unknown + provider: { + executed: boolean + metadata?: { + [key: string]: { + [key: string]: unknown + } + } + } + } +} + +export type V2EventSessionNextRetried = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.retried" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + attempt: number + error: SessionNextRetryError + } +} + +export type V2EventSessionNextCompactionStarted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.compaction.started" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + reason: "auto" | "manual" + } +} + +export type V2EventSessionNextCompactionDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.compaction.delta" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + text: string + } +} + +export type V2EventSessionNextCompactionEnded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.next.compaction.ended" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + timestamp: number + sessionID: string + messageID: string + reason: "auto" | "manual" + text: string + recent: string + } +} + +export type V2EventMessagePartDelta = { + id: string + metadata?: { + [key: string]: unknown + } + type: "message.part.delta" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + messageID: string + partID: string + field: string + delta: string + } +} + +export type V2EventSessionDiff = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.diff" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + diff: Array + } +} + +export type V2EventSessionError = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.error" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID?: string + error?: + | ProviderAuthError + | UnknownError + | MessageOutputLengthError + | MessageAbortedError + | StructuredOutputError + | ContextOverflowError + | ContentFilterError + | ApiError + } +} + +export type V2EventInstallationUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "installation.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + version: string + } +} + +export type V2EventInstallationUpdateAvailable = { + id: string + metadata?: { + [key: string]: unknown + } + type: "installation.update-available" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + version: string + } +} + +export type V2EventFileEdited = { + id: string + metadata?: { + [key: string]: unknown + } + type: "file.edited" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + file: string + } +} + +export type V2EventPluginAdded = { + id: string + metadata?: { + [key: string]: unknown + } + type: "plugin.added" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + } +} + +export type V2EventPermissionV2Asked = { + id: string + metadata?: { + [key: string]: unknown + } + type: "permission.v2.asked" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + sessionID: string + action: string + resources: Array + save?: Array + metadata?: { + [key: string]: unknown + } + source?: PermissionV2Source + } +} + +export type V2EventPermissionV2Replied = { + id: string + metadata?: { + [key: string]: unknown + } + type: "permission.v2.replied" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + requestID: string + reply: PermissionV2Reply + } +} + +export type V2EventReferenceUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "reference.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + [key: string]: unknown + } +} + +export type V2EventProjectDirectoriesUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "project.directories.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + projectID: string + } +} + +export type V2EventFileWatcherUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "file.watcher.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + file: string + event: "add" | "change" | "unlink" + } +} + +export type V2EventPtyCreated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "pty.created" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + info: Pty + } +} + +export type V2EventPtyUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "pty.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + info: Pty + } +} + +export type V2EventPtyExited = { + id: string + metadata?: { + [key: string]: unknown + } + type: "pty.exited" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + exitCode: number + } +} + +export type V2EventPtyDeleted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "pty.deleted" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + } +} + +export type V2EventQuestionV2Asked = { + id: string + metadata?: { + [key: string]: unknown + } + type: "question.v2.asked" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + sessionID: string + /** + * Questions to ask + */ + questions: Array + tool?: QuestionV2Tool + } +} + +export type V2EventQuestionV2Replied = { + id: string + metadata?: { + [key: string]: unknown + } + type: "question.v2.replied" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + requestID: string + answers: Array + } +} + +export type V2EventQuestionV2Rejected = { + id: string + metadata?: { + [key: string]: unknown + } + type: "question.v2.rejected" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + requestID: string + } +} + +export type V2EventTodoUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "todo.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + todos: Array + } +} + +export type V2EventLspUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "lsp.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + [key: string]: unknown + } +} + +export type V2EventPermissionAsked = { + id: string + metadata?: { + [key: string]: unknown + } + type: "permission.asked" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + sessionID: string + permission: string + patterns: Array + metadata: { + [key: string]: unknown + } + always: Array + tool?: { + messageID: string + callID: string + } + } +} + +export type V2EventPermissionReplied = { + id: string + metadata?: { + [key: string]: unknown + } + type: "permission.replied" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + requestID: string + reply: "once" | "always" | "reject" + } +} + +export type V2EventTuiPromptAppend = { + id: string + metadata?: { + [key: string]: unknown + } + type: "tui.prompt.append" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + text: string + } +} + +export type V2EventTuiCommandExecute = { + id: string + metadata?: { + [key: string]: unknown + } + type: "tui.command.execute" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + command: + | "session.list" + | "session.new" + | "session.share" + | "session.interrupt" + | "session.compact" + | "session.page.up" + | "session.page.down" + | "session.line.up" + | "session.line.down" + | "session.half.page.up" + | "session.half.page.down" + | "session.first" + | "session.last" + | "prompt.clear" + | "prompt.submit" + | "agent.cycle" + | string + } +} + +export type V2EventTuiToastShow = { + id: string + metadata?: { + [key: string]: unknown + } + type: "tui.toast.show" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + title?: string + message: string + variant: "info" | "success" | "warning" | "error" + duration?: number + } +} + +export type V2EventTuiSessionSelect = { + id: string + metadata?: { + [key: string]: unknown + } + type: "tui.session.select" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + /** + * Session ID to navigate to + */ + sessionID: string + } +} + +export type V2EventMcpToolsChanged = { + id: string + metadata?: { + [key: string]: unknown + } + type: "mcp.tools.changed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + server: string + } +} + +export type V2EventMcpBrowserOpenFailed = { + id: string + metadata?: { + [key: string]: unknown + } + type: "mcp.browser.open.failed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + mcpName: string + url: string + } +} + +export type V2EventCommandExecuted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "command.executed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + name: string + sessionID: string + arguments: string + messageID: string + } +} + +export type V2EventProjectUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "project.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + worktree: string + vcs?: "git" + name?: string + icon?: { + url?: string + override?: string + color?: string + } + commands?: { + /** + * Startup script to run when creating a new workspace (worktree) + */ + start?: string + } + time: { + created: number + updated: number + initialized?: number + } + sandboxes: Array + } +} + +export type V2EventSessionStatus = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.status" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + status: SessionStatus + } +} + +export type V2EventSessionIdle = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.idle" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + } +} + +export type V2EventQuestionAsked = { + id: string + metadata?: { + [key: string]: unknown + } + type: "question.asked" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + id: string + sessionID: string + /** + * Questions to ask + */ + questions: Array + tool?: QuestionTool + } +} + +export type V2EventQuestionReplied = { + id: string + metadata?: { + [key: string]: unknown + } + type: "question.replied" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + requestID: string + answers: Array + } +} + +export type V2EventQuestionRejected = { + id: string + metadata?: { + [key: string]: unknown + } + type: "question.rejected" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + requestID: string + } +} + +export type V2EventSessionCompacted = { + id: string + metadata?: { + [key: string]: unknown + } + type: "session.compacted" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + sessionID: string + } +} + +export type V2EventVcsBranchUpdated = { + id: string + metadata?: { + [key: string]: unknown + } + type: "vcs.branch.updated" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + branch?: string + } +} + +export type V2EventWorkspaceReady = { + id: string + metadata?: { + [key: string]: unknown + } + type: "workspace.ready" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + name: string + } +} + +export type V2EventWorkspaceFailed = { + id: string + metadata?: { + [key: string]: unknown + } + type: "workspace.failed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + message: string + } +} + +export type V2EventWorkspaceStatus = { + id: string + metadata?: { + [key: string]: unknown + } + type: "workspace.status" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + workspaceID: string + status: "connected" | "connecting" | "disconnected" | "error" + } +} + +export type V2EventWorktreeReady = { + id: string + metadata?: { + [key: string]: unknown + } + type: "worktree.ready" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + name: string + branch?: string + } +} + +export type V2EventWorktreeFailed = { + id: string + metadata?: { + [key: string]: unknown + } + type: "worktree.failed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + message: string + } +} + +export type V2EventServerConnected = { + id: string + metadata?: { + [key: string]: unknown + } + type: "server.connected" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + [key: string]: unknown + } +} + +export type V2EventGlobalDisposed = { + id: string + metadata?: { + [key: string]: unknown + } + type: "global.disposed" + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + data: { + [key: string]: unknown + } +} + export type QuestionV2Request = { id: string sessionID: string @@ -10687,12 +12529,7 @@ export type V2SkillListResponse = V2SkillListResponses[keyof V2SkillListResponse export type V2EventSubscribeData = { body?: never path?: never - query?: { - location?: { - directory?: string - workspace?: string - } - } + query?: never url: "/api/event" } @@ -10711,9 +12548,9 @@ export type V2EventSubscribeError = V2EventSubscribeErrors[keyof V2EventSubscrib export type V2EventSubscribeResponses = { /** - * Success + * Event stream */ - 200: string + 200: V2Event } export type V2EventSubscribeResponse = V2EventSubscribeResponses[keyof V2EventSubscribeResponses] diff --git a/packages/server/src/groups/event.ts b/packages/server/src/groups/event.ts index 83ccdf98f..a7f912547 100644 --- a/packages/server/src/groups/event.ts +++ b/packages/server/src/groups/event.ts @@ -1,34 +1,34 @@ import { EventV2 } from "@opencode-ai/core/event" -import { Location } from "@opencode-ai/core/location" import { Schema } from "effect" -import { HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi" -import { LocationQuery, locationQueryOpenApi, LocationMiddleware } from "./location" +import { HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi" -const Event = Schema.Struct({ - id: EventV2.ID, - type: Schema.String, - location: Location.Info.pipe(Schema.optional), - metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional), - version: Schema.Number.pipe(Schema.optional), - data: Schema.Unknown, -}) +const Event = Schema.Union([ + ...EventV2.definitions().map((definition) => + Schema.Struct({ + ...EventV2.Payload.fields, + type: Schema.Literal(definition.type), + data: definition.data, + }).annotate({ identifier: `V2Event.${definition.type}` }), + ), + Schema.Struct({ + ...EventV2.Payload.fields, + type: Schema.Literal("server.connected"), + data: Schema.Struct({}), + }).annotate({ identifier: "V2Event.server.connected" }), +]).annotate({ identifier: "V2Event" }) export const EventGroup = HttpApiGroup.make("server.event") .add( HttpApiEndpoint.get("event.subscribe", "/api/event", { - query: LocationQuery, - success: Schema.String.pipe(HttpApiSchema.asText({ contentType: "text/event-stream" })), - }) - .annotateMerge(locationQueryOpenApi) - .annotateMerge( - OpenApi.annotations({ - identifier: "v2.event.subscribe", - summary: "Subscribe to events", - description: "Subscribe to native event payloads for a location.", - }), - ), + success: Event, + }).annotateMerge( + OpenApi.annotations({ + identifier: "v2.event.subscribe", + summary: "Subscribe to events", + description: "Subscribe to native event payloads for the server.", + }), + ), ) .annotateMerge(OpenApi.annotations({ title: "events", description: "Experimental event stream route." })) - .middleware(LocationMiddleware) export type Event = typeof Event.Type diff --git a/packages/server/src/handlers/event.ts b/packages/server/src/handlers/event.ts index 65ec78a7c..8001fb874 100644 --- a/packages/server/src/handlers/event.ts +++ b/packages/server/src/handlers/event.ts @@ -1,5 +1,4 @@ import { EventV2 } from "@opencode-ai/core/event" -import { Location } from "@opencode-ai/core/location" import { Effect, Stream } from "effect" import { HttpServerResponse } from "effect/unstable/http" import { HttpApiBuilder } from "effect/unstable/httpapi" @@ -20,30 +19,14 @@ export const EventHandler = HttpApiBuilder.group(Api, "server.event", (handlers) const events = yield* EventV2.Service return handlers.handleRaw("event.subscribe", () => Effect.gen(function* () { - const location = yield* Location.Service const connected = { id: EventV2.ID.create(), type: "server.connected", - location: new Location.Info({ - directory: location.directory, - workspaceID: location.workspaceID, - project: location.project, - }), data: {}, } return HttpServerResponse.stream( Stream.make(connected).pipe( - Stream.concat( - events - .all() - .pipe( - Stream.filter( - (event) => - event.location?.directory === location.directory && - event.location.workspaceID === location.workspaceID, - ), - ), - ), + Stream.concat(events.all()), Stream.map(eventData), Stream.pipeThroughChannel(Sse.encode()), Stream.encodeText, diff --git a/packages/tui/src/context/data.tsx b/packages/tui/src/context/data.tsx index bea6e001a..184837c54 100644 --- a/packages/tui/src/context/data.tsx +++ b/packages/tui/src/context/data.tsx @@ -1,9 +1,7 @@ -import { useEvent } from "./event" import type { AgentV2Info, CommandV2Info, IntegrationInfo, - Event, LocationRef, ModelV2Info, PermissionSavedInfo, @@ -18,11 +16,12 @@ import type { SessionMessageAssistantTool, SessionV2Info, SkillV2Info, + V2Event, } from "@opencode-ai/sdk/v2" import { createStore, produce } from "solid-js/store" import { createSimpleContext } from "./helper" import { useSDK } from "./sdk" -import { createSignal, onMount } from "solid-js" +import { createSignal, onCleanup, onMount } from "solid-js" type LocationData = { agent?: AgentV2Info[] @@ -71,7 +70,6 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ location: {}, }) - const event = useEvent() const sdk = useSDK() const [defaultLocation, setDefaultLocation] = createSignal({ directory: sdk.directory ?? process.cwd(), @@ -121,7 +119,7 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ }, } - event.subscribe((event, metadata) => { + function handleEvent(event: V2Event, metadata: { directory: string; workspace: string | undefined }) { switch (event.type) { case "catalog.updated": void Promise.all([ @@ -130,34 +128,34 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ ]) break case "session.next.agent.switched": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "agent-switched", - agent: event.properties.agent, - time: { created: event.properties.timestamp }, + agent: event.data.agent, + time: { created: event.data.timestamp }, }) }) break case "session.next.model.switched": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "model-switched", - model: event.properties.model, - time: { created: event.properties.timestamp }, + model: event.data.model, + time: { created: event.data.timestamp }, }) }) break case "session.next.prompted": { - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "user", - text: event.properties.prompt.text, - files: event.properties.prompt.files, - agents: event.properties.prompt.agents, - time: { created: event.properties.timestamp }, + text: event.data.prompt.text, + files: event.data.prompt.files, + agents: event.data.prompt.agents, + time: { created: event.data.timestamp }, }) }) break @@ -165,248 +163,248 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ case "session.next.prompt.admitted": break case "session.next.prompt.promoted": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "user", - text: event.properties.prompt.text, - files: event.properties.prompt.files, - agents: event.properties.prompt.agents, - time: { created: event.properties.timeCreated }, + text: event.data.prompt.text, + files: event.data.prompt.files, + agents: event.data.prompt.agents, + time: { created: event.data.timeCreated }, }) }) break case "session.next.context.updated": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "system", - text: event.properties.text, - time: { created: event.properties.timestamp }, + text: event.data.text, + time: { created: event.data.timestamp }, }) }) break case "session.next.synthetic": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "synthetic", - sessionID: event.properties.sessionID, - text: event.properties.text, - time: { created: event.properties.timestamp }, + sessionID: event.data.sessionID, + text: event.data.text, + time: { created: event.data.timestamp }, }) }) break case "session.next.shell.started": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "shell", - callID: event.properties.callID, - command: event.properties.command, + callID: event.data.callID, + command: event.data.command, output: "", - time: { created: event.properties.timestamp }, + time: { created: event.data.timestamp }, }) }) break case "session.next.shell.ended": - message.update(event.properties.sessionID, (draft) => { - const match = message.activeShell(draft, event.properties.callID) + message.update(event.data.sessionID, (draft) => { + const match = message.activeShell(draft, event.data.callID) if (!match) return - match.output = event.properties.output - match.time.completed = event.properties.timestamp + match.output = event.data.output + match.time.completed = event.data.timestamp }) break case "session.next.step.started": - message.update(event.properties.sessionID, (draft) => { - if (draft.some((message) => message.id === event.properties.assistantMessageID)) return + message.update(event.data.sessionID, (draft) => { + if (draft.some((message) => message.id === event.data.assistantMessageID)) return const currentAssistant = message.activeAssistant(draft) - if (currentAssistant) currentAssistant.time.completed = event.properties.timestamp + if (currentAssistant) currentAssistant.time.completed = event.data.timestamp message.prepend(draft, { - id: event.properties.assistantMessageID, + id: event.data.assistantMessageID, type: "assistant", - agent: event.properties.agent, - model: event.properties.model, + agent: event.data.agent, + model: event.data.model, content: [], - snapshot: event.properties.snapshot ? { start: event.properties.snapshot } : undefined, - time: { created: event.properties.timestamp }, + snapshot: event.data.snapshot ? { start: event.data.snapshot } : undefined, + time: { created: event.data.timestamp }, }) }) break case "session.next.step.ended": - message.update(event.properties.sessionID, (draft) => { - const currentAssistant = message.assistant(draft, event.properties.assistantMessageID) + message.update(event.data.sessionID, (draft) => { + const currentAssistant = message.assistant(draft, event.data.assistantMessageID) if (!currentAssistant) return - currentAssistant.time.completed = event.properties.timestamp - currentAssistant.finish = event.properties.finish - currentAssistant.cost = event.properties.cost - currentAssistant.tokens = event.properties.tokens - if (event.properties.snapshot) - currentAssistant.snapshot = { ...currentAssistant.snapshot, end: event.properties.snapshot } + currentAssistant.time.completed = event.data.timestamp + currentAssistant.finish = event.data.finish + currentAssistant.cost = event.data.cost + currentAssistant.tokens = event.data.tokens + if (event.data.snapshot) + currentAssistant.snapshot = { ...currentAssistant.snapshot, end: event.data.snapshot } }) break case "session.next.step.failed": - message.update(event.properties.sessionID, (draft) => { - const currentAssistant = message.assistant(draft, event.properties.assistantMessageID) + message.update(event.data.sessionID, (draft) => { + const currentAssistant = message.assistant(draft, event.data.assistantMessageID) if (!currentAssistant) return - currentAssistant.time.completed = event.properties.timestamp + currentAssistant.time.completed = event.data.timestamp currentAssistant.finish = "error" - currentAssistant.error = event.properties.error + currentAssistant.error = event.data.error }) break case "session.next.text.started": - message.update(event.properties.sessionID, (draft) => { - message.assistant(draft, event.properties.assistantMessageID)?.content.push({ + message.update(event.data.sessionID, (draft) => { + message.assistant(draft, event.data.assistantMessageID)?.content.push({ type: "text", - id: event.properties.textID, + id: event.data.textID, text: "", }) }) break case "session.next.text.delta": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestText( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.textID, + message.assistant(draft, event.data.assistantMessageID), + event.data.textID, ) - if (match) match.text += event.properties.delta + if (match) match.text += event.data.delta }) break case "session.next.text.ended": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestText( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.textID, + message.assistant(draft, event.data.assistantMessageID), + event.data.textID, ) - if (match) match.text = event.properties.text + if (match) match.text = event.data.text }) break case "session.next.tool.input.started": - message.update(event.properties.sessionID, (draft) => { - message.assistant(draft, event.properties.assistantMessageID)?.content.push({ + message.update(event.data.sessionID, (draft) => { + message.assistant(draft, event.data.assistantMessageID)?.content.push({ type: "tool", - id: event.properties.callID, - name: event.properties.name, - time: { created: event.properties.timestamp }, + id: event.data.callID, + name: event.data.name, + time: { created: event.data.timestamp }, state: { status: "pending", input: "" }, }) }) break case "session.next.tool.input.delta": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestTool( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.callID, + message.assistant(draft, event.data.assistantMessageID), + event.data.callID, ) - if (match?.state.status === "pending") match.state.input += event.properties.delta + if (match?.state.status === "pending") match.state.input += event.data.delta }) break case "session.next.tool.input.ended": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestTool( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.callID, + message.assistant(draft, event.data.assistantMessageID), + event.data.callID, ) - if (match?.state.status === "pending") match.state.input = event.properties.text + if (match?.state.status === "pending") match.state.input = event.data.text }) break case "session.next.tool.called": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestTool( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.callID, + message.assistant(draft, event.data.assistantMessageID), + event.data.callID, ) if (!match) return - match.time.ran = event.properties.timestamp - match.provider = event.properties.provider - match.state = { status: "running", input: event.properties.input, structured: {}, content: [] } + match.time.ran = event.data.timestamp + match.provider = event.data.provider + match.state = { status: "running", input: event.data.input, structured: {}, content: [] } }) break case "session.next.tool.progress": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestTool( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.callID, + message.assistant(draft, event.data.assistantMessageID), + event.data.callID, ) if (match?.state.status !== "running") return - match.state.structured = event.properties.structured - match.state.content = [...event.properties.content] + match.state.structured = event.data.structured + match.state.content = [...event.data.content] }) break case "session.next.tool.success": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestTool( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.callID, + message.assistant(draft, event.data.assistantMessageID), + event.data.callID, ) if (match?.state.status !== "running") return match.state = { status: "completed", input: match.state.input, - structured: event.properties.structured, - content: [...event.properties.content], - result: event.properties.result, + structured: event.data.structured, + content: [...event.data.content], + result: event.data.result, } match.provider = { - executed: event.properties.provider.executed || match.provider?.executed === true, + executed: event.data.provider.executed || match.provider?.executed === true, metadata: match.provider?.metadata, - resultMetadata: event.properties.provider.metadata, + resultMetadata: event.data.provider.metadata, } - match.time.completed = event.properties.timestamp + match.time.completed = event.data.timestamp }) break case "session.next.tool.failed": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestTool( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.callID, + message.assistant(draft, event.data.assistantMessageID), + event.data.callID, ) if (!match || (match.state.status !== "pending" && match.state.status !== "running")) return match.state = { status: "error", - error: event.properties.error, + error: event.data.error, input: typeof match.state.input === "string" ? {} : match.state.input, structured: match.state.status === "running" ? match.state.structured : {}, content: match.state.status === "running" ? match.state.content : [], - result: event.properties.result, + result: event.data.result, } match.provider = { - executed: event.properties.provider.executed || match.provider?.executed === true, + executed: event.data.provider.executed || match.provider?.executed === true, metadata: match.provider?.metadata, - resultMetadata: event.properties.provider.metadata, + resultMetadata: event.data.provider.metadata, } - match.time.completed = event.properties.timestamp + match.time.completed = event.data.timestamp }) break case "session.next.reasoning.started": - message.update(event.properties.sessionID, (draft) => { - message.assistant(draft, event.properties.assistantMessageID)?.content.push({ + message.update(event.data.sessionID, (draft) => { + message.assistant(draft, event.data.assistantMessageID)?.content.push({ type: "reasoning", - id: event.properties.reasoningID, + id: event.data.reasoningID, text: "", - providerMetadata: event.properties.providerMetadata, + providerMetadata: event.data.providerMetadata, }) }) break case "session.next.reasoning.delta": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestReasoning( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.reasoningID, + message.assistant(draft, event.data.assistantMessageID), + event.data.reasoningID, ) - if (match) match.text += event.properties.delta + if (match) match.text += event.data.delta }) break case "session.next.reasoning.ended": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { const match = message.latestReasoning( - message.assistant(draft, event.properties.assistantMessageID), - event.properties.reasoningID, + message.assistant(draft, event.data.assistantMessageID), + event.data.reasoningID, ) if (match) { - match.text = event.properties.text - if (event.properties.providerMetadata !== undefined) - match.providerMetadata = event.properties.providerMetadata + match.text = event.data.text + if (event.data.providerMetadata !== undefined) + match.providerMetadata = event.data.providerMetadata } }) break @@ -415,14 +413,14 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ case "session.next.compaction.delta": break case "session.next.compaction.ended": - message.update(event.properties.sessionID, (draft) => { + message.update(event.data.sessionID, (draft) => { message.prepend(draft, { - id: event.properties.messageID, + id: event.data.messageID, type: "compaction", - reason: event.properties.reason, - summary: event.properties.text, - recent: event.properties.recent, - time: { created: event.properties.timestamp }, + reason: event.data.reason, + summary: event.data.text, + recent: event.data.recent, + time: { created: event.data.timestamp }, }) }) break @@ -437,6 +435,20 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ ]) break } + } + + onMount(() => { + const controller = new AbortController() + onCleanup(() => controller.abort()) + void (async () => { + const events = await sdk.client.v2.event.subscribe({ signal: controller.signal }) + for await (const event of events.stream) { + handleEvent(event, { + directory: event.location?.directory ?? defaultLocation().directory, + workspace: event.location?.workspaceID, + }) + } + })().catch(() => {}) }) const result = { diff --git a/packages/tui/test/cli/tui/data.test.tsx b/packages/tui/test/cli/tui/data.test.tsx index 92894620a..0d6ada4d1 100644 --- a/packages/tui/test/cli/tui/data.test.tsx +++ b/packages/tui/test/cli/tui/data.test.tsx @@ -26,6 +26,7 @@ function emitEvent(events: ReturnType, payload: Event) } test("refreshes resources into reactive getters", async () => { + const events = createEventSource() const location = { directory, project: { id: "proj_test", directory }, @@ -49,8 +50,7 @@ test("refreshes resources into reactive getters", async () => { data: [{ id: "build", request: { headers: {}, body: {} }, mode: "primary", hidden: false, permissions: [] }], }) return undefined - }) - const events = createEventSource() + }, events) let data!: ReturnType let ready!: () => void const mounted = new Promise((resolve) => { @@ -119,7 +119,7 @@ test("refreshes integrations after integration updates", async () => { }, ], }) - }) + }, events) let data!: ReturnType let ready!: () => void const mounted = new Promise((resolve) => { @@ -171,7 +171,7 @@ test("refreshes effective catalog data after catalog updates", async () => { requests.provider++ return json({ location: { directory, project: { id: "proj_test", directory } }, data: [] }) } - }) + }, events) const app = await testRender(() => ( @@ -205,7 +205,7 @@ test("refreshes references after updates", async () => { location: { directory, project: { id: "proj_test", directory } }, data: requests === 1 ? [] : [{ name: "docs", path: "/docs", source: { type: "local", path: "/docs" } }], }) - }) + }, events) let data!: ReturnType let ready!: () => void const mounted = new Promise((resolve) => { @@ -243,7 +243,7 @@ test("refreshes references after updates", async () => { test("settles pending tools when a live failure arrives", async () => { const events = createEventSource() - const calls = createFetch() + const calls = createFetch(undefined, events) let sync!: ReturnType let ready!: () => void const mounted = new Promise((resolve) => { @@ -372,7 +372,7 @@ test("settles pending tools when a live failure arrives", async () => { test("renders admitted prompts only after promotion", async () => { const events = createEventSource() - const calls = createFetch() + const calls = createFetch(undefined, events) let sync!: ReturnType let ready!: () => void const mounted = new Promise((resolve) => { @@ -436,7 +436,7 @@ test("renders admitted prompts only after promotion", async () => { test("renders a promoted prompt when admission was missed", async () => { const events = createEventSource() - const calls = createFetch() + const calls = createFetch(undefined, events) let sync!: ReturnType let ready!: () => void const mounted = new Promise((resolve) => { @@ -484,7 +484,7 @@ test("renders a promoted prompt when admission was missed", async () => { test("projects live context updates with their message ID", async () => { const events = createEventSource() - const calls = createFetch() + const calls = createFetch(undefined, events) let sync!: ReturnType let ready!: () => void const mounted = new Promise((resolve) => { diff --git a/packages/tui/test/fixture/tui-sdk.ts b/packages/tui/test/fixture/tui-sdk.ts index ed18b7acd..d1cf3c7df 100644 --- a/packages/tui/test/fixture/tui-sdk.ts +++ b/packages/tui/test/fixture/tui-sdk.ts @@ -17,6 +17,8 @@ export function eventSource(): EventSource { export function createEventSource() { let fn: ((event: GlobalEvent) => void) | undefined + let stream: ReadableStreamDefaultController | undefined + const pending: Uint8Array[] = [] return { source: { subscribe: async (handler: (event: GlobalEvent) => void) => { @@ -29,19 +31,44 @@ export function createEventSource() { emit(event: GlobalEvent) { if (!fn) throw new Error("event source not ready") fn(event) + if (!("properties" in event.payload)) return + const chunk = new TextEncoder().encode( + `data: ${JSON.stringify({ + ...event.payload, + location: { directory: event.directory, workspaceID: event.workspace }, + data: event.payload.properties, + })}\n\n`, + ) + if (stream) return stream.enqueue(chunk) + pending.push(chunk) + }, + response() { + return new Response( + new ReadableStream({ + start(controller) { + stream = controller + for (const chunk of pending.splice(0)) controller.enqueue(chunk) + }, + cancel() { + stream = undefined + }, + }), + { headers: { "content-type": "text/event-stream" } }, + ) }, } } export type FetchHandler = (url: URL) => Response | Promise | undefined -export function createFetch(override?: FetchHandler) { +export function createFetch(override?: FetchHandler, events?: ReturnType) { const session = [] as URL[] const fetch = (async (input: RequestInfo | URL) => { const url = new URL(input instanceof Request ? input.url : String(input)) if (url.pathname === "/session") session.push(url) const overridden = await override?.(url) if (overridden) return overridden + if (url.pathname === "/api/event" && events) return events.response() if ( [