@valtoni
Created June 9, 2020 11:34
Scala test
// val e = (1 to 10 toList).mkString(",")
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
// 2019-12-10 16:06:40,135 cask datab 0012 REST TRACE [com.kortlet.IEnferinend] (ajp-/192.168.0.1:8009-77) Invoke End
// REST lines like the sample above:
// date  user  database  countryCode  protocol  level  [class]  (thread)  message
val regexRestBase = """(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})[\s\t]+(\w+)[\s\t]+(\w+)[\s\t]+(\d+)[\s\t]+(\w+)[\s\t]+(\w+)[\s\t](\[[\w\.]+\])[\s\t]+(\(.*\))[\s\t]+(.*)""".r
// Same layout, but the user field carries a second value written as "user>detail" in the raw line
val regexWithDatabase = """(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})[\s\t]+(\w+)>(\w+)[\s\t]+(\w+)[\s\t]+(\d+)[\s\t]+(\w+)[\s\t]+(\w+)[\s\t]+(\[[\w\.]+\])[\s\t]+(\(.*\))[\s\t]+(.*)""".r
// Plain lines: date  level  [class]  (thread)  message
val regexNormal = """(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d{3})[\s\t]+(\w+)[\s\t]+(\[[\w\.]+\])[\s\t]+(\(.*\))[\s\t]+(.*)""".r
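// Quick sanity check (a sketch, not part of the original gist): the sample line from the
// comment above should be captured by regexRestBase with the field order assumed below.
val sampleLine = "2019-12-10 16:06:40,135 cask datab 0012 REST TRACE [com.kortlet.IEnferinend] (ajp-/192.168.0.1:8009-77) Invoke End"
sampleLine match {
  case regexRestBase(date, user, database, countryCode, protocol, level, clazz, thread, message) =>
    println(s"date=$date user=$user database=$database countryCode=$countryCode protocol=$protocol level=$level message=$message")
  case _ =>
    println("sample line did not match regexRestBase")
}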
// Note: the original schema declared "user" twice; the second occurrence is renamed to the
// placeholder "userDetail" (assumed name) so that `select user` in the query below is not ambiguous.
val logschema = new StructType()
  .add(StructField("date", StringType, true))
  .add(StructField("user", StringType, true))
  .add(StructField("userDetail", StringType, true))
  .add(StructField("database", StringType, true))
  .add(StructField("countryCode", StringType, true))
  .add(StructField("protocol", StringType, true))
  .add(StructField("level", StringType, true))
  .add(StructField("clazz", StringType, true))
  .add(StructField("thread", StringType, true))
  .add(StructField("message", StringType, true))
// val logsRDD = spark.sparkContext.textFile("/logs/mast.log.30")
// for (i <- 1 to 30) logsRDD = logsRDD.union(spark.sparkContext.textFile(f"/logs/mast.log.${i}"))
val logsRDD = sc.union(for (i <- 1 to 10) yield spark.sparkContext.textFile(f"/logs/mast?.log.${i}"))
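// Alternative (a sketch; assumes every rotated file should be read and matches one glob):
// textFile accepts glob patterns, so a single path could replace the explicit union.
// val logsRDD = spark.sparkContext.textFile("/logs/mast?.log.*")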
// Map each raw line to a Row matching logschema; unmatched lines become empty Rows.
// Note: the original pattern bound `user` twice in the regexWithDatabase case, which does not
// compile; the second binding is renamed `userDetail` to line up with the schema above.
val logRowRDD = logsRDD.map {
  case regexNormal(date, level, clazz, thread, message) =>
    Row(date, "", "", "", "", "", level, clazz, thread, message)
  case regexWithDatabase(date, user, userDetail, database, countryCode, protocol, level, clazz, thread, message) =>
    Row(date, user, userDetail, database, countryCode, protocol, level, clazz, thread, message)
  case regexRestBase(date, user, database, countryCode, protocol, level, clazz, thread, message) =>
    Row(date, user, "", database, countryCode, protocol, level, clazz, thread, message)
  case _ =>
    Row("", "", "", "", "", "", "", "", "", "")
}
val logDF = spark.createDataFrame(logRowRDD, logschema)
logDF.createOrReplaceTempView("logs")
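// Optional (a sketch): inspect the resulting structure and cache the DataFrame, since the
// temp view is likely to back repeated queries.
logDF.printSchema()
logDF.cache()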
spark.sql("select substring(date,1,10) as date, user, count(*) as occurrences from logs where date != '' group by substring(date,1,10), user order by count(*) desc").show()
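// The same aggregation via the DataFrame API (a sketch; should give the same result as the
// SQL above). `$` comes from spark.implicits, which the spark-shell already imports.
import org.apache.spark.sql.functions.{substring, count, desc}
import spark.implicits._
logDF.filter($"date" =!= "")
  .groupBy(substring($"date", 1, 10).as("date"), $"user")
  .agg(count("*").as("occurrences"))
  .orderBy(desc("occurrences"))
  .show()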