Skip to content

Instantly share code, notes, and snippets.

@scttnlsn
Created July 30, 2012 22:16
Show Gist options
  • Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Pub/sub with MongoDB and Node.js

Pub/sub with MongoDB and Node.js

Setup:

$ mongo
> use pubsub
> db.createCollection('messages', { capped: true, size: 100000 })
> db.messages.insert({})

$ npm install mongodb

Subscribe:

$ node subscribe.js

Publish:

$ mongo
> use pubsub
> db.messages.insert({ message: 'Hello world', time: Date.now() })
var mongo = require('mongodb');
var server = new mongo.Server('localhost', 27017);
var db = new mongo.Db('pubsub', server);
db.open(function(err) {
if (err) throw err;
db.collection('messages', function(err, collection) {
if (err) throw err;
var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
latest.nextObject(function(err, doc) {
if (err) throw err;
var query = { _id: { $gt: doc._id }};
var options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
var cursor = collection.find(query, options).sort({ $natural: 1 });
(function next() {
cursor.nextObject(function(err, message) {
if (err) throw err;
console.log(message);
next();
});
})();
});
});
});
@minenwerfer
Copy link

Updated version -- newer MongoDB versions use Promises instead of callbacks:

const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://0.0.0.0:27017';
const dbname = 'pubsub';

;(async () => {
  const client = await MongoClient.connect(url)

  const db = client.db(dbname);
  const collection = db.collection('messages')


  const latest = collection.find({ }).limit(1).sort({ $natural: -1 }); //.sort({ $natural: -1 })

  const doc = await latest.next()
  const query = { _id: { $gt: doc._id }}

  const options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
  const cursor = collection.find(query, options); //.sort({ $natural: 1 })

  await (async function next() {
    const message = await cursor.next()
    console.log(message)

    await next()
  })();
 
})()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment