Last active
October 17, 2015 17:30
-
-
Save jfarid27/30ed7854229f3bbe5393 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var fs = require("fs");
var path = require("path");
var Readable = require("stream").Readable;
var zlib = require("zlib");
var Async = require("async");
var combine = require("combine-streams");
var jsonstream = require("json-stream");
/**
 * Build an Async task that scans one cached file.
 *
 * Files whose name does not end in ".json.gz" or does not contain
 * `dateprefix` are skipped. Otherwise the file is decompressed, parsed with
 * json-stream, and every parsed object for which `filter(obj)` is truthy is
 * pushed onto `results`.
 *
 * @param {string} cachedir - directory containing `file`.
 * @param {string} file - file name as returned by fs.readdir.
 * @param {string} dateprefix - substring the file name must contain to be scanned.
 * @param {function(Object): boolean} filter - keeps a parsed object when truthy.
 * @param {stream.Readable} results - object-mode stream that receives matches.
 * @returns {function(function(?Error))} task whose callback is invoked exactly once.
 */
var work = function (cachedir, file, dateprefix, filter, results) {
  return function (callback) {
    // Skipped files must still complete, otherwise Async.series never advances.
    if (!file.endsWith(".json.gz") || !file.includes(dateprefix)) {
      callback(null);
      return;
    }

    // Both the unzip stream and the parser can fail, and a failure can be
    // followed by 'end'; make sure the task reports exactly one outcome.
    var done = false;
    var finish = function (err) {
      if (done) return;
      done = true;
      callback(err);
    };
    var onError = function (err) {
      console.log(err);
      console.log(err.stack);
      finish(err);
    };

    console.log("Scanning:", file);
    var scanner = scangz(path.join(cachedir, file));
    // pipe() does not forward errors from the source stream to the destination.
    scanner.on("error", onError);

    var scannerp = scanner.pipe(jsonstream());
    scannerp.on("error", onError);
    scannerp.on("data", function (chunk) {
      if (filter(chunk)) {
        results.push(chunk);
      }
    });
    // Wait for the parser (not the unzip stream) to end, so every object it
    // produced has already been filtered and pushed.
    scannerp.on("end", function () {
      console.log("Done with", file);
      finish(null);
    });
  };
};
/**
 * Open `fname` and return a stream of its decompressed bytes.
 *
 * zlib.createUnzip() auto-detects gzip or deflate input. Errors from the
 * underlying file read (e.g. ENOENT) are re-emitted on the returned stream.
 * pipe() does not forward them, so without this they would be unhandled
 * 'error' events on a stream the caller never sees.
 *
 * @param {string} fname - path to the compressed file.
 * @returns {stream.Transform} stream of decompressed data.
 */
var scangz = function (fname) {
  var rawread = fs.createReadStream(fname);
  var unzipped = rawread.pipe(zlib.createUnzip());
  rawread.on("error", function (err) {
    unzipped.emit("error", err);
  });
  return unzipped;
};
// Return a stream | |
module.exports.scanner = function(cachedir, dateprefix, filter) { | |
var results = new Readable({objectMode: true}); | |
results._read = function noop() {}; | |
fs.readdir(cachedir, function(err,files) { | |
if (err) throw err; | |
/* Now for each file, raise the event and add it to the event queue | |
* Since node is single threaded, the events will be added to the queue, | |
* completes reading, then only one event will be handled at a time. | |
*/ | |
var workload = [] | |
files.forEach(function(file){ | |
worlkload.push(work(cachedir, file)) | |
}); | |
async.series(workload, function(err){ | |
if (err) { | |
//Write your own error handler here | |
} | |
}) | |
}); | |
return results; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment