Skip to content

Instantly share code, notes, and snippets.

@piscisaureus
Last active December 20, 2015 00:38
Show Gist options
  • Save piscisaureus/6042666 to your computer and use it in GitHub Desktop.
Save piscisaureus/6042666 to your computer and use it in GitHub Desktop.
Write behind transform stream
// Public API: the module exports the WriteBehind constructor itself. This works
// before the definition appears because function declarations are hoisted.
module.exports = WriteBehind;
// Dependencies (all Node.js core modules): assert for internal sanity checks,
// Transform as the base class, inherits to wire up the prototype chain.
var assert = require('assert'),
Transform = require('stream').Transform,
inherits = require('util').inherits;
// Cache size in bytes used when the caller does not supply options.size.
var DEFAULT_SIZE = 1 * 1024 * 1024; // 1 mb
/**
 * Transform stream that coalesces small writes into one cache buffer and
 * forwards large writes untouched.
 *
 * @param {Object} [options] Passed through to stream.Transform.
 * @param {number} [options.size] Cache size in bytes. Falsy values select
 *     DEFAULT_SIZE. Must be a finite number greater than zero.
 * @param {number} [options.cutoff] Chunks longer than this many bytes bypass
 *     the cache. Falsy values select a quarter of the cache size (at least 1).
 *     Values larger than the cache size are clamped to the cache size.
 * @throws {Error} 'Invalid cache size' or 'Invalid cutoff size' when the
 *     corresponding option is not a usable positive number.
 */
function WriteBehind(options) {
  Transform.call(this, options);

  options = options || {};

  var size = options.size || DEFAULT_SIZE;
  // Infinity is a number greater than zero, but no buffer of that size can
  // ever be allocated; reject it here instead of failing on the first write.
  if (typeof size !== 'number' || !isFinite(size) || size <= 0)
    throw new Error('Invalid cache size');

  // Math.floor(size / 4) instead of `size >> 2`: the shift operator truncates
  // its operand to 32 bits, which yields a negative or zero cutoff for sizes
  // of 2 GiB and above.
  var cutoff = options.cutoff || Math.floor(size / 4) || 1;
  if (typeof cutoff !== 'number' || cutoff <= 0)
    throw new Error('Invalid cutoff size');

  this._size = size;
  // A chunk longer than the cache can never be cached, so the effective
  // cutoff is never allowed to exceed the cache size.
  this._cutoff = Math.min(cutoff, size);

  this._buffer = null;   // Cache buffer, allocated lazily on first use.
  this._used = 0;        // Number of bytes currently held in the cache.
  this._callbacks = [];  // Callbacks to invoke on the next flush.
}
inherits(WriteBehind, Transform);
/**
 * Transform hook: caches small chunks and forwards large ones directly.
 *
 * - A chunk longer than the cutoff (or longer than the whole cache) is not
 *   cached: anything already cached is flushed first so ordering is kept,
 *   then the chunk itself is pushed as-is.
 * - Any other chunk is copied into the cache buffer. If the cache does not
 *   have enough room left, it is flushed first.
 *
 * The write is acknowledged (cb is invoked on the next tick) as soon as the
 * chunk has been pushed or copied. It must not wait for a later flush: the
 * stream machinery hands over neither the next chunk nor the end-of-stream
 * flush until the pending write has been acknowledged, so deferring the
 * callback would stall the stream.
 *
 * @param {Buffer} chunk Data written to the stream.
 * @param {string} encoding Unused; chunks are always buffers.
 * @param {Function} cb Invoked once the chunk has been consumed.
 * @returns {boolean} Result of the push() performed during this call, or
 *     true when nothing was pushed.
 */
WriteBehind.prototype._transform = function(chunk, encoding, cb) {
  // We're not overriding the decodeStrings setting so chunk should always be
  // a buffer.
  assert(Buffer.isBuffer(chunk));

  var rv = true;

  // The second test guards against a cutoff configured above the cache size:
  // such a chunk could never fit and copy() would silently truncate it.
  if (chunk.length > this._cutoff || chunk.length > this._size) {
    this._flush();
    rv = this.push(chunk);
    process.nextTick(cb);
    return rv;
  }

  // Not enough room left in the cache: emit what is there first.
  if (this._buffer && this._buffer.length - this._used < chunk.length)
    rv = this._flush();

  if (!this._buffer) {
    // Uninitialised memory is fine here: only the bytes copied in below are
    // ever pushed downstream.
    this._buffer = Buffer.allocUnsafe(this._size);
    this._used = 0;
  }

  chunk.copy(this._buffer, this._used);
  this._used += chunk.length;

  process.nextTick(cb);
  return rv;
}
/**
 * Emits the cached bytes (if any) downstream and releases everything that
 * was waiting for this flush.
 *
 * Called internally without arguments when the cache has to be emptied, and
 * by the Transform machinery with a callback when the writable side ends.
 *
 * Pending callbacks are always drained, even when no bytes are cached:
 * callbacks registered for zero-length chunks would otherwise never run.
 *
 * @param {Function} [cb] Invoked on the next tick, after all pending
 *     callbacks have been invoked.
 * @returns {boolean} Result of push() for the cached bytes, or true when
 *     there was nothing to push.
 */
WriteBehind.prototype._flush = function(cb) {
  var callbacks = this._callbacks;
  var rv = true;

  if (this._used) {
    rv = this.push(this._buffer.slice(0, this._used));
    // The pushed slice shares memory with the cache buffer, so the buffer is
    // dropped rather than reused.
    this._buffer = null;
    this._used = 0;
  }
  this._callbacks = [];

  if (callbacks.length || cb) {
    // Deferred so that callers never observe callbacks firing synchronously
    // from inside a write.
    process.nextTick(function() {
      for (var i = 0; i < callbacks.length; i++)
        callbacks[i]();
      cb && cb();
    });
  }

  return rv;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment