Skip to content

Instantly share code, notes, and snippets.

@timoxley
Created December 20, 2011 18:31
Show Gist options
  • Save timoxley/1502645 to your computer and use it in GitHub Desktop.
Save timoxley/1502645 to your computer and use it in GitHub Desktop.
mongoose oplog
'use strict';
// Process-wide debug flag. It is set on `global` because the legacy `GLOBAL`
// alias no longer exists in current Node.js releases, where referencing it
// throws a ReferenceError. Nothing in this file reads the flag; presumably a
// dependency does -- confirm before removing.
global.DEBUG = true;
var mongoose = require('mongoose');
// Connects to a local MongoDB; the rest of the script runs from the
// connection's 'open' event.
mongoose.connect('mongodb://localhost/groupdock_development');
mongoose.connection.on('open', function() {
var test = require("assert");
/**
 * Oplog watcher. A new instance is idle, has no registered callbacks and
 * reuses the connection owned by mongoose.
 */
function Slave() {
  // Toggled by start() / stop().
  this.running = false;
  // Functions registered through onObject().
  this.callbacks = [];
  // Notes carried over from the original, which accompanied a commented-out
  // variant that built its own `new Db('testing', new Server(host, port, {}), {})`:
  //  - no native_parser right now (because timestamps)
  //  - no strict mode (because system db signed with $, db.js line 189)
  //  - connect without dbName for querying not only the "local" db
  this.db = mongoose.connection;
}
/**
 * Start watching.
 *
 * Opens the connection, reads the most recent entry of `local.oplog.rs` and
 * passes its `ts` to `_runSlave` as the starting position. Does nothing when
 * the watcher is already running; `running` only becomes true once that
 * starting entry has been read.
 *
 * A failure to open the connection ends the process with exit code 1. Later
 * failures are logged and followed by `stop()`.
 */
Slave.prototype.start = function() {
  var self = this;
  if (this.running) return;
  this.db.open(function(err, db) {
    if (err) {
      console.log('> MongoSlave error' + err);
      process.exit(1);
    }
    // NOTE(review): the original author observed that this callback never
    // fired in their setup. Whether the connection object honours a callback
    // here has not been verified -- confirm against the mongoose version in use.
    db.collection('local.oplog.rs', function(err, collection) {
      if (!collection) {
        console.log('> MongoSlave - local.oplog.rs not found');
        self.stop();
        return false;
      }
      process.on('SIGINT', function() {
        self.stop(); // tailable cursor should be stopped manually
      });
      // Fetch the last row to obtain the initial timestamp.
      collection.find({}, {'limit': 1, 'sort': [['$natural', -1]]}, function(err, cursor) {
        if (err || !cursor) {
          console.log(err);
          self.stop();
          return;
        }
        cursor.toArray(function(err, items) {
          // Check the error first: on failure `items` may be undefined and
          // reading `items.length` would throw.
          if (err) {
            console.log(err);
            self.stop();
            return;
          }
          if (items && items.length) {
            console.log('> MongoSlave started');
            self.running = true;
            self._runSlave(collection, items[0]['ts']);
          }
        });
      });
    });
  });
};
/**
 * Stop watching. Only acts while the watcher is running: logs the stop,
 * clears the `running` flag and then closes the connection.
 */
Slave.prototype.stop = function() {
  if (this.running) {
    console.log('> MongoSlave stopped');
    this.running = false;
    this.db.close();
  }
};
/**
 * Tail the oplog collection and emit the document behind each new entry.
 *
 * Runs until the watcher is stopped: when the cursor reports itself closed
 * and the watcher is still running, the tail is reopened from the last `ts`
 * that was processed.
 *
 * @param {Object} collection - oplog collection handle; must provide `find`.
 * @param {*} time - `ts` of the last entry already seen; only later entries
 *   are requested.
 */
Slave.prototype._runSlave = function(collection, time) {
  var self = this;
  // Filter on `ts`: it is the field the resume position is read from below,
  // so the query and the position being tracked must agree.
  var query = {'ts': {'$gt': time}};
  var options = {'tailable': 1, 'sort': [['$natural', 1]]};
  collection.find(query, options, function(err, cursor) {
    if (err || !cursor) {
      console.log(err);
      self.stop();
      return;
    }
    cursor.each(function(err, item) {
      if (cursor.state === 'closed') { // broken cursor
        if (self.running) self._runSlave(collection, time);
        return;
      }
      if (err) {
        console.log(err);
        return;
      }
      // Nothing to process (e.g. no entry was delivered with this call).
      if (!item) return;
      time = item['ts'];
      switch (item['op']) {
        case 'i': // inserted: the entry carries the document itself
          self._emitObj(item['o']);
          break;
        case 'u': // updated: re-read the document by the _id in `o2`
          self.db.collection(item['ns'], function(err, target) {
            if (err || !target) {
              console.log(err);
              return;
            }
            target.findOne(item['o2']['_id'], {}, function(err, doc) {
              if (doc) self._emitObj(doc);
            });
          });
          break;
        case 'd': // deleted
          // nothing to do
          break;
      }
    });
  });
};
/**
 * Hand `obj` to every registered callback, in registration order, with the
 * watcher as `this`.
 *
 * @param {*} obj - value passed as the single argument to each callback.
 */
Slave.prototype._emitObj = function (obj) {
  var self = this;
  // forEach visits array elements only; a for...in loop would also pick up
  // any other enumerable property found on the array and try to call it.
  this.callbacks.forEach(function (callback) {
    callback.call(self, obj);
  });
};
/**
 * Register a callback; it is appended after those already registered.
 *
 * @param {Function} callback - stored as-is in `this.callbacks`.
 */
Slave.prototype.onObject = function (callback) {
  var registered = this.callbacks;
  registered.push(callback);
};
// Example wiring: create a watcher, print every object it emits, then start it.
var watcher = new Slave();
var printObject = function (emitted) {
  console.dir(emitted);
};
watcher.onObject(printObject);
watcher.start();
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment