Created
January 14, 2020 09:23
-
-
Save Aschen/80c48c19ec2424594bcc930b4868b496 to your computer and use it in GitHub Desktop.
Closure optimization
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const | |
| PipeRunner = require('./pipeRunner'); | |
/**
 * Reads the high-resolution timer and expresses it in the requested unit.
 *
 * @param {string} [unit] - 'milli', 'micro' or 'nano'; any other value
 *   (including undefined) yields a nanosecond reading
 * @returns {number} the process.hrtime() reading converted to `unit`
 */
const now = (unit) => {
  const [seconds, nanoseconds] = process.hrtime();

  if (unit === 'milli') {
    return seconds * 1e3 + nanoseconds / 1e6;
  }

  if (unit === 'micro') {
    return seconds * 1e6 + nanoseconds / 1e3;
  }

  if (unit === 'nano') {
    return seconds * 1e9 + nanoseconds;
  }

  // Unknown unit: take a fresh reading, expressed in nanoseconds.
  return now('nano');
};
| const runner = new PipeRunner(10, 10); | |
// Benchmark chain: the first step produces 'hey' and the three following
// steps forward the value they receive unchanged. Every step reports through
// `cb`, invoked with `cbCtx` as its `this` value.
const pipes = [
  (cb, cbCtx) => void cb.call(cbCtx, null, 'hey'),
  (data, cb, cbCtx) => void cb.call(cbCtx, null, data),
  (data, cb, cbCtx) => void cb.call(cbCtx, null, data),
  (data, cb, cbCtx) => void cb.call(cbCtx, null, data),
];
/**
 * Pushes `pipes` through the given PipeRunner method one million times and
 * prints the mean time, in nanoseconds, between submitting the chain and
 * receiving its final callback.
 *
 * @param {string} method - name of the runner method to measure
 *   ('run' or 'run2')
 * @throws {Error} if a chain reports an error or an unexpected result
 */
const benchmark = (method) => {
  const iterations = 1000 * 1000;
  let total = 0;
  let completed = 0;

  for (let i = 0; i < iterations; i++) {
    const start = now('nano');

    runner[method](pipes, (error, result) => {
      // Surface runner failures (e.g. 'too_many_pipes') instead of hiding
      // them behind a generic result mismatch.
      if (error) {
        throw error;
      }

      if (result !== 'hey') {
        throw new Error(`Unexpected pipe chain result: ${result}`);
      }

      total += now('nano') - start;
      completed++;
    });
  }

  // Mean over the chains whose callback was invoked during the loop.
  console.log(total / completed);
};

// Each implementation is measured twice in a row, as in the original script
// (presumably so the second pass runs on warmed-up code - TODO confirm).
for (const method of ['run', 'run', 'run2', 'run2']) {
  benchmark(method);
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| 'use strict'; | |
/**
 * Mutable state of a single waterfall execution. The instance is passed to
 * every step as its last argument and is the `this` value expected by the
 * step callback.
 */
class WaterfallContext {
  /**
   * @param {*} cbContext - `this` value used when invoking `cb`
   * @param {Function[]} chain - steps to execute, in order
   * @param {Function} cb - final callback
   */
  constructor(cbContext, chain, cb) {
    // Inputs, stored as provided
    this.chain = chain;
    this.cb = cb;
    this.cbContext = cbContext;

    // Progress: position of the next step, and the values handed over by
    // the previous one (none before the first step)
    this.index = 0;
    this.result = [];
  }
}
/**
 * Callback handed to every step of a waterfall. Must be invoked with the
 * waterfall context as `this`.
 *
 * On error, the final callback is invoked with that error and the chain goes
 * no further. Otherwise the received values become the arguments of the next
 * step.
 *
 * @param {*} err - truthy to abort the chain
 * @param {...*} res - values forwarded to the next step
 */
function waterfallCB(err, ...res) {
  if (!err) {
    this.result = res;
    this.index += 1;
    waterfallNext(this);
    return;
  }

  this.cb.call(this.cbContext, err);
}
/**
 * Executes the step at the current position of the waterfall or, once every
 * step has been consumed, invokes the final callback with the last results.
 *
 * @param {WaterfallContext} ctx - state of the waterfall being executed
 */
function waterfallNext(ctx) {
  const { chain, index, result } = ctx;

  if (index !== chain.length) {
    // Steps receive the previous results, then the step callback, then the
    // context that callback must be invoked with.
    chain[index](...result, waterfallCB, ctx);
    return;
  }

  ctx.cb.call(ctx.cbContext, null, ...result);
}
/**
 * Runs the functions of `chain` one after the other, each one receiving the
 * values passed by the previous one to its callback, then invokes `cb`.
 *
 * @param {Function[]} chain - steps to execute, in order
 * @param {Function} cb - final callback
 * @param {*} cbContext - `this` value used when invoking `cb`
 */
function waterfall(chain, cb, cbContext) {
  waterfallNext(new WaterfallContext(cbContext, chain, cb));
}
| const | |
| assert = require('assert'), | |
| Denque = require('denque'), | |
| async = require('async'), | |
| {errors: {KuzzleError}} = require('kuzzle-common-objects') | |
/**
 * Final callback of a chain started by PipeRunner.run2. Must be invoked with
 * a `{instance, callback}` object as `this`: `instance` is the runner that
 * started the chain and `callback` the function to notify.
 *
 * Frees the slot held by the chain, schedules the next delayed chain if
 * there is one, then forwards the outcome to `callback`.
 *
 * @param {*} error - truthy if the chain failed
 * @param {*} result - value produced by the chain
 */
function waterCB (error, result) {
  const runner = this.instance;

  runner.running -= 1;

  const hasDelayedChains = !runner.buffer.isEmpty();

  if (hasDelayedChains) {
    setImmediate(runner._boundRunNext);
  }

  if (!error) {
    this.callback(null, result);
    return;
  }

  this.callback(error);
}
/**
 * @private
 * @class PipeChain
 * Pairs a pipe chain with the callback to notify once it has been executed.
 *
 * @param {Function[]} chain - pipe functions to run
 * @param {Function} callback - function to invoke with the chain outcome
 */
class PipeChain {
  constructor (chain, callback) {
    // Both values are stored untouched, to be handed back when the chain runs
    this.chain = chain;
    this.callback = callback;
  }
}
/**
 * @class PipeRunner
 * Runs pipe chains if the number of already running concurrent pipes allows it.
 * Otherwise, delays the pipe chain until a slot is freed. If the storage
 * buffer is full, the provided callback is invoked with an error and the
 * pipe chain is discarded.
 *
 * @param {Number} concurrent - max number of concurrent pipes
 * @param {Number} bufferSize - max number of delayed pipe chains
 */
class PipeRunner {
  constructor (concurrent, bufferSize) {
    assert(typeof concurrent === 'number' && concurrent > 0, 'Cannot instantiate pipes executor: invalid maxConcurrentPipes parameter value');
    assert(typeof bufferSize === 'number' && bufferSize > 0, 'Cannot instantiate pipes executor: invalid pipesBufferSize parameter value');

    this.maxConcurrent = concurrent;
    this.running = 0;
    this.maxBufferSize = bufferSize;
    this.buffer = new Denque();

    // Bound once, so that scheduling a delayed chain never allocates a new
    // function.
    this._boundRunNext = this._runNext.bind(this);
  }

  /**
   * Executes a pipe chain with async.waterfall, or delays/rejects it if too
   * many chains are already running.
   *
   * @param {Function[]} chain - pipe functions to run in sequence
   * @param {Function} callback - invoked with (error) or (null, result)
   */
  run (chain, callback) {
    if (this.running >= this.maxConcurrent) {
      if (this.buffer.length >= this.maxBufferSize) {
        callback(new Error('too_many_pipes'));
      }
      else {
        this.buffer.push(new PipeChain(chain, callback));
      }

      return;
    }

    this.running++;

    async.waterfall(chain, (error, result) => {
      this.running--;

      if (!this.buffer.isEmpty()) {
        // Reuse the function bound by the constructor instead of binding a
        // new one on each completion.
        setImmediate(this._boundRunNext);
      }

      if (error) {
        callback(error);
      }
      else {
        callback(null, result);
      }
    });
  }

  /**
   * Same contract as run(), but executes the chain with the local
   * waterfall() implementation and a shared callback (waterCB) receiving its
   * state through a context object, instead of a per-call closure.
   *
   * @param {Function[]} chain - pipe functions to run in sequence
   * @param {Function} callback - invoked with (error) or (null, result)
   */
  run2 (chain, callback) {
    if (this.running >= this.maxConcurrent) {
      if (this.buffer.length >= this.maxBufferSize) {
        callback(new Error('too_many_pipes'));
      }
      else {
        this.buffer.push(new PipeChain(chain, callback));
      }

      return;
    }

    this.running++;

    const waterCBContext = {
      instance: this,
      callback
    };

    waterfall(chain, waterCB, waterCBContext);
  }

  /**
   * Starts the oldest delayed chain, if there is one and a slot is free.
   * Delayed chains are always started through run(), including those that
   * were queued by run2().
   */
  _runNext () {
    if (this.buffer.isEmpty() || this.running >= this.maxConcurrent) {
      return;
    }

    const pipe = this.buffer.shift();

    this.run(pipe.chain, pipe.callback);
  }
}
| module.exports = PipeRunner; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment