Skip to content

Instantly share code, notes, and snippets.

@becelli
Last active January 8, 2024 13:26
Show Gist options
  • Save becelli/b143569880f07f8efaecb8672f1d5a6e to your computer and use it in GitHub Desktop.
Save becelli/b143569880f07f8efaecb8672f1d5a6e to your computer and use it in GitHub Desktop.
A concurrency pool implementation in TypeScript
export class ConcurrencyPool<I> {
private readonly logger = console;
public constructor(
private readonly data: I[],
private readonly workerCount: number
) {}
public async executeSettled<O>(
mapper: (item: I) => Promise<O>
): Promise<Array<PromiseSettledResult<O>>> {
const results = new Array<PromiseSettledResult<O>>(this.data.length);
const iterator = this.data.entries();
const workers = Array(this.workerCount)
.fill(iterator)
.map(this.doWorkSettled.bind(this, results, mapper));
await Promise.all(workers);
return results;
}
public async execute<O>(mapper: (item: I) => Promise<O>): Promise<O[]> {
const results = new Array<O>(this.data.length);
const iterator = this.data.entries();
const mapFunction = this.doWork.bind(this, results, mapper);
const workers = Array(this.workerCount).fill(iterator).map(mapFunction);
await Promise.all(workers);
return results;
}
private async doWorkSettled<O>(
results: Array<PromiseSettledResult<O>>,
mapper: (item: I) => Promise<O>,
iterator: IterableIterator<[number, I]>
) {
for (const [index, item] of iterator) {
try {
results[index] = {
status: "fulfilled",
value: await mapper(item),
};
} catch (error: unknown) {
results[index] = {
status: "rejected",
reason: error,
};
}
}
}
private async doWork<O>(
results: O[],
mapper: (item: I) => Promise<O>,
iterator: IterableIterator<[number, I]>
) {
for (const [index, item] of iterator) {
results[index] = await mapper(item);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment