Created
March 4, 2017 12:08
-
-
Save apendua/6387125a6a1c8fa475acf62142f794e4 to your computer and use it in GitHub Desktop.
A simple pattern for concurrent workers using observers from Meteor collections
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Meteor } from 'meteor/meteor'; | |
import { Tasks } from '/imports/common/collections'; | |
// The workerId should be unique, and at the same time it should persist | |
// across server restarts, because otherwise we may end up with unfinished tasks falling into limbo. | |
const workerId = process.env.WORKER_ID || 'theOnlyWorker'; | |
const tasksInProgress = {}; | |
const makeTaskProcessor = execute => (task) => { | |
if (!task.workerId) { | |
Meteor.defer(() => { | |
Tasks.update({ | |
_id: task._id, | |
workerId: null, | |
}, { | |
$set: { | |
workerId, | |
}, | |
}); | |
}); | |
} else if (!tasksInProgress[task._id]) { | |
tasksInProgress[task._id] = true; | |
Meteor.defer(() => { | |
execute(task); | |
delete tasksInProgress[task._id]; | |
}); | |
} | |
}; | |
const processTask = makeTaskProcessor((task) => { | |
// Here is where the task execution happens. | |
// ... | |
// At the end we need to remove it somehow from tasks queue. | |
// Removing from the collection is the simplest option, but not the only one. | |
Tasks.remove({ _id: task._id }); | |
}); | |
Meteor.startup(() => { | |
// On each restart we clear assignment of tasks that could not be accomplished for some reason. | |
Tasks.update({ workerId }, { $unset: { workerId: 1 } }, { multi: true }); | |
// With this setup, the observer ensures that there are always at most 10 items | |
Tasks.find({ | |
$or: [ | |
{ workerId: null }, | |
{ workerId }, | |
], | |
}, { | |
limit: 10, | |
// With "workerId: -1" nulls come last, which is exactly what we want. | |
sort: { workerId: -1 }, | |
}).observe({ | |
added: processTask, | |
changed: processTask, | |
}); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment