@alxgrk
Last active February 10, 2023 14:11
A good enough MySQL-dump-to-CSV script. It reads mysqldump output from stdin and writes one CSV line per row to stdout, assuming extended INSERT statements and a timestamp in the first column (used by the optional --not-before/--not-after filters).
#!/usr/bin/env node
const Transform = require("stream").Transform;
// Minimal CLI flag parser supporting both "--name=value" and "--name value".
function getArgValue(argName, defaultValue) {
    const argIndex = process.argv.findIndex(arg => arg.indexOf(argName) !== -1);
    if (argIndex === -1) {
        return defaultValue;
    }
    const argValue = process.argv[argIndex];
    return /--[\w-]+=/.test(argValue) ? argValue.split("=")[1] : process.argv[argIndex + 1];
}
let notAfterTimestamp = getArgValue("--not-after");
if (notAfterTimestamp) {
    notAfterTimestamp = new Date(notAfterTimestamp);
    console.error(`--not-after set - not considering any row after ${notAfterTimestamp.toISOString()}`);
}
let notBeforeTimestamp = getArgValue("--not-before");
if (notBeforeTimestamp) {
    notBeforeTimestamp = new Date(notBeforeTimestamp);
    console.error(`--not-before set - not considering any row before ${notBeforeTimestamp.toISOString()}`);
}
let additionalColumns = getArgValue("--additional-columns");
if (additionalColumns) {
    console.error(`--additional-columns set - appending '${additionalColumns}' to every row`);
    additionalColumns = `,${additionalColumns}`;
}
let statementBuffer = ""; // carries a potentially incomplete row across chunk boundaries
let totalProcessedRows = 0;
let processedChunks = 0;
let expectedNumberOfSeparators = -1; // deduced once from the first complete row
const startTime = new Date();
const timeFilter = (date) => {
    const isNotAfter = notAfterTimestamp ? date < notAfterTimestamp : true;
    const isNotBefore = notBeforeTimestamp ? date > notBeforeTimestamp : true;
    return isNotAfter && isNotBefore;
};
const countSeparators = (row) => (row.match(/,/g) || []).length;
const filter = new Transform({
    transform: (data, encoding, callback) => {
        data = data.toString("utf8");
        const dataWithStatementBuffer = statementBuffer + data;
        // split on the row separator "),(" as well as on the "INSERT INTO ... VALUES (" prefix
        const statements = dataWithStatementBuffer.split(/(\),\()|(INSERT INTO.*VALUES \()/);
        const filteredStatements = statements
            .filter(line => line && line.startsWith("'")); // first column value is wrapped in "'"
        // the last "statement" might not be a complete row yet, so buffer it for the next chunk
        statementBuffer = filteredStatements.pop() || "";
        // deduce the number of columns from the first complete row, only once
        if (expectedNumberOfSeparators === -1 && filteredStatements.length) {
            expectedNumberOfSeparators = countSeparators(filteredStatements[0]);
        }
        if (statementBuffer) {
            const isEndOfInsertStatement = statementBuffer.match(/\);[\n]?$/);
            const isValidRow = countSeparators(statementBuffer) === expectedNumberOfSeparators
                && statementBuffer.match(/(,NULL$)|(,\d+$)|(,'[^']+'$)/); // last column value is either NULL, digits, or a text without "'" wrapped in "'"
            if (isEndOfInsertStatement || isValidRow) {
                filteredStatements.push(statementBuffer);
                statementBuffer = "";
            }
        }
        const cleanedStatements = filteredStatements
            .map(row => row
                && row.replace(/(NULL)|(\);)|(\n)/g, "")
                    .replace(/'/g, "\"")
            )
            .map(row => row ? row.concat(additionalColumns || "") : row)
            .filter(row => row.indexOf("INSERT INTO") === -1)
            .filter(row => timeFilter(new Date(row.split(",")[0].replace(/"/g, "")))); // assuming column 0 contains the timestamp
        totalProcessedRows += cleanedStatements.length;
        processedChunks++;
        if (processedChunks % 1000 === 0) {
            console.error(`Processed ${totalProcessedRows} rows in total.`);
        }
        const output = cleanedStatements.length !== 0
            ? cleanedStatements.reduce((a, b) => a + "\n" + b)
            : undefined;
        callback(null, output ? output + "\n" : "");
    }
});
const filtered = process.stdin.pipe(filter);
filtered.on('end', () => {
    const endTime = new Date();
    const totalSeconds = (endTime.getTime() - startTime.getTime()) / 1000;
    console.error(`Processing ${totalProcessedRows} rows took ${totalSeconds} seconds, started at ${startTime.toISOString()} and ended at ${endTime.toISOString()}`);
});
filtered.pipe(process.stdout);
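
For illustration, piping a (hypothetical, minimal) extended INSERT statement through the script

       echo "INSERT INTO events VALUES ('2022-01-01 10:00:00',1,'foo'),('2022-01-02 11:30:00',2,NULL);" \
              | node sql-to-csv.js

prints one CSV line per row, with single quotes turned into double quotes and NULL values dropped:

       "2022-01-01 10:00:00",1,"foo"
       "2022-01-02 11:30:00",2,
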
alxgrk commented Feb 3, 2022

To use it for streaming mysqldump output to S3, a command would look like this:

mysqldump -h localhost --port 3306 -u admin -p -C db_name table_name --single-transaction --quick -t \
       | node sql-to-csv.js \
       | gzip \
       | aws s3 cp - s3://bucket/data-$(date +"%Y-%m-%d-%H-%M-%S").csv.gz
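
The optional flags combine with the same pipeline. For example (hypothetical values), to keep only rows timestamped between 2022-01-01 and 2022-02-01 and append a fixed region column to every row:

       node sql-to-csv.js --not-before="2022-01-01" --not-after="2022-02-01" --additional-columns='"eu-west-1"'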
