Created
November 2, 2022 13:49
-
-
Save jhaynie/a2a5d7efb1c4507a06f0e66544ce9ba8 to your computer and use it in GitHub Desktop.
Terrible hack to get Prisma to work with Cockroach transaction retries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncLocalStorage } from 'async_hooks'; | |
import { Prisma, PrismaClient } from '@prisma/client'; | |
const asyncLocalStorage = new AsyncLocalStorage(); | |
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); | |
const maxRetryCount = 5; | |
const retryableErrorCodes = ['P1001', 'P2028', 'P2010', 'P2034', '40001']; | |
const isRetryable = (code: string) => retryableErrorCodes.includes(code); | |
const backoffMs = 100; | |
const backoff = (c: number) => Math.pow(2, c) * (backoffMs * Math.random()) + 50; | |
function transactionProxy(thing: any, transaction: any): any { | |
return new Proxy(thing, { | |
get: (target: any, prop: any) => { | |
if (typeof target[prop] === 'function') { | |
return (...args: any[]) => { | |
if (prop === 'then') return target[prop](args[0], args[1], transaction); | |
if (prop === 'catch') return target[prop](args[0], transaction); | |
if (prop === 'finally') return target[prop](args[0], transaction); | |
return transactionProxy(target[prop](...args), transaction); | |
}; | |
} | |
return transactionProxy(target[prop], transaction); | |
}, | |
}); | |
} | |
const debugTestFailure = false; // set to true to inject a failure | |
const emptyObject = {}; | |
const runTransaction = async ( | |
prisma: any, | |
engine: any, | |
options: any, | |
runner: (tx: Prisma.TransactionClient) => Promise<any>, | |
) => { | |
const info = await engine.transaction('start', emptyObject, options); | |
const tx = transactionProxy(prisma, { id: info.id }); | |
await tx.$executeRawUnsafe('SAVEPOINT cockroach_restart'); | |
for (let c = 0; c < maxRetryCount; c++) { | |
try { | |
if (debugTestFailure || options?.injectFailure) { | |
if (c < 1) { | |
await tx.$executeRawUnsafe("SET inject_retry_errors_enabled = 'true'"); | |
} else { | |
await tx.$executeRawUnsafe("SET inject_retry_errors_enabled = 'false'"); | |
} | |
} | |
const res = await runner(tx); | |
await tx.$executeRawUnsafe('RELEASE SAVEPOINT cockroach_restart'); | |
await engine.transaction('commit', emptyObject, info); | |
return res; | |
} catch (ex: any) { | |
if (ex.code === 'P2034') { | |
await tx.$executeRawUnsafe('ROLLBACK TO SAVEPOINT cockroach_restart'); | |
const delay = backoff(c); | |
await sleep(delay); | |
continue; | |
} | |
await engine.transaction('rollback', emptyObject, info).catch(() => null); | |
throw ex; | |
} | |
} | |
}; | |
type Fn = (...args: any[]) => Promise<any>; | |
const retryFn = (method: string, fn: Fn, debug = true) => { | |
return async (...args: any[]) => { | |
const begin = Date.now(); | |
for (let c = 0; c < maxRetryCount; c++) { | |
try { | |
const started = Date.now(); | |
const res = await fn(...args); | |
return res; | |
} catch (ex: any) { | |
if (!isRetryable(ex.code)) { | |
throw ex; | |
} | |
if (ex.meta?.code === '0A000') { | |
throw ex; | |
} | |
const delay = backoff(c); | |
await sleep(delay); | |
} | |
} | |
throw new SQLRetryTimeoutException(args, Date.now() - begin); | |
}; | |
}; | |
const wrappedMethods = [ | |
'$executeRawUnsafe', | |
'create', | |
'createMany', | |
'update', | |
'updateMany', | |
'upsert', | |
'delete', | |
'deleteMany', | |
]; | |
const createTableProxy = (target: any, debug = false): any => { | |
return new Proxy(target, { | |
get: (_target: any, method: string) => { | |
// check to see if inside a transaction and a method that needs to be retried and if so, | |
// return a wrapper function that will control the retry | |
if (wrappedMethods.includes(method) && asyncLocalStorage.getStore() === undefined) { | |
return retryFn(method, _target[method].bind(_target), debug); | |
} | |
// either inside a transaction or not a wrapped method which means we run the original fn | |
return target[method]; | |
}, | |
}); | |
}; | |
export const createPrismaClient = async (optionsArg?: any) => { | |
const prisma = new PrismaClient(optionsArg); | |
const _prisma = prisma as any; | |
await prisma.$connect(); | |
const debug = !!optionsArg?.log?.includes('query') || !!process.env.SM_PRISMA_DEBUG; | |
// make a list of model properties we can wrap | |
const keys = Object.keys(Prisma.ModelName) | |
.map((x) => x.charAt(0).toLowerCase() + x.substring(1)) | |
.filter((key) => !key.startsWith('_')); | |
// wrap the main proxy to capture methods off the main instance | |
const proxy = createTableProxy(prisma, debug); | |
keys.forEach((prop) => { | |
proxy[prop] = createTableProxy(proxy[prop], debug); | |
}); | |
useMiddleware(proxy); | |
_prisma.$transaction = (args: any, options: any) => { | |
if (typeof args === 'function') { | |
return new Promise<any>((resolve, reject) => { | |
asyncLocalStorage.run(true, () => { | |
runTransaction(prisma, _prisma._engine, options, args).then(resolve).catch(reject); | |
}); | |
}); | |
} | |
throw new Error('$transaction batch unsupported. use interactive transaction instead'); | |
}; | |
return proxy; | |
}; | |
export class SQLRetryTimeoutException extends Error { | |
public params: any; | |
constructor(params: any, duration: number) { | |
super(`SQL Retry Timeout Exception after ${duration}ms`); | |
this.params = params; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment