Created
April 14, 2020 08:22
-
-
Save Danziger/b5fd983e8b4da99d66bde58ddad1dd43 to your computer and use it in GitHub Desktop.
Simple Node.js multi-process MapReduce. This was implemented as a weekend project in 2016, so expect rough edges and use it with caution.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs'); | |
module.exports = class FileChunksReader { | |
constructor(filepaths, callback, config = {}) { | |
if (!Array.isArray(filepaths) || filepaths.length < 1 || typeof callback !== 'function') { | |
throw new Error('Constructor signature not matching actual parameters.'); | |
} | |
this.options = { | |
chunkSize: config.chunkSize || 1024, // 1 KB | |
breakAt: config.breakAt, | |
}; | |
// TODO: IMPROVEMENT: Buffer some files, but do not open all of them at once! | |
const totalFiles = filepaths.length; | |
const files = []; | |
for (const filepath of filepaths) { | |
// console.log(`Opening '${ filepath }'...`); | |
fs.open(filepath, 'r', (error, fd) => { | |
if (error) { | |
throw error; | |
} | |
// console.log(`Opened '${ filepath }'. FD = ${ fd }`); | |
files.push(fd); | |
if (files.length === totalFiles) { | |
this.files = files; | |
this.currentFile = files.pop(); | |
this.position = 0; | |
this.pendingChunks = 0; | |
this.isReading = false; | |
callback(); | |
} | |
}); | |
} | |
} | |
read(callback, chunksToRead = 1) { | |
if (this.isWaiting()) { | |
throw new Error('Files not opened yet.'); | |
} | |
if (this.isDone()) { | |
throw new Error('All files have been completely read already.'); | |
} | |
this.pendingChunks += chunksToRead; | |
if (this.isReading) { // To prevent a motherf***er race condition! | |
// console.log(`Waiting for read ${ this.currentFile } @ ${ this.position }... Pending Chunks = ${ this.pendingChunks }`); | |
return; | |
} | |
this.isReading = true; | |
// TODO: IMPROVEMENT: Pending chunks handling: | |
// TODO: OPTION 1: If chunksToRead > 1, read all at once and split afterwards (single read call)! | |
// TODO: OPTION 2: If chunksToRead > 1, read all at once and do NOT split (return a bigger-than-configured chunk). | |
const bytesToRead = this.options.chunkSize; | |
let buffer = new Buffer(bytesToRead); | |
// console.log(`Reading FD = ${ this.currentFile } @ ${ this.position }`); | |
fs.read(this.currentFile, buffer, 0, bytesToRead, this.position, (error, bytesRead) => { | |
if (error) { | |
// console.log(`There was an error while trying to read data:\n\n`, error); | |
this.read(callback, 0); | |
return; | |
} | |
if (bytesRead === 0) { | |
// console.log(`Closing FD = ${ this.currentFile }...`); | |
fs.close(this.currentFile, function(err) { | |
if (err) throw err; | |
}); | |
if (this.currentFile = this.files.pop()) { // Read from the next opened file: | |
this.position = 0; | |
this.isReading = false; | |
this.read(callback, 0); | |
// TODO: Optional log. | |
} else { // All files have already been read: | |
this.currentFile = null; | |
callback(); | |
// TODO: Log | |
// TODO: Count chunks? | |
} | |
return; | |
} | |
buffer = (bytesRead < bytesToRead ? buffer.slice(0, bytesRead) : buffer); | |
const lastCharacterToReturn = this.options.breakAt !== undefined && buffer.lastIndexOf(this.options.breakAt); | |
if (lastCharacterToReturn && lastCharacterToReturn !== -1) { | |
buffer = buffer.slice(0, lastCharacterToReturn + 1); | |
} | |
// console.log(`Read FD = ${ this.currentFile } @ ${ this.position }\n`); | |
this.position += buffer.length; | |
this.isReading = false; | |
callback(buffer.toString('utf8')); | |
--this.pendingChunks && this.read(callback, 0); | |
}); | |
} | |
isWaiting() { | |
return this.currentFile === undefined; | |
} | |
isReady() { | |
return typeof(this.currentFile) === 'number'; | |
} | |
isDone() { | |
return this.currentFile === null; | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const CORES = require('os').cpus().length; | |
const path = require('path'); | |
const fork = require('child_process').fork; | |
module.exports = class MapReduce {
  /**
   * Orchestrates a multi-process MapReduce run over forked worker processes.
   *
   * @param {string} map - Path to the module with the map function (the worker
   *   loads it via `require` — see process.js, which reads it from argv).
   * @param {string} reduce - Path to the module with the reduce function.
   * @param {Function} callback - Progress callback: invoked as `callback(false)`
   *   after each completed map task, and `callback(true, result)` once the
   *   reduce phase finishes.
   * @param {Object} [config] - Optional settings.
   * @param {number} [config.threads] - Worker processes to fork (defaults to CPU count).
   * @param {number} [config.slicesPerReduceNode] - Intermediate-result slices
   *   handed to each reduce task (0 = split evenly across workers).
   * @throws {Error} If the arguments do not match the expected signature.
   */
  constructor(map, reduce, callback, config = {}) {
    if (typeof map !== 'string' || typeof reduce !== 'string' || typeof callback !== 'function') {
      throw new Error('Constructor signature not matching actual parameters.');
    }
    this.options = {
      // THREADS:
      //
      // Number of threads we want to use and also number of Map/Reduce nodes that will be created.
      threads: config.threads || CORES,
      // KEYS PER REDUCE NODE:
      //
      // Number of slices of the intermediate result that each Reduce node will get.
      // 0 means each Reduce will get # KEYS / # NODES slices of the intermediate result.
      //
      // Stored NEGATED so it can be used directly as the (negative) start index
      // of the Array#splice call in _distributeReduce, which therefore takes
      // that many slices from the END of the available-keys list.
      slicesPerReduceNode: config.slicesPerReduceNode && -config.slicesPerReduceNode || 0,
    };
    // TODO: Add underscore
    this.callback = callback;
    this.workers = {};                              // pid -> ChildProcess.
    this.workersAvailable = [];                     // pids of idle workers.
    this.dataChunks = {};                           // chunk index -> raw input chunk.
    this.dataChunksAvailable = [];                  // chunk indexes not yet sent to a mapper.
    this.workersToDataChunks = {};                  // pid -> chunk index it is currently mapping.
    this.dataChunksCount = 0;                       // Monotonic counter used to index chunks.
    this.intermediateGroupedResult = {};            // key -> [values] (shuffle/group output).
    this.intermediateGroupedResultsAvailable = [];  // keys not yet handed to a reducer.
    this.workersToIntermediateGroupedResults = {};  // pid -> keys it is currently reducing.
    this.result = {};                               // Final reduced output, merged per reducer.
    this.isMapFinished = false;
    this.isReduceFinished = false;
    // Phase dispatcher: starts as map distribution and is swapped to
    // _distributeReduce in _onMapFinished once the map phase drains.
    this._distributeWork = this._distributeMap.bind(this); // TODO: Is this bind needed?
    // Init communication events:
    for(let i = 0; i < this.options.threads; ++i) {
      const worker = fork(path.join(__dirname, 'process.js'), [map, reduce]);
      worker.on('message', result => {
        // Protocol: each worker sends 'READY' once on startup (see process.js),
        // then exactly one result message per task it was sent.
        if (result === 'READY') {
          this.workersAvailable.push(worker.pid);
          this._distributeWork();
        } else if (this.isMapFinished) {
          this._onReduceFinished(worker, result);
        } else {
          this._onMapFinished(worker, result);
        }
      });
      worker.on('exit', (code, signal) => {
        if (signal) {
          // console.log(`worker was killed by signal: ${signal}`);
        } else if (code !== 0) {
          // console.log(`worker exited with error code: ${code}`);
        } else {
          // console.log('worker success!');
        }
      });
      this.workers[worker.pid] = worker;
    }
  }

  /**
   * Queues one input chunk (or an array of them) for the map phase and tries
   * to hand work to any idle workers.
   *
   * @param {*|Array} data - Chunk(s) of input data to map.
   * @throws {Error} If called after the map phase has already finished.
   */
  push(data) {
    if (this.isMapFinished) {
      throw new Error(`Can't push more data as the ${ this.isReduceFinished ? 'Reduce' : 'Map' } phase is already finished.`);
    }
    data = Array.isArray(data) ? data : [data];
    data.forEach(dataChunk => {
      const index = this.dataChunksCount++;
      this.dataChunks[index] = dataChunk;
      this.dataChunksAvailable.push(index);
    });
    this._distributeWork();
  }

  // Pairs pending chunks with idle workers (recursing until either list is
  // empty) and sends each paired worker one map task message.
  _distributeMap() {
    if (this.dataChunksAvailable.length === 0 || this.workersAvailable.length === 0) {
      // console.log('No data or nodes available...');
      return;
    }
    const dataChunkIndex = this.dataChunksAvailable.pop();
    const dataChunk = this.dataChunks[dataChunkIndex];
    const workerIndex = this.workersAvailable.pop();
    const worker = this.workers[workerIndex];
    worker.send({
      map: true,
      key: dataChunkIndex,
      value: dataChunk,
    });
    this.workersToDataChunks[workerIndex] = dataChunkIndex;
    this._distributeMap();
  }

  // Pairs batches of intermediate-result keys with idle workers and sends each
  // paired worker one reduce task message.
  _distributeReduce() {
    // console.log(Object.keys(this.intermediateGroupedResult).length);
    // TODO: Maybe the nodes could do that after the map, if the data is big enough!
    /*
    for (const key in this.intermediateGroupedResult) {
      this.intermediateGroupedResult[key] = this.intermediateGroupedResult[key].reduce((a, b) => a + b, 0)
    }
    console.log(this.intermediateGroupedResult);
    throw new Error('BYE');
    */
    if (this.intermediateGroupedResultsAvailable.length === 0 || this.workersAvailable.length === 0) {
      // console.log('No intermediate results or nodes available...');
      return;
    }
    // console.log('Remaining Reduces = ' + this.intermediateGroupedResultsAvailable.length);
    // slicesPerReduceNode is negative, so this splice REMOVES (and returns)
    // that many keys from the end of the available list (see constructor note).
    const intermediateResultKeys = this.intermediateGroupedResultsAvailable.splice(this.options.slicesPerReduceNode);
    const intermediateResultsData = intermediateResultKeys
      .map(intermediateResultKey => [intermediateResultKey, this.intermediateGroupedResult[intermediateResultKey]]);
    const workerIndex = this.workersAvailable.pop();
    const worker = this.workers[workerIndex];
    worker.send({
      map: false,
      slices: intermediateResultsData,
    });
    this.workersToIntermediateGroupedResults[workerIndex] = intermediateResultKeys;
    this._distributeReduce();
  }

  /**
   * Shuffle/group step: folds a list of [key, value] tuples produced by a map
   * task into `groupedResult`, appending each value to its key's array.
   *
   * @param {Array<[*, *]>} mapResult - Tuples emitted by a map task.
   * @param {Object} [groupedResult] - Accumulator (mutated in place).
   * @returns {Object} The same `groupedResult` object, for convenience.
   */
  _groupMapResults(mapResult, groupedResult = {}) {
    for (const tuple of mapResult) {
      const key = tuple[0];
      const value = tuple[1];
      const group = groupedResult[key] || [];
      group.push(value);
      groupedResult[key] = group;
    }
    return groupedResult;
  }

  // Bookkeeping for a completed map task: frees the worker, merges the
  // worker's tuples into the grouped intermediate result, and switches to the
  // reduce phase once no unprocessed chunks remain.
  _onMapFinished(worker, mapResult) {
    const workerId = worker.pid;
    const dataChunkIndex = this.workersToDataChunks[workerId];
    delete this.workersToDataChunks[workerId];
    delete this.dataChunks[dataChunkIndex];
    this.workersAvailable.push(workerId);
    this._groupMapResults(mapResult, this.intermediateGroupedResult);
    // NOTE: intentional assignment inside the condition — the map phase ends
    // as soon as the set of pending/in-flight chunks drains.
    // NOTE(review): if the producer only pushes the next chunk in response to
    // callback(false) (as mapReduceFileChunksConnector does), a fast worker
    // could drain the queue and end the map phase before the next chunk
    // arrives — confirm this timing assumption holds for the intended inputs.
    if (this.isMapFinished = Object.keys(this.dataChunks).length === 0) {
      this._distributeWork = this._distributeReduce.bind(this); // TODO: Is this bind needed?
      const intermediateGroupedResultsAvailable = this.intermediateGroupedResultsAvailable = Object.keys(this.intermediateGroupedResult);
      if (this.options.slicesPerReduceNode === 0) {
        // Split keys evenly: each reduce task gets ceil(#keys / #workers)
        // slices, negated for use as a splice start index (constructor note).
        this.options.slicesPerReduceNode = - Math.ceil(intermediateGroupedResultsAvailable.length / this.options.threads);
      }
    } else {
      // Signal progress so the producer can push more input data.
      this.callback(false);
    }
    this._distributeWork();
  }

  // Bookkeeping for a completed reduce task: frees the worker, merges the
  // partial result into the final result, and fires the completion callback
  // (then kills all workers) once every intermediate key has been reduced.
  _onReduceFinished(worker, reduceResult) {
    const workerId = worker.pid;
    const intermediateResultKeys = this.workersToIntermediateGroupedResults[workerId];
    delete this.workersToIntermediateGroupedResults[workerId];
    // TODO: Is this efficient?
    intermediateResultKeys.map(key => delete this.intermediateGroupedResult[key]);
    this.workersAvailable.push(workerId);
    // console.log(reduceResult);
    Object.assign(this.result, reduceResult);
    // TODO: Should we check this.intermediateGroupedResultsAvailable instead=
    // NOTE: intentional assignment inside the condition (same pattern as
    // _onMapFinished).
    if (this.isReduceFinished = Object.keys(this.intermediateGroupedResult).length === 0) {
      this.callback(true, this.result);
      for (const pid in this.workers) {
        this.workers[pid].kill();
      }
    } else {
      this._distributeWork();
    }
  }
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const FileChunksReader = require('../src/FileChunksReader'); | |
const MapReduce = require('../src/MapReduce'); | |
// Module-level singleton state for the current run.
// NOTE(review): a second call to mapReduceFileChunks before a previous run
// completes would clobber this state — presumably only one run is active at a
// time (the CLI driver chains runs via the completion callback); confirm
// against other callers.
let _map;
let _reduce;
let _mapReduceOptions;
let _formatter;
let _reader;
let _mapReduce;
let _callback;
let _chunks;
let _chunksCount;
let _start;
/**
 * Wires a FileChunksReader to a MapReduce run: preloads a batch of chunks,
 * starts the MapReduce, and streams further chunks in as map tasks complete.
 *
 * @param {string[]} files - Paths of the files to process.
 * @param {string} map - Path to the map-function module (forwarded to MapReduce).
 * @param {string} reduce - Path to the reduce-function module.
 * @param {Object} readerOptions - FileChunksReader config (chunkSize, breakAt).
 * @param {Object} mapReduceOptions - MapReduce config (threads, slicesPerReduceNode).
 * @param {Function} formatter - Called with (result, elapsedMs) on completion; optional.
 * @param {Function} callback - Called with no arguments when the run finishes; optional.
 */
module.exports = function mapReduceFileChunks(files, map, reduce, readerOptions, mapReduceOptions, formatter, callback) {
  _map = map;
  _reduce = reduce;
  _mapReduceOptions = mapReduceOptions;
  _formatter = formatter;
  _reader = new FileChunksReader(files, _onFilesOpened, readerOptions);
  _mapReduce = null;
  _callback = callback;
  _chunks = [];
  _chunksCount = 0;
  _start = 0;
};
/**
 * Reader callback: every input file is open, so start buffering chunks.
 */
function _onFilesOpened() {
  _preloadChunks();
}
/**
 * Buffers up to 16 chunks from the reader before kicking off the MapReduce,
 * so the workers have an initial backlog of input to process.
 *
 * FIX: the original checked the 16-chunk limit BEFORE pushing the freshly
 * read `str`, so when the limit was hit that chunk was silently discarded —
 * the reader's position had already advanced past it, losing data. The limit
 * is now checked after the chunk has been stored.
 */
function _preloadChunks() {
  _reader.isReady() && _reader.read(str => {
    if (_reader.isDone()) {
      // The reader signals completion by calling back with no argument.
      _startMapReduce();
      return;
    }
    _chunks.push(str);
    ++_chunksCount;
    if (_chunks.length >= 16) { // TODO: Make configurable
      _startMapReduce();
    } else {
      _preloadChunks();
    }
  });
}
/**
 * Pulls one more chunk from the reader and hands it to `callback`.
 * Does nothing once the reader has been fully consumed.
 */
function _readChunk(callback) {
  if (!_reader.isReady()) {
    return;
  }
  _reader.read(str => {
    if (_reader.isDone()) {
      // Nothing left to feed into the MapReduce.
      return;
    }
    _chunksCount += 1;
    callback(str);
  });
}
/**
 * Spins up the MapReduce instance, records the start time, and feeds it the
 * preloaded batch of chunks.
 */
function _startMapReduce() {
  _start = new Date();
  const mapReduce = new MapReduce(_map, _reduce, _onMapFinished, _mapReduceOptions);
  _mapReduce = mapReduce;
  mapReduce.push(_chunks);
}
/**
 * MapReduce progress callback.
 *
 * @param {boolean} done - False after each map round; true when reduce finished.
 * @param {Object} [result] - Final reduced result (only when `done` is true).
 */
function _onMapFinished(done, result) {
  if (!done) {
    // A map round completed but input remains: stream in the next chunk.
    _readChunk(str => _mapReduce.push(str));
    return;
  }
  if (_formatter) {
    _formatter(result, new Date() - _start);
  }
  if (_callback) {
    _callback();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Worker entry point: the parent forks this file with the map and reduce
// module paths as extra argv entries (see MapReduce's constructor).
const map = require(process.argv[2]);
const reduce = require(process.argv[3]);

// Each incoming message is either a map task ({ map: true, key, value }) or a
// reduce task ({ map: false, slices }); exactly one result is sent back per task.
process.on('message', (data) => {
  if (data.map) {
    // Map task: forward the [key, value] tuples produced by the user map.
    process.send(map(data.key, data.value));
    return;
  }
  // Reduce task: fold each [key, values] slice into a { key: reducedValue } object.
  const partialResult = {}; // TODO: Should I do all without objects, with arrays?
  for (const [key, values] of data.slices) {
    partialResult[key] = reduce(key, values)[1];
  }
  process.send(partialResult);
});

// Tell the parent this worker is ready to accept work.
process.send('READY');
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
const path = require('path'); | |
const mapReduceFileChunksConnector = require('./mapReduceFileChunksConnector'); | |
const formatter = require('../test/utils/wordCountFormatterEvaluation'); | |
const files = process.argv.slice(2).map(filePath => path.join(__dirname, filePath)).reverse(); | |
/**
 * Runs the word-count MapReduce over the next pending file, chaining to the
 * following file via the connector's completion callback until `files` is empty.
 */
function processFile() {
  const file = files.pop();
  if (!file) return;
  console.log('\n' + path.basename(file) + ':');
  const readerOptions = {
    chunkSize: 1024 * 32,
    breakAt: 10, // Cut chunks at '\n' so words are not split across chunks.
  };
  const mapReduceOptions = {
    threads: 6,
    slicesPerReduceNode: 100,
  };
  mapReduceFileChunksConnector(
    [file],
    path.join(__dirname, '../test/map/map.js'),
    path.join(__dirname, '../test/reduce/reduce.js'),
    readerOptions,
    mapReduceOptions,
    formatter,
    () => processFile()
  );
}
processFile(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment