Skip to content

Instantly share code, notes, and snippets.

@burnall
Created February 11, 2017 01:25
Show Gist options
  • Save burnall/fae993695e79c87383e8b385e2e1e60b to your computer and use it in GitHub Desktop.
Save burnall/fae993695e79c87383e8b385e2e1e60b to your computer and use it in GitHub Desktop.
export default class JobQueue {
constructor(collection, identifier, pendingJobs, queuedJobs) {
this.collection = collection;
this.identifier = identifier;
// All public, do not care about incapsulation
this.queue = [];
this.pendingJobs = pendingJobs;
this.queuedJobs = queuedJobs;
}
enqueue(jobDescription) {
log.debug(`QUEUE - queue: ${jobDescription}`);
const queueEmpty = this.isEmpty();
this.queue.push(jobDescription);
this.queuedJobs.increase();
if (queueEmpty) {
this.executeJob();
}
}
isEmpty() {
return this.queue.length === 0;
}
instrumentation() {
return {
size: this.queue.length,
queue: this.queue,
getSizeWithoutWaitJobs: () => this.queue.filter(elem => !elem.toString().match(/^WAIT /)).length
}
}
/**
* Will run the job that is first in the queue
*
* If it is an async job, a check is first made to make sure
* that we do not have reached the 'maxNbrOfPendingJobs'. If max has been reached,
* the job is enqueued on the pendingJobsState. For synchronous jobs, we do not check
* maxNbrOfPendingJobs.
*/
executeJob() {
if (this.queue.length === 0) {
return;
}
const jobDescription = this.queue[0];
const job = jobDescription.job;
if (jobDescription.async) {
if (this.pendingJobs.isFull()) {
this.pendingJobs.enqueue({execute: this.executeJob, jobDescription: jobDescription});
return;
}
log.debug(`QUEUE - execute async: ${jobDescription}`);
this.pendingJobs.increase();
job(x.once(this.dequeue.bind(this)));
} else {
log.debug(`QUEUE - execute: ${jobDescription}`);
this.pendingJobs.increase();
job();
this.dequeue();
}
}
/**
* This runs after a job has finished. It will shift the queue and
* call executeJob in order to proceed with the queue
*/
dequeue() {
if (this.queue.length === 0) {
return;
}
const jobDescription = this.queue.shift();
log.debug(`QUEUE - done: ${jobDescription}`);
this.pendingJobs.decrease();
this.queuedJobs.decrease();
this.executeJob();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment