|
import EventEmitter from "events"; |
|
|
|
|
|
/** |
|
* A pool that acts as a queue, running a limited number of promises at once. |
|
* |
|
* The desired concurrency can be changed at any time, even while tasks are already queued, |
|
* and the pool will automatically start new tasks or throttle subsequent tasks as needed. |
|
* |
|
* This pool is fully event-based with no long-running timers or loops, and avoids recursion limits or memory leaks. |
|
*/ |
|
export class PromisePool extends EventEmitter { |
|
private runningCount = 0; |
|
private readonly queue = new SinglyLinkedList<() => Promise<any>>(); |
|
private maxSize: number; |
|
private alive = true; |
|
|
|
constructor(maxSize: number) { |
|
super(); |
|
this.maxSize = maxSize; |
|
} |
|
|
|
/** |
|
* Sets the maximum concurrency of the pool. If the new size is larger than the current size, |
|
* new tasks will be started immediately until the pool is full. |
|
* |
|
* If the new size is smaller than the current size, no new tasks will be started until |
|
* the running tasks complete and the pool is back under the limit. |
|
*/ |
|
setConcurrency(maxSize: number) { |
|
this.maxSize = maxSize; |
|
this.startNext(); |
|
} |
|
|
|
/** |
|
* Adds a function to the pool. The function will be executed once pool capacity becomes available. |
|
* |
|
* @param fn Any function to be executed when capacity is available. |
|
* @return A promise that resolves/rejects with the result of the function. |
|
*/ |
|
add<T>(fn: () => (Promise<T>|T)): Promise<T> { |
|
if (!this.alive) throw new Error('Pool has been killed.'); |
|
const prom = Promise.withResolvers<T>(); |
|
const wrapper = async () => { |
|
try { |
|
return prom.resolve(await fn()); |
|
} catch (err) { |
|
return prom.reject(err); |
|
} finally { |
|
this.runningCount--; |
|
this.startNext(); |
|
} |
|
} |
|
this.queue.add(wrapper); |
|
this.startNext(); |
|
return prom.promise; |
|
} |
|
|
|
/** |
|
* This is a convenience method for adding a function call with dynamic arguments to the pool. |
|
* |
|
* Shorthand for `add(() => fn(...args))` or `add(fn.bind(thisArg, ...args))`. |
|
* @param fn The callable function to be executed. |
|
* @param args The arguments to be passed to the function. |
|
* @param thisArg Optional `this` context to bind the function to. |
|
*/ |
|
addBound< |
|
FN extends (...args: any[]) => any, T extends ReturnType<FN>, ARGS extends Parameters<FN> |
|
>(fn: FN, args: ARGS, thisArg?: any): Promise<T> { |
|
return this.add(fn.bind(thisArg, ...args)); |
|
} |
|
|
|
private startNext() { |
|
if (!this.alive) return; |
|
while (this.runningCount < this.maxSize) { |
|
const next = this.queue.shift(); |
|
if (next) { |
|
this.runningCount++; |
|
setImmediate(next); |
|
} else { |
|
if (this.runningCount === 0) { |
|
this.emit('empty'); |
|
} |
|
break; |
|
} |
|
} |
|
} |
|
|
|
/** Waits until the pool has no running or pending tasks. */ |
|
waitForEmpty(): Promise<void> { |
|
if (this.runningCount === 0 && this.queue.length === 0) return Promise.resolve(); |
|
return new Promise(resolve => this.once('empty', resolve)); |
|
} |
|
|
|
/** |
|
* Kills the pool, clearing all pending tasks and preventing new tasks from being added. |
|
* Waits for all currently running Promises to complete before resolving. |
|
*/ |
|
async kill() { |
|
this.alive = false; |
|
this.queue.clear(); |
|
return await this.waitForEmpty(); |
|
} |
|
|
|
get isAlive() { return this.alive } |
|
get queueSize() { return this.queue.length } |
|
get running() { return this.runningCount } |
|
} |
|
|
|
interface LinkedListNode { |
|
value: any; |
|
next: LinkedListNode | null; |
|
} |
|
|
|
/** A simple singly-linked FIFO list implementation for queueing tasks. */ |
|
export class SinglyLinkedList<T> { |
|
private head: LinkedListNode | null = null; |
|
private tail: LinkedListNode | null = null; |
|
private size = 0; |
|
|
|
add(value: T) { |
|
const newNode: LinkedListNode = { value, next: null }; |
|
this.size++; |
|
if (!this.tail) { |
|
this.head = newNode; |
|
this.tail = newNode; |
|
} else { |
|
this.tail.next = newNode; |
|
this.tail = newNode; |
|
} |
|
} |
|
|
|
shift(): T|undefined { |
|
if (!this.head) return undefined; |
|
this.size--; |
|
const value = this.head.value; |
|
this.head = this.head.next; |
|
if (!this.head) this.tail = null; |
|
return value; |
|
} |
|
|
|
clear() { |
|
this.head = null; |
|
this.tail = null; |
|
this.size = 0; |
|
} |
|
|
|
get length() { return this.size } |
|
} |
|
|
|
|
|
/** |
|
* Creates a PromisePool and provides decorators and wrappers for using it to easily create async functions and methods, |
|
* which all share the same concurrency limit. |
|
* |
|
* *Note:* The wrapped functions are not reentrant. |
|
* Be careful when using nested wrapped calls, as they may block each other. |
|
* Consider using multiple pools to wrap different methods if needed. |
|
*/ |
|
export function PoolWrapper(initialConcurrency: number) { |
|
const pool = new PromisePool(initialConcurrency); |
|
return { |
|
/** |
|
* The underlying PromisePool instance. |
|
* ```typescript |
|
* const { pool, decorators } = PoolWrapper(3); |
|
* pool.setConcurrency(1); |
|
* ``` |
|
*/ |
|
pool, |
|
decorators: { |
|
/** |
|
* Wrap a method so that it is executed within the promise pool's concurrency limits. |
|
* |
|
* The target method must return a Promise. |
|
* |
|
* ```typescript |
|
* class TestWrapped { |
|
* @wrapper.decorators.method |
|
* async testMethod(a: number, b: string) {} |
|
* } |
|
* ``` |
|
*/ |
|
method: <This, Args extends any[], Return>( |
|
target: (this: This, ...args: Args) => Promise<Return> |
|
): (this: This, ...args: Args) => Promise<Return> => { |
|
function replacementMethod(this: This, ...args: Args): Promise<Return> { |
|
return pool.addBound(target as any, args as any, this); |
|
} |
|
|
|
return replacementMethod; |
|
}, |
|
}, |
|
|
|
/** |
|
* Wrap a function so that it is executed within the promise pool's concurrency limits. |
|
* |
|
* ```typescript |
|
* const wrappedFunction = wrapper.fn(async (a: number, b: string) => {}); |
|
* ``` |
|
*/ |
|
fn: <FN extends (...args: any[]) => any, T extends ReturnType<FN>, ARGS extends Parameters<FN>>(fn: FN) => { |
|
return (...args: ARGS): Promise<T> => { |
|
return pool.addBound(fn, args); |
|
} |
|
}, |
|
} |
|
} |