feat(server): stream events across locations

This commit is contained in:
Dax Raad 2026-06-22 08:12:38 -04:00
parent 57ce1b9ca8
commit 595cc91ddc
11 changed files with 2088 additions and 226 deletions

View File

@ -30,6 +30,15 @@ export type Definition<Type extends string = string, DataSchema extends Schema.T
export type Data<D extends Definition> = Schema.Schema.Type<D["data"]>
export const Payload = Schema.Struct({
id: ID,
metadata: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)),
type: Schema.String,
durable: Schema.optional(Schema.Struct({ aggregateID: Schema.String, seq: Schema.Int, version: Schema.Int })),
location: Schema.optional(Location.Ref),
data: Schema.Unknown,
})
export type Payload<D extends Definition = Definition> = {
readonly id: ID
readonly type: D["type"]
@ -78,16 +87,13 @@ export function define<const Type extends string, Fields extends Schema.Struct.F
readonly schema: Fields
}): Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> & Definition<Type, Schema.Struct<Fields>> {
const Data = Schema.Struct(input.schema)
const Payload = Schema.Struct({
id: ID,
metadata: Schema.optional(Schema.Record(Schema.String, Schema.Unknown)),
const Event = Schema.Struct({
...Payload.fields,
type: Schema.Literal(input.type),
durable: Schema.optional(Schema.Struct({ aggregateID: Schema.String, seq: Schema.Number, version: Schema.Number })),
location: Schema.optional(Location.Ref),
data: Data,
}).annotate({ identifier: input.type })
const definition = Object.assign(Payload, {
const definition = Object.assign(Event, {
type: input.type,
...(input.durable === undefined ? {} : { durable: input.durable }),
data: Data,

View File

@ -152,7 +152,7 @@ function matchLegacyOpenApi(input: Record<string, unknown>) {
normalizeLegacyErrorResponses(operation)
}
normalizeLegacyOperation(operation, path, method)
if ((path === "/event" || path === "/global/event") && method === "get") {
if ((path === "/event" || path === "/global/event" || path === "/api/event") && method === "get") {
// HttpApi has no first-class SSE response schema, and these handlers are
// raw/streaming routes. Document the actual wire protocol explicitly.
operation.responses!["200"] = {
@ -162,7 +162,9 @@ function matchLegacyOpenApi(input: Record<string, unknown>) {
schema:
path === "/event"
? { $ref: "#/components/schemas/Event" }
: { $ref: "#/components/schemas/GlobalEvent" },
: path === "/global/event"
? { $ref: "#/components/schemas/GlobalEvent" }
: { $ref: "#/components/schemas/V2Event" },
},
},
}

View File

@ -734,11 +734,11 @@ const scenarios: Scenario[] = [
.stream()
.status(
200,
(ctx, result) =>
(_ctx, result) =>
Effect.sync(() => {
check(result.contentType.includes("text/event-stream"), "v2 event should be an SSE stream")
check(result.text.includes("server.connected"), "v2 event should emit initial connection event")
check(!!ctx.directory && result.text.includes(ctx.directory), "v2 event should include the resolved location")
check(!result.text.includes('"location"'), "v2 connection event should not be scoped to a location")
}),
"status",
),

View File

@ -21,10 +21,12 @@ function request(route: string, directory: string, init: RequestInit = {}) {
const Event = Schema.Struct({
id: Schema.String,
type: Schema.String,
location: Schema.Struct({
directory: Schema.String,
project: Schema.Struct({ id: Schema.String, directory: Schema.String }),
}),
location: Schema.optional(
Schema.Struct({
directory: Schema.String,
project: Schema.Struct({ id: Schema.String, directory: Schema.String }),
}),
),
data: Schema.Unknown,
})
@ -64,17 +66,20 @@ describe("v2 location HttpApi", () => {
}
})
test("streams native EventV2 payloads with resolved locations", async () => {
await using tmp = await tmpdir({ git: true })
const response = await request("/api/event", tmp.path)
test("streams native EventV2 payloads across locations", async () => {
await using subscriber = await tmpdir({ git: true })
await using publisher = await tmpdir({ git: true })
const response = await request("/api/event", subscriber.path)
const reader = response.body!.getReader()
expect((await readEvent(reader)).type).toBe("server.connected")
const connected = await readEvent(reader)
expect(connected.type).toBe("server.connected")
expect(connected.location).toBeUndefined()
const created = await request("/session", tmp.path, { method: "POST" })
const created = await request("/session", publisher.path, { method: "POST" })
expect(created.status).toBe(200)
expect(await readEventType(reader, "session.created")).toMatchObject({
type: "session.created",
location: { directory: tmp.path, project: { directory: tmp.path } },
location: { directory: publisher.path, project: { directory: publisher.path } },
data: { sessionID: expect.any(String) },
})
await reader.cancel()

View File

@ -6243,22 +6243,12 @@ export class Event2 extends HeyApiClient {
/**
* Subscribe to events
*
* Subscribe to native event payloads for a location.
* Subscribe to native event payloads for the server.
*/
public subscribe<ThrowOnError extends boolean = false>(
parameters?: {
location?: {
directory?: string
workspace?: string
}
},
options?: Options<never, ThrowOnError>,
) {
const params = buildClientParams([parameters], [{ args: [{ in: "query", key: "location" }] }])
public subscribe<ThrowOnError extends boolean = false>(options?: Options<never, ThrowOnError>) {
return (options?.client ?? this.client).sse.get<V2EventSubscribeResponses, V2EventSubscribeErrors, ThrowOnError>({
url: "/api/event",
...options,
...params,
})
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,34 +1,34 @@
import { EventV2 } from "@opencode-ai/core/event"
import { Location } from "@opencode-ai/core/location"
import { Schema } from "effect"
import { HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi"
import { LocationQuery, locationQueryOpenApi, LocationMiddleware } from "./location"
import { HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
const Event = Schema.Struct({
id: EventV2.ID,
type: Schema.String,
location: Location.Info.pipe(Schema.optional),
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
version: Schema.Number.pipe(Schema.optional),
data: Schema.Unknown,
})
const Event = Schema.Union([
...EventV2.definitions().map((definition) =>
Schema.Struct({
...EventV2.Payload.fields,
type: Schema.Literal(definition.type),
data: definition.data,
}).annotate({ identifier: `V2Event.${definition.type}` }),
),
Schema.Struct({
...EventV2.Payload.fields,
type: Schema.Literal("server.connected"),
data: Schema.Struct({}),
}).annotate({ identifier: "V2Event.server.connected" }),
]).annotate({ identifier: "V2Event" })
export const EventGroup = HttpApiGroup.make("server.event")
.add(
HttpApiEndpoint.get("event.subscribe", "/api/event", {
query: LocationQuery,
success: Schema.String.pipe(HttpApiSchema.asText({ contentType: "text/event-stream" })),
})
.annotateMerge(locationQueryOpenApi)
.annotateMerge(
OpenApi.annotations({
identifier: "v2.event.subscribe",
summary: "Subscribe to events",
description: "Subscribe to native event payloads for a location.",
}),
),
success: Event,
}).annotateMerge(
OpenApi.annotations({
identifier: "v2.event.subscribe",
summary: "Subscribe to events",
description: "Subscribe to native event payloads for the server.",
}),
),
)
.annotateMerge(OpenApi.annotations({ title: "events", description: "Experimental event stream route." }))
.middleware(LocationMiddleware)
export type Event = typeof Event.Type

View File

@ -1,5 +1,4 @@
import { EventV2 } from "@opencode-ai/core/event"
import { Location } from "@opencode-ai/core/location"
import { Effect, Stream } from "effect"
import { HttpServerResponse } from "effect/unstable/http"
import { HttpApiBuilder } from "effect/unstable/httpapi"
@ -20,30 +19,14 @@ export const EventHandler = HttpApiBuilder.group(Api, "server.event", (handlers)
const events = yield* EventV2.Service
return handlers.handleRaw("event.subscribe", () =>
Effect.gen(function* () {
const location = yield* Location.Service
const connected = {
id: EventV2.ID.create(),
type: "server.connected",
location: new Location.Info({
directory: location.directory,
workspaceID: location.workspaceID,
project: location.project,
}),
data: {},
}
return HttpServerResponse.stream(
Stream.make(connected).pipe(
Stream.concat(
events
.all()
.pipe(
Stream.filter(
(event) =>
event.location?.directory === location.directory &&
event.location.workspaceID === location.workspaceID,
),
),
),
Stream.concat(events.all()),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),
Stream.encodeText,

View File

@ -1,9 +1,7 @@
import { useEvent } from "./event"
import type {
AgentV2Info,
CommandV2Info,
IntegrationInfo,
Event,
LocationRef,
ModelV2Info,
PermissionSavedInfo,
@ -18,11 +16,12 @@ import type {
SessionMessageAssistantTool,
SessionV2Info,
SkillV2Info,
V2Event,
} from "@opencode-ai/sdk/v2"
import { createStore, produce } from "solid-js/store"
import { createSimpleContext } from "./helper"
import { useSDK } from "./sdk"
import { createSignal, onMount } from "solid-js"
import { createSignal, onCleanup, onMount } from "solid-js"
type LocationData = {
agent?: AgentV2Info[]
@ -71,7 +70,6 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({
location: {},
})
const event = useEvent()
const sdk = useSDK()
const [defaultLocation, setDefaultLocation] = createSignal<LocationRef>({
directory: sdk.directory ?? process.cwd(),
@ -121,7 +119,7 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({
},
}
event.subscribe((event, metadata) => {
function handleEvent(event: V2Event, metadata: { directory: string; workspace: string | undefined }) {
switch (event.type) {
case "catalog.updated":
void Promise.all([
@ -130,34 +128,34 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({
])
break
case "session.next.agent.switched":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "agent-switched",
agent: event.properties.agent,
time: { created: event.properties.timestamp },
agent: event.data.agent,
time: { created: event.data.timestamp },
})
})
break
case "session.next.model.switched":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "model-switched",
model: event.properties.model,
time: { created: event.properties.timestamp },
model: event.data.model,
time: { created: event.data.timestamp },
})
})
break
case "session.next.prompted": {
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "user",
text: event.properties.prompt.text,
files: event.properties.prompt.files,
agents: event.properties.prompt.agents,
time: { created: event.properties.timestamp },
text: event.data.prompt.text,
files: event.data.prompt.files,
agents: event.data.prompt.agents,
time: { created: event.data.timestamp },
})
})
break
@ -165,248 +163,248 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({
case "session.next.prompt.admitted":
break
case "session.next.prompt.promoted":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "user",
text: event.properties.prompt.text,
files: event.properties.prompt.files,
agents: event.properties.prompt.agents,
time: { created: event.properties.timeCreated },
text: event.data.prompt.text,
files: event.data.prompt.files,
agents: event.data.prompt.agents,
time: { created: event.data.timeCreated },
})
})
break
case "session.next.context.updated":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "system",
text: event.properties.text,
time: { created: event.properties.timestamp },
text: event.data.text,
time: { created: event.data.timestamp },
})
})
break
case "session.next.synthetic":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "synthetic",
sessionID: event.properties.sessionID,
text: event.properties.text,
time: { created: event.properties.timestamp },
sessionID: event.data.sessionID,
text: event.data.text,
time: { created: event.data.timestamp },
})
})
break
case "session.next.shell.started":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "shell",
callID: event.properties.callID,
command: event.properties.command,
callID: event.data.callID,
command: event.data.command,
output: "",
time: { created: event.properties.timestamp },
time: { created: event.data.timestamp },
})
})
break
case "session.next.shell.ended":
message.update(event.properties.sessionID, (draft) => {
const match = message.activeShell(draft, event.properties.callID)
message.update(event.data.sessionID, (draft) => {
const match = message.activeShell(draft, event.data.callID)
if (!match) return
match.output = event.properties.output
match.time.completed = event.properties.timestamp
match.output = event.data.output
match.time.completed = event.data.timestamp
})
break
case "session.next.step.started":
message.update(event.properties.sessionID, (draft) => {
if (draft.some((message) => message.id === event.properties.assistantMessageID)) return
message.update(event.data.sessionID, (draft) => {
if (draft.some((message) => message.id === event.data.assistantMessageID)) return
const currentAssistant = message.activeAssistant(draft)
if (currentAssistant) currentAssistant.time.completed = event.properties.timestamp
if (currentAssistant) currentAssistant.time.completed = event.data.timestamp
message.prepend(draft, {
id: event.properties.assistantMessageID,
id: event.data.assistantMessageID,
type: "assistant",
agent: event.properties.agent,
model: event.properties.model,
agent: event.data.agent,
model: event.data.model,
content: [],
snapshot: event.properties.snapshot ? { start: event.properties.snapshot } : undefined,
time: { created: event.properties.timestamp },
snapshot: event.data.snapshot ? { start: event.data.snapshot } : undefined,
time: { created: event.data.timestamp },
})
})
break
case "session.next.step.ended":
message.update(event.properties.sessionID, (draft) => {
const currentAssistant = message.assistant(draft, event.properties.assistantMessageID)
message.update(event.data.sessionID, (draft) => {
const currentAssistant = message.assistant(draft, event.data.assistantMessageID)
if (!currentAssistant) return
currentAssistant.time.completed = event.properties.timestamp
currentAssistant.finish = event.properties.finish
currentAssistant.cost = event.properties.cost
currentAssistant.tokens = event.properties.tokens
if (event.properties.snapshot)
currentAssistant.snapshot = { ...currentAssistant.snapshot, end: event.properties.snapshot }
currentAssistant.time.completed = event.data.timestamp
currentAssistant.finish = event.data.finish
currentAssistant.cost = event.data.cost
currentAssistant.tokens = event.data.tokens
if (event.data.snapshot)
currentAssistant.snapshot = { ...currentAssistant.snapshot, end: event.data.snapshot }
})
break
case "session.next.step.failed":
message.update(event.properties.sessionID, (draft) => {
const currentAssistant = message.assistant(draft, event.properties.assistantMessageID)
message.update(event.data.sessionID, (draft) => {
const currentAssistant = message.assistant(draft, event.data.assistantMessageID)
if (!currentAssistant) return
currentAssistant.time.completed = event.properties.timestamp
currentAssistant.time.completed = event.data.timestamp
currentAssistant.finish = "error"
currentAssistant.error = event.properties.error
currentAssistant.error = event.data.error
})
break
case "session.next.text.started":
message.update(event.properties.sessionID, (draft) => {
message.assistant(draft, event.properties.assistantMessageID)?.content.push({
message.update(event.data.sessionID, (draft) => {
message.assistant(draft, event.data.assistantMessageID)?.content.push({
type: "text",
id: event.properties.textID,
id: event.data.textID,
text: "",
})
})
break
case "session.next.text.delta":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestText(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.textID,
message.assistant(draft, event.data.assistantMessageID),
event.data.textID,
)
if (match) match.text += event.properties.delta
if (match) match.text += event.data.delta
})
break
case "session.next.text.ended":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestText(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.textID,
message.assistant(draft, event.data.assistantMessageID),
event.data.textID,
)
if (match) match.text = event.properties.text
if (match) match.text = event.data.text
})
break
case "session.next.tool.input.started":
message.update(event.properties.sessionID, (draft) => {
message.assistant(draft, event.properties.assistantMessageID)?.content.push({
message.update(event.data.sessionID, (draft) => {
message.assistant(draft, event.data.assistantMessageID)?.content.push({
type: "tool",
id: event.properties.callID,
name: event.properties.name,
time: { created: event.properties.timestamp },
id: event.data.callID,
name: event.data.name,
time: { created: event.data.timestamp },
state: { status: "pending", input: "" },
})
})
break
case "session.next.tool.input.delta":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestTool(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.callID,
message.assistant(draft, event.data.assistantMessageID),
event.data.callID,
)
if (match?.state.status === "pending") match.state.input += event.properties.delta
if (match?.state.status === "pending") match.state.input += event.data.delta
})
break
case "session.next.tool.input.ended":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestTool(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.callID,
message.assistant(draft, event.data.assistantMessageID),
event.data.callID,
)
if (match?.state.status === "pending") match.state.input = event.properties.text
if (match?.state.status === "pending") match.state.input = event.data.text
})
break
case "session.next.tool.called":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestTool(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.callID,
message.assistant(draft, event.data.assistantMessageID),
event.data.callID,
)
if (!match) return
match.time.ran = event.properties.timestamp
match.provider = event.properties.provider
match.state = { status: "running", input: event.properties.input, structured: {}, content: [] }
match.time.ran = event.data.timestamp
match.provider = event.data.provider
match.state = { status: "running", input: event.data.input, structured: {}, content: [] }
})
break
case "session.next.tool.progress":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestTool(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.callID,
message.assistant(draft, event.data.assistantMessageID),
event.data.callID,
)
if (match?.state.status !== "running") return
match.state.structured = event.properties.structured
match.state.content = [...event.properties.content]
match.state.structured = event.data.structured
match.state.content = [...event.data.content]
})
break
case "session.next.tool.success":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestTool(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.callID,
message.assistant(draft, event.data.assistantMessageID),
event.data.callID,
)
if (match?.state.status !== "running") return
match.state = {
status: "completed",
input: match.state.input,
structured: event.properties.structured,
content: [...event.properties.content],
result: event.properties.result,
structured: event.data.structured,
content: [...event.data.content],
result: event.data.result,
}
match.provider = {
executed: event.properties.provider.executed || match.provider?.executed === true,
executed: event.data.provider.executed || match.provider?.executed === true,
metadata: match.provider?.metadata,
resultMetadata: event.properties.provider.metadata,
resultMetadata: event.data.provider.metadata,
}
match.time.completed = event.properties.timestamp
match.time.completed = event.data.timestamp
})
break
case "session.next.tool.failed":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestTool(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.callID,
message.assistant(draft, event.data.assistantMessageID),
event.data.callID,
)
if (!match || (match.state.status !== "pending" && match.state.status !== "running")) return
match.state = {
status: "error",
error: event.properties.error,
error: event.data.error,
input: typeof match.state.input === "string" ? {} : match.state.input,
structured: match.state.status === "running" ? match.state.structured : {},
content: match.state.status === "running" ? match.state.content : [],
result: event.properties.result,
result: event.data.result,
}
match.provider = {
executed: event.properties.provider.executed || match.provider?.executed === true,
executed: event.data.provider.executed || match.provider?.executed === true,
metadata: match.provider?.metadata,
resultMetadata: event.properties.provider.metadata,
resultMetadata: event.data.provider.metadata,
}
match.time.completed = event.properties.timestamp
match.time.completed = event.data.timestamp
})
break
case "session.next.reasoning.started":
message.update(event.properties.sessionID, (draft) => {
message.assistant(draft, event.properties.assistantMessageID)?.content.push({
message.update(event.data.sessionID, (draft) => {
message.assistant(draft, event.data.assistantMessageID)?.content.push({
type: "reasoning",
id: event.properties.reasoningID,
id: event.data.reasoningID,
text: "",
providerMetadata: event.properties.providerMetadata,
providerMetadata: event.data.providerMetadata,
})
})
break
case "session.next.reasoning.delta":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestReasoning(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.reasoningID,
message.assistant(draft, event.data.assistantMessageID),
event.data.reasoningID,
)
if (match) match.text += event.properties.delta
if (match) match.text += event.data.delta
})
break
case "session.next.reasoning.ended":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
const match = message.latestReasoning(
message.assistant(draft, event.properties.assistantMessageID),
event.properties.reasoningID,
message.assistant(draft, event.data.assistantMessageID),
event.data.reasoningID,
)
if (match) {
match.text = event.properties.text
if (event.properties.providerMetadata !== undefined)
match.providerMetadata = event.properties.providerMetadata
match.text = event.data.text
if (event.data.providerMetadata !== undefined)
match.providerMetadata = event.data.providerMetadata
}
})
break
@ -415,14 +413,14 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({
case "session.next.compaction.delta":
break
case "session.next.compaction.ended":
message.update(event.properties.sessionID, (draft) => {
message.update(event.data.sessionID, (draft) => {
message.prepend(draft, {
id: event.properties.messageID,
id: event.data.messageID,
type: "compaction",
reason: event.properties.reason,
summary: event.properties.text,
recent: event.properties.recent,
time: { created: event.properties.timestamp },
reason: event.data.reason,
summary: event.data.text,
recent: event.data.recent,
time: { created: event.data.timestamp },
})
})
break
@ -437,6 +435,20 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({
])
break
}
}
onMount(() => {
const controller = new AbortController()
onCleanup(() => controller.abort())
void (async () => {
const events = await sdk.client.v2.event.subscribe({ signal: controller.signal })
for await (const event of events.stream) {
handleEvent(event, {
directory: event.location?.directory ?? defaultLocation().directory,
workspace: event.location?.workspaceID,
})
}
})().catch(() => {})
})
const result = {

View File

@ -26,6 +26,7 @@ function emitEvent(events: ReturnType<typeof createEventSource>, payload: Event)
}
test("refreshes resources into reactive getters", async () => {
const events = createEventSource()
const location = {
directory,
project: { id: "proj_test", directory },
@ -49,8 +50,7 @@ test("refreshes resources into reactive getters", async () => {
data: [{ id: "build", request: { headers: {}, body: {} }, mode: "primary", hidden: false, permissions: [] }],
})
return undefined
})
const events = createEventSource()
}, events)
let data!: ReturnType<typeof useData>
let ready!: () => void
const mounted = new Promise<void>((resolve) => {
@ -119,7 +119,7 @@ test("refreshes integrations after integration updates", async () => {
},
],
})
})
}, events)
let data!: ReturnType<typeof useData>
let ready!: () => void
const mounted = new Promise<void>((resolve) => {
@ -171,7 +171,7 @@ test("refreshes effective catalog data after catalog updates", async () => {
requests.provider++
return json({ location: { directory, project: { id: "proj_test", directory } }, data: [] })
}
})
}, events)
const app = await testRender(() => (
<TestTuiContexts>
@ -205,7 +205,7 @@ test("refreshes references after updates", async () => {
location: { directory, project: { id: "proj_test", directory } },
data: requests === 1 ? [] : [{ name: "docs", path: "/docs", source: { type: "local", path: "/docs" } }],
})
})
}, events)
let data!: ReturnType<typeof useData>
let ready!: () => void
const mounted = new Promise<void>((resolve) => {
@ -243,7 +243,7 @@ test("refreshes references after updates", async () => {
test("settles pending tools when a live failure arrives", async () => {
const events = createEventSource()
const calls = createFetch()
const calls = createFetch(undefined, events)
let sync!: ReturnType<typeof useData>
let ready!: () => void
const mounted = new Promise<void>((resolve) => {
@ -372,7 +372,7 @@ test("settles pending tools when a live failure arrives", async () => {
test("renders admitted prompts only after promotion", async () => {
const events = createEventSource()
const calls = createFetch()
const calls = createFetch(undefined, events)
let sync!: ReturnType<typeof useData>
let ready!: () => void
const mounted = new Promise<void>((resolve) => {
@ -436,7 +436,7 @@ test("renders admitted prompts only after promotion", async () => {
test("renders a promoted prompt when admission was missed", async () => {
const events = createEventSource()
const calls = createFetch()
const calls = createFetch(undefined, events)
let sync!: ReturnType<typeof useData>
let ready!: () => void
const mounted = new Promise<void>((resolve) => {
@ -484,7 +484,7 @@ test("renders a promoted prompt when admission was missed", async () => {
test("projects live context updates with their message ID", async () => {
const events = createEventSource()
const calls = createFetch()
const calls = createFetch(undefined, events)
let sync!: ReturnType<typeof useData>
let ready!: () => void
const mounted = new Promise<void>((resolve) => {

View File

@ -17,6 +17,8 @@ export function eventSource(): EventSource {
export function createEventSource() {
let fn: ((event: GlobalEvent) => void) | undefined
let stream: ReadableStreamDefaultController<Uint8Array> | undefined
const pending: Uint8Array[] = []
return {
source: {
subscribe: async (handler: (event: GlobalEvent) => void) => {
@ -29,19 +31,44 @@ export function createEventSource() {
emit(event: GlobalEvent) {
if (!fn) throw new Error("event source not ready")
fn(event)
if (!("properties" in event.payload)) return
const chunk = new TextEncoder().encode(
`data: ${JSON.stringify({
...event.payload,
location: { directory: event.directory, workspaceID: event.workspace },
data: event.payload.properties,
})}\n\n`,
)
if (stream) return stream.enqueue(chunk)
pending.push(chunk)
},
response() {
return new Response(
new ReadableStream<Uint8Array>({
start(controller) {
stream = controller
for (const chunk of pending.splice(0)) controller.enqueue(chunk)
},
cancel() {
stream = undefined
},
}),
{ headers: { "content-type": "text/event-stream" } },
)
},
}
}
export type FetchHandler = (url: URL) => Response | Promise<Response> | undefined
export function createFetch(override?: FetchHandler) {
export function createFetch(override?: FetchHandler, events?: ReturnType<typeof createEventSource>) {
const session = [] as URL[]
const fetch = (async (input: RequestInfo | URL) => {
const url = new URL(input instanceof Request ? input.url : String(input))
if (url.pathname === "/session") session.push(url)
const overridden = await override?.(url)
if (overridden) return overridden
if (url.pathname === "/api/event" && events) return events.response()
if (
[