Skip to content

Instantly share code, notes, and snippets.

@Aschen
Created January 14, 2020 09:23
Show Gist options
  • Select an option

  • Save Aschen/80c48c19ec2424594bcc930b4868b496 to your computer and use it in GitHub Desktop.

Select an option

Save Aschen/80c48c19ec2424594bcc930b4868b496 to your computer and use it in GitHub Desktop.
Closur optim
const
PipeRunner = require('./pipeRunner');
const now = (unit) => {
const hrTime = process.hrtime();
switch (unit) {
case 'milli':
return hrTime[0] * 1000 + hrTime[1] / 1000000;
case 'micro':
return hrTime[0] * 1000000 + hrTime[1] / 1000;
case 'nano':
return hrTime[0] * 1000000000 + hrTime[1];
default:
return now('nano');
}
};
const runner = new PipeRunner(10, 10);
const pipes = [
(cb, cbCtx) => {
cb.call(cbCtx, null, 'hey')
},
(data, cb, cbCtx) => {
cb.call(cbCtx, null, data)
},
(data, cb, cbCtx) => {
cb.call(cbCtx, null, data)
},
(data, cb, cbCtx) => {
cb.call(cbCtx, null, data)
}
];
(() => {
const times = [];
for (let i = 1000 * 1000; i--;) {
const start = now('nano')
runner.run(pipes, (error, result) => {
if (result !== 'hey') {
console.log(result)
throw Error('')
}
times.push(now('nano') - start)
})
}
console.log(times.reduce((acc, current) => acc + current, 0) / times.length)
})();
(() => {
const times = [];
for (let i = 1000 * 1000; i--;) {
const start = now('nano')
runner.run(pipes, (error, result) => {
if (result !== 'hey') {
console.log(result)
throw Error('')
}
times.push(now('nano') - start)
})
}
console.log(times.reduce((acc, current) => acc + current, 0) / times.length)
})();
(() => {
const times = [];
for (let i = 1000 * 1000; i--;) {
const start = now('nano')
runner.run2(pipes, (error, result) => {
if (result !== 'hey') {
console.log(result)
throw Error('')
}
times.push(now('nano') - start)
})
}
console.log(times.reduce((acc, current) => acc + current, 0) / times.length)
})();
(() => {
const times = [];
for (let i = 1000 * 1000; i--;) {
const start = now('nano')
runner.run2(pipes, (error, result) => {
if (result !== 'hey') {
console.log(result)
throw Error('')
}
times.push(now('nano') - start)
})
}
console.log(times.reduce((acc, current) => acc + current, 0) / times.length)
})();
'use strict';
class WaterfallContext {
constructor(cbContext, chain, cb) {
this.index = 0;
this.chain = chain;
this.cb = cb;
this.cbContext = cbContext;
this.result = [];
}
}
function waterfallCB(err, ...res) {
if (err) {
this.cb.call(this.cbContext, err);
}
else {
this.result = res;
this.index++;
waterfallNext(this);
}
}
function waterfallNext(ctx) {
if (ctx.index === ctx.chain.length) {
ctx.cb.call(ctx.cbContext, null, ...ctx.result);
return;
}
ctx.chain[ctx.index](...ctx.result, waterfallCB, ctx);
}
function waterfall(chain, cb, cbContext) {
const ctx = new WaterfallContext(cbContext, chain, cb);
waterfallNext(ctx);
}
const
assert = require('assert'),
Denque = require('denque'),
async = require('async'),
{errors: {KuzzleError}} = require('kuzzle-common-objects')
function waterCB (error, result) {
this.instance.running--;
if (!this.instance.buffer.isEmpty()) {
setImmediate(this.instance._boundRunNext);
}
if (error) {
this.callback(error);
}
else {
this.callback(null, result);
}
}
/**
* @private
* @class PipeChain
*/
class PipeChain {
constructor (chain, callback) {
this.chain = chain;
this.callback = callback;
}
}
/**
* @class PipeRunner
* Runs pipe chains if the number of already running concurrent pipes allows it.
* Otherwise, delays the pipe chain until a slot is freed. If the storage
* buffer is full, rejects the provided callback and discard the pipe chain.
*
* @param {Number} concurrent - max number of concurrent pipes
* @param {Number} bufferSize - max number of delayed pipe chains
*/
class PipeRunner {
constructor (concurrent, bufferSize) {
assert(typeof concurrent === 'number' && concurrent > 0, 'Cannot instantiate pipes executor: invalid maxConcurrentPipes parameter value');
assert(typeof bufferSize === 'number' && bufferSize > 0, 'Cannot instantiate pipes executor: invalid pipesBufferSize parameter value');
this.maxConcurrent = concurrent;
this.running = 0;
this.maxBufferSize = bufferSize;
this.buffer = new Denque();
this._boundRunNext = this._runNext.bind(this);
}
run (chain, callback) {
if (this.running >= this.maxConcurrent) {
if (this.buffer.length >= this.maxBufferSize) {
callback(new Error('too_many_pipes'));
}
else {
this.buffer.push(new PipeChain(chain, callback));
}
return;
}
this.running++;
async.waterfall(chain, (error, result) => {
this.running--;
if (!this.buffer.isEmpty()) {
setImmediate(this._runNext.bind(this));
}
if (error) {
callback(error);
}
else {
callback(null, result);
}
});
}
run2 (chain, callback) {
if (this.running >= this.maxConcurrent) {
if (this.buffer.length >= this.maxBufferSize) {
callback(new Error('too_many_pipes'));
}
else {
this.buffer.push(new PipeChain(chain, callback));
}
return;
}
this.running++;
const waterCBContext = {
instance: this,
callback
};
waterfall(chain, waterCB, waterCBContext);
}
_runNext () {
if (this.buffer.isEmpty() || this.running >= this.maxConcurrent) {
return;
}
const pipe = this.buffer.shift();
this.run(pipe.chain, pipe.callback);
}
}
module.exports = PipeRunner;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment