@danared
Created October 26, 2015 21:13
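// Apply SQL window functions to partition the 1-minute bars in the `minbars` table into 5-minute windows,
// then select the open, high, low, and close price within each 5-minute window.
// Each window is ordered by Timestamp with the default frame (partition start to current row), so
// LAST_VALUE, MAX, and MIN are running values. The outer WHERE keeps only the row 4 minutes after the
// window's first bar, i.e. its final bar, where those running values cover the whole window. Windows
// missing their first or last 1-minute bar are therefore dropped.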
// Partition by Symbol as well as by 5-minute bucket so bars from different symbols never share a window
val dfFiveMinForMonth = sqlContext.sql(
  """
  SELECT m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
  FROM
    (SELECT
      Symbol,
      FIRST_VALUE(Timestamp)
        OVER (
          PARTITION BY Symbol, floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp)
        as OpenTime,
      LAST_VALUE(Timestamp)
        OVER (
          PARTITION BY Symbol, floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp)
        as CloseTime,
      FIRST_VALUE(Open)
        OVER (
          PARTITION BY Symbol, floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp)
        as Open,
      MAX(High)
        OVER (
          PARTITION BY Symbol, floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp)
        as High,
      MIN(Low)
        OVER (
          PARTITION BY Symbol, floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp)
        as Low,
      LAST_VALUE(Close)
        OVER (
          PARTITION BY Symbol, floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
          ORDER BY Timestamp)
        as Close
    FROM minbars)
    as m
  WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4
  """
)
// Configure the MongoDB collection to write to (marketdata.fiveMinBars)
val fiveMinOutputBuilder = MongodbConfigBuilder(Map(
  Host -> List("localhost:27017"),
  Database -> "marketdata",
  Collection -> "fiveMinBars",
  SamplingRatio -> 1.0,
  WriteConcern -> MongodbWriteConcern.Normal,
  SplitKey -> "_id",
  SplitSize -> 8))
val writeConfig = fiveMinOutputBuilder.build()
// Write the data to MongoDB. Because Spark evaluates lazily, this action is what actually runs the query:
// it reads the 1-minute bars from MongoDB, computes the 5-minute bars, and writes them to the fiveMinBars collection.
dfFiveMinForMonth.saveToMongodb(writeConfig)
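To check the windowing logic without a cluster, here is a minimal plain-Scala sketch (no Spark) of what the query computes, written as a script for the Scala REPL or scala-cli. `Bar`, `epochSeconds`, `fiveMinuteBars`, and the sample data are hypothetical names for illustration. The sketch assumes timestamps are UTC (Spark's `unix_timestamp` uses the session time zone), groups by symbol as well as by 5-minute bucket, and keeps a window only when its last bar is 4 minutes after its first, which is the condition the query's outer `WHERE` clause enforces.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical OHLC bar; the timestamp uses the query's "yyyy-MM-dd HH:mm" format.
case class Bar(symbol: String, timestamp: String, open: Double, high: Double, low: Double, close: Double)

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")

// Seconds since the epoch, reading the timestamp as UTC (assumption: Spark's
// unix_timestamp uses the session time zone instead).
def epochSeconds(ts: String): Long =
  LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

// Group bars by (symbol, 5-minute bucket), mirroring PARTITION BY, and emit one
// 5-minute bar per bucket whose last bar is 4 minutes after its first bar,
// mirroring the outer WHERE clause.
def fiveMinuteBars(bars: Seq[Bar]): Seq[Bar] =
  bars
    .groupBy(b => (b.symbol, Math.floorDiv(epochSeconds(b.timestamp), 5L * 60)))
    .values
    .flatMap { window =>
      val sorted = window.sortBy(b => epochSeconds(b.timestamp))
      val first = sorted.head
      val last = sorted.last
      if (epochSeconds(last.timestamp) - epochSeconds(first.timestamp) == 4 * 60)
        Some(Bar(first.symbol, first.timestamp, first.open,
          sorted.map(_.high).max, sorted.map(_.low).min, last.close))
      else None
    }
    .toSeq
    .sortBy(b => (b.symbol, b.timestamp))

val sample = Seq(
  Bar("MSFT", "2015-10-01 09:30", 10.0, 11.0, 9.5, 10.5),
  Bar("MSFT", "2015-10-01 09:31", 10.5, 12.0, 10.0, 11.0),
  Bar("MSFT", "2015-10-01 09:32", 11.0, 11.5, 8.0, 9.0),
  Bar("MSFT", "2015-10-01 09:33", 9.0, 10.0, 8.5, 9.5),
  Bar("MSFT", "2015-10-01 09:34", 9.5, 10.0, 9.0, 9.8),
  // Partial window: no 09:39 bar, so it is dropped
  Bar("MSFT", "2015-10-01 09:35", 9.8, 10.0, 9.7, 9.9),
  Bar("MSFT", "2015-10-01 09:36", 9.9, 10.1, 9.8, 10.0),
  // A second symbol in the same time range gets its own window
  Bar("AAPL", "2015-10-01 09:30", 100.0, 101.0, 99.0, 100.5),
  Bar("AAPL", "2015-10-01 09:34", 100.5, 102.0, 100.0, 101.0)
)

val result = fiveMinuteBars(sample)
result.foreach(println)
```

With this sample the sketch produces one 09:30 bar for AAPL and one for MSFT; the partial MSFT window starting at 09:35 is dropped, just as the query's `WHERE` clause would drop it.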