Skip to content

Instantly share code, notes, and snippets.

@Kishibe3
Last active May 30, 2025 10:50
Show Gist options
  • Save Kishibe3/46efda0f1f9ea4325162a1b01a3e85f8 to your computer and use it in GitHub Desktop.
Save Kishibe3/46efda0f1f9ea4325162a1b01a3e85f8 to your computer and use it in GitHub Desktop.
This is a JavaScript port of the Python module aiolimiter.
/**
 * Leaky-bucket rate limiter for asynchronous work.
 *
 * The bucket holds at most `count` units and drains continuously at
 * `count / interval` units per second. `acquire(amount)` adds `amount` units,
 * waiting first if the bucket does not currently have room for them.
 *
 * Reference https://github.com/mjpieters/aiolimiter/blob/master/src/aiolimiter/leakybucket.py
 *
 * Usage:
 *   const limiter = new AsyncLimiter(4, 8);
 *   await limiter.run(() => fetch('https://www.google.com'));
 */
class AsyncLimiter {
  /**
   * Minimal one-shot future: a promise that can be resolved from the outside
   * and queried synchronously for completion. Shared by all limiter instances.
   */
  static #Future = class {
    #promise;
    #resolve;
    #done = false;

    constructor() {
      this.#promise = new Promise((res) => {
        this.#resolve = (val) => {
          res(val);
          this.#done = true;
        };
      });
    }

    /** @returns {boolean} true once `set_result` has been called. */
    done() {
      return this.#done;
    }

    /** Resolve the future with `val`. */
    set_result(val) {
      this.#resolve(val);
    }

    /** Schedule `fn` on a timer (not a microtask) after the future resolves. */
    add_done_callback(fn) {
      this.#promise.then(() => setTimeout(fn, 0));
    }

    // Make it thenable so that `await future` works.
    then(onFulfilled, onRejected) {
      return this.#promise.then(onFulfilled, onRejected);
    }
  };

  // Current fill level of the bucket, in the same units as `count`.
  #level = 0;
  // Date.now() timestamp (ms) of the last drain computation.
  #last_check = 0;
  // Drain rate in units per second.
  #rate_per_sec;
  // Pending acquirers: { amount, id, fut }.
  #waiters = [];
  // Timer that will re-run #wake_next once the head waiter may fit.
  #waker_handle = null;
  // Monotonic counter used to keep equal-amount waiters in arrival order.
  #id = 0;

  /**
   * @param {number} count - Maximum bucket capacity (units allowed per interval).
   * @param {number} interval - Length of the period in seconds.
   * @throws {RangeError} If either argument is not a finite number greater than 0;
   *   such values would produce a zero, infinite, or NaN drain rate and make
   *   waiters spin or hang forever.
   */
  constructor(count, interval) {
    for (const [name, value] of [['count', count], ['interval', interval]]) {
      if (typeof value !== 'number' || !Number.isFinite(value) || value <= 0) {
        throw new RangeError(`${name} must be a finite number greater than 0`);
      }
    }
    this.count = count;
    this.interval = interval;
    this.#rate_per_sec = count / interval;
  }

  /** Drain the bucket according to the time elapsed since the last check. */
  #leak() {
    const now = Date.now();
    if (this.#level > 0) {
      // Date.now() is wall-clock time and may step backwards; never let a
      // negative elapsed time refill the bucket.
      const elapsedMs = Math.max(now - this.#last_check, 0);
      const decrement = (elapsedMs * this.#rate_per_sec) / 1000;
      this.#level = Math.max(this.#level - decrement, 0);
    }
    this.#last_check = now;
  }

  /**
   * @param {number} [amount=1] - Units that would be acquired.
   * @returns {boolean} Whether `amount` units fit in the bucket right now.
   */
  has_capacity(amount = 1) {
    this.#leak();
    return this.#level + amount <= this.count;
  }

  /**
   * Wake the first waiter if it fits now; otherwise arm a timer for the moment
   * enough capacity will have drained for it.
   */
  #wake_next() {
    // Smallest request first; ties are served in arrival order.
    this.#waiters.sort((a, b) => a.amount - b.amount || a.id - b.id);
    const heap = this.#waiters;

    // Any previously armed timer is obsolete once we re-evaluate the queue.
    const handle = this.#waker_handle;
    this.#waker_handle = null;
    if (handle) {
      clearTimeout(handle);
    }

    // Drop waiters that were already woken.
    while (heap.length > 0 && heap[0].fut.done()) {
      heap.shift();
    }
    if (heap.length === 0) {
      return;
    }

    const { amount, fut } = heap[0];
    this.#leak();
    const needed = amount + this.#level - this.count;
    if (needed <= 0) {
      heap.shift();
      fut.set_result(null);
      return;
    }

    // Time (ms) until `needed` units have drained out of the bucket.
    const wait_for = (needed / this.#rate_per_sec) * 1000;
    this.#waker_handle = setTimeout(() => this.#wake_next(), wait_for);
  }

  /**
   * Reserve `amount` units, waiting until the bucket has room for them.
   *
   * @param {number} [amount=1] - Units to reserve.
   * @returns {Promise<void>} Resolves once the units have been reserved.
   * @throws {RangeError} (as a rejection) If `amount` exceeds `count`.
   */
  async acquire(amount = 1) {
    if (amount > this.count) {
      throw new RangeError("Can't acquire more than the maximum capacity");
    }
    // Re-check after every wake-up: another caller may have taken the
    // capacity between the wake-up and this coroutine resuming.
    while (!this.has_capacity(amount)) {
      const fut = new AsyncLimiter.#Future();
      fut.add_done_callback(() => setTimeout(() => this.#wake_next(), 0));
      this.#waiters.push({ amount, id: this.#id++, fut });
      this.#wake_next();
      await fut;
    }
    this.#level += amount;
    // The level changed, so the head waiter's timer must be recomputed.
    this.#wake_next();
  }

  /**
   * Acquire capacity, then invoke `fn` and return its (awaited) result.
   *
   * usage:
   *   let limiter = new AsyncLimiter(4, 8);
   *   await limiter.run(() => fetch('https://www.google.com'));
   *
   * @param {Function} fn - Called with no arguments once capacity is reserved.
   * @param {number} [amount=1] - Units to reserve before calling `fn`.
   * @returns {Promise<*>} The value `fn` returns or resolves to.
   */
  async run(fn, amount = 1) {
    await this.acquire(amount);
    return await fn();
  }
}
/*
Example 1, bark 10 times without annoying my neighbors! ...Hopefully:
let limiter = new AsyncLimiter(4, 8);
let ref = Date.now();
async function task() {
return await limiter.run(() => Promise.resolve().then(e => console.log((Date.now() - ref) / 1000, 'WoofWoof')));
}
await Promise.all(Array.from({ length: 10 }, () => task()));
Example 2, crawling a web api:
let limiter = new AsyncLimiter(4, 8);
async function task() {
return await limiter.run(() => fetch('https://api.ipify.org/').then(e => e.text()));
}
await Promise.all(Array.from({ length: 10 }, () => task()));
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment