# Actual connections to the server are independent of these objects
# These objects just USE the connections under the hood
# The module is responsible for managing connections to different databases, too
# Or, rather, the database object
sys = require 'sys'
SortAscendingNum = 1
SortDescendingNum = -1
SortAscendingKey = 'ascending'
SortDescendingKey = 'descending'
exports.load = (dep, exports) ->

  mongodb = dep 'mongodb'
  events = dep 'events'

  # the query param must return a cursor
  Cursor = (query) ->

    self = this

    limit = null
    skip = 0
    sort = null
    nextIndex = 0

    # Limit the results returned: find().limit(2)
    self.limit = (num) ->
      limit = num
      self

    # Skip some results. Used with limit: find().limit(2).skip(1)
    self.skip = (num) ->
      skip = num
      self

    # Sort. Used with limit and skip: find().sort({somefield: 1}).limit(10).skip(100)
    self.sort = (naturalMongoSyntax) ->
      sort = []
      for field, orderNum of naturalMongoSyntax
        key = if (orderNum is SortDescendingNum) then SortDescendingKey else SortAscendingKey
        sort.push [field, key]
      self

    # returns the first one: find().one (err, doc) ->
    self.one = (callback) -> self.next callback

    # You can call this multiple times and get the next doc each time
    # cursor = db.something.find()
    # cursor.next (err, doc) ->
    # cursor.next (err, doc) ->
    # ....
    self.next = (callback) ->
      # Either way we want this, right?
      localIndex = skip + nextIndex++

      # gets a cursor with those applied
      cursor (err, cursor) ->
        if err? then return callback err
        # manually set the limit and skip
        # they might have set it themselves!
        cursor.limit 1, ->
          cursor.skip localIndex, ->
            # I already called limit 1, so toArray will be friendly
            cursor.toArray (err, docs) ->
              if limit? and localIndex >= (skip + limit)
                docs = []
              docs ?= []
              callback err, docs[0]

    # # this doesn't work
    # # count counts the entire collection, and doesn't take the limit and skip into consideration
    # # needs to work on an unlimited collection too!
    # self.hasNext = (callback) ->
    #   self.count (err, count) ->
    #     if err then return callback err
    #     withinLimit = ((not limit?) or nextIndex < limit)
    #     withinCount = ((nextIndex + skip) < count)
    #     callback null, (withinLimit and withinCount)

    # Calls the callback for each matched result
    # db.something.find().each (err, doc) ->
    self.each = (callback) ->
      cursor (err, cursor) ->
        if err then return callback err
        cursor.each callback

    # Explains whether you hit an index and how many were scanned
    # db.something.find({field: "somevalue"}).explain (err, explanation) ->
    self.explain = (callback) ->
      cursor (err, cursor) ->
        if err then return callback err
        cursor.explain callback

    # Returns the results as an array. Consider using each or skip & limit for large collections.
    # db.something.find().toArray (err, docs) ->
    self.toArray = (callback) ->
      cursor (err, cursor) ->
        if err then return callback err
        cursor.toArray callback

    self.count = (callback) ->
      cursor (err, cursor) ->
        if err then return callback err
        cursor.count callback

    # private. modifies the cursor with limit, skip and sort
    cursor = (callback) ->
      query (err, cursor) ->
        if err then return callback err
        # They're actually synchronous. I'm taking advantage of that implementation detail
        if limit? then cursor.limit limit, ->
        if skip? then cursor.skip skip, ->
        if sort? then cursor.sort sort, ->
        callback null, cursor
      self

    self

  Cursor.BasicCursor = "BasicCursor"
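
  # Rough usage sketch of the cursor API above (hedged: not part of the original
  # module; "pages" and its fields are made up). A Cursor is normally created by
  # Collection.find below, and the modifiers chain because each returns self:
  #
  # db.pages.find({published: true}).sort({created: -1}).skip(20).limit(10).toArray (err, docs) ->
  #   sys.log "got #{docs.length} pages" unless err?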

  Collection = (database, name) ->

    self = this
    noop = (err) ->

    self.name = -> name

    # private, load the collection object
    # I'm pretty sure the collection loading is synchronous
    connection = (cb) ->
      database.connection (err, db) ->
        if err then return cb err
        db.collection name, (err, collection) ->
          cb err, collection

    # Ensures an index
    # db.mycollection.ensureIndex {field: 1}, {unique: true}, (err, indexName) ->
    # db.mycollection.ensureIndex {field: 1}, (err, indexName) ->
    self.ensureIndex = (index, options, cb) ->
      unique = false
      if (typeof options) is 'function'
        cb = options
        options = {unique: false}
      if options? then unique = options.unique
      cb ?= noop

      indexArray = []
      indexArray.push [fieldName, direction] for fieldName, direction of index

      connection (err, connection) ->
        if err then return cb err
        connection.ensureIndex indexArray, unique, cb

    self.dropIndexes = (cb) ->
      cb ?= noop
      connection (err, connection) ->
        if err then return cb err
        connection.dropIndexes cb

    # self.dropIndex = (indexName, cb) ->
    #   cb ?= noop
    #   connection (err, connection) ->
    #     if err then return cb err
    #     connection.dropIndex indexName, cb

    self.insert = (docs, cb) ->
      cb ?= noop
      connection (err, connection) ->
        if err then return cb err
        connection.insert docs, cb

    self.remove = (selector, cb) ->
      cb ?= noop
      connection (err, connection) ->
        if err then return cb err
        connection.remove selector, cb

    # drops the collection. cb gets (err, dropped) - false if it didn't exist
    self.drop = (cb) ->
      cb ?= noop
      connection (err, connection) ->
        if err then return cb err
        connection.drop (err) ->
          if err? and err.message is 'ns not found'
            cb null, false
          else
            cb err, true

    # save does an upsert if you specify an _id, otherwise it does an insert
    self.save = (doc, cb) ->
      cb ?= noop
      connection (err, connection) ->
        if err then return cb err
        connection.save doc, cb

    # updates the object
    # Missing multi!
    self.update = (selector, updates, upsert, multi, cb) ->
      if (typeof upsert) is 'function'
        cb = upsert
        upsert = false
      else if (typeof multi) is 'function'
        cb = multi
        multi = false

      options = {
        upsert: upsert
        multi: multi
      }

      cb ?= noop

      connection (err, connection) ->
        if err then return cb err
        connection.update selector, updates, options, cb
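
    # Hedged usage sketch for update (not from the original gist; the
    # collection and field names here are made up):
    # db.people.update {name: 'alice'}, {$set: {age: 31}}, (err) ->               # plain update
    # db.people.update {name: 'alice'}, {$set: {age: 31}}, true, false, (err) ->  # upsert, single doc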

    # self.mapReduce = -> throw new Error("mapReduce not implemented")
    # self.group = -> throw new Error("group not implemented")

    # return the count
    # db.something.count (err, num) ->
    self.count = (cb) ->
      cb ?= noop
      connection (err, connection) ->
        if err then return cb err
        connection.count (err, count) ->
          cb err, count

    # Your basic find. See the Mongo docs for all the options.
    # db.something.find({somefield: "somevalue", anotherfield: 'another'}, {somefield: 1}).one (err, doc) ->
    self.find = (selector, fields) ->
      selector ?= {}
      options = {}
      new Cursor (callback) ->
        connection (err, connection) ->
          if err then return callback err
          # pass the projection through as a list of field names
          if fields? then options.fields = (fieldName for fieldName, num of fields)
          connection.find selector, options, (err, cursor) ->
            callback err, cursor

    # Sugar for find().one
    self.findOne = (selector, fields, callback) ->
      if (typeof selector) is 'function'
        callback = selector
        selector = {}
        fields = null
      else if (typeof fields) is 'function'
        callback = fields
        fields = null
      self.find(selector, fields).one callback

    self.toString = -> "Collection #{name}"

    self
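
  # Rough Collection usage sketch (hedged: the 'users' collection and its
  # fields are examples, not part of the original module):
  #
  # db.collection 'users'
  # db.users.ensureIndex {email: 1}, {unique: true}, (err, indexName) ->
  # db.users.save {email: 'a@example.com', name: 'Alice'}, (err, saved) ->
  # db.users.findOne {email: 'a@example.com'}, (err, doc) ->
  # db.users.count (err, num) ->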

  DatabaseConnectionIsOpening = 'opening'
  DatabaseConnectionOpen = 'open'
  DatabaseConnectionClosed = 'closed'
  DatabaseConnectionClosing = 'closing'

  exports.MaximumConnectionsBeforeRecycling = MaximumConnectionsBeforeRecycling = 100

  # Connection pooling wasn't any faster for importing schedules!
  Database = (host, port, name) ->

    self = this
    emitter = null

    # mongoDatabase.addListener 'error', (err) -> throw err
    mongoServer = null
    mongoDatabase = null

    # The stored connection
    connection = null
    state = null

    # How many times this connection has been used
    connectionCount = 0

    self.name = -> name

    # whenever we close, we'll recreate this
    reset = ->
      mongoServer = new mongodb.Server host, port, {}
      mongoDatabase = new mongodb.Db name, mongoServer, {}

      # To handle a connection reset error, I would have to
      # detect that error type, detect which error went with which
      # connection, and reopen the connection.
      # At least I'm handling it now
      mongoDatabase.addListener 'error', (err) ->
        sys.log " !!! Mongo (#{err})"
        forceClose()

    # Opening, Open or Closed
    state = DatabaseConnectionClosed
    emitter = new events.EventEmitter()
    reset()

    # protected. Gets a connection (already open or a new one)
    self.connection = (callback) ->
      # a pending close is cancelled if someone asks for the connection again
      if state is DatabaseConnectionClosing
        state = DatabaseConnectionOpen

      # recycle the connection after it has been handed out too many times
      if state is DatabaseConnectionOpen and connectionCount > MaximumConnectionsBeforeRecycling
        forceClose()

      if state is DatabaseConnectionOpen
        connectionCount++
        return callback null, connection

      # otherwise wait for the open to finish
      emitter.on 'opened', callback
      open()

    # private. call connection instead. Actually opens the thing
    open = ->
      # It doesn't seem to ever hit this for some reason...
      # Oh, is it calling open for all of them then?
      if state is DatabaseConnectionIsOpening then return
      state = DatabaseConnectionIsOpening

      mongoDatabase.open (err, db) ->
        connection = db
        connectionCount = 0
        state = if err? then DatabaseConnectionClosed else DatabaseConnectionOpen
        emitter.emit 'opened', err, connection

    # public - closes the connection
    self.close = ->
      if not (state is DatabaseConnectionClosed)
        state = DatabaseConnectionClosing
        # defer the real close one tick so an immediate follow-up query can cancel it
        process.nextTick ->
          if state is DatabaseConnectionClosing
            forceClose()

    # Actually closes it
    forceClose = ->
      state = DatabaseConnectionClosed
      mongoDatabase.close()
      reset()

    self._fc = forceClose

    # Returns a collection, and also sets self[name] so you can use db.mycollection directly.
    # Registering collections this way is cheap.
    # db = mongo.db host, port, name
    # db.collection 'mycollection'
    # db.mycollection.find().one (err, doc) ->
    self.collection = (fullname) ->
      # handles "somecollection" as well as dotted names like "some.collection"
      names = fullname.split /\./
      obj = self

      # walk through all the name parts except the last, creating namespaces as we go
      for i in [0...names.length - 1]
        localName = names[i]    # e.g. "some"
        obj[localName] = {}     # create self.some
        obj = obj[localName]    # obj is now self.some

      localName = names.pop()   # the final name part
      obj[localName] = new Collection self, fullname

    # cb (err, lastError)
    # lastError = {err: null, n: 0, ok: 1}
    self.lastError = (cb) ->
      self.connection (err, connection) ->
        if err then return cb err
        connection.lastError cb

    self.dropDatabase = (cb) ->
      self.connection (err, connection) ->
        if err then return cb err
        connection.dropDatabase cb

    # drops every Collection hanging directly off the db object
    self.dropAllCollections = (cb) ->
      waitingForDrop = 0
      for name, obj of self
        if obj instanceof Collection
          waitingForDrop++
          obj.drop (err) ->
            waitingForDrop--
            if err then return cb err
            if waitingForDrop is 0 then cb()

    self.toString = -> "mongo://#{host}:#{port}/#{name} (#{state})"

    self
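
  # Rough Database lifecycle sketch (hedged: the host/port/name are examples only).
  # connection() opens lazily and recycles itself after MaximumConnectionsBeforeRecycling
  # uses; close() waits a tick, so a query issued right after it keeps the connection open:
  #
  # db = new Database 'localhost', 27017, 'mydb'
  # db.collection 'things'
  # db.things.find().toArray (err, docs) ->
  #   db.close()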

  # Returns an unopened db object
  # The database is the machine (host/port)
  # Creating multiple databases will open multiple connections!
  # db = mongo.db host, port, name
  exports.db = (host, port, name) -> new Database host, port, name

  exports.Database = Database
  exports.Collection = Collection
  exports.Cursor = Cursor
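
# Hedged example of wiring the module up (none of these names come from the
# gist itself): it assumes `dep` is just a thin resolver around require and
# that this file is saved as mongo.coffee.
#
# mongo = {}
# (require './mongo').load ((name) -> require name), mongo
#
# db = mongo.db 'localhost', 27017, 'test'
# db.collection 'things'
# db.things.insert {hello: 'world'}, (err) ->
#   db.things.find().one (err, doc) ->
#     sys.log JSON.stringify doc
#     db.close()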