Created March 29, 2017 07:17
-
-
Save dgadiraju/9e7506846f16d133d1a2bc20c42569db to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package nyse | |
import com.typesafe.config.ConfigFactory | |
import org.apache.hadoop.fs.{FileSystem, Path} | |
import org.apache.spark.{SparkConf, SparkContext} | |
/**
 * Spark job that, for each trade month, finds the stocks whose total traded
 * volume is among the top N distinct monthly volumes. This is a dense rank:
 * stocks with equal volume share a rank, so a month can yield more than N rows.
 * Results are written as tab-delimited lines `tradeMonth<TAB>stockName<TAB>volume`,
 * ordered by trade month and then by volume, highest first.
 *
 * Arguments:
 *  - `args(0)`: input path(s) of comma-delimited daily stock records, where
 *    field 0 is the ticker, field 1 is the trade date starting with YYYYMM and
 *    field 6 is the traded volume
 *  - `args(1)`: tab-delimited file of `ticker<TAB>stockName`; tickers missing
 *    from it are written as-is
 *  - `args(2)`: output directory; an existing one is deleted first
 *  - `args(3)`: name of the config section (from `ConfigFactory.load()`) whose
 *    `executionMode` is used as the Spark master
 *  - `args(4)`: N, the number of distinct volume ranks to keep per month
 *
 * Originally created by itversity on 28/03/17.
 */
object TopNStocksByVolumeWithName {

  def main(args: Array[String]): Unit = {
    require(args.length >= 5,
      "Usage: TopNStocksByVolumeWithName <inputPath> <stockSymbolsPath> <outputPath> <env> <topN>")
    val inputPath = args(0)
    val stockSymbolsPath = args(1)
    val outputPath = args(2)
    // Parsed once on the driver: a bad value fails before any Spark work starts
    // instead of being re-parsed (and failing) inside every task.
    val topN = args(4).toInt

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Top n stocks by volume").
      setMaster(appConf.getConfig(args(3)).getString("executionMode"))
    val sc = new SparkContext(conf)
    try {
      run(sc, inputPath, stockSymbolsPath, outputPath, topN)
    } finally {
      sc.stop()
    }
  }

  /** Builds and executes the top-N pipeline, then prints the job counters. */
  private def run(
      sc: SparkContext,
      inputPath: String,
      stockSymbolsPath: String,
      outputPath: String,
      topN: Int): Unit = {
    // Validate the input before deleting anything, so a bad input path does not
    // cost the previous run's output.
    requireInputExists(sc, inputPath)
    deleteIfExists(sc, outputPath)

    // Ticker -> stock name lookup, collected on the driver and broadcast to tasks.
    val stockNames = sc.textFile(stockSymbolsPath).
      map { line =>
        val fields = line.split("\t")
        (fields(0), fields(1))
      }.
      collectAsMap()
    val bv = sc.broadcast(stockNames)

    // NOTE(review): sc.accumulator is deprecated since Spark 2.0; switch to
    // sc.longAccumulator if this job runs on Spark 2.x.
    // These are updated inside transformations, so a re-executed task can add
    // to them more than once; treat the counts as diagnostics.
    val totalRecords = sc.accumulator(0, "Total number of records")
    val noTradedRecords = sc.accumulator(0, "Number of records that are not traded")
    val noOfTopNRecords = sc.accumulator(0, "Number of records fall under top n records")

    sc.textFile(inputPath).
      // coalesce is used to reduce number of tasks to process data spread across
      // many small files
      coalesce(4).
      // Key: (trade month as YYYYMM, ticker); value: daily volume.
      map { rec =>
        totalRecords += 1
        val fields = rec.split(",")
        val volume = fields(6).toLong
        if (volume == 0) noTradedRecords += 1
        ((fields(1).substring(0, 6).toInt, fields(0)), volume)
      }.
      // Monthly volume per stock. Long, not Int: a heavily traded stock's
      // monthly total can exceed Int.MaxValue and would silently wrap around.
      reduceByKey(_ + _).
      // Re-key by trade month and gather every stock traded in that month.
      map { case ((month, ticker), volume) => (month, (volume, ticker)) }.
      groupByKey().
      flatMap { case (month, stocks) =>
        topNByVolume(stocks, topN).map(stock => (month, stock))
      }.
      // Sort on the full (month, volume desc, ticker) key: a shuffle does not
      // guarantee the relative order of records sharing a key, so sorting by
      // month alone could scramble the per-month volume order.
      sortBy { case (month, (volume, ticker)) => (month, -volume, ticker) }.
      // Format as tab-delimited text, replacing tickers with names where known.
      map { case (month, (volume, ticker)) =>
        noOfTopNRecords += 1
        s"$month\t${bv.value.getOrElse(ticker, ticker)}\t$volume"
      }.
      saveAsTextFile(outputPath)

    println(s"Total number of records: ${totalRecords.value}")
    println(s"Number of records that are not traded: ${noTradedRecords.value}")
    println(s"Number of records fall under top n records: ${noOfTopNRecords.value}")
  }

  /**
   * Fails fast when any input path matches no files. `textFile` input may be a
   * comma-separated list and may contain globs. Each entry is therefore expanded
   * with `globStatus`, which returns null or an empty array when nothing matches.
   */
  private def requireInputExists(sc: SparkContext, inputPath: String): Unit = {
    val missing = inputPath.split(",").filter { p =>
      val path = new Path(p)
      val fs: FileSystem = path.getFileSystem(sc.hadoopConfiguration)
      Option(fs.globStatus(path)).forall(_.isEmpty)
    }
    require(missing.isEmpty, s"Input path(s) not found: ${missing.mkString(", ")}")
  }

  /** Recursively deletes `outputPath` if it already exists. */
  private def deleteIfExists(sc: SparkContext, outputPath: String): Unit = {
    val path = new Path(outputPath)
    // Resolve the file system from the path itself, so fully qualified paths on
    // a non-default file system (e.g. file:// while the default is hdfs://) work.
    val fs: FileSystem = path.getFileSystem(sc.hadoopConfiguration)
    if (fs.exists(path)) fs.delete(path, true)
  }

  /**
   * Dense-rank selection for one trade month. Keeps every (volume, ticker)
   * pair whose volume is one of the `n` largest distinct volumes, ordered by
   * volume, highest first.
   */
  private def topNByVolume(stocks: Iterable[(Long, String)], n: Int): List[(Long, String)] = {
    val byVolumeDesc = stocks.toList.sortBy(_._1)(Ordering[Long].reverse)
    val topVolumes = byVolumeDesc.map(_._1).distinct.take(n).toSet
    byVolumeDesc.filter { case (volume, _) => topVolumes.contains(volume) }
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.