Last active
December 21, 2015 23:18
-
-
Save brycebaril/6380931 to your computer and use it in GitHub Desktop.
Zip two ordered time streams. A full version of this can be found here: https://github.com/brycebaril/node-sosj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = align | |
var Transform = require("stream").Transform | |
|| require("readable-stream/transform") | |
var inherits = require("util").inherits | |
var map = require("through2-map") | |
/** | |
* align takes two objectMode streams and a sequence key and will create a stream | |
* aligning records from each stream resulting in a stream that emits record doublets | |
* when the sequence key '_t' exactly matches. IMPORTANT: This only works on ordered streams! | |
* @param {Stream} leftStream An objectMode stream ordered by _t | |
* @param {Stream} rightStream An objectMode stream ordered by _t | |
* @return {Stream} A doublet stream containing [leftRecord, rightRecord], [leftRecord, null], or [null, rightRecord] | |
*/ | |
function align(leftStream, rightStream) { | |
var aligner = new Aligner() | |
var l = left() | |
var r = right() | |
var ended = 0 | |
l.on("end", function () { | |
l.unpipe(aligner) | |
if (++ended == 2) aligner.end() | |
}) | |
r.on("end", function () { | |
r.unpipe(aligner) | |
if (++ended == 2) aligner.end() | |
}) | |
leftStream.pipe(l).pipe(aligner, {end: false}) | |
rightStream.pipe(r).pipe(aligner, {end: false}) | |
return aligner | |
} | |
function left(stream) { | |
return map({objectMode: true}, function (record) { | |
return [record, null] | |
}) | |
} | |
function right(stream) { | |
return map({objectMode: true}, function (record) { | |
return [null, record] | |
}) | |
} | |
function Aligner(options) { | |
if (!(this instanceof Aligner)) return new Aligner(options) | |
// This MUST be an objectMode stream. | |
options = options || {} | |
options.objectMode = true | |
Transform.call(this, options) | |
this.queue = [[], []] | |
} | |
inherits(Aligner, Transform) | |
Aligner.prototype._transform = function (record, encoding, callback) { | |
var i = (record[0] != null) ? 0 : 1 | |
var o = +(!i) | |
var me = record[i] | |
var other, otherRecord | |
var myQueue = this.queue[i] | |
var otherQueue = this.queue[o] | |
if (myQueue.length > 0 || otherQueue.length == 0) { | |
// My queue is not empty, or the other queue is empty. | |
myQueue.push(record) | |
return callback() | |
} | |
while (otherRecord = otherQueue.shift()) { | |
other = otherRecord[o] | |
if (me._t == other._t) { | |
// These keys are joined, w00t | |
record[o] = other | |
this.push(record) | |
return callback() | |
} | |
else if (me._t < other._t) { | |
// This record pre-dates all seen records on the other's queue | |
otherQueue.unshift(otherRecord) | |
this.push(record) | |
return callback() | |
} | |
else { | |
// This comes after the earliest leftStream record. | |
// Emit that record and continue to the next leftStream record. | |
this.push(otherRecord) | |
} | |
} | |
myQueue.push(record) | |
return callback() | |
} | |
Aligner.prototype._flush = function (callback) { | |
// At this point only one queue should have any items in it. | |
if (this.queue[0].length && this.queue[1].length) | |
return callback(new Error("Incomplete align!")) | |
var self = this | |
this.queue.map(function (q) { | |
q.map(function (r) { self.push(r) }) | |
}) | |
return callback() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment