Skip to content

Instantly share code, notes, and snippets.

@peteroid
Created August 3, 2016 23:42
Show Gist options
  • Save peteroid/09a68972d834611cd8d919328b0c3fc4 to your computer and use it in GitHub Desktop.
Save peteroid/09a68972d834611cd8d919328b0c3fc4 to your computer and use it in GitHub Desktop.
A simple Scala object for benchmarking a Spark app
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection._
import sys.process._
import java.io._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.scheduler._
import org.apache.spark.ui.jobs._
import org.apache.spark.ui._
object SparkBench {
  /**
    * One row per completed stage, in completion order:
    * id, description, submit time, finish time, duration (ms, "0" if unknown),
    * input bytes, output bytes, shuffle-read bytes, shuffle-write bytes.
    *
    * NOTE(review): rows are appended from a Spark listener callback; Spark delivers listener
    * events asynchronously, so stages finishing right before `run()` returns may not be recorded
    * yet when the report is written — confirm whether a wait is needed.
    */
  var stagesOutput: List[List[String]] = List()

  /** Whether the stage report is printed/written; cleared when `onRun()` yields no SparkContext. */
  var isCollectingStages = true

  // NOTE(review): not read anywhere in this file; kept for external callers that may set it.
  var recordingScripts = Array("")

  /** Entry point: benchmarks `SimpleTest` (defined elsewhere) with the command-line arguments. */
  def main(args: Array[String]): Unit = {
    val test: Benchmarkable = SimpleTest
    benchmark(test, args)
  }

  /**
    * Drives `b` through `onRun()`, `run(args)` and `postRun()`, logging a timestamp before each
    * phase. If `onRun()` supplies a SparkContext, a listener is attached that records one row per
    * completed stage into [[stagesOutput]]; after `run` those rows are printed and written to
    * `outputPath`.
    *
    * @param b          the workload to benchmark
    * @param args       arguments forwarded to `b.run`
    * @param outputPath file the stage rows are written to (overwritten); only created when
    *                   stages are being collected
    */
  def benchmark(b: Benchmarkable, args: Array[String], outputPath: String = "/your/output/file"): Unit = {
    logPhase("onRun()")
    // onRun returns (optional SparkContext, optional processes); only the context is used here.
    val (scOption, _) = b.onRun()
    scOption match {
      case Some(sc) => sc.addSparkListener(newStageListener(sc))
      case None =>
        println("No SparkContext returned. No stages data could be collected")
        isCollectingStages = false
    }

    logPhase("run()")
    b.run(args)

    logPhase("processing data")
    if (isCollectingStages) writeStages(outputPath)

    logPhase("postRun()")
    b.postRun()
  }

  /** Formats an epoch-millisecond timestamp as a 12-hour "hh:mm:ss a" time in the default locale. */
  def formatTime(timestamp: Long): String =
    // A fresh formatter per call: SimpleDateFormat is not thread-safe, and this is also
    // called from the stage listener callback.
    new SimpleDateFormat("hh:mm:ss a").format(new Date(timestamp))

  /** The current time, formatted like [[formatTime]]. */
  def getCurrentFormattedTime(): String = formatTime(System.currentTimeMillis)

  /** Prints `[time] message` to stdout. */
  private def logPhase(message: String): Unit =
    println(s"[${getCurrentFormattedTime()}] $message")

  /** Builds a listener that appends one row to [[stagesOutput]] per completed stage it knows about. */
  private def newStageListener(sc: SparkContext): JobProgressListener =
    new JobProgressListener(sc.getConf) {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val info = stageCompleted.stageInfo
        // Look the stage up before delegating, so the parent handler cannot influence the lookup.
        val stageDataOption = stageIdToData.get((info.stageId, info.attemptId))
        // Keep the inherited listener state consistent (the previous override skipped this).
        super.onStageCompleted(stageCompleted)

        // Stages without recorded UI data are skipped.
        // Metric extraction mirrors spark/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
        stageDataOption.foreach { stageData =>
          val currentTime = System.currentTimeMillis()
          val finishTime = info.completionTime.getOrElse(currentTime)
          // The submission time for a stage is misleading because it counts the time
          // the stage waits to be launched (SPARK-10930); measure from the earliest task launch.
          val taskLaunchTimes = stageData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)
          val duration: Option[Long] =
            if (taskLaunchTimes.isEmpty) None
            else {
              val startTime = taskLaunchTimes.min
              Some(if (finishTime > startTime) finishTime - startTime else currentTime - startTime)
            }

          val row = List(
            info.stageId.toString,
            stageData.description.toString,
            // A missing submission time no longer throws and drops the row.
            info.submissionTime.map(formatTime).getOrElse("Unknown"),
            formatTime(finishTime),
            duration.getOrElse(0L).toString,
            stageData.inputBytes.toString,
            stageData.outputBytes.toString,
            stageData.shuffleReadTotalBytes.toString,
            stageData.shuffleWriteBytes.toString)
          stagesOutput = stagesOutput :+ row
        }
      }
    }

  /**
    * Prints the collected stage rows to stdout (after a header) and writes them to `outputPath`.
    * Each item is followed by a single space, and each row ends with a newline.
    * The writer is always closed, even if writing fails.
    */
  private def writeStages(outputPath: String): Unit = {
    val writer = new BufferedWriter(new FileWriter(new File(outputPath)))
    try {
      println("stages output")
      println("ID Desc submitTime finishTime duration input output shuffleRead shuffleWrite")
      stagesOutput.foreach { row =>
        val line = row.map(_ + " ").mkString
        println(line)
        writer.write(line)
        writer.write('\n')
      }
    } finally {
      writer.close()
    }
  }
}
/**
  * A workload driven by `SparkBench.benchmark`, which calls its three phases in order:
  * `onRun()`, then `run(args)`, then `postRun()`.
  */
trait Benchmarkable {

  /**
    * Set-up phase.
    *
    * @return an optional SparkContext whose completed stages should be recorded, paired with an
    *         optional array of processes (the second element is not used elsewhere in this file)
    */
  def onRun(): Tuple2[Option[SparkContext], Option[Array[Process]]]

  /**
    * The measured workload.
    *
    * @param args the arguments handed to `SparkBench.benchmark`
    */
  def run(args: Array[String]): Unit

  /** Tear-down phase, called last. */
  def postRun(): Unit
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment