Last active
August 29, 2015 14:01
-
-
Save retrohacker/3329c61bceb94eb095b4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Refer to the comment for a description of the XML file.
 * This script loads a medium-size (23 GB) XML file and parses out the timestamps (time of day, without the date) into a map.
 * The timestamps are the creation dates of each entity in the XML document.
 */
var cluster = require('cluster') | |
// Workers that are currently idle; master() fills it with forked workers and
// startWorker() pops one per batch.
var needWork = [];
// How many worker processes master() forks.
var threads = 11;
// Accumulated result: worker results are merged into this object and the
// master serialises it to output.data.
var timestampMap = {};
// Lines read from the XML file that have not been handed to a worker yet.
var queue = [];
/**
 * Role dispatch: the same script is executed by the master and by every
 * forked worker. A worker announces itself and starts listening through
 * map(); the master runs master().
 */
if (!cluster.isMaster) {
  console.log("Worker "+cluster.worker.id+" started");
  map();
} else {
  master();
}
/** | |
* Master reads in data from posts.xml interpreting an entry as a single line of the file. | |
* It then spawns off threads each responsible for ~*chunksize* worth of entries. | |
* Finally it writes the generated key-value pairs to the HDD at output.data | |
*/ | |
function master() { | |
for(var i = 0; i < threads; i++) { | |
needWork.push(cluster.fork()) | |
} | |
var fs = require('fs') | |
var stream = fs.createReadStream('posts.xml') | |
var raw = "" | |
var progress = 0 | |
var chunksize = 5000 | |
stream.on('data', function(chunk) { | |
raw += chunk | |
arr = raw.split("\n") | |
raw = arr.pop() | |
queue = queue.concat(arr) | |
if(queue.length>chunksize) { | |
progress+=queue.length | |
startWorker(workerComplete(stream)) | |
if(needWork.length==0) {stream.pause()} | |
} //end if | |
}) //end stream.on | |
stream.on('end',function() { | |
queue = queue.concat(raw.split("\n")) | |
getTimestamps() | |
console.log("Writing data to FS") | |
fs.writeFile("output.data",JSON.stringify(timestampMap)) | |
}) | |
} | |
/**
 * Hand the entire pending queue to one idle worker.
 *
 * Takes a worker from `needWork`, registers `callback(worker)` as that
 * worker's 'message' listener, sends it `{msg: queue}` and then replaces the
 * module-level `queue` with a fresh empty array.
 *
 * @param {function} callback - factory called with the chosen worker; its
 *   return value becomes the listener for the worker's result message.
 * @returns {boolean} true if the batch was sent; false if no worker was idle,
 *   in which case the queue is left untouched so no entries are lost.
 */
function startWorker(callback) {
  // Declared locally so the worker handle no longer leaks as an implicit global.
  var worker = needWork.pop();
  if (typeof worker === "undefined") {
    return false;
  }
  worker.on('message', callback(worker));
  console.log("Sending "+queue.length+" to "+worker.id);
  worker.send({msg: queue});
  queue = [];
  return true;
}
/**
 * Build the listener that reduces one worker result into `timestampMap`.
 *
 * Curried in two steps: workerComplete(stream)(worker) returns the function
 * to register as the worker's 'message' listener. When the result arrives the
 * listener detaches itself, puts the worker back on `needWork`, resumes the
 * read stream and adds every count of the result to `timestampMap`.
 *
 * @param {Object} stream - the read stream to resume once a worker is idle.
 * @returns {function} factory taking the worker and returning the listener;
 *   the listener expects an object mapping timestamp -> count.
 */
function workerComplete(stream) {
  return function(worker) {
    return function(msg) {
      // Only the 'message' listener was added for this batch; listeners for
      // other events are left alone.
      worker.removeAllListeners('message');
      needWork.push(worker);
      stream.resume();
      console.log("Master received: "+msg);
      // forEach passes (element, index): the element of Object.keys() is the
      // timestamp key, and the count has to be read from msg itself.
      Object.keys(msg).forEach(function(key) {
        var count = msg[key];
        if (typeof timestampMap[key] === "undefined") {
          timestampMap[key] = count;
        } else {
          timestampMap[key] += count;
        }
      });
    };
  };
}
/**
 * Worker entry point. Subscribes decodeXML to the messages this process
 * receives, so every batch sent by the master is decoded by that handler.
 */
function map() {
  process.addListener('message', decodeXML);
}
/**
 * Message handler of a worker: decodes one batch of XML entries and sends the
 * resulting timestamp -> count object back to the master via process.send().
 * The worker does not exit afterwards; it stays subscribed for further batches.
 *
 * @param {{msg: string[]}} msg - batch from the master; `msg.msg` holds the
 *   XML entries.
 */
function decodeXML(msg) {
  console.log(cluster.worker.id+" received message: ",msg.msg.length);
  // Kept local: the batch is passed straight through instead of being parked
  // in the module-level `queue`, and `result` no longer leaks as a global.
  var result = getTimestamps(msg.msg);
  process.send(result);
}
/** | |
* Generates the array of key-value pairs. In this case, we extract timestamps. | |
*/ | |
function getTimestamps(queue) { | |
var xml2js = require('xml2js') | |
timestamps = {} | |
queue.forEach(function(val) { | |
xml2js.parseString(val,function(err,result) { | |
time = parseRow(result) | |
if(time == null) {return} | |
if(typeof timestampMap[time]==="undefined") { | |
timestamps[time] = 1 | |
} else { | |
timestamps[time]++ | |
} | |
}) // end xml2js.parseString() | |
}) //end queue.forEach | |
return timestamps | |
} | |
/**
 * Extract the time of day from one parsed XML entity.
 *
 * Reads result.row.$.CreationDate, splits it at "T" and returns the part
 * after it with any fractional seconds (everything from the first ".")
 * removed.
 *
 * @param {Object|null|undefined} result - parsed entity as handed over by the
 *   XML parser callback.
 * @returns {string|null} the time portion, or null when the entity has no
 *   usable CreationDate (missing object levels, non-string value, no "T").
 */
function parseRow(result) {
  if (result == null || result["row"] == null || result["row"]["$"] == null) {
    return null;
  }
  var creationDate = result["row"]["$"]["CreationDate"];
  if (typeof creationDate !== "string") {
    return null;
  }
  // Locals instead of the implicit globals `split` / `time`.
  var parts = creationDate.split("T");
  if (parts.length > 1) {
    return parts[1].split(".")[0];
  }
  return null;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Problem:
I don't know how to throttle the creation of threads/reading in data from HDD. I quickly run out of memory while forking children.