Skip to content

Instantly share code, notes, and snippets.

@abachman
Last active October 15, 2017 06:46
Show Gist options
  • Save abachman/fe5ad2e2de6fba78fcacc6160feb7e36 to your computer and use it in GitHub Desktop.
Save abachman/fe5ad2e2de6fba78fcacc6160feb7e36 to your computer and use it in GitHub Desktop.
Coming to an understanding with node.js Streams
const Stream = require('stream')
class First extends Stream.Duplex {
constructor(config) {
super({
objectMode: true
});
this.cache = []
}
_read() {
const out = this.cache.pop()
console.log('First _read', typeof out, out)
if (out)
this.push(out)
}
_write(p, enc, next) {
console.log('First _write', typeof p, p)
if (p)
this.cache.push(p)
next()
}
}
class Second extends Stream.Duplex {
constructor(config) {
super({
objectMode: true
});
this.cache = []
}
// something downstream wants data
_read(arg) {
let msg = this.cache.pop()
console.log('Second _read', typeof msg, msg)
if (msg)
this.push(msg)
}
// something upstream has data
_write(packet, enc, next) {
console.log('Second _write', typeof packet, packet)
if (packet)
this.cache.push(packet)
next()
}
}
class Third extends Stream.Duplex {
constructor() {
super({
objectMode: true
});
this.cache = []
this.history = []
}
_read() {
let out = this.cache.shift()
console.log('Third _read', out)
if (typeof out !== 'undefined') {
this.history.push(out)
out = "stdout: " + JSON.stringify(out) + "\n"
this.push(out)
} else {
this.emit('finished')
}
}
_write(packet, enc, next) {
console.log('Third _write', packet)
if (packet)
this.cache.unshift(packet)
next()
}
}
const starter = new First()
const middle = new Second()
const last = new Third()
starter.
pipe(middle).
pipe(last).
pipe(process.stdout) // echo to stdout
starter.write({message: 'write 1'})
starter.write({message: 'write 2'})
starter.push({message: 'push 1'})
starter.push({message: 'push 2'})
starter.write({message: 'write 3'})
starter.write({message: 'write 4'})
starter.end({message: 'end 1'})
// custom event capturing the end of data. Why doesn't `end` work?
last.on('finished', () => {
setTimeout(() => {
console.log("")
console.log("There will be no more data.");
console.log("Third.history:")
console.dir(last.history)
}, 10)
});
// // Writeable stream events
// Event: 'close'
// Event: 'drain'
// Event: 'error'
// Event: 'finish'
// Event: 'pipe'
// Event: 'unpipe'
//
// // Readable stream events
// Event: 'close'
// Event: 'data'
// Event: 'end'
// Event: 'error'
// Event: 'readable'
['close', 'drain', 'error', 'finish', 'pipe', 'unpipe', 'data', 'end', 'readable'].forEach((event) => {
starter.on(event, () => {
console.log(`> starter ${event}`)
})
middle.on(event, () => {
console.log(`> middle ${event}`)
})
last.on(event, () => {
console.log(`> last ${event}`)
})
})
console.log('done writing')
First _write object { message: 'write 1' }
First _write object { message: 'write 2' }
First _write object { message: 'write 3' }
First _write object { message: 'write 4' }
First _write object { message: 'end 1' }
done writing
First _read object { message: 'end 1' }
First _read object { message: 'write 4' }
Second _write object { message: 'push 1' }
> starter data
First _read object { message: 'write 3' }
Second _write object { message: 'push 2' }
> starter data
First _read object { message: 'write 2' }
Second _write object { message: 'end 1' }
> starter data
First _read object { message: 'write 1' }
Second _write object { message: 'write 4' }
> starter data
First _read undefined undefined
Second _write object { message: 'write 3' }
> starter data
Second _write object { message: 'write 2' }
> starter data
Second _write object { message: 'write 1' }
> starter data
Second _read object { message: 'write 1' }
Second _read object { message: 'write 2' }
Third _write { message: 'write 1' }
> middle data
Second _read object { message: 'write 3' }
Third _write { message: 'write 2' }
> middle data
Second _read object { message: 'write 4' }
Third _write { message: 'write 3' }
> middle data
Second _read object { message: 'end 1' }
Third _write { message: 'write 4' }
> middle data
Second _read object { message: 'push 2' }
Third _write { message: 'end 1' }
> middle data
Second _read object { message: 'push 1' }
Third _write { message: 'push 2' }
> middle data
Second _read undefined undefined
Third _write { message: 'push 1' }
> middle data
Third _read { message: 'push 1' }
Third _read { message: 'push 2' }
stdout: {"message":"push 1"}
> last data
Third _read { message: 'end 1' }
stdout: {"message":"push 2"}
> last data
Third _read { message: 'write 4' }
stdout: {"message":"end 1"}
> last data
Third _read { message: 'write 3' }
stdout: {"message":"write 4"}
> last data
Third _read { message: 'write 2' }
stdout: {"message":"write 3"}
> last data
Third _read { message: 'write 1' }
stdout: {"message":"write 2"}
> last data
Third _read undefined
stdout: {"message":"write 1"}
> last data
> starter finish
> starter readable
> starter readable
> middle readable
> middle readable
> last readable
> last readable
There will be no more data.
Third.history:
[ { message: 'push 1' },
{ message: 'push 2' },
{ message: 'end 1' },
{ message: 'write 4' },
{ message: 'write 3' },
{ message: 'write 2' },
{ message: 'write 1' } ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment