Skip to content

Instantly share code, notes, and snippets.

@burnall
Created February 11, 2017 01:11
Show Gist options
  • Save burnall/871d1a80b66f4b6a8745724745635909 to your computer and use it in GitHub Desktop.
Save burnall/871d1a80b66f4b6a8745724745635909 to your computer and use it in GitHub Desktop.
/**
 * Facade over a job queue whose mutable state lives in a private object
 * created per instance.
 *
 * The public methods (`enqueue`, `isEmpty`, `instrumentation`) are the
 * module-level functions of the same names with that state object
 * pre-bound as their first argument.
 */
export default class JobQueue {
  /**
   * @param {*} collection - Stored on the instance as `collection`.
   * @param {*} identifier - Stored on the instance as `identifier`.
   * @param {object} pendingJobs - Placed in the shared state as `pendingJobs`.
   * @param {object} queuedJobs - Placed in the shared state as `queuedJobs`.
   */
  constructor(collection, identifier, pendingJobs, queuedJobs) {
    // State shared by every bound function; never exposed on `this`.
    const sharedState = { queue: [], pendingJobs, queuedJobs };
    const bindToState = (fn) => fn.bind(null, sharedState);

    Object.assign(this, {
      collection,
      identifier,
      enqueue: bindToState(enqueue),
      isEmpty: bindToState(isEmpty),
      instrumentation: bindToState(instrumentation),
    });
  }
}
/**
 * Appends a job description to the queue and bumps the queued-jobs counter.
 *
 * Execution is started only when the queue was empty before this call;
 * otherwise the job just waits behind the entries already queued.
 *
 * @param {object} context - Queue state (`queue`, `queuedJobs`, ...).
 * @param {*} jobDescription - Entry to append; interpolated into the log line.
 */
function enqueue(context, jobDescription) {
  log.debug(`QUEUE - queue: ${jobDescription}`);

  // Must be sampled before the push below changes the answer.
  const wasIdle = isEmpty(context);
  const { queue, queuedJobs } = context;

  queue.push(jobDescription);
  queuedJobs.increase();

  if (!wasIdle) {
    return;
  }
  executeJob(context);
}
/**
 * Reports whether the queue currently holds no entries.
 *
 * @param {object} context - Queue state; only `context.queue.length` is read.
 * @returns {boolean} `true` when the queue has length 0.
 */
function isEmpty(context) {
  const { queue } = context;
  return queue.length === 0;
}
/**
 * Builds a small inspection object for the queue.
 *
 * `size` is a number captured at call time, `queue` is the very same array
 * held in the context (not a copy), and `getSizeWithoutWaitJobs` re-reads
 * `context.queue` every time it is invoked.
 *
 * @param {object} context - Queue state; only `context.queue` is read.
 * @returns {{size: number, queue: Array, getSizeWithoutWaitJobs: function(): number}}
 */
function instrumentation(context) {
  // An entry counts as a wait job when its string form starts with "WAIT ".
  const isWaitJob = (entry) => entry.toString().match(/^WAIT /);

  const getSizeWithoutWaitJobs = () =>
    context.queue.reduce((total, entry) => (isWaitJob(entry) ? total : total + 1), 0);

  const currentQueue = context.queue;
  return {
    size: currentQueue.length,
    queue: currentQueue,
    getSizeWithoutWaitJobs,
  };
}
/**
 * Runs the job at the head of the queue, if there is one.
 *
 * Async jobs (`jobDescription.async` truthy) are first checked against
 * `context.pendingJobs.isFull()`. When the limit is reached the job is NOT
 * started; instead a retry entry is handed to `context.pendingJobs.enqueue`
 * and the job stays at the head of this queue. Otherwise the job is started
 * and given a completion callback that dequeues it.
 *
 * Synchronous jobs never check the limit: they run immediately and are
 * dequeued as soon as they return.
 *
 * @param {object} context - Queue state (`queue`, `pendingJobs`, ...).
 * @returns {void}
 */
function executeJob(context) {
  if (context.queue.length === 0) {
    return;
  }
  // Peek only: the entry is removed by dequeue() once the job has finished.
  const jobDescription = context.queue[0];
  const job = jobDescription.job;
  if (jobDescription.async) {
    if (context.pendingJobs.isFull()) {
      // Bind the context so the retry callback can be invoked with no
      // arguments; a bare `executeJob` reference would run with
      // `context === undefined` and throw on `context.queue`.
      context.pendingJobs.enqueue({
        execute: executeJob.bind(null, context),
        jobDescription: jobDescription
      });
      return;
    }
    log.debug(`QUEUE - execute async: ${jobDescription}`);
    context.pendingJobs.increase();
    // `x.once` is defined outside this block; presumably it guards against
    // the job calling its completion callback more than once - TODO confirm.
    job(x.once(() => dequeue(context)));
  } else {
    log.debug(`QUEUE - execute: ${jobDescription}`);
    context.pendingJobs.increase();
    job();
    dequeue(context);
  }
}
/**
 * Called once a job has finished: removes the head of the queue, lowers
 * both job counters and asks executeJob to continue with whatever is next.
 *
 * Does nothing when the queue is already empty.
 *
 * @param {object} context - Queue state (`queue`, `pendingJobs`, `queuedJobs`).
 * @returns {void}
 */
function dequeue(context) {
  const { queue } = context;
  if (!queue.length) {
    return;
  }

  const finishedJob = queue.shift();
  log.debug(`QUEUE - done: ${finishedJob}`);

  const { pendingJobs, queuedJobs } = context;
  pendingJobs.decrease();
  queuedJobs.decrease();

  executeJob(context);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment