Spark Streaming job for parsing logs
/*
This is a Spark Streaming job
that takes a raw stream of logs
from Flume, parses the log lines,
capturing them in an RDD, then
adds a schema and ultimately writes
to HDFS.
Written by Scott Hoover, 2016.
Send questions to [email protected]
or githoov on GitHub.
*/
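/*
A minimal launch sketch (an assumption, not part of the original gist):
a script like this can be run in a spark-shell started with the Flume
connector on the classpath, e.g. for Spark 1.6 on Scala 2.10:

  spark-shell --packages org.apache.spark:spark-streaming-flume_2.10:1.6.1

Note that spark-shell already provides `sc` and `sqlContext`, which can be
used in place of the ones constructed below. The Flume agent is assumed to
have an avro sink pushing events to localhost:9988.
*/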
// imports for spark core, streaming, and the flume connector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// fire up spark context
val conf = new SparkConf().setAppName("Log Data Webinar")
val sc = new SparkContext(conf)
// fire up sql context for dataframe operations
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// fire up streaming context with five-second batches
val ssc = new StreamingContext(sc, Seconds(5))
// fire up flume stream and decode each event body into a raw log line
val events = FlumeUtils.createStream(ssc, "localhost", 9988)
val lines = events.map(event => new String(event.event.getBody.array(), "UTF-8"))
// for batch testing
// val lines = sc.textFile("hdfs://ip-10-55-0-147.ec2.internal:8020/logdata/looker.log.20160420.gz")
// regular expression to match the timestamp, message information, and the message body
val lineMatch = "([0-9\\-]+\\s+[0-9\\:\\.]+\\s+[\\+0-9]+)\\s+(?:\\[)(\\S+)(?:\\])(.*)".r
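/*
An illustrative line this pattern is meant to match (the exact log format
is an assumption, not taken from the original gist):

  2016-04-20 12:34:56.789 +0000 [INFO|7f3a9c|:renderer] :: rendering complete

Group 1 captures the timestamp, group 2 the bracketed message information
("INFO|7f3a9c|:renderer"), and group 3 the rest of the message. Note that
the message information must contain no whitespace for (\S+) to match it.
*/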
// case class for parsed log-line structure
case class LogLine (
  created_at: String,
  message_type: Option[String],
  thread_id: Option[String],
  message_source: Option[String],
  message: String
)
// function to extract message type from message information
def messageType(message_data: String): Option[String] = {
  "[A-Z]+".r.findFirstIn(message_data)
}
// function to extract thread identifier from message information
def threadId(message_data: String): Option[String] = {
  "[a-z0-9]+".r.findFirstIn(message_data)
}
// function to extract message source from message information
def messageSource(message_data: String): Option[String] = {
  val parts = message_data.split("\\|")
  if (parts.length >= 3) Option(parts(2)) else None
}
// function to match lineMatch regular expression and output a LogLine
def extractValues(line: String): Option[LogLine] = {
  line match {
    case lineMatch(created_at, message_data, message) =>
      Option(LogLine(created_at, messageType(message_data), threadId(message_data), messageSource(message_data), message))
    case _ =>
      None
  }
}
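// illustrative result on the assumed sample format above:
//   extractValues("2016-04-20 12:34:56.789 +0000 [INFO|7f3a9c|:renderer] :: done")
// returns Some(LogLine("2016-04-20 12:34:56.789 +0000", Some("INFO"), Some("7f3a9c"), Some(":renderer"), " :: done"))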
// parse raw lines into LogLine records
val parsedLines = lines.flatMap(extractValues)
// for each micro-batch, convert the RDD to a dataframe (with schema) and append to parquet
parsedLines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val linesDF = sqlContext.createDataFrame(rdd)
    linesDF.write.mode("append").parquet("hdfs://ip-10-55-0-147.ec2.internal:8020/home/hadoop/logdata/parsed_lines.parquet")
  }
}
// start the streaming job and block until it terminates
ssc.start()
ssc.awaitTermination()