Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Last active April 1, 2017 11:40
Show Gist options
  • Select an option

  • Save dgadiraju/d20c75a5f13b18529e4a81e93c3731ae to your computer and use it in GitHub Desktop.

Select an option

Save dgadiraju/d20c75a5f13b18529e4a81e93c3731ae to your computer and use it in GitHub Desktop.
package retail
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by itversity on 01/04/17.
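* Problem statement:
* Get the total revenue per day for all completed and closed orders.
* Steps: filter orders with status COMPLETE or CLOSED, join them with
* order_items on the order id, sum the item subtotals per order date,
* and sort the result by order date.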
*/
/**
 * One parsed record of the comma-separated `orders` input.
 *
 * @param order_id          numeric order identifier (field 0); join key against order items
 * @param order_date        order date kept as the raw string (field 1); grouping key for revenue
 * @param order_customer_id numeric customer identifier (field 2)
 * @param order_status      order status text (field 3), e.g. "COMPLETE" or "CLOSED"
 */
case class Orders(
    order_id: Int,
    order_date: String,
    order_customer_id: Int,
    order_status: String
)
/**
 * One parsed record of the comma-separated `order_items` input.
 *
 * @param order_item_id            numeric item identifier (field 0)
 * @param order_item_order_id      identifier of the owning order (field 1); join key against orders
 * @param order_item_product_id    numeric product identifier (field 2)
 * @param order_item_quantity      quantity as an integer (field 3)
 * @param order_item_subtotal      line subtotal (field 4); the value summed into daily revenue
 * @param order_item_product_price product price (field 5)
 */
case class OrderItems(
    order_item_id: Int,
    order_item_order_id: Int,
    order_item_product_id: Int,
    order_item_quantity: Int,
    order_item_subtotal: Float,
    order_item_product_price: Float
)
/**
 * Spark job that computes the total revenue per order date for orders whose
 * status is COMPLETE or CLOSED, and writes the result as tab-separated text
 * (`order_date<TAB>revenue`), sorted by order date.
 *
 * Program arguments:
 *  - `args(0)`: name of the configuration section whose `executionMode` entry
 *    is used as the Spark master
 *  - `args(1)`: base input path; `orders` and `order_items` are read from below it
 *  - `args(2)`: output path; removed recursively first if it already exists
 */
object DailyRevenue {
  /**
   * Job entry point.
   *
   * @param args environment name, input base path and output path (see object docs)
   * @throws IllegalArgumentException if fewer than three arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException.
    require(
      args.length >= 3,
      "Usage: DailyRevenue <environment> <inputPath> <outputPath>")

    val props = ConfigFactory.load()
    val conf = new SparkConf()
      .setAppName("Daily Revenue")
      .setMaster(props.getConfig(args(0)).getString("executionMode"))
    val sc = new SparkContext(conf)

    // Always release the SparkContext, even when the job fails part-way through.
    try {
      val inputPath = args(1)
      val outputPath = args(2)
      val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)

      if (!fs.exists(new Path(inputPath))) {
        // Stop here: previously the job kept going, deleted the output
        // directory and only failed later while reading the missing input.
        println("Invalid input path")
      } else {
        // saveAsTextFile refuses to write into an existing directory, so clear it first.
        val op = new Path(outputPath)
        if (fs.exists(op))
          fs.delete(op, true)

        // (order_id, order_date) for COMPLETE / CLOSED orders only.
        val orders = sc.textFile(inputPath + "/orders")
          .map { line =>
            val r = line.split(",")
            Orders(r(0).toInt, r(1), r(2).toInt, r(3))
          }
          .filter(o => o.order_status == "COMPLETE" || o.order_status == "CLOSED")
          .map(o => (o.order_id, o.order_date))

        // (order_id, subtotal) for every order item.
        val orderItems = sc.textFile(inputPath + "/order_items")
          .map { line =>
            val r = line.split(",")
            OrderItems(r(0).toInt, r(1).toInt, r(2).toInt, r(3).toInt, r(4).toFloat, r(5).toFloat)
          }
          .map(i => (i.order_item_order_id, i.order_item_subtotal))

        // Join on order id, re-key by order date and sum the subtotals per date.
        // reduceByKey is used rather than groupByKey().map(sum) so that values
        // are combined per key without first collecting them into a group.
        orders.join(orderItems)
          .map { case (_, dateAndSubtotal) => dateAndSubtotal }
          .reduceByKey(_ + _)
          .sortByKey() // order_date is a String, so this is a lexicographic sort
          .map(rec => rec.productIterator.mkString("\t"))
          .saveAsTextFile(outputPath)
      }
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment