Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save dgadiraju/9c15b02097c87678aeb20771301ea7e2 to your computer and use it in GitHub Desktop.

Select an option

Save dgadiraju/9c15b02097c87678aeb20771301ea7e2 to your computer and use it in GitHub Desktop.
package nyse
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
  * One end-of-day stock record.
  *
  * Created by itversity on 30/03/17.
  *
  * In this file the fields are filled positionally (columns 0 to 6) from a
  * comma-separated text record by `TopNStocksByVolumeSQL.main`, and the field
  * names become the column names of the `stocks_eod` temp table.
  *
  * @param stockTicker first column of the record, kept as text
  * @param tradeDate   second column, kept as text; the query in this file uses
  *                    its first six characters as the trade month, so it is
  *                    presumably formatted `yyyyMMdd` (TODO confirm against the data)
  * @param openPrice   third column, parsed as `Float`
  * @param highPrice   fourth column, parsed as `Float`
  * @param lowPrice    fifth column, parsed as `Float`
  * @param closePrice  sixth column, parsed as `Float`
  * @param volume      seventh column, parsed as `Int`
  */
case class StocksEOD(
  stockTicker: String,
  tradeDate:   String,
  openPrice:   Float,
  highPrice:   Float,
  lowPrice:    Float,
  closePrice:  Float,
  volume:      Int
)
/**
  * Spark SQL job that prints, for each trade month, the five stocks with the
  * highest summed volume.
  *
  * The job reads comma-separated end-of-day records from a hard-coded glob,
  * turns them into [[StocksEOD]] rows, registers them as the `stocks_eod` temp
  * table and runs a ranking query whose rows are collected to the driver and
  * printed to standard output.
  */
object TopNStocksByVolumeSQL {

  /**
    * Entry point.
    *
    * @param args command-line arguments; currently unused (input path and
    *             master are hard-coded below)
    * @throws IllegalArgumentException if the input glob matches no files
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setAppName("Top N Stocks by volume").
      setMaster("local")
    // To make the master configurable instead of hard-coding "local", it could be read
    // from Typesafe config, e.g.
    //   setMaster(ConfigFactory.load().getConfig(args(2)).getString("executionMode"))
    val sc = new SparkContext(conf)

    // Always stop the context, even when validation or the query fails, so the
    // driver does not leak the running SparkContext.
    try {
      val sqlContext = new SQLContext(sc)
      val inputPath = "/Users/itversity/Research/data/nyse/*/*.txt"
      // An output path (args(1)) is not handled yet; results are only printed.

      // Use the Hadoop FileSystem API to validate the input path. The path is a
      // glob, so it has to be expanded with globStatus: exists() does not expand
      // wildcards. globStatus may return null, hence the Option wrapper.
      // The FileSystem returned by FileSystem.get is deliberately not closed here,
      // because Spark reads through it afterwards.
      val fs = FileSystem.get(sc.hadoopConfiguration)
      val inputPathExists =
        Option(fs.globStatus(new Path(inputPath))).exists(_.nonEmpty)
      require(inputPathExists, s"Input path $inputPath does not match any files")

      import sqlContext.implicits._

      // coalesce is used to reduce number of tasks to process data spread across
      // many small files
      val data = sc.textFile(inputPath).
        coalesce(4).
        map(rec => {
          val r = rec.split(",")
          StocksEOD(r(0), r(1), r(2).toFloat, r(3).toFloat, r(4).toFloat, r(5).toFloat, r(6).toInt)
        }).toDF()
      data.registerTempTable("stocks_eod")

      // NOTE: the original author reports that this query fails on Spark 1.6.2 because
      // analytic/windowing functions (rank() over ...) are not supported there.
      // Presumably a HiveContext or a newer Spark version is required -- confirm
      // against the Spark version in use.
      val topNQuery = "select * from (" +
        "select tradeMonth, stockTicker, monthlyVolume," +
        " rank() over (partition by tradeMonth order by monthlyVolume desc) rnk from" +
        " (select substr(tradeDate, 1, 6) tradeMonth, stockTicker, sum(volume) monthlyVolume" +
        " from stocks_eod" +
        " group by substr(tradeDate, 1, 6), stockTicker) q) q1" +
        " where rnk <= 5" +
        " order by tradeMonth, monthlyVolume desc"

      sqlContext.sql(topNQuery).
        collect().
        foreach(println)
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment