Skip to content

Instantly share code, notes, and snippets.

@nicolashery
Last active May 28, 2024 16:53
Show Gist options
  • Save nicolashery/5910969 to your computer and use it in GitHub Desktop.
Save nicolashery/5910969 to your computer and use it in GitHub Desktop.
Combine a pipe of multiple Node.js streams into one stream
var util = require('util')
, Transform = require('stream').Transform
, StreamCombiner = require('./streamcombiner');
var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
chunks1.push(chunk.toString());
var pieces = (soFar + chunk).split('\n');
soFar = pieces.pop();
for (var i = 0; i < pieces.length; i++) {
var piece = pieces[i];
this.push(piece);
}
return done();
};
var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
chunks2.push(chunk.toString());
count = count + 1;
this.push(count + ' ' + chunk.toString() + '\n');
done();
};
var stdin = process.stdin;
var stdout = process.stdout;
process.on('exit', function () {
console.error('chunks1: ' + JSON.stringify(chunks1));
console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);
// Test with `stream1` and `stream2`
// stdin.pipe(stream1).pipe(stream2).pipe(stdout);
// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node example.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]
// Now combine into `stream3` to "hide" `stream1` and `stream2` from user
var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);
// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node example.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]
/* StreamCombiner
Combine a pipe of multiple streams into one stream.
Example:
var stream3 = new StreamCombiner(stream1, stream2);
process.stdin.pipe(stream3).pipe(process.stdout);
// The line above will do this:
// process.stdin.pipe(stream1).pipe(stream2).pipe(process.stdout);
Thanks to Brandon Tilley (https://github.com/BinaryMuse)
for this code snippet.
*/
var util = require('util')
, PassThrough = require('stream').PassThrough;
var StreamCombiner = function() {
this.streams = Array.prototype.slice.apply(arguments);
// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
this.on('pipe', function(source) {
source.unpipe(this);
for(var i in this.streams) {
source = source.pipe(this.streams[i]);
}
this.transformStream = source;
});
};
util.inherits(StreamCombiner, PassThrough);
// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
StreamCombiner.prototype.pipe = function(dest, options) {
return this.transformStream.pipe(dest, options);
};
module.exports = StreamCombiner;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment