Created
October 3, 2013 00:44
-
-
Save skeggse/6802812 to your computer and use it in GitHub Desktop.
A Node.js backpressure example over a tcp/net stream. Just run index.js, the output is a little shoddy but it works. The producer only produces once the consumer catches up, but the streams are disconnected--they can't communicate directly.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var net = require('net'); | |
var util = require('util'); | |
var Writable = require('stream').Writable; | |
/** | |
* Pretends to consume the data written to it. In reality, it just eats data | |
* really slowly. | |
* | |
* @constructor | |
* @extends Writable | |
*/ | |
function Consumer() { | |
if (!(this instanceof Consumer)) | |
return new Consumer(); | |
Writable.call(this); | |
this.consumed = 0; | |
} | |
util.inherits(Consumer, Writable); | |
/** | |
* Handles writes to the Consumer, as specified by the Streams2 API. | |
* | |
* @param {!Buffer} chunk The chunk of data to consume. | |
* @param {null} encoding The null encoding because we're not decoding strings. | |
* @param {function(?Error=)} callback The callback for when the chunk has been | |
* dealt with. | |
* @private | |
* @override | |
*/ | |
// pretend to do something with the data | |
Consumer.prototype._write = function(chunk, encoding, callback) { | |
var self = this; | |
setTimeout(function() { | |
self.consumed += chunk.length; | |
callback(); | |
}, 800); | |
}; | |
var consumer = new Consumer(); | |
var conn = net.connect(process.env.PORT || 3000, 'localhost', function() { | |
conn.pipe(consumer); | |
}); | |
module.exports = consumer; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var producer = require('./producer'); | |
var consumer = require('./consumer'); | |
function rpad(string, length) { | |
string = string + ''; | |
while (string.length < length) | |
string += ' '; | |
return string; | |
} | |
var producerSnapshot = 0, consumerSnapshot = 0; | |
setInterval(function() { | |
var produceRate, consumeRate; | |
produceRate = producer.produced - producerSnapshot; | |
consumeRate = consumer.consumed - consumerSnapshot; | |
producerSnapshot = producer.produced; | |
consumerSnapshot = consumer.consumed; | |
console.log('produced:', producer.produced, '\tbytes/second', produceRate); | |
console.log('consumed:', consumer.consumed, '\tbytes/second', consumeRate); | |
}, 1000); | |
exports.producer = producer; | |
exports.consumer = consumer; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var net = require('net'); | |
var util = require('util'); | |
var crypto = require('crypto'); | |
var Readable = require('stream').Readable; | |
/** | |
* Checks that the object is a positive/non-zero integer. | |
* | |
* @param {*} object The object to check. | |
* @return {boolean} Whether the provided object meets the criteria. | |
*/ | |
function isPositiveInteger(object) { | |
return typeof object === 'number' && object > 0 && object === (object | 0); | |
} | |
/** | |
* Produces cryptographically random data as fast as it should. Could also just | |
* create a new buffer which would contain some random data and a bunch of zero | |
* bytes. | |
* | |
* @param {number=} defaultSize The default size to produce if not advised. | |
* @constructor | |
* @extends Readable | |
*/ | |
function Producer(defaultSize) { | |
if (!(this instanceof Producer)) | |
return new Producer(); | |
Readable.call(this); | |
this.defaultSize = isPositiveInteger(defaultSize) ? defaultSize : 1024; | |
this.produced = 0; | |
} | |
util.inherits(Producer, Readable); | |
/** | |
* Handles reads or generates data for fast-production mode, as specified by the | |
* Streams2 API. | |
* | |
* @param {?number=} size The size to read, advisory only, defaults to the value | |
* provided to the constructor if not specified. | |
* @private | |
* @override | |
*/ | |
Producer.prototype._read = function(size) { | |
var self = this; | |
if (!isPositiveInteger(size)) | |
size = this.defaultSize; | |
crypto.randomBytes(size, function(err, data) { | |
if (err) | |
return self.emit('error', err); | |
self.produced += data.length; | |
self.push(data); | |
}); | |
}; | |
var producer = new Producer(); | |
var server = net.createServer(function(conn) { | |
producer.pipe(conn); | |
}); | |
server.listen(process.env.PORT || 3000); | |
module.exports = producer; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment