Skip to content

Instantly share code, notes, and snippets.

@moyarich
Created June 9, 2020 22:34
Show Gist options
  • Save moyarich/4d6735b8d417c5e2f7e5f03469d32fb7 to your computer and use it in GitHub Desktop.
Save moyarich/4d6735b8d417c5e2f7e5f03469d32fb7 to your computer and use it in GitHub Desktop.
Implementation of a processOne function to process a single item in the queue
const Queue = require("bull");
const Job = Queue.Job;
/**
 * Process only one job from the queue at a time.
 * Bull has not implemented this feature, so the job is fetched and moved
 * between states manually.
 *
 * @param callback {function(Object)} This will be called after pulling a job from the queue.
 * A job will be passed as the first parameter. It may return a promise or a
 * plain value; either way its result is awaited.
 *
 * @return {Promise<Object|undefined>} Resolves with the next Job that is in the
 * queue (as reported by moveToCompleted), or undefined when the queue had no
 * job to process or no next job was reported. Rejects when `callback` is not a
 * function, when the queue cannot be read, or when processing the job fails.
 *
 * https://github.com/OptimalBits/bull/blob/master/PATTERNS.md#manually-fetching-jobs
 * @since: 20200227
 *
 * If callback throws or rejects, the job is flagged as failed and the returned
 * promise rejects with that same error.
 * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
 */
if (typeof Queue.prototype.processOne !== "function") {
  Queue.prototype.processOne = async function (callback) {
    // Validate before touching the queue so a bad argument never pulls a job
    // out of 'waiting' only to fail it.
    if (typeof callback !== "function") {
      throw new TypeError("processOne expects a callback function");
    }

    //Pull a job from 'waiting' and move it to 'active'.
    await this.isReady();
    const job = await this.getNextJob();
    if (!job) {
      return undefined;
    }

    try {
      // `await` accepts both promise-returning and synchronous callbacks, and a
      // synchronous throw lands in the catch block below as well.
      await callback(job);
      await this.isReady();
      //Move the job to the 'completed' queue.
      const [nextJobData, nextJobId] =
        (await job.moveToCompleted("succeeded", true)) || [];
      return nextJobData ? Job.fromJSON(this, nextJobData, nextJobId) : undefined;
    } catch (error) {
      // Rejection values are not guaranteed to be Error instances (they may
      // even be null/undefined), so derive the message defensively.
      const message =
        error != null && typeof error.message === "string"
          ? error.message
          : String(error);
      try {
        //Move the job to the 'failed' queue if something goes wrong.
        // Awaited so the job state is updated before the caller sees the rejection.
        await job.moveToFailed(
          {
            message,
          },
          true
        );
      } catch (moveError) {
        // Deliberately not rethrown: the original processing error is the
        // primary failure and is the one reported to the caller below.
      }
      throw error;
    }
  };
}
// Re-export Bull's Queue constructor; its prototype carries processOne
// (added by the guard above unless a processOne method already existed).
module.exports = Queue;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment