Reading big files in Node.js is a little tricky. Node.js is meant to deal with I/O tasks efficiently, not CPU-intensive computations. It is still doable, though for heavy number crunching I'd prefer languages like Python or R. Reading, parsing, transforming and then saving large data sets (I'm talking millions of records here) can be done in a lot of ways, but only a few of those are efficient. The following snippet parses millions of records without wasting a lot of CPU (15%-30% max) or memory (40-60 MB max). It is based on Streams.
The program expects the input to be a CSV file, e.g. big-data.unpr.csv. It saves the result as NDJSON rather than JSON, because huge datasets are much easier to work with in NDJSON format.
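Since each output line is a complete JSON document, the result can be consumed as a stream as well, one record at a time, without ever loading the whole file. A minimal sketch of reading the output back (assuming the snippet below has produced big-data.pr.ndjson; split2 accepts a mapper function, so JSON.parse can be applied per line):

import { createReadStream } from 'fs';
import split from 'split2';

// Stream the NDJSON result back, one parsed record per 'data' event,
// without holding the whole dataset in memory.
createReadStream('./big-data.pr.ndjson')
  .pipe(split(JSON.parse))
  .on('data', (record: any) => {
    // `record` is one row, e.g. { firstName: ..., searchId: ... }
  });

Here is the full program: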
import { createReadStream, createWriteStream, readdirSync } from 'fs';
import split from 'split2';      // splits a byte stream into lines
import through2 from 'through2'; // object-mode transform stream helper
import parse from 'csv-parse';   // parses CSV text into arrays of fields
import pump from 'pump';         // pipes streams together and calls one callback on finish or error
import ndjson from 'ndjson';     // serializes objects as newline-delimited JSON
import uniqid from 'uniqid';     // generates a unique id per record
import { Logger } from '../util';
import { CronJob } from 'cron';
const serialize = () => ndjson.serialize();
const source = (filename: string) => createReadStream(filename);
const output = (filename: string) => createWriteStream(filename);
const transformObjectStream = (context: any) => {
  // Object-mode transform: receives one raw CSV line per chunk,
  // parses it and pushes a plain object downstream.
  return through2.obj(async function(chunk: string, enc: string, callback: (err?: Error) => any) {
    let stringChunk;
    try {
      stringChunk = chunk.toString();
    } catch (e) {
      stringChunk = null;
      Logger.error(`STEP - Transform Object To String Error at line ${context.line} ${e}`);
      return callback();
    }
    parse(
      stringChunk,
      {
        bom: true,
        skip_empty_lines: true,
        skip_lines_with_empty_values: true,
        skip_lines_with_error: true,
        trim: true,
      },
      async (err: Error, parsedString: string[][]) => {
        if (err) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${err}`);
          return callback();
        }
        // Give the CPU a rest every 5000 lines;
        // this brings CPU usage down from 100% to roughly 20-25%.
        if (context.line % 5000 === 0) {
          await delay();
        }
        try {
          // parse() returns an array of records; a single line yields exactly one.
          const join = parsedString[0];
          if (!join || !Array.isArray(join)) {
            throw new Error(`Bad CSV Line at ${context.line}`);
          }
          const data = {
            address: join[3],
            age: join[1],
            city: join[2],
            firstName: join[0],
            gender: join[7],
            lastName: join[6],
            pin: join[8],
            searchId: uniqid(),
            state: join[4],
          };
          this.push(data);
          context.line++;
          return callback();
        } catch (e) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${e}`);
          return callback();
        }
      }
    );
  });
};
const onErrorOrFinish = (context: any) => (e: Error) => {
  if (e) {
    Logger.error(`FINAL - Error After Parsing ${context.line} lines - ${e}`);
  } else {
    Logger.debug(`Time Taken ${(Date.now() - context.time) / 1000} seconds`);
    Logger.debug(`Lines Parsed ${context.line}`);
  }
  globalContext.jobInProgress = false;
};
// Resolves after one second; used to throttle the parse loop.
const delay = () =>
  new Promise(resolve => {
    setTimeout(resolve, 1000);
  });
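// Note: the one-second pause in delay() is what actually caps CPU usage;
// the sleep duration and the 5000-line threshold in the transform above
// are the two knobs to tune for your own data.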
const globalContext = { jobInProgress: false };
function run() {
  Logger.debug(`Job is ${globalContext.jobInProgress ? 'ALREADY' : 'NOT'} running.`);
  if (globalContext.jobInProgress) {
    return;
  }
  // Pick the next .unpr.csv file that has no matching .pr.ndjson output yet.
  const files: string[] = readdirSync('./');
  let fileName: string | null = null;
  for (const name of files) {
    if (
      name.includes('.unpr.csv') &&
      !files.includes(`${name.replace('.unpr.csv', '')}.pr.ndjson`)
    ) {
      fileName = name;
      break;
    }
  }
  if (!fileName) {
    Logger.debug(`No more files to process. Exiting.`);
    return;
  }
  const jobContext = {
    existFileName: fileName,
    existFilePath: `./${fileName}`,
    fileName: `./${fileName.replace('.unpr.csv', '')}.pr.ndjson`,
    line: 1,
    time: Date.now(),
  };
  globalContext.jobInProgress = true;
  // read file -> split into lines -> parse & transform -> serialize to ndjson -> write file
  pump(
    source(jobContext.existFilePath),
    split(),
    transformObjectStream(jobContext),
    serialize(),
    output(jobContext.fileName),
    onErrorOrFinish(jobContext)
  );
}
const job = new CronJob({
  context: globalContext,
  cronTime: '*/1 * * * *',
  onTick: run,
  start: false,
  timeZone: 'America/Los_Angeles',
});
job.start();
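To try it out, drop a file such as big-data.unpr.csv next to the script and start it with any TypeScript runner, for example (hypothetical script name):

npx ts-node process-big-data.ts

The cron job fires every minute, picks up the next *.unpr.csv that has no matching *.pr.ndjson yet, and the jobInProgress flag keeps overlapping runs from stepping on each other.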