fix(effect-drizzle-sqlite): preserve transaction begin errors (#30448)
This commit is contained in:
parent
5b92b173ca
commit
4002b85707
@ -139,8 +139,8 @@ export class EffectSQLiteSession<TRelations extends AnyRelations> extends SQLite
|
||||
const id = connectionOption._tag === "Some" ? connectionOption.value[1] + 1 : 0
|
||||
|
||||
return connection.pipe(
|
||||
Effect.flatMap(([scope, connection]) =>
|
||||
this.executeTransactionStatement(
|
||||
Effect.flatMap(([scope, connection]) => {
|
||||
const transaction = this.executeTransactionStatement(
|
||||
connection,
|
||||
id === 0 ? `begin ${config?.behavior ?? "deferred"}` : `savepoint effect_sql_${id}`,
|
||||
).pipe(
|
||||
@ -148,35 +148,39 @@ export class EffectSQLiteSession<TRelations extends AnyRelations> extends SQLite
|
||||
Effect.provideContext(
|
||||
restore(effect),
|
||||
Context.add(services, this.client.transactionService, [connection, id]),
|
||||
).pipe(
|
||||
Effect.exit,
|
||||
Effect.flatMap((exit) => {
|
||||
const finalize = Exit.isSuccess(exit)
|
||||
? id === 0
|
||||
? this.executeTransactionStatement(connection, "commit").pipe(
|
||||
// SQLite keeps the transaction open after deferred constraint commit failures.
|
||||
Effect.catch((error) =>
|
||||
this.executeTransactionStatement(connection, "rollback").pipe(
|
||||
Effect.catch(() => Effect.void),
|
||||
Effect.andThen(Effect.fail(error)),
|
||||
),
|
||||
),
|
||||
)
|
||||
: this.executeTransactionStatement(connection, `release savepoint effect_sql_${id}`)
|
||||
: id === 0
|
||||
? this.executeTransactionStatement(connection, "rollback")
|
||||
: this.executeTransactionStatement(connection, `rollback to savepoint effect_sql_${id}`).pipe(
|
||||
Effect.andThen(
|
||||
this.executeTransactionStatement(connection, `release savepoint effect_sql_${id}`),
|
||||
),
|
||||
)
|
||||
|
||||
return finalize.pipe(Effect.flatMap(() => exit))
|
||||
}),
|
||||
),
|
||||
),
|
||||
Effect.exit,
|
||||
Effect.flatMap((exit) => {
|
||||
const finalize = Exit.isSuccess(exit)
|
||||
? id === 0
|
||||
? this.executeTransactionStatement(connection, "commit").pipe(
|
||||
// SQLite keeps the transaction open after deferred constraint commit failures.
|
||||
Effect.catch((error) =>
|
||||
this.executeTransactionStatement(connection, "rollback").pipe(
|
||||
Effect.catch(() => Effect.void),
|
||||
Effect.andThen(Effect.fail(error)),
|
||||
),
|
||||
),
|
||||
)
|
||||
: this.executeTransactionStatement(connection, `release savepoint effect_sql_${id}`)
|
||||
: id === 0
|
||||
? this.executeTransactionStatement(connection, "rollback")
|
||||
: this.executeTransactionStatement(connection, `rollback to savepoint effect_sql_${id}`).pipe(
|
||||
Effect.andThen(
|
||||
this.executeTransactionStatement(connection, `release savepoint effect_sql_${id}`),
|
||||
),
|
||||
)
|
||||
const scoped = scope === undefined ? finalize : Effect.ensuring(finalize, Scope.close(scope, exit))
|
||||
)
|
||||
|
||||
return scoped.pipe(Effect.flatMap(() => exit))
|
||||
}),
|
||||
),
|
||||
),
|
||||
return scope === undefined
|
||||
? transaction
|
||||
: transaction.pipe(Effect.onExit((exit) => Scope.close(scope, exit)))
|
||||
}),
|
||||
)
|
||||
}),
|
||||
)
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
import { mkdir, mkdtemp, rm } from "node:fs/promises"
|
||||
import { tmpdir } from "node:os"
|
||||
import { join } from "node:path"
|
||||
import { Database } from "bun:sqlite"
|
||||
import { expect, test } from "bun:test"
|
||||
import { SqliteClient } from "@effect/sql-sqlite-bun"
|
||||
import { eq, sql } from "drizzle-orm"
|
||||
import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core"
|
||||
import { Effect } from "effect"
|
||||
import type { SqlClient as SqlClientService } from "effect/unstable/sql/SqlClient"
|
||||
import { isSqlError } from "effect/unstable/sql/SqlError"
|
||||
import { EffectDrizzleSqlite } from "../src"
|
||||
|
||||
const users = sqliteTable("users", {
|
||||
@ -97,6 +99,37 @@ test("rolls back explicit transaction rollback", async () => {
|
||||
)
|
||||
})
|
||||
|
||||
test("preserves failed transaction begin errors", async () => {
|
||||
const dir = await mkdtemp(join(tmpdir(), "effect-drizzle-sqlite-"))
|
||||
const filename = join(dir, "locked.db")
|
||||
const holder = new Database(filename)
|
||||
|
||||
try {
|
||||
holder.run("create table users (id integer primary key autoincrement, name text not null)")
|
||||
holder.run("pragma busy_timeout = 0")
|
||||
holder.run("begin immediate")
|
||||
|
||||
await Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* EffectDrizzleSqlite.makeWithDefaults()
|
||||
yield* db.run(sql`pragma busy_timeout = 0`)
|
||||
|
||||
const error = yield* db
|
||||
.transaction((tx) => tx.insert(users).values({ name: "Blocked" }), { behavior: "immediate" })
|
||||
.pipe(Effect.flip)
|
||||
|
||||
if (!isSqlError(error)) throw new Error("Expected SqlError")
|
||||
expect(error.reason._tag).toBe("LockTimeoutError")
|
||||
expect(error.reason.cause instanceof Error ? error.reason.cause.message : "").toContain("database is locked")
|
||||
}).pipe(Effect.provide(SqliteClient.layer({ filename, disableWAL: true })), Effect.scoped),
|
||||
)
|
||||
} finally {
|
||||
if (holder.inTransaction) holder.run("rollback")
|
||||
holder.close()
|
||||
await rm(dir, { recursive: true, force: true })
|
||||
}
|
||||
})
|
||||
|
||||
test("supports returning and rejects empty update sets", async () => {
|
||||
await run(
|
||||
Effect.gen(function* () {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user