@ottomata
Created March 2, 2017
package org.wikimedia.analytics.refinery.job

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser

/**
 * Usage with spark-submit:
 *
 *   spark-submit \
 *     --class org.wikimedia.analytics.refinery.job.OttoHiveContext \
 *     --num-executors=8 --executor-cores=1 --executor-memory=2g \
 *     /path/to/refinery-job.jar -o hdfs://analytics-hadoop/tmp/otto-hive-context1 \
 *     -s misc -y 2017 -m 2 -d 10 -h 12
 *
 * The resulting uri_host counts are written as JSON to <output-dir>/out.json.
 */
object OttoHiveContext {

  /**
   * Count requests per uri_host in a single hourly webrequest partition,
   * returning at most 10 hosts. Note: LIMIT without ORDER BY returns an
   * arbitrary 10 groups, not the top 10 by count.
   */
  def countUriHost(table: String, webrequestSource: String, year: Int, month: Int, day: Int, hour: Int, sqlContext: SQLContext): DataFrame = {
    sqlContext.sql(
      s"""
         |SELECT
         |  uri_host,
         |  count(*) AS cnt
         |FROM ${table}
         |WHERE webrequest_source = '${webrequestSource}'
         |  AND year = ${year}
         |  AND month = ${month}
         |  AND day = ${day}
         |  AND hour = ${hour}
         |GROUP BY
         |  uri_host
         |LIMIT 10
       """.stripMargin)
  }
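
  /**
   * Illustrative sketch (not in the original gist): the same per-uri_host
   * count expressed with the DataFrame API instead of an interpolated SQL
   * string. The method name countUriHostDF is hypothetical; it assumes the
   * same partition columns (webrequest_source, year, month, day, hour) on
   * the webrequest table.
   */
  def countUriHostDF(table: String, webrequestSource: String, year: Int, month: Int, day: Int, hour: Int, sqlContext: SQLContext): DataFrame = {
    import org.apache.spark.sql.functions.{col, count}
    sqlContext.table(table)
      .where(col("webrequest_source") === webrequestSource &&
        col("year") === year &&
        col("month") === month &&
        col("day") === day &&
        col("hour") === hour)
      .groupBy("uri_host")
      .agg(count("*").as("cnt"))
      .limit(10)
  }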
  /**
   * Config class for CLI argument parser using scopt
   */
  case class Params(webrequestTable: String = "wmf.webrequest",
                    outputDir: String = "/tmp/otto-hive-context",
                    webrequestSource: String = "misc",
                    year: Int = 0,
                    month: Int = 0,
                    day: Int = 0,
                    hour: Int = 0)
  /**
   * Define the command line options parser
   */
  val argsParser = new OptionParser[Params]("Otto Hive Context") {
    opt[String]('w', "webrequest-table") optional() valueName("<table>") action { (x, p) =>
      p.copy(webrequestTable = x)
    } text("Hive table containing webrequest data. Defaults to wmf.webrequest")

    opt[String]('o', "output-dir") required() valueName("<path>") action { (x, p) =>
      p.copy(outputDir = if (x.endsWith("/")) x else x + "/")
    } text("Path to output directory")

    opt[String]('s', "webrequest-source") optional() valueName("<source>") action { (x, p) =>
      p.copy(webrequestSource = x)
    } text("Webrequest source partition to read. Defaults to misc")

    opt[Int]('y', "year") required() action { (x, p) =>
      p.copy(year = x)
    } text("Year as an integer")

    opt[Int]('m', "month") required() action { (x, p) =>
      p.copy(month = x)
    } validate { x =>
      if (x >= 1 && x <= 12) success else failure("Invalid month")
    } text("Month as an integer")

    opt[Int]('d', "day") required() action { (x, p) =>
      p.copy(day = x)
    } validate { x =>
      if (x >= 1 && x <= 31) success else failure("Invalid day")
    } text("Day as an integer")

    opt[Int]('h', "hour") required() action { (x, p) =>
      p.copy(hour = x)
    } validate { x =>
      // Hour is 0-based (0-23)
      if (x >= 0 && x <= 23) success else failure("Invalid hour")
    } text("Hour as an integer")
  }
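
  // Illustrative check of the parser (not in the original gist): parsing the
  // flags from the scaladoc example yields a populated Params, e.g.
  //   argsParser.parse(Array("-o", "/tmp/otto-hive-context", "-y", "2017", "-m", "2", "-d", "10", "-h", "12"), Params())
  //   // => Some(Params("wmf.webrequest", "/tmp/otto-hive-context/", "misc", 2017, 2, 10, 12))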
  def main(args: Array[String]) {
    argsParser.parse(args, Params()) match {
      case Some(params) => {
        // Initial setup - Spark, HiveContext
        val conf = new SparkConf().setAppName("OttoHiveContext")
        val sc = new SparkContext(conf)
        val sqlContext = new HiveContext(sc)
        sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

        // Count requests per uri_host for the requested webrequest partition
        // and write the result out as JSON.
        val data = countUriHost(
          params.webrequestTable,
          params.webrequestSource,
          params.year, params.month, params.day, params.hour,
          sqlContext
        )
        // outputDir is normalized by the argument parser to end with "/".
        data.write.mode("overwrite").format("json").save(params.outputDir + "out.json")
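
        // Optional (not in the original gist): the result is at most 10 rows,
        // so coalescing to one partition would write a single JSON part file:
        //   data.coalesce(1).write.mode("overwrite").format("json").save(params.outputDir + "out.json")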
      }
      case None => sys.exit(1)
    }
  }
}
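
// Hedged usage sketch (not in the original gist): inspecting the job's output
// from spark-shell. The path assumes the -o value from the scaladoc example;
// adjust it to your own output directory.
//
//   val out = sqlContext.read.json("hdfs://analytics-hadoop/tmp/otto-hive-context1/out.json")
//   out.orderBy(out("cnt").desc).show()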