Created
May 10, 2020 15:40
-
-
Save pkit/58887bb0c5c13c8edacc5fd0cb05c53b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Pool = require('pg-pool'); | |
const STATE = { | |
CANCELLED: 0, | |
ACTIVE: 1, | |
WAITING: 2, | |
} | |
export class ManagedPool extends Pool { | |
constructor (options) { | |
super(options) | |
const adminOpts = Object.assign({}, options, { | |
poolSize: options.adminPoolSize || 1, | |
idleTimeoutMillis: 0, // make admin connection(s) persistent | |
}) | |
this._adminPool = new Pool(adminOpts) | |
} | |
async cancel(client, wait = 1000, prevState) { | |
const pid = client.processID | |
let adminClient = await this._adminPool.connect(); | |
try { | |
const { rows } = await adminClient.query(`select state from pg_stat_activity where pid = $1`, [pid]) | |
const state = parseState(rows) | |
if (state === STATE.CANCELLED) { | |
// cancelled already, bail out | |
return | |
} | |
// try to re-cancel later, in case it hangs | |
setTimeout(async () => { | |
await this.cancel(client, wait, prevState) | |
}, wait) | |
// kill if we are here again | |
if (prevState === state) { | |
return await adminClient.query(`select pg_terminate_backend($1)`, [pid]) | |
} | |
switch (state) { | |
case STATE.WAITING: | |
// transaction is idle, rollback | |
try { | |
await client.query('rollback') | |
} finally { | |
// rollback finished, we can transition client to idle again | |
// unfortunately we cannot signal anything to the original consumer | |
client.release() | |
} | |
break | |
case STATE.ACTIVE: | |
// transaction is active, try to cancel | |
await adminClient.query(`select pg_cancel_backend($1)`, [pid]) | |
break | |
} | |
} finally { | |
adminClient.release() | |
} | |
} | |
} | |
function parseState (rows) { | |
let state = STATE.CANCELLED; | |
if (rows.length > 0) { | |
switch (rows[0].state) { | |
case 'active': | |
case 'fastpath': | |
state = STATE.ACTIVE; | |
break; | |
case 'idle in transaction': | |
case 'idle in transaction (aborted)': | |
state = STATE.WAITING; | |
break; | |
case 'idle': | |
state = STATE.CANCELLED; | |
break; | |
default: | |
throw new Error(`Unknown state returned: ${rows[0].state}`) | |
} | |
} | |
return state; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment