Skip to content

Instantly share code, notes, and snippets.

@ktoso
Created November 1, 2012 21:21
Show Gist options
  • Save ktoso/3996636 to your computer and use it in GitHub Desktop.
Save ktoso/3996636 to your computer and use it in GitHub Desktop.
MongoStream
// Visits every Person matching `age > 18` through the query's own foreach.
Person where(_.age > 18) foreach { p => /*...*/ }
// Adds the driver's QUERYOPTION_NOTIMEOUT flag to the cursor's query options.
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
/**
 * Runs `f` against a cursor and closes that same cursor afterwards, whether
 * `f` returns normally or throws.
 *
 * @param cursor by-name expression producing the cursor; evaluated exactly once
 * @param f      work to perform with the open cursor
 * @tparam T     result type of `f`
 * @return the value produced by `f`
 */
def withCursor[T](cursor: => DBCursor)(f: DBCursor => T): T = {
  // `cursor` is by-name: every mention re-evaluates it. Bind it once so the
  // cursor handed to `f` is the very one that gets closed.
  val opened = cursor
  try f(opened) finally opened.close()
}
// somewhere...
// Look up the collection for this record type, then run find() with the
// resulting cursor handed to withCursor, which closes it when the body finishes.
MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
withCursor(coll.find()) { cursor =>
// Set the driver's QUERYOPTION_NOTIMEOUT flag on the cursor.
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
// ... (rest of the body and the closing braces are elided in this excerpt)
/**
 * Streams the records selected by a query built from `meta`, handing each one
 * to `callback`.
 *
 * Builds the query with `query(meta)`, converts it with `asDBObject`, and
 * delegates to the `stream` overload that takes an optional raw query object.
 *
 * @param meta     meta record identifying the collection to read
 * @param query    function building the query from `meta`
 * @param callback invoked once per streamed record
 * @tparam T       record type
 */
def stream[T <: MongoRecord[T]]
    (meta: MongoMetaRecord[T],
     query: MongoMetaRecord[T] => BaseQuery[_, _, _, _, _, _, _])
    (callback: T => Unit): Unit = {
  // Plain `Some` instead of scalaz's `.some`, so this overload needs no extra import.
  stream(meta, Some(query(meta).asDBObject))(callback)
}
// so we can call it like:
import com.foursquare.rogue.Rogue._
// NOTE(review): the query-taking `stream` overload declares `query` as a function of the
// meta record, while a query value is passed here — confirm an implicit conversion covers this.
stream(meta, meta.where(_.age > 18)) { migrateIt }
/**
 * Collects field names without spelling them out by hand.
 *
 * Each accessor in `fields` is applied to `on`, and the `name` of the field it
 * returns is taken, in the order the accessors were given.
 *
 * @param on     the object the accessors are applied to
 * @param fields accessors picking a field out of `on`
 * @tparam T     type of `on`
 * @return the selected fields' names, in argument order
 */
def select[T](on: T, fields: Function1[T, BaseField]*): Seq[String] =
  fields.map(accessor => accessor(on).name)
/**
 * Streams only the selected fields of every document matching `query`.
 *
 * For each document the cursor yields, the values of the fields named in
 * `select` are collected, in the same order as `select`, and passed to
 * `callback` as a `List`. A value that is a `BasicDBList` is handed over as a
 * Scala `List`; every other value is passed through as returned by the driver.
 * The cursor is managed by `withCursor`.
 *
 * @param meta     meta record identifying the collection to read
 * @param select   names of the fields to fetch; also fixes the order of the values
 * @param query    raw query object passed to `find`
 * @param callback invoked once per document with the selected field values
 * @tparam T       record type
 */
def streamSelect[T <: MongoRecord[T]]
    (meta: MongoMetaRecord[T],
     select: Seq[String] = Nil,
     query: DBObject)(callback: List[Any] => Unit): Unit = {
  MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
    // Projection document: each requested field name mapped to 1. Built with
    // `put` so no implicit Scala-to-Java map conversion has to be in scope.
    val selectFields = new BasicDBObject()
    select.foreach(name => selectFields.put(name, java.lang.Integer.valueOf(1)))

    withCursor(coll.find(query, selectFields)) { cursor =>
      cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
      while (cursor.hasNext) {
        val doc = cursor.next()
        // `callback` takes a List, so convert explicitly; mapping the Seq
        // directly would only yield a Seq[Any].
        val fieldValues: List[Any] = select.toList.map { name =>
          doc.get(name) match {
            case list: BasicDBList => list.toArray().toList
            case other             => other
          }
        }
        callback(fieldValues)
      }
    }
  }
}
// which can be used as:
val m = Person
val above18 = m where (_.age > 18)
// NOTE(review): streamSelect declares `query: DBObject`; other call sites convert queries
// with `.asDBObject` — confirm `above18` is accepted here as-is.
streamSelect(m, select[Person](m, _.name, _.age, _.height), query = above18) {
// Values arrive in the order the fields were selected; a list with any other
// length or element types is not matched by this single case.
case (name: String) :: (age: Int) :: (h: Double) :: Nil =>
// ...
}
// instead of using the select helper you can pass the field names manually:
select[Person](m, _.name, _.age, _.height) == List(m.name.name, m.age.name, m.height.name)
/**
 * Streams records of a collection one at a time, handing each to `callback`.
 *
 * Every document the cursor yields is converted with `meta.fromDBObject`
 * before `callback` is invoked. The cursor is managed by `withCursor`.
 *
 * @param meta     meta record identifying the collection and converting documents
 * @param query    optional raw query object; when `None`, `null` is passed to `find`
 * @param callback invoked once per converted record
 * @tparam T       record type
 */
def stream[T <: MongoRecord[T]]
    (meta: MongoMetaRecord[T], query: Option[DBObject] = None)
    (callback: T => Unit): Unit = {
  MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
    // `find` takes a nullable query object, so the Option is unwrapped only at this boundary.
    val filter: DBObject = query.orNull
    withCursor(coll.find(filter)) { cursor =>
      cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
      while (cursor.hasNext) callback(meta.fromDBObject(cursor.next()))
    }
  }
}
// can be used like this:
val meta = Person
// No query argument is given, so `query` takes its default of None.
stream(meta) { migrateIt(_) }
// or with the query supplied:
import scalaz.Scalaz._
import com.foursquare.rogue.Rogue._
val query = meta where(_.age > 18)
// `stream` takes an Option[DBObject]: the query is converted with `asDBObject` and
// wrapped with `.some` (presumably provided by the scalaz import above).
stream(meta, query.asDBObject.some) { migrateIt }
val m = Person
// NOTE(review): streamTypesafe is not defined in this excerpt. Presumably the tuple of
// fields in the second argument list fixes the callback's parameter types — confirm
// against its definition.
streamTypesafe(m)((m.age, m.name, m.height)) {
(age, name, height) =>
// yes, age: Int, name: String, and height: Double! :-)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment