Created
February 11, 2017 01:13
-
-
Save burnall/39a22e02a80ad896a1d9401032c8e2e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * FIFO queue that runs its jobs one at a time, in insertion order.
 *
 * A job description is an object with a `job` function and an optional
 * truthy `async` flag. Synchronous jobs are called with no arguments and are
 * considered finished when they return. Asynchronous jobs receive a single
 * completion callback and are considered finished when that callback is
 * invoked. Job descriptions are interpolated into debug log messages, so
 * their `toString()` is what shows up in the log.
 *
 * Relies on two names that are expected to be in scope in the enclosing
 * module: `log` (with a `debug` method) and `x` (with a `once` method).
 *
 * @param {*} collection - Stored as-is on `this.collection`.
 * @param {*} identifier - Stored as-is on `this.identifier`.
 * @param {Object} pendingJobs - Shared tracker of running jobs. Must provide
 *   `isFull()`, `enqueue({execute, jobDescription})`, `increase()` and
 *   `decrease()`.
 * @param {Object} queuedJobs - Counter of queued jobs. Must provide
 *   `increase()` and `decrease()`.
 */
export default function JobQueue(collection, identifier, pendingJobs, queuedJobs) {
  const queue = [];

  this.collection = collection;
  this.identifier = identifier;

  /**
   * Appends a job to the queue. If the queue was empty beforehand, the job is
   * started immediately; otherwise it runs once every job ahead of it is done.
   *
   * @param {{job: Function, async: (boolean|undefined)}} jobDescription
   */
  this.enqueue = jobDescription => {
    log.debug(`QUEUE - queue: ${jobDescription}`);
    const queueEmpty = this.isEmpty();
    queue.push(jobDescription);
    queuedJobs.increase();
    if (queueEmpty) {
      executeJob();
    }
  };

  /** @returns {boolean} true when no job is queued or running. */
  this.isEmpty = () => queue.length === 0;

  /**
   * Exposes queue internals for monitoring.
   *
   * @returns {{size: number, queue: Array, getSizeWithoutWaitJobs: function(): number}}
   *   `size` is a snapshot of the current length, `queue` is the live internal
   *   array (not a copy), and `getSizeWithoutWaitJobs` counts the entries
   *   whose string form does not start with "WAIT ".
   */
  this.instrumentation = () => ({
    size: queue.length,
    queue: queue,
    getSizeWithoutWaitJobs: () => queue.filter(elem => !/^WAIT /.test(elem.toString())).length
  });

  /**
   * Runs the job that is first in the queue. The job stays at the head of
   * the queue while it runs; `dequeue` removes it once it has finished.
   *
   * For an async job, `pendingJobs.isFull()` is checked first. If it is full,
   * this function is handed to `pendingJobs.enqueue` as `execute` and nothing
   * is run now (presumably `pendingJobs` calls `execute` again once capacity
   * frees up; confirm against its implementation). Synchronous jobs do not
   * check `isFull()`.
   *
   * A job that throws is still dequeued, so a single failing job cannot
   * stall the queue. The error is rethrown to the caller.
   */
  function executeJob() {
    if (queue.length === 0) {
      return;
    }
    const jobDescription = queue[0];
    const job = jobDescription.job;

    if (jobDescription.async) {
      if (pendingJobs.isFull()) {
        pendingJobs.enqueue({execute: executeJob, jobDescription: jobDescription});
        return;
      }
      log.debug(`QUEUE - execute async: ${jobDescription}`);
      pendingJobs.increase();
      // Assumes `x.once` returns a wrapper that invokes `dequeue` at most once
      // (lodash-style `once`), so a job calling back twice is harmless.
      const done = x.once(dequeue);
      try {
        job(done);
      } catch (err) {
        // The job failed before completing; release its slot so the queue
        // keeps moving. This is a no-op if the job already called `done`.
        done();
        throw err;
      }
    } else {
      log.debug(`QUEUE - execute: ${jobDescription}`);
      pendingJobs.increase();
      try {
        job();
      } finally {
        // Always advance, even when the job throws; otherwise the failed job
        // would stay at the head and block every later job.
        dequeue();
      }
    }
  }

  /**
   * Runs after a job has finished. Removes the finished job from the head of
   * the queue, decrements both counters and calls `executeJob` to proceed
   * with the next job, if any.
   */
  function dequeue() {
    if (queue.length === 0) {
      return;
    }
    const jobDescription = queue.shift();
    log.debug(`QUEUE - done: ${jobDescription}`);
    pendingJobs.decrease();
    queuedJobs.decrease();
    executeJob();
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment