Skip to content

Instantly share code, notes, and snippets.

@abrkn
Created May 16, 2012 15:25
Show Gist options
  • Save abrkn/2711252 to your computer and use it in GitHub Desktop.
Save abrkn/2711252 to your computer and use it in GitHub Desktop.
var async = require('async');
module.exports = function(config) {
var group = {
name: 'Search Indexer',
tasks: []
};
group.tasks.push(task = {
group: group.name,
name: 'Shows',
query: {
$and: [
{ $or: [
{ searchIndexingAt: { $exists : false } },
{ searchIndexingAt: { $lt: new Date(new Date() - 3600000) } }
] },
{ $or: [
{ searchIndexedAt: { $exists : false } },
{ searchIndexedAt: { $lt: new Date(new Date() - 3600000) } }
] }
]
},
prepare: function(callback) {
async.series([
function(callback) {
config.db.collection("shows").ensureIndex({ searchIndexingAt: 1, searchIndexedAt: 1 }, callback);
},
function(callback) {
config.db.collection("shows").ensureIndex({ searchIndexingAt: 1 }, callback);
},
function(callback) {
config.db.collection("shows").ensureIndex({ keywords: 1 }, callback);
},
function(callback) {
config.db.collection("shows").ensureIndex({ airDate: 1 }, callback);
},
function(callback) {
config.db.collection("shows").ensureIndex({ keywords: 1, airDate: 1 }, callback);
},
], callback);
},
find: function(callback) {
return config.db.collection("shows").find(task.query);
},
run: function(callback) {
var reservation = new Date, state = {};
async.series({
reserve: function(step) {
config.db.collection("shows").update(task.query, { $set: { searchIndexingAt: new Date } }, function(err, res) {
if (err) return step(err);
// Reservation failed from concurrent workers.
if (!res) return callback();
step();
});
},
locate: function(step) {
var q = { searchIndexingAt: reservation };
var f = { airDate: 1, name: 1 };
config.db.collection("shows").findOne(q, f, function(err, show) {
if (err) return step(err);
if (!show) return step(new Error("Failed locate reserved object."));
state.show = show;
step();
});
},
modify: function(step) {
var words =
state.show.name + ' ' +
state.show._id;
var keywords = [], keywordsIndex = [];
if (state.show.airDate && typeof state.show.airDate == 'string') {
words += state.show.airDate.substr(0, 4) + ' ';
}
words = words.toLowerCase().replace(/[^[^a-z\d]/g, ' ').replace(/\s{2,}/g, ' ').replace(/(^\s|\s$)/g, '').split(/\s/);
for (var wordN in words) {
var sw = 'the;an;on;in;for;by;episode;season;', word = words[wordN];
if (word.length <= 1 || sw.indexOf(word + ';') != -1) {
continue;
}
if (!keywordsIndex[word]) {
keywordsIndex[word] = 1;
keywords.push(word);
}
}
var firstWordRe = /^[^a-z\d]*([a-z\d]+)/;
var showNameWord = firstWordRe.exec(state.show.name.toLowerCase().replace(/^the\s/, ''));
if (showNameWord) {
showNameWord = showNameWord[1];
// Partial show name
for (var i = 3; i < showNameWord.length; i++) {
var word = showNameWord.substr(0, i);
if (!keywordsIndex[word]) {
keywordsIndex[word] = 1;
keywords.push(word);
}
}
}
state.keywords = keywords;
step();
},
update: function(step) {
var u = { $set: { searchIndexedAt : new Date, keywords : state.keywords }, $unset: { searchIndexingAt: 1 } };
config.db.collection("shows").update({ _id : state.show._id }, u, function(err, res) {
if (err) return step(err);
if (!res) return step(new Error("Failed to update show."));
step(null, "Show #" + state.show._id + " updated with " + state.keywords.length + " keywords.");
});
}
}, function(err, res) {
if (err) return callback(err);
callback(res.update);
});
},
estimate: function(callback) {
task.find().count(callback);
}
});
return group;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment