Last active
November 15, 2022 06:33
-
-
Save crazy4groovy/b2928bac94fa5055f82e6d64a1c3eb7c to your computer and use it in GitHub Desktop.
A simple queue to throttle max X simultaneous async function executions (JavaScript)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export default function newQueue( | |
maxCurr = 10, | |
func, | |
{ logger, logtag = "" } = {} | |
) { | |
let curr = 0; | |
const queue = []; | |
async function scheduleWork() { | |
if (curr >= maxCurr || queue.length === 0) return; | |
curr++; | |
const { func, args, resolve, reject } = queue.shift(); | |
try { | |
logger && logger(`${logtag}[Q] WORK STARTED:`, curr, queue.length); | |
resolve(await func(...args)); // handle async AND sync functions | |
} catch (err) { | |
logger && logger(`${logtag}[Q] !! WORK ERROR:`, err.message, func.name); | |
reject(err); | |
} | |
curr--; | |
logger && logger(`${logtag}[Q] WORK ENDED: `, curr, queue.length); | |
scheduleWork(); // schedule next func exec | |
} | |
const hoc = (...args) => { | |
return new Promise((resolve, reject) => { | |
queue.push({ func, args, resolve, reject }); | |
scheduleWork(); // schedule next func exec | |
}); | |
}; | |
hoc.active = () => curr; | |
hoc.size = () => queue.length; | |
return hoc; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export default function newQueue(maxCurr = 10, { logger, logtag = "" } = {}) { | |
let curr = 0; | |
const queue = []; | |
async function scheduleWork() { | |
if (curr >= maxCurr || queue.length === 0) return; | |
curr++; | |
const { func, args, resolve, reject } = queue.shift(); | |
try { | |
logger && logger(`${logtag}[Q] WORK STARTED:`, curr, queue.length); | |
resolve(await func(...args)); // handle async AND sync functions | |
} catch (err) { | |
logger && logger(`${logtag}[Q] !! WORK ERROR:`, err.message, func.name); | |
reject(err); | |
} | |
curr--; | |
logger && logger(`${logtag}[Q] WORK ENDED: `, curr, queue.length); | |
scheduleWork(); // schedule next func exec | |
} | |
const hoc = | |
(func) => | |
(...args) => { | |
return new Promise((resolve, reject) => { | |
queue.push({ func, args, resolve, reject }); | |
scheduleWork(); // schedule next func exec | |
}); | |
}; | |
hoc.active = () => curr; | |
hoc.size = () => queue.length; | |
return hoc; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const delayReject = (sec, data) => | |
new Promise((_, rej) => | |
setTimeout(() => { | |
rej(data); | |
}, sec * 1000) | |
); | |
export default function newQueue( | |
maxCurr = 10, | |
func, | |
{ logger, logtag = "", timeoutSecs = 0 } = {} | |
) { | |
let curr = 0; | |
const queue = []; | |
const notAbortSymbol = Symbol('notAbort'); | |
const abortPromise = | |
timeoutSecs > 0 | |
? async () => delayReject(timeoutSecs, `${logtag}[Q] Timeout!`) | |
: () => notAbortSymbol; | |
async function scheduleWork() { | |
if (curr >= maxCurr || queue.length === 0) return; | |
curr++; | |
const { func, args, resolve, reject } = queue.shift(); | |
try { | |
logger && logger(`${logtag}[Q] WORK STARTED:`, curr, queue.length); | |
const ps = [func(...args), abortPromise()].filter(x => x != notAbortSymbol); | |
resolve(await Promise.race(ps)); // handle async AND sync functions | |
} catch (err) { | |
logger && logger(`${logtag}[Q] !! WORK ERROR:`, err.message, func.name); | |
reject(err); | |
} | |
curr--; | |
logger && logger(`${logtag}[Q] WORK ENDED: `, curr, queue.length); | |
scheduleWork(); // schedule next func exec | |
} | |
const hoc = (...args) => { | |
return new Promise((resolve, reject) => { | |
queue.push({ func, args, resolve, reject }); | |
scheduleWork(); // schedule next func exec | |
}); | |
}; | |
hoc.active = () => curr; | |
hoc.size = () => queue.length; | |
return hoc; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment