Last active: April 1, 2017 11:40
-
-
Save dgadiraju/d20c75a5f13b18529e4a81e93c3731ae to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package retail | |
| import com.typesafe.config.ConfigFactory | |
| import org.apache.hadoop.fs.{FileSystem, Path} | |
| import org.apache.spark.{SparkConf, SparkContext} | |
/**
 * Created by itversity on 01/04/17.
 *
 * Problem statement:
 *   Compute the total revenue per day for all completed and closed orders,
 *   i.e. only orders whose status is COMPLETE or CLOSED are counted.
 */
/**
 * A single record of the `orders` dataset, parsed from one comma-separated line.
 *
 * @param order_id          numeric order identifier (column 0)
 * @param order_date        order date, kept as the raw string from column 1
 * @param order_customer_id numeric customer identifier (column 2)
 * @param order_status      status text from column 3, e.g. "COMPLETE" or "CLOSED"
 */
case class Orders(order_id: Int, order_date: String, order_customer_id: Int, order_status: String)
/**
 * A single record of the `order_items` dataset, parsed from one comma-separated line.
 *
 * @param order_item_id            numeric line-item identifier (column 0)
 * @param order_item_order_id      id of the order this item belongs to (column 1)
 * @param order_item_product_id    numeric product identifier (column 2)
 * @param order_item_quantity      quantity ordered (column 3)
 * @param order_item_subtotal      line subtotal (column 4)
 * @param order_item_product_price unit price of the product (column 5)
 */
case class OrderItems(order_item_id: Int, order_item_order_id: Int, order_item_product_id: Int,
                      order_item_quantity: Int,
                      order_item_subtotal: Float, order_item_product_price: Float)
/**
 * Spark job that computes the total revenue per order date for orders whose
 * status is COMPLETE or CLOSED.
 *
 * Usage: DailyRevenue <environment> <inputPath> <outputPath>
 *   - environment: name of a config section (loaded with Typesafe Config) whose
 *     `executionMode` value is used as the Spark master
 *   - inputPath:   directory containing the `orders` and `order_items` data
 *   - outputPath:  directory the result is written to; deleted first if it exists
 *
 * Each output line is `<order_date>\t<revenue>`, sorted by order date.
 */
object DailyRevenue {

  /** Order statuses whose revenue is counted. */
  private val CountedStatuses = Set("COMPLETE", "CLOSED")

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: DailyRevenue <environment> <inputPath> <outputPath>")
      sys.exit(1)
    }
    val environment = args(0)
    val inputPath = args(1)
    val outputPath = args(2)

    val props = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Daily Revenue").
      setMaster(props.getConfig(environment).getString("executionMode"))
    val sc = new SparkContext(conf)

    // Always release the SparkContext, whether the job succeeds, aborts or throws.
    val succeeded =
      try run(sc, inputPath, outputPath)
      finally sc.stop()

    if (!succeeded) sys.exit(1)
  }

  /**
   * Validates the paths and runs the revenue pipeline.
   *
   * @return false (without touching the output directory) if the input path
   *         does not exist, true once the result has been written
   */
  private def run(sc: SparkContext, inputPath: String, outputPath: String): Boolean = {
    val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
    if (!fs.exists(new Path(inputPath))) {
      // Abort here: continuing would delete the output and then fail in textFile.
      System.err.println("Invalid input path")
      false
    } else {
      val op = new Path(outputPath)
      // saveAsTextFile fails if the output directory already exists, so clear it.
      if (fs.exists(op)) fs.delete(op, true)

      val orders = completedOrClosedOrderDates(sc, inputPath)
      val orderItems = orderSubtotals(sc, inputPath)

      /*
      // groupByKey alternative (shuffles every value; reduceByKey is preferred)
      val dailyRevenueGBK = orders.join(orderItems).
        map(rec => rec._2).
        groupByKey().
        map(rec => (rec._1, rec._2.sum)).
        sortByKey()
      */
      /*
      // aggregateByKey example: (revenue, item count) per date
      val dailyRevenueABK = orders.join(orderItems).
        map(rec => rec._2).
        aggregateByKey((BigDecimal(0), 0))(
          (seqAgg, value) => (seqAgg._1 + value, seqAgg._2 + 1),
          (combAgg, combValue) => (combAgg._1 + combValue._1, combAgg._2 + combValue._2))
      */

      // (order_id, (order_date, subtotal)) -> (order_date, subtotal) -> total per date
      orders.join(orderItems).
        values.
        reduceByKey(_ + _).
        sortByKey().
        map { case (orderDate, revenue) => s"$orderDate\t${revenue.bigDecimal.toPlainString}" }.
        saveAsTextFile(outputPath)
      true
    }
  }

  /** Reads `<inputPath>/orders` and returns (order_id, order_date) for COMPLETE or CLOSED orders. */
  private def completedOrClosedOrderDates(sc: SparkContext, inputPath: String) =
    sc.textFile(s"$inputPath/orders").
      map { rec =>
        val r = rec.split(",")
        Orders(r(0).toInt, r(1), r(2).toInt, r(3))
      }.
      filter(order => CountedStatuses.contains(order.order_status)).
      map(order => (order.order_id, order.order_date))

  /**
   * Reads `<inputPath>/order_items` and returns (order_item_order_id, subtotal).
   *
   * Each subtotal is turned into an exact BigDecimal via the Float's shortest
   * decimal string (e.g. 299.98f -> "299.98"). Summing thousands of Floats
   * per day would otherwise lose cents once totals exceed Float precision.
   */
  private def orderSubtotals(sc: SparkContext, inputPath: String) =
    sc.textFile(s"$inputPath/order_items").
      map { rec =>
        val r = rec.split(",")
        OrderItems(r(0).toInt, r(1).toInt, r(2).toInt, r(3).toInt, r(4).toFloat, r(5).toFloat)
      }.
      map(item => (item.order_item_order_id, BigDecimal(item.order_item_subtotal.toString)))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.