-
-
Save joyoyoyoyoyo/234b06a8cdbfc31a36dbfe544525ef96 to your computer and use it in GitHub Desktop.
Logging listener for Spark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.PrintWriter | |
| import java.util.Properties | |
| import org.apache.spark.SparkContext | |
| import org.apache.spark.scheduler._ | |
| import org.apache.spark.storage.RDDInfo | |
/**
 * Lightweight Spark scheduler-event logger that appends one CSV-style
 * line per event (stage/job/task lifecycle, RDD unpersist) to a writer.
 *
 * Created by petrovg on 22/04/2017.
 */
/**
 * SparkListener that writes one comma-separated line per scheduler event
 * to `out`. Every line carries a common prefix (wall-clock millis, local
 * datetime, build sha1, and static cluster facts) so runs can be compared
 * across code versions and cluster configurations.
 *
 * @param sc       context used once, at construction, to snapshot cluster facts
 * @param out      destination for event lines; the caller owns flushing/closing it
 * @param sha1     identifier of the code version being measured
 * @param logTasks when true, also emit verbose per-task START/END/GETTING_RESULT lines
 */
class StatsLogger(sc: SparkContext, out: PrintWriter, sha1: String, logTasks: Boolean = false) extends SparkListener {

  // Static cluster facts, captured once when the listener is created.
  // Total max storage memory across all executors, in GiB (integer division).
  val cacheMemory = sc.getExecutorMemoryStatus.values.map(_._1).sum / (1024 * 1024 * 1024)
  // Executor count, excluding the driver's own block manager.
  // NOTE(review): getExecutorStorageStatus is deprecated in later Spark versions — confirm target version.
  val numWorkers = sc.getExecutorStorageStatus.map(_.blockManagerId.executorId).filter(_ != "driver").length
  val executorMemory = sc.getConf.get("spark.executor.memory")
  val executorCores = sc.getConf.get("spark.executor.cores")

  /** Emit one event line: common prefix columns followed by the given detail columns. */
  def printEvent(details: String*) = {
    val time = System.currentTimeMillis()
    val timeD = java.time.LocalDateTime.now
    val eventText = s"$time,$timeD,$sha1,$cacheMemory,$numWorkers,$executorMemory,$executorCores,${details.mkString(",")}"
    out.println(eventText)
  }

  /** One-line summary of an RDD; embedded newlines in the name are escaped so the record stays on one line. */
  def rddDescription(r: RDDInfo): String = {
    s"RDDId=${r.id},RDDCallSite=${r.callSite},Parts=${r.numPartitions},Cached=${r.numCachedPartitions},Disk=${r.diskSize},Mem=${r.memSize},Storage=${r.storageLevel},Name=${r.name.replace("\n", "//").take(50)}"
  }

  /** One-line summary of a stage, including all of its RDDs. */
  def stageDescription(s: StageInfo): String = {
    // Keep only the call-site lines containing "dash" to trim the (large) stack-trace details.
    val details = s.details.split("\n").filter(_.contains("dash")).mkString("<-")
    // FIX: StageId previously printed s.name (duplicating StageName); the actual numeric id was never logged.
    s"StageName=${s.name},StageId=${s.stageId},Details=$details,RDDs=${s.rddInfos.map(rddDescription).mkString(",")},TaskMetrics=${s.taskMetrics}"
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    // StageTime = now - submissionTime, when a submission time was recorded.
    printEvent("STAGE_COMPLETED", stageDescription(stageCompleted.stageInfo), "StageTime=" + stageCompleted.stageInfo.submissionTime.map(System.currentTimeMillis() - _))

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    printEvent("STAGE_SUBMITTED", stageDescription(stageSubmitted.stageInfo))

  /** Common detail columns for per-task events. */
  def taskInfoDetails(eventName: String, t: TaskInfo, stageId: Int): Seq[String] =
    Seq(eventName, t.host, t.executorId, "StageId=" + stageId, "TaskId=" + t.taskId)

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
    if (logTasks) printEvent(taskInfoDetails("TASK_START", taskStart.taskInfo, taskStart.stageId): _*)

  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit =
    // The event carries no stage id, so 0 is logged as a placeholder.
    if (logTasks) printEvent(taskInfoDetails("TASK_GETTING_RESULT", taskGettingResult.taskInfo, 0): _*)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    if (logTasks) printEvent(taskInfoDetails("TASK_END", taskEnd.taskInfo, taskEnd.stageId): _*)

  type JobId = Int
  case class JobStartEvent(jobProps: Properties, time: Long)
  // Start events remembered per job so JOB_END can report elapsed time and the job's properties.
  private var jobStartTimes = Map.empty[JobId, JobStartEvent]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStartTimes = jobStartTimes + (jobStart.jobId -> JobStartEvent(jobStart.properties, jobStart.time))
    printEvent("JOB_START", "JobProps=" + jobStart.properties, "JobID=" + jobStart.jobId, "Stages=" + jobStart.stageIds.mkString(","))
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    val jobStart = jobStartTimes.get(jobEnd.jobId)
    // Negative JobTime is a deliberate sentinel meaning "start event was never seen".
    val jobTime = jobStart.map(jobEnd.time - _.time).getOrElse(-jobEnd.time)
    printEvent("JOB_END", "Time=" + jobEnd.time, "JobId=" + jobEnd.jobId, "Result=" + jobEnd.jobResult, "JobProps=" + jobStart.map(_.jobProps), "JobTime=" + jobTime)
  }

  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit =
    printEvent("UNPERSIST_RDD", "RDDID=" + unpersistRDD.rddId)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment