Last active
February 1, 2019 23:50
-
-
Save hackergrrl/b8da1258c84b0c0988b9c0c5f68f67c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var stream = require('stream') | |
module.exports = function (id) { | |
// opts.allowHalfDuplex is _very_ important! Otherwise ending the readable | |
// half of the duplex will leave the writable side open! | |
var counter = new stream.Duplex({allowHalfOpen:false}) | |
counter.processed = [] | |
var payloadsLeft = 3 | |
counter._read = function () { | |
if (!payloadsLeft) return this.push(null) | |
var payload = JSON.stringify({ sender: id, left: payloadsLeft }) | |
var prefix = Buffer.alloc(4) | |
prefix.writeUInt32LE(payload.length, 0) | |
this.push(prefix) | |
this.push(payload) | |
payloadsLeft-- | |
} | |
var expected = 0 | |
var accum = Buffer.alloc(0) | |
counter._write = function (chunk, enc, next) { | |
accum = Buffer.concat([accum, chunk]) | |
tryParse() | |
next() | |
} | |
function tryParse () { | |
// haven't recv'd prefix len yet | |
if (!expected && accum.length < 4) return | |
// read prefix length | |
if (!expected) { | |
expected = accum.readUInt32LE(0) | |
accum = accum.slice(4) | |
} | |
// read next chunk | |
if (accum.length >= expected) { | |
var buf = accum.slice(0, expected) | |
var value = JSON.parse(buf.toString()) | |
counter.processed.push(value) | |
accum = accum.slice(expected) | |
expected = 0 | |
tryParse() | |
} | |
} | |
// exposes '.processed' so you can examine the payloads received | |
return counter | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Connect two counter duplexes back to back: whatever one emits is written
// into the other.
var makeCounter = require('./counter')
var first = makeCounter('a')
var second = makeCounter('b')
first.pipe(second)
second.pipe(first)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment