Created March 2, 2017 18:48
package org.wikimedia.analytics.refinery.job

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

import scopt.OptionParser

/**
 * Usage with spark-submit:
 *
 *   spark-submit \
 *     --class org.wikimedia.analytics.refinery.job.OttoHiveContext \
 *     --num-executors=8 --executor-cores=1 --executor-memory=2g \
 *     /path/to/refinery-job.jar -o hdfs://analytics-hadoop/tmp/otto-hive-context1 \
 *     -s misc -y 2017 -m 2 -d 10 -h 12
 *
 * The result (request counts per uri_host for the given hour) is written as JSON
 * to <output-dir>/out.json.
 */
object OttoHiveContext {
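  /**
   * Run a Hive query counting webrequests per uri_host for a single
   * webrequest_source / year / month / day / hour partition.
   * Returns the (uri_host, cnt) result as a DataFrame, limited to 10 rows.
   */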
  def countUriHost(table: String, webrequestSource: String,
                   year: Int, month: Int, day: Int, hour: Int,
                   sqlContext: SQLContext): DataFrame = {
    sqlContext.sql(
      s"""
         |SELECT
         |  uri_host,
         |  count(*) as cnt
         |FROM ${table}
         |WHERE webrequest_source = '${webrequestSource}'
         |  AND year = ${year}
         |  AND month = ${month}
         |  AND day = ${day}
         |  AND hour = ${hour}
         |GROUP BY
         |  uri_host
         |LIMIT 10
      """.stripMargin)
  }
  /**
   * Config class for CLI argument parser using scopt
   */
  case class Params(
    webrequestTable: String = "wmf.webrequest",
    outputDir: String = "/tmp/otto-hive-context",
    webrequestSource: String = "misc",
    year: Int = 0,
    month: Int = 0,
    day: Int = 0,
    hour: Int = 0
  )
  /**
   * Define the command line options parser
   */
  val argsParser = new OptionParser[Params]("Otto Hive Context") {
    opt[String]('w', "webrequest-table") optional() valueName ("<table>") action { (x, p) =>
      p.copy(webrequestTable = x)
    } text ("Hive table containing webrequest data. Defaults to wmf.webrequest")

    opt[String]('o', "output-dir") required() valueName ("<path>") action { (x, p) =>
      p.copy(outputDir = if (x.endsWith("/")) x else x + "/")
    } text ("Path to the output directory")

    opt[String]('s', "webrequest-source") optional() valueName ("<source>") action { (x, p) =>
      p.copy(webrequestSource = x)
    } text ("webrequest_source partition to read. Defaults to misc")

    opt[Int]('y', "year") required() action { (x, p) =>
      p.copy(year = x)
    } text ("Year as an integer")

    opt[Int]('m', "month") required() action { (x, p) =>
      p.copy(month = x)
    } validate { x =>
      if (x >= 1 && x <= 12) success else failure("Invalid month")
    } text ("Month as an integer")

    opt[Int]('d', "day") required() action { (x, p) =>
      p.copy(day = x)
    } validate { x =>
      if (x >= 1 && x <= 31) success else failure("Invalid day")
    } text ("Day as an integer")

    opt[Int]('h', "hour") required() action { (x, p) =>
      p.copy(hour = x)
    } validate { x =>
      if (x >= 0 && x <= 23) success else failure("Invalid hour")
    } text ("Hour as an integer (0-23)")
  }
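  // A hypothetical spot-check of the parser (not part of the job): scopt returns
  // Some(Params(...)) when all required options are present and valid, e.g.
  //   argsParser.parse(Seq("-o", "/tmp/otto", "-y", "2017", "-m", "2", "-d", "10", "-h", "12"), Params())
  //   // => Some(Params("wmf.webrequest", "/tmp/otto/", "misc", 2017, 2, 10, 12))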
  def main(args: Array[String]) {
    argsParser.parse(args, Params()) match {
      case Some(params) => {
        // Initial setup - Spark, SQLContext
        val conf = new SparkConf().setAppName("OttoHiveContext")
        val sc = new SparkContext(conf)
        val sqlContext = new HiveContext(sc)
        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

        // Count requests per uri_host for the requested hour and write the
        // result to the output directory as JSON.
        val data = countUriHost(
          params.webrequestTable,
          params.webrequestSource,
          params.year, params.month, params.day, params.hour,
          sqlContext
        )

        // outputDir is normalized by the parser to end with "/".
        data.write.mode("overwrite").format("json").save(params.outputDir + "out.json")
      }
      case None => sys.exit(1)
    }
  }
}
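For a quick sanity check, the job's output can be read back in a spark-shell. This is a minimal sketch, assuming the job was run with the -o path from the spark-submit example above; the path is illustrative, not part of the job itself.

// Read the JSON output back (path is an assumption taken from the usage example).
val out = sqlContext.read.json("hdfs://analytics-hadoop/tmp/otto-hive-context1/out.json")
// Show the per-host counts, largest first.
out.orderBy(out("cnt").desc).show(10)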