@meredian
Last active December 17, 2024 14:32
Drizzle transaction extractor for the PG dialect. Use it when you want to obtain a transaction object, pass it around your code, and commit or roll it back explicitly. This frees you from nesting all your code inside the transaction callback and gives you more flexibility.

Extracting Drizzle transaction object

The code wraps the Drizzle transaction function in a number of helpers that let you safely extract the transaction while keeping proper error handling.

It uses PostgreSQL types for simplicity, but the same approach works with any data source, since transactions are part of the basic interface.
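
The core trick can be shown without Drizzle at all. The sketch below is a simplified illustration, not the implementation in this gist: `fakeTransaction` is a hypothetical stand-in for a callback-style API such as `db.transaction(cb)`, and `extract`, `Extracted` and `RollbackSignal` are names made up for the example. It swaps the event emitter used in the real class for a plain deferred promise that keeps the callback pending until the caller decides what to do.

```typescript
// Hypothetical stand-in for a callback-style transaction API:
// it "commits" if the callback resolves and "rolls back" if it rejects.
const log: string[] = [];

async function fakeTransaction(
  cb: (tx: { id: number }) => Promise<void>,
): Promise<void> {
  log.push('begin');
  try {
    await cb({ id: 1 });
    log.push('commit');
  } catch (err) {
    log.push('rollback');
    throw err;
  }
}

interface Extracted<Tx> {
  tx: Tx;
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

// Marker error used to force the callback to fail on rollback.
class RollbackSignal extends Error {}

function extract<Tx>(
  run: (cb: (tx: Tx) => Promise<void>) => Promise<void>,
): Promise<Extracted<Tx>> {
  return new Promise<Extracted<Tx>>((resolveInit, rejectInit) => {
    let release!: () => void;
    let abort!: (err: Error) => void;
    // The callback awaits this promise, which keeps the transaction open.
    const hold = new Promise<void>((res, rej) => {
      release = res;
      abort = rej;
    });
    // Settles only after the underlying API has committed or rolled back.
    const finished: Promise<void> = run(async tx => {
      resolveInit({
        tx,
        commit: async () => {
          release();
          await finished;
        },
        rollback: async () => {
          abort(new RollbackSignal());
          // Our own signal is expected here; anything else is a real error.
          await finished.catch(err => {
            if (!(err instanceof RollbackSignal)) throw err;
          });
        },
      });
      await hold;
    });
    // If the callback was never invoked, startup failed: reject the extraction.
    finished.catch(rejectInit);
  });
}
```

Calling `const handle = await extract(fakeTransaction)` returns while the fake transaction is still open; `handle.commit()` or `handle.rollback()` then lets the callback finish and waits for the outcome.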

What is missing

Transaction parameters are not passed here, for example. You can easily add them and pass them directly to the underlying transaction call.
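
One possible way to add them, sketched under assumptions: `TxConfig` is a local mirror of what Drizzle's `PgTransactionConfig` is assumed to contain (check it against your drizzle-orm version), and `TransactionalDb` and `withConfig` are hypothetical names used only for this example. The point is simply that the config travels as the second argument of the transaction call.

```typescript
// Local mirror of the PG transaction options. Assumed to match Drizzle's
// `PgTransactionConfig`; verify against the version you use.
interface TxConfig {
  isolationLevel?:
    | 'read uncommitted'
    | 'read committed'
    | 'repeatable read'
    | 'serializable';
  accessMode?: 'read only' | 'read write';
  deferrable?: boolean;
}

// Structural stand-in for the database object: only the method we need.
interface TransactionalDb<Tx> {
  transaction<T>(cb: (tx: Tx) => Promise<T>, config?: TxConfig): Promise<T>;
}

// Hypothetical helper: captures the config once and forwards it on every start.
function withConfig<Tx>(db: TransactionalDb<Tx>, config?: TxConfig) {
  return <T>(cb: (tx: Tx) => Promise<T>): Promise<T> =>
    db.transaction(cb, config);
}
```

In the extractor itself, the equivalent change would be to accept an optional config in the constructor and pass it as the second argument where the class calls `transaction` on the connection.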

Usage

// 1. Get drizzle
const db = drizzle({ ... });
// 2. Create extractor - you'll need it to commit or roll back the transaction properly
const extractor = new DrizzlePgTransactionExtractor(db);
// 3. Get transaction. It will throw if init failed (e.g. failed to create connection)
const trx = await extractor.extractTransaction();

// Handle commit & rollback manually, with whatever degree of control you want
try {
  await trx.execute(sql`....`);
  // commit and rollback live on the extractor, not on the Drizzle transaction object
  await extractor.commit();
} catch (err) {
  // tryRollback is used because commit() marks the extractor as closed before it
  // finishes; if commit() then throws, we land here with the transaction already
  // closed, and tryRollback() is a no-op in that case
  await extractor.tryRollback();
}

Motivation

While the original approach is fine, it takes away control over where the transaction is created: it always happens in a direct DB call. So if you want to add layers in the middle (logging, monitoring, etc.), or you want smarter transaction management, that can become cumbersome.

Honestly, I'm not happy with Drizzle's design choices here: all the pieces are there, and this could be implemented so easily in the source code, but the intention is obviously different. Also, rollback via a synchronously thrown error is bruh 😅
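
As an illustration of the "layers in the middle" idea, here is a minimal sketch of a logging wrapper around commit and rollback. `TransactionControl` and `withLogging` are hypothetical names, not part of this gist or of Drizzle; the interface only lists the two methods the wrapper needs, which the extractor class happens to provide.

```typescript
// Hypothetical minimal view of the extractor: just the two calls we wrap.
interface TransactionControl {
  commit(): Promise<void>;
  tryRollback(): Promise<void>;
}

// Wraps commit and rollback with timing logs. `log` defaults to console.log.
function withLogging(
  inner: TransactionControl,
  log: (msg: string) => void = console.log,
): TransactionControl {
  const timed = async (name: string, op: () => Promise<void>) => {
    const startedAt = Date.now();
    try {
      await op();
      log(`${name} ok in ${Date.now() - startedAt}ms`);
    } catch (err) {
      log(`${name} failed in ${Date.now() - startedAt}ms`);
      throw err; // the caller still decides how to react
    }
  };
  return {
    commit: () => timed('commit', () => inner.commit()),
    tryRollback: () => timed('rollback', () => inner.tryRollback()),
  };
}
```

Because the transaction is no longer tied to a single callback, a wrapper like this can sit anywhere between the code that opens the transaction and the code that closes it.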

import { EventEmitter } from 'events';
import { ExtractTablesWithRelations, TransactionRollbackError } from 'drizzle-orm';
import { NodePgDatabase } from 'drizzle-orm/node-postgres';
import { PgQueryResultHKT, PgTransaction } from 'drizzle-orm/pg-core';

type DrizzlePgTransaction = PgTransaction<
  PgQueryResultHKT,
  Record<string, never>,
  ExtractTablesWithRelations<Record<string, never>>
>;

enum TransactionExtractionStage {
  None,
  Starting,
  Done,
  Closed,
}

enum TransactionAction {
  Commit = 'commit',
  Rollback = 'rollback',
}

class DrizzlePgTransactionExtractorError extends Error {}

/**
 * Helper class to safely extract a transaction from a Drizzle PG connection
 * while maintaining the contract with the Drizzle ORM.
 *
 * Why so sophisticated? Because Drizzle ORM does not provide a way to
 * extract a transaction from a connection without executing a query, and
 * it has a single error-handling path for the whole transaction flow.
 *
 * We instead separate initialization (which must be done immediately)
 * and transaction execution (which can be done later), orchestrate
 * those events and provide a way to commit or roll back the transaction
 * externally, while complying with the Drizzle ORM transaction contract.
 *
 * An alternative approach would be to create a tweaked Drizzle ORM driver
 * that provides a way to extract a transaction without executing
 * a query, but that would require a lot of work and would be more
 * prone to change (so you would have to sync it manually on updates).
 */
export class DrizzlePgTransactionExtractor {
  private _stage = TransactionExtractionStage.None;
  private _emitter = new EventEmitter();
  private _transaction?: DrizzlePgTransaction;
  private _immediateReturnPromise?: Promise<DrizzlePgTransaction>;
  private _holdingPromise?: Promise<void>;

  constructor(private readonly _conn: NodePgDatabase) {}

  isClosed(): boolean {
    return this._stage === TransactionExtractionStage.Closed;
  }

  async extractTransaction(): Promise<DrizzlePgTransaction> {
    if (this._stage === TransactionExtractionStage.Closed) {
      throw new DrizzlePgTransactionExtractorError('Transaction is closed');
    }
    // Reuse the in-flight or already extracted transaction instead of
    // starting a second one on repeated calls
    if (this._immediateReturnPromise) {
      return this._immediateReturnPromise;
    }
    this._stage = TransactionExtractionStage.Starting;
    // Resolves as soon as Drizzle hands us the transaction object
    this._immediateReturnPromise = new Promise<DrizzlePgTransaction>(
      (res, rej) => {
        this._emitter.once('init', (trx: DrizzlePgTransaction) => {
          res(trx);
        });
        this._emitter.once('startup-error', err => {
          rej(err);
        });
      },
    );
    // Keeps the Drizzle transaction callback pending until commit or rollback
    this._holdingPromise = new Promise<void>((res, rej) => {
      // The 'commit' event carries the requested action (commit or rollback)
      this._emitter.once('commit', (action: TransactionAction) => {
        this._emitter.removeAllListeners();
        if (action === TransactionAction.Commit) {
          res();
        } else {
          try {
            if (!this._transaction) {
              throw new Error('Bad state: transaction is not initialized');
            }
            // DrizzleOrm throws sync error on rollback which must be
            // caught by original transaction handler. Ehm....
            this._transaction.rollback();
          } catch (err) {
            rej(err);
          }
        }
      });
      this._emitter.once('execution-error', err => {
        rej(err);
      });
    });
    this._conn
      .transaction(async trx => {
        this._stage = TransactionExtractionStage.Done;
        this._transaction = trx;
        this._emitter.emit('init', trx);
        this._emitter.removeAllListeners('startup-error');
        await this._holdingPromise;
      })
      .catch(err => {
        // Errors before the callback ran are startup errors;
        // anything later belongs to execution
        const eventName =
          this._stage === TransactionExtractionStage.Starting
            ? 'startup-error'
            : 'execution-error';
        this._stage = TransactionExtractionStage.Closed;
        this._emitter.emit(eventName, err);
        this._emitter.removeAllListeners();
      });
    return this._immediateReturnPromise;
  }

  async commit() {
    if (this._stage !== TransactionExtractionStage.Done) {
      throw new DrizzlePgTransactionExtractorError(
        'Transaction is not initialized',
      );
    }
    this._stage = TransactionExtractionStage.Closed;
    this._emitter.emit('commit', TransactionAction.Commit);
    await this._holdingPromise;
  }

  /**
   * Use for error handling when the transaction may already be closed.
   */
  async tryRollback() {
    if (this._stage === TransactionExtractionStage.Closed) {
      return;
    }
    return this.rollback();
  }

  async rollback() {
    if (this._stage !== TransactionExtractionStage.Done) {
      throw new DrizzlePgTransactionExtractorError(
        'Transaction is not initialized',
      );
    }
    this._stage = TransactionExtractionStage.Closed;
    this._emitter.emit('commit', TransactionAction.Rollback);
    await this._holdingPromise?.catch(err => {
      // This is explicit rollback error, we want to suppress it
      // since we already know that transaction is rolled back
      if (!(err instanceof TransactionRollbackError)) {
        throw err;
      }
    });
  }
}