Created
September 25, 2013 03:37
-
-
Save tjstebbing/6694905 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Mongo DB queue helper.

// one minute expressed in milliseconds, the unit returned by Date.now().
var oneMinute = 60 * 1000;
// how long a lock stays valid: a `locked` timestamp older than this is
// treated as expired, so the item may be claimed again.
var lockTime = 30 * oneMinute;
// options passed along with every mongo write issued by this module.
// NOTE(review): these look like the driver's write-concern flags
// (acknowledged, journaled writes) -- confirm against the driver version in use.
var safeOpts = { safe: true, w: 1, journal: true };
/* processQueuedItems(collection, filter, taskFunc, log, callback) | |
* | |
* collection: a mongodb Collection object | |
* filter: a javascript object with mongo query terms (or null) | |
* taskFunc: called for each queued item with (item, callback) | |
* log: logging function for error messages | |
* callback: finish callback, called when no queued items remain | |
* | |
* The taskFunc will be called with the item locked in mongodb. | |
* It should process the item, then call callback(err, keep, updates) | |
* to unlock the item. | |
* If keep is false, the item will be deleted from mongo forever. | |
* If keep is true, you MUST ensure that the item will not match the | |
* filter conditions again immediately (e.g. by returning some updates | |
* or updating the item yourself) otherwise the item will be found | |
* and processed again. | |
* If updates is specified, it must be an object containing mongodb fields | |
* to set or mongo update operations such as $set, $push, etc. | |
*/ | |
exports.processQueuedItems = function (collection, filter, taskFunc, log, callback) { | |
// find items that have no lock, or were locked in the past beyond | |
// the lock timeout duration. | |
// locks taken before this time have expired. | |
var now = Date.now(), locksExpire = now - lockTime; | |
var isUnlocked = { $or: [ {locked:null}, {locked:{$lt:locksExpire}} ] }; | |
var notBlocked = { blocked: null }; | |
var andTerms = [ isUnlocked, notBlocked ]; | |
if (filter) andTerms.push( filter ); | |
var query = { $and: andTerms }; | |
// the task handler; continue until we cannot find an unlocked item. | |
function findAndProcess() { | |
// query for an unlocked item and lock it atomically. | |
collection.findAndModify(query, {}, { | |
$set: { locked: now } | |
}, safeOpts, function (err, item) { | |
if (err) { | |
var msg = "processQueuedItems: error querying db:"+err.toString(); | |
log(msg); | |
return callback(new Error(msg)); | |
} | |
if (!item) { | |
// the queue is empty, or all items are locked. | |
return callback(null); | |
} | |
// run the async task handler over the item. | |
try { | |
taskFunc( item, finishItem ); | |
} catch (task_err) { | |
finishItem( task_err ); | |
} | |
function finishItem(err, keep, updates) { | |
// async handler has finished or crashed. | |
if (err) { | |
// advisory: log but do not abort. | |
log("processQueuedItems: error processing item: " + | |
JSON.stringify(item,null,2) + "\n" + err.stack); | |
// mark the item as blocked so we won't process it again. | |
// this allows us to inspect, fix and retry broken items. | |
keep = true; | |
updates = { blocked: true, lastError: err.stack||err.toString() }; | |
} | |
// task completed: unlock and apply optional updates. | |
if (keep) { | |
var changes = promoteFieldsToSet(updates); | |
changes.$set.locked = null; // unlock the item. | |
collection.update({_id:item._id}, changes, safeOpts, | |
function (err) { | |
if (err) { | |
// advisory: log but do not abort. | |
log("processQueuedItems: error unlocking item: " + | |
JSON.stringify(item,null,2) + " with update " + | |
JSON.stringify(updates,null,2) + err.toString()); | |
} | |
// unwind the stack, then find the next item to process. | |
process.nextTick(findAndProcess); | |
}); | |
} else { | |
collection.remove({_id:item._id}, safeOpts, | |
function (err) { | |
if (err) { | |
// advisory: log but do not abort. | |
log("processQueuedItems: error removing item: " + | |
JSON.stringify(item,null,2) + err.toString()); | |
} | |
// unwind the stack, then find the next item to process. | |
process.nextTick(findAndProcess); | |
}); | |
} | |
} | |
}); | |
} | |
findAndProcess(); | |
}; | |
/* promoteFieldsToSet(updates)
 *
 * updates: an object mixing plain fields and mongo update operators
 *          (or null/undefined for "no updates")
 *
 * Returns a new mongo update document that always has a $set member:
 * plain top-level fields and the contents of updates.$set are merged
 * into $set, any other $-prefixed operator is carried over as-is.
 * The updates object itself is not modified.
 */
function promoteFieldsToSet(updates) {
    // promote top-level fields to mongodb $set operations
    // and merge with any other $-prefixed operations.
    var sets = {}, changeSet = { $set: sets };
    if (!updates) {
        return changeSet;
    }
    // only own properties are considered, so that enumerable properties
    // inherited from a prototype never leak into the update document.
    var hasOwn = Object.prototype.hasOwnProperty;
    for (var key in updates) {
        if (!hasOwn.call(updates, key)) continue;
        if (key.charAt(0) !== '$') {
            // top-level fields become $set ops.
            sets[key] = updates[key];
        } else if (key === '$set') {
            // merge these with our $set op.
            var setOp = updates[key];
            for (var setKey in setOp) {
                if (hasOwn.call(setOp, setKey)) {
                    sets[setKey] = setOp[setKey];
                }
            }
        } else {
            // keep other mongo ops as-is.
            changeSet[key] = updates[key];
        }
    }
    return changeSet;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment