@shadowmoose
Last active August 28, 2025 01:24
TypeScript Promise Pool

This is a simple executor queue that runs queued functions, synchronous or Promise-returning, under a concurrency limit that can be changed at runtime. I find myself needing small utilities like this frequently, so I'm keeping it here for future reference.

The concurrency limit can be adjusted in real time, even while tasks are queued. Raising it starts more queued tasks immediately; lowering it holds back new tasks until enough running ones finish. Each task's result or error is passed straight through the Promise returned by pool.add.
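To make the real-time adjustment concrete, here is a minimal sketch. `TinyPool`, `setLimit`, `tick`, and `demo` are hypothetical names invented for this illustration; `TinyPool` is a simplified stand-in, not the PromisePool class from this gist. The demo holds every task open by hand so the number of active tasks can be observed after each limit change.

```typescript
// Hypothetical stand-in for illustration only; it is not the gist's PromisePool.
class TinyPool {
  private running = 0;
  private limit: number;
  private readonly pending: Array<() => void> = [];

  constructor(limit: number) {
    this.limit = limit;
  }

  /** Change the limit, then start as many queued tasks as the new limit allows. */
  setLimit(limit: number): void {
    this.limit = limit;
    this.pump();
  }

  /** Queue a task; the returned promise settles with the task's result or error. */
  add<T>(task: () => Promise<T> | T): Promise<T> {
    const result = new Promise<T>((resolve, reject) => {
      this.pending.push(() => {
        try {
          resolve(task()); // a returned promise is adopted by `result`
        } catch (err) {
          reject(err); // synchronous throw
        }
      });
    });
    const done = () => {
      this.running--;
      this.pump();
    };
    result.then(done, done); // free the slot however the task ends
    this.pump();
    return result;
  }

  /** Start queued tasks until the limit is reached or nothing is queued. */
  private pump(): void {
    while (this.running < this.limit) {
      const start = this.pending.shift();
      if (!start) return;
      this.running++;
      start();
    }
  }
}

// Yields to the timer queue so all pending promise callbacks have run.
const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 0));

async function demo(): Promise<number[]> {
  const pool = new TinyPool(1);
  const releases: Array<() => void> = [];
  let active = 0;
  // Each task stays "running" until the demo releases it by hand.
  const task = () =>
    new Promise<void>((resolve) => {
      active++;
      releases.push(() => {
        active--;
        resolve();
      });
    });
  const releaseAll = () => releases.splice(0).forEach((release) => release());

  const all = Promise.all([1, 2, 3, 4, 5].map(() => pool.add(task)));
  const observed: number[] = [];
  await tick();
  observed.push(active); // limit 1: one task active
  pool.setLimit(3);
  await tick();
  observed.push(active); // raised to 3: two more started at once
  pool.setLimit(1); // lowering cancels nothing that is already running
  releaseAll();
  await tick();
  observed.push(active); // three finished, only one queued task took their place
  while (releases.length > 0) {
    releaseAll();
    await tick();
  }
  await all;
  return observed;
}

demo().then((observed) => console.log(observed)); // [1, 3, 1]
```

The sketch starts tasks synchronously for brevity; the pool in this gist schedules them with setImmediate instead.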

The queue itself is relatively lightweight, requiring no long-running timers or loops, and has been tested extensively for memory leaks and recursion limits in Bun and Node.

const pool = new PromisePool(3);

void pool.add(() => console.log('test')).catch(console.error);
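The `.catch` in the snippet above can receive whatever the queued function throws because the pool hands back a promise right away and settles it later, once the task has actually run. The pool does this with `Promise.withResolvers`; the sketch below shows the same idea with a hand-written equivalent for runtimes that lack it. `Deferred`, `deferred`, `defer`, and `demo` are hypothetical names used only for this illustration.

```typescript
// Hypothetical helper: a promise bundled with the functions that settle it.
interface Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T | PromiseLike<T>) => void;
  reject: (reason?: unknown) => void;
}

function deferred<T>(): Deferred<T> {
  let resolve!: Deferred<T>["resolve"];
  let reject!: Deferred<T>["reject"];
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

/** Returns a promise for the task's outcome plus a `run` function that executes the task later. */
function defer<T>(task: () => Promise<T> | T): { promise: Promise<T>; run: () => void } {
  const d = deferred<T>();
  const run = () => {
    try {
      d.resolve(task()); // a returned promise is adopted, so async failures reject too
    } catch (err) {
      d.reject(err); // synchronous throw
    }
  };
  return { promise: d.promise, run };
}

async function demo(): Promise<string[]> {
  const ok = defer(() => 42);
  const bad = defer(async () => {
    throw new Error("boom");
  });
  // Nothing has run yet; a scheduler would call run() once capacity is available.
  ok.run();
  const value = await ok.promise;
  bad.run();
  const message = await bad.promise.catch((err: Error) => err.message);
  return [`ok:${value}`, `err:${message}`];
}

demo().then((result) => console.log(result)); // ["ok:42", "err:boom"]
```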
import EventEmitter from "events";

/**
 * A pool that acts as a queue, running a limited number of promises at once.
 *
 * The desired concurrency can be changed at any time, even while tasks are already queued,
 * and the pool will automatically start new tasks or throttle subsequent tasks as needed.
 *
 * This pool is fully event-based with no long-running timers or loops, and avoids recursion limits or memory leaks.
 */
export class PromisePool extends EventEmitter {
    private runningCount = 0;
    private readonly queue = new SinglyLinkedList<() => Promise<any>>();
    private maxSize: number;
    private alive = true;

    constructor(maxSize: number) {
        super();
        this.maxSize = maxSize;
    }

    /**
     * Sets the maximum concurrency of the pool. If the new size is larger than the current size,
     * new tasks will be started immediately until the pool is full.
     *
     * If the new size is smaller than the current size, no new tasks will be started until
     * the running tasks complete and the pool is back under the limit.
     */
    setConcurrency(maxSize: number) {
        this.maxSize = maxSize;
        this.startNext();
    }
    /**
     * Adds a function to the pool. The function will be executed once pool capacity becomes available.
     *
     * @param fn Any function to be executed when capacity is available.
     * @return A promise that resolves/rejects with the result of the function.
     * @throws Synchronously (not via the returned promise) if the pool has already been killed.
     */
    add<T>(fn: () => (Promise<T>|T)): Promise<T> {
        if (!this.alive) throw new Error('Pool has been killed.');
        // Promise.withResolvers is an ES2024 API; it needs a runtime and a TypeScript `lib` setting that include it.
        const prom = Promise.withResolvers<T>();
        const wrapper = async () => {
            try {
                return prom.resolve(await fn());
            } catch (err) {
                return prom.reject(err);
            } finally {
                // Free the slot and let the next queued task start.
                this.runningCount--;
                this.startNext();
            }
        };
        this.queue.add(wrapper);
        this.startNext();
        return prom.promise;
    }
    /**
     * This is a convenience method for adding a function call with dynamic arguments to the pool.
     *
     * Shorthand for `add(() => fn(...args))` or `add(fn.bind(thisArg, ...args))`.
     * @param fn The callable function to be executed.
     * @param args The arguments to be passed to the function.
     * @param thisArg Optional `this` context to bind the function to.
     */
    addBound<
        FN extends (...args: any[]) => any, T extends ReturnType<FN>, ARGS extends Parameters<FN>
    >(fn: FN, args: ARGS, thisArg?: any): Promise<T> {
        return this.add(fn.bind(thisArg, ...args));
    }
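    private startNext() {
        // A killed pool starts nothing new, but it must still report when the last
        // running task finishes; otherwise kill() would wait for 'empty' forever.
        while (this.alive && this.runningCount < this.maxSize) {
            const next = this.queue.shift();
            if (!next) break;
            this.runningCount++;
            setImmediate(next);
        }
        if (this.runningCount === 0 && this.queue.length === 0) {
            this.emit('empty');
        }
    }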
    /** Waits until the pool has no running or pending tasks. */
    waitForEmpty(): Promise<void> {
        if (this.runningCount === 0 && this.queue.length === 0) return Promise.resolve();
        return new Promise(resolve => this.once('empty', resolve));
    }

    /**
     * Kills the pool, clearing all pending tasks and preventing new tasks from being added.
     * Waits for all currently running Promises to complete before resolving.
     *
     * Promises returned by `add` for tasks that were still pending are never settled.
     */
    async kill() {
        this.alive = false;
        this.queue.clear();
        return await this.waitForEmpty();
    }

    get isAlive() { return this.alive }
    get queueSize() { return this.queue.length }
    get running() { return this.runningCount }
}
interface LinkedListNode<T> {
    value: T;
    next: LinkedListNode<T> | null;
}

/** A simple singly-linked FIFO list implementation for queueing tasks. */
export class SinglyLinkedList<T> {
    private head: LinkedListNode<T> | null = null;
    private tail: LinkedListNode<T> | null = null;
    private size = 0;

    add(value: T) {
        const newNode: LinkedListNode<T> = { value, next: null };
        this.size++;
        if (!this.tail) {
            this.head = newNode;
            this.tail = newNode;
        } else {
            this.tail.next = newNode;
            this.tail = newNode;
        }
    }

    shift(): T|undefined {
        if (!this.head) return undefined;
        this.size--;
        const value = this.head.value;
        this.head = this.head.next;
        if (!this.head) this.tail = null;
        return value;
    }

    clear() {
        this.head = null;
        this.tail = null;
        this.size = 0;
    }

    get length() { return this.size }
}
/**
 * Creates a PromisePool and provides decorators and wrappers for using it to easily create async functions and methods,
 * which all share the same concurrency limit.
 *
 * *Note:* The wrapped functions are not reentrant. A wrapped call that awaits another call wrapped by the same pool
 * keeps its slot while it waits, so nested calls can block each other once the pool is full.
 * Consider using multiple pools to wrap different methods if needed.
 */
export function PoolWrapper(initialConcurrency: number) {
    const pool = new PromisePool(initialConcurrency);
    return {
        /**
         * The underlying PromisePool instance.
         * ```typescript
         * const { pool, decorators } = PoolWrapper(3);
         * pool.setConcurrency(1);
         * ```
         */
        pool,
        decorators: {
            /**
             * Wrap a method so that it is executed within the promise pool's concurrency limits.
             *
             * The target method must return a Promise.
             *
             * ```typescript
             * const wrapper = PoolWrapper(3);
             * class TestWrapped {
             *     @wrapper.decorators.method
             *     async testMethod(a: number, b: string) {}
             * }
             * ```
             */
            method: <This, Args extends any[], Return>(
                target: (this: This, ...args: Args) => Promise<Return>
            ): (this: This, ...args: Args) => Promise<Return> => {
                function replacementMethod(this: This, ...args: Args): Promise<Return> {
                    return pool.addBound(target as any, args as any, this);
                }
                return replacementMethod;
            },
        },
        /**
         * Wrap a function so that it is executed within the promise pool's concurrency limits.
         *
         * ```typescript
         * const wrapper = PoolWrapper(3);
         * const wrappedFunction = wrapper.fn(async (a: number, b: string) => {});
         * ```
         */
        fn: <FN extends (...args: any[]) => any, T extends ReturnType<FN>, ARGS extends Parameters<FN>>(fn: FN) => {
            return (...args: ARGS): Promise<T> => {
                return pool.addBound(fn, args);
            }
        },
    }
}
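The note on PoolWrapper about wrapped functions not being reentrant is easiest to see with a limit of 1. The sketch below is an illustration under assumptions: `makeLimiter`, `timeout`, and `demo` are hypothetical names, and `makeLimiter` is a simplified stand-in for `PoolWrapper(...).fn` rather than the code above. An outer wrapped call takes the only slot and then awaits an inner call that can never get one; giving each function its own limiter avoids the stall.

```typescript
// Hypothetical stand-in for PoolWrapper(...).fn: wrapped functions share one concurrency limit.
function makeLimiter(limit: number) {
  let running = 0;
  const waiting: Array<() => void> = [];
  const pump = () => {
    while (running < limit && waiting.length > 0) {
      running++;
      waiting.shift()!();
    }
  };
  return function wrap<A extends unknown[], R>(fn: (...args: A) => Promise<R>) {
    return (...args: A): Promise<R> => {
      const result = new Promise<R>((resolve, reject) => {
        waiting.push(() => {
          try {
            resolve(fn(...args));
          } catch (err) {
            reject(err);
          }
        });
      });
      const done = () => {
        running--;
        pump();
      };
      result.then(done, done); // the slot is held until the call settles
      pump();
      return result;
    };
  };
}

const timeout = (ms: number) =>
  new Promise<string>((resolve) => setTimeout(() => resolve("timed out"), ms));

async function demo(): Promise<string[]> {
  // Shared limiter, limit 1: the outer call holds the only slot while it waits on the inner call.
  const shared = makeLimiter(1);
  const innerShared = shared(async () => "inner done");
  const outerShared = shared(async () => innerShared());
  const stuck = await Promise.race([outerShared(), timeout(50)]);

  // Separate limiters: the inner call has its own slot, so the outer call completes.
  const inner = makeLimiter(1)(async () => "inner done");
  const outer = makeLimiter(1)(async () => inner());
  const fine = await Promise.race([outer(), timeout(50)]);
  return [stuck, fine];
}

demo().then((result) => console.log(result)); // ["timed out", "inner done"]
```

Raising the shared limit only postpones the problem: enough concurrent outer calls will again occupy every slot while their inner calls wait in the queue.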