Created
March 9, 2013 19:25
-
-
Save dkhenry/5125408 to your computer and use it in GitHub Desktop.
Using the MongoDB Aggregation Framework in Scala to bucket timestamped data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.mongodb.casbah.Imports._ | |
/** | |
* Schema has this general format (among other fields) | |
* { "_id" : ObjectId("5138c8bd5e0ee06dd30a6ee1"), | |
* "LAST_SIZE" : NumberLong(200), | |
* "SYMBOL" : "AAPL", | |
* "LAST_PRICE" : 428.31, | |
* "TOTAL_VOLUME" : NumberLong(8923961), | |
* "TIMESTAMP" : ISODate("2013-03-07T17:05:01.144Z"), | |
*} | |
* | |
* Also note there are indexes on TIMESTAMP and SYMBOL | |
* | |
*/ | |
object TickerQuery {
  // Register Casbah's Joda-Time conversion helpers; the query below places Joda
  // DateTime values directly into the $match document.
  RegisterJodaTimeConversionHelpers()

  // Handle the aggregate command is issued against (created with the name "transactions").
  val _transactions = MongoDBConnection("transactions")

  /**
   * Aggregates the ticks stored for `symbol` whose TIMESTAMP lies strictly between
   * "now minus one day" and "now", grouped into per-minute buckets
   * (year / day-of-year / hour / minute), and prints every resulting bucket document
   * to standard output.
   *
   * Each printed bucket carries the number of ticks (`total`) and the average,
   * maximum and minimum LAST_PRICE (`avg_price`, `max_price`, `min_price`).
   *
   * @param symbol value the SYMBOL field of a tick must equal to be included
   */
  def ticksInRange(symbol: String): Unit = {
    // Capture the clock once so both bounds are derived from the same instant.
    val now = new DateTime()

    // Time window: the lower bound ($gt) must be the EARLIER instant and the upper
    // bound ($lt) the later one, otherwise the range is empty and nothing matches.
    val timeWindow = MongoDBObject(
      "$gt" -> now.minusDays(1),
      "$lt" -> now
    )

    // Stage 1: keep only the requested symbol inside the time window.
    val matchStage = MongoDBObject("$match" -> MongoDBObject(
      "SYMBOL" -> symbol,
      "TIMESTAMP" -> timeWindow
    ))

    // Bucket key: truncate the timestamp to minute resolution.
    val bucket = MongoDBObject(
      "year" -> MongoDBObject("$year" -> "$TIMESTAMP"),
      "day" -> MongoDBObject("$dayOfYear" -> "$TIMESTAMP"),
      "hour" -> MongoDBObject("$hour" -> "$TIMESTAMP"),
      "minute" -> MongoDBObject("$minute" -> "$TIMESTAMP")
    )

    // Stage 2: reduce every tick to its bucket key and its price.
    val projectStage = MongoDBObject("$project" -> MongoDBObject(
      "bucket" -> bucket,
      "price" -> "$LAST_PRICE"
    ))

    // Stage 3: one output document per bucket with count / avg / max / min.
    val groupStage = MongoDBObject("$group" -> MongoDBObject(
      "_id" -> "$bucket",
      "total" -> MongoDBObject("$sum" -> 1),
      "avg_price" -> MongoDBObject("$avg" -> "$price"),
      "max_price" -> MongoDBObject("$max" -> "$price"),
      "min_price" -> MongoDBObject("$min" -> "$price")
    ))

    val pipeline = MongoDBList(matchStage, projectStage, groupStage)

    // Run the pipeline against the "ticks" collection and print each bucket.
    val response = _transactions.command(
      MongoDBObject("aggregate" -> "ticks", "pipeline" -> pipeline))

    response.get("result") match {
      case buckets: BasicDBList => buckets.foreach(doc => println(doc))
      case _ => () // no "result" list in the response: nothing to print
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment