Created
February 11, 2017 01:11
-
-
Save burnall/871d1a80b66f4b6a8745724745635909 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A job queue whose mutable state lives in a context object that is created
 * in the constructor and never stored on the instance. The public operations
 * (`enqueue`, `isEmpty`, `instrumentation`) are the module-level functions of
 * the same name with that context pre-bound as their first argument.
 */
export default class JobQueue {
  /**
   * @param {*} collection - Stored unchanged as `this.collection`.
   * @param {*} identifier - Stored unchanged as `this.identifier`.
   * @param {object} pendingJobs - Placed in the context as `pendingJobs`; the
   *   queue functions call `isFull`, `enqueue`, `increase` and `decrease` on it.
   * @param {object} queuedJobs - Placed in the context as `queuedJobs`; the
   *   queue functions call `increase` and `decrease` on it.
   */
  constructor(collection, identifier, pendingJobs, queuedJobs) {
    this.collection = collection;
    this.identifier = identifier;

    // Deliberately a local, not a property: only the bound functions below
    // can reach the queue array and the two counters.
    const context = {
      queue: [],
      pendingJobs,
      queuedJobs,
    };

    this.enqueue = enqueue.bind(null, context);
    this.isEmpty = isEmpty.bind(null, context);
    this.instrumentation = instrumentation.bind(null, context);
  }
}
/**
 * Appends a job description to the queue and bumps the queued-jobs counter.
 * If the queue was empty before the append, execution is started right away;
 * otherwise the new job waits until the jobs ahead of it are dequeued.
 *
 * @param {object} context - Queue state: `queue` (array) and `queuedJobs`
 *   (object with an `increase()` method) are used here.
 * @param {*} jobDescription - The entry to append; it is interpolated into
 *   the debug log line, so its string form is what gets logged.
 */
function enqueue(context, jobDescription) {
  log.debug(`QUEUE - queue: ${jobDescription}`);

  // Must be sampled before the push: a non-empty queue means a job is already
  // at the head, and finishing that job is what advances the queue.
  const wasIdle = isEmpty(context);

  context.queue.push(jobDescription);
  context.queuedJobs.increase();

  if (wasIdle) {
    executeJob(context);
  }
}
/**
 * Reports whether the queue currently holds no job descriptions.
 *
 * @param {object} context - Queue state; only `context.queue` is read.
 * @returns {boolean} `true` when `context.queue` has length 0.
 */
function isEmpty(context) {
  return context.queue.length === 0;
}
// Entries whose string form starts with "WAIT " are treated as wait jobs.
const WAIT_JOB_PATTERN = /^WAIT /;

/**
 * Builds a small inspection object for the queue.
 *
 * @param {object} context - Queue state; only `context.queue` is read.
 * @returns {{size: number, queue: Array, getSizeWithoutWaitJobs: function(): number}}
 *   `size` is the queue length at the time of the call (a snapshot);
 *   `queue` is the live queue array itself, not a copy;
 *   `getSizeWithoutWaitJobs()` counts, at the time it is invoked, the entries
 *   whose `toString()` does not start with `"WAIT "`.
 */
function instrumentation(context) {
  return {
    size: context.queue.length,
    queue: context.queue,
    getSizeWithoutWaitJobs: () =>
      context.queue.filter((elem) => !WAIT_JOB_PATTERN.test(elem.toString())).length,
  };
}
/**
 * Runs the job that is first in the queue, if there is one.
 *
 * Async jobs (`jobDescription.async` is truthy) are first checked against
 * `context.pendingJobs.isFull()`. If the limit has been reached, the job is
 * handed to `context.pendingJobs.enqueue(...)` and stays at the head of this
 * queue; nothing else happens here. Otherwise the pending counter is
 * increased and the job is called with a completion callback that dequeues.
 *
 * Synchronous jobs skip the `isFull()` check (they still increase the
 * pending counter), run immediately, and are dequeued as soon as they
 * return or throw.
 *
 * @param {object} context - Queue state: `queue` (array of job descriptions
 *   with `job` and `async` properties) and `pendingJobs` (object with
 *   `isFull()`, `enqueue()` and `increase()`).
 */
function executeJob(context) {
  if (context.queue.length === 0) {
    return;
  }

  const jobDescription = context.queue[0];
  const job = jobDescription.job;

  if (jobDescription.async) {
    if (context.pendingJobs.isFull()) {
      // NOTE(review): executeJob is handed over unbound, so whoever invokes
      // `execute` later has to supply the context itself - confirm against
      // the pendingJobs implementation.
      context.pendingJobs.enqueue({ execute: executeJob, jobDescription: jobDescription });
      return;
    }

    log.debug(`QUEUE - execute async: ${jobDescription}`);
    context.pendingJobs.increase();
    // Presumably x.once guards against the job signalling completion more
    // than once - TODO confirm what `x` is; it is not defined in this view.
    job(x.once(() => dequeue(context)));
    return;
  }

  log.debug(`QUEUE - execute: ${jobDescription}`);
  context.pendingJobs.increase();
  try {
    job();
  } finally {
    // Dequeue even when the job throws. Otherwise the failed job would stay
    // at the head with the pending counter still raised, and because enqueue
    // only starts execution on an empty queue, nothing would ever run again.
    dequeue(context);
  }
}
/**
 * Runs after a job has finished. Removes the head of the queue, decreases
 * both the pending-jobs and queued-jobs counters, and calls executeJob to
 * proceed with whatever is next. Does nothing when the queue is empty.
 *
 * @param {object} context - Queue state: `queue` (array), `pendingJobs` and
 *   `queuedJobs` (objects with a `decrease()` method).
 */
function dequeue(context) {
  // Guard so a completion signal on an already-drained queue does not push
  // the counters below what was counted up.
  if (context.queue.length === 0) {
    return;
  }

  const finished = context.queue.shift();
  log.debug(`QUEUE - done: ${finished}`);

  context.pendingJobs.decrease();
  context.queuedJobs.decrease();

  executeJob(context);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment