Last active
March 28, 2017 23:31
-
-
Save dgadiraju/c28751b6ec3aad64deadc31edc4e03af to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package nyse | |
| import com.typesafe.config.ConfigFactory | |
| import org.apache.hadoop.fs.{FileSystem, Path} | |
| import org.apache.spark.{SparkConf, SparkContext} | |
/**
 * Spark job that, for every trade month, lists the stocks whose total monthly
 * trading volume is among the top N distinct volumes of that month.
 *
 * Ranking uses dense-rank semantics: stocks tied on volume share a rank, so a
 * month can emit more than N rows.
 *
 * Arguments:
 *  - args(0): input path(s) of comma-separated daily records (as accepted by `textFile`)
 *  - args(1): output path; deleted first if it already exists
 *  - args(2): name of the config section whose `executionMode` is used as the Spark master
 *  - args(3): N, the number of distinct volume ranks to keep per month (must be > 0)
 *
 * Each record is split on ",". Column 0 is the stock ticker, column 1 is a date
 * whose first six characters are YYYYMM, and column 6 is the volume.
 * Output lines are `YYYYMM<TAB>volume<TAB>ticker`, ordered by month, then by
 * volume descending, then by ticker.
 */
object TopNStocksByVolume {

  def main(args: Array[String]): Unit = {
    require(args.length >= 4,
      "Usage: TopNStocksByVolume <inputPath> <outputPath> <configSection> <topN>")
    val inputPath = args(0)
    val outputPath = args(1)
    val executionMode = ConfigFactory.load().getConfig(args(2)).getString("executionMode")
    // Parse once on the driver so the Spark closure captures a plain Int
    // instead of the whole args array.
    val topN = args(3).toInt
    require(topN > 0, s"topN must be a positive integer, got $topN")

    val conf = new SparkConf().
      setAppName("Top n stocks by volume").
      setMaster(executionMode)
    val sc = new SparkContext(conf)
    try {
      val hadoopConf = sc.hadoopConfiguration

      // Validate the input BEFORE touching the output, so a typo in the input
      // path does not wipe out the previous run's results. globStatus is used
      // (rather than exists) because textFile accepts globs and comma lists.
      val inputFound = inputPath.split(",").forall { p =>
        val path = new Path(p)
        Option(path.getFileSystem(hadoopConf).globStatus(path)).exists(_.nonEmpty)
      }
      require(inputFound, s"Input path does not exist: $inputPath")

      // Resolve the filesystem from the path itself so non-default schemes
      // (e.g. file:// on an HDFS cluster) do not fail with "Wrong FS".
      val output = new Path(outputPath)
      val outputFs = output.getFileSystem(hadoopConf)
      if (outputFs.exists(output))
        outputFs.delete(output, true)

      sc.textFile(inputPath).
        // coalesce reduces the number of tasks when data is spread across
        // many small files
        coalesce(4).
        // key: (trade month YYYYMM, ticker), value: daily volume.
        // Long is used because monthly sums can exceed Int.MaxValue.
        map { rec =>
          val a = rec.split(",")
          ((a(1).substring(0, 6).toInt, a(0)), a(6).toLong)
        }.
        // Total volume per stock per month
        reduceByKey(_ + _).
        // Re-key by trade month only: (month, (volume, ticker))
        map { case ((month, ticker), volume) => (month, (volume, ticker)) }.
        groupByKey().
        // Keep stocks whose volume is within the month's top N distinct volumes
        flatMap { case (month, stocks) =>
          denseTopN(stocks, topN).map(stock => (month, stock))
        }.
        // Sort explicitly on every output column so the order within a month
        // is deterministic instead of depending on shuffle arrival order.
        sortBy { case (month, (volume, ticker)) => (month, -volume, ticker) }.
        map { case (month, (volume, ticker)) => s"$month\t$volume\t$ticker" }.
        saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
   * Dense-rank selection of one month's stocks.
   *
   * @param stocks (volume, ticker) pairs for a single month
   * @param n      number of distinct volume values to keep
   * @return the pairs whose volume is one of the `n` largest distinct volumes,
   *         sorted by volume descending
   */
  private def denseTopN(stocks: Iterable[(Long, String)], n: Int): List[(Long, String)] = {
    val all = stocks.toList
    val keptVolumes = all.map(_._1).distinct.sortBy(v => -v).take(n).toSet
    all.filter { case (volume, _) => keptVolumes.contains(volume) }.sortBy(s => -s._1)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment