Read and process very large files line by line in Node.js with low CPU and memory usage.

Reading big files in Node.js is a little tricky. Node.js is built to handle I/O efficiently, not CPU-intensive computation, so while this is still doable, I'd often prefer languages like Python or R for heavy data crunching. Reading, parsing, transforming and then saving large data sets (millions of records) can be done in many ways, but only a few of them are efficient. The following snippet parses millions of records while keeping CPU usage at roughly 15%–30% and memory at roughly 40–60 MB. It is based on streams.
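At a high level, the program wires a few streams together with pump. Here is a condensed view of the pipeline that the full program below assembles (file names and the context object are illustrative):

pump(
  createReadStream('big-data.unpr.csv'), // read the file as a byte stream
  split(),                               // emit one chunk per line
  transformObjectStream(context),        // parse each CSV line into a record object
  ndjson.serialize(),                    // turn each record into one line of JSON
  createWriteStream('big-data.pr.ndjson'),
  onErrorOrFinish(context)               // log stats or the error, release the lock
);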

The following program expects the input to be a CSV file, e.g. big-data.unpr.csv. It saves the result as ndjson rather than JSON, because working with huge datasets is easier in the newline-delimited ndjson format.
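For illustration (hypothetical data, with the column order assumed by the transform below), an input line such as

John,32,Fremont,123 Main St,CA,ignored,Doe,M,94536

would be written to the output file as a single ndjson line:

{"address":"123 Main St","age":"32","city":"Fremont","firstName":"John","gender":"M","lastName":"Doe","pin":"94536","searchId":"<uniqid>","state":"CA"}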

import { createReadStream, createWriteStream, readdirSync } from 'fs';
import split from 'split2';
import through2 from 'through2';
import parse from 'csv-parse';
import pump from 'pump';
import ndjson from 'ndjson';
import uniqid from 'uniqid';

import { Logger } from '../util';
import { CronJob } from 'cron';

const serialize = () => ndjson.serialize();
const source = (filename: string) => createReadStream(filename);
const output = (filename: string) => createWriteStream(filename);

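// Transform stream: parses each raw CSV line, maps its columns to a record
// object, and periodically pauses so the parsing job doesn't hog the CPU.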
const transformObjectStream = (context: any) => {
  return through2.obj(async function(chunk: string, enc: string, callback: (err?: Error) => any) {
    let stringChunk;
    try {
      stringChunk = chunk.toString();
    } catch (e) {
      stringChunk = null;
      Logger.error(`STEP - Transform Object To String Error at line ${context.line} ${e}`);
      return callback();
    }
    parse(
      stringChunk,
      {
        bom: true,
        skip_empty_lines: true,
        skip_lines_with_empty_values: true,
        skip_lines_with_error: true,
        trim: true,
      },
      async (err: Error, parsedString: string[][]) => {
        if (err) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${err}`);
          return callback();
        }
        // yield to the event loop every 5000 lines to give the CPU a rest;
        // this brings CPU usage down from ~100% to ~20-25% :-)
        if (context.line % 5000 === 0) {
          await delay();
        }
        try {
          const join = parsedString[0];
          if (!join || !Array.isArray(join)) {
            throw new Error(`Bad CSV Line at ${context.line}`);
          }
          const data = {
            address: join[3],
            age: join[1],
            city: join[2],
            firstName: join[0],
            gender: join[7],
            lastName: join[6],
            pin: join[8],
            searchId: uniqid(),
            state: join[4],
          };
          this.push(data);
          context.line++;
          return callback();
        } catch (e) {
          Logger.error(`STEP - Transform Object Error at line ${context.line} ${e}`);
          return callback();
        }
      }
    );
  });
};

const onErrorOrFinish = (context: any) => (e: Error) => {
  if (e) {
    Logger.error(`FINAL - Error After Parsing ${context.line} lines - ${e}`);
  } else {
    Logger.debug(`Time Taken ${(Date.now() - context.time) / 1000} seconds`);
    Logger.debug(`Lines Parsed ${context.line}`);
  }
  globalContext.jobInProgress = false;
};

const delay = () =>
  new Promise(resolve => {
    setTimeout(resolve, 1000);
  });

const globalContext = { jobInProgress: false };

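// Finds the first *.unpr.csv file that doesn't have a matching *.pr.ndjson
// output yet and streams it through the pipeline. Only one job runs at a time.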
function run() {
  Logger.debug(`Job is ${globalContext.jobInProgress ? 'ALREADY' : 'NOT'} running.`);
  if (globalContext.jobInProgress) {
    return;
  }
  const files: string[] = readdirSync('./');
  let fileName: string | null = null;
  for (const name of files) {
    if (
      name.includes('.unpr.csv') &&
      !files.includes(`${name.replace('.unpr.csv', '')}.pr.ndjson`)
    ) {
      fileName = name;
      break;
    }
  }
  if (!fileName) {
    Logger.debug(`No more files to process. Exiting.`);
    return;
  }
  const jobContext = {
    existFileName: fileName,
    existFilePath: `./${fileName}`,
    fileName: `./${fileName.replace('.unpr.csv', '')}.pr.ndjson`,
    line: 1,
    time: Date.now(),
  };
  globalContext.jobInProgress = true;

  pump(
    source(jobContext.existFilePath),
    split(),
    transformObjectStream(jobContext),
    serialize(),
    output(jobContext.fileName),
    onErrorOrFinish(jobContext)
  );
}

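// Check for new files every minute; run() exits early if a job is already in progress.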
const job = new CronJob({
  context: globalContext,
  cronTime: '*/1 * * * *',
  onTick: run,
  start: false,
  timeZone: 'America/Los_Angeles',
});

job.start();