Last active
June 6, 2019 16:08
-
-
Save Zaynex/a01b2ff7526975bbdf2ebee26d5adbd0 to your computer and use it in GitHub Desktop.
并行的异步 task 队列
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* 并行异步队列 | |
* 首次上传时占用最大的并行请求数 | |
* 后续当每个之前的请求结束后,继续执行下一个请求 | |
* 应用场景:分片文件上传 | |
* | |
*/ | |
class WraperTask { | |
task; | |
finished; | |
taskIndex; | |
constructor(task, finished, taskIndex?) { | |
this.task = task; | |
this.finished = finished; | |
this.taskIndex = taskIndex; | |
} | |
} | |
class Task { | |
maxParalleTask; | |
taskResolve; | |
queueResolve; | |
taskQueue; | |
currentTaskIndex; | |
currentCount; | |
/** | |
* | |
* @param {number} maxParalleTask 最大并行数量 | |
* @param {func} taskResolve 单个 task 的 resolve | |
* @param {func} queueResolve 所有 task 完成的 resolve | |
*/ | |
constructor(maxParalleTask, taskResolve, queueResolve) { | |
this.maxParalleTask = maxParalleTask; | |
this.taskResolve = taskResolve; | |
this.queueResolve = queueResolve; | |
this.reset(); | |
} | |
push(asyncTask) { | |
return this.taskQueue.push(new WraperTask(asyncTask, false)); | |
} | |
getTaskQueue() { | |
return this.taskQueue.length; | |
} | |
getUnfinishedTaskQueue() { | |
return this.taskQueue.reduce(function(accumulator, currentValue) { | |
accumulator = currentValue.finished ? accumulator : accumulator.concat(currentValue); | |
return accumulator; | |
}, []); | |
} | |
paralleTask() { | |
this.taskQueue | |
.slice(0, Math.min(this.taskQueue.length, this.maxParalleTask)) | |
.forEach((singleTask, index) => { | |
singleTask.taskIndex = index; | |
singleTask.task().then(args => this._resolve(args, singleTask)); | |
this.currentTaskIndex += 1; | |
this.currentCount += 1; | |
}); | |
} | |
_resolve(args, singleTask) { | |
singleTask.finished = true; | |
this.currentCount -= 1; | |
if (this.taskResolve) { | |
this.taskResolve(args); | |
} | |
// not finished | |
console.log(`this.getUnfinishedTaskQueue`, this.getUnfinishedTaskQueue()); | |
if (this.getUnfinishedTaskQueue().length !== 0) { | |
console.log('next task'); | |
if (this.currentCount < this.maxParalleTask) { | |
this.dispatchNextTask(); | |
} | |
return; | |
} | |
// finished | |
if (this.queueResolve) { | |
this.queueResolve(); | |
} | |
this.reset(); | |
} | |
dispatchNextTask() { | |
const singleTask = this.taskQueue[this.currentTaskIndex]; | |
if (singleTask && singleTask.task) { | |
singleTask.task().then(args => this._resolve(args, singleTask)); | |
this.currentTaskIndex += 1; | |
this.currentCount += 1; | |
} | |
} | |
reset() { | |
this.taskQueue = []; | |
this.currentTaskIndex = 0; // 执行到第几个 task | |
this.currentCount = 0; // 当前执行的 task 的总数 | |
} | |
} | |
function handleAll() { | |
console.log('all task resolved'); | |
} | |
function handleResolve(data) { | |
console.log('task resolved', data); | |
} | |
function MockTest() { | |
let task = new Task(4, handleResolve, handleAll); | |
for (let i = 0; i < 10; i++) { | |
task.push(signleTask); | |
} | |
function signleTask() { | |
return new Promise((resolve, reject) => { | |
setTimeout(() => { | |
resolve(Math.random()); | |
}, Math.random() * 10000); | |
}); | |
} | |
task.paralleTask(); | |
} | |
MockTest(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment