Skip to content

Instantly share code, notes, and snippets.

@snuffyDev
Created March 29, 2023 18:32
Show Gist options
  • Select an option

  • Save snuffyDev/1e3d49bbedbc84e3471181bef15dba5c to your computer and use it in GitHub Desktop.

Select an option

Save snuffyDev/1e3d49bbedbc84e3471181bef15dba5c to your computer and use it in GitHub Desktop.
/**
* PromisePool limits the number of concurrent executions for any given task.
*
* @example
* ```ts
* import { PromisePool, InlineThread } from 'nanothreads';
*
* const handle = new InlineThread({
* task: (a, b) => a * b
* });
*
* const pool = new PromisePool(4);
*
* // Call the thread 4 times concurrently, 5th call will wait
* for (const num of [...Array(5).keys()]) {
* await pool.add(async ()=> {
* return await handle.send(num, Math.pow(num, 2));
* })
* }
* ```
*/
export class PromisePool {
private concurrency: number;
private tasks = new Set<Promise<any>>();
private activeTaskId: number = 0;
constructor(concurrency: number) {
this.concurrency = concurrency;
}
async add<T = void>(asyncTaskFn: () => Promise<T>): Promise<T> {
const taskPromise = asyncTaskFn();
// Add the task to the tasks map
this.tasks.add(taskPromise);
// Process tasks until the concurrency limit is reached
while (this.activeTaskId >= this.concurrency) {
await Promise.race(this.tasks.values());
}
this.activeTaskId++;
// Start the task and increment the active task count
taskPromise.finally(() => {
// after promise has resolved or rejected, remove from the pool
this.tasks.delete(taskPromise);
this.activeTaskId--;
});
return taskPromise;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment