Skip to content

Instantly share code, notes, and snippets.

@brycebaril
Last active December 21, 2015 23:18
Show Gist options
  • Save brycebaril/6380931 to your computer and use it in GitHub Desktop.
Save brycebaril/6380931 to your computer and use it in GitHub Desktop.
Zip two ordered time streams. A full version of this can be found here: https://github.com/brycebaril/node-sosj
module.exports = align
var Transform = require("stream").Transform
|| require("readable-stream/transform")
var inherits = require("util").inherits
var map = require("through2-map")
/**
* align takes two objectMode streams and a sequence key and will create a stream
* aligning records from each stream resulting in a stream that emits record doublets
* when the sequence key '_t' exactly matches. IMPORTANT: This only works on ordered streams!
* @param {Stream} leftStream An objectMode stream ordered by _t
* @param {Stream} rightStream An objectMode stream ordered by _t
* @return {Stream} A doublet stream containing [leftRecord, rightRecord], [leftRecord, null], or [null, rightRecord]
*/
function align(leftStream, rightStream) {
var aligner = new Aligner()
var l = left()
var r = right()
var ended = 0
l.on("end", function () {
l.unpipe(aligner)
if (++ended == 2) aligner.end()
})
r.on("end", function () {
r.unpipe(aligner)
if (++ended == 2) aligner.end()
})
leftStream.pipe(l).pipe(aligner, {end: false})
rightStream.pipe(r).pipe(aligner, {end: false})
return aligner
}
function left(stream) {
return map({objectMode: true}, function (record) {
return [record, null]
})
}
function right(stream) {
return map({objectMode: true}, function (record) {
return [null, record]
})
}
function Aligner(options) {
if (!(this instanceof Aligner)) return new Aligner(options)
// This MUST be an objectMode stream.
options = options || {}
options.objectMode = true
Transform.call(this, options)
this.queue = [[], []]
}
inherits(Aligner, Transform)
Aligner.prototype._transform = function (record, encoding, callback) {
var i = (record[0] != null) ? 0 : 1
var o = +(!i)
var me = record[i]
var other, otherRecord
var myQueue = this.queue[i]
var otherQueue = this.queue[o]
if (myQueue.length > 0 || otherQueue.length == 0) {
// My queue is not empty, or the other queue is empty.
myQueue.push(record)
return callback()
}
while (otherRecord = otherQueue.shift()) {
other = otherRecord[o]
if (me._t == other._t) {
// These keys are joined, w00t
record[o] = other
this.push(record)
return callback()
}
else if (me._t < other._t) {
// This record pre-dates all seen records on the other's queue
otherQueue.unshift(otherRecord)
this.push(record)
return callback()
}
else {
// This comes after the earliest leftStream record.
// Emit that record and continue to the next leftStream record.
this.push(otherRecord)
}
}
myQueue.push(record)
return callback()
}
Aligner.prototype._flush = function (callback) {
// At this point only one queue should have any items in it.
if (this.queue[0].length && this.queue[1].length)
return callback(new Error("Incomplete align!"))
var self = this
this.queue.map(function (q) {
q.map(function (r) { self.push(r) })
})
return callback()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment