Last active
October 15, 2017 06:46
-
-
Save abachman/fe5ad2e2de6fba78fcacc6160feb7e36 to your computer and use it in GitHub Desktop.
Coming to an understanding with node.js Streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Stream = require('stream') | |
class First extends Stream.Duplex { | |
constructor(config) { | |
super({ | |
objectMode: true | |
}); | |
this.cache = [] | |
} | |
_read() { | |
const out = this.cache.pop() | |
console.log('First _read', typeof out, out) | |
if (out) | |
this.push(out) | |
} | |
_write(p, enc, next) { | |
console.log('First _write', typeof p, p) | |
if (p) | |
this.cache.push(p) | |
next() | |
} | |
} | |
class Second extends Stream.Duplex { | |
constructor(config) { | |
super({ | |
objectMode: true | |
}); | |
this.cache = [] | |
} | |
// something downstream wants data | |
_read(arg) { | |
let msg = this.cache.pop() | |
console.log('Second _read', typeof msg, msg) | |
if (msg) | |
this.push(msg) | |
} | |
// something upstream has data | |
_write(packet, enc, next) { | |
console.log('Second _write', typeof packet, packet) | |
if (packet) | |
this.cache.push(packet) | |
next() | |
} | |
} | |
class Third extends Stream.Duplex { | |
constructor() { | |
super({ | |
objectMode: true | |
}); | |
this.cache = [] | |
this.history = [] | |
} | |
_read() { | |
let out = this.cache.shift() | |
console.log('Third _read', out) | |
if (typeof out !== 'undefined') { | |
this.history.push(out) | |
out = "stdout: " + JSON.stringify(out) + "\n" | |
this.push(out) | |
} else { | |
this.emit('finished') | |
} | |
} | |
_write(packet, enc, next) { | |
console.log('Third _write', packet) | |
if (packet) | |
this.cache.unshift(packet) | |
next() | |
} | |
} | |
const starter = new First() | |
const middle = new Second() | |
const last = new Third() | |
starter. | |
pipe(middle). | |
pipe(last). | |
pipe(process.stdout) // echo to stdout | |
starter.write({message: 'write 1'}) | |
starter.write({message: 'write 2'}) | |
starter.push({message: 'push 1'}) | |
starter.push({message: 'push 2'}) | |
starter.write({message: 'write 3'}) | |
starter.write({message: 'write 4'}) | |
starter.end({message: 'end 1'}) | |
// custom event capturing the end of data. Why doesn't `end` work? | |
last.on('finished', () => { | |
setTimeout(() => { | |
console.log("") | |
console.log("There will be no more data."); | |
console.log("Third.history:") | |
console.dir(last.history) | |
}, 10) | |
}); | |
// // Writeable stream events | |
// Event: 'close' | |
// Event: 'drain' | |
// Event: 'error' | |
// Event: 'finish' | |
// Event: 'pipe' | |
// Event: 'unpipe' | |
// | |
// // Readable stream events | |
// Event: 'close' | |
// Event: 'data' | |
// Event: 'end' | |
// Event: 'error' | |
// Event: 'readable' | |
['close', 'drain', 'error', 'finish', 'pipe', 'unpipe', 'data', 'end', 'readable'].forEach((event) => { | |
starter.on(event, () => { | |
console.log(`> starter ${event}`) | |
}) | |
middle.on(event, () => { | |
console.log(`> middle ${event}`) | |
}) | |
last.on(event, () => { | |
console.log(`> last ${event}`) | |
}) | |
}) | |
console.log('done writing') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
First _write object { message: 'write 1' } | |
First _write object { message: 'write 2' } | |
First _write object { message: 'write 3' } | |
First _write object { message: 'write 4' } | |
First _write object { message: 'end 1' } | |
done writing | |
First _read object { message: 'end 1' } | |
First _read object { message: 'write 4' } | |
Second _write object { message: 'push 1' } | |
> starter data | |
First _read object { message: 'write 3' } | |
Second _write object { message: 'push 2' } | |
> starter data | |
First _read object { message: 'write 2' } | |
Second _write object { message: 'end 1' } | |
> starter data | |
First _read object { message: 'write 1' } | |
Second _write object { message: 'write 4' } | |
> starter data | |
First _read undefined undefined | |
Second _write object { message: 'write 3' } | |
> starter data | |
Second _write object { message: 'write 2' } | |
> starter data | |
Second _write object { message: 'write 1' } | |
> starter data | |
Second _read object { message: 'write 1' } | |
Second _read object { message: 'write 2' } | |
Third _write { message: 'write 1' } | |
> middle data | |
Second _read object { message: 'write 3' } | |
Third _write { message: 'write 2' } | |
> middle data | |
Second _read object { message: 'write 4' } | |
Third _write { message: 'write 3' } | |
> middle data | |
Second _read object { message: 'end 1' } | |
Third _write { message: 'write 4' } | |
> middle data | |
Second _read object { message: 'push 2' } | |
Third _write { message: 'end 1' } | |
> middle data | |
Second _read object { message: 'push 1' } | |
Third _write { message: 'push 2' } | |
> middle data | |
Second _read undefined undefined | |
Third _write { message: 'push 1' } | |
> middle data | |
Third _read { message: 'push 1' } | |
Third _read { message: 'push 2' } | |
stdout: {"message":"push 1"} | |
> last data | |
Third _read { message: 'end 1' } | |
stdout: {"message":"push 2"} | |
> last data | |
Third _read { message: 'write 4' } | |
stdout: {"message":"end 1"} | |
> last data | |
Third _read { message: 'write 3' } | |
stdout: {"message":"write 4"} | |
> last data | |
Third _read { message: 'write 2' } | |
stdout: {"message":"write 3"} | |
> last data | |
Third _read { message: 'write 1' } | |
stdout: {"message":"write 2"} | |
> last data | |
Third _read undefined | |
stdout: {"message":"write 1"} | |
> last data | |
> starter finish | |
> starter readable | |
> starter readable | |
> middle readable | |
> middle readable | |
> last readable | |
> last readable | |
There will be no more data. | |
Third.history: | |
[ { message: 'push 1' }, | |
{ message: 'push 2' }, | |
{ message: 'end 1' }, | |
{ message: 'write 4' }, | |
{ message: 'write 3' }, | |
{ message: 'write 2' }, | |
{ message: 'write 1' } ] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment