Skip to content

Instantly share code, notes, and snippets.

@cowboy
Last active December 20, 2015 10:31
Show Gist options
  • Save cowboy/6116543 to your computer and use it in GitHub Desktop.
Save cowboy/6116543 to your computer and use it in GitHub Desktop.
JS streams: can I prevent the "myThing" stream from being killed when the "noise" stream being piped into it ends?
var es = require('event-stream');
/**
 * Builds the long-lived stream used by this script.
 *
 * Data written to the returned stream is re-chunked by a line filter so that
 * every chunk queued downstream is one newline-terminated line. Those lines
 * share the `out` stream with an internal source that writes
 * 'internal data\n' every 200 ms.
 *
 * The filter's `end` method is replaced so that ending the stream (for
 * example when a stream piped into it ends) only flushes the pending partial
 * line instead of running es.through's default end handling.
 *
 * @returns {Stream} The `es.pipeline(filter, out)` stream.
 */
function makeThing() {
  var out = es.through();

  // Internal source that feeds `out` alongside the filtered external data.
  var substream = es.through();
  substream.pipe(out);
  setInterval(function() {
    substream.write('internal data\n');
  }, 200);

  // Text received so far that has not yet been terminated by a newline.
  var buffer = '';
  var filter = es.through(function(data) {
    var parts = (buffer + data).split(/\r?\n/);
    // The last element is the (possibly empty) unterminated remainder; keep
    // it until more data or an end() arrives.
    buffer = parts.pop();
    for (var i = 0; i < parts.length; i++) {
      this.queue(parts[i] + '\n');
    }
  });

  // There is no way to prevent es.through from killing itself when .end is
  // called unless the end callback is overridden here.
  filter.end = function(data) {
    var remainder = buffer + (data || '');
    buffer = '';
    // Flush only when something is pending; otherwise an end() that arrives
    // right after a complete line would emit a stray blank line.
    if (remainder !== '') {
      this.queue(remainder + '\n');
    }
    this.writable = this.readable = true; // unsure if this is necessary
    return this;
  };

  return es.pipeline(
    filter,
    out
  );
}
// Create the shared stream once and send everything it emits to stdout.
// makeNoise() below writes to and pipes into this same instance.
var myThing = makeThing();
myThing.pipe(process.stdout);
/**
 * Feeds a short-lived "noise" stream into `myThing`.
 *
 * Writes a header line to `myThing`, then every 150 ms writes
 * `str[counter]` to a fresh stream piped into `myThing`; every fifth write
 * is followed by a newline, the others by a space. After the seventh write
 * the noise stream is ended with 'done' and a new round is scheduled one
 * second later.
 *
 * @param {string} str Label used in the header and in each noise chunk.
 */
function makeNoise(str) {
  myThing.write('makeNoise (' + str + ')\n');

  var noise = es.through();
  noise.pipe(myThing);

  var ticks = 0;
  var timer = setInterval(function() {
    ticks += 1;

    var separator = ticks % 5 === 0 ? '\n' : ' ';
    noise.write(str + '[' + ticks + ']' + separator);

    if (ticks !== 7) {
      return;
    }

    // Seventh tick: stop this source and queue up the next one.
    clearInterval(timer);
    noise.end('done');
    setTimeout(function() {
      makeNoise(str);
    }, 1000);
  }, 150);
}
// Start the first noise source; each source schedules its own successor
// one second after it ends.
makeNoise('noise');
// makeNoise('NOISE');
$ node stream_wtf.js
makeNoise (noise)
internal data
internal data
internal data
noise[1] noise[2] noise[3] noise[4] noise[5]
internal data
internal data
noise[6] noise[7] done
internal data
internal data
internal data
internal data
internal data
makeNoise (noise)
internal data
internal data
internal data
noise[1] noise[2] noise[3] noise[4] noise[5]
internal data
internal data
noise[6] noise[7] done
internal data
internal data
internal data
internal data
^C
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment