Last active
September 11, 2017 09:55
-
-
Save bewt85/9122f00ee19d31e7d84f65c5446b8bfb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Readable } = require('stream'); | |
const logger = require("debug"); | |
const _ = require("lodash"); | |
const whenHttpRequestComplete = new Promise(resolve => setTimeout(resolve, _.random(100, 1000))); | |
class SlowNumberSource extends Readable { | |
constructor(options={}) { | |
super(options); | |
this.getNext = this.buildNext(1); | |
} | |
buildNext(i) { | |
return new Promise(resolve => { | |
setTimeout(() => { | |
if (i > 5) resolve([i, null]); | |
else resolve([`${i}\n`, this.buildNext(i+1)]); | |
}, 200) | |
}); | |
} | |
_read() { | |
this.getNext | |
.then(([data, next]) => { | |
if (next !== null) { | |
this.getNext = next; | |
this.push(data); | |
} else { | |
this.push(null); | |
} | |
}) | |
.catch(logger("error")) | |
} | |
} | |
const es = require("event-stream"); | |
const { Writable } = require('stream'); | |
class LineFormatter { | |
constructor() { | |
this.lineNumber = 0; | |
this.mapper = es.map((data, cb) => { | |
this.lineNumber += 1; | |
if (data === "") cb(); | |
cb(null, `Line ${this.lineNumber}: ${data}\n`); | |
}) | |
} | |
} | |
class LineAdder extends Writable { | |
constructor(options={}) { | |
options.objectMode = true; | |
super(options); | |
this._total = 0; | |
this.total = new Promise(resolve => { | |
this.on("finish", () => resolve(this._total)) | |
}) | |
} | |
_write(line, enc, cb) { | |
try { | |
this._total += Number(line.trim()); | |
cb(null); | |
} catch (err) { | |
cb(err); | |
} | |
} | |
} | |
const inputStream = (new SlowNumberSource()) | |
.pipe(es.split()) | |
const formatter = new LineFormatter(); | |
const adder = new LineAdder(); | |
inputStream | |
.pipe(formatter.mapper) | |
.pipe(process.stdout); | |
whenHttpRequestComplete | |
.then(() => inputStream.pipe(adder).total) | |
.then(total => console.log(`Total: ${total}`)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ node example_stream_bug.js | |
Line 1: 1 | |
Line 2: 2 | |
Line 3: 3 | |
Line 4: 4 | |
Line 5: 5 | |
Total: 14 | |
$ node example_stream_bug.js | |
Line 1: 1 | |
Line 2: 2 | |
Line 3: 3 | |
Line 4: 4 | |
Line 5: 5 | |
Total: 15 | |
$ node example_stream_bug.js | |
Line 1: 1 | |
Line 2: 2 | |
Line 3: 3 | |
Line 4: 4 | |
Line 5: 5 | |
Total: 5 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment