Skip to content

Instantly share code, notes, and snippets.

@scttnlsn
Created July 30, 2012 22:16
Show Gist options
  • Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Pub/sub with MongoDB and Node.js

Pub/sub with MongoDB and Node.js

Setup:

$ mongo
> use pubsub
> db.createCollection('messages', { capped: true, size: 100000 })
> db.messages.insert({})

$ npm install mongodb

Subscribe:

$ node subscribe.js

Publish:

$ mongo
> use pubsub
> db.messages.insert({ message: 'Hello world', time: Date.now() })
// subscribe.js — tail the capped `messages` collection of the `pubsub`
// database and print every document inserted after this script starts.
var mongo = require('mongodb');

var server = new mongo.Server('localhost', 27017);
var db = new mongo.Db('pubsub', server);

db.open(function(err) {
  if (err) throw err;

  db.collection('messages', function(err, collection) {
    if (err) throw err;

    // Reverse natural order + limit(1) yields the most recently inserted
    // document; its _id marks where tailing should begin.
    var latest = collection.find({}).sort({ $natural: -1 }).limit(1);

    latest.nextObject(function(err, doc) {
      if (err) throw err;

      // Without a seed document there is no _id to start from; fail with an
      // explicit message instead of a TypeError on `doc._id`.
      if (!doc) {
        throw new Error('The messages collection is empty; insert a seed document first (see Setup).');
      }

      // Only documents newer than the one found above are delivered.
      var query = { _id: { $gt: doc._id }};
      var options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
      var cursor = collection.find(query, options).sort({ $natural: 1 });

      (function next() {
        cursor.nextObject(function(err, message) {
          if (err) throw err;
          console.log(message);
          // Schedule the next read on a fresh stack so the loop cannot
          // overflow the call stack if the driver ever invokes this callback
          // synchronously.
          setImmediate(next);
        });
      })();
    });
  });
});
@lcneves
Copy link

lcneves commented Feb 13, 2019

In subscribe.js, at the recursive next() call inside the cursor.nextObject() callback (line 26 of the gist): we're trusting that cursor.nextObject() will put the callback on the event loop, which is a reasonable assumption for an asynchronous call, but it still relies on the implementation. If cursor.nextObject() ever calls the callback directly, we will eventually hit a "Maximum call stack size exceeded" error. Just to be on the safe side, we could change this line to process.nextTick(() => next());.

@papnkukn
Copy link

Update for 2019 – works with mongodb 3.3.4 or similar

// Subscriber for the 3.x callback-style driver: tails the capped `messages`
// collection and prints every document inserted after start-up.
const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://localhost:27017';
const dbname = 'pubsub';

MongoClient.connect(url, { useUnifiedTopology: true }, function(err, client) {
  if (err) throw err;

  const db = client.db(dbname);

  db.collection('messages', function(err, collection) {
      if (err) throw err;

      // Reverse natural order is required here: without it limit(1) returns
      // the OLDEST document and every stored message is replayed on start-up.
      const latest = collection.find({ }).sort({ $natural: -1 }).limit(1);

      latest.next(function(err, doc) {
          if (err) throw err;

          // An empty collection gives no _id to start from; fail with an
          // explicit message instead of a TypeError on `doc._id`.
          if (!doc) {
              throw new Error('The messages collection is empty; insert a seed document first (see Setup).');
          }

          // Only documents newer than the one found above are delivered.
          const query = { _id: { $gt: doc._id }};

          // `awaitData` is the documented spelling of the tailable-await flag.
          const options = { tailable: true, awaitData: true, numberOfRetries: -1 };
          const cursor = collection.find(query, options);

          (function next() {
              cursor.next(function(err, message) {
                  if (err) throw err;
                  console.log(message);
                  // Continue on a fresh stack so the loop cannot overflow the
                  // call stack if the callback is ever invoked synchronously.
                  setImmediate(next);
              });
          })();
      });
  });

  // The client is intentionally left open: the tailing loop above needs the
  // connection for as long as the process runs.
});

@xtianus79
Copy link

is this now obsolete with streams?

@hugojerez
Copy link

An adaptation I made with Mongoose, for a project that enforces ESLint rules

      // Fetch the Mongoose client registered on the app under 'mongooseClient'
      // and resolve its 'events' model; subscribe() reads this.Model.
      this.mongooseClient =  this.app.get('mongooseClient');
      this.Model  = this.mongooseClient.model('events');
/**
 * Polls this.Model for new documents and passes each new document's `data`
 * field to `callback`.
 *
 * Starts from the newest document found at call time, then repeatedly opens a
 * cursor for documents with a greater _id, skipping null results and any
 * document whose _id equals the last one delivered.
 *
 * @param {Function} callback - invoked with `result.data` for each new document.
 * @throws rethrows any error reported by the cursor callbacks.
 */
subscribe(callback) {
      // _id (as a string) of the last document handed to `callback`;
      // undefined until the first delivery.
      let lastMessageIDReceived; 
      // subscribe to message queue and emit data
      // Get the last registered item
      var latest = this.Model.find({ }).sort({ $natural: -1 }).limit(1); 
  
      latest.cursor().next((err, doc)=> {
        if (err) throw err;
  
        // NOTE(review): `doc` is dereferenced without a null check — presumably
        // the collection is never empty here; confirm.
        var query = { _id: { $gt: doc._id }};
        // NOTE(review): Mongoose's Model.find treats its second argument as a
        // projection, so these cursor flags may not be applied as driver
        // options — confirm against the Mongoose version in use.
        var options = { data:1, tailable: true, awaitdata: true, numberOfRetries: -1 };
        // The query is fixed at the _id captured above; it is not advanced as
        // messages arrive, hence the duplicate check below.
        var cursor = this.Model.find(query, options).sort({ $natural: -1 }).limit(1);
      
        // One polling step: open a cursor on the query, read a single result,
        // deliver it if new, then immediately poll again.
        const cursorIterator =  () =>{
          cursor.cursor().next((err, message) =>{
            if (err) throw err;
            // Avoid nulls and Repeated messages
            if(message && lastMessageIDReceived != String(message._id)){
                // JSON Parse-Stringify to remove Getters and Setters
              const result =  JSON.parse(JSON.stringify(message));
                // String cast to remove Getters and Setters
                lastMessageIDReceived = String(message._id);
              // Execute our custom callback
              callback(result.data);
            }
            // Re-run unconditionally; there is no delay or stop condition here.
            cursorIterator();
          });
        };
        // Here begins the iteration
        cursorIterator();
      });
  }

@minenwerfer
Copy link

Updated version -- newer versions of the MongoDB Node.js driver use Promises instead of callbacks:

const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://0.0.0.0:27017';
const dbname = 'pubsub';

/**
 * Tails the capped `messages` collection and prints every document inserted
 * after start-up.
 *
 * Resolves only once the tailable cursor is closed; rejects on any driver
 * error or when the collection holds no seed document to start from.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const client = await MongoClient.connect(url);

  try {
    const collection = client.db(dbname).collection('messages');

    // Reverse natural order + limit(1) yields the most recently inserted
    // document; its _id marks where tailing should begin.
    const latest = await collection
      .find({})
      .sort({ $natural: -1 })
      .limit(1)
      .next();

    if (latest === null) {
      throw new Error('The messages collection is empty; insert a seed document first (see Setup).');
    }

    // Only documents newer than the one found above are delivered.
    const query = { _id: { $gt: latest._id } };
    // `awaitData` (camelCase) is the option name the promise-based driver
    // documents for a tailable-await cursor.
    const options = { tailable: true, awaitData: true };
    const cursor = collection.find(query, options);

    // A plain loop replaces the original `await next()` recursion, which kept
    // every previous invocation pending and so grew the promise chain without
    // bound.
    while (true) {
      const message = await cursor.next();
      if (message === null) {
        // next() yields null once no more documents are available, i.e. the
        // tailable cursor is no longer usable; stop instead of spinning.
        console.error('Tailable cursor closed; stopping.');
        break;
      }
      console.log(message);
    }
  } finally {
    await client.close();
  }
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment