Skip to content

Instantly share code, notes, and snippets.

@myfreeer
Last active June 20, 2017 10:57
Show Gist options
  • Save myfreeer/04122d3aca873cd0b266526e817b326e to your computer and use it in GitHub Desktop.
Save myfreeer/04122d3aca873cd0b266526e817b326e to your computer and use it in GitHub Desktop.
/**
 * Class AsyncThread: Async-based threading.
 *
 * An Array subclass whose elements are the pending tasks. `run()` starts a
 * fixed number of concurrent workers ("threads"); each worker repeatedly takes
 * the task at the front of the queue and awaits its resolver, until the queue
 * is empty.
 * @license BSD-3-Clause
 */
class AsyncThread extends Array {
  /**
   * @typedef {Object} task
   * @property {*} data - [optional] argument of `resolver`
   * @property {Function} resolver - a function that takes `data` as the only argument, returns Promise
   */

  /**
   * @class
   * @param {number} threads - number of threads to run (values below 1 are treated as 1)
   * @param {task[]} queue - array of tasks
   */
  constructor(threads = 1, queue = []) {
    // Start empty and append one by one: `super(...queue)` would interpret a
    // lone numeric element as an array *length* (throwing for e.g. [-1]) and
    // can exceed the engine's argument limit for very large queues.
    super();
    for (const task of queue) {
      this.push(task);
    }
    this.format();
    /**
     * threads to resolve queue
     * @type {number}
     */
    this._threads = 1;
    this.set(threads);
  }

  /**
   * Change thread number
   * @param {number} threads - new threads number; coerced to an integer and
   * clamped to at least 1 so that run() always makes progress
   */
  set(threads = this._threads) {
    this._threads = Math.max(1, threads | 0);
  }

  /**
   * Empty queue
   */
  empty() {
    this.length = 0;
  }

  /**
   * Remove malformed tasks (anything without a callable `resolver`) from the
   * queue and renew each remaining task's `index` to its position in the queue.
   * Compacts the array in place in a single pass.
   * @return {AsyncThread} this
   */
  format() {
    let kept = 0;
    for (let i = 0; i < this.length; i++) {
      const task = this[i];
      if (task && typeof task.resolver === 'function') {
        task.index = kept;
        this[kept] = task;
        kept++;
      }
    }
    // Drop the tail left over after compaction (also removes sparse holes).
    this.length = kept;
    return this;
  }

  /**
   * Start a thread to resolve tasks in queue.
   * Each dequeued task is given the next free slot in `this.result`; the
   * fulfilled value of its resolver is stored there, or the thrown/rejected
   * error if the resolver fails.
   * @private
   * @return {Promise} this.result
   */
  async _resolve() {
    while (this.length > 0) {
      const task = this.shift();
      // Slots follow dequeue order rather than `task.index`, so a task object
      // queued several times, or pushed while running, still gets its own slot.
      const slot = this._nextIndex++;
      try {
        this.result[slot] = await task.resolver(task.data);
      } catch (e) {
        // Also reached when a malformed entry was added mid-run; the
        // resulting TypeError is recorded like any other task error.
        this.result[slot] = e;
      }
    }
    return this.result;
  }

  /**
   * Start to resolve tasks in queue
   * @return {Promise} this.result - one entry per task, in queue order
   */
  async run() {
    this.format();
    /**
     * Containing the result of solving the queue after run()
     * @type {Array}
     */
    this.result = new Array(this.length);
    /**
     * Next free slot in `this.result`, shared by all workers of this run
     * @type {number}
     */
    this._nextIndex = 0;
    const workers = Array.from({ length: this._threads }, () => this._resolve());
    this._result = Promise.all(workers);
    await this._result;
    return this.result;
  }
}
@myfreeer
Copy link
Author

myfreeer commented Jun 5, 2017

// Tests: queue 100 timer-based tasks (one of which is then overwritten, plus
// one stray out-of-range entry, both malformed) on 4 threads and compare the
// measured wall-clock time with the ideal sum(durations) / threads.
const threadCount = 4;
const thread = new AsyncThread(threadCount);
// Resolves after `e` ms with the requested delay and the time elapsed since `time`.
const mr = (e) => new Promise((resolve) => setTimeout(() => resolve({ data: e, time: performance.now() - time }), e));
for (let i = 0; i < 100; i++) thread.push({ data: 100 * Math.random(), resolver: mr });
// Malformed entries: run() calls format(), which removes them from the queue.
thread[3] = 0;
thread[2222] = 0;
const time = performance.now();
const tmp = thread.run();
tmp.then((arr) => {
    const expectedTime = arr.reduce((a, b) => a + b.data, 0) / threadCount;
    // Tasks run concurrently, so the last task in queue order is not
    // necessarily the last one to finish; take the latest completion time.
    const totalTime = Math.max(...arr.map((r) => r.time));
    const timeCost = totalTime - expectedTime;
    console.info('AsyncThread Test: Threads: ', threadCount, ', Expected Time: ', expectedTime, 'ms, Real Time: ', totalTime, 'ms, Time Cost: ', timeCost, 'ms.');
    console.info('AsyncThread Test: Result: ', arr);
});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment