Non-blocking single threaded data import from xml to postgresql "template" using node.js
/*
This is a simple node program originally written to import a large volume of data
stored in XML files into a postgres database.

It uses the async library to process up to 20 files concurrently. Adjusting the
concurrency can alter performance depending on the machine and the exact workload.

The program is single threaded, so on multi-core machines you might want to consider
using Node's built-in cluster module to spread the work across processes for a
further increase in throughput (a commented sketch follows this header).

Further improvements are probably possible by batching inserts to the database,
i.e. rather than one insert per file, process multiple files and insert a few hundred
rows at a time (a commented sketch follows updateDatabase near the end of the file).

I have had to remove the proprietary code that does the actual conversion from XML to
SQL, so the program in its current form doesn't actually do anything. You would need
to add your own extraction code in the relevant places (a commented sketch follows
extractData).
*/
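/*
Illustrative sketch only (not executed): one way Node's built-in cluster module
could spread the import across CPU cores, as suggested above. The idea is that
the existing top-level code (building the file list and calling async.eachLimit)
would run inside each worker, with each worker taking every Nth file. The names
numWorkers, WORKER_INDEX and WORKER_COUNT are assumptions for this sketch, not
part of the original program.

  var cluster = require('cluster');
  var numWorkers = require('os').cpus().length;

  if (cluster.isMaster) {
    // master just forks one worker per core and tells each one its share
    for (var w = 0; w < numWorkers; w++) {
      cluster.fork({ WORKER_INDEX: w, WORKER_COUNT: numWorkers });
    }
  } else {
    var idx = parseInt(process.env.WORKER_INDEX, 10);
    var count = parseInt(process.env.WORKER_COUNT, 10);
    // each worker only processes its own slice of the file list
    var myFiles = filelist.filter(function(name, i) { return i % count === idx; });
    async.eachLimit(myFiles, 20, processFile, final);
  }
*/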
// include libraries
var fs = require("fs");
var cheerio = require('cheerio'); // used for xml / xhtml parsing
var S = require("string");
var pg = require('pg');
var async = require('async');
var moment = require('moment');

// Configuration
var input_dir = '/wherever/to_import';
var output_dir = '/somewherelse/imported';
var conString = "postgres://postgres:db_password@localhost/db_user";
var debug = false;

// Set up database connection
var client = new pg.Client(conString);
client.connect();

// initialise variables
var progress_counter = 0;
var start_time = moment();
var timer = null; // used to calculate etas / performance

// get list of files to process
var filelist = fs.readdirSync(input_dir);

// if in debug mode, only process the first 2 files
if (debug) filelist = filelist.slice(0, 2);

// kick off processing in parallel with a concurrency of 20
async.eachLimit(filelist, 20, processFile, final);
// process each input file
function processFile(filename, next) {
  // main processing sequence for each file
  async.waterfall([
    function(callback) {
      readFile(filename, callback);
    },
    extractData,
    validateData,
    updateDatabase,
    moveFile
  ],
  function(err, filename) {
    // called when the waterfall completes, or as soon as any step returns an error;
    // either way, processing continues with the next file in the list
    if (err) {
      console.log("error processing " + filename, err);
    } else {
      if (debug) console.log("file processed successfully", filename);
    }
    next();
  });

  // display a progress counter every 1000 files
  if (progress_counter++ % 1000 === 0) {
    var msg;
    if (timer) {
      var time_per_thou = moment().diff(timer);
      var remaining = (filelist.length - progress_counter) / 1000;
      var files_per_minute = Math.round(1000 / (time_per_thou / 60000));
      msg = "eta " + moment.duration(time_per_thou * remaining).humanize(true) + " avg: " + files_per_minute + " files/minute";
    } else {
      // first time through the loop
      msg = "Import started at " + moment().format("dddd, MMMM Do YYYY, h:mm:ss a");
    }
    timer = moment();
    console.log(progress_counter + " of " + filelist.length + " : " + msg);
  }
}
// called at the end of processing for clean up / summary
function final() {
  // close database connection
  client.end();

  // generate summary
  var total_time = moment().diff(start_time);
  var files_per_minute = Math.round(filelist.length / (total_time / 60000));
  console.log(progress_counter + " files imported in " + moment.duration(total_time).humanize() + " avg: " + files_per_minute + " files/minute");
}
// read the input file (TODO: this function is a bit superfluous)
function readFile(filename, next) {
  if (debug) console.log("read file " + input_dir + '/' + filename);
  fs.readFile(input_dir + '/' + filename, "utf8", function(err, data) {
    next(err, filename, data);
  });
}
// handle extraction of data from the input file (add your own code here)
function extractData(filename, data, next) {
  if (debug) console.log("extracting data " + filename);
  var $ = cheerio.load(data, {xmlMode: true});
  var result = {};
  result.field1 = $("Field1").first().text() || null;
  result.field2 = $("Field2").first().text() || null;
  result.field3 = $("Field3").first().text() || null;
  // etc
  next(null, filename, result);
}
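/*
Illustrative sketch only (not executed): examples of the kind of extraction code
that could live in extractData. The element and attribute names (Record, created,
Amount) and the date format are made up for this sketch.

  // attribute values
  result.record_id = $("Record").first().attr("id") || null;

  // normalise a date string into a postgres-friendly format using moment
  var created = moment($("created").first().text(), "DD/MM/YYYY");
  result.created_at = created.isValid() ? created.format("YYYY-MM-DD") : null;

  // numeric fields
  var amount = parseFloat($("Amount").first().text());
  result.amount = isNaN(amount) ? null : amount;
*/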
// validate the extracted data
function validateData(filename, json, next) {
  var err = null;
  if (!json.whatever) {
    err = "whatever not valid";
  }
  if (err) err += " in " + filename;
  next(err, filename, json);
}
// add the data to the database
function updateDatabase(filename, data, next) {
  var cols = [], values = [], placeholders = [], query;
  var i = 1;
  for (var d in data) {
    cols.push(d);
    values.push(data[d]);
    placeholders.push("$" + i++);
  }

  // generate the SQL command
  query = 'INSERT INTO my_table ("' + cols.join('","') + '") values(' + placeholders.join(',') + ')';
  query += ' RETURNING id';

  // note: using a named prepared statement means the query text (and therefore
  // the set of columns) must be identical for every file
  client.query({
    name: "importdata",
    text: query,
    values: values
  }, function(err, result) {
    if (debug && err) console.log("values", data);
    next(err, filename);
  });
}
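/*
Illustrative sketch only (not executed): the batched, multi-row insert suggested
in the header comment. It assumes every file yields the same set of columns and
that the caller accumulates an array of row objects before flushing them in one
query. The function name, batching strategy and column handling are assumptions
for this sketch, not part of the original program.

  function updateDatabaseBatch(rows, next) {
    if (!rows.length) return next(null);
    var cols = Object.keys(rows[0]);
    var values = [], placeholders = [];
    rows.forEach(function(row, r) {
      // build one ($1,$2,...) group per row, numbering placeholders sequentially
      var group = cols.map(function(col, c) {
        values.push(row[col]);
        return "$" + (r * cols.length + c + 1);
      });
      placeholders.push("(" + group.join(",") + ")");
    });
    var query = 'INSERT INTO my_table ("' + cols.join('","') + '") VALUES ' +
                placeholders.join(",");
    client.query({ text: query, values: values }, function(err) {
      next(err);
    });
  }
*/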
// move processed files to another folder to prevent re-processing
function moveFile(filename, next) {
  fs.rename(input_dir + '/' + filename, output_dir + '/' + filename, function(err) {
    next(err, filename);
  });
}