Skip to content

Instantly share code, notes, and snippets.

@tjstebbing
Created September 25, 2013 03:37
Show Gist options
  • Save tjstebbing/6694905 to your computer and use it in GitHub Desktop.
Save tjstebbing/6694905 to your computer and use it in GitHub Desktop.
// Mongo DB queue helper.

// Durations below are in milliseconds, the same unit Date.now() returns.
var oneMinute = 1000 * 60,
    // how long a lock stamp stays valid: 30 minutes.
    lockTime = oneMinute * 30;

// Options object handed to every mongo call made by this module.
// NOTE(review): presumably these request an acknowledged, journaled write;
// confirm the option names against the mongodb driver version in use.
var safeOpts = {
    safe: true,
    w: 1,
    journal: true
};
/* processQueuedItems(collection, filter, taskFunc, log, callback)
*
* collection: a mongodb Collection object
* filter: a javascript object with mongo query terms (or null)
* taskFunc: called for each queued item with (item, callback)
* log: logging function for error messages
* callback: finish callback, called when no queued items remain
*
* The taskFunc will be called with the item locked in mongodb.
* It should process the item, then call callback(err, keep, updates)
* to unlock the item.
* If keep is false, the item will be deleted from mongo forever.
* If keep is true, you MUST ensure that the item will not match the
* filter conditions again immediately (e.g. by returning some updates
* or updating the item yourself) otherwise the item will be found
* and processed again.
* If updates is specified, it must be an object containing mongodb fields
* to set or mongo update operations such as $set, $push, etc.
*/
exports.processQueuedItems = function (collection, filter, taskFunc, log, callback) {
// find items that have no lock, or were locked in the past beyond
// the lock timeout duration.
// locks taken before this time have expired.
var now = Date.now(), locksExpire = now - lockTime;
var isUnlocked = { $or: [ {locked:null}, {locked:{$lt:locksExpire}} ] };
var notBlocked = { blocked: null };
var andTerms = [ isUnlocked, notBlocked ];
if (filter) andTerms.push( filter );
var query = { $and: andTerms };
// the task handler; continue until we cannot find an unlocked item.
function findAndProcess() {
// query for an unlocked item and lock it atomically.
collection.findAndModify(query, {}, {
$set: { locked: now }
}, safeOpts, function (err, item) {
if (err) {
var msg = "processQueuedItems: error querying db:"+err.toString();
log(msg);
return callback(new Error(msg));
}
if (!item) {
// the queue is empty, or all items are locked.
return callback(null);
}
// run the async task handler over the item.
try {
taskFunc( item, finishItem );
} catch (task_err) {
finishItem( task_err );
}
function finishItem(err, keep, updates) {
// async handler has finished or crashed.
if (err) {
// advisory: log but do not abort.
log("processQueuedItems: error processing item: " +
JSON.stringify(item,null,2) + "\n" + err.stack);
// mark the item as blocked so we won't process it again.
// this allows us to inspect, fix and retry broken items.
keep = true;
updates = { blocked: true, lastError: err.stack||err.toString() };
}
// task completed: unlock and apply optional updates.
if (keep) {
var changes = promoteFieldsToSet(updates);
changes.$set.locked = null; // unlock the item.
collection.update({_id:item._id}, changes, safeOpts,
function (err) {
if (err) {
// advisory: log but do not abort.
log("processQueuedItems: error unlocking item: " +
JSON.stringify(item,null,2) + " with update " +
JSON.stringify(updates,null,2) + err.toString());
}
// unwind the stack, then find the next item to process.
process.nextTick(findAndProcess);
});
} else {
collection.remove({_id:item._id}, safeOpts,
function (err) {
if (err) {
// advisory: log but do not abort.
log("processQueuedItems: error removing item: " +
JSON.stringify(item,null,2) + err.toString());
}
// unwind the stack, then find the next item to process.
process.nextTick(findAndProcess);
});
}
}
});
}
findAndProcess();
};
function promoteFieldsToSet(updates) {
    // Turn a loosely specified `updates` object into a mongodb update
    // document that always carries a $set operation:
    //   - plain top-level fields are moved into $set
    //   - the fields of an explicit $set are merged into that same $set
    //   - any other $-prefixed operation is passed through untouched
    // A falsy `updates` yields { $set: {} }. The input is not modified.
    var setFields = {};
    var result = { $set: setFields };
    if (!updates) {
        return result;
    }
    var name, nested, inner;
    for (name in updates) {
        if (name.charAt(0) !== '$') {
            // ordinary field: becomes a $set entry.
            setFields[name] = updates[name];
            continue;
        }
        if (name !== '$set') {
            // some other mongo operator: carried over as-is.
            result[name] = updates[name];
            continue;
        }
        // caller-supplied $set: fold its entries into ours.
        nested = updates[name];
        for (inner in nested) {
            setFields[inner] = nested[inner];
        }
    }
    return result;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment