Last active
April 26, 2025 19:04
-
-
Save aniravi24/af48e6b2afa32b08366fbaf0c7a337f6 to your computer and use it in GitHub Desktop.
Prisma + Effect nested transactions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as Context from "@effect/data/Context"; | |
import { pipe } from "@effect/data/Function"; | |
import * as Effect from "@effect/io/Effect"; | |
import * as Exit from "@effect/io/Exit"; | |
import * as Layer from "@effect/io/Layer"; | |
import { uniqueId } from "lodash-es"; | |
import { | |
Prisma, | |
PrismaClient, | |
PrismaFlatTransactionClient, | |
rawPrismaClient, | |
startPrismaTransaction | |
} from "./prisma"; | |
export interface DBConnection { | |
// Savepoints may not be as heavily used in production code - however, they are useful for testing | |
// When testing a failure in your test code, you want to make sure you can rollback to a savepoint and not wait for the test to complete to rollback | |
currentSavepoint: string; | |
// This is typed this way because any function outside of this module should not be calling any transaction related functions | |
// we will type cast within the functions in this module to indicate what the actual types are | |
prismaClient: Prisma.TransactionClient; | |
txCount: number; | |
} | |
// this is to make sure savepoints within a transaction don't collide | |
const makeSavepointName = () => uniqueId("db_savepoint"); | |
export const DBConnection = Context.Tag<DBConnection>("DBConnection"); | |
export function dbTransaction<R, E, A>( | |
transactionCb: ( | |
rawPrismaTxClient: DBConnection["prismaClient"] | |
) => Effect.Effect<R, E, A>, | |
options?: { test?: boolean } | |
): Effect.Effect<DBConnection | R, E, A> { | |
const begin: Effect.Effect<DBConnection, never, DBConnection> = Effect.gen(function* ($) { | |
const conn = yield* $(Effect.service(DBConnection)); | |
if (conn.txCount === 0) { | |
const tx = yield* $( | |
Effect.promise(() => | |
startPrismaTransaction(conn.prismaClient as PrismaClient) | |
) | |
); | |
return { | |
currentSavepoint: conn.currentSavepoint, | |
prismaClient: tx, | |
txCount: conn.txCount + 1 | |
}; | |
} else { | |
const savepoint = makeSavepointName(); | |
yield* $( | |
Effect.promise(() => | |
conn.prismaClient.$executeRawUnsafe(`SAVEPOINT ${savepoint}`) | |
) | |
); | |
return { | |
currentSavepoint: savepoint, | |
prismaClient: conn.prismaClient, | |
txCount: conn.txCount + 1 | |
}; | |
} | |
}); | |
const rollback: Effect.Effect<DBConnection, never, void> = Effect.gen(function* ($) { | |
const conn = yield* $(Effect.service(DBConnection)); | |
// Opening the initial transaction will set this count to 1, so we can't check for 0 here | |
if (conn.txCount === 1) { | |
yield* $( | |
Effect.promise(() => | |
(conn.prismaClient as PrismaFlatTransactionClient).$rollback() | |
) | |
); | |
return yield* $(Effect.unit()); | |
} else { | |
yield* $( | |
Effect.promise(() => | |
conn.prismaClient.$executeRawUnsafe( | |
`ROLLBACK TO SAVEPOINT ${conn.currentSavepoint}` | |
) | |
) | |
); | |
return yield* $(Effect.unit()); | |
} | |
}); | |
const commit: Effect.Effect<DBConnection, never, void> = Effect.gen(function* ($) { | |
const conn = yield* $(Effect.service(DBConnection)); | |
// Opening the initial transaction will set this count to 1, so we can't check for 0 here | |
if (conn.txCount === 1) { | |
yield* $( | |
Effect.promise(() => | |
(conn.prismaClient as PrismaFlatTransactionClient).$commit() | |
) | |
); | |
return yield* $(Effect.unit()); | |
} else { | |
yield* $( | |
Effect.promise(() => | |
conn.prismaClient.$executeRawUnsafe( | |
`RELEASE SAVEPOINT ${conn.currentSavepoint}` | |
) | |
) | |
); | |
return yield* $(Effect.unit()); | |
} | |
}); | |
const injectDatabaseService = (_: DBConnection) => | |
Effect.updateService(DBConnection, () => _); | |
const acquire = pipe( | |
DBConnectionService, | |
Effect.flatMap((conn) => pipe(begin, injectDatabaseService(conn))) | |
); | |
const use = (conn: DBConnection) => | |
pipe(transactionCb(conn.prismaClient), injectDatabaseService(conn)); | |
const release = <E, A>(conn: DBConnection, exit: Exit.Exit<E, A>) => | |
pipe( | |
exit, | |
Exit.match( | |
() => rollback, | |
() => (options?.test ? rollback : commit) | |
), | |
injectDatabaseService(conn) | |
); | |
return Effect.acquireUseRelease(acquire, use, release); | |
} | |
export const DBConnectionLive = Layer.succeed(DBConnection, { | |
currentSavepoint: "UNSET", | |
prismaClient: rawPrismaClient, | |
txCount: 0 | |
}); | |
export const DBConnectionTest = Layer.succeed(DBConnection, { | |
currentSavepoint: "UNSET", | |
prismaClient: rawPrismaClient, | |
txCount: 0 | |
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Prisma, | |
PrismaClient, | |
} from "@prisma/client"; | |
export const rawPrismaClient = new PrismaClient(); | |
export type { Prisma, PrismaClient }; | |
export type PrismaFlatTransactionClient = Prisma.TransactionClient & { | |
$commit: () => Promise<void>; | |
$rollback: () => Promise<void>; | |
}; | |
const ROLLBACK = { [Symbol.for("prisma.client.extension.rollback")]: true }; | |
// derived from: https://github.com/prisma/prisma-client-extensions/tree/main/callback-free-itx | |
export const startPrismaTransaction = async (prisma: PrismaClient): Promise<PrismaFlatTransactionClient> => { | |
let setTxClient: (txClient: Prisma.TransactionClient) => void; | |
let commit: () => void; | |
let rollback: () => void; | |
// // a promise for getting the tx inner client | |
const txClient = new Promise<Prisma.TransactionClient>((res) => { | |
setTxClient = (txClient) => res(txClient); | |
}); | |
// // a promise for controlling the transaction | |
const txPromise = new Promise((_res, _rej) => { | |
commit = () => { | |
return _res(undefined); | |
}; | |
rollback = () => { | |
return _rej(ROLLBACK); | |
}; | |
}); | |
// // opening a transaction to control externally | |
if ("$transaction" in prisma && typeof prisma.$transaction === "function") { | |
const tx = prisma | |
.$transaction((txClient) => { | |
setTxClient(txClient); | |
return txPromise; | |
}) | |
.catch((e) => { | |
if (e === ROLLBACK) return; | |
throw e; | |
}); | |
// // return a proxy TransactionClient with `$commit` and `$rollback` methods | |
return new Proxy(await txClient, { | |
get(target, prop) { | |
if (prop === "$commit") { | |
return () => { | |
commit(); | |
return tx; | |
}; | |
} | |
if (prop === "$rollback") { | |
return () => { | |
rollback(); | |
return tx; | |
}; | |
} | |
// @ts-expect-error - Fixing this type causes the TypeScript type checker to freeze | |
return target[prop]; | |
// return target[prop as keyof typeof target]; | |
}, | |
}) as PrismaFlatTransactionClient; | |
} | |
throw new Error("Transactions are not supported by this client"); | |
}; | |
// Alternative way to implement the startPrismaTransaction method with the prisma client extension | |
// not doing it this way because editor performance appears to be worse currently: https://github.com/prisma/prisma/issues/17843 | |
// you can use this on the instance of your prisma client: https://www.prisma.io/docs/concepts/components/prisma-client/client-extensions/client#example | |
// .$extends({ | |
// client: { | |
// async $begin() { | |
// const prisma = Prisma.getExtensionContext(this); | |
// let setTxClient: (txClient: Prisma.TransactionClient) => void; | |
// let commit: () => void; | |
// let rollback: () => void; | |
// // a promise for getting the tx inner client | |
// const txClient = new Promise<Prisma.TransactionClient>((res) => { | |
// setTxClient = (txClient) => res(txClient); | |
// }); | |
// // a promise for controlling the transaction | |
// const txPromise = new Promise((_res, _rej) => { | |
// commit = () => { | |
// return _res(undefined); | |
// }; | |
// rollback = () => { | |
// return _rej(ROLLBACK); | |
// }; | |
// }); | |
// // opening a transaction to control externally | |
// if ( | |
// "$transaction" in prisma && | |
// typeof prisma.$transaction === "function" | |
// ) { | |
// const tx = prisma | |
// .$transaction((txClient) => { | |
// setTxClient(txClient); | |
// return txPromise; | |
// }) | |
// .catch((e) => { | |
// if (e === ROLLBACK) return; | |
// throw e; | |
// }); | |
// // return a proxy TransactionClient with `$commit` and `$rollback` methods | |
// return new Proxy(await txClient, { | |
// get(target, prop) { | |
// if (prop === "$commit") { | |
// return () => { | |
// commit(); | |
// return tx; | |
// }; | |
// } | |
// if (prop === "$rollback") { | |
// return () => { | |
// rollback(); | |
// return tx; | |
// }; | |
// } | |
// return target[prop as keyof typeof target]; | |
// } | |
// }) as PrismaFlatTransactionClient; | |
// } | |
// throw new Error("Transactions are not supported by this client"); | |
// } | |
// } | |
// }) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
DBConnection, | |
dbTransaction as pgtransaction, | |
DBConnectionTest | |
} from "./dbconnection"; | |
import * as Effect from "@effect/io/Effect"; | |
import { pipe } from "@effect/data/Function"; | |
import * as V from "vitest"; | |
export type API = V.TestAPI; | |
const it: API = V.it; | |
// Thanks to @pigoz's great work here: https://gist.github.com/pigoz/a0d8b7068b9d34dbb154bb0ac9ba3c6d#file-helpers_it-ts-L52 | |
// which comes from this: https://github.com/Effect-TS/io/blob/main/test/utils/extend.ts | |
// run your tests in a transaction | |
export const dbTransaction = (() => { | |
const f = <R extends DBConnectionService, E, A>( | |
name: string, | |
self: () => Effect.Effect<R, E, A>, | |
timeout = 20_000 | |
) => { | |
return it( | |
name, | |
() => | |
pipe( | |
pgtransaction(self, { test: true }), | |
Effect.provideLayer(DBConnectionTest), | |
Effect.tapErrorCause(Effect.logErrorCause), | |
Effect.runPromise | |
), | |
timeout | |
); | |
}; | |
return Object.assign(f, { | |
skip: <E, A>( | |
name: string, | |
self: () => Effect.Effect<DBConnectionService, E, A>, | |
timeout = 20_000 | |
) => { | |
return it.skip( | |
name, | |
() => | |
pipe( | |
pgtransaction(self, { test: true }), | |
Effect.provideLayer(DBConnectionTest), | |
Effect.tapErrorCause(Effect.logErrorCause), | |
Effect.runPromise | |
), | |
timeout | |
); | |
} | |
}); | |
})(); |
@aniravi24 Thank you for sharing this gist!
I've adapted a version that works in my project and would like to share it for anyone who might need it as a reference.
https://gist.github.com/hiramhuang/e2534a2a326c7929268fd06457edf096
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Outdated now as it's not using the latest Effect (this was was the requirements were the first type argument, i.e.
Effect<R, E, A>
instead ofA, E, R
like it is now) and Effect also has more test utils with https://www.npmjs.com/package/@effect/vitest. The core pieces are there though for anyone that wants to adopt this for themselves.