Skip to content

Instantly share code, notes, and snippets.

@skeggse
Created October 3, 2013 00:44
Show Gist options
  • Save skeggse/6802812 to your computer and use it in GitHub Desktop.
Save skeggse/6802812 to your computer and use it in GitHub Desktop.
A Node.js backpressure example over a TCP (net) stream. Just run index.js; the output is a little rough, but it works. The producer only produces more data once the consumer catches up, even though the two streams are disconnected -- they can't communicate directly, only through the socket.
var net = require('net');
var util = require('util');
var Writable = require('stream').Writable;
/**
 * A deliberately slow sink: it accepts whatever is written to it and only
 * keeps a running total of how much it has taken in.
 *
 * May be invoked with or without `new`; either way a Consumer instance is
 * produced.
 *
 * @constructor
 * @extends Writable
 */
function Consumer() {
  var calledWithNew = this instanceof Consumer;
  if (calledWithNew) {
    Writable.call(this);
    // Running total of the chunk lengths accepted so far.
    this.consumed = 0;
    return;
  }
  return new Consumer();
}
util.inherits(Consumer, Writable);
/**
 * Streams2 write hook for the Consumer. Each chunk is "processed" by waiting
 * 800 ms, after which its length is added to `consumed` and the stream is told
 * the chunk is done.
 *
 * @param {!Buffer} chunk The chunk of data to consume; only its length is used.
 * @param {*} encoding Unused by this implementation.
 * @param {function(?Error=)} callback Invoked (without an error) once the
 *     simulated work for this chunk has finished.
 * @private
 * @override
 */
Consumer.prototype._write = function(chunk, encoding, callback) {
  var sink = this;

  // Account for the chunk and acknowledge it only after the artificial delay.
  function finishChunk() {
    sink.consumed += chunk.length;
    callback();
  }

  setTimeout(finishChunk, 800);
};
// The single shared Consumer instance that this module exports.
var consumer = new Consumer();

// Port comes from the PORT environment variable, falling back to 3000.
var port = process.env.PORT || 3000;

// Once the socket is connected, feed everything it receives to the consumer.
function startConsuming() {
  conn.pipe(consumer);
}

var conn = net.connect(port, 'localhost', startConsuming);

module.exports = consumer;
var producer = require('./producer');
var consumer = require('./consumer');
/**
 * Right-pads a value with spaces until it is at least `length` characters
 * long. Values that are already long enough are returned (as strings)
 * unchanged.
 *
 * @param {*} string The value to pad; it is coerced to a string first.
 * @param {number} length The minimum length of the result.
 * @return {string} The padded string.
 */
function rpad(string, length) {
  var padded = '' + string;
  for (var missing = length - padded.length; missing > 0; missing--) {
    padded += ' ';
  }
  return padded;
}
// Totals observed at the previous tick; used to derive per-interval deltas.
var producerSnapshot = 0;
var consumerSnapshot = 0;

/**
 * Logs the running totals of both streams together with how much each total
 * grew since the previous report, then remembers the totals for next time.
 */
function reportThroughput() {
  var producedTotal = producer.produced;
  var consumedTotal = consumer.consumed;

  var produceRate = producedTotal - producerSnapshot;
  var consumeRate = consumedTotal - consumerSnapshot;

  producerSnapshot = producedTotal;
  consumerSnapshot = consumedTotal;

  console.log('produced:', producedTotal, '\tbytes/second', produceRate);
  console.log('consumed:', consumedTotal, '\tbytes/second', consumeRate);
}

// Report once per second, so each delta reads as bytes/second.
setInterval(reportThroughput, 1000);

exports.producer = producer;
exports.consumer = consumer;
var net = require('net');
var util = require('util');
var crypto = require('crypto');
var Readable = require('stream').Readable;
/**
 * Checks that the object is a positive/non-zero integer.
 *
 * Only values of type `number` qualify; numeric strings, NaN, Infinity,
 * fractions, zero and negative numbers are all rejected.
 *
 * @param {*} object The object to check.
 * @return {boolean} Whether the provided object meets the criteria.
 */
function isPositiveInteger(object) {
  // Math.floor is used instead of `object | 0`: the bitwise form truncates to
  // a signed 32-bit value and therefore rejects valid integers >= 2^31.
  return typeof object === 'number' && isFinite(object) && object > 0 &&
    Math.floor(object) === object;
}
/**
 * A readable stream that produces cryptographically random data whenever the
 * stream machinery asks for more, and tracks how many bytes it has produced.
 *
 * May be invoked with or without `new`; the `defaultSize` argument is honoured
 * in both cases.
 *
 * @param {number=} defaultSize The number of bytes to produce per read when
 *     the read request does not carry a usable size. Falls back to 1024 when
 *     omitted or not a positive integer.
 * @constructor
 * @extends Readable
 */
function Producer(defaultSize) {
  // Forward defaultSize so the factory-style call does not silently drop it.
  if (!(this instanceof Producer))
    return new Producer(defaultSize);
  Readable.call(this);
  this.defaultSize = isPositiveInteger(defaultSize) ? defaultSize : 1024;
  // Running total of the bytes generated so far.
  this.produced = 0;
}
util.inherits(Producer, Readable);
/**
 * Streams2 read hook for the Producer. Generates a block of random bytes
 * asynchronously, adds its length to `produced`, and pushes it into the
 * stream's read queue. A failure to generate bytes is surfaced as an 'error'
 * event on the stream.
 *
 * @param {?number=} size The advisory number of bytes to generate; when it is
 *     not a positive integer, the `defaultSize` given to the constructor is
 *     used instead.
 * @private
 * @override
 */
Producer.prototype._read = function(size) {
  var stream = this;
  var wanted = isPositiveInteger(size) ? size : this.defaultSize;

  crypto.randomBytes(wanted, function(err, data) {
    if (err) {
      stream.emit('error', err);
      return;
    }
    stream.produced += data.length;
    stream.push(data);
  });
};
// The single shared Producer instance that this module exports.
var producer = new Producer();

// Port comes from the PORT environment variable, falling back to 3000.
var port = process.env.PORT || 3000;

// Every incoming connection is fed from the shared producer.
function serveRandomData(conn) {
  producer.pipe(conn);
}

var server = net.createServer(serveRandomData);
server.listen(port);

module.exports = producer;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment