Skip to content

Instantly share code, notes, and snippets.

@scttnlsn
Created July 30, 2012 22:16
Show Gist options
  • Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Pub/sub with MongoDB and Node.js

Pub/sub with MongoDB and Node.js

Setup:

$ mongo
> use pubsub
> db.createCollection('messages', { capped: true, size: 100000 })
> db.messages.insert({})

$ npm install mongodb

Subscribe:

$ node subscribe.js

Publish:

$ mongo
> use pubsub
> db.messages.insert({ message: 'Hello world', time: Date.now() })
var mongo = require('mongodb');
var server = new mongo.Server('localhost', 27017);
var db = new mongo.Db('pubsub', server);
db.open(function(err) {
if (err) throw err;
db.collection('messages', function(err, collection) {
if (err) throw err;
var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
latest.nextObject(function(err, doc) {
if (err) throw err;
var query = { _id: { $gt: doc._id }};
var options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
var cursor = collection.find(query, options).sort({ $natural: 1 });
(function next() {
cursor.nextObject(function(err, message) {
if (err) throw err;
console.log(message);
next();
});
})();
});
});
});
@xtianus79
Copy link

is this now obsolete with streams?

@hugojerez
Copy link

An adaptation I made with Mongoose on a project with es-lint rules

      this.mongooseClient =  this.app.get('mongooseClient');
      this.Model  = this.mongooseClient.model('events');
subscribe(callback) {
      let lastMessageIDReceived; 
      // subscribe to message queue and emit data
      // Get the last registered item
      var latest = this.Model.find({ }).sort({ $natural: -1 }).limit(1); 
  
      latest.cursor().next((err, doc)=> {
        if (err) throw err;
  
        var query = { _id: { $gt: doc._id }};
        var options = { data:1, tailable: true, awaitdata: true, numberOfRetries: -1 };
        var cursor = this.Model.find(query, options).sort({ $natural: -1 }).limit(1);
      
        const cursorIterator =  () =>{
          cursor.cursor().next((err, message) =>{
            if (err) throw err;
            // Avoid nulls and Repeated messages
            if(message && lastMessageIDReceived != String(message._id)){
                // JSON Parse-Stringify to remove Getters and Setters
              const result =  JSON.parse(JSON.stringify(message));
                // String cast to remove Getters and Setters
                lastMessageIDReceived = String(message._id);
              // Execute our custom callback
              callback(result.data);
            }
            cursorIterator();
          });
        };
        // Here begins the iteration
        cursorIterator();
      });
  }

@minenwerfer
Copy link

Updated version -- newer MongoDB versions use Promises instead of callbacks:

const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://0.0.0.0:27017';
const dbname = 'pubsub';

;(async () => {
  const client = await MongoClient.connect(url)

  const db = client.db(dbname);
  const collection = db.collection('messages')


  const latest = collection.find({ }).limit(1).sort({ $natural: -1 }); //.sort({ $natural: -1 })

  const doc = await latest.next()
  const query = { _id: { $gt: doc._id }}

  const options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
  const cursor = collection.find(query, options); //.sort({ $natural: 1 })

  await (async function next() {
    const message = await cursor.next()
    console.log(message)

    await next()
  })();
 
})()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment