Created
September 7, 2010 21:34
-
-
Save seanhess/569158 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Actual connections to the server are independent of these objects.
# These objects just USE the connections under the hood.
# The database object (rather than the module) is responsible for
# managing connections to different databases, too.
sys = require 'sys' | |
# Numeric sort directions as written in Mongo's natural syntax, e.g. {field: -1}
[SortAscendingNum, SortDescendingNum] = [1, -1]
# Direction keywords that Cursor.sort pairs with each field name
[SortAscendingKey, SortDescendingKey] = ['ascending', 'descending']
exports.load = (dep, exports) -> | |
mongodb = dep 'mongodb' | |
events = dep 'events' | |
# Lazy, chainable wrapper around a driver cursor.
#   query - function(cb) that must produce a driver cursor: cb(err, cursor).
#           It is invoked again for every terminal call (next, each, toArray...).
# limit/skip/sort only record their argument; nothing touches the database
# until one of the terminal methods runs. Every method returns the Cursor
# itself, so calls can be chained.
Cursor = (query) ->
    self = this
    limitCount = null   # null means "no limit requested"
    skipCount = 0
    sortPairs = null    # list of [field, 'ascending' | 'descending'] once sort() is called
    served = 0          # how many times next() has been called on this Cursor

    # private. Runs the query and applies the recorded limit, skip and sort
    # to the fresh driver cursor before handing it to callback(err, cursor).
    # Returns self, which is what makes every terminal method chainable.
    prepared = (callback) ->
        query (err, mongoCursor) ->
            if err then return callback err
            # The empty callbacks rely on limit/skip/sort completing synchronously
            # (an implementation detail of the driver this was written against).
            if limitCount? then mongoCursor.limit limitCount, ->
            if skipCount? then mongoCursor.skip skipCount, ->
            if sortPairs? then mongoCursor.sort sortPairs, ->
            callback null, mongoCursor
        self

    # private. Builds a terminal method that prepares a cursor and then calls
    # the driver cursor method of the same name with the caller's callback.
    forward = (method) ->
        (callback) ->
            prepared (err, mongoCursor) ->
                if err then return callback err
                mongoCursor[method] callback

    # Limit the results returned: find().limit(2)
    self.limit = (num) ->
        limitCount = num
        self

    # Skip some results. Used with limit: find().limit(2).skip(1)
    self.skip = (num) ->
        skipCount = num
        self

    # Sort. Used with limit and skip: find().sort({somefield:1}).limit(10).skip(100)
    # A direction of -1 means descending; any other value is treated as ascending.
    self.sort = (naturalMongoSyntax) ->
        sortPairs = []
        for fieldName, direction of naturalMongoSyntax
            descending = direction is SortDescendingNum
            sortPairs.push [fieldName, (if descending then SortDescendingKey else SortAscendingKey)]
        self

    # Fetches a single document: find().one (err, doc) ->
    # Delegates to next, so it shares (and advances) next's position counter.
    self.one = (callback) -> self.next callback

    # You can call this multiple times and get the following doc each time:
    #   cursor = db.something.find().next (err, doc) ->
    #   cursor.next (err, doc) ->
    # Each call re-runs the query with limit 1 and a skip of (skip + calls so far).
    # doc is undefined once the position passes the user-set limit or the results.
    self.next = (callback) ->
        position = skipCount + served
        served += 1
        prepared (err, mongoCursor) ->
            if err? then return callback err
            # Override whatever limit/skip the user set: we want exactly one document
            mongoCursor.limit 1, ->
            mongoCursor.skip position, ->
            mongoCursor.toArray (err, docs) ->
                pastLimit = limitCount? and position >= (skipCount + limitCount)
                if pastLimit or not docs? then docs = []
                callback err, docs[0]

    # NOTE: there is no hasNext. An earlier attempt based on count did not work:
    # count counts the entire collection and ignores limit and skip, and it also
    # has to work on an unlimited collection.

    # Calls the callback for each matched result
    #   db.something.find().each (err, doc) ->
    self.each = forward 'each'

    # Explains whether you hit an index and how many were scanned
    #   db.something.find({field: "somevalue"}).explain (err, explanation) ->
    self.explain = forward 'explain'

    # Returns the results as an array. Consider using each or skip & limit for large collections.
    #   db.something.find().toArray (err, docs) ->
    self.toArray = forward 'toArray'

    # Passes the callback straight to the driver cursor's count
    self.count = forward 'count'

    self

Cursor.BasicCursor = "BasicCursor"
# Wrapper around one named collection of `database`.
#   database - object exposing connection(cb), where cb receives (err, db)
#   name     - full collection name; passed to db.collection as-is
# Every operation first obtains the driver collection through the database's
# connection, so nothing is opened until a method is actually called.
# Callbacks on the write/index operations are optional and default to a no-op.
Collection = (database, name) ->
    self = this

    # Default callback for fire-and-forget calls; any error is ignored
    noop = (err) ->

    # Returns the collection name given to the constructor
    self.name = -> name

    # private, load the driver collection object: cb(err, collection)
    connection = (cb) ->
        database.connection (err, db) ->
            if err then return cb err
            db.collection name, (err, collection) ->
                cb err, collection

    # Ensures an index. `index` maps field names to directions.
    #   db.mycollection.ensureIndex {field: 1}, {unique: true}, (err, indexName) ->
    #   db.mycollection.ensureIndex {field: 1}, (err, indexName) ->
    self.ensureIndex = (index, options, cb) ->
        unique = false
        # options may be omitted, in which case the second argument is the callback
        if (typeof options) is 'function'
            cb = options
            options = {unique: false}
        if options? then unique = options.unique
        cb ?= noop
        # The driver is given a list of [field, direction] pairs
        indexArray = []
        indexArray.push [fieldName, direction] for fieldName, direction of index
        connection (err, collection) ->
            if err then return cb err
            collection.ensureIndex indexArray, unique, cb

    # Drops every index on the collection
    self.dropIndexes = (cb) ->
        cb ?= noop
        connection (err, collection) ->
            if err then return cb err
            collection.dropIndexes cb

    # Inserts one document or a list of documents (passed through to the driver)
    self.insert = (docs, cb) ->
        cb ?= noop
        connection (err, collection) ->
            if err then return cb err
            collection.insert docs, cb

    # Removes the documents matching selector
    self.remove = (selector, cb) ->
        cb ?= noop
        connection (err, collection) ->
            if err then return cb err
            collection.remove selector, cb

    # Drops the collection: cb(err, dropped)
    # Dropping a collection that does not exist is not treated as an error;
    # it reports (null, false) instead.
    self.drop = (cb) ->
        cb ?= noop
        connection (err, collection) ->
            if err then return cb err
            collection.drop (err) ->
                if err? and err.message is 'ns not found'
                    cb null, false
                else
                    cb err, true

    # save does an upsert if you specify an _id, otherwise it does an insert
    self.save = (doc, cb) ->
        cb ?= noop
        connection (err, collection) ->
            if err then return cb err
            collection.save doc, cb

    # Updates the documents matching selector.
    # upsert and multi are both optional; the callback may be given in either
    # position:
    #   update selector, updates, cb
    #   update selector, updates, upsert, cb
    #   update selector, updates, upsert, multi, cb
    self.update = (selector, updates, upsert, multi, cb) ->
        if (typeof upsert) is 'function'
            cb = upsert
            upsert = false
        else if (typeof multi) is 'function'
            cb = multi
            multi = false
        options = {
            upsert: upsert
            multi: multi
        }
        cb ?= noop
        connection (err, collection) ->
            if err then return cb err
            collection.update selector, updates, options, cb

    # self.mapReduce = -> throw new Error("mapReduce not Implemented")
    # self.group = -> throw new Error("group not implemented")

    # return the count
    #   db.something.count (err, num) ->
    self.count = (cb) ->
        cb ?= noop
        connection (err, collection) ->
            if err then return cb err
            collection.count (err, count) ->
                cb err, count

    # Your basic find. See Mongo Docs for all your options. Returns a Cursor;
    # no query is sent until a terminal method is called on it.
    #   db.something.find({somefield: "somevalue", anotherfield: 'another'},{somefield:1}).one (err, doc) ->
    # fields is an object whose keys are the field names to request.
    self.find = (selector, fields) ->
        selector ?= {}
        options = {}
        # The parentheses matter: without them the assignment sits inside the
        # loop body and options.fields ends up as just the last key, as a string.
        if fields? then options.fields = (fieldName for fieldName of fields)
        new Cursor (callback) ->
            connection (err, collection) ->
                if err then return callback err
                collection.find selector, options, (err, cursor) ->
                    callback err, cursor

    # Sugar for find.one. selector and fields are both optional:
    #   findOne cb
    #   findOne selector, cb
    #   findOne selector, fields, cb
    self.findOne = (selector, fields, callback) ->
        if (typeof selector) is 'function'
            callback = selector
            selector = {}
            fields = null
        else if (typeof fields) is 'function'
            callback = fields
            fields = null
        self.find(selector, fields).one callback

    self.toString = -> "Collection #{name}"

    self
# Lifecycle states of a Database's stored connection
DatabaseConnectionClosed = 'closed'
DatabaseConnectionIsOpening = 'opening'
DatabaseConnectionOpen = 'open'
DatabaseConnectionClosing = 'closing'

# A Database force-closes and reopens its connection once it has handed it out
# more than this many times. The exported value is a copy: Database reads the
# local variable, not the property on exports.
MaximumConnectionsBeforeRecycling = 100
exports.MaximumConnectionsBeforeRecycling = MaximumConnectionsBeforeRecycling
# One lazily opened, shared connection to the database `name` on host:port.
# Nothing is opened until the first call to connection (directly or through a
# Collection). The connection is recycled after
# MaximumConnectionsBeforeRecycling uses.
# Connection Pooling wasn't any faster for importing schedules!
Database = (host, port, name) ->
    self = this
    emitter = null          # notifies waiting callers with 'opened' (err, connection)
    mongoServer = null
    mongoDatabase = null
    # The stored connection
    connection = null
    state = null
    # How many times this connection has been used
    connectionCount = 0

    # Returns the database name given to the constructor
    self.name = -> name

    # whenever we close, we'll recreate the driver objects and the emitter
    reset = ->
        mongoServer = new mongodb.Server host, port, {}
        mongoDatabase = new mongodb.Db name, mongoServer, {}
        # To handle a connection reset error, I would have to
        # detect that error type, detect which error went with which
        # connection, and reopen the connection.
        # At least I'm handling it now: log it and drop the connection.
        mongoDatabase.addListener 'error', (err) ->
            sys.log " !!! Mongo (#{err})"
            forceClose()
        # Opening, Open or Closed
        state = DatabaseConnectionClosed
        emitter = new events.EventEmitter()

    reset()

    # protected. Gets a connection (already open or a new one): callback(err, connection)
    self.connection = (callback) ->
        # Asking for a connection cancels a close() that has not run yet
        if state is DatabaseConnectionClosing
            state = DatabaseConnectionOpen
        # Recycle: after forceClose the state is closed, so we fall through and reopen
        if state is DatabaseConnectionOpen and connectionCount > MaximumConnectionsBeforeRecycling
            forceClose()
        if state is DatabaseConnectionOpen
            connectionCount++
            return callback null, connection
        # Wait for the pending (or newly started) open. The listener removes
        # itself when it fires; otherwise a caller whose open attempt failed
        # would be called a second time by the next successful open.
        waitingOn = emitter
        onOpened = (err, conn) ->
            waitingOn.removeListener 'opened', onOpened
            callback err, conn
        waitingOn.on 'opened', onOpened
        open()

    # private. call connection instead. Actually opens the thing.
    # Does nothing if an open is already in flight; the waiting callers are
    # all notified by the single 'opened' event.
    open = ->
        if state is DatabaseConnectionIsOpening then return
        state = DatabaseConnectionIsOpening
        mongoDatabase.open (err, db) ->
            connection = db
            connectionCount = 0
            state = if err? then DatabaseConnectionClosed else DatabaseConnectionOpen
            emitter.emit 'opened', err, connection

    # public - closes the connection on the next tick, unless connection is
    # called again before then (which flips the state back to open)
    self.close = ->
        if not (state is DatabaseConnectionClosed)
            state = DatabaseConnectionClosing
            process.nextTick ->
                if state is DatabaseConnectionClosing
                    forceClose()

    # Actually closes it, then rebuilds the driver objects for the next open
    forceClose = ->
        state = DatabaseConnectionClosed
        mongoDatabase.close()
        reset()

    self._fc = forceClose

    # Returns a collection. Consider setting fields on the db to collections. This is cheap
    # Also sets self[name] to the collection for db.
    #   db = mongo.db host, port, name
    #   db.collection 'mycollection'
    #   db.mycollection.find().one (err, doc) ->
    # Dotted names create nested objects: 'some.collection' -> db.some.collection
    self.collection = (fullname) ->
        # "somecollection"
        # "some.collection"
        names = fullname.split /\./
        leafName = names[names.length - 1]
        obj = self
        # Walk every segment except the last, creating namespaces as needed.
        # ?= keeps an existing namespace, so registering "a.c" does not wipe "a.b".
        for segment in names[0...-1]
            obj[segment] ?= {}
            obj = obj[segment]
        obj[leafName] = new Collection self, fullname

    # cb (err, lastError)
    # lastError = {err: null, n: 0, ok: 1}
    self.lastError = (cb) ->
        self.connection (err, conn) ->
            if err then return cb err
            conn.lastError cb

    # Drops the whole database
    self.dropDatabase = (cb) ->
        self.connection (err, conn) ->
            if err then return cb err
            conn.dropDatabase cb

    # Drops every Collection registered directly on this object (collections
    # under a dotted namespace are not visited). cb is called exactly once:
    # with the first error, or with no arguments once every drop has finished,
    # which is immediately when there is nothing to drop.
    self.dropAllCollections = (cb) ->
        cb ?= ->
        # The loop variables must not be called `name`: CoffeeScript would reuse
        # the constructor's `name` parameter and overwrite the database name.
        collections = (member for key, member of self when member instanceof Collection)
        remaining = collections.length
        if remaining is 0 then return cb()
        failed = false
        for coll in collections
            coll.drop (err) ->
                if failed then return
                if err
                    failed = true
                    return cb err
                remaining--
                if remaining is 0 then cb()

    self.toString = -> "mongo://#{host}:#{port}/#{name}:#{state})"

    self
# The constructors themselves, for callers that want to build or extend them
exports.Cursor = Cursor
exports.Collection = Collection
exports.Database = Database

# Factory for an unopened db object.
# A Database stands for one machine (host/port) plus a database name, and each
# one opens its own connection, so creating several opens several connections!
#   db = mongo.db host, port, name
exports.db = (host, port, name) ->
    new Database(host, port, name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment