Created
October 26, 2015 21:13
-
-
Save danared/a0c4353bd70b7256cc09 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Rolls the 1-minute bars in the `minbars` table up into 5-minute bars using SQL window functions,
// selecting the open, high, low and close price within each 5-minute window.
//
// Each row is assigned to a 5-minute bucket (epoch seconds / 300, floored). All window functions
// run per Symbol within that bucket, ordered by Timestamp. No explicit frame is given, so each
// aggregate is a running value from the first bar of the bucket through the current row.
// The outer WHERE keeps only the row whose own timestamp lies exactly 4 minutes after the bucket's
// first bar; on that row the running values cover the whole bucket. Buckets that have no bar in
// their first or last minute therefore produce no output row.
val dfFiveMinForMonth = {
  // Single shared window definition. Symbol is part of the partition key so that bars of
  // different symbols falling into the same 5-minute bucket are never aggregated together.
  val fiveMinWindow =
    "OVER (PARTITION BY Symbol, floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60)) " +
      "ORDER BY Timestamp)"

  sqlContext.sql(
    s"""
       |SELECT m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
       |FROM
       |  (SELECT
       |     Symbol,
       |     FIRST_VALUE(Timestamp) $fiveMinWindow as OpenTime,
       |     LAST_VALUE(Timestamp)  $fiveMinWindow as CloseTime,
       |     FIRST_VALUE(Open)      $fiveMinWindow as Open,
       |     MAX(High)              $fiveMinWindow as High,
       |     MIN(Low)               $fiveMinWindow as Low,
       |     LAST_VALUE(Close)      $fiveMinWindow as Close
       |   FROM minbars)
       |  as m
       |WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4""".stripMargin
  )
}
// Configure the MongoDB collection the 5-minute bars are written to:
// database "marketdata", collection "fiveMinBars" on localhost:27017.
val fiveMinOutputBuilder = MongodbConfigBuilder(
  Map(
    Host -> List("localhost:27017"),
    Database -> "marketdata",
    Collection -> "fiveMinBars",
    SamplingRatio -> 1.0,
    WriteConcern -> MongodbWriteConcern.Normal,
    SplitKey -> "_id",
    SplitSize -> 8
  )
)
// Build the configuration object that is handed to the save call.
val writeConfig = fiveMinOutputBuilder.build()
// Write the 5-minute bars to MongoDB using the settings in writeConfig.
// Because Spark evaluates lazily, this call is what actually triggers running the query above
// (reading from the 1-minute bars table) and then writing to the 5-minute bars collection.
dfFiveMinForMonth.saveToMongodb(writeConfig)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment