Last active
March 27, 2017 15:24
-
-
Save dgadiraju/50a5b60348374392e3b4dc47ebe6eea6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package retail | |
| import com.typesafe.config.ConfigFactory | |
| import org.apache.hadoop.fs.{FileSystem, Path} | |
| import org.apache.spark.{SparkConf, SparkContext} | |
| /** | |
| * Created by itversity on 27/03/17. | |
| */ | |
object DailyRevenuePerDayPerDepartment {

  /**
   * Spark job that computes total revenue per (order_date, department_name)
   * for COMPLETE or CLOSED orders in a retail dataset, writing the result as
   * tab-delimited text.
   *
   * Expected arguments:
   *   args(0) - input base path; must contain the subdirectories
   *             departments, categories, products, order_items, orders
   *   args(1) - output path (deleted first if it already exists)
   *   args(2) - config key whose "executionMode" entry selects the Spark master
   *
   * Exits with status 1 on missing arguments or an invalid input path.
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
      println("Usage: DailyRevenuePerDayPerDepartment <inputPath> <outputPath> <configKey>")
      sys.exit(1)
    }

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Revenue by department per day").
      setMaster(appConf.getConfig(args(2)).getString("executionMode"))
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)

    // Use the HDFS FileSystem API to validate input and output paths.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (!fs.exists(new Path(inputPath))) {
      println("Invalid input path")
      sys.exit(1)
    }
    if (fs.exists(new Path(outputPath)))
      fs.delete(new Path(outputPath), true)

    // Each record is split exactly once (the original split per field access).

    // departments: (department_id, department_name)
    val departments = sc.textFile(inputPath + "/departments").
      map { rec =>
        val f = rec.split(",")
        (f(0).toInt, f(1))
      }
    // categories keyed by department: (category_department_id, category_id)
    val categories = sc.textFile(inputPath + "/categories").
      map { rec =>
        val f = rec.split(",")
        (f(1).toInt, f(0).toInt)
      }
    // Join on department_id.
    // Result: (department_id, (department_name, category_id))
    val cdjoin = departments.join(categories)

    // products keyed by category: (product_category_id, product_id)
    val products = sc.textFile(inputPath + "/products").
      map { rec =>
        val f = rec.split(",")
        (f(1).toInt, f(0).toInt)
      }
    // Re-key cdjoin on category_id and join with products.
    // Result: (category_id, (department_name, product_id))
    val cdpjoin = cdjoin.
      map(rec => (rec._2._2, rec._2._1)).
      join(products)

    // order_items keyed by product:
    // (order_item_product_id, (order_item_order_id, order_item_subtotal))
    val orderItems = sc.textFile(inputPath + "/order_items").
      map { rec =>
        val f = rec.split(",")
        (f(2).toInt, (f(1).toInt, f(4).toDouble))
      }
    // Re-key cdpjoin on product_id and join with order_items.
    // Result: (product_id, (department_name, (order_item_order_id, order_item_subtotal)))
    val cdpojoin = cdpjoin.
      map(rec => (rec._2._2, rec._2._1)).
      join(orderItems)

    // orders restricted to COMPLETE/CLOSED: (order_id, order_date)
    val orders = sc.textFile(inputPath + "/orders").
      map(_.split(",")).
      filter(f => f(3) == "COMPLETE" || f(3) == "CLOSED").
      map(f => (f(0).toInt, f(1)))

    // Re-key cdpojoin on order_id and join with orders.
    // Result: (order_id, ((department_name, order_item_subtotal), order_date))
    val cdpoojoin = cdpojoin.
      map(rec => (rec._2._2._1, (rec._2._1, rec._2._2._2))).
      join(orders)

    // Aggregate subtotal per (order_date, department_name), sort by that key,
    // then emit "date<TAB>department<TAB>revenue" lines.
    cdpoojoin.
      map(rec => ((rec._2._2, rec._2._1._1), rec._2._1._2)).
      reduceByKey(_ + _).
      sortByKey().
      map(rec => rec._1._1 + "\t" + rec._1._2 + "\t" + rec._2).
      saveAsTextFile(outputPath)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment