-
-
Save SrJSDev/1fd011ee2eed492f486794dbd6343913 to your computer and use it in GitHub Desktop.
A simple queue to throttle max X simultaneous async function executions (JavaScript)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Wraps `func` in a throttling queue so that at most `maxCurr` invocations
 * are in flight at the same time. Extra calls wait in FIFO order.
 *
 * @param {number} [maxCurr=10] - Maximum number of simultaneous executions.
 * @param {Function} func - Sync or async function to throttle.
 * @param {object} [options]
 * @param {Function} [options.logger] - Optional log sink; receives a tag
 *   string followed by detail values.
 * @param {string} [options.logtag=""] - Prefix prepended to every log tag.
 * @returns {Function} Throttled wrapper `(...args) => Promise` that settles
 *   with the outcome of `func(...args)`. It also exposes `active()` (number
 *   of running calls) and `size()` (number of waiting calls).
 */
export default function newQueue(
  maxCurr = 10,
  func,
  { logger, logtag = "" } = {}
) {
  let curr = 0;
  const queue = [];

  // Text to log for a failure: the `message` of Error-like values, otherwise
  // the raw thrown value (tasks may reject with strings, null or undefined).
  const describeError = (err) =>
    err != null && err.message !== undefined ? err.message : err;

  async function scheduleWork() {
    if (curr >= maxCurr || queue.length === 0) return;
    curr++;
    const { func, args, resolve, reject } = queue.shift();
    try {
      logger && logger(`${logtag}[Q] WORK STARTED:`, curr, queue.length);
      resolve(await func(...args)); // handle async AND sync functions
    } catch (err) {
      // Settle the caller first so a problem while logging can never leave
      // its promise pending.
      reject(err);
      logger &&
        logger(
          `${logtag}[Q] !! WORK ERROR:`,
          describeError(err),
          func && func.name
        );
    } finally {
      // Always release the slot and keep draining, even if a logger threw.
      curr--;
      try {
        logger && logger(`${logtag}[Q] WORK ENDED: `, curr, queue.length);
      } finally {
        scheduleWork(); // schedule next func exec
      }
    }
  }

  const hoc = (...args) => {
    return new Promise((resolve, reject) => {
      queue.push({ func, args, resolve, reject });
      scheduleWork(); // schedule next func exec
    });
  };
  hoc.active = () => curr;
  hoc.size = () => queue.length;
  return hoc;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Creates a shared throttling queue. The returned function wraps any number
 * of functions; all wrapped functions draw from the same pool of `maxCurr`
 * simultaneous executions and the same FIFO waiting list.
 *
 * @param {number} [maxCurr=10] - Maximum number of simultaneous executions.
 * @param {object} [options]
 * @param {Function} [options.logger] - Optional log sink; receives a tag
 *   string followed by detail values.
 * @param {string} [options.logtag=""] - Prefix prepended to every log tag.
 * @returns {Function} `hoc(func)` returning a throttled `(...args) => Promise`
 *   that settles with the outcome of `func(...args)`. `hoc.active()` reports
 *   running calls and `hoc.size()` reports waiting calls.
 */
export default function newQueue(maxCurr = 10, { logger, logtag = "" } = {}) {
  let curr = 0;
  const queue = [];

  // Text to log for a failure: the `message` of Error-like values, otherwise
  // the raw thrown value (tasks may reject with strings, null or undefined).
  const describeError = (err) =>
    err != null && err.message !== undefined ? err.message : err;

  async function scheduleWork() {
    if (curr >= maxCurr || queue.length === 0) return;
    curr++;
    const { func, args, resolve, reject } = queue.shift();
    try {
      logger && logger(`${logtag}[Q] WORK STARTED:`, curr, queue.length);
      resolve(await func(...args)); // handle async AND sync functions
    } catch (err) {
      // Settle the caller first so a problem while logging can never leave
      // its promise pending.
      reject(err);
      logger &&
        logger(
          `${logtag}[Q] !! WORK ERROR:`,
          describeError(err),
          func && func.name
        );
    } finally {
      // Always release the slot and keep draining, even if a logger threw.
      curr--;
      try {
        logger && logger(`${logtag}[Q] WORK ENDED: `, curr, queue.length);
      } finally {
        scheduleWork(); // schedule next func exec
      }
    }
  }

  const hoc =
    (func) =>
    (...args) => {
      return new Promise((resolve, reject) => {
        queue.push({ func, args, resolve, reject });
        scheduleWork(); // schedule next func exec
      });
    };
  hoc.active = () => curr;
  hoc.size = () => queue.length;
  return hoc;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Returns a promise that never fulfills and rejects with `data` after
 * `sec` seconds.
 *
 * @param {number} sec - Delay before rejecting, in seconds.
 * @param {*} data - Value used as the rejection reason.
 * @returns {Promise<never>} Promise that rejects with `data`.
 */
const delayReject = (sec, data) =>
  new Promise((_, rej) =>
    setTimeout(() => {
      rej(data);
    }, sec * 1000)
  );
/**
 * Wraps `func` in a throttling queue so that at most `maxCurr` invocations
 * are in flight at the same time, with an optional per-call timeout.
 *
 * When `timeoutSecs > 0`, each call is raced against a timer started at the
 * moment the call begins executing (not when it was queued). If the timer
 * wins, the caller's promise rejects with the string
 * `${logtag}[Q] Timeout!` and the slot is released. The underlying call is
 * not cancelled; it is merely no longer waited for.
 *
 * @param {number} [maxCurr=10] - Maximum number of simultaneous executions.
 * @param {Function} func - Sync or async function to throttle.
 * @param {object} [options]
 * @param {Function} [options.logger] - Optional log sink; receives a tag
 *   string followed by detail values.
 * @param {string} [options.logtag=""] - Prefix prepended to every log tag
 *   and to the timeout rejection text.
 * @param {number} [options.timeoutSecs=0] - Per-call timeout in seconds;
 *   values that are not greater than 0 disable the timeout.
 * @returns {Function} Throttled wrapper `(...args) => Promise`. It also
 *   exposes `active()` (running calls) and `size()` (waiting calls).
 */
export default function newQueue(
  maxCurr = 10,
  func,
  { logger, logtag = "", timeoutSecs = 0 } = {}
) {
  let curr = 0;
  const queue = [];

  // Text to log for a failure: the `message` of Error-like values, otherwise
  // the raw thrown value (the timeout itself rejects with a plain string).
  const describeError = (err) =>
    err != null && err.message !== undefined ? err.message : err;

  // Invokes the task and, when a timeout is configured, races it against a
  // timer that is always cleared once the race settles so no timer outlives
  // its call.
  async function runWithTimeout(task, args) {
    // Call the task before arming the timer: a synchronous throw then
    // leaves no timer behind.
    const pending = task(...args);
    if (!(timeoutSecs > 0)) return pending;

    let timer;
    const timeout = new Promise((_, rej) => {
      timer = setTimeout(() => {
        rej(`${logtag}[Q] Timeout!`);
      }, timeoutSecs * 1000);
    });
    try {
      return await Promise.race([pending, timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  async function scheduleWork() {
    if (curr >= maxCurr || queue.length === 0) return;
    curr++;
    const { func, args, resolve, reject } = queue.shift();
    try {
      logger && logger(`${logtag}[Q] WORK STARTED:`, curr, queue.length);
      resolve(await runWithTimeout(func, args)); // handle async AND sync functions
    } catch (err) {
      // Settle the caller first so a problem while logging can never leave
      // its promise pending.
      reject(err);
      logger &&
        logger(
          `${logtag}[Q] !! WORK ERROR:`,
          describeError(err),
          func && func.name
        );
    } finally {
      // Always release the slot and keep draining, even if a logger threw.
      curr--;
      try {
        logger && logger(`${logtag}[Q] WORK ENDED: `, curr, queue.length);
      } finally {
        scheduleWork(); // schedule next func exec
      }
    }
  }

  const hoc = (...args) => {
    return new Promise((resolve, reject) => {
      queue.push({ func, args, resolve, reject });
      scheduleWork(); // schedule next func exec
    });
  };
  hoc.active = () => curr;
  hoc.size = () => queue.length;
  return hoc;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment