@joyoyoyoyoyo
Forked from petrovg/SparkStatsLogger.scala
Created September 17, 2019 11:30
Logging listener for Spark: appends one comma-separated line per scheduler event (jobs, stages, optionally tasks), each prefixed with timestamps, a build SHA-1, and static cluster stats.
import java.io.PrintWriter
import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.scheduler._
import org.apache.spark.storage.RDDInfo

/**
 * Created by petrovg on 22/04/2017.
 *
 * Appends one comma-separated line per scheduler event, each prefixed with a
 * timestamp, the supplied build SHA-1, and static cluster stats.
 */
class StatsLogger(sc: SparkContext, out: PrintWriter, sha1: String, logTasks: Boolean = false) extends SparkListener {

  // Total max storage memory reported by the block managers, in GiB (integer division).
  val cacheMemory = sc.getExecutorMemoryStatus.values.map(_._1).sum / (1024 * 1024 * 1024)
  // Executor count, excluding the driver. getExecutorStorageStatus is a Spark 1.x/2.x API (removed in later releases).
  val numWorkers = sc.getExecutorStorageStatus.map(_.blockManagerId.executorId).count(_ != "driver")
  val executorMemory = sc.getConf.get("spark.executor.memory")
  val executorCores = sc.getConf.get("spark.executor.cores")

  // Writes one CSV line: fixed prefix (times, SHA-1, cluster stats) followed by the event details.
  def printEvent(details: String*): Unit = {
    val time = System.currentTimeMillis()
    val timeD = java.time.LocalDateTime.now
    out.println(s"$time,$timeD,$sha1,$cacheMemory,$numWorkers,$executorMemory,$executorCores,${details.mkString(",")}")
  }

  def rddDescription(r: RDDInfo): String =
    s"RDDId=${r.id},RDDCallSite=${r.callSite},Parts=${r.numPartitions},Cached=${r.numCachedPartitions}," +
      s"Disk=${r.diskSize},Mem=${r.memSize},Storage=${r.storageLevel},Name=${r.name.replace("\n", "//").take(50)}"

  def stageDescription(s: StageInfo): String = {
    // Application-specific filter: keep only the call-site lines mentioning "dash".
    val details = s.details.split("\n").filter(_.contains("dash")).mkString("<-")
    // Note: the original logged s.name twice; StageId now uses s.stageId.
    s"StageName=${s.name},StageId=${s.stageId},Details=$details," +
      s"RDDs=${s.rddInfos.map(rddDescription).mkString(",")},TaskMetrics=${s.taskMetrics}"
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    printEvent("STAGE_COMPLETED", stageDescription(stageCompleted.stageInfo),
      // Approximate wall-clock stage time; -1 if the submission time is unknown.
      "StageTime=" + stageCompleted.stageInfo.submissionTime.map(System.currentTimeMillis() - _).getOrElse(-1L))

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    printEvent("STAGE_SUBMITTED", stageDescription(stageSubmitted.stageInfo))

  def taskInfoDetails(eventName: String, t: TaskInfo, stageId: Int): Seq[String] =
    Seq(eventName, t.host, t.executorId, "StageId=" + stageId, "TaskId=" + t.taskId)

  // Task events are high-volume, so they are only logged when logTasks is set.
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
    if (logTasks) printEvent(taskInfoDetails("TASK_START", taskStart.taskInfo, taskStart.stageId): _*)

  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit =
    if (logTasks) printEvent(taskInfoDetails("TASK_GETTING_RESULT", taskGettingResult.taskInfo, 0): _*)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    if (logTasks) printEvent(taskInfoDetails("TASK_END", taskEnd.taskInfo, taskEnd.stageId): _*)

  type JobId = Int
  case class JobStartEvent(jobProps: Properties, time: Long)

  // Start events are remembered so job duration can be computed at job end.
  private var jobStartTimes = Map.empty[JobId, JobStartEvent]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStartTimes += (jobStart.jobId -> JobStartEvent(jobStart.properties, jobStart.time))
    printEvent("JOB_START", "JobProps=" + jobStart.properties, "JobID=" + jobStart.jobId,
      "Stages=" + jobStart.stageIds.mkString(","))
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    val jobStart = jobStartTimes.get(jobEnd.jobId)
    // Negative JobTime flags a job whose start event was never observed.
    val jobTime = jobStart.map(jobEnd.time - _.time).getOrElse(-jobEnd.time)
    printEvent("JOB_END", "Time=" + jobEnd.time, "JobId=" + jobEnd.jobId, "Result=" + jobEnd.jobResult,
      "JobProps=" + jobStart.map(_.jobProps), "JobTime=" + jobTime)
  }

  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit =
    printEvent("UNPERSIST_RDD", "RDDID=" + unpersistRDD.rddId)
}
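
For reference, a minimal registration sketch. Assumptions not in the original gist: a local master, a hypothetical stats.csv output path, and a placeholder SHA-1; addSparkListener is the standard SparkContext hook for attaching a SparkListener.

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.{SparkConf, SparkContext}

object StatsLoggerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stats-demo").setMaster("local[2]"))
    // "stats.csv" and the SHA-1 are placeholders; append mode plus autoFlush so
    // lines survive an abrupt shutdown. Assumes StatsLogger (above) is on the classpath.
    val out = new PrintWriter(new FileWriter("stats.csv", true), true)
    sc.addSparkListener(new StatsLogger(sc, out, sha1 = "deadbeef", logTasks = true))
    sc.parallelize(1 to 1000).map(_ * 2).count() // emits JOB_START, STAGE_*, TASK_*, JOB_END lines
    sc.stop()
    out.close()
  }
}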