Created August 3, 2016 23:42
-
-
Save peteroid/09a68972d834611cd8d919328b0c3fc4 to your computer and use it in GitHub Desktop.
A simple Scala object for benchmarking a Spark app.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.text.SimpleDateFormat | |
import java.util.Date | |
import scala.collection._ | |
import sys.process._ | |
import java.io._ | |
import org.apache.spark.SparkContext | |
import org.apache.spark.SparkContext._ | |
import org.apache.spark.SparkConf | |
import org.apache.spark.scheduler._ | |
import org.apache.spark.ui.jobs._ | |
import org.apache.spark.ui._ | |
/**
 * Minimal harness for benchmarking a Spark application.
 *
 * [[benchmark]] drives a [[Benchmarkable]] through its `onRun` / `run` / `postRun`
 * lifecycle and prints a timestamp before each phase. When `onRun` hands back a
 * SparkContext, it also records per-stage metrics through a listener: duration,
 * input/output bytes and shuffle read/write bytes. These are printed and written
 * to a tab-separated report.
 */
object SparkBench {
  /** Column names of each row in [[stagesOutput]], in order. */
  private val StageColumns = List("ID", "Desc", "submitTime", "finishTime", "duration",
    "input", "output", "shuffleRead", "shuffleWrite")

  /** Default location of the stage report; a placeholder to be replaced per deployment. */
  private val DefaultOutputPath = "/your/output/file"

  /**
   * One row per completed stage, with columns as in [[StageColumns]].
   *
   * Rows are appended from the Spark listener callback and read by the thread that calls
   * [[benchmark]]. The field is volatile so those appends are visible to the reader.
   */
  @volatile var stagesOutput: List[List[String]] = List()

  /** Set to false by [[benchmark]] when `onRun` returns no SparkContext to listen on. */
  var isCollectingStages = true

  // NOTE(review): not read anywhere in this object; kept because it is public state.
  var recordingScripts = Array("")

  def main(args: Array[String]): Unit = {
    val test: Benchmarkable = SimpleTest
    benchmark(test, args)
  }

  /**
   * Runs `b` through its lifecycle and reports per-stage metrics.
   *
   * @param b          the workload under test
   * @param args       forwarded unchanged to `b.run`
   * @param outputPath file the stage report is written to. It is overwritten, and only
   *                   created when stage data is being collected.
   */
  def benchmark(b: Benchmarkable, args: Array[String],
                outputPath: String = DefaultOutputPath): Unit = {
    println(s"[${getCurrentFormattedTime()}] onRun()")
    // onRun also returns optional processes, which this harness does not use.
    // NOTE(review): presumably the Benchmarkable stops them in postRun() — confirm.
    val (scOption, _) = b.onRun()
    scOption match {
      case Some(sc) =>
        sc.addSparkListener(stageListener(sc.getConf))
      case None =>
        println("No SparkContext returned. No stages data could be collected")
        isCollectingStages = false
    }

    println(s"[${getCurrentFormattedTime()}] run()")
    b.run(args)

    println(s"[${getCurrentFormattedTime()}] processing data")
    // NOTE(review): Spark delivers listener events asynchronously. Stages that finish
    // right before run() returns may not be recorded yet, unless run() stopped the
    // SparkContext (which drains the event queue).
    if (isCollectingStages) writeStagesReport(outputPath)

    println(s"[${getCurrentFormattedTime()}] postRun()")
    b.postRun()
  }

  /**
   * Builds a listener that appends one row to [[stagesOutput]] per completed stage.
   *
   * The duration and byte-count derivation follows Spark's ui/jobs/StageTable.scala.
   */
  private def stageListener(conf: SparkConf): JobProgressListener =
    new JobProgressListener(conf) {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val s = stageCompleted.stageInfo
        // Skip stages the listener has no data for, as the original early return did.
        stageIdToData.get((s.stageId, s.attemptId)).foreach { stageData =>
          val currentTime = System.currentTimeMillis()
          val finishTime = s.completionTime.getOrElse(currentTime)
          // The submission time for a stage is misleading because it counts the time
          // the stage waits to be launched (SPARK-10930). So the duration is measured
          // from the earliest task launch instead.
          val taskLaunchTimes =
            stageData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)
          val duration: Option[Long] =
            if (taskLaunchTimes.isEmpty) None
            else {
              val startTime = taskLaunchTimes.min
              Some(if (finishTime > startTime) finishTime - startTime
                   else currentTime - startTime)
            }
          // description is an Option; strip tabs/newlines so it stays within one TSV cell.
          val description =
            stageData.description.map(_.replaceAll("[\\t\\r\\n]+", " ")).getOrElse("")
          val row = List(
            s.stageId.toString,
            description,
            s.submissionTime.map(t => formatTime(t)).getOrElse("Unknown"),
            formatTime(finishTime),
            duration.getOrElse(0L).toString,
            stageData.inputBytes.toString,
            stageData.outputBytes.toString,
            stageData.shuffleReadTotalBytes.toString,
            stageData.shuffleWriteBytes.toString)
          stagesOutput = stagesOutput :+ row
        }
        // This override otherwise replaces JobProgressListener's own handling of the event,
        // so delegate afterwards to keep the parent's stage bookkeeping intact.
        super.onStageCompleted(stageCompleted)
      }
    }

  /**
   * Prints [[stagesOutput]], preceded by a header row, and writes the same lines
   * tab-separated to `outputPath`.
   *
   * Tabs are used because formatted times ("hh:mm:ss a") and descriptions contain
   * spaces, which made the earlier space-separated columns ambiguous.
   */
  private def writeStagesReport(outputPath: String): Unit = {
    val lines = (StageColumns :: stagesOutput).map(_.mkString("\t"))
    println("stages output")
    lines.foreach(println)
    val writer = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(outputPath), "UTF-8"))
    try lines.foreach { line =>
      writer.write(line)
      writer.write('\n')
    } finally writer.close()
  }

  /**
   * Formats an epoch-millisecond timestamp as a 12-hour clock time ("hh:mm:ss a").
   *
   * A fresh SimpleDateFormat is created per call because the class is not thread-safe.
   * This method may run both on the caller's thread and inside the listener callback.
   */
  def formatTime(timestamp: Long): String =
    new SimpleDateFormat("hh:mm:ss a").format(new Date(timestamp))

  /** The current wall-clock time, formatted like [[formatTime]]. */
  def getCurrentFormattedTime(): String = formatTime(System.currentTimeMillis())
}
/**
 * A workload that the SparkBench harness can drive.
 *
 * The harness calls the hooks in this order: `onRun()`, then `run(args)`, then `postRun()`.
 */
trait Benchmarkable {
  /**
   * Setup hook, invoked before `run`.
   *
   * @return a pair of an optional SparkContext (used by the harness to collect per-stage
   *         metrics) and an optional array of processes. The processes are presumably
   *         helper/recording processes started here — TODO confirm with implementations.
   */
  def onRun(): Tuple2[Option[SparkContext], Option[Array[Process]]]

  /** The workload itself; receives the program arguments unchanged. */
  def run(args: Array[String]): Unit

  /** Teardown hook, invoked last. */
  def postRun(): Unit
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment