Gist dgadiraju/9c15b02097c87678aeb20771301ea7e2, created April 5, 2017
package nyse

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by itversity on 30/03/17.
  */
case class StocksEOD(
  stockTicker: String,
  tradeDate: String,
  openPrice: Float,
  highPrice: Float,
  lowPrice: Float,
  closePrice: Float,
  volume: Int
)

object TopNStocksByVolumeSQL {
  def main(args: Array[String]): Unit = {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Top N Stocks by volume").
      setMaster("local")
      // setMaster(appConf.getConfig(args(2)).getString("executionMode"))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val inputPath = "/Users/itversity/Research/data/nyse/*/*.txt"
    // val outputPath = args(1)

    // Use the HDFS FileSystem API to validate the input and output paths.
    // globStatus is needed here because inputPath contains wildcards,
    // which fs.exists does not expand
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = Option(fs.globStatus(new Path(inputPath))).exists(_.nonEmpty)
    if (!inputPathExists) {
      println(s"Input path $inputPath does not match any files")
      sys.exit(1)
    }
    // val outputPathExists = fs.exists(new Path(outputPath))
    // if (outputPathExists)
    //   fs.delete(new Path(outputPath), true)

    // coalesce is used to reduce the number of tasks when the data is
    // spread across many small files
    import sqlContext.implicits._
    val data = sc.textFile(inputPath).
      coalesce(4).
      map(rec => {
        val r = rec.split(",")
        StocksEOD(r(0), r(1), r(2).toFloat, r(3).toFloat, r(4).toFloat, r(5).toFloat, r(6).toInt)
      }).toDF()
    data.registerTempTable("stocks_eod")

    // This fails in Spark 1.6.2 with a plain SQLContext: analytic and
    // windowing functions such as rank() require a HiveContext in Spark 1.x
    sqlContext.sql("select * from (" +
      "select tradeMonth, stockTicker, monthlyVolume," +
      " rank() over (partition by tradeMonth order by monthlyVolume desc) rnk from" +
      " (select substr(tradeDate, 1, 6) tradeMonth, stockTicker, sum(volume) monthlyVolume" +
      " from stocks_eod" +
      " group by substr(tradeDate, 1, 6), stockTicker) q) q1" +
      " where rnk <= 5" +
      " order by tradeMonth, monthlyVolume desc").
      collect().
      foreach(println)
  }
}
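As the comment in the code notes, the windowed query fails on a plain SQLContext in Spark 1.x, where analytic and windowing functions need Hive support. A minimal sketch of the fix, assuming a Spark 1.6 build with the spark-hive module on the classpath: swap the context type and leave the rest of the program unchanged.

import org.apache.spark.sql.hive.HiveContext

// HiveContext understands rank() over (partition by ... order by ...),
// which plain SQLContext rejects in Spark 1.x
val sqlContext = new HiveContext(sc)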
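For comparison, the same top-five-per-month ranking can be expressed through the DataFrame window API rather than embedded SQL. This is only a sketch, not the gist author's code: it reuses the data DataFrame and the sqlContext.implicits._ import from the program above, relies on the month being the first six characters of tradeDate (as the substr in the SQL implies), and in Spark 1.x carries the same HiveContext requirement.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Total volume per (month, ticker)
val monthly = data.
  select(substring($"tradeDate", 1, 6).as("tradeMonth"), $"stockTicker", $"volume").
  groupBy($"tradeMonth", $"stockTicker").
  agg(sum($"volume").as("monthlyVolume"))

// Rank tickers within each month by descending monthly volume
val byMonth = Window.partitionBy($"tradeMonth").orderBy($"monthlyVolume".desc)

monthly.
  withColumn("rnk", rank().over(byMonth)).
  filter($"rnk" <= 5).
  orderBy($"tradeMonth", $"monthlyVolume".desc).
  show()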