Created
April 12, 2014 13:40
-
-
Save benjamingr/10536349 to your computer and use it in GitHub Desktop.
Bluebird asParallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Promise.longStackTraces(); | |
//end debug | |
/** | |
Hijack Promise.map to accept parallelism limit argument | |
**/ | |
function queueConcurrent(work, degreeOfParallelism) { | |
// work is array of functions returning promises | |
"use strict"; | |
return new Promise(function (ful, rej) { | |
var results = new Array(work.length); | |
degreeOfParallelism = Math.min(work.length, degreeOfParallelism); | |
var last = degreeOfParallelism; | |
for (var i = 0; i < last; i++) { | |
var p = work[i]().bind(i).then(queueNext, rej); | |
} | |
function queueNext(result) { | |
results[this] = result; | |
if (last === work.length + degreeOfParallelism - 1) return ful(results); | |
if (last >= work.length) return last++; | |
return work[last++]().bind(last).then(queueNext, rej); | |
} | |
}); | |
} | |
var oldMap = Promise.prototype.map; | |
Promise.prototype.map = function mapLimit(fn,n) { | |
if(arguments.length === 1){ | |
return oldMap.apply(this,arguments); | |
} | |
return this.then(function (work) { | |
return queueConcurrent(work.map(function(el){ | |
return function(){ | |
return Promise.resolve(fn(el)); | |
}; | |
}), n); | |
}); | |
}; | |
/** | |
ParallelJS changes | |
*/ | |
var isNode = typeof global !== "undefined" && {}.toString.call(global) == '[object global]'; | |
var maxWorkers = isNode ? require('os').cpus().length : 4; | |
function Parallel(){} | |
Promise.prototype.asParallel = function(opts){ | |
var p = this.then(function(v){ return v;}); | |
p.requiredScripts = []; | |
p.requiredFunctions = []; | |
p.getWorkerSource = Parallel.prototype.getWorkerSource; | |
p._spawnWorker = Parallel.prototype._spawnWorker; | |
p.spawn = Parallel.prototype.spawn; | |
p.options = {evalPath:null}; | |
p.map = Parallel.prototype.map; | |
return p; | |
}; | |
Parallel.prototype.getWorkerSource = function (cb) { | |
var preStr = ''; | |
var i = 0; | |
if (!isNode && this.requiredScripts.length !== 0) { | |
preStr += 'importScripts("' + this.requiredScripts.join('","') + '");\r\n'; | |
} | |
for (i = 0; i < this.requiredFunctions.length; ++i) { | |
if (this.requiredFunctions[i].name) { | |
preStr += 'var ' + this.requiredFunctions[i].name + ' = ' + this.requiredFunctions[i].fn.toString() + ';'; | |
} else { | |
preStr += this.requiredFunctions[i].fn.toString(); | |
} | |
} | |
if (isNode) { | |
return preStr + 'process.on("message", function(e) {process.send(JSON.stringify((' + cb.toString() + ')(JSON.parse(e).data)))})'; | |
} else { | |
return preStr + 'self.onmessage = function(e) {self.postMessage((' + cb.toString() + ')(e.data))}'; | |
} | |
}; | |
Parallel.prototype._spawnWorker = function (cb) { | |
var wrk; | |
var src = this.getWorkerSource(cb); | |
if (isNode) { | |
wrk = new Worker(this.options.evalPath); | |
wrk.postMessage(src); | |
} else { | |
if (Worker === undefined) { | |
return undefined; | |
} | |
try { | |
if (!URL) { | |
throw new Error('Can\'t create a blob URL in this browser!'); | |
} else { | |
//console.log(src); | |
var blob = new Blob([src], { | |
type: 'text/javascript' | |
}); | |
var url = URL.createObjectURL(blob); | |
wrk = new Worker(url); | |
} | |
} catch (e) { | |
if (this.options.evalPath !== null) { // blob/url unsupported, cross-origin error | |
wrk = new Worker(this.options.evalPath); | |
wrk.postMessage(src); | |
} else { | |
throw e; | |
} | |
} | |
} | |
return wrk; | |
}; | |
Parallel.prototype.spawn = function (cb,data) { | |
var that = this; | |
return this.then(function (d) { | |
var wrk = that._spawnWorker(cb); | |
if (wrk !== undefined) { | |
var p = pMessage(wrk).then(function (msg) { | |
return msg.data; | |
}); | |
wrk.postMessage(data); | |
return p; | |
} | |
return cb(data); // sync fallback | |
}); | |
function pMessage(wrk){ | |
return new Promise(function(resolve,reject){ | |
wrk.onmessage = resolve; | |
wrk.onerror = function(ev){ | |
reject(ev.message); | |
}; | |
}).finally(function(){ | |
wrk.terminate(); | |
}); | |
} | |
}; | |
Parallel.prototype.map = function map(cb,deg){ | |
var that = this; | |
return Promise.prototype.map.call(this,function(el){ | |
return that.spawn(cb,el); | |
}, deg || maxWorkers); | |
}; | |
var arr = []; for(var i=0;i<8;i++) arr.push(40); | |
Promise.resolve(arr).tap(function(){ | |
console.time("time"); | |
}).asParallel().map(function(el){ | |
function fib(n){ | |
return n < 2? 1 : (fib(n-1)+fib(n-2)); | |
} | |
return fib(el); | |
},8).then(function(res){ | |
console.log(res); | |
}).tap(function(el){ | |
console.timeEnd("time"); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Bits of source code ripped from ParallelJS, so props for them for _spawnWorker and etc.