Created August 3, 2015 23:43
-
-
Save peteoleary/10f9c10187bef151d82e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.chefsfeed.spark | |
// Spark | |
import org.apache.spark.{ | |
SparkContext, | |
SparkConf | |
} | |
import SparkContext._ | |
import scala.collection.mutable.ArrayBuffer | |
import scala.util.matching.Regex | |
/**
 * Spark job that turns raw log lines into tab-separated request records.
 *
 * Despite the name (kept for existing callers), nothing is counted: every input
 * line is converted by [[handleOneLine]], and lines it cannot parse are dropped.
 */
object WordCount {
  import scala.util.control.NonFatal

  private val AppName = "WordCountJob"

  /**
   * Runs the job: reads text from `args(0)`, converts each line with
   * [[handleOneLine]] (dropping unparseable lines) and writes the records as text
   * to `args(1)`.
   *
   * Agnostic to Spark's current mode of operation, so it can be run from tests as
   * well as from main.
   *
   * @param master optional master URL passed to `SparkConf.setMaster`; when `None`
   *               no master is set here (presumably supplied by the launcher — confirm)
   * @param args   `args(0)` = input path, `args(1)` = output path; extra entries are ignored
   * @param jars   jars registered via `SparkConf.setJars`
   * @throws IllegalArgumentException if fewer than two arguments are given
   */
  def execute(master: Option[String], args: List[String], jars: Seq[String] = Nil): Unit = {
    // Validate before creating the context so a bad call does not leave one running.
    require(args.length >= 2, s"expected <input path> <output path>, got: $args")

    val conf = new SparkConf().setAppName(AppName).setJars(jars)
    master.foreach(m => conf.setMaster(m))

    val sc = new SparkContext(conf)
    try {
      sc.textFile(args(0))
        .flatMap(line => handleOneLine(line)) // None (unparseable line) contributes nothing
        .saveAsTextFile(args(1))
    } finally {
      // Only one SparkContext may be active per JVM: always release it so the job
      // can be executed again (e.g. by several tests in the same process).
      sc.stop()
    }
  }

  // BELOW HERE

  /**
   * One log line: `YYYY-MM-DDThh:mm:ss.ffffff+hh:mm <system>[<component>]:<message>`.
   * Groups: year, month, day, hour, minute, second, the six fractional-second
   * digits, offset hours, offset minutes, system, component, message.
   * Only `+` offsets match; any other line shape is rejected.
   */
  val line_regex = """(\d{4})-(\d{2})-(\d{2})T(\d{2})\:(\d{2})\:(\d{2})\.(\d{6})\+(\d{2})\:(\d{2})\s(.+?)\[(.+?)\]\:(.+)""".r

  /** A `key=value` token: the key is everything before the first `=`, the value is the (non-empty) rest. */
  val message_regex = """(.+?)\=(.+)""".r

  /**
   * Builds the content fields for a request path split into segments.
   *
   * @param path_parts path segments; must be non-empty
   * @return `content_type` = first segment, plus `content_id` = second segment when present
   */
  def contentInfo(path_parts: Array[String]): scala.collection.mutable.Map[String, String] = {
    val info = scala.collection.mutable.Map("content_type" -> path_parts(0))
    if (path_parts.length > 1) {
      info += ("content_id" -> path_parts(1))
    }
    info
  }

  /** Value stored under `k`, or the empty string when the key is absent. */
  def safeGet(m: scala.collection.mutable.Map[String, String], k: String): String = m.getOrElse(k, "")

  /** Leading run of digits followed by anything (e.g. `35ms` -> `35`). */
  val int_regex = """(\d+)(.*)""".r

  /**
   * Leading digits of the value under `k` (e.g. `"35ms"` -> `"35"`), or `""` when
   * the key is absent.
   *
   * @throws scala.MatchError if the value exists but does not start with a digit
   *         (in [[handleOneLine]] this makes the whole line unparseable)
   */
  def safeGetInt(m: scala.collection.mutable.Map[String, String], k: String): String = {
    if (m.contains(k)) {
      val int_regex(int_part, _) = m(k)
      int_part
    } else ""
  }

  /**
   * Converts one log line into a tab-separated record:
   * `timestamp, content_type, content_id, latitude, longitude, method, status, service`.
   *
   * The timestamp is rendered as `YYYY-MM-DD:hh:mm:ss` exactly as logged; the
   * fractional seconds and the UTC offset are dropped, not applied. Missing fields
   * become empty columns.
   *
   * @return `None` when the line does not match [[line_regex]], has no `path`
   *         field, or any other non-fatal parsing error occurs
   */
  def handleOneLine(line: String): Option[String] = {
    try {
      val line_regex(year, month, day, hour, min, sec, _, _, _, _, _, message) = line

      val logFields = keyValuePairs(message.trim.split(" "))
      val uri_parts = logFields("path").stripPrefix("\"").stripSuffix("\"").split("\\?")
      val path_parts = uri_parts(0).stripPrefix("/").stripSuffix("/").split("/")
      val queryFields =
        if (uri_parts.length > 1) keyValuePairs(uri_parts(1).split("&"))
        else Map.empty[String, String]

      // Query parameters first, then the logged fields, so a client-supplied
      // `?status=...`/`?method=...` cannot overwrite the values that were logged.
      val mp = scala.collection.mutable.Map.empty[String, String]
      mp ++= queryFields
      mp ++= logFields

      path_parts(0) match {
        case "venues" | "dishes" | "chefs" =>
          mp ++= contentInfo(path_parts)
        case "scene" =>
          mp += ("content_type" -> "scene")
          mp += ("content_id" -> safeGet(mp, "page"))
        case _ =>
      }

      val timestamp = s"$year-$month-$day:$hour:$min:$sec"
      val columns = Seq(
        timestamp,
        safeGet(mp, "content_type"),
        safeGet(mp, "content_id"),
        safeGet(mp, "latitude"),
        safeGet(mp, "longitude"),
        safeGet(mp, "method"),
        safeGet(mp, "status"),
        safeGetInt(mp, "service"))
      Some(columns.mkString("\t"))
    } catch {
      // Best-effort parsing: any non-fatal failure means "skip this line".
      case NonFatal(_) => None
    }
  }

  /**
   * Parses `key=value` tokens into a map (later duplicates win). Tokens that are
   * not of that form — empty tokens, fragments of quoted values containing spaces,
   * bare flags — are skipped instead of invalidating the whole line.
   */
  private def keyValuePairs(tokens: Array[String]): Map[String, String] =
    tokens.collect { case message_regex(k, v) => k -> v }.toMap
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.