Skip to content

Instantly share code, notes, and snippets.

@Zaynex
Last active June 6, 2019 16:08
Show Gist options
  • Save Zaynex/a01b2ff7526975bbdf2ebee26d5adbd0 to your computer and use it in GitHub Desktop.
Save Zaynex/a01b2ff7526975bbdf2ebee26d5adbd0 to your computer and use it in GitHub Desktop.
并行的异步 task 队列
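/**
 * Parallel async task queue.
 * The first dispatch starts as many tasks as the concurrency limit allows;
 * afterwards, each time a running task finishes, the next queued task is started.
 * Typical use case: uploading a file in chunks.
 *
 */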
/**
 * Bookkeeping record for one queued task: the task function itself plus the
 * state the queue tracks for it.
 */
class WraperTask {
  /** Function that starts the task when called. */
  task;
  /** Completion flag stored as given; the queue flips it once the task is done. */
  finished;
  /** Position of the task in the queue; undefined until one is assigned. */
  taskIndex;

  /**
   * @param {Function} task function that starts the task when called
   * @param {boolean} finished initial completion flag
   * @param {number} [taskIndex] optional position of the task in the queue
   */
  // A default value keeps the third argument optional in both plain JavaScript
  // and TypeScript; the `taskIndex?` marker is only valid in TypeScript.
  constructor(task, finished, taskIndex = undefined) {
    this.task = task;
    this.finished = finished;
    this.taskIndex = taskIndex;
  }
}
/**
 * Queue that runs promise-returning task functions with a cap on how many are
 * in flight at once. Whenever a running task settles, the next queued task is
 * started, until every queued task has settled.
 */
class Task {
  maxParalleTask;
  taskResolve;
  queueResolve;
  taskReject;
  taskQueue;
  currentTaskIndex;
  currentCount;

  /**
   * @param {number} maxParalleTask maximum number of tasks running at the same time
   * @param {Function} taskResolve called with the result of each task that fulfills
   * @param {Function} queueResolve called with no arguments once every queued task has settled
   * @param {Function} [taskReject] called with the error of each task that fails;
   *   when omitted, failures are reported through console.error
   */
  constructor(maxParalleTask, taskResolve, queueResolve, taskReject) {
    this.maxParalleTask = maxParalleTask;
    this.taskResolve = taskResolve;
    this.queueResolve = queueResolve;
    this.taskReject = taskReject;
    this.reset();
  }

  /**
   * Appends a task to the end of the queue without starting it.
   * @param {Function} asyncTask function that starts the task and returns a promise
   * @returns {number} the new queue length
   */
  push(asyncTask) {
    return this.taskQueue.push(new WraperTask(asyncTask, false));
  }

  /**
   * @returns {number} how many tasks are in the queue, settled ones included
   */
  getTaskQueue() {
    return this.taskQueue.length;
  }

  /**
   * @returns {Array} the queue entries whose `finished` flag is not set
   */
  getUnfinishedTaskQueue() {
    return this.taskQueue.filter((wrapped) => !wrapped.finished);
  }

  /**
   * Starts queued tasks until the concurrency limit is reached or nothing is
   * left to start. Dispatch continues from the first task that has not been
   * started yet, so calling this again never re-runs a task.
   */
  paralleTask() {
    while (this.currentCount < this.maxParalleTask && this.dispatchNextTask()) {
      // dispatchNextTask does the work; the loop only repeats it.
    }
  }

  /**
   * Bookkeeping for a fulfilled task: releases its slot, reports the result,
   * then keeps the queue moving.
   * @param {*} args value the task's promise fulfilled with
   * @param {object} singleTask queue entry of the task that fulfilled
   */
  _resolve(args, singleTask) {
    this._markSettled(singleTask);
    if (this.taskResolve) {
      this.taskResolve(args);
    }
    this._advance();
  }

  /**
   * Bookkeeping for a failed task. The failure is reported and the slot is
   * released, so one failing task cannot stall the rest of the queue.
   * @param {*} error rejection reason, or the exception thrown while starting the task
   * @param {object} singleTask queue entry of the task that failed
   */
  _reject(error, singleTask) {
    this._markSettled(singleTask);
    if (this.taskReject) {
      this.taskReject(error);
    } else {
      // No handler was supplied: surface the failure instead of swallowing it.
      console.error('task rejected', error);
    }
    this._advance();
  }

  /** Flags the entry as finished and frees its concurrency slot. */
  _markSettled(singleTask) {
    singleTask.finished = true;
    this.currentCount -= 1;
  }

  /**
   * Runs after every settlement: refills free slots while unfinished tasks
   * remain, otherwise announces completion and clears the queue.
   */
  _advance() {
    // not finished
    if (this.taskQueue.some((wrapped) => !wrapped.finished)) {
      this.paralleTask();
      return;
    }
    // finished
    if (this.queueResolve) {
      this.queueResolve();
    }
    this.reset();
  }

  /**
   * Starts the next task that has not been started yet, if there is one.
   * @returns {boolean} true when a task was started, false when none is left to start
   */
  dispatchNextTask() {
    const index = this.currentTaskIndex;
    const singleTask = this.taskQueue[index];
    if (!singleTask) {
      return false;
    }
    singleTask.taskIndex = index;
    this.currentTaskIndex += 1;
    this.currentCount += 1;

    let pending;
    try {
      // Promise.resolve also accepts tasks that return a plain value.
      pending = Promise.resolve(singleTask.task());
    } catch (error) {
      // A synchronous throw (including a non-callable task) is treated as a failed task.
      pending = Promise.reject(error);
    }
    pending.then(
      (args) => this._resolve(args, singleTask),
      (error) => this._reject(error, singleTask),
    );
    return true;
  }

  /**
   * Empties the queue and zeroes the dispatch counters.
   */
  reset() {
    this.taskQueue = [];
    this.currentTaskIndex = 0; // index of the next task to start
    this.currentCount = 0; // number of tasks currently running
  }
}
/**
 * Queue-level completion callback for the demo: logs a fixed message.
 */
function handleAll() {
  const message = 'all task resolved';
  console.log(message);
}
/**
 * Per-task callback for the demo: logs a fixed label followed by the value it
 * was given.
 * @param {*} data value to log after the label
 */
function handleResolve(data) {
  const logArguments = ['task resolved', data];
  console.log(...logArguments);
}
/**
 * Demo driver: builds a Task queue with a concurrency limit of 4, fills it
 * with ten simulated async tasks and starts processing.
 */
function MockTest() {
  const MAX_DELAY_MS = 10000;

  // Simulated task: resolves with a random number after a random delay below 10 seconds.
  const createDelayedTask = () =>
    new Promise((resolve) => {
      const delay = Math.random() * MAX_DELAY_MS;
      setTimeout(() => {
        resolve(Math.random());
      }, delay);
    });

  const queue = new Task(4, handleResolve, handleAll);
  Array.from({ length: 10 }).forEach(() => {
    queue.push(createDelayedTask);
  });
  queue.paralleTask();
}
// Run the demo as soon as the script is evaluated.
MockTest();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment