Last active
December 14, 2018 01:41
-
-
Save webstory/ad07a0c2a7f315576f55e82a454ae67e to your computer and use it in GitHub Desktop.
Nodejs 8 object stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
class Producer extends stream.Readable { | |
constructor(options) { | |
options = Object.assign({ | |
highWaterMark: 5, | |
objectMode: true | |
}, options); | |
super(options); | |
this.max = 50; | |
this.index = 1; | |
} | |
_read(size) { | |
const i = this.index++; | |
if (i > this.max) { | |
this.push(null); | |
} else { | |
this.push(i); | |
console.log(`Pushed ${i}`); | |
} | |
} | |
} | |
class Consumer extends stream.Writable { | |
constructor(options) { | |
options = Object.assign({ | |
highWaterMark: 5, | |
objectMode: true | |
}, options); | |
super(options); | |
this.name = options.name || 'Consumer'; | |
} | |
_write(chunk, encoding, callback) { | |
setTimeout(() => { | |
console.log(`[${this.name}] ${chunk}`); | |
callback(); | |
}, 100); | |
} | |
_final(callback) { | |
callback(); | |
} | |
} | |
class Transformer extends stream.Transform { | |
constructor(options) { | |
options = Object.assign({ | |
highWaterMark: 5, | |
objectMode: true | |
}, options); | |
super(options); | |
this._fn = options.func || ((a) => a); | |
console.log(options); | |
} | |
_transform(chunk, encoding, callback) { | |
this.push(this._fn(chunk)); | |
callback(); | |
} | |
_flush(callback) { | |
callback(); | |
} | |
} | |
class Mux extends stream.PassThrough { | |
constructor(options) { | |
options = Object.assign({ | |
highWaterMark: 1, | |
objectMode: true | |
}); | |
super(options); | |
} | |
} | |
const producer = new Producer(); | |
const t1 = new Transformer({ func: (a) => a *10 }); | |
const consumer1 = new Consumer({ name: 'C1' }); | |
const consumer2 = new Consumer({ name: 'C2' }); | |
const mux = new Mux(); | |
producer | |
.pipe(t1) | |
.pipe(mux) // Comment if not using multiplexer | |
.pipe(consumer1); | |
mux.pipe(consumer2); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment