package retail

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
/**
 * Created by itversity on 27/03/17.
 *
 * build.sbt:
 *
 *   name := "doc"
 *   version := "1.0"
 *   scalaVersion := "2.10.6"
 *   libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
 *   libraryDependencies += "com.typesafe" % "config" % "1.3.1"
 *   libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
 *   libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
 *
 * Package the jar with sbt package, then submit. The two trailing arguments are
 * the HDFS output path and the environment key used to look up executionMode:
 *
 *   spark-submit --class retail.DailyRevenuePerDayPerDepartmentHive \
 *     --master yarn \
 *     --conf spark.ui.port=25613 \
 *     doc_2.10-1.0.jar /user/dgadiraju/DailyRevenuePerDayPerDepartmentHive prod
 */
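/*
 * ConfigFactory.load() picks up application.properties (or application.conf) from the
 * classpath, and args(1) ("prod" in the submit command above) selects a section of it.
 * A minimal sketch of such a file, assuming a local dev mode and YARN in prod (the
 * exact master values are assumptions, not part of this gist):
 *
 *   dev.executionMode = local
 *   prod.executionMode = yarn-client
 */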
object DailyRevenuePerDayPerDepartmentHive {
  def main(args: Array[String]) {
    // args(0): output path, args(1): environment key into the Typesafe config
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Revenue by department per day").
      setMaster(appConf.getConfig(args(1)).getString("executionMode"))
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // Delete the output path if it already exists so saveAsTextFile does not fail on reruns
    val outputPath = args(0)
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (fs.exists(new Path(outputPath)))
      fs.delete(new Path(outputPath), true)

    // Work against the doc database and keep shuffle partitions low for this small dataset
    sqlContext.sql("use doc")
    sqlContext.setConf("spark.sql.shuffle.partitions", "2")

    // Join departments -> categories -> products -> order_items -> orders,
    // keep completed/closed orders only, and total revenue per date and department
    sqlContext.sql("select o.order_date, d.department_name, sum(oi.order_item_subtotal) order_revenue " +
      "from departments d join categories c on d.department_id = c.category_department_id " +
      "join products p on c.category_id = p.product_category_id " +
      "join order_items oi on p.product_id = oi.order_item_product_id " +
      "join orders o on oi.order_item_order_id = o.order_id " +
      "where o.order_status in ('COMPLETE', 'CLOSED') " +
      "group by o.order_date, d.department_name " +
      "order by o.order_date, d.department_name").
      rdd.
      map(rec => rec.mkString("\t")). // one tab-delimited line per (order_date, department) pair
      saveAsTextFile(outputPath)
  }
}
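/*
 * For reference, a minimal sketch of the same aggregation through the DataFrame API
 * instead of embedded SQL, assuming the same Hive tables and that "use doc" has
 * already been issued (an untested alternative, not part of the original job):
 *
 *   import org.apache.spark.sql.functions.sum
 *
 *   val orders = sqlContext.table("orders").filter("order_status in ('COMPLETE', 'CLOSED')")
 *   val orderItems = sqlContext.table("order_items")
 *   val products = sqlContext.table("products")
 *   val categories = sqlContext.table("categories")
 *   val departments = sqlContext.table("departments")
 *
 *   departments.
 *     join(categories, departments("department_id") === categories("category_department_id")).
 *     join(products, categories("category_id") === products("product_category_id")).
 *     join(orderItems, products("product_id") === orderItems("order_item_product_id")).
 *     join(orders, orderItems("order_item_order_id") === orders("order_id")).
 *     groupBy(orders("order_date"), departments("department_name")).
 *     agg(sum(orderItems("order_item_subtotal")).as("order_revenue")).
 *     orderBy("order_date", "department_name")
 */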