Skip to content

Instantly share code, notes, and snippets.

@gvergnaud
Last active July 28, 2024 13:17
Show Gist options
  • Select an option

  • Save gvergnaud/95329f23f9a3b2edd798c1cc6ecd2b26 to your computer and use it in GitHub Desktop.

Select an option

Save gvergnaud/95329f23f9a3b2edd798c1cc6ecd2b26 to your computer and use it in GitHub Desktop.
A simple implementation of a promise queue with concurrency
import { deferred } from './deferred';
/**
 * A first-in, first-out queue of promise-returning tasks with a cap on how
 * many tasks may be in flight at the same time.
 *
 * Tasks are started in the order they were added. When a slot is free,
 * `add` invokes the task synchronously; otherwise the task waits until an
 * earlier one settles.
 */
export class PromiseQueue<T> {
  /**
   * Maximum number of tasks allowed to be in flight at once. A task is only
   * started while the in-flight count is below this value, so a value of 0
   * or less means no task is ever started.
   */
  readonly concurrency: number;

  /** Number of tasks that have been started and have not settled yet. */
  #ongoingPromisesCount: number;

  /** Starters for the tasks still waiting for a free slot, oldest first. */
  #toRun: Array<() => void>;

  /**
   * @param options.concurrency Maximum number of tasks in flight at once.
   *   Defaults to 1.
   */
  constructor({ concurrency = 1 } = {}) {
    this.concurrency = concurrency;
    this.#ongoingPromisesCount = 0;
    this.#toRun = [];
  }

  /** Number of tasks that were added but have not been started yet. */
  get pendingCount() {
    return this.#toRun.length;
  }

  /**
   * Schedules a task and returns a promise that settles with its outcome.
   *
   * @param getPromise Factory that starts the work. It is called at most
   *   once, when a slot is available.
   * @returns A promise that fulfils with the task's value, or rejects with
   *   the task's rejection reason. An error thrown synchronously by
   *   `getPromise` is reported as a rejection of this promise as well.
   */
  add = (getPromise: () => Promise<T>): Promise<T> => {
    // The executor runs synchronously, so a task added while a slot is free
    // has already been started by the time `add` returns.
    return new Promise<T>((resolve, reject) => {
      this.#toRun.push(() => this.#start(getPromise, resolve, reject));
      this.#startQueued();
    });
  };

  /** Runs one task in a slot and forwards its outcome to the caller. */
  #start = (
    getPromise: () => Promise<T>,
    resolve: (value: T) => void,
    reject: (reason?: unknown) => void,
  ): void => {
    this.#ongoingPromisesCount++;

    let task: Promise<T>;
    try {
      task = Promise.resolve(getPromise());
    } catch (err) {
      // A synchronous throw must still release the slot, so it is treated
      // exactly like a rejected task.
      task = Promise.reject(err);
    }

    // Freeing the slot and starting the next task happen in one synchronous
    // step, so a concurrent `add` can never slip in between and exceed the
    // concurrency cap.
    const release = (): void => {
      this.#ongoingPromisesCount--;
      this.#startQueued();
    };

    // Both outcomes are handled in a single `then`, so no derived promise is
    // left rejected without a handler.
    void task.then(
      (value) => {
        release();
        resolve(value);
      },
      (err) => {
        release();
        reject(err);
      },
    );
  };

  /** Starts waiting tasks, oldest first, for as long as slots are free. */
  #startQueued = (): void => {
    while (this.#ongoingPromisesCount < this.concurrency) {
      const startTask = this.#toRun.shift();
      if (startTask === undefined) {
        return;
      }
      startTask();
    }
  };
}
// Usage example. PromiseQueue is a named export, so it is imported by name.
import { PromiseQueue } from './PromiseQueue'

// At most two tasks are in flight at any time.
const queue = new PromiseQueue({ concurrency: 2 })

// Resolves after `ms` milliseconds.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

// Five 2-second tasks, two at a time: 0-2s, 0-2s, 2-4s, 2-4s, 4-6s.
queue.add(() => delay(2000))
queue.add(() => delay(2000))
queue.add(() => delay(2000))
queue.add(() => delay(2000))
queue.add(() => delay(2000))
  .then(() => console.log('resolved!')) // this will happen after 6 sec
import { PromiseQueue } from './promise-queue';
/**
 * Wraps `run` so that its invocations go through a `PromiseQueue` created
 * with default options, and so that calls made while an invocation is already
 * waiting in that queue are folded into the waiting one instead of being
 * queued separately.
 *
 * @param run The async function to wrap.
 * @param mergeArgs Combines the arguments buffered so far with the arguments
 *   of a newer call. By default the newer arguments replace the older ones.
 * @returns A function with the same parameters as `run`. Calls that were
 *   folded into a waiting invocation all receive that invocation's promise.
 */
export function oneAtATime<Args extends any[], T>(
  run: (...args: Args) => Promise<T>,
  mergeArgs: (args1: Args, arg2: Args) => Args = (a, b) => b,
): (...args: Args) => Promise<T> {
  const queue = new PromiseQueue<T>();
  // Arguments collected since the last time `run` was actually invoked.
  let bufferedArgs: Args | undefined;
  // Promise of the most recently scheduled invocation.
  let latestPromise: Promise<T> | null = null;

  // Takes ownership of the buffered arguments and clears the buffer before
  // calling `run`, so calls arriving during the run start a fresh buffer.
  async function flush(): Promise<T> {
    const args = bufferedArgs!;
    bufferedArgs = undefined;
    return run(...args);
  }

  return (...args) => {
    if (bufferedArgs) {
      bufferedArgs = mergeArgs(bufferedArgs, args);
    } else {
      bufferedArgs = args;
    }

    // Schedule a new invocation unless one is already waiting in the queue;
    // a waiting invocation will pick up the merged arguments when it starts.
    if (!latestPromise || !(queue.pendingCount > 0)) {
      latestPromise = queue.add(flush);
    }
    return latestPromise;
  };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment