|
var Readable = require('stream').Readable; |
|
var Writable = require('stream').Writable; |
|
var Transform = require('stream').Transform; |
|
var util = require('util'); |
|
|
|
// Object-mode Transform stream that appends a newline to every chunk.
// Each chunk is string-concatenated with '\n', so non-string chunks are
// coerced to their string form before being pushed downstream.
function NewLine() {
  Transform.call(this, {
    objectMode: true
  });
}
// Set up inheritance before any prototype methods are attached.
util.inherits(NewLine, Transform);

// chunk    - the value written to the stream
// encoding - unused
// next     - callback signalling that this chunk has been handled
NewLine.prototype._transform = function (chunk, encoding, next) {
  this.push(chunk + '\n');
  next();
};
|
|
|
//-------------------- |
|
//----- Track |
|
//-------------------- |
|
|
|
// Internal object-mode pass-through stream representing one case of a Switch.
// Tracks should not be constructed manually; they are created by
// Switch.prototype.case, which returns the track so callers can pipe from it.
function _Track() {
  Transform.call(this, {
    objectMode: true
  });
}
// Set up inheritance before any prototype methods are attached.
util.inherits(_Track, Transform);

// chunk    - the object handed to this track
// encoding - unused
// next     - callback signalling that this object has been handled
_Track.prototype._transform = function (chunk, encoding, next) {
  // Immediately pass the object along, untouched.
  this.push(chunk);
  next();
};
|
|
|
//-------------------- |
|
//----- Switch |
|
//-------------------- |
|
|
|
// Object-mode Transform stream that routes each incoming object to the
// track of the first registered case whose constructor it is an instance
// of. Objects matching no case are pushed to the switch's own readable
// side, which therefore acts as the `default` branch.
function Switch() {
  Transform.call(this, {
    objectMode: true
  });
  // Ordered list of { cls, track } pairs registered through case().
  this._matchers = [];
}
// Set up inheritance before any prototype methods are attached.
util.inherits(Switch, Transform);

// obj      - the object to route
// encoding - forwarded unchanged to the matching track's write()
// next     - callback signalling that this object has been handled
Switch.prototype._transform = function (obj, encoding, next) {
  var matchers = this._matchers;

  // Search all cases for a match; matching is first-to-match.
  // The index is declared locally: an undeclared loop variable leaks a
  // global and throws a ReferenceError in strict mode.
  for (var i = 0; i < matchers.length; i++) {
    var matcher = matchers[i];
    if (obj instanceof matcher.cls) {
      // Pass the object to the matching track. `next` is handed over as the
      // write callback, so this switch reports completion (or failure) only
      // when the track reports the write as done.
      return matcher.track.write(obj, encoding, next);
    }
  }

  // No match found: the object continues on the switch's own output.
  this.push(obj);
  next();
};

// Invoked by Transform once all written objects have been handled and the
// switch is finishing. Ends every track so that consumers piped from a
// track observe `end` as well.
Switch.prototype._flush = function (done) {
  this._matchers.forEach(function (matcher) {
    matcher.track.end();
  });
  done();
};

// Register a case and return its readable track,
// e.g. switch.case(X).pipe(Y)
// cls - constructor used as the right-hand side of `instanceof`
Switch.prototype.case = function (cls) {
  var track = new _Track();
  this._matchers.push({
    cls: cls,
    track: track
  });
  return track;
};
|
|
|
//-------------------- |
|
//----- Json, Log, Fountain
|
//-------------------- |
|
|
|
// Object-mode Transform stream that replaces every chunk with the result of
// JSON.stringify(chunk).
function Json() {
  Transform.call(this, {
    objectMode: true
  });
}
// Set up inheritance before any prototype methods are attached.
util.inherits(Json, Transform);

// chunk    - the value to serialise
// encoding - unused
// next     - callback; receives the error when serialisation fails
Json.prototype._transform = function (chunk, encoding, next) {
  var text;
  try {
    text = JSON.stringify(chunk);
  } catch (err) {
    // JSON.stringify throws on e.g. circular structures; report the failure
    // through the stream callback instead of throwing out of _transform.
    return next(err);
  }
  this.push(text);
  next();
};
|
|
|
// Object-mode Writable stream that prints every chunk it receives with
// console.log, preceded by a fixed prefix.
// prefix - value printed before each chunk
function Log(prefix) {
  this.prefix = prefix;
  Writable.call(this, {
    objectMode: true
  });
}
// Set up inheritance before any prototype methods are attached.
util.inherits(Log, Writable);

// chunk    - the value to print
// encoding - unused
// next     - callback signalling that this chunk has been handled
Log.prototype._write = function (chunk, encoding, next) {
  // The prefix fills the %s placeholder; the chunk is passed as an extra
  // argument and formatted by console.log.
  console.log("%s", this.prefix, chunk);
  next();
};
|
|
|
// Object-mode Readable stream that endlessly emits new instances of the
// given constructors, cycling through the list in order.
// list     - array of constructors; each is invoked with `new` and no arguments
// interval - optional delay in milliseconds before each emitted instance
//            (defaults to 1000)
function Fountain(list, interval) {
  Readable.call(this, {
    objectMode: true
  });
  this.list = list;
  this.i = 0;
  this.interval = interval === undefined ? 1000 : interval;
  // Handle of the pending timeout, or null when none is scheduled.
  this._timer = null;
}
// Set up inheritance before any prototype methods are attached.
util.inherits(Fountain, Readable);

// Schedules the next instance to be pushed after `interval` milliseconds.
Fountain.prototype._read = function () {
  var self = this;

  // Nothing to emit: end the stream. Without this guard the index becomes
  // NaN (modulo zero) and `new undefined()` throws inside the timer callback.
  if (!this.list || this.list.length === 0) {
    this.push(null);
    return;
  }

  var Constructor = this.list[this.i];
  this.i = (this.i + 1) % this.list.length;
  this._timer = setTimeout(function () {
    self._timer = null;
    self.push(new Constructor());
  }, this.interval);
};

// Cancels the pending timeout when the stream is destroyed so that no
// further instance is created.
Fountain.prototype._destroy = function (err, callback) {
  if (this._timer !== null) {
    clearTimeout(this._timer);
    this._timer = null;
  }
  callback(err);
};
|
|
|
//-------------------- |
|
//----- Example |
|
//-------------------- |
|
|
|
// Example payload type; instances carry the message "One".
function One() {
  this.msg = "One";
}
|
|
|
// Example payload type; instances carry the message "Two".
function Two() {
  this.msg = "Two";
}
|
|
|
// Example payload type; instances carry the message "Three".
function Three() {
  this.msg = "Three";
}
|
|
|
var sw = new Switch();

// Only objects of type `One` pass here: JSON-encoded, one per line, to stdout.
sw.case(One).pipe(new Json()).pipe(new NewLine()).pipe(process.stdout);

// Only objects of type `Two` pass here: JSON-encoded, one per line, to stderr.
sw.case(Two).pipe(new Json()).pipe(new NewLine()).pipe(process.stderr);

new Fountain(
  [One, Two, Three] // emit instances of these objects continuously
).pipe(
  sw // switch objects on the cases defined above
).pipe(
  new Log("Dropped") // this is the equivalent of the `default` case
);
|
|
|
|