Last active
October 16, 2023 13:26
-
-
Save rkatic/6a7f2235d916f13c0de2b27aeef0e8c0 to your computer and use it in GitHub Desktop.
Elegant implementation of concurrency/speed limiter.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** @param {number} n */ | |
function semaphore (n) { | |
const resolvers = [] | |
const pushToResolvers = resolve => { resolvers.push(resolve) } | |
/** @type {() => (Promise<void> | void)} */ | |
const acquire = () => { | |
if (--n < 0) { | |
return new Promise(pushToResolvers) | |
} | |
} | |
const release = () => { | |
if (resolvers.length > 0) { | |
resolvers.shift()() | |
} | |
++n | |
} | |
return { acquire, release } | |
} | |
/** @typedef {(...args: any[]) => any} AnyFn */ | |
/** | |
* Create a version of a function with limited concurrency and/or speed. | |
* | |
* @example | |
* // Limited version of `fetch` with no more than 100 concurrent requests. | |
* `const queueFetch = limited(fetch, { concurrency: 100 })` | |
* | |
* @template {AnyFn} F | |
* @param {F} fn | |
* @param {{ concurrency?: number, speed?: number }} options | |
* @returns {(...args: Parameters<F>) => Promise<Awaited<ReturnType<F>>>} | |
*/ | |
function limited (fn, { concurrency = Infinity, speed = Infinity }) { | |
concurrency = Math.min(concurrency, speed) | |
const endDelay = speed && Math.ceil(concurrency / speed * 1000) | |
const { acquire, release } = semaphore(concurrency) | |
async function limitedFn(...args) { | |
await acquire() | |
try { | |
return await fn.apply(this, args) | |
} finally { | |
if (endDelay) { | |
setTimeout(release, endDelay) | |
} else { | |
release() | |
} | |
} | |
} | |
return Object.defineProperty(limitedFn, 'name', { | |
value: fn.name, | |
configurable: true, | |
}) | |
} | |
// const queueFetch = limited(window.fetch, { concurrency: 100 }) | |
/** | |
* @param {{ concurrency?: number, speed?: number }} options | |
* @returns {<F extends AnyFn>(fn: F) => (...args: Parameters<F>) => Promise<Awaited<ReturnType<F>>>} | |
*/ | |
const limitBy = options => fn => limited(fn, options) | |
// const queueFetch2 = limitBy({ concurrency: 100 })(window.fetch) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment