Skip to content

Instantly share code, notes, and snippets.

@2shou
Last active October 20, 2015 13:35
Show Gist options
  • Save 2shou/95025fd5ff50d94fd300 to your computer and use it in GitHub Desktop.
Save 2shou/95025fd5ff50d94fd300 to your computer and use it in GitHub Desktop.
log-extract program with Spark
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark job that extracts a fixed set of fields from backtick-delimited
 * `key=value` log lines and writes them out as tab-separated text.
 *
 * Usage: `LogExtract <input-path> <output-path>`
 */
object LogExtract {
  /** Fields copied to the output; their order here is the output column order. */
  val keys: Array[String] = Array("dn", "stm", "ev_ac", "v_title", "v_uri", "pg_url")

  /** Fields that must be present and non-empty, otherwise the whole line is dropped. */
  val notBlankKeys: Array[String] = Array("dn", "stm", "ev_ac", "pg_url")

  /** Whole-line regular expression a raw log line must match before it is parsed. */
  val filterRegex: String = ".*(ac_pl`|ac_dl`).*"

  /** When true, the output compression properties are set on the Spark configuration. */
  val useCompress: Boolean = false

  // Compiled once instead of on every record (String.matches recompiles the regex per call).
  private val filterPattern = java.util.regex.Pattern.compile(filterRegex)

  /**
   * Converts one raw log line into a tab-separated record.
   *
   * The line is split on backticks into fields, and each field is split on its
   * first `=` into a key and a value. Values whose key is listed in [[keys]] are
   * placed in the matching output column; when a key occurs more than once the
   * last occurrence wins. Columns for keys that never appear stay empty.
   *
   * @param line raw log line
   * @return the tab-separated record, or `null` when the line is `null`, contains
   *         a field without `=`, or lacks a non-empty value for any key in
   *         [[notBlankKeys]]
   */
  def process(line: String): String =
    extract(line).map(_.mkString("\t")).orNull

  /** Parses a line into its output columns, or `None` when the line must be dropped. */
  private def extract(line: String): Option[Array[String]] =
    if (line == null) {
      None
    } else {
      val pairs = line.split("`").map(_.split("=", 2))
      // A field without '=' has no value part; such a line is rejected as a whole.
      if (pairs.exists(_.length < 2)) {
        None
      } else {
        val columns = Array.fill(keys.length)("")
        pairs.foreach { pair =>
          val idx = keys.indexOf(pair(0))
          if (idx >= 0) columns(idx) = pair(1)
        }
        // If a required field is missing or blank, the whole log line is filtered out.
        val complete = notBlankKeys.forall { required =>
          val idx = keys.indexOf(required)
          idx >= 0 && columns(idx).nonEmpty
        }
        if (complete) Some(columns) else None
      }
    }

  /**
   * Job entry point.
   *
   * @param args `args(0)` is the input path passed to `textFile`, `args(1)` is the
   *             output path passed to `saveAsTextFile`
   */
  def main(args: Array[String]): Unit = {
    // Validate arguments before paying the cost of starting a SparkContext.
    if (args.length < 2) {
      System.err.println("Usage: LogExtract <input-path> <output-path>")
      sys.exit(1)
    }

    val conf = new SparkConf().setAppName("SparkLogProcess")
    if (useCompress) {
      // Every key needs the "spark.hadoop." prefix: Spark forwards only those
      // properties into the Hadoop configuration used by the output format.
      conf.set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
      conf.set("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec")
      conf.set("spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")
    }

    val sc = new SparkContext(conf)
    try {
      sc.textFile(args(0))
        .filter(line => filterPattern.matcher(line).matches())
        .map(line => process(line))
        .filter(record => record != null)
        .saveAsTextFile(args(1))
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment