Skip to content

Instantly share code, notes, and snippets.

@aj0strow
Created July 15, 2015 20:37
Show Gist options
  • Save aj0strow/c8285b537471f5d90e9b to your computer and use it in GitHub Desktop.
Save aj0strow/c8285b537471f5d90e9b to your computer and use it in GitHub Desktop.
MongoDB capped collection event stream
var through = require('through2')
function subscribe (collection, params) {
var options = {
tailable: true,
awaitData: true,
noTimeout: true,
numberOfRetries: Number.MAX_VALUE,
}
var lastObjectId = params && params.objectId
var s = through.obj(function (doc, enc, callback) {
lastObjectId = doc._id
callback(null, doc)
})
function connect () {
var query = {}
if (lastObjectId) {
query._id = { $gt: lastObjectId }
}
var cursor = collection.find(query, {}, options)
cursor.pipe(s)
cursor.on('error', function (e) {
if (isCursorError(e)) {
cursor.unpipe(s)
connect()
} else {
s.emit('error', e)
}
})
}
function isCursorError (e) {
return e.name == 'MongoError' && /cursor/.test(e.message)
}
process.nextTick(connect)
return s
}
// Example: var stream = subscribe(db.events, { objectId: lastSeenObjectId }) // the second argument is optional
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment