Last active
May 20, 2023 20:07
-
-
Save thecatontheflat/316aee98aed3156a7104ddd81ef87824 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Client } = require('pg'); | |
const losslessJson = require('lossless-json'); | |
const SLOT_NAME = process.env.SLOT_NAME; | |
const SCHEMA_NAME = 'public'; | |
const SHARDS_MAPPING = {}; | |
async function processInsert(targetClient, change, tableName) { | |
let i = 1; | |
const valuesTokens = []; | |
for (const value of change.columnvalues) { | |
valuesTokens.push(`$${i}`); | |
i++; | |
} | |
const sql = `INSERT INTO ${change.schema}.${tableName} (${change.columnnames.join(',')}) VALUES (${valuesTokens.join(',')})`; | |
console.log(sql); | |
await targetClient.query(sql, change.columnvalues); | |
} | |
async function processDelete(targetClient, change, tableName) { | |
let i = 1; | |
const valuesTokens = []; | |
for (const value of change.oldkeys.keyvalues) { | |
valuesTokens.push(`$${i}`); | |
i++; | |
} | |
const sql = `DELETE FROM ${change.schema}.${tableName} WHERE (${change.oldkeys.keynames.join(',')}) IN ((${valuesTokens.join(',')}))`; | |
console.log(sql); | |
await targetClient.query(sql, change.oldkeys.keyvalues); | |
} | |
async function processUpdate(targetClient, change, tableName) { | |
let i = 1; | |
const oldValuesTokens = []; | |
const setValues = []; | |
const setsArray = []; | |
for (const idx in change.columnvalues) { | |
const colName = change.columnnames[idx]; | |
if (change.pk.pknames.includes(colName)) continue; | |
setsArray.push(`${colName} = $${i}`); | |
setValues.push(change.columnvalues[idx]); | |
i++; | |
} | |
for (const value of change.oldkeys.keyvalues) { | |
oldValuesTokens.push(`$${i}`); | |
i++; | |
} | |
const sql = `UPDATE ${change.schema}.${tableName} SET ${setsArray.join(',')} WHERE (${change.oldkeys.keynames.join(',')}) IN ((${oldValuesTokens.join(',')}))`; | |
console.log(sql, [...setValues, ...change.oldkeys.keyvalues]); | |
await targetClient.query(sql, [...setValues, ...change.oldkeys.keyvalues]); | |
} | |
async function run(sourceClient, targetClient, isWorker) { | |
const result = await sourceClient.query(`SELECT * FROM pg_logical_slot_peek_changes('${SLOT_NAME}', NULL, NULL, 'include-pk', '1')`); | |
console.log(`Fetched ${result.rows.length} rows from ${isWorker ? 'worker' : 'coordinator'}`); | |
for (const row of result.rows) { | |
const { lsn, xid, data } = row; | |
const parsedData = losslessJson.parse(data, undefined, (value) => value); | |
for (const change of parsedData.change) { | |
if (change.schema !== SCHEMA_NAME) continue; | |
const tableName = isWorker ? SHARDS_MAPPING[change.table] : change.table; | |
switch (change.kind) { | |
case 'insert': | |
await processInsert(targetClient, change, tableName); | |
break; | |
case 'delete': | |
await processDelete(targetClient, change, tableName); | |
break; | |
case 'update': | |
await processUpdate(targetClient, change, tableName); | |
break; | |
default: | |
console.error('NOT IMPLEMENTED', change.kind); | |
console.log(change); | |
process.exit(); | |
} | |
} | |
console.log('All consumed, advancing', lsn); | |
await sourceClient.query(`SELECT * FROM pg_replication_slot_advance('${SLOT_NAME}', $1)`, [lsn]); | |
} | |
} | |
async function wait(ms) { | |
return new Promise((res, rej) => { | |
setTimeout(res, ms); | |
}); | |
} | |
const sourceWorkerClients = []; | |
const connectionStrings = process.env.SOURCE_WORKERS_DATABASE_URLS.split(' '); | |
for (const connectionString of connectionStrings) { | |
const sourceClient = new Client({ | |
connectionString, | |
ssl: { | |
rejectUnauthorized: false, | |
}, | |
}); | |
sourceWorkerClients.push(sourceClient); | |
} | |
const sourceClient = new Client({ | |
connectionString: process.env.SOURCE_DATABASE_URL, | |
ssl: { | |
rejectUnauthorized: false, | |
}, | |
}); | |
const targetClient = new Client({ | |
connectionString: process.env.TARGET_DATABASE_URL, | |
ssl: { | |
rejectUnauthorized: false, | |
}, | |
}); | |
Promise.all([ | |
sourceWorkerClients.map((client) => client.connect()), | |
sourceClient.connect(), | |
targetClient.connect(), | |
]).then(async () => { | |
const shardsResult = await sourceClient.query(`SELECT table_name, shard_name, citus_table_type FROM citus_shards`); | |
for (const row of shardsResult.rows) { | |
SHARDS_MAPPING[row['shard_name']] = row['table_name']; | |
} | |
while (true) { | |
await run(sourceClient, targetClient, false); | |
for (const client of sourceWorkerClients) { | |
await run(client, targetClient, true); | |
} | |
await wait(1000); | |
} | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment