Skip to content

Instantly share code, notes, and snippets.

@peteoleary
Created August 3, 2015 23:43
Show Gist options
  • Save peteoleary/10f9c10187bef151d82e to your computer and use it in GitHub Desktop.
Save peteoleary/10f9c10187bef151d82e to your computer and use it in GitHub Desktop.
package com.chefsfeed.spark
// Spark
import org.apache.spark.{
SparkContext,
SparkConf
}
import SparkContext._
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
/**
 * Spark job that parses timestamped request-log lines and writes selected fields as
 * tab-separated text.
 *
 * Despite the name, nothing is counted: every input line that matches [[line_regex]] and
 * carries a well-formed `key=value` message becomes one output row; every other line is
 * silently dropped.
 */
object WordCount {
  import scala.collection.mutable

  private val AppName = "WordCountJob"

  /**
   * Matches `YYYY-MM-DDThh:mm:ss.ffffff+hh:mm system[component]:message`.
   *
   * Only a `+` UTC offset is accepted, so lines stamped with a `-` offset or `Z` do not match.
   */
  val line_regex: Regex = """(\d{4})-(\d{2})-(\d{2})T(\d{2})\:(\d{2})\:(\d{2})\.(\d{6})\+(\d{2})\:(\d{2})\s(.+?)\[(.+?)\]\:(.+)""".r

  /** Matches one `key=value` token; the key ends at the first `=`. */
  val message_regex: Regex = """(.+?)\=(.+)""".r

  /** Matches a value that starts with at least one digit, capturing the digits and the rest. */
  val int_regex: Regex = """(\d+)(.*)""".r

  /**
   * Runs the job. Agnostic to Spark's current mode of operation: can be run from tests as well
   * as from main.
   *
   * @param master Spark master URL; when `None` the master is left to the Spark configuration
   * @param args   `args(0)` is the input path, `args(1)` the output path; extra entries are ignored
   * @param jars   jars to register on the Spark configuration
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   */
  def execute(master: Option[String], args: List[String], jars: Seq[String] = Nil): Unit = {
    // Validate before starting Spark so a bad invocation fails fast and leaks no context.
    val (inputPath, outputPath) = args match {
      case in :: out :: _ => (in, out)
      case _ =>
        throw new IllegalArgumentException(
          s"$AppName expects <input path> <output path> but received ${args.length} argument(s)")
    }

    val conf = new SparkConf().setAppName(AppName).setJars(jars)
    master.foreach(m => conf.setMaster(m))

    val sc = new SparkContext(conf)
    try {
      sc.textFile(inputPath)
        .flatMap(line => handleOneLine(line).toList) // unparseable lines contribute nothing
        .saveAsTextFile(outputPath)
    } finally {
      // The context is created here and never handed out, so it must be released here too.
      sc.stop()
    }
  }

  /**
   * Derives content fields from the segments of a request path.
   *
   * @param path_parts path segments, e.g. `Array("venues", "42")`
   * @return a map holding `content_type` (first segment) and, when present, `content_id`
   *         (second segment); empty when `path_parts` is empty
   */
  def contentInfo(path_parts: Array[String]): mutable.Map[String, String] = {
    // zip stops at the shorter side, so missing segments simply produce no entry.
    mutable.Map[String, String]() ++= Seq("content_type", "content_id").zip(path_parts)
  }

  /** Returns the value stored under `k`, or the empty string when `k` is absent. */
  def safeGet(m: mutable.Map[String, String], k: String): String = m.getOrElse(k, "")

  /**
   * Returns the leading run of digits of the value stored under `k` (e.g. `"12ms"` gives `"12"`).
   *
   * Yields the empty string when `k` is absent or when its value does not start with a digit.
   */
  def safeGetInt(m: mutable.Map[String, String], k: String): String =
    m.get(k) match {
      case Some(int_regex(digits, _)) => digits
      case _                          => ""
    }

  /** Parses every token as `key=value`; yields `None` as soon as a single token is malformed. */
  private def parsePairs(tokens: Array[String]): Option[Array[(String, String)]] = {
    val pairs = tokens.collect { case message_regex(key, value) => key -> value }
    if (pairs.length == tokens.length) Some(pairs) else None
  }

  /**
   * Converts one log line into a tab-separated row:
   * `timestamp, content_type, content_id, latitude, longitude, method, status, service`.
   *
   * The timestamp is rendered as `YYYY-MM-DD:hh:mm:ss`; the fractional seconds and UTC offset
   * are discarded. Fields that are absent are rendered as empty strings, as is a `service`
   * value that does not start with a digit.
   *
   * @param line raw log line
   * @return the row, or `None` when the line does not match [[line_regex]], when any
   *         space-separated message token or `&`-separated query parameter is not `key=value`,
   *         or when the message has no `path` field
   */
  def handleOneLine(line: String): Option[String] = Option(line).flatMap {
    case line_regex(year, month, day, hour, min, sec, _, _, _, _, _, message) =>
      for {
        messagePairs <- parsePairs(message.trim.split(" "))
        fields = mutable.Map[String, String]() ++= messagePairs
        rawPath <- fields.get("path")
        uriParts = rawPath.stripPrefix("\"").stripSuffix("\"").split("\\?")
        pathOnly <- uriParts.headOption
        queryPairs <-
          if (uriParts.length > 1) parsePairs(uriParts(1).split("&"))
          else Some(Array.empty[(String, String)])
        pathParts = pathOnly.stripPrefix("/").stripSuffix("/").split("/")
        contentType <- pathParts.headOption
      } yield {
        // NOTE(review): query parameters are merged after the message fields, so a request
        // such as `?status=1` overwrites the logged `status`. Kept as-is; confirm it is intended.
        fields ++= queryPairs

        contentType match {
          case "venues" | "dishes" | "chefs" =>
            fields ++= contentInfo(pathParts)
          case "scene" =>
            fields("content_type") = "scene"
            fields("content_id") = safeGet(fields, "page")
          case _ => // any other path contributes no content fields of its own
        }

        Seq(
          s"$year-$month-$day:$hour:$min:$sec",
          safeGet(fields, "content_type"),
          safeGet(fields, "content_id"),
          safeGet(fields, "latitude"),
          safeGet(fields, "longitude"),
          safeGet(fields, "method"),
          safeGet(fields, "status"),
          safeGetInt(fields, "service")
        ).mkString("\t")
      }
    case _ => None
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment