@Danziger · Created April 14, 2020
Simple Node.js multi-process MapReduce. This was implemented as a weekend project in 2016, beware.
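This capture concatenates the Gist's five files one after another. The file names are not shown here, but the require() calls suggest a layout along these lines (an inference, not confirmed by the Gist): src/FileChunksReader.js (reads files from disk in fixed-size chunks), src/MapReduce.js (coordinates a pool of forked worker processes), mapReduceFileChunksConnector.js (feeds file chunks into a MapReduce instance), process.js (the worker process), and a small CLI script that word-counts the files passed as arguments.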
// src/FileChunksReader.js (path inferred from the require() calls later in the Gist)

const fs = require('fs');

module.exports = class FileChunksReader {

    constructor(filepaths, callback, config = {}) {
        if (!Array.isArray(filepaths) || filepaths.length < 1 || typeof callback !== 'function') {
            throw new Error('Constructor signature not matching actual parameters.');
        }

        this.options = {
            chunkSize: config.chunkSize || 1024, // 1 KB
            breakAt: config.breakAt, // Optional byte value (e.g. 10 = '\n') chunks should break at.
        };

        // TODO: IMPROVEMENT: Buffer some files, but do not open all of them at once!

        const totalFiles = filepaths.length;
        const files = [];

        for (const filepath of filepaths) {
            // console.log(`Opening '${ filepath }'...`);

            fs.open(filepath, 'r', (error, fd) => {
                if (error) {
                    throw error;
                }

                // console.log(`Opened '${ filepath }'. FD = ${ fd }`);

                files.push(fd);

                // Once every file is open, take the first descriptor and signal readiness:
                if (files.length === totalFiles) {
                    this.files = files;
                    this.currentFile = files.pop();
                    this.position = 0;
                    this.pendingChunks = 0;
                    this.isReading = false;

                    callback();
                }
            });
        }
    }

    read(callback, chunksToRead = 1) {
        if (this.isWaiting()) {
            throw new Error('Files not opened yet.');
        }

        if (this.isDone()) {
            throw new Error('All files have been completely read already.');
        }

        this.pendingChunks += chunksToRead;

        if (this.isReading) { // To prevent a nasty race condition between overlapping reads!
            // console.log(`Waiting for read ${ this.currentFile } @ ${ this.position }... Pending Chunks = ${ this.pendingChunks }`);

            return;
        }

        this.isReading = true;

        // TODO: IMPROVEMENT: Pending chunks handling:
        // TODO: OPTION 1: If chunksToRead > 1, read all at once and split afterwards (single read call)!
        // TODO: OPTION 2: If chunksToRead > 1, read all at once and do NOT split (return a bigger-than-configured chunk).

        const bytesToRead = this.options.chunkSize;

        let buffer = Buffer.alloc(bytesToRead); // Buffer.alloc() replaces the deprecated new Buffer().

        // console.log(`Reading FD = ${ this.currentFile } @ ${ this.position }`);

        fs.read(this.currentFile, buffer, 0, bytesToRead, this.position, (error, bytesRead) => {
            if (error) {
                // console.log(`There was an error while trying to read data:\n\n`, error);

                this.read(callback, 0);

                return;
            }

            if (bytesRead === 0) {
                // console.log(`Closing FD = ${ this.currentFile }...`);

                fs.close(this.currentFile, (err) => {
                    if (err) throw err;
                });

                if (this.currentFile = this.files.pop()) { // Read from the next opened file:
                    this.position = 0;
                    this.isReading = false;

                    this.read(callback, 0);

                    // TODO: Optional log.
                } else { // All files have already been read:
                    this.currentFile = null;

                    callback();

                    // TODO: Log
                    // TODO: Count chunks?
                }

                return;
            }

            buffer = bytesRead < bytesToRead ? buffer.slice(0, bytesRead) : buffer;

            // If breakAt is set, cut the chunk right after its last occurrence so no record is split across
            // chunks. (Checking for -1 explicitly also handles a break character found at index 0.)
            const lastBreakIndex = this.options.breakAt === undefined ? -1 : buffer.lastIndexOf(this.options.breakAt);

            if (lastBreakIndex !== -1) {
                buffer = buffer.slice(0, lastBreakIndex + 1);
            }

            // console.log(`Read FD = ${ this.currentFile } @ ${ this.position }\n`);

            this.position += buffer.length;
            this.isReading = false;

            callback(buffer.toString('utf8'));

            // Keep reading if more chunks were requested while this read was in flight:
            --this.pendingChunks && this.read(callback, 0);
        });
    }

    isWaiting() {
        return this.currentFile === undefined;
    }

    isReady() {
        return typeof this.currentFile === 'number';
    }

    isDone() {
        return this.currentFile === null;
    }

};
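A minimal usage sketch for FileChunksReader, assuming two local text files (the file names and the logging are made up for illustration); chunks can only be requested once the constructor callback has fired:

const FileChunksReader = require('./src/FileChunksReader');

const reader = new FileChunksReader(['./a.txt', './b.txt'], function onOpened() {
    // Keep pulling chunks until every file has been fully read:
    (function next() {
        reader.read(chunk => {
            if (reader.isDone()) return; // read() calls back with no argument once everything is consumed.

            console.log(`Got ${ chunk.length } characters.`);

            next();
        });
    })();
}, { chunkSize: 4096, breakAt: 10 }); // breakAt: 10 cuts each chunk at its last '\n'.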
// src/MapReduce.js (path inferred from the require() calls later in the Gist)

const CORES = require('os').cpus().length;
const path = require('path');
const fork = require('child_process').fork;

module.exports = class MapReduce {

    constructor(map, reduce, callback, config = {}) {
        if (typeof map !== 'string' || typeof reduce !== 'string' || typeof callback !== 'function') {
            throw new Error('Constructor signature not matching actual parameters.');
        }

        this.options = {
            // THREADS:
            //
            // Number of parallel workers to fork (they are actually child processes, not threads),
            // which is also the number of Map/Reduce nodes that will be created.
            threads: config.threads || CORES,

            // SLICES PER REDUCE NODE:
            //
            // Number of slices of the intermediate result that each Reduce node will get.
            // 0 means each Reduce node will get # KEYS / # NODES slices of the intermediate result.
            // Stored negated so that splice(-n) can later extract the last n available keys in one call.
            slicesPerReduceNode: config.slicesPerReduceNode && -config.slicesPerReduceNode || 0,
        };

        // TODO: Add underscore (these are meant to be private):
        this.callback = callback;
        this.workers = {};
        this.workersAvailable = [];
        this.dataChunks = {};
        this.dataChunksAvailable = [];
        this.workersToDataChunks = {};
        this.dataChunksCount = 0;
        this.intermediateGroupedResult = {};
        this.intermediateGroupedResultsAvailable = [];
        this.workersToIntermediateGroupedResults = {};
        this.result = {};
        this.isMapFinished = false;
        this.isReduceFinished = false;
        this._distributeWork = this._distributeMap.bind(this); // TODO: Is this bind needed?

        // Init communication events:
        for (let i = 0; i < this.options.threads; ++i) {
            const worker = fork(path.join(__dirname, 'process.js'), [map, reduce]);

            worker.on('message', result => {
                if (result === 'READY') {
                    this.workersAvailable.push(worker.pid);
                    this._distributeWork();
                } else if (this.isMapFinished) {
                    this._onReduceFinished(worker, result);
                } else {
                    this._onMapFinished(worker, result);
                }
            });

            worker.on('exit', (code, signal) => {
                if (signal) {
                    // console.log(`Worker was killed by signal: ${ signal }`);
                } else if (code !== 0) {
                    // console.log(`Worker exited with error code: ${ code }`);
                } else {
                    // console.log('Worker success!');
                }
            });

            this.workers[worker.pid] = worker;
        }
    }

    push(data) {
        if (this.isMapFinished) {
            throw new Error(`Can't push more data as the ${ this.isReduceFinished ? 'Reduce' : 'Map' } phase is already finished.`);
        }

        data = Array.isArray(data) ? data : [data];

        data.forEach(dataChunk => {
            const index = this.dataChunksCount++;

            this.dataChunks[index] = dataChunk;
            this.dataChunksAvailable.push(index);
        });

        this._distributeWork();
    }

    _distributeMap() {
        if (this.dataChunksAvailable.length === 0 || this.workersAvailable.length === 0) {
            // console.log('No data or nodes available...');

            return;
        }

        const dataChunkIndex = this.dataChunksAvailable.pop();
        const dataChunk = this.dataChunks[dataChunkIndex];
        const workerIndex = this.workersAvailable.pop();
        const worker = this.workers[workerIndex];

        worker.send({
            map: true,
            key: dataChunkIndex,
            value: dataChunk,
        });

        this.workersToDataChunks[workerIndex] = dataChunkIndex;

        this._distributeMap();
    }

    _distributeReduce() {
        // console.log(Object.keys(this.intermediateGroupedResult).length);

        // TODO: Maybe the nodes could do this after the map, if the data is big enough!
        /*
        for (const key in this.intermediateGroupedResult) {
            this.intermediateGroupedResult[key] = this.intermediateGroupedResult[key].reduce((a, b) => a + b, 0)
        }

        console.log(this.intermediateGroupedResult);

        throw new Error('BYE');
        */

        if (this.intermediateGroupedResultsAvailable.length === 0 || this.workersAvailable.length === 0) {
            // console.log('No intermediate results or nodes available...');

            return;
        }

        // console.log('Remaining Reduces = ' + this.intermediateGroupedResultsAvailable.length);

        // slicesPerReduceNode is negative, so this extracts (and removes) the last |n| available keys:
        const intermediateResultKeys = this.intermediateGroupedResultsAvailable.splice(this.options.slicesPerReduceNode);
        const intermediateResultsData = intermediateResultKeys
            .map(intermediateResultKey => [intermediateResultKey, this.intermediateGroupedResult[intermediateResultKey]]);

        const workerIndex = this.workersAvailable.pop();
        const worker = this.workers[workerIndex];

        worker.send({
            map: false,
            slices: intermediateResultsData,
        });

        this.workersToIntermediateGroupedResults[workerIndex] = intermediateResultKeys;

        this._distributeReduce();
    }

    _groupMapResults(mapResult, groupedResult = {}) {
        for (const tuple of mapResult) {
            const key = tuple[0];
            const value = tuple[1];
            const group = groupedResult[key] || [];

            group.push(value);
            groupedResult[key] = group;
        }

        return groupedResult;
    }

    _onMapFinished(worker, mapResult) {
        const workerId = worker.pid;
        const dataChunkIndex = this.workersToDataChunks[workerId];

        delete this.workersToDataChunks[workerId];
        delete this.dataChunks[dataChunkIndex];

        this.workersAvailable.push(workerId);

        this._groupMapResults(mapResult, this.intermediateGroupedResult);

        if (this.isMapFinished = Object.keys(this.dataChunks).length === 0) {
            // Map phase done: switch the work distributor over to the Reduce phase.
            this._distributeWork = this._distributeReduce.bind(this); // TODO: Is this bind needed?

            const intermediateGroupedResultsAvailable = this.intermediateGroupedResultsAvailable = Object.keys(this.intermediateGroupedResult);

            if (this.options.slicesPerReduceNode === 0) {
                this.options.slicesPerReduceNode = -Math.ceil(intermediateGroupedResultsAvailable.length / this.options.threads);
            }
        } else {
            // Not done yet: tell the caller a chunk has been consumed so it can push more data.
            this.callback(false);
        }

        this._distributeWork();
    }

    _onReduceFinished(worker, reduceResult) {
        const workerId = worker.pid;
        const intermediateResultKeys = this.workersToIntermediateGroupedResults[workerId];

        delete this.workersToIntermediateGroupedResults[workerId];

        // TODO: Is this efficient?
        intermediateResultKeys.forEach(key => delete this.intermediateGroupedResult[key]);

        this.workersAvailable.push(workerId);

        // console.log(reduceResult);

        Object.assign(this.result, reduceResult);

        // TODO: Should we check this.intermediateGroupedResultsAvailable instead?
        if (this.isReduceFinished = Object.keys(this.intermediateGroupedResult).length === 0) {
            this.callback(true, this.result);

            for (const pid in this.workers) {
                this.workers[pid].kill();
            }
        } else {
            this._distributeWork();
        }
    }

};
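MapReduce can also be driven directly. The module paths below are placeholders; they are handed to each worker, which loads them with require(), so absolute paths are the safest bet. The callback fires with done === false every time a Map task completes (a chance to push more data) and with done === true once the final result is ready:

const MapReduce = require('./src/MapReduce');

const mr = new MapReduce(
    '/abs/path/to/map.js',    // Placeholder paths: each worker loads these with require().
    '/abs/path/to/reduce.js',
    (done, result) => {
        if (done) {
            console.log(result); // Final combined result, e.g. { word: count, ... } for a word count.
        } else {
            // A Map task finished: this is the hook to push more data, if any is left.
        }
    },
    { threads: 4, slicesPerReduceNode: 100 }
);

mr.push(['some raw text...', 'some more raw text...']);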
// mapReduceFileChunksConnector.js — glue between FileChunksReader and MapReduce.

const FileChunksReader = require('../src/FileChunksReader');
const MapReduce = require('../src/MapReduce');

let _map;
let _reduce;
let _mapReduceOptions;
let _formatter;
let _reader;
let _mapReduce;
let _callback;
let _chunks;
let _chunksCount;
let _start;

module.exports = function mapReduceFileChunks(files, map, reduce, readerOptions, mapReduceOptions, formatter, callback) {
    _map = map;
    _reduce = reduce;
    _mapReduceOptions = mapReduceOptions;
    _formatter = formatter;
    _reader = new FileChunksReader(files, _onFilesOpened, readerOptions);
    _mapReduce = null;
    _callback = callback;
    _chunks = [];
    _chunksCount = 0;
    _start = 0;
};

function _onFilesOpened() {
    // console.log('Files opened.');

    _preloadChunks();
}

function _preloadChunks() {
    // console.log('Preloading chunk ' + (_chunks.length + 1) + '...');

    _reader.isReady() && _reader.read(str => {
        if (_reader.isDone()) {
            _startMapReduce();

            return;
        }

        // Store the chunk first so it is not lost when the preload limit is hit:
        _chunks.push(str);
        ++_chunksCount;

        if (_chunks.length >= 16) { // TODO: Make configurable
            _startMapReduce();
        } else {
            _preloadChunks();
        }
    });
}

function _readChunk(callback) {
    _reader.isReady() && _reader.read(str => {
        if (_reader.isDone()) {
            // console.log('Done reading. CHUNKS = ' + _chunksCount);

            return;
        }

        ++_chunksCount;

        callback(str);
    });
}

function _startMapReduce() {
    // console.log('\nStarting MapReduce...');

    _start = new Date();
    _mapReduce = new MapReduce(_map, _reduce, _onMapFinished, _mapReduceOptions);

    _mapReduce.push(_chunks);
}

function _onMapFinished(done, result) {
    if (done) {
        if (_formatter) {
            _formatter(result, new Date() - _start);
        } else {
            // console.log(result);
            // console.log(new Date() - _start);
        }

        if (_callback) {
            _callback();
        }
    } else {
        // Each consumed chunk makes room for the next one from disk:
        _readChunk(str => _mapReduce.push(str));
    }
}
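For reference, the messages exchanged between MapReduce and the worker have these shapes (read straight off the code above and below):

// Worker -> parent, on boot:       'READY'
// Parent -> worker, Map task:      { map: true, key: <chunk index>, value: <chunk text> }
// Worker -> parent, Map result:    [[key, value], [key, value], ...]
// Parent -> worker, Reduce task:   { map: false, slices: [[key, [value, ...]], ...] }
// Worker -> parent, Reduce result: { key: reducedValue, ... }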
// process.js — worker process forked by MapReduce; runs the user-provided map and reduce modules.

const map = require(process.argv[2]);
const reduce = require(process.argv[3]);

process.on('message', data => {
    if (data.map) {
        // Map task: reply with an array of [key, value] tuples.
        process.send(map(data.key, data.value));
    } else {
        // Reduce task: reply with an object mapping each key to its reduced value.
        const partialResult = {}; // TODO: Should I do all of this with arrays instead of objects?

        for (const slice of data.slices) {
            const key = slice[0];

            partialResult[key] = reduce(key, slice[1])[1];
        }

        process.send(partialResult);
    }
});

process.send('READY');
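The test/map/map.js and test/reduce/reduce.js modules referenced by the script below are not included in the Gist. Given the contract the worker expects (map returns an array of [key, value] tuples; reduce returns a [key, reducedValue] tuple), a word-count pair could look roughly like this (a sketch, not the Gist's actual code):

// test/map/map.js — emit a [word, 1] tuple for every word in the chunk:
module.exports = function map(key, value) {
    return value
        .toLowerCase()
        .split(/[^a-z']+/)
        .filter(word => word.length > 0)
        .map(word => [word, 1]);
};

// test/reduce/reduce.js — sum the counts gathered for a single word:
module.exports = function reduce(key, values) {
    return [key, values.reduce((total, count) => total + count, 0)];
};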
#!/usr/bin/env node

// CLI entry point (its file name is not visible in this Gist): word-counts each file passed as an argument, one at a time.

const path = require('path');
const mapReduceFileChunksConnector = require('./mapReduceFileChunksConnector');
const formatter = require('../test/utils/wordCountFormatterEvaluation');

const files = process.argv.slice(2).map(filePath => path.join(__dirname, filePath)).reverse();

function processFile() {
    const file = files.pop();

    if (!file) return;

    console.log('\n' + path.basename(file) + ':');

    mapReduceFileChunksConnector(
        [file],
        path.join(__dirname, '../test/map/map.js'),
        path.join(__dirname, '../test/reduce/reduce.js'),
        {
            chunkSize: 1024 * 32, // 32 KB chunks...
            breakAt: 10, // ...broken at the last newline (byte value 10).
        },
        {
            threads: 6,
            slicesPerReduceNode: 100,
        },
        formatter,
        () => processFile()
    );
}

processFile();
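Since every argument is joined onto __dirname, file paths are resolved relative to the script's own directory, not the current working directory. Assuming the script were saved as cli.js (a made-up name), a run could look like:

node cli.js ../books/moby-dick.txt ../books/dracula.txt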