Skip to content

Instantly share code, notes, and snippets.

@manjeshpv
Last active May 19, 2017 10:35
Show Gist options
  • Save manjeshpv/40e1172d57af09f224d5851e1b368ffa to your computer and use it in GitHub Desktop.
Save manjeshpv/40e1172d57af09f224d5851e1b368ffa to your computer and use it in GitHub Desktop.
fast csv
/***
* types
* - http csv stream -> db | pros: no disk space or extra memory used; cons: the user has to wait, and DB load is high under high concurrency
* - http csv stream -> temp file -> read stream -> db | cons: high DB load under high concurrency
* - http csv stream -> temp file -> queue(read stream -> db) | highly robust
*/
var http = require('http'),
path = require('path'),
os = require('os'),
csv = require('fast-csv'),
util = require('util');
var xlsx = require('node-xlsx');
var fsp = require('fs-promise');
var stream = require('stream');
var Busboy = require('busboy'); // to handle the form
var fastcsv = require('fast-csv');
var streamToBuffer = require('stream-to-buffer')
/**
 * Render one worksheet cell as a CSV field.
 * null/undefined become an empty field; a value containing a comma, a double
 * quote or a line break is wrapped in double quotes with embedded quotes
 * doubled, so the line can be parsed back without shifting columns.
 *
 * @param {*} value - cell value
 * @returns {string} CSV-safe field text
 */
function toCsvField(value) {
  if (value === null || value === undefined) return '';
  const text = String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

/**
 * Serialise the first sheet of a parsed workbook to CSV text, one
 * "\n"-terminated line per row.
 *
 * @param {Array<{data: Array<Array>}>} sheets - result of `xlsx.parse(buffer)`
 * @returns {string} CSV text
 * @throws {Error} when there is no first sheet with a `data` array
 */
function firstSheetToCsv(sheets) {
  const sheet = Array.isArray(sheets) ? sheets[0] : undefined;
  if (!sheet || !Array.isArray(sheet.data)) {
    throw new Error('Uploaded workbook does not contain a readable sheet');
  }
  // Array.from visits holes in sparse rows as undefined -> empty field.
  return sheet.data
    .map((row) => `${Array.from(row, toCsvField).join(',')}\n`)
    .join('');
}

/**
 * Parse a multipart upload, convert the uploaded spreadsheet's first sheet to
 * CSV and write it to the OS temp directory.
 *
 * Each uploaded file is buffered fully in memory before conversion. When the
 * form carries several files, every one is converted and written, and the
 * promise settles with whichever finishes first.
 *
 * NOTE: the temp path is derived only from the client file name, so concurrent
 * uploads using the same name write to the same temp file.
 *
 * @param {object} req - incoming HTTP request (readable stream with `headers`)
 * @returns {Promise<{fl: string, csvFile: string, csv: string}>}
 *   fl      - temp path built from the uploaded file name
 *   csvFile - path of the CSV file that was written
 *   csv     - the CSV text itself
 *   Rejects when the form cannot be parsed, contains no file, the workbook
 *   cannot be converted, or the CSV cannot be written.
 */
function uploadFile(req) {
  return new Promise((res, rej) => {
    // Constructed inside the executor so a constructor throw (e.g. bad
    // headers) becomes a rejection instead of a synchronous exception.
    const busboy = new Busboy({ headers: req.headers });
    let sawFile = false;

    busboy.on('file', (fieldname, file, filename, encoding, mimetype) => {
      if (!filename) {
        // File field submitted without a file: drain it so parsing can finish.
        file.resume();
        return;
      }
      sawFile = true;
      console.log('encodeing', encoding)

      // Per-file locals: a second file event must not overwrite the paths
      // used by an earlier file's pending callback.
      const baseName = path.basename(filename);
      const fl = path.join(os.tmpdir(), baseName);
      // path.parse only looks at the file name, so a name without an
      // extension (or a dot in the temp directory path) cannot corrupt the path.
      const csvFile = path.join(os.tmpdir(), `${path.parse(baseName).name}.csv`);
      console.log('filename',filename, csvFile)

      streamToBuffer(file, (err, buffer) => {
        if (err) return rej(err);
        let csv;
        try {
          csv = firstSheetToCsv(xlsx.parse(buffer));
        } catch (parseError) {
          // Without this, a bad workbook would throw inside the callback and
          // leave the promise pending.
          return rej(parseError);
        }
        fsp.writeFile(csvFile, csv).then(() => res({ fl, csvFile, csv }), rej);
      });
    });

    busboy.on('error', rej);
    busboy.on('finish', function () {
      console.log('busboy finish form!');
      // No file part at all: settle instead of hanging forever.
      if (!sawFile) rej(new Error('No file was uploaded'));
    });

    req.on('error', rej);
    req.pipe(busboy);
  });
}
// HTTP entry point on port 8080.
//  - POST /busboy : store the uploaded spreadsheet as a temp CSV (uploadFile),
//                   then stream that CSV back row by row.
//  - anything else: serve the upload form, which posts to /busboy.
http.createServer(function (req, res) {
  if (req.url === '/busboy' && req.method.toLowerCase() === 'post') {
    // Log the error and answer 500, unless a response has already started.
    const fail = (error) => {
      console.log(error);
      if (!res.headersSent) {
        res.writeHead(500, {'content-type': 'text/html'});
      }
      res.end();
    };

    // csv or xlsx file upload first and release the requestor with an identifier;
    // process status can be acquired by http polling (Processing)
    return uploadFile(req)
      .then(({ fl, csvFile }) => {
        // Only the paths are logged; the CSV text itself can be very large.
        console.log('-----------------', { fl, csvFile });
        // streaming saved file to db - load the data on demand
        // NOTE(review): fromPath looks like the older fast-csv API - confirm
        // against the installed fast-csv version.
        const csvstream = fastcsv.fromPath(csvFile, { headers: true })
          .on("data", function (row) {
            csvstream.pause();
            // do some heavy work
            // when done resume the stream;
            // - ROBUST: IMP: implementing batch inserts will make
            // - Sending BATCH=50
            console.log('row', row);
            csvstream.resume();
          })
          .on("end", function () {
            console.log("We are done!")
            res.end();
          })
          .on("error", fail);
      })
      // Upload/conversion failures (or a throw while opening the CSV) must
      // still end the response; otherwise the client waits forever.
      .catch(fail);
  }
  // show a file upload form
  res.writeHead(200, {'content-type': 'text/html'});
  res.end(
    // action must match the POST route handled above
    '<form action="/busboy" enctype="multipart/form-data" method="post">'+
    '<input type="text" name="title"><br>'+
    '<input type="file" name="upload" multiple="multiple"><br>'+
    '<input type="submit" value="Upload">'+
    '</form>'
  );
}).listen(8080);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment