Skip to content

Instantly share code, notes, and snippets.

@myndzi
Last active June 23, 2016 16:38
Show Gist options
  • Save myndzi/fccf36579c4fe0d2a50d700e2c435299 to your computer and use it in GitHub Desktop.
Save myndzi/fccf36579c4fe0d2a50d700e2c435299 to your computer and use it in GitHub Desktop.
var Transform = require('stream').Transform;
var util = require('util');
function AggregateStream(reducer) {
Transform.call(this, { readableObjectMode: true });
if (typeof reducer !== 'function') {
throw new Error('Must supply a reducer');
}
this.aggregated = void 0;
this.reducer = reducer;
}
util.inherits(AggregateStream, Transform);
AggregateStream.prototype._transform = function (chunk, encoding, callback) {
try {
var obj = JSON.parse(encoding === 'buffer' ? chunk : chunk.toString(encoding));
if (this.aggregated === void 0) {
this.aggregated = obj;
} else {
this.aggregated = this.reducer(this.aggregated, obj);
}
callback();
} catch (err) {
callback(err);
}
};
AggregateStream.prototype._flush = function () {
this.push(this.aggregated);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment