Skip to content

Instantly share code, notes, and snippets.

@scttnlsn
Created July 30, 2012 22:16
Show Gist options
  • Select an option

  • Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.

Select an option

Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Pub/sub with MongoDB and Node.js

Pub/sub with MongoDB and Node.js

Setup:

$ mongo
> use pubsub
> db.createCollection('messages', { capped: true, size: 100000 })
> db.messages.insert({})

$ npm install mongodb

Subscribe:

$ node subscribe.js

Publish:

$ mongo
> use pubsub
> db.messages.insert({ message: 'Hello world', time: Date.now() })
var mongo = require('mongodb');
var server = new mongo.Server('localhost', 27017);
var db = new mongo.Db('pubsub', server);
db.open(function(err) {
if (err) throw err;
db.collection('messages', function(err, collection) {
if (err) throw err;
var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
latest.nextObject(function(err, doc) {
if (err) throw err;
var query = { _id: { $gt: doc._id }};
var options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
var cursor = collection.find(query, options).sort({ $natural: 1 });
(function next() {
cursor.nextObject(function(err, message) {
if (err) throw err;
console.log(message);
next();
});
})();
});
});
});
@cjtylor
Copy link
Copy Markdown

cjtylor commented Jan 24, 2013

Hi, thanks very much for this post - this is very useful to me.
One question, does the subscribe.js essentially polling the mongodb, or it only queries the db when there is new data arrival?

Why i am asking is:
I checked the log of mongo db - and i see action every 2 seconds, even if i did not publish any new data into the db.
Thanks very much!

below is the mongo log:

Thu Jan 24 08:18:23 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2098ms
Thu Jan 24 08:18:25 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2095ms
Thu Jan 24 08:18:27 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2100ms
Thu Jan 24 08:18:29 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2095ms

@xaiki
Copy link
Copy Markdown

xaiki commented Feb 6, 2013

hey, I've turned this gist into a module: https://gist.github.com/xaiki/4722178
would you consider uploading it to npm ?

@xaiki
Copy link
Copy Markdown

xaiki commented Feb 6, 2013

https://github.com/scttnlsn/mubsub
oh you already did that, sorry =)

@lcneves
Copy link
Copy Markdown

lcneves commented Feb 13, 2019

subscribe.js, line 26: We're trusting that cursor.nextObject() will put the callback in the event loop, which is a reasonable assumption for an asynchronous call, but it still relies on the implementation. If cursor.nextObject() calls the callback directly, we will eventually have a "Maximum call stack exceeded" error. Just to be on the safe side, we could change this line to process.nextTick(() => next());.

@papnkukn
Copy link
Copy Markdown

Update for the 2019s - mongodb 3.3.4 or similar

const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://localhost:27017';
const dbname = 'pubsub';

MongoClient.connect(url, { useUnifiedTopology: true }, function(err, client) {
  if (err) throw err;
 
  const db = client.db(dbname);
  
  db.collection('messages', function(err, collection) {
      if (err) throw err;

      var latest = collection.find({ }).limit(1); //.sort({ $natural: -1 })

      latest.next(function(err, doc) {
          if (err) throw err;

          var query = { _id: { $gt: doc._id }};
          
          var options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
          var cursor = collection.find(query, options); //.sort({ $natural: 1 })

          (function next() {
              cursor.next(function(err, message) {
                  if (err) throw err;
                  console.log(message);
                  next();
                  //or better process.nextTick(() => next());
              });
          })();
      });
  });
 
  //client.close();
});

@xtianus79
Copy link
Copy Markdown

is this now obsolete with streams?

@hugojerez
Copy link
Copy Markdown

An adaptation I made with Mongoose on a project with es-lint rules

      this.mongooseClient =  this.app.get('mongooseClient');
      this.Model  = this.mongooseClient.model('events');
subscribe(callback) {
      let lastMessageIDReceived; 
      // subscribe to message queue and emit data
      // Get the last registered item
      var latest = this.Model.find({ }).sort({ $natural: -1 }).limit(1); 
  
      latest.cursor().next((err, doc)=> {
        if (err) throw err;
  
        var query = { _id: { $gt: doc._id }};
        var options = { data:1, tailable: true, awaitdata: true, numberOfRetries: -1 };
        var cursor = this.Model.find(query, options).sort({ $natural: -1 }).limit(1);
      
        const cursorIterator =  () =>{
          cursor.cursor().next((err, message) =>{
            if (err) throw err;
            // Avoid nulls and Repeated messages
            if(message && lastMessageIDReceived != String(message._id)){
                // JSON Parse-Stringify to remove Getters and Setters
              const result =  JSON.parse(JSON.stringify(message));
                // String cast to remove Getters and Setters
                lastMessageIDReceived = String(message._id);
              // Execute our custom callback
              callback(result.data);
            }
            cursorIterator();
          });
        };
        // Here begins the iteration
        cursorIterator();
      });
  }

@minenwerfer
Copy link
Copy Markdown

Updated version -- newer MongoDB versions use Promises instead of callbacks:

const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://0.0.0.0:27017';
const dbname = 'pubsub';

;(async () => {
  const client = await MongoClient.connect(url)

  const db = client.db(dbname);
  const collection = db.collection('messages')


  const latest = collection.find({ }).limit(1).sort({ $natural: -1 }); //.sort({ $natural: -1 })

  const doc = await latest.next()
  const query = { _id: { $gt: doc._id }}

  const options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
  const cursor = collection.find(query, options); //.sort({ $natural: 1 })

  await (async function next() {
    const message = await cursor.next()
    console.log(message)

    await next()
  })();
 
})()

@kn327
Copy link
Copy Markdown

kn327 commented Apr 18, 2026

Fixed eventual infinite calling stack and added abort controls (in case we want to terminate without shutting down the entire application)

const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://0.0.0.0:27017';
const dbname = 'pubsub';

const ctrl = new AbortController();

//- let this run in the background doing it's loops
const listener = tailCollection(ctrl.signal);

//- call this function to stop the listener and wait for it to shut down
async function abortListener() {
    ctrl.abort();

    try {
       await listener;
    }
    catch (e) {
        console.error("Error encountered waiting for listener to abort...", e);
    }
}

async function tailCollection(signal: AbortSignal) {
  const client = await MongoClient.connect(url)

  const db = client.db(dbname);
  const collection = db.collection('messages')

  const latest = collection.find({ }).limit(1).sort({ $natural: -1 }); //.sort({ $natural: -1 })

  const doc = await latest.next()
  const query = { _id: { $gt: doc._id }}

  const options = { tailable: true, awaitData: true, numberOfRetries: -1, maxAwaitTimeMS: 1000, signal };
  const cursor = collection.find(query, options); //.sort({ $natural: 1 })

  while (true) {
    const message = await cursor.next();

    if (signal.aborted) {
      console.log("abort signal received - stopping listener...");
      break;
    } else if (!!message) {
       //- todo: do something with the new record...
       console.log(message);
    }
  }

   if (!cursor.isClosed) cursor.close();

   //- todo: stop the DB client?
};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment