refactor(core): move database schema ownership (#29068)

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
This commit is contained in:
Dax 2026-05-30 21:08:38 -04:00 committed by GitHub
parent 6bcb9cb9bb
commit 7f571d36ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
390 changed files with 11127 additions and 9164 deletions

View File

@ -2,6 +2,9 @@
"$schema": "https://opencode.ai/config.json",
"provider": {},
"permission": {},
"reference": {
"effect": "github.com/Effect-TS/effect-smol",
},
"mcp": {},
"tools": {
"github-triage": false,

View File

@ -47,6 +47,12 @@ obj.b
const { a, b } = obj
```
### Imports
- Never alias imports. Do not use `import { foo as bar } from "..."` or renamed imports like `resolve as pathResolve`.
- Never use star imports. Do not use `import * as Foo from "..."` or `import type * as Foo from "..."`.
- If a namespace-style value is needed, import the module's own exported namespace by name, for example `import { Project } from "@opencode-ai/core/project"`, then reference `Project.ID`.
### Variables
Prefer `const` over `let`. Use ternaries or early returns instead of reassignment.

View File

@ -259,8 +259,11 @@
"@aws-sdk/credential-providers": "3.993.0",
"@effect/opentelemetry": "catalog:",
"@effect/platform-node": "catalog:",
"@effect/sql-sqlite-bun": "catalog:",
"@npmcli/arborist": "9.4.0",
"@npmcli/config": "10.8.1",
"@opencode-ai/effect-drizzle-sqlite": "workspace:*",
"@opencode-ai/effect-sqlite-node": "workspace:*",
"@openrouter/ai-sdk-provider": "2.8.1",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/context-async-hooks": "2.6.1",
@ -268,6 +271,7 @@
"@opentelemetry/sdk-trace-base": "2.6.1",
"ai-gateway-provider": "3.1.2",
"cross-spawn": "catalog:",
"drizzle-orm": "catalog:",
"effect": "catalog:",
"gitlab-ai-provider": "6.8.0",
"glob": "13.0.5",
@ -289,6 +293,7 @@
"@types/npm-package-arg": "6.1.4",
"@types/npmcli__arborist": "6.3.3",
"@types/semver": "catalog:",
"drizzle-kit": "catalog:",
},
},
"packages/desktop": {
@ -360,6 +365,18 @@
"@typescript/native-preview": "catalog:",
},
},
"packages/effect-sqlite-node": {
"name": "@opencode-ai/effect-sqlite-node",
"version": "1.15.10",
"dependencies": {
"effect": "catalog:",
},
"devDependencies": {
"@tsconfig/bun": "catalog:",
"@types/node": "catalog:",
"@typescript/native-preview": "catalog:",
},
},
"packages/enterprise": {
"name": "@opencode-ai/enterprise",
"version": "1.15.13",
@ -569,7 +586,6 @@
"@types/which": "3.0.4",
"@types/yargs": "17.0.33",
"@typescript/native-preview": "catalog:",
"drizzle-kit": "catalog:",
"drizzle-orm": "catalog:",
"prettier": "3.6.2",
"typescript": "catalog:",
@ -1684,6 +1700,8 @@
"@opencode-ai/effect-drizzle-sqlite": ["@opencode-ai/effect-drizzle-sqlite@workspace:packages/effect-drizzle-sqlite"],
"@opencode-ai/effect-sqlite-node": ["@opencode-ai/effect-sqlite-node@workspace:packages/effect-sqlite-node"],
"@opencode-ai/enterprise": ["@opencode-ai/enterprise@workspace:packages/enterprise"],
"@opencode-ai/function": ["@opencode-ai/function@workspace:packages/function"],

View File

@ -3,6 +3,8 @@ import { AgentV2 } from "@opencode-ai/core/agent"
import { PluginBoot } from "@opencode-ai/core/plugin/boot"
import * as Effect from "effect/Effect"
import * as Command from "effect/unstable/cli/Command"
import { LocationServiceMap } from "@opencode-ai/core/location-layer"
import { AbsolutePath } from "@opencode-ai/core/schema"
export const AgentsCommand = Command.make("agents", {}, () =>
Effect.gen(function* () {
@ -15,5 +17,11 @@ export const AgentsCommand = Command.make("agents", {}, () =>
2,
) + EOL,
)
}),
}).pipe(
Effect.provide(
LocationServiceMap.get({
directory: AbsolutePath.make(process.cwd()),
}),
),
),
).pipe(Command.withDescription("List all agents"))

View File

@ -2,48 +2,17 @@
import * as NodeRuntime from "@effect/platform-node/NodeRuntime"
import * as NodeServices from "@effect/platform-node/NodeServices"
import { AccountV2 } from "@opencode-ai/core/account"
import { AgentV2 } from "@opencode-ai/core/agent"
import { Catalog } from "@opencode-ai/core/catalog"
import { Config } from "@opencode-ai/core/config"
import { EventV2 } from "@opencode-ai/core/event"
import { Location } from "@opencode-ai/core/location"
import { Npm } from "@opencode-ai/core/npm"
import { PluginV2 } from "@opencode-ai/core/plugin"
import { PluginBoot } from "@opencode-ai/core/plugin/boot"
import { Policy } from "@opencode-ai/core/policy"
import { AbsolutePath } from "@opencode-ai/core/schema"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Command from "effect/unstable/cli/Command"
import { DebugCommand } from "./debug"
import { LocationServiceMap } from "@opencode-ai/core/location-layer"
const cli = Command.make("opencode", {}, () => Effect.void).pipe(
Command.withDescription("OpenCode command line interface"),
Command.withSubcommands([DebugCommand]),
)
const locationLayer = Location.defaultLayer({
directory: AbsolutePath.make(process.cwd()),
})
const policyLayer = Policy.defaultLayer.pipe(Layer.provideMerge(locationLayer))
const pluginLayer = PluginV2.defaultLayer
const eventLayer = EventV2.defaultLayer
const layer = PluginBoot.layer.pipe(
Layer.provideMerge(
Layer.mergeAll(
NodeServices.layer,
Catalog.layer.pipe(Layer.provideMerge(Layer.mergeAll(eventLayer, pluginLayer, policyLayer))),
eventLayer,
pluginLayer,
AccountV2.defaultLayer,
AgentV2.defaultLayer,
Config.defaultLayer.pipe(Layer.provideMerge(policyLayer)),
Npm.defaultLayer,
),
),
)
const layer = Layer.mergeAll(LocationServiceMap.layer, NodeServices.layer)
Command.run(cli, { version: "local" }).pipe(Effect.provide(layer), Effect.scoped, NodeRuntime.runMain)

View File

@ -2,7 +2,7 @@ import { defineConfig } from "drizzle-kit"
export default defineConfig({
dialect: "sqlite",
schema: "./src/**/*.sql.ts",
schema: ["./src/**/*.sql.ts", "./src/**/sql.ts"],
out: "./migration",
dbCredentials: {
url: "/home/thdxr/.local/share/opencode/opencode.db",

View File

@ -0,0 +1 @@
ALTER TABLE `session` ADD `metadata` text;

File diff suppressed because it is too large Load Diff

View File

@ -6,6 +6,8 @@
"license": "MIT",
"private": true,
"scripts": {
"db": "bun drizzle-kit",
"migration": "bun run script/migration.ts",
"test": "bun test",
"test:ci": "mkdir -p .artifacts/unit && bun test --timeout 30000 --reporter=junit --reporter-outfile=.artifacts/unit/junit.xml",
"typecheck": "tsgo --noEmit"
@ -16,14 +18,21 @@
"exports": {
"./*": "./src/*.ts"
},
"imports": {},
"imports": {
"#sqlite": {
"bun": "./src/database/sqlite.bun.ts",
"node": "./src/database/sqlite.node.ts",
"default": "./src/database/sqlite.bun.ts"
}
},
"devDependencies": {
"@tsconfig/bun": "catalog:",
"@types/bun": "catalog:",
"@types/cross-spawn": "catalog:",
"@types/npm-package-arg": "6.1.4",
"@types/npmcli__arborist": "6.3.3",
"@types/semver": "catalog:"
"@types/semver": "catalog:",
"drizzle-kit": "catalog:"
},
"dependencies": {
"@ai-sdk/alibaba": "1.0.17",
@ -49,8 +58,11 @@
"@aws-sdk/credential-providers": "3.993.0",
"@effect/opentelemetry": "catalog:",
"@effect/platform-node": "catalog:",
"@effect/sql-sqlite-bun": "catalog:",
"@npmcli/arborist": "9.4.0",
"@npmcli/config": "10.8.1",
"@opencode-ai/effect-drizzle-sqlite": "workspace:*",
"@opencode-ai/effect-sqlite-node": "workspace:*",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/context-async-hooks": "2.6.1",
"@opentelemetry/exporter-trace-otlp-http": "0.214.0",
@ -58,6 +70,7 @@
"@openrouter/ai-sdk-provider": "2.8.1",
"ai-gateway-provider": "3.1.2",
"cross-spawn": "catalog:",
"drizzle-orm": "catalog:",
"effect": "catalog:",
"gitlab-ai-provider": "6.8.0",
"glob": "13.0.5",

View File

@ -0,0 +1,113 @@
#!/usr/bin/env bun
import { $ } from "bun"
import fs from "fs/promises"
import os from "os"
import path from "path"
import { pathToFileURL } from "url"
const root = path.resolve(import.meta.dirname, "../../..")
const sqlDir = path.join(root, "packages/core/migration")
const tsDir = path.join(root, "packages/core/src/database/migration")
const registry = path.join(root, "packages/core/src/database/migration.gen.ts")
if (Bun.argv.includes("--check")) {
await check()
process.exit(0)
}
await $`bun drizzle-kit generate`.cwd(path.join(root, "packages/core"))
const sqlMigrations = (await Array.fromAsync(new Bun.Glob("*/migration.sql").scan({ cwd: sqlDir })))
.map((file) => file.split("/")[0])
.filter((name) => name !== undefined)
.sort()
for (const name of sqlMigrations) {
if (await Bun.file(path.join(tsDir, `${name}.ts`)).exists()) continue
await Bun.write(path.join(tsDir, `${name}.ts`), renderMigration(name, await Bun.file(path.join(sqlDir, name, "migration.sql")).text()))
}
await Bun.write(registry, renderRegistry(sqlMigrations))
async function check() {
const temporary = await fs.mkdtemp(path.join(os.tmpdir(), "opencode-core-migration-check-"))
const output = path.join(temporary, "migration")
try {
await fs.cp(sqlDir, output, { recursive: true })
const config = path.join(temporary, "drizzle.config.ts")
await Bun.write(
config,
`import config from ${JSON.stringify(pathToFileURL(path.join(root, "packages/core/drizzle.config.ts")).href)}
export default { ...config, out: ${JSON.stringify(output)} }
`,
)
const before = await snapshot(output)
await $`bun drizzle-kit generate --config ${config}`.cwd(path.join(root, "packages/core"))
const after = await snapshot(output)
if (JSON.stringify(after) !== JSON.stringify(before)) {
throw new Error("Core schema has ungenerated database migrations. Run `bun script/migration.ts` from packages/core.")
}
const migrations = before
.map((entry) => entry.path.split("/")[0])
.filter((name, index, all) => name !== undefined && all.indexOf(name) === index)
.sort()
for (const name of migrations) {
if (await Bun.file(path.join(tsDir, `${name}.ts`)).exists()) continue
throw new Error(`Database migration TypeScript wrapper is missing for ${name}. Run \`bun script/migration.ts\` from packages/core.`)
}
if ((await Bun.file(registry).text()) !== renderRegistry(migrations)) {
throw new Error("Database migration registry is stale. Run `bun script/migration.ts` from packages/core.")
}
} finally {
await fs.rm(temporary, { recursive: true, force: true })
}
}
async function snapshot(directory: string) {
const files = await Array.fromAsync(new Bun.Glob("**/*").scan({ cwd: directory, onlyFiles: true }))
return Promise.all(
files.sort().map(async (file) => ({ path: file, contents: await Bun.file(path.join(directory, file)).text() })),
)
}
function renderMigration(name: string, sql: string) {
return `import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: ${JSON.stringify(name)},
up(tx) {
return Effect.gen(function* () {
${sql
.split("--> statement-breakpoint")
.map((statement) => statement.trim())
.filter((statement) => statement.length > 0)
.map(renderRun)
.join("\n")}
})
},
} satisfies DatabaseMigration.Migration
`
}
function renderRun(statement: string) {
const lines = statement.replaceAll("\t", " ").split("\n")
if (lines.length === 1) return ` yield* tx.run(\`${escapeTemplate(lines[0])}\`)`
return ` yield* tx.run(\`\n${lines.map((line) => ` ${escapeTemplate(line)}`).join("\n")}\n \`)`
}
function escapeTemplate(line: string) {
return line.replaceAll("\\", "\\\\").replaceAll("`", "\\`").replaceAll("${", "\\${")
}
function renderRegistry(names: string[]) {
return `import type { DatabaseMigration } from "./migration"
export const migrations = (await Promise.all([
${names.map((name) => ` import("./migration/${name}"),`).join("\n")}
])).map((module) => module.default) satisfies DatabaseMigration.Migration[]
`
}

View File

@ -1,319 +1,101 @@
import path from "path"
import { Effect, Layer, Option, Schema, Context, SynchronizedRef } from "effect"
import { Identifier } from "./util/identifier"
import { NonNegativeInt, withStatics } from "./schema"
import { Global } from "./global"
import { AppFileSystem } from "./filesystem"
import { EventV2 } from "./event"
export * as AccountV2 from "./account"
export const ID = Schema.String.pipe(
Schema.brand("AccountV2.ID"),
withStatics((schema) => ({ create: () => schema.make("acc_" + Identifier.ascending()) })),
)
export type ID = typeof ID.Type
import { Schema } from "effect"
import type * as HttpClientError from "effect/unstable/http/HttpClientError"
export const ServiceID = Schema.String.pipe(Schema.brand("ServiceID"))
export type ServiceID = typeof ServiceID.Type
export const ID = Schema.String.pipe(Schema.brand("AccountID"))
export type ID = Schema.Schema.Type<typeof ID>
export class OAuthCredential extends Schema.Class<OAuthCredential>("AccountV2.OAuthCredential")({
type: Schema.Literal("oauth"),
refresh: Schema.String,
access: Schema.String,
expires: NonNegativeInt,
}) {}
export const OrgID = Schema.String.pipe(Schema.brand("OrgID"))
export type OrgID = Schema.Schema.Type<typeof OrgID>
export class ApiKeyCredential extends Schema.Class<ApiKeyCredential>("AccountV2.ApiKeyCredential")({
type: Schema.Literal("api"),
key: Schema.String,
metadata: Schema.optional(Schema.Record(Schema.String, Schema.String)),
}) {}
export const AccessToken = Schema.String.pipe(Schema.brand("AccessToken"))
export type AccessToken = Schema.Schema.Type<typeof AccessToken>
export const Credential = Schema.Union([OAuthCredential, ApiKeyCredential])
.pipe(Schema.toTaggedUnion("type"))
.annotate({
identifier: "AccountV2.Credential",
})
export type Credential = Schema.Schema.Type<typeof Credential>
export const RefreshToken = Schema.String.pipe(Schema.brand("RefreshToken"))
export type RefreshToken = Schema.Schema.Type<typeof RefreshToken>
export class Info extends Schema.Class<Info>("AccountV2.Info")({
export const DeviceCode = Schema.String.pipe(Schema.brand("DeviceCode"))
export type DeviceCode = Schema.Schema.Type<typeof DeviceCode>
export const UserCode = Schema.String.pipe(Schema.brand("UserCode"))
export type UserCode = Schema.Schema.Type<typeof UserCode>
export class Info extends Schema.Class<Info>("Account")({
id: ID,
serviceID: ServiceID,
description: Schema.String,
credential: Credential,
email: Schema.String,
url: Schema.String,
active_org_id: Schema.NullOr(OrgID),
}) {}
export class FileWriteError extends Schema.TaggedErrorClass<FileWriteError>()("AccountV2.FileWriteError", {
operation: Schema.Union([Schema.Literal("migrate"), Schema.Literal("write")]),
export class Org extends Schema.Class<Org>("Org")({
id: OrgID,
name: Schema.String,
}) {}
export class AccountRepoError extends Schema.TaggedErrorClass<AccountRepoError>()("AccountRepoError", {
message: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
export class AccountServiceError extends Schema.TaggedErrorClass<AccountServiceError>()("AccountServiceError", {
message: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
export class AccountTransportError extends Schema.TaggedErrorClass<AccountTransportError>()("AccountTransportError", {
method: Schema.String,
url: Schema.String,
description: Schema.optional(Schema.String),
cause: Schema.optional(Schema.Defect),
}) {
static fromHttpClientError(error: HttpClientError.TransportError): AccountTransportError {
return new AccountTransportError({
method: error.request.method,
url: error.request.url,
description: error.description,
cause: error.cause,
})
}
override get message(): string {
return [
`Could not reach ${this.method} ${this.url}.`,
`This failed before the server returned an HTTP response.`,
this.description,
`Check your network, proxy, or VPN configuration and try again.`,
]
.filter(Boolean)
.join("\n")
}
}
export type AccountError = AccountRepoError | AccountServiceError | AccountTransportError
export class Login extends Schema.Class<Login>("Login")({
code: DeviceCode,
user: UserCode,
url: Schema.String,
server: Schema.String,
expiry: Schema.Duration,
interval: Schema.Duration,
}) {}
export class PollSuccess extends Schema.TaggedClass<PollSuccess>()("PollSuccess", {
email: Schema.String,
}) {}
export class PollPending extends Schema.TaggedClass<PollPending>()("PollPending", {}) {}
export class PollSlow extends Schema.TaggedClass<PollSlow>()("PollSlow", {}) {}
export class PollExpired extends Schema.TaggedClass<PollExpired>()("PollExpired", {}) {}
export class PollDenied extends Schema.TaggedClass<PollDenied>()("PollDenied", {}) {}
export class PollError extends Schema.TaggedClass<PollError>()("PollError", {
cause: Schema.Defect,
}) {}
export type Error = FileWriteError
export const Event = {
Added: EventV2.define({
type: "account.added",
schema: {
account: Info,
},
}),
Removed: EventV2.define({
type: "account.removed",
schema: {
account: Info,
},
}),
Switched: EventV2.define({
type: "account.switched",
schema: {
serviceID: ServiceID,
from: Schema.optional(ID),
to: Schema.optional(ID),
},
}),
}
interface Writable {
version: 2
accounts: Record<string, Info>
active: Record<string, ID>
}
const decodeV1 = Schema.decodeUnknownOption(Schema.Record(Schema.String, Credential))
function migrate(old: Record<string, unknown>): Writable {
const accounts: Record<string, Info> = {}
const active: Record<string, ID> = {}
for (const [serviceID, value] of Object.entries(old)) {
const decoded = Option.getOrElse(decodeV1({ [serviceID]: value }), () => ({}))
const parsed = (decoded as Record<string, Credential>)[serviceID]
if (!parsed) continue
const id = Identifier.ascending()
const account = ID.make(id)
const brandedServiceID = ServiceID.make(serviceID)
accounts[id] = new Info({
id: account,
serviceID: brandedServiceID,
description: "default",
credential: parsed,
})
active[brandedServiceID] = account
}
return { version: 2, accounts, active }
}
export interface Interface {
readonly get: (id: ID) => Effect.Effect<Info | undefined, Error>
readonly all: () => Effect.Effect<Info[], Error>
readonly create: (input: {
serviceID: ServiceID
credential: Credential
description?: string
}) => Effect.Effect<Info | undefined, Error>
readonly update: (id: ID, updates: Partial<Pick<Info, "description" | "credential">>) => Effect.Effect<void, Error>
readonly remove: (id: ID) => Effect.Effect<void, Error>
readonly activate: (id: ID) => Effect.Effect<void, Error>
readonly active: (serviceID: ServiceID) => Effect.Effect<Info | undefined, Error>
readonly forService: (serviceID: ServiceID) => Effect.Effect<Info[], Error>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/Account") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fsys = yield* AppFileSystem.Service
const global = yield* Global.Service
const events = yield* EventV2.Service
const file = path.join(global.data, "account.json")
const legacyFile = path.join(global.data, "auth.json")
const writeMigrated = Effect.fnUntraced(function* (raw: Record<string, unknown>) {
const migrated = migrate(raw)
yield* fsys
.writeJson(file, migrated, 0o600)
.pipe(Effect.mapError((cause) => new FileWriteError({ operation: "migrate", cause })))
return migrated
})
const parseAuthContent = () => {
try {
return JSON.parse(process.env.OPENCODE_AUTH_CONTENT ?? "")
} catch {}
}
const load: () => Effect.Effect<Writable, Error> = Effect.fnUntraced(function* () {
if (process.env.OPENCODE_AUTH_CONTENT) {
const raw = parseAuthContent()
if (raw && typeof raw === "object") {
if ("version" in raw && raw.version === 2) return raw as Writable
return yield* writeMigrated(raw as Record<string, unknown>)
}
return { version: 2, accounts: {}, active: {} }
}
const legacy = yield* fsys.readJson(legacyFile).pipe(Effect.orElseSucceed(() => null))
if (legacy && typeof legacy === "object") return yield* writeMigrated(legacy as Record<string, unknown>)
const raw = yield* fsys.readJson(file).pipe(Effect.orElseSucceed(() => null))
if (raw && typeof raw === "object") {
if ("version" in raw && raw.version === 2) return raw as Writable
return yield* writeMigrated(raw as Record<string, unknown>)
}
return { version: 2, accounts: {}, active: {} }
})
const write = (data: Writable) =>
fsys
.writeJson(file, data, 0o600)
.pipe(Effect.mapError((cause) => new FileWriteError({ operation: "write", cause })))
const state = SynchronizedRef.makeUnsafe(
yield* load().pipe(Effect.orElseSucceed((): Writable => ({ version: 2, accounts: {}, active: {} }))),
)
const activate = Effect.fn("AccountV2.activate")(function* (id: ID) {
const data = yield* SynchronizedRef.get(state)
const account = data.accounts[id]
if (!account) return
const activated = yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
const nextAccount = data.accounts[id]
if (!nextAccount) return [undefined, data] as const
const next = { ...data, active: { ...data.active, [nextAccount.serviceID]: id } }
yield* write(next)
return [{ serviceID: nextAccount.serviceID, from: data.active[nextAccount.serviceID], to: id }, next] as const
}),
)
if (activated) yield* events.publish(Event.Switched, activated)
})
const result: Interface = {
get: Effect.fn("AccountV2.get")(function* (id) {
return (yield* SynchronizedRef.get(state)).accounts[id]
}),
all: Effect.fn("AccountV2.all")(function* () {
return Object.values((yield* SynchronizedRef.get(state)).accounts)
}),
active: Effect.fn("AccountV2.active")(function* (serviceID) {
const data = yield* SynchronizedRef.get(state)
return (
data.accounts[data.active[serviceID]] ?? Object.values(data.accounts).find((a) => a.serviceID === serviceID)
)
}),
forService: Effect.fn("AccountV2.list")(function* (serviceID) {
return Object.values((yield* SynchronizedRef.get(state)).accounts).filter((a) => a.serviceID === serviceID)
}),
create: Effect.fn("AccountV2.add")(function* (input) {
const id = ID.make(Identifier.ascending())
const account = new Info({
id,
serviceID: input.serviceID,
description: input.description ?? "default",
credential: input.credential,
})
const added = yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
const next = {
...data,
accounts: { ...data.accounts, [account.id]: account },
active: { ...data.active, [account.serviceID]: account.id },
}
yield* write(next)
return [
{
account,
switched: { serviceID: account.serviceID, from: data.active[account.serviceID], to: account.id },
},
next,
] as const
}),
)
yield* events.publish(Event.Added, { account: added.account })
yield* events.publish(Event.Switched, added.switched)
return added.account
}),
update: Effect.fn("AccountV2.update")(function* (id, updates) {
const existing = (yield* SynchronizedRef.get(state)).accounts[id]
if (!existing) return
yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
if (!data.accounts[id]) return [undefined, data] as const
const next = {
...data,
accounts: {
...data.accounts,
[id]: new Info({
id,
serviceID: existing.serviceID,
description: updates.description ?? existing.description,
credential: updates.credential ?? existing.credential,
}),
},
}
yield* write(next)
return [undefined, next] as const
}),
)
}),
remove: Effect.fn("AccountV2.remove")(function* (id) {
const removed = yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
const accounts = { ...data.accounts }
const active = { ...data.active }
const removed = accounts[id]
if (!removed) return [undefined, data] as const
const wasActive = active[removed.serviceID] === id
delete accounts[id]
const replacement = Object.values(accounts).find((account) => account.serviceID === removed.serviceID)
if (wasActive) {
if (replacement) active[removed.serviceID] = replacement.id
else delete active[removed.serviceID]
}
const next = { ...data, accounts, active }
yield* write(next)
return [
{
account: removed,
switched: wasActive ? { serviceID: removed.serviceID, from: id, to: replacement?.id } : undefined,
},
next,
] as const
}),
)
if (removed) {
yield* events.publish(Event.Removed, { account: removed.account })
if (removed.switched) yield* events.publish(Event.Switched, removed.switched)
}
}),
activate,
}
return Service.of(result)
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(AppFileSystem.defaultLayer),
Layer.provide(Global.defaultLayer),
Layer.provide(EventV2.defaultLayer),
)
export * as AccountV2 from "./account"
export const PollResult = Schema.Union([PollSuccess, PollPending, PollSlow, PollExpired, PollDenied, PollError])
export type PollResult = Schema.Schema.Type<typeof PollResult>

View File

@ -1,14 +1,14 @@
import { sqliteTable, text, integer, primaryKey } from "drizzle-orm/sqlite-core"
import { type AccessToken, type AccountID, type OrgID, type RefreshToken } from "./schema"
import { Timestamps } from "../storage/schema.sql"
import { AccountV2 } from "../account"
import { Timestamps } from "../database/schema.sql"
export const AccountTable = sqliteTable("account", {
id: text().$type<AccountID>().primaryKey(),
id: text().$type<AccountV2.ID>().primaryKey(),
email: text().notNull(),
url: text().notNull(),
access_token: text().$type<AccessToken>().notNull(),
refresh_token: text().$type<RefreshToken>().notNull(),
access_token: text().$type<AccountV2.AccessToken>().notNull(),
refresh_token: text().$type<AccountV2.RefreshToken>().notNull(),
token_expiry: integer(),
...Timestamps,
})
@ -16,9 +16,9 @@ export const AccountTable = sqliteTable("account", {
export const AccountStateTable = sqliteTable("account_state", {
id: integer().primaryKey(),
active_account_id: text()
.$type<AccountID>()
.$type<AccountV2.ID>()
.references(() => AccountTable.id, { onDelete: "set null" }),
active_org_id: text().$type<OrgID>(),
active_org_id: text().$type<AccountV2.OrgID>(),
})
// LEGACY
@ -27,8 +27,8 @@ export const ControlAccountTable = sqliteTable(
{
email: text().notNull(),
url: text().notNull(),
access_token: text().$type<AccessToken>().notNull(),
refresh_token: text().$type<RefreshToken>().notNull(),
access_token: text().$type<AccountV2.AccessToken>().notNull(),
refresh_token: text().$type<AccountV2.RefreshToken>().notNull(),
token_expiry: integer(),
active: integer({ mode: "boolean" })
.notNull()

View File

@ -104,4 +104,4 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer
export const locationLayer = layer

View File

@ -3,6 +3,7 @@ export * as AISDK from "./aisdk"
import type { LanguageModelV3 } from "@ai-sdk/provider"
import { Cause, Context, Effect, Layer, Schema } from "effect"
import { ModelV2 } from "./model"
import { EventV2 } from "./event"
import { PluginV2 } from "./plugin"
import { ProviderV2 } from "./provider"
@ -169,4 +170,6 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer.pipe(Layer.provide(PluginV2.defaultLayer))
export const defaultLayer = layer.pipe(
Layer.provide(PluginV2.locationLayer.pipe(Layer.provide(EventV2.defaultLayer))),
)

326
packages/core/src/auth.ts Normal file
View File

@ -0,0 +1,326 @@
export * as Auth from "./auth"
import path from "path"
import { Effect, Layer, Option, Schema, Context, SynchronizedRef } from "effect"
import { Identifier } from "./util/identifier"
import { NonNegativeInt, withStatics } from "./schema"
import { Global } from "./global"
import { AppFileSystem } from "./filesystem"
import { EventV2 } from "./event"
export const ID = Schema.String.pipe(
Schema.brand("Auth.ID"),
withStatics((schema) => ({ create: () => schema.make("acc_" + Identifier.ascending()) })),
)
export type ID = typeof ID.Type
export const ServiceID = Schema.String.pipe(Schema.brand("ServiceID"))
export type ServiceID = typeof ServiceID.Type
export const OrgID = Schema.String.pipe(Schema.brand("OrgID"))
export type OrgID = typeof OrgID.Type
export const AccessToken = Schema.String.pipe(Schema.brand("AccessToken"))
export type AccessToken = typeof AccessToken.Type
export const RefreshToken = Schema.String.pipe(Schema.brand("RefreshToken"))
export type RefreshToken = typeof RefreshToken.Type
export class OAuthCredential extends Schema.Class<OAuthCredential>("Auth.OAuthCredential")({
type: Schema.Literal("oauth"),
refresh: Schema.String,
access: Schema.String,
expires: NonNegativeInt,
}) {}
export class ApiKeyCredential extends Schema.Class<ApiKeyCredential>("Auth.ApiKeyCredential")({
type: Schema.Literal("api"),
key: Schema.String,
metadata: Schema.optional(Schema.Record(Schema.String, Schema.String)),
}) {}
export const Credential = Schema.Union([OAuthCredential, ApiKeyCredential])
.pipe(Schema.toTaggedUnion("type"))
.annotate({
identifier: "Auth.Credential",
})
export type Credential = Schema.Schema.Type<typeof Credential>
export class Info extends Schema.Class<Info>("Auth.Info")({
id: ID,
serviceID: ServiceID,
description: Schema.String,
credential: Credential,
}) {}
export class FileWriteError extends Schema.TaggedErrorClass<FileWriteError>()("Auth.FileWriteError", {
operation: Schema.Union([Schema.Literal("migrate"), Schema.Literal("write")]),
cause: Schema.Defect,
}) {}
export type Error = FileWriteError
export const Event = {
Added: EventV2.define({
type: "account.added",
schema: {
account: Info,
},
}),
Removed: EventV2.define({
type: "account.removed",
schema: {
account: Info,
},
}),
Switched: EventV2.define({
type: "account.switched",
schema: {
serviceID: ServiceID,
from: Schema.optional(ID),
to: Schema.optional(ID),
},
}),
}
interface Writable {
version: 2
accounts: Record<string, Info>
active: Record<string, ID>
}
const decodeV1 = Schema.decodeUnknownOption(Schema.Record(Schema.String, Credential))
function migrate(old: Record<string, unknown>): Writable {
const accounts: Record<string, Info> = {}
const active: Record<string, ID> = {}
for (const [serviceID, value] of Object.entries(old)) {
const decoded = Option.getOrElse(decodeV1({ [serviceID]: value }), () => ({}))
const parsed = (decoded as Record<string, Credential>)[serviceID]
if (!parsed) continue
const id = Identifier.ascending()
const account = ID.make(id)
const brandedServiceID = ServiceID.make(serviceID)
accounts[id] = new Info({
id: account,
serviceID: brandedServiceID,
description: "default",
credential: parsed,
})
active[brandedServiceID] = account
}
return { version: 2, accounts, active }
}
export interface Interface {
readonly get: (id: ID) => Effect.Effect<Info | undefined, Error>
readonly all: () => Effect.Effect<Info[], Error>
readonly create: (input: {
serviceID: ServiceID
credential: Credential
description?: string
}) => Effect.Effect<Info | undefined, Error>
readonly update: (id: ID, updates: Partial<Pick<Info, "description" | "credential">>) => Effect.Effect<void, Error>
readonly remove: (id: ID) => Effect.Effect<void, Error>
readonly activate: (id: ID) => Effect.Effect<void, Error>
readonly active: (serviceID: ServiceID) => Effect.Effect<Info | undefined, Error>
readonly forService: (serviceID: ServiceID) => Effect.Effect<Info[], Error>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/Account") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fsys = yield* AppFileSystem.Service
const global = yield* Global.Service
const events = yield* EventV2.Service
const file = path.join(global.data, "account.json")
const legacyFile = path.join(global.data, "auth.json")
const writeMigrated = Effect.fnUntraced(function* (raw: Record<string, unknown>) {
const migrated = migrate(raw)
yield* fsys
.writeJson(file, migrated, 0o600)
.pipe(Effect.mapError((cause) => new FileWriteError({ operation: "migrate", cause })))
return migrated
})
const parseAuthContent = () => {
try {
return JSON.parse(process.env.OPENCODE_AUTH_CONTENT ?? "")
} catch {}
}
const load: () => Effect.Effect<Writable, Error> = Effect.fnUntraced(function* () {
if (process.env.OPENCODE_AUTH_CONTENT) {
const raw = parseAuthContent()
if (raw && typeof raw === "object") {
if ("version" in raw && raw.version === 2) return raw as Writable
return yield* writeMigrated(raw as Record<string, unknown>)
}
return { version: 2, accounts: {}, active: {} }
}
const legacy = yield* fsys.readJson(legacyFile).pipe(Effect.orElseSucceed(() => null))
if (legacy && typeof legacy === "object") return yield* writeMigrated(legacy as Record<string, unknown>)
const raw = yield* fsys.readJson(file).pipe(Effect.orElseSucceed(() => null))
if (raw && typeof raw === "object") {
if ("version" in raw && raw.version === 2) return raw as Writable
return yield* writeMigrated(raw as Record<string, unknown>)
}
return { version: 2, accounts: {}, active: {} }
})
const write = (data: Writable) =>
fsys
.writeJson(file, data, 0o600)
.pipe(Effect.mapError((cause) => new FileWriteError({ operation: "write", cause })))
const state = SynchronizedRef.makeUnsafe(
yield* load().pipe(Effect.orElseSucceed((): Writable => ({ version: 2, accounts: {}, active: {} }))),
)
const activate = Effect.fn("Auth.activate")(function* (id: ID) {
const data = yield* SynchronizedRef.get(state)
const account = data.accounts[id]
if (!account) return
const activated = yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
const nextAccount = data.accounts[id]
if (!nextAccount) return [undefined, data] as const
const next = { ...data, active: { ...data.active, [nextAccount.serviceID]: id } }
yield* write(next)
return [{ serviceID: nextAccount.serviceID, from: data.active[nextAccount.serviceID], to: id }, next] as const
}),
)
if (activated) yield* events.publish(Event.Switched, activated)
})
const result: Interface = {
get: Effect.fn("Auth.get")(function* (id) {
return (yield* SynchronizedRef.get(state)).accounts[id]
}),
all: Effect.fn("Auth.all")(function* () {
return Object.values((yield* SynchronizedRef.get(state)).accounts)
}),
active: Effect.fn("Auth.active")(function* (serviceID) {
const data = yield* SynchronizedRef.get(state)
return (
data.accounts[data.active[serviceID]] ?? Object.values(data.accounts).find((a) => a.serviceID === serviceID)
)
}),
forService: Effect.fn("Auth.list")(function* (serviceID) {
return Object.values((yield* SynchronizedRef.get(state)).accounts).filter((a) => a.serviceID === serviceID)
}),
create: Effect.fn("Auth.add")(function* (input) {
const id = ID.make(Identifier.ascending())
const account = new Info({
id,
serviceID: input.serviceID,
description: input.description ?? "default",
credential: input.credential,
})
const added = yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
const next = {
...data,
accounts: { ...data.accounts, [account.id]: account },
active: { ...data.active, [account.serviceID]: account.id },
}
yield* write(next)
return [
{
account,
switched: { serviceID: account.serviceID, from: data.active[account.serviceID], to: account.id },
},
next,
] as const
}),
)
yield* events.publish(Event.Added, { account: added.account })
yield* events.publish(Event.Switched, added.switched)
return added.account
}),
update: Effect.fn("Auth.update")(function* (id, updates) {
const existing = (yield* SynchronizedRef.get(state)).accounts[id]
if (!existing) return
yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
if (!data.accounts[id]) return [undefined, data] as const
const next = {
...data,
accounts: {
...data.accounts,
[id]: new Info({
id,
serviceID: existing.serviceID,
description: updates.description ?? existing.description,
credential: updates.credential ?? existing.credential,
}),
},
}
yield* write(next)
return [undefined, next] as const
}),
)
}),
remove: Effect.fn("Auth.remove")(function* (id) {
const removed = yield* SynchronizedRef.modifyEffect(
state,
Effect.fnUntraced(function* (data) {
const accounts = { ...data.accounts }
const active = { ...data.active }
const removed = accounts[id]
if (!removed) return [undefined, data] as const
const wasActive = active[removed.serviceID] === id
delete accounts[id]
const replacement = Object.values(accounts).find((account) => account.serviceID === removed.serviceID)
if (wasActive) {
if (replacement) active[removed.serviceID] = replacement.id
else delete active[removed.serviceID]
}
const next = { ...data, accounts, active }
yield* write(next)
return [
{
account: removed,
switched: wasActive ? { serviceID: removed.serviceID, from: id, to: replacement?.id } : undefined,
},
next,
] as const
}),
)
if (removed) {
yield* events.publish(Event.Removed, { account: removed.account })
if (removed.switched) yield* events.publish(Event.Switched, removed.switched)
}
}),
activate,
}
return Service.of(result)
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(AppFileSystem.defaultLayer),
Layer.provide(Global.defaultLayer),
Layer.provide(EventV2.defaultLayer),
)

View File

@ -89,7 +89,7 @@ enableMapSet()
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
yield* Location.Service
const location = yield* Location.Service
const plugin = yield* PluginV2.Service
const events = yield* EventV2.Service
const policy = yield* Policy.Service
@ -199,6 +199,11 @@ export const layer = Layer.effect(
})
yield* events.subscribe(PluginV2.Event.Added).pipe(
// Plugin registries are location scoped even though the event bus is process scoped.
Stream.filter(
(event) =>
event.location?.directory === location.directory && event.location.workspaceID === location.workspaceID,
),
Stream.runForEach((event) =>
state.update((catalog) => plugin.triggerFor(event.data.id, "catalog.transform", catalog, {}), "plugin.added"),
),
@ -317,8 +322,7 @@ export const layer = Layer.effect(
const SMALL_MODEL_RE = /\b(nano|flash|lite|mini|haiku|small|fast)\b/
export const defaultLayer = layer.pipe(
Layer.provide(EventV2.defaultLayer),
Layer.provide(PluginV2.defaultLayer),
Layer.provide(Policy.defaultLayer),
export const locationLayer = layer.pipe(
Layer.provideMerge(PluginV2.locationLayer),
Layer.provideMerge(Policy.locationLayer),
)

View File

@ -200,8 +200,4 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(AppFileSystem.defaultLayer),
Layer.provide(Global.defaultLayer),
Layer.provide(Policy.defaultLayer),
)
export const locationLayer = layer.pipe(Layer.provideMerge(Policy.locationLayer))

View File

@ -1,17 +1,17 @@
import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core"
import { ProjectTable } from "../project/project.sql"
import type { ProjectID } from "../project/schema"
import type { WorkspaceID } from "./schema"
import { ProjectTable } from "../project/sql"
import { ProjectV2 } from "../project"
import { WorkspaceV2 } from "../workspace"
export const WorkspaceTable = sqliteTable("workspace", {
id: text().$type<WorkspaceID>().primaryKey(),
id: text().$type<WorkspaceV2.ID>().primaryKey(),
type: text().notNull(),
name: text().notNull().default(""),
branch: text(),
directory: text(),
extra: text({ mode: "json" }),
project_id: text()
.$type<ProjectID>()
.$type<ProjectV2.ID>()
.notNull()
.references(() => ProjectTable.id, { onDelete: "cascade" }),
time_used: integer()

View File

@ -0,0 +1,60 @@
export * as Database from "./database"
import { EffectDrizzleSqlite } from "@opencode-ai/effect-drizzle-sqlite"
import { layer as sqliteLayer } from "#sqlite"
import { Context, Effect, Layer } from "effect"
import { Global } from "../global"
import { Flag } from "../flag/flag"
import { isAbsolute, join } from "path"
import { DatabaseMigration } from "./migration"
import { InstallationChannel } from "../installation/version"
const makeDatabase = EffectDrizzleSqlite.makeWithDefaults()
type DatabaseShape = Effect.Success<typeof makeDatabase>
export interface Interface {
db: DatabaseShape
}
export class Service extends Context.Service<Service, Interface>()("@opencode/v2/storage/Database") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const db = yield* makeDatabase
yield* db.run("PRAGMA journal_mode = WAL")
yield* db.run("PRAGMA synchronous = NORMAL")
yield* db.run("PRAGMA busy_timeout = 5000")
yield* db.run("PRAGMA cache_size = -64000")
yield* db.run("PRAGMA foreign_keys = ON")
yield* db.run("PRAGMA wal_checkpoint(PASSIVE)")
yield* DatabaseMigration.apply(db)
return { db }
}).pipe(Effect.orDie),
)
export function layerFromPath(filename: string) {
return layer.pipe(Layer.provide(sqliteLayer({ filename })))
}
export function path() {
if (Flag.OPENCODE_DB) {
if (Flag.OPENCODE_DB === ":memory:" || isAbsolute(Flag.OPENCODE_DB)) return Flag.OPENCODE_DB
return join(Global.Path.data, Flag.OPENCODE_DB)
}
if (
["latest", "beta", "prod"].includes(InstallationChannel) ||
process.env.OPENCODE_DISABLE_CHANNEL_DB === "1" ||
process.env.OPENCODE_DISABLE_CHANNEL_DB === "true"
)
return join(Global.Path.data, "opencode.db")
return join(Global.Path.data, `opencode-${InstallationChannel.replace(/[^a-zA-Z0-9._-]/g, "-")}.db`)
}
export const defaultLayer = Layer.unwrap(
Effect.gen(function* () {
return layerFromPath(path())
}),
).pipe(Layer.provide(Global.defaultLayer))

View File

@ -0,0 +1,25 @@
import type { DatabaseMigration } from "./migration"
export const migrations = (await Promise.all([
import("./migration/20260127222353_familiar_lady_ursula"),
import("./migration/20260211171708_add_project_commands"),
import("./migration/20260213144116_wakeful_the_professor"),
import("./migration/20260225215848_workspace"),
import("./migration/20260227213759_add_session_workspace_id"),
import("./migration/20260228203230_blue_harpoon"),
import("./migration/20260303231226_add_workspace_fields"),
import("./migration/20260309230000_move_org_to_state"),
import("./migration/20260312043431_session_message_cursor"),
import("./migration/20260323234822_events"),
import("./migration/20260410174513_workspace-name"),
import("./migration/20260413175956_chief_energizer"),
import("./migration/20260423070820_add_icon_url_override"),
import("./migration/20260427172553_slow_nightmare"),
import("./migration/20260428004200_add_session_path"),
import("./migration/20260501142318_next_venus"),
import("./migration/20260504145000_add_sync_owner"),
import("./migration/20260507164347_add_workspace_time"),
import("./migration/20260510033149_session_usage"),
import("./migration/20260511000411_data_migration_state"),
import("./migration/20260530232709_lovely_romulus"),
])).map((module) => module.default) satisfies DatabaseMigration.Migration[]

View File

@ -0,0 +1,58 @@
export * as DatabaseMigration from "./migration"
import { sql } from "drizzle-orm"
import { Effect } from "effect"
import type { EffectDrizzleSqlite } from "@opencode-ai/effect-drizzle-sqlite"
import { migrations } from "./migration.gen"
type Database = EffectDrizzleSqlite.EffectSQLiteDatabase
type Transaction = Parameters<Parameters<Database["transaction"]>[0]>[0]
export type Migration = {
id: string
up: (tx: Transaction) => Effect.Effect<void, unknown>
}
export function apply(db: Database) {
return applyOnly(db, migrations)
}
export function applyOnly(db: Database, input: Migration[]) {
return Effect.gen(function* () {
yield* db.run(
sql`CREATE TABLE IF NOT EXISTS ${sql.identifier("migration")} (id TEXT PRIMARY KEY, time_completed INTEGER NOT NULL)`,
)
let completed = new Set(
(yield* db.all<{ id: string }>(sql`SELECT id FROM ${sql.identifier("migration")}`)).map((row) => row.id),
)
if (completed.size === 0) {
// Existing installs used Drizzle's migration journal. Seed the new
// journal once so TypeScript migrations don't replay old SQL.
if (
yield* db.get(sql`SELECT name FROM sqlite_master WHERE type = 'table' AND name = ${"__drizzle_migrations"}`)
) {
yield* db.run(sql`
INSERT OR IGNORE INTO ${sql.identifier("migration")} (id, time_completed)
SELECT name, ${Date.now()}
FROM ${sql.identifier("__drizzle_migrations")}
WHERE name IS NOT NULL
`)
completed = new Set(
(yield* db.all<{ id: string }>(sql`SELECT id FROM ${sql.identifier("migration")}`)).map((row) => row.id),
)
}
}
for (const migration of input) {
if (completed.has(migration.id)) continue
yield* db.transaction((tx) =>
Effect.gen(function* () {
yield* migration.up(tx)
yield* tx.run(
sql`INSERT INTO ${sql.identifier("migration")} (id, time_completed) VALUES (${migration.id}, ${Date.now()})`,
)
}),
)
}
})
}

View File

@ -0,0 +1,107 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260127222353_familiar_lady_ursula",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`project\` (
\`id\` text PRIMARY KEY,
\`worktree\` text NOT NULL,
\`vcs\` text,
\`name\` text,
\`icon_url\` text,
\`icon_color\` text,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
\`time_initialized\` integer,
\`sandboxes\` text NOT NULL
);
`)
yield* tx.run(`
CREATE TABLE \`message\` (
\`id\` text PRIMARY KEY,
\`session_id\` text NOT NULL,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
\`data\` text NOT NULL,
CONSTRAINT \`fk_message_session_id_session_id_fk\` FOREIGN KEY (\`session_id\`) REFERENCES \`session\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`
CREATE TABLE \`part\` (
\`id\` text PRIMARY KEY,
\`message_id\` text NOT NULL,
\`session_id\` text NOT NULL,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
\`data\` text NOT NULL,
CONSTRAINT \`fk_part_message_id_message_id_fk\` FOREIGN KEY (\`message_id\`) REFERENCES \`message\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`
CREATE TABLE \`permission\` (
\`project_id\` text PRIMARY KEY,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
\`data\` text NOT NULL,
CONSTRAINT \`fk_permission_project_id_project_id_fk\` FOREIGN KEY (\`project_id\`) REFERENCES \`project\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`
CREATE TABLE \`session\` (
\`id\` text PRIMARY KEY,
\`project_id\` text NOT NULL,
\`parent_id\` text,
\`slug\` text NOT NULL,
\`directory\` text NOT NULL,
\`title\` text NOT NULL,
\`version\` text NOT NULL,
\`share_url\` text,
\`summary_additions\` integer,
\`summary_deletions\` integer,
\`summary_files\` integer,
\`summary_diffs\` text,
\`revert\` text,
\`permission\` text,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
\`time_compacting\` integer,
\`time_archived\` integer,
CONSTRAINT \`fk_session_project_id_project_id_fk\` FOREIGN KEY (\`project_id\`) REFERENCES \`project\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`
CREATE TABLE \`todo\` (
\`session_id\` text NOT NULL,
\`content\` text NOT NULL,
\`status\` text NOT NULL,
\`priority\` text NOT NULL,
\`position\` integer NOT NULL,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
CONSTRAINT \`todo_pk\` PRIMARY KEY(\`session_id\`, \`position\`),
CONSTRAINT \`fk_todo_session_id_session_id_fk\` FOREIGN KEY (\`session_id\`) REFERENCES \`session\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`
CREATE TABLE \`session_share\` (
\`session_id\` text PRIMARY KEY,
\`id\` text NOT NULL,
\`secret\` text NOT NULL,
\`url\` text NOT NULL,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
CONSTRAINT \`fk_session_share_session_id_session_id_fk\` FOREIGN KEY (\`session_id\`) REFERENCES \`session\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`CREATE INDEX \`message_session_idx\` ON \`message\` (\`session_id\`);`)
yield* tx.run(`CREATE INDEX \`part_message_idx\` ON \`part\` (\`message_id\`);`)
yield* tx.run(`CREATE INDEX \`part_session_idx\` ON \`part\` (\`session_id\`);`)
yield* tx.run(`CREATE INDEX \`session_project_idx\` ON \`session\` (\`project_id\`);`)
yield* tx.run(`CREATE INDEX \`session_parent_idx\` ON \`session\` (\`parent_id\`);`)
yield* tx.run(`CREATE INDEX \`todo_session_idx\` ON \`todo\` (\`session_id\`);`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,11 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260211171708_add_project_commands",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`project\` ADD \`commands\` text;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,23 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260213144116_wakeful_the_professor",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`control_account\` (
\`email\` text NOT NULL,
\`url\` text NOT NULL,
\`access_token\` text NOT NULL,
\`refresh_token\` text NOT NULL,
\`token_expiry\` integer,
\`active\` integer NOT NULL,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
CONSTRAINT \`control_account_pk\` PRIMARY KEY(\`email\`, \`url\`)
);
`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,19 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260225215848_workspace",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`workspace\` (
\`id\` text PRIMARY KEY,
\`branch\` text,
\`project_id\` text NOT NULL,
\`config\` text NOT NULL,
CONSTRAINT \`fk_workspace_project_id_project_id_fk\` FOREIGN KEY (\`project_id\`) REFERENCES \`project\`(\`id\`) ON DELETE CASCADE
);
`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,12 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260227213759_add_session_workspace_id",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`session\` ADD \`workspace_id\` text;`)
yield* tx.run(`CREATE INDEX \`session_workspace_idx\` ON \`session\` (\`workspace_id\`);`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,30 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260228203230_blue_harpoon",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`account\` (
\`id\` text PRIMARY KEY,
\`email\` text NOT NULL,
\`url\` text NOT NULL,
\`access_token\` text NOT NULL,
\`refresh_token\` text NOT NULL,
\`token_expiry\` integer,
\`selected_org_id\` text,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL
);
`)
yield* tx.run(`
CREATE TABLE \`account_state\` (
\`id\` integer PRIMARY KEY NOT NULL,
\`active_account_id\` text,
FOREIGN KEY (\`active_account_id\`) REFERENCES \`account\`(\`id\`) ON UPDATE no action ON DELETE set null
);
`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,15 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260303231226_add_workspace_fields",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`workspace\` ADD \`type\` text NOT NULL;`)
yield* tx.run(`ALTER TABLE \`workspace\` ADD \`name\` text;`)
yield* tx.run(`ALTER TABLE \`workspace\` ADD \`directory\` text;`)
yield* tx.run(`ALTER TABLE \`workspace\` ADD \`extra\` text;`)
yield* tx.run(`ALTER TABLE \`workspace\` DROP COLUMN \`config\`;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,13 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260309230000_move_org_to_state",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`account_state\` ADD \`active_org_id\` text;`)
yield* tx.run(`UPDATE \`account_state\` SET \`active_org_id\` = (SELECT \`selected_org_id\` FROM \`account\` WHERE \`account\`.\`id\` = \`account_state\`.\`active_account_id\`);`)
yield* tx.run(`ALTER TABLE \`account\` DROP COLUMN \`selected_org_id\`;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,14 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260312043431_session_message_cursor",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`DROP INDEX IF EXISTS \`message_session_idx\`;`)
yield* tx.run(`DROP INDEX IF EXISTS \`part_message_idx\`;`)
yield* tx.run(`CREATE INDEX \`message_session_time_created_id_idx\` ON \`message\` (\`session_id\`,\`time_created\`,\`id\`);`)
yield* tx.run(`CREATE INDEX \`part_message_id_id_idx\` ON \`part\` (\`message_id\`,\`id\`);`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,26 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260323234822_events",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`event_sequence\` (
\`aggregate_id\` text PRIMARY KEY,
\`seq\` integer NOT NULL
);
`)
yield* tx.run(`
CREATE TABLE \`event\` (
\`id\` text PRIMARY KEY,
\`aggregate_id\` text NOT NULL,
\`seq\` integer NOT NULL,
\`type\` text NOT NULL,
\`data\` text NOT NULL,
CONSTRAINT \`fk_event_aggregate_id_event_sequence_aggregate_id_fk\` FOREIGN KEY (\`aggregate_id\`) REFERENCES \`event_sequence\`(\`aggregate_id\`) ON DELETE CASCADE
);
`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,27 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260410174513_workspace-name",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`PRAGMA foreign_keys=OFF;`)
yield* tx.run(`
CREATE TABLE \`__new_workspace\` (
\`id\` text PRIMARY KEY,
\`type\` text NOT NULL,
\`name\` text DEFAULT '' NOT NULL,
\`branch\` text,
\`directory\` text,
\`extra\` text,
\`project_id\` text NOT NULL,
CONSTRAINT \`fk_workspace_project_id_project_id_fk\` FOREIGN KEY (\`project_id\`) REFERENCES \`project\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`INSERT INTO \`__new_workspace\`(\`id\`, \`type\`, \`branch\`, \`name\`, \`directory\`, \`extra\`, \`project_id\`) SELECT \`id\`, \`type\`, \`branch\`, \`name\`, \`directory\`, \`extra\`, \`project_id\` FROM \`workspace\`;`)
yield* tx.run(`DROP TABLE \`workspace\`;`)
yield* tx.run(`ALTER TABLE \`__new_workspace\` RENAME TO \`workspace\`;`)
yield* tx.run(`PRAGMA foreign_keys=ON;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,24 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260413175956_chief_energizer",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`session_entry\` (
\`id\` text PRIMARY KEY,
\`session_id\` text NOT NULL,
\`type\` text NOT NULL,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
\`data\` text NOT NULL,
CONSTRAINT \`fk_session_entry_session_id_session_id_fk\` FOREIGN KEY (\`session_id\`) REFERENCES \`session\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`CREATE INDEX \`session_entry_session_idx\` ON \`session_entry\` (\`session_id\`);`)
yield* tx.run(`CREATE INDEX \`session_entry_session_type_idx\` ON \`session_entry\` (\`session_id\`,\`type\`);`)
yield* tx.run(`CREATE INDEX \`session_entry_time_created_idx\` ON \`session_entry\` (\`time_created\`);`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,14 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260423070820_add_icon_url_override",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
ALTER TABLE \`project\` ADD \`icon_url_override\` text;
UPDATE \`project\` SET \`icon_url_override\` = \`icon_url\` WHERE \`icon_url\` IS NOT NULL;
`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,28 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260427172553_slow_nightmare",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`session_message\` (
\`id\` text PRIMARY KEY,
\`session_id\` text NOT NULL,
\`type\` text NOT NULL,
\`time_created\` integer NOT NULL,
\`time_updated\` integer NOT NULL,
\`data\` text NOT NULL,
CONSTRAINT \`fk_session_message_session_id_session_id_fk\` FOREIGN KEY (\`session_id\`) REFERENCES \`session\`(\`id\`) ON DELETE CASCADE
);
`)
yield* tx.run(`DROP INDEX IF EXISTS \`session_entry_session_idx\`;`)
yield* tx.run(`DROP INDEX IF EXISTS \`session_entry_session_type_idx\`;`)
yield* tx.run(`DROP INDEX IF EXISTS \`session_entry_time_created_idx\`;`)
yield* tx.run(`CREATE INDEX \`session_message_session_idx\` ON \`session_message\` (\`session_id\`);`)
yield* tx.run(`CREATE INDEX \`session_message_session_type_idx\` ON \`session_message\` (\`session_id\`,\`type\`);`)
yield* tx.run(`CREATE INDEX \`session_message_time_created_idx\` ON \`session_message\` (\`time_created\`);`)
yield* tx.run(`DROP TABLE \`session_entry\`;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,11 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260428004200_add_session_path",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`session\` ADD \`path\` text;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,12 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260501142318_next_venus",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`session\` ADD \`agent\` text;`)
yield* tx.run(`ALTER TABLE \`session\` ADD \`model\` text;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,11 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260504145000_add_sync_owner",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`event_sequence\` ADD \`owner_id\` text;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,11 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260507164347_add_workspace_time",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`workspace\` ADD \`time_used\` integer NOT NULL DEFAULT 0;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,56 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260510033149_session_usage",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`session\` ADD \`cost\` real DEFAULT 0 NOT NULL;`)
yield* tx.run(`ALTER TABLE \`session\` ADD \`tokens_input\` integer DEFAULT 0 NOT NULL;`)
yield* tx.run(`ALTER TABLE \`session\` ADD \`tokens_output\` integer DEFAULT 0 NOT NULL;`)
yield* tx.run(`ALTER TABLE \`session\` ADD \`tokens_reasoning\` integer DEFAULT 0 NOT NULL;`)
yield* tx.run(`ALTER TABLE \`session\` ADD \`tokens_cache_read\` integer DEFAULT 0 NOT NULL;`)
yield* tx.run(`ALTER TABLE \`session\` ADD \`tokens_cache_write\` integer DEFAULT 0 NOT NULL;`)
yield* tx.run(`
UPDATE session
SET
cost = coalesce((
SELECT sum(coalesce(json_extract(message.data, '$.cost'), 0))
FROM message
WHERE message.session_id = session.id
AND json_extract(message.data, '$.role') = 'assistant'
), 0),
tokens_input = coalesce((
SELECT sum(coalesce(json_extract(message.data, '$.tokens.input'), 0))
FROM message
WHERE message.session_id = session.id
AND json_extract(message.data, '$.role') = 'assistant'
), 0),
tokens_output = coalesce((
SELECT sum(coalesce(json_extract(message.data, '$.tokens.output'), 0))
FROM message
WHERE message.session_id = session.id
AND json_extract(message.data, '$.role') = 'assistant'
), 0),
tokens_reasoning = coalesce((
SELECT sum(coalesce(json_extract(message.data, '$.tokens.reasoning'), 0))
FROM message
WHERE message.session_id = session.id
AND json_extract(message.data, '$.role') = 'assistant'
), 0),
tokens_cache_read = coalesce((
SELECT sum(coalesce(json_extract(message.data, '$.tokens.cache.read'), 0))
FROM message
WHERE message.session_id = session.id
AND json_extract(message.data, '$.role') = 'assistant'
), 0),
tokens_cache_write = coalesce((
SELECT sum(coalesce(json_extract(message.data, '$.tokens.cache.write'), 0))
FROM message
WHERE message.session_id = session.id
AND json_extract(message.data, '$.role') = 'assistant'
), 0)
`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,16 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260511000411_data_migration_state",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TABLE \`data_migration\` (
\`name\` text PRIMARY KEY,
\`time_completed\` integer NOT NULL
);
`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,11 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"
export default {
id: "20260530232709_lovely_romulus",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`session\` ADD \`metadata\` text;`)
})
},
} satisfies DatabaseMigration.Migration

View File

@ -0,0 +1,177 @@
import { Database } from "bun:sqlite"
import { drizzle } from "drizzle-orm/bun-sqlite"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import { identity } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Scope from "effect/Scope"
import * as Semaphore from "effect/Semaphore"
import * as Stream from "effect/Stream"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
import * as Client from "effect/unstable/sql/SqlClient"
import type { Connection } from "effect/unstable/sql/SqlConnection"
import { classifySqliteError, SqlError } from "effect/unstable/sql/SqlError"
import * as Statement from "effect/unstable/sql/Statement"
import { Sqlite } from "./sqlite"
const ATTR_DB_SYSTEM_NAME = "db.system.name"
const TypeId = "~@opencode-ai/core/database/SqliteBun" as const
type TypeId = typeof TypeId
interface SqliteClient extends Client.SqlClient {
readonly [TypeId]: TypeId
readonly config: Config
readonly export: Effect.Effect<Uint8Array, SqlError>
readonly loadExtension: (path: string) => Effect.Effect<void, SqlError>
readonly updateValues: never
}
interface Config {
readonly filename: string
readonly readonly?: boolean
readonly create?: boolean
readonly readwrite?: boolean
readonly disableWAL?: boolean
readonly spanAttributes?: Record<string, unknown>
readonly transformResultNames?: (str: string) => string
readonly transformQueryNames?: (str: string) => string
}
interface SqliteConnection extends Connection {
readonly export: Effect.Effect<Uint8Array, SqlError>
readonly loadExtension: (path: string) => Effect.Effect<void, SqlError>
}
const make = (options: Config) =>
Effect.gen(function* () {
const native = (yield* Sqlite.Native) as Database
const compiler = Statement.makeCompilerSqlite(options.transformQueryNames)
const transformRows = options.transformResultNames ? Statement.defaultTransforms(options.transformResultNames).array : undefined
const run = (query: string, params: ReadonlyArray<unknown> = []) =>
Effect.withFiber<Array<Record<string, unknown>>, SqlError>((fiber) => {
const statement = native.query(query)
// @ts-ignore bun-types missing safeIntegers method, fixed in https://github.com/oven-sh/bun/pull/26627
statement.safeIntegers(Context.get(fiber.context, Client.SafeIntegers))
try {
return Effect.succeed((statement.all(...(params as any)) ?? []) as Array<Record<string, unknown>>)
} catch (cause) {
return Effect.fail(
new SqlError({
reason: classifySqliteError(cause, { message: "Failed to execute statement", operation: "execute" }),
}),
)
}
})
const runValues = (query: string, params: ReadonlyArray<unknown> = []) =>
Effect.withFiber<Array<unknown[]>, SqlError>((fiber) => {
const statement = native.query(query)
// @ts-ignore bun-types missing safeIntegers method, fixed in https://github.com/oven-sh/bun/pull/26627
statement.safeIntegers(Context.get(fiber.context, Client.SafeIntegers))
try {
return Effect.succeed((statement.values(...(params as any)) ?? []) as Array<unknown[]>)
} catch (cause) {
return Effect.fail(
new SqlError({
reason: classifySqliteError(cause, { message: "Failed to execute statement", operation: "execute" }),
}),
)
}
})
const connection = identity<SqliteConnection>({
execute(query, params, transformRows) {
return transformRows ? Effect.map(run(query, params), transformRows) : run(query, params)
},
executeRaw(query, params) {
return run(query, params)
},
executeValues(query, params) {
return runValues(query, params)
},
executeUnprepared(query, params, transformRows) {
return this.execute(query, params, transformRows)
},
executeStream() {
return Stream.die("executeStream not implemented")
},
export: Effect.try({
try: () => native.serialize(),
catch: (cause) =>
new SqlError({ reason: classifySqliteError(cause, { message: "Failed to export database", operation: "export" }) }),
}),
loadExtension: (path) =>
Effect.try({
try: () => native.loadExtension(path),
catch: (cause) =>
new SqlError({
reason: classifySqliteError(cause, { message: "Failed to load extension", operation: "loadExtension" }),
}),
}),
})
const semaphore = yield* Semaphore.make(1)
const acquirer = semaphore.withPermits(1)(Effect.succeed(connection))
const transactionAcquirer = Effect.uninterruptibleMask((restore) => {
const fiber = Fiber.getCurrent()!
const scope = Context.getUnsafe(fiber.context, Scope.Scope)
return Effect.as(Effect.tap(restore(semaphore.take(1)), () => Scope.addFinalizer(scope, semaphore.release(1))), connection)
})
const client = Object.assign(
(yield* Client.make({
acquirer,
compiler,
transactionAcquirer,
spanAttributes: [
...(options.spanAttributes ? Object.entries(options.spanAttributes) : []),
[ATTR_DB_SYSTEM_NAME, "sqlite"],
],
transformRows,
})) as SqliteClient,
{
[TypeId]: TypeId,
config: options,
export: Effect.flatMap(acquirer, (_) => _.export),
loadExtension: (path: string) => Effect.flatMap(acquirer, (_) => _.loadExtension(path)),
},
)
return client
})
const nativeLayer = (config: Config) =>
Layer.effect(
Sqlite.Native,
Effect.gen(function* () {
const native = new Database(config.filename, {
readonly: config.readonly,
readwrite: config.readwrite ?? true,
create: config.create ?? true,
})
yield* Effect.addFinalizer(() => Effect.sync(() => native.close()))
if (config.disableWAL !== true) native.run("PRAGMA journal_mode = WAL;")
return native
}),
)
const sqliteLayer = (config: Config) => Layer.effect(Client.SqlClient, make(config))
const drizzleLayer = Layer.effect(
Sqlite.Drizzle,
Effect.gen(function* () {
return drizzle({ client: (yield* Sqlite.Native) as Database })
}),
)
export const layer = (config: Config) =>
Layer.merge(
nativeLayer(config),
Layer.merge(sqliteLayer(config), drizzleLayer).pipe(Layer.provide(nativeLayer(config))),
).pipe(
Layer.provide(Reactivity.layer),
)

View File

@ -0,0 +1,172 @@
import { DatabaseSync, type SQLInputValue } from "node:sqlite"
import { drizzle } from "drizzle-orm/node-sqlite"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import { identity } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Scope from "effect/Scope"
import * as Semaphore from "effect/Semaphore"
import * as Stream from "effect/Stream"
import * as Reactivity from "effect/unstable/reactivity/Reactivity"
import * as Client from "effect/unstable/sql/SqlClient"
import type { Connection } from "effect/unstable/sql/SqlConnection"
import { classifySqliteError, SqlError } from "effect/unstable/sql/SqlError"
import * as Statement from "effect/unstable/sql/Statement"
import { Sqlite } from "./sqlite"
const ATTR_DB_SYSTEM_NAME = "db.system.name"
const TypeId = "~@opencode-ai/core/database/SqliteNode" as const
type TypeId = typeof TypeId
interface SqliteClient extends Client.SqlClient {
readonly [TypeId]: TypeId
readonly config: Config
readonly loadExtension: (path: string) => Effect.Effect<void, SqlError>
readonly updateValues: never
}
interface Config {
readonly filename: string
readonly readonly?: boolean
readonly create?: boolean
readonly readwrite?: boolean
readonly disableWAL?: boolean
readonly timeout?: number
readonly allowExtension?: boolean
readonly spanAttributes?: Record<string, unknown>
readonly transformResultNames?: (str: string) => string
readonly transformQueryNames?: (str: string) => string
}
interface SqliteConnection extends Connection {
readonly loadExtension: (path: string) => Effect.Effect<void, SqlError>
}
const make = (options: Config) =>
Effect.gen(function* () {
const native = (yield* Sqlite.Native) as DatabaseSync
const compiler = Statement.makeCompilerSqlite(options.transformQueryNames)
const transformRows = options.transformResultNames ? Statement.defaultTransforms(options.transformResultNames).array : undefined
const run = (query: string, params: ReadonlyArray<unknown> = []) =>
Effect.withFiber<Array<Record<string, unknown>>, SqlError>((fiber) => {
const statement = native.prepare(query)
statement.setReadBigInts(Context.get(fiber.context, Client.SafeIntegers))
try {
return Effect.succeed(statement.all(...(params as SQLInputValue[])) as Array<Record<string, unknown>>)
} catch (cause) {
return Effect.fail(
new SqlError({
reason: classifySqliteError(cause, { message: "Failed to execute statement", operation: "execute" }),
}),
)
}
})
const runValues = (query: string, params: ReadonlyArray<unknown> = []) =>
Effect.withFiber<ReadonlyArray<ReadonlyArray<unknown>>, SqlError>((fiber) => {
const statement = native.prepare(query)
statement.setReadBigInts(Context.get(fiber.context, Client.SafeIntegers))
statement.setReturnArrays(true)
try {
return Effect.succeed(statement.all(...(params as SQLInputValue[])) as unknown as ReadonlyArray<ReadonlyArray<unknown>>)
} catch (cause) {
return Effect.fail(
new SqlError({
reason: classifySqliteError(cause, { message: "Failed to execute statement", operation: "execute" }),
}),
)
}
})
const connection = identity<SqliteConnection>({
execute(query, params, transformRows) {
return transformRows ? Effect.map(run(query, params), transformRows) : run(query, params)
},
executeRaw(query, params) {
return run(query, params)
},
executeValues(query, params) {
return runValues(query, params)
},
executeUnprepared(query, params, transformRows) {
return this.execute(query, params, transformRows)
},
executeStream() {
return Stream.die("executeStream not implemented")
},
loadExtension: (path) =>
Effect.try({
try: () => native.loadExtension(path),
catch: (cause) =>
new SqlError({
reason: classifySqliteError(cause, { message: "Failed to load extension", operation: "loadExtension" }),
}),
}),
})
const semaphore = yield* Semaphore.make(1)
const acquirer = semaphore.withPermits(1)(Effect.succeed(connection))
const transactionAcquirer = Effect.uninterruptibleMask((restore) => {
const fiber = Fiber.getCurrent()!
const scope = Context.getUnsafe(fiber.context, Scope.Scope)
return Effect.as(Effect.tap(restore(semaphore.take(1)), () => Scope.addFinalizer(scope, semaphore.release(1))), connection)
})
const client = Object.assign(
(yield* Client.make({
acquirer,
compiler,
transactionAcquirer,
spanAttributes: [
...(options.spanAttributes ? Object.entries(options.spanAttributes) : []),
[ATTR_DB_SYSTEM_NAME, "sqlite"],
],
transformRows,
})) as SqliteClient,
{
[TypeId]: TypeId,
config: options,
loadExtension: (path: string) => Effect.flatMap(acquirer, (_) => _.loadExtension(path)),
},
)
return client
})
const nativeLayer = (config: Config) =>
Layer.effect(
Sqlite.Native,
Effect.gen(function* () {
const native = new DatabaseSync(config.filename, {
readOnly: config.readonly,
timeout: config.timeout,
allowExtension: config.allowExtension,
enableForeignKeyConstraints: true,
open: true,
})
yield* Effect.addFinalizer(() => Effect.sync(() => native.close()))
if (config.disableWAL !== true && config.readonly !== true) native.exec("PRAGMA journal_mode = WAL;")
return native
}),
)
const sqliteLayer = (config: Config) => Layer.effect(Client.SqlClient, make(config))
const drizzleLayer = Layer.effect(
Sqlite.Drizzle,
Effect.gen(function* () {
return drizzle({ client: (yield* Sqlite.Native) as DatabaseSync }) as unknown as Sqlite.DrizzleClient
}),
)
export const layer = (config: Config) =>
Layer.merge(
nativeLayer(config),
Layer.merge(sqliteLayer(config), drizzleLayer).pipe(Layer.provide(nativeLayer(config))),
).pipe(
Layer.provide(Reactivity.layer),
)

View File

@ -0,0 +1,8 @@
export * as Sqlite from "./sqlite"
import { Context } from "effect"
import type { drizzle } from "drizzle-orm/bun-sqlite"
export type DrizzleClient = ReturnType<typeof drizzle>
export class Native extends Context.Service<Native, unknown>()("@opencode-ai/core/database/SqliteNative") {}
export class Drizzle extends Context.Service<Drizzle, DrizzleClient>()("@opencode-ai/core/database/SqliteDrizzle") {}

View File

@ -1,6 +1,9 @@
export * as EventV2 from "./event"
import { Context, Effect, Layer, Option, PubSub, Schema, Stream } from "effect"
import { eq } from "drizzle-orm"
import { Database } from "./database/database"
import { EventSequenceTable, EventTable } from "./event/sql"
import { Location } from "./location"
import { withStatics } from "./schema"
import { Identifier } from "./util/identifier"
@ -13,8 +16,10 @@ export type ID = typeof ID.Type
export type Definition<Type extends string = string, DataSchema extends Schema.Top = Schema.Top> = {
readonly type: Type
readonly version?: number
readonly aggregate?: string
readonly sync?: {
readonly version: number
readonly aggregate: string
}
readonly data: DataSchema
}
@ -29,14 +34,41 @@ export type Payload<D extends Definition = Definition> = {
readonly metadata?: Record<string, unknown>
}
export type Projector<D extends Definition = Definition> = (event: Payload<D>) => Effect.Effect<void>
type AnyProjector = (event: Payload) => Effect.Effect<void>
export type Listener = (event: Payload) => Effect.Effect<void>
export type Sync = (event: Payload) => Effect.Effect<void>
export type Unsubscribe = Effect.Effect<void>
export type SerializedEvent = {
readonly id: ID
readonly type: string
readonly seq: number
readonly aggregateID: string
readonly data: Record<string, unknown>
}
export class InvalidSyncEventError extends Schema.TaggedErrorClass<InvalidSyncEventError>()(
"EventV2.InvalidSyncEvent",
{
type: Schema.String,
message: Schema.String,
},
) {}
export function versionedType(type: string, version: number) {
return `${type}.${version}`
}
export const registry = new Map<string, Definition>()
const syncRegistry = new Map<string, Definition & { readonly sync: NonNullable<Definition["sync"]> }>()
export function define<const Type extends string, Fields extends Schema.Struct.Fields>(input: {
readonly type: Type
readonly version?: number
readonly aggregate?: string
readonly sync?: {
readonly version: number
readonly aggregate: string
}
readonly schema: Fields
}): Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> & Definition<Type, Schema.Struct<Fields>> {
const Data = Schema.Struct(input.schema)
@ -51,11 +83,18 @@ export function define<const Type extends string, Fields extends Schema.Struct.F
const definition = Object.assign(Payload, {
type: input.type,
...(input.version === undefined ? {} : { version: input.version }),
...(input.aggregate === undefined ? {} : { aggregate: input.aggregate }),
...(input.sync === undefined ? {} : { sync: input.sync }),
data: Data,
})
registry.set(input.type, definition)
const existing = registry.get(input.type)
if (input.sync === undefined || existing?.sync === undefined || input.sync.version >= existing.sync.version) {
registry.set(input.type, definition)
}
if (input.sync)
syncRegistry.set(
versionedType(input.type, input.sync.version),
definition as Definition & { readonly sync: NonNullable<Definition["sync"]> },
)
return definition as Schema.Schema<Payload<Definition<Type, Schema.Struct<Fields>>>> &
Definition<Type, Schema.Struct<Fields>>
}
@ -67,20 +106,30 @@ export function definitions() {
export interface PublishOptions {
readonly id?: ID
readonly metadata?: Record<string, unknown>
readonly location?: Location.Ref
}
export type Unsubscribe = Effect.Effect<void>
export interface Interface {
readonly publish: <D extends Definition>(
definition: D,
data: Data<D>,
options?: PublishOptions,
) => Effect.Effect<Payload<D>>
readonly publishEvent: <D extends Definition>(event: Payload<D>) => Effect.Effect<Payload<D>>
readonly subscribe: <D extends Definition>(definition: D) => Stream.Stream<Payload<D>>
readonly all: () => Stream.Stream<Payload>
readonly sync: (handler: Sync) => Effect.Effect<Unsubscribe>
readonly listen: (listener: Listener) => Effect.Effect<Unsubscribe>
readonly project: <D extends Definition>(definition: D, projector: Projector<D>) => Effect.Effect<void>
readonly replay: (
event: SerializedEvent,
options?: { readonly publish?: boolean; readonly ownerID?: string },
) => Effect.Effect<void>
readonly replayAll: (
events: SerializedEvent[],
options?: { readonly publish?: boolean; readonly ownerID?: string },
) => Effect.Effect<string | undefined>
readonly remove: (aggregateID: string) => Effect.Effect<void>
readonly claim: (aggregateID: string, ownerID: string) => Effect.Effect<void>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/Event") {}
@ -90,7 +139,10 @@ export const layer = Layer.effect(
Effect.gen(function* () {
const all = yield* PubSub.unbounded<Payload>()
const typed = new Map<string, PubSub.PubSub<Payload>>()
const projectors = new Map<string, AnyProjector[]>()
const listeners = new Array<Listener>()
const syncHandlers = new Array<Sync>()
const { db } = yield* Database.Service
const getOrCreate = (definition: Definition) =>
Effect.gen(function* () {
@ -108,11 +160,97 @@ export const layer = Layer.effect(
}),
)
function commitSyncEvent(
event: Payload,
input?: { readonly seq: number; readonly aggregateID: string; readonly ownerID?: string },
) {
return Effect.gen(function* () {
const definition = registry.get(event.type)
const sync = definition?.sync
if (sync) {
if (event.version !== sync.version) {
yield* Effect.die(
new InvalidSyncEventError({
type: event.type,
message: `Expected event version ${sync.version}, got ${event.version}`,
}),
)
}
const aggregateID = (event.data as Record<string, unknown>)[sync.aggregate]
if (typeof aggregateID !== "string") {
yield* Effect.die(
new InvalidSyncEventError({
type: event.type,
message: `Expected string aggregate field ${sync.aggregate}`,
}),
)
} else {
const list = projectors.get(event.type) ?? []
yield* db
.transaction(
() =>
Effect.gen(function* () {
const row = yield* db
.select({ seq: EventSequenceTable.seq, ownerID: EventSequenceTable.owner_id })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
.get()
.pipe(Effect.orDie)
const latest = row?.seq ?? -1
if (input && input.seq <= latest) return
if (input && row?.ownerID && row.ownerID !== input.ownerID) return
const seq = input?.seq ?? latest + 1
if (input && seq !== latest + 1) {
yield* Effect.die(
new InvalidSyncEventError({
type: event.type,
message: `Sequence mismatch for aggregate ${aggregateID}: expected ${latest + 1}, got ${seq}`,
}),
)
}
for (const projector of list) {
yield* projector(event as Payload)
}
yield* db
.insert(EventSequenceTable)
.values([{ aggregate_id: aggregateID, seq, owner_id: input?.ownerID }])
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq },
})
.run()
.pipe(Effect.orDie)
yield* db
.insert(EventTable)
.values([
{
id: event.id,
aggregate_id: aggregateID,
seq,
type: versionedType(definition.type, sync.version),
data: event.data as Record<string, unknown>,
},
])
.run()
.pipe(Effect.orDie)
}),
{ behavior: "immediate" },
)
.pipe(Effect.orDie)
}
}
})
}
function publishEvent<D extends Definition>(event: Payload<D>) {
return Effect.gen(function* () {
for (const sync of syncHandlers) {
yield* sync(event as Payload)
}
yield* commitSyncEvent(event as Payload)
for (const listener of listeners) {
yield* listener(event as Payload)
}
const pubsub = typed.get(event.type)
if (pubsub) yield* PubSub.publish(pubsub, event as Payload)
yield* PubSub.publish(all, event as Payload)
@ -122,25 +260,116 @@ export const layer = Layer.effect(
function publish<D extends Definition>(definition: D, data: Data<D>, options?: PublishOptions) {
return Effect.gen(function* () {
const location = Option.getOrUndefined(yield* Effect.serviceOption(Location.Service))
const event = {
const serviceLocation = Option.getOrUndefined(yield* Effect.serviceOption(Location.Service))
const location = options?.location ??
(serviceLocation
? { directory: serviceLocation.directory, workspaceID: serviceLocation.workspaceID }
: undefined)
return yield* publishEvent({
id: options?.id ?? ID.create(),
...(options?.metadata ? { metadata: options.metadata } : {}),
type: definition.type,
...(definition.version === undefined ? {} : { version: definition.version }),
...(location ? { location: { directory: location.directory, workspaceID: location.workspaceID } } : {}),
...(definition.sync === undefined ? {} : { version: definition.sync.version }),
...(location ? { location } : {}),
data,
} as Payload<D>
return yield* publishEvent(event)
} as Payload<D>)
})
}
function replay(event: SerializedEvent, options?: { readonly publish?: boolean; readonly ownerID?: string }) {
return Effect.gen(function* () {
const definition = syncRegistry.get(event.type)
if (!definition) {
yield* Effect.die(
new InvalidSyncEventError({ type: event.type, message: `Unknown sync event type ${event.type}` }),
)
} else {
const payload = {
id: event.id,
type: definition.type,
version: definition.sync.version,
data: event.data,
} as Payload
yield* commitSyncEvent(payload, { seq: event.seq, aggregateID: event.aggregateID, ownerID: options?.ownerID })
if (options?.publish) {
for (const listener of listeners) {
yield* listener(payload)
}
const pubsub = typed.get(payload.type)
if (pubsub) yield* PubSub.publish(pubsub, payload)
yield* PubSub.publish(all, payload)
}
}
})
}
function replayAll(events: SerializedEvent[], options?: { readonly publish?: boolean; readonly ownerID?: string }) {
return Effect.gen(function* () {
const source = events[0]?.aggregateID
if (!source) return undefined
if (events.some((event) => event.aggregateID !== source)) {
yield* Effect.die(
new InvalidSyncEventError({
type: events[0]?.type ?? "unknown",
message: "Replay events must belong to the same aggregate",
}),
)
}
const start = events[0]?.seq ?? 0
for (const [index, event] of events.entries()) {
const seq = start + index
if (event.seq !== seq) {
yield* Effect.die(
new InvalidSyncEventError({
type: event.type,
message: `Replay sequence mismatch at index ${index}: expected ${seq}, got ${event.seq}`,
}),
)
}
}
for (const event of events) {
yield* replay(event, options)
}
return source
})
}
function remove(aggregateID: string) {
return db
.transaction(() =>
Effect.gen(function* () {
yield* db.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
yield* db.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
}),
)
.pipe(Effect.orDie)
}
function claim(aggregateID: string, ownerID: string) {
return db
.update(EventSequenceTable)
.set({ owner_id: ownerID })
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
.run()
.pipe(Effect.orDie)
}
const subscribe = <D extends Definition>(definition: D): Stream.Stream<Payload<D>> =>
Stream.unwrap(getOrCreate(definition).pipe(Effect.map((pubsub) => Stream.fromPubSub(pubsub)))).pipe(
Stream.map((event) => event as Payload<D>),
)
const streamAll = (): Stream.Stream<Payload> => Stream.fromPubSub(all)
const listen = (listener: Listener): Effect.Effect<Unsubscribe> =>
Effect.sync(() => {
listeners.push(listener)
return Effect.sync(() => {
const index = listeners.indexOf(listener)
if (index >= 0) listeners.splice(index, 1)
})
})
const sync = (handler: Sync): Effect.Effect<Unsubscribe> =>
Effect.sync(() => {
syncHandlers.push(handler)
@ -150,8 +379,15 @@ export const layer = Layer.effect(
})
})
return Service.of({ publish, publishEvent, subscribe, all: streamAll, sync })
const project = <D extends Definition>(definition: D, projector: Projector<D>): Effect.Effect<void> =>
Effect.sync(() => {
const list = projectors.get(definition.type) ?? []
list.push((event) => projector(event as Payload<D>))
projectors.set(definition.type, list)
})
return Service.of({ publish, subscribe, all: streamAll, sync, listen, project, replay, replayAll, remove, claim })
}),
)
export const defaultLayer = layer
export const defaultLayer = layer.pipe(Layer.provide(Database.defaultLayer))

View File

@ -1,4 +1,5 @@
import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core"
import type { EventV2 } from "../event"
export const EventSequenceTable = sqliteTable("event_sequence", {
aggregate_id: text().notNull().primaryKey(),
@ -7,7 +8,7 @@ export const EventSequenceTable = sqliteTable("event_sequence", {
})
export const EventTable = sqliteTable("event", {
id: text().primaryKey(),
id: text().$type<EventV2.ID>().primaryKey(),
aggregate_id: text()
.notNull()
.references(() => EventSequenceTable.aggregate_id, { onDelete: "cascade" }),

View File

@ -0,0 +1,80 @@
import { randomBytes } from "crypto"
const prefixes = {
job: "job",
event: "evt",
session: "ses",
message: "msg",
permission: "per",
question: "que",
part: "prt",
pty: "pty",
tool: "tool",
workspace: "wrk",
} as const
const LENGTH = 26
// State for monotonic ID generation
let lastTimestamp = 0
let counter = 0
export function ascending(prefix: keyof typeof prefixes, given?: string) {
return generateID(prefix, "ascending", given)
}
export function descending(prefix: keyof typeof prefixes, given?: string) {
return generateID(prefix, "descending", given)
}
function generateID(prefix: keyof typeof prefixes, direction: "descending" | "ascending", given?: string): string {
if (!given) {
return create(prefixes[prefix], direction)
}
if (!given.startsWith(prefixes[prefix])) {
throw new Error(`ID ${given} does not start with ${prefixes[prefix]}`)
}
return given
}
function randomBase62(length: number): string {
const chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
let result = ""
const bytes = randomBytes(length)
for (let i = 0; i < length; i++) {
result += chars[bytes[i] % 62]
}
return result
}
export function create(prefix: string, direction: "descending" | "ascending", timestamp?: number): string {
const currentTimestamp = timestamp ?? Date.now()
if (currentTimestamp !== lastTimestamp) {
lastTimestamp = currentTimestamp
counter = 0
}
counter++
let now = BigInt(currentTimestamp) * BigInt(0x1000) + BigInt(counter)
now = direction === "descending" ? ~now : now
const timeBytes = Buffer.alloc(6)
for (let i = 0; i < 6; i++) {
timeBytes[i] = Number((now >> BigInt(40 - 8 * i)) & BigInt(0xff))
}
return prefix + "_" + timeBytes.toString("hex") + randomBase62(LENGTH - 12)
}
/** Extract timestamp from an ascending ID. Does not work with descending IDs. */
export function timestamp(id: string): number {
const prefix = id.split("_")[0]
const hex = id.slice(prefix.length + 1, prefix.length + 13)
const encoded = BigInt("0x" + hex)
return Number(encoded / BigInt(0x1000))
}
export * as Identifier from "./id"

View File

@ -1,19 +1,40 @@
import { Layer, LayerMap } from "effect"
import { Location } from "./location"
import { Catalog } from "./catalog"
import { PluginBoot } from "./plugin/boot"
import { Policy } from "./policy"
import { Config } from "./config"
import { PluginV2 } from "./plugin"
import { Catalog } from "./catalog"
import { AgentV2 } from "./agent"
import { PluginBoot } from "./plugin/boot"
import { Project } from "./project"
import { EventV2 } from "./event"
import { Auth } from "./auth"
import { Npm } from "./npm"
import { ModelsDev } from "./models-dev"
import { AppFileSystem } from "./filesystem"
import { Global } from "./global"
export class LocationServiceMap extends LayerMap.Service<LocationServiceMap>()("@opencode/example/LocationServiceMap", {
lookup: (ref: Location.Ref) => {
const result = Layer.fresh(
Layer.mergeAll(Catalog.defaultLayer, PluginBoot.defaultLayer, Config.defaultLayer, AgentV2.defaultLayer).pipe(
Layer.provideMerge(Location.defaultLayer(ref)),
),
)
return result
const location = Location.layer(ref)
return Layer.mergeAll(
location,
Policy.locationLayer,
Config.locationLayer,
PluginV2.locationLayer,
Catalog.locationLayer,
AgentV2.locationLayer,
PluginBoot.locationLayer,
).pipe(Layer.provideMerge(location), Layer.fresh)
},
idleTimeToLive: "60 minutes",
dependencies: [],
dependencies: [
Project.defaultLayer,
EventV2.defaultLayer,
Auth.defaultLayer,
Npm.defaultLayer,
ModelsDev.defaultLayer,
AppFileSystem.defaultLayer,
Global.defaultLayer,
],
}) {}

View File

@ -36,5 +36,3 @@ export const layer = (ref: Ref) =>
})
}),
)
export const defaultLayer = (ref: Ref) => layer(ref).pipe(Layer.provide(Project.defaultLayer))

View File

@ -2,6 +2,17 @@ export * as PermissionV2 from "./permission"
import { Schema } from "effect"
import { Wildcard } from "./util/wildcard"
import { Identifier } from "./id/id"
import { Newtype } from "./schema"
export class PermissionID extends Newtype<PermissionID>()(
"PermissionID",
Schema.String.check(Schema.isStartsWith("per")),
) {
static ascending(id?: string): PermissionID {
return this.make(Identifier.ascending("permission", id))
}
}
export const Action = Schema.Literals(["allow", "deny", "ask"]).annotate({ identifier: "PermissionV2.Action" })
export type Action = typeof Action.Type

View File

@ -26,9 +26,9 @@ type HookSpec = {
}
"account.switched": {
input: {
serviceID: import("./account").AccountV2.ServiceID
from?: import("./account").AccountV2.ID
to?: import("./account").AccountV2.ID
serviceID: import("./auth").Auth.ServiceID
from?: import("./auth").Auth.ID
to?: import("./auth").Auth.ID
}
output: {}
}
@ -169,7 +169,7 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer.pipe(Layer.provide(EventV2.defaultLayer))
export const locationLayer = layer
// opencode
// sdcok

View File

@ -1,16 +1,18 @@
import { Effect, Scope, Stream } from "effect"
import { AccountV2 } from "../account"
import { EventV2 } from "../event"
import { PluginV2 } from "../plugin"
import { Auth } from "../auth"
// Depending on what account is active, enable matching providers for that
// service
export const AccountPlugin = PluginV2.define({
id: PluginV2.ID.make("account"),
effect: Effect.gen(function* () {
const accounts = yield* AccountV2.Service
const accounts = yield* Auth.Service
const events = yield* EventV2.Service
const scope = yield* Scope.Scope
yield* events.subscribe(AccountV2.Event.Switched).pipe(
yield* events.subscribe(Auth.Event.Switched).pipe(
Stream.runForEach((event) =>
PluginV2.Service.use((plugin) => plugin.trigger("account.switched", event.data, {})).pipe(Effect.asVoid),
),
@ -20,7 +22,7 @@ export const AccountPlugin = PluginV2.define({
return {
"catalog.transform": Effect.fn(function* (evt) {
for (const item of evt.provider.list()) {
const account = yield* accounts.active(AccountV2.ServiceID.make(item.provider.id)).pipe(Effect.orDie)
const account = yield* accounts.active(Auth.ServiceID.make(item.provider.id)).pipe(Effect.orDie)
if (!account) continue
evt.provider.update(item.provider.id, (provider) => {
provider.enabled = {

View File

@ -1,13 +1,14 @@
export * as PluginBoot from "./boot"
import { Context, Deferred, Effect, Layer } from "effect"
import { AccountV2 } from "../account"
import { Auth } from "../auth"
import { AgentV2 } from "../agent"
import { Catalog } from "../catalog"
import { Config } from "../config"
import { ConfigAgentPlugin } from "../config/plugin/agent"
import { EventV2 } from "../event"
import { Location } from "../location"
import { ModelsDev } from "../models-dev"
import { Npm } from "../npm"
import { PluginV2 } from "../plugin"
import { AccountPlugin } from "./account"
@ -21,13 +22,14 @@ type Plugin = {
id: PluginV2.ID
effect: PluginV2.Effect<
| Catalog.Service
| AccountV2.Service
| Auth.Service
| AgentV2.Service
| Npm.Service
| EventV2.Service
| Location.Service
| PluginV2.Service
| Config.Service
| ModelsDev.Service
>
}
@ -42,10 +44,11 @@ export const layer = Layer.effect(
Effect.gen(function* () {
const catalog = yield* Catalog.Service
const plugin = yield* PluginV2.Service
const accounts = yield* AccountV2.Service
const accounts = yield* Auth.Service
const agents = yield* AgentV2.Service
const config = yield* Config.Service
const location = yield* Location.Service
const modelsDev = yield* ModelsDev.Service
const npm = yield* Npm.Service
const events = yield* EventV2.Service
const done = yield* Deferred.make<void>()
@ -55,10 +58,11 @@ export const layer = Layer.effect(
id: input.id,
effect: input.effect.pipe(
Effect.provideService(Catalog.Service, catalog),
Effect.provideService(AccountV2.Service, accounts),
Effect.provideService(Auth.Service, accounts),
Effect.provideService(AgentV2.Service, agents),
Effect.provideService(Config.Service, config),
Effect.provideService(Location.Service, location),
Effect.provideService(ModelsDev.Service, modelsDev),
Effect.provideService(Npm.Service, npm),
Effect.provideService(EventV2.Service, events),
Effect.provideService(PluginV2.Service, plugin),
@ -90,12 +94,8 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(Catalog.defaultLayer),
Layer.provide(EventV2.defaultLayer),
Layer.provide(PluginV2.defaultLayer),
Layer.provide(AccountV2.defaultLayer),
Layer.provide(AgentV2.defaultLayer),
Layer.provide(Config.defaultLayer),
Layer.provide(Npm.defaultLayer),
export const locationLayer = layer.pipe(
Layer.provideMerge(Catalog.locationLayer),
Layer.provideMerge(Config.locationLayer),
Layer.provideMerge(AgentV2.locationLayer),
)

View File

@ -114,7 +114,7 @@ export const ModelsDevPlugin = PluginV2.define({
yield* refresh()
yield* events.subscribe(ModelsDev.Event.Refreshed).pipe(
Stream.runForEach(() => refresh()),
Effect.forkIn(scope, { startImmediately: true }),
Effect.forkScoped({ startImmediately: true }),
)
}).pipe(Effect.provide(ModelsDev.defaultLayer)),
}),
})

View File

@ -1 +1,67 @@
export { ProviderPlugins } from "./provider/index"
import { AlibabaPlugin } from "./provider/alibaba"
import { AmazonBedrockPlugin } from "./provider/amazon-bedrock"
import { AnthropicPlugin } from "./provider/anthropic"
import { AzureCognitiveServicesPlugin, AzurePlugin } from "./provider/azure"
import { CerebrasPlugin } from "./provider/cerebras"
import { CloudflareAIGatewayPlugin } from "./provider/cloudflare-ai-gateway"
import { CloudflareWorkersAIPlugin } from "./provider/cloudflare-workers-ai"
import { CoherePlugin } from "./provider/cohere"
import { DeepInfraPlugin } from "./provider/deepinfra"
import { DynamicProviderPlugin } from "./provider/dynamic"
import { GatewayPlugin } from "./provider/gateway"
import { GithubCopilotPlugin } from "./provider/github-copilot"
import { GitLabPlugin } from "./provider/gitlab"
import { GooglePlugin } from "./provider/google"
import { GoogleVertexAnthropicPlugin, GoogleVertexPlugin } from "./provider/google-vertex"
import { GroqPlugin } from "./provider/groq"
import { KiloPlugin } from "./provider/kilo"
import { LLMGatewayPlugin } from "./provider/llmgateway"
import { MistralPlugin } from "./provider/mistral"
import { NvidiaPlugin } from "./provider/nvidia"
import { OpenAIPlugin } from "./provider/openai"
import { OpenAICompatiblePlugin } from "./provider/openai-compatible"
import { OpencodePlugin } from "./provider/opencode"
import { OpenRouterPlugin } from "./provider/openrouter"
import { PerplexityPlugin } from "./provider/perplexity"
import { SapAICorePlugin } from "./provider/sap-ai-core"
import { TogetherAIPlugin } from "./provider/togetherai"
import { VercelPlugin } from "./provider/vercel"
import { VenicePlugin } from "./provider/venice"
import { XAIPlugin } from "./provider/xai"
import { ZenmuxPlugin } from "./provider/zenmux"
export const ProviderPlugins = [
AlibabaPlugin,
AmazonBedrockPlugin,
AnthropicPlugin,
AzureCognitiveServicesPlugin,
AzurePlugin,
CerebrasPlugin,
CloudflareAIGatewayPlugin,
CloudflareWorkersAIPlugin,
CoherePlugin,
DeepInfraPlugin,
GatewayPlugin,
GithubCopilotPlugin,
GitLabPlugin,
GooglePlugin,
GoogleVertexAnthropicPlugin,
GoogleVertexPlugin,
GroqPlugin,
KiloPlugin,
LLMGatewayPlugin,
MistralPlugin,
NvidiaPlugin,
OpencodePlugin,
OpenAICompatiblePlugin,
OpenAIPlugin,
OpenRouterPlugin,
PerplexityPlugin,
SapAICorePlugin,
TogetherAIPlugin,
VercelPlugin,
VenicePlugin,
XAIPlugin,
ZenmuxPlugin,
DynamicProviderPlugin,
]

View File

@ -1,67 +0,0 @@
import { AlibabaPlugin } from "./alibaba"
import { AmazonBedrockPlugin } from "./amazon-bedrock"
import { AnthropicPlugin } from "./anthropic"
import { AzureCognitiveServicesPlugin, AzurePlugin } from "./azure"
import { CerebrasPlugin } from "./cerebras"
import { CloudflareAIGatewayPlugin } from "./cloudflare-ai-gateway"
import { CloudflareWorkersAIPlugin } from "./cloudflare-workers-ai"
import { CoherePlugin } from "./cohere"
import { DeepInfraPlugin } from "./deepinfra"
import { DynamicProviderPlugin } from "./dynamic"
import { GatewayPlugin } from "./gateway"
import { GithubCopilotPlugin } from "./github-copilot"
import { GitLabPlugin } from "./gitlab"
import { GooglePlugin } from "./google"
import { GoogleVertexAnthropicPlugin, GoogleVertexPlugin } from "./google-vertex"
import { GroqPlugin } from "./groq"
import { KiloPlugin } from "./kilo"
import { LLMGatewayPlugin } from "./llmgateway"
import { MistralPlugin } from "./mistral"
import { NvidiaPlugin } from "./nvidia"
import { OpenAIPlugin } from "./openai"
import { OpenAICompatiblePlugin } from "./openai-compatible"
import { OpencodePlugin } from "./opencode"
import { OpenRouterPlugin } from "./openrouter"
import { PerplexityPlugin } from "./perplexity"
import { SapAICorePlugin } from "./sap-ai-core"
import { TogetherAIPlugin } from "./togetherai"
import { VercelPlugin } from "./vercel"
import { VenicePlugin } from "./venice"
import { XAIPlugin } from "./xai"
import { ZenmuxPlugin } from "./zenmux"
export const ProviderPlugins = [
AlibabaPlugin,
AmazonBedrockPlugin,
AnthropicPlugin,
AzureCognitiveServicesPlugin,
AzurePlugin,
CerebrasPlugin,
CloudflareAIGatewayPlugin,
CloudflareWorkersAIPlugin,
CoherePlugin,
DeepInfraPlugin,
GatewayPlugin,
GithubCopilotPlugin,
GitLabPlugin,
GooglePlugin,
GoogleVertexAnthropicPlugin,
GoogleVertexPlugin,
GroqPlugin,
KiloPlugin,
LLMGatewayPlugin,
MistralPlugin,
NvidiaPlugin,
OpencodePlugin,
OpenAICompatiblePlugin,
OpenAIPlugin,
OpenRouterPlugin,
PerplexityPlugin,
SapAICorePlugin,
TogetherAIPlugin,
VercelPlugin,
VenicePlugin,
XAIPlugin,
ZenmuxPlugin,
DynamicProviderPlugin,
]

View File

@ -41,4 +41,4 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer
export const locationLayer = layer

Some files were not shown because too many files have changed in this diff Show More