Skip to content

Instantly share code, notes, and snippets.

@jcouyang
Last active November 12, 2024 21:37
Show Gist options
  • Save jcouyang/632709f30e12a7879a73e9e132c0d56b to your computer and use it in GitHub Desktop.
Save jcouyang/632709f30e12a7879a73e9e132c0d56b to your computer and use it in GitHub Desktop.
Promise All with Limit of Concurrent N

The Promise All Problem

in case of processing a very large array e.g. Promise.all(A_VERY_LARGE_ARRAY_OF_XHR_PROMISE)

which would probably blow you browser memory by trying to send all requests at the same time

solution is limit the concurrent of requests, and wrap promise in thunk

Promise.allConcurrent(2)([()=>fetch('BLAH1'), ()=>fetch('BLAH2'),...()=>fetch('BLAHN')])

if set concurrent to 2, it will send request BLAH1 and BLAH2 at the same time

if BLAH1 return response and resolved, will immediatly send request to BLAH3

in this way promise sending at the same time always keep the limit 2 which we’ve just configed before

describe('promiseAllStepN', function(){
describe('3 tasks, and cucurrent is 2', function(){
let tasks;
beforeEach(function(){
tasks = range(3).map(x=>sinon.stub())
})
describe('1 is finish', function(){
it('will kickoff the third task', function(done){
tasks[0].returns(Promise.resolve(0))
let task2 = tasks[2]
tasks[1].returns(new Promise(resolve=>setTimeout(()=>{
expect(task2.called).to.be.equal(true)
resolve(1)
done()
}, 1000)))
tasks[2].returns(Promise.resolve(2))
return Promise.allConcurrent(2)(tasks).then(x=>console.log(x))
})
})
})
describe('10 tasks, and cucurrent is 3', function(){
let tasks;
beforeEach(function(){
tasks = range(10).map(x=>sinon.stub())
})
describe('1st is finish but 2nd stuck', function(){
it.only('final task will run before 2nd', function(done){
tasks.forEach((task,index) => task.returns(Promise.resolve(index)))
let task10 = tasks[9]
tasks[1].returns(new Promise(resolve=>setTimeout(()=>{
expect(task10.called).to.be.equal(true)
resolve(1)
done()
}, 1000)))
return Promise.allConcurrent(2)(tasks).then(x=>console.log(x))
})
})
})
})
function promiseAllStepN(n, list) {
let tail = list.splice(n)
let head = list
let resolved = []
let processed = 0
return new Promise(resolve=>{
head.forEach(x=>{
let res = x()
resolved.push(res)
res.then(y=>{
runNext()
return y
})
})
function runNext(){
if(processed == tail.length){
resolve(Promise.all(resolved))
}else{
resolved.push(tail[processed]().then(x=>{
runNext()
return x
}))
processed++
}
}
})
}
Promise.allConcurrent = n => list => promiseAllStepN(n, list)
@gionkunz
Copy link

gionkunz commented Nov 12, 2024

I know this is four years ago, but given it's still a top google result, I wanted to share my approach / style too. I made a couple of decisions in my implementation:

  • Typescript
  • Readability is more important than Size
  • Using Promise.race instead of Promise.all internally in order to maximize "worker" usage
  • Using const generic types to support promise tuple types in returned Promise.all
  • Guardless approach (loops just check queues, operations are only performed when needed without additional guards, eg. splice works with deleteCount=0 naturally and empty array is not processed)
  • Mix of data types (Array for remainingTasks, Map for executingTasks and array again for finishedTasks) to have native solutions for performance optimizations

If you prefer some of those decisions for your own code then hopefully this solution is useful to you. I did not yet test the function very extensively, so there might be bugs I did not account for yet.

export type AsyncFunction<T = unknown> = () => Promise<T>;
export type TaskResult<T = unknown> = {
  index: number;
  value: T;
  task: AsyncFunction<T>;
};

export async function promiseAllMaxConcurrency<const Tasks extends AsyncFunction<unknown>[]>(
  tasks: Tasks,
  maxConcurrency = 4
): Promise<{ [K in keyof Tasks]: Awaited<ReturnType<Tasks[K]>> }> {
  const remainingTasks = [...tasks];
  const finishedTasks: Promise<unknown>[] = [];
  const executingTasks = new Map<number, Promise<TaskResult>>();
  do {
    const tasksToAdd = maxConcurrency - executingTasks.size;
    for (const task of remainingTasks.splice(0, tasksToAdd)) {
      const index = tasks.indexOf(task);
      const taskPromise = task().then((value) => ({
        index: tasks.indexOf(task),
        value,
        task,
      } satisfies TaskResult));
      executingTasks.set(index, taskPromise);
    }

    const finishedPromise = Promise.race(executingTasks.values());
    const result = await finishedPromise;
    finishedTasks[result.index] = finishedPromise.then(result => result.value);
    executingTasks.delete(result.index);
  } while (remainingTasks.length > 0 || executingTasks.size > 0);

  return Promise.all(finishedTasks) as Promise<{ [K in keyof Tasks]: Awaited<ReturnType<Tasks[K]>> }>;
}

promiseAllMaxConcurrency([
  () => new Promise<number>((resolve) => resolve(1)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('two'), 1000)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('three'), 1000)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('four'), 1000)),
  () => new Promise<string>((resolve) => setTimeout(() => resolve('long running...'), 5000)),
  () => new Promise<boolean>((resolve) => setTimeout(() => resolve(true), 1000)),
]).then((results) => {
  console.log(results);
}).catch(e => console.error(e));

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment