Last active
May 19, 2017 10:35
-
-
Save manjeshpv/40e1172d57af09f224d5851e1b368ffa to your computer and use it in GitHub Desktop.
fast csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Upload-processing strategies:
 * - CSV HTTP stream -> DB | pros: no disk or memory usage; cons: the user has to wait, and DB load is high under concurrency
 * - HTTP CSV stream -> temp file -> read stream -> DB | cons: high DB load under high concurrency
 * - HTTP CSV stream -> temp file -> queue(read stream -> DB) | highly robust
 */
var http = require('http'), | |
path = require('path'), | |
os = require('os'), | |
csv = require('fast-csv'), | |
util = require('util'); | |
var xlsx = require('node-xlsx'); | |
var fsp = require('fs-promise'); | |
var stream = require('stream'); | |
var Busboy = require('busboy'); // to handle the form | |
var fastcsv = require('fast-csv'); | |
var streamToBuffer = require('stream-to-buffer') | |
/**
 * Escape a single spreadsheet cell for CSV output.
 *
 * null/undefined become an empty field (the same result Array#join gave);
 * values containing a comma, double quote, or line break are wrapped in
 * double quotes with embedded quotes doubled, so they survive a CSV round trip.
 *
 * @param {*} value - Raw cell value.
 * @returns {string} CSV-safe field text.
 */
function toCsvField(value) {
  if (value === null || value === undefined) return '';
  const text = String(value);
  if (!/[",\r\n]/.test(text)) return text;
  return `"${text.replace(/"/g, '""')}"`;
}

/**
 * Serialize sheet rows into CSV text, one "\n"-terminated line per row.
 *
 * @param {Array<Array<*>>} rows - Sheet rows, each an array of cell values.
 * @returns {string} CSV text ('' when there are no rows).
 */
function rowsToCsv(rows) {
  // Array.from (unlike map) also visits holes in sparse rows, yielding ''.
  return rows.map((row) => `${Array.from(row, toCsvField).join(',')}\n`).join('');
}

/**
 * Parse a multipart upload, convert the first sheet of the uploaded
 * spreadsheet to CSV, and write that CSV into the OS temp directory.
 *
 * The whole upload is buffered in memory before conversion.
 *
 * @param {Object} req - Incoming HTTP request; its headers are handed to
 *   Busboy and the request is piped into it.
 * @returns {Promise<{fl: string, csvFile: string, csv: string}>} Resolves once
 *   the CSV has been written: `fl` is the temp path built from the uploaded
 *   file name, `csvFile` the path of the written CSV, `csv` its content.
 *   Rejects when buffering, parsing or writing fails, when the workbook has
 *   no sheets, or when the form contained no file.
 */
function uploadFile(req) {
  var busboy = new Busboy({ headers: req.headers });
  return new Promise((res, rej) => {
    let sawFile = false;

    busboy.on('file', (fieldname, file, filename, encoding, mimetype) => {
      console.log('encodeing', encoding);
      if (!filename) {
        // File field submitted without a file: drain the stream so the
        // multipart parser can finish instead of stalling.
        file.resume();
        return;
      }
      sawFile = true;

      // Paths are local to this file event so several files in one form
      // cannot overwrite each other's values before the write completes.
      // basename() drops any directory components sent by the client.
      const baseName = path.basename(filename);
      const tmpDir = os.tmpdir();
      const fl = path.join(tmpDir, baseName);
      // Swap the extension on the file name only, so names without an
      // extension (or dots elsewhere in the temp path) are handled correctly.
      const csvFile = path.join(tmpDir, `${path.parse(baseName).name}.csv`);
      console.log('filename', filename, csvFile);

      streamToBuffer(file, (err, buffer) => {
        if (err) return rej(err);
        let csv;
        try {
          const sheets = xlsx.parse(buffer);
          if (!sheets || sheets.length === 0) {
            throw new Error('Uploaded workbook contains no sheets');
          }
          csv = rowsToCsv(sheets[0].data || []);
        } catch (parseErr) {
          // Surface parser failures through the promise rather than as an
          // uncaught exception inside this callback.
          return rej(parseErr);
        }
        console.log('err', fl, csvFile, csv);
        fsp.writeFile(csvFile, csv).then(() => res({ fl, csvFile, csv }), rej);
      });
    });

    busboy.on('error', rej);

    busboy.on('finish', () => {
      console.log('busboy finish form!');
      // Without this the promise would stay pending forever for forms that
      // carry no file part.
      if (!sawFile) rej(new Error('No file was uploaded'));
    });

    req.pipe(busboy);
  });
}
/**
 * Demo HTTP server on port 8080.
 *
 * POST /busboy: stores the uploaded spreadsheet as a temp CSV via
 *   uploadFile(), then streams that CSV row by row (header row becomes the
 *   keys) and answers once the stream ends, or with a 500 on failure.
 * Any other request: serves an HTML upload form that posts to /busboy.
 *
 * NOTE: the temp CSV written by uploadFile() is not removed here.
 */
http.createServer(function(req, res) {
  // Send a bare 500 unless a response has already been started.
  const fail = (error) => {
    console.log(error);
    if (!res.headersSent) {
      res.writeHead(500, {'content-type': 'text/html'});
    }
    res.end();
  };

  if (req.url === '/busboy' && req.method.toLowerCase() === 'post') {
    // csv or xlsx file upload first and release the requestor with an identifier;
    // processing status can be acquired by http polling (Processing)
    return uploadFile(req)
      .then(({ fl, csvFile, csv }) => {
        console.log('-----------------', { fl, csvFile, csv });
        // streaming saved file to db - load the data on demand
        const csvstream = fastcsv.fromPath(csvFile, { headers: true })
          .on("data", function (row) {
            // Pause while a row is being handled so rows are processed
            // one at a time; resume once the (future) heavy work is done.
            csvstream.pause();
            // do some heavy work
            // - ROBUST: IMP: implement batch inserts here (e.g. BATCH=50)
            console.log('row', row);
            csvstream.resume();
          })
          .on("end", function () {
            console.log("We are done!");
            res.end();
          })
          .on("error", fail);
      })
      // Upload/convert failures must answer the client; otherwise the request
      // hangs and the rejection goes unhandled.
      .catch(fail);
  }

  // show a file upload form (its action must match the route handled above)
  res.writeHead(200, {'content-type': 'text/html'});
  res.end(
    '<form action="/busboy" enctype="multipart/form-data" method="post">'+
    '<input type="text" name="title"><br>'+
    '<input type="file" name="upload" multiple="multiple"><br>'+
    '<input type="submit" value="Upload">'+
    '</form>'
  );
}).listen(8080);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment