@JoshRosen (gist JoshRosen/15ab99014fb21cbd1e65)
Last active August 29, 2015
build.sbt

scalaVersion := "2.10.4"

// Path to a local Spark source checkout whose sbt subprojects are referenced below.
val SPARK_HOME = sys.env.getOrElse("SPARK_HOME", "/Users/joshrosen/Documents/spark")

lazy val sparkSql = ProjectRef(file(SPARK_HOME), "sql")
lazy val sparkCore = ProjectRef(file(SPARK_HOME), "core")

lazy val root = (project in file(".")).
  settings(
    aggregate in update := false,
    // Fork the benchmark JVM so the javaOptions below actually take effect.
    fork in run := true,
    // YourKit controller API, loaded directly from the local YourKit install.
    libraryDependencies += "yourkit" % "yourkit-api" % "version" from "file:///Applications/YourKit_Java_Profiler_2013_build_13088.app/lib/yjp-controller-api-redist.jar",
    // Attach the YourKit agent and capture a snapshot when the JVM exits.
    javaOptions in run += "-agentpath:/Applications/YourKit_Java_Profiler_2013_build_13088.app/bin/mac/libyjpagent.jnilib=onexit=snapshot",
    javaOptions in run ++= Seq(
      "-Xmx2000M",
      "-Xms2000M"
      // "-XX:+UnlockDiagnosticVMOptions",
      // "-XX:+TraceClassLoading",
      // "-XX:+LogCompilation",
      // "-XX:+PrintAssembly",
      // "-XX:+PrintInlining"
    )
  ).dependsOn(sparkSql, sparkCore)
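The YourKit paths above are tied to one machine's install. As a hedged sketch (the ENABLE_YOURKIT environment variable is an assumption, not part of this build), the agent flag could be made opt-in so the benchmark still runs where YourKit is absent:

// Hypothetical tweak, not in the original build: attach the YourKit agent only
// when ENABLE_YOURKIT is set, so `sbt run` also works without YourKit installed.
javaOptions in run ++= {
  if (sys.env.contains("ENABLE_YOURKIT"))
    Seq("-agentpath:/Applications/YourKit_Java_Profiler_2013_build_13088.app/bin/mac/libyjpagent.jnilib=onexit=snapshot")
  else
    Seq.empty
}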
ShuffleWriteBenchmark.scala

import java.io.{PrintWriter, FileOutputStream, File}

import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListener}

/**
 * Extends [[ShuffledRDD]] to skip the shuffle fetching, so that only the
 * map-side shuffle write is exercised.
 */
class DummyShuffledRDD[K, V, C](
    prev: RDD[(K, V)],
    part: Partitioner)
  extends ShuffledRDD[K, V, C](prev, part) {

  // Reduce tasks read nothing: the shuffle files are written but never fetched.
  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    Iterator.empty
  }

  override def getPartitions: Array[Partition] = {
    // This is sufficient to trigger the parent shuffle stage to be computed without having the
    // benchmarking harness waste time running many no-op reduce tasks.
    super.getPartitions.take(1)
  }
}
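// Usage sketch (hypothetical; assumes a live SparkContext named `sc`): count()
// on the dummy RDD forces the parent stage's shuffle write but fetches nothing,
// so the single reduce partition yields zero records:
//
//   val pairs = sc.parallelize(1 to 1000, 1).map(x => (x, x))
//   val writeOnly = new DummyShuffledRDD[Int, Int, Int](pairs, new HashPartitioner(10))
//   assert(writeOnly.count() == 0L)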
object ShuffleWriteBenchmark {

  /** Runs a single shuffle write and returns its wall-clock time in milliseconds. */
  def runBenchmark(numKeys: Int, numPartitions: Int, sc: SparkContext): Long = {
    val data = sc.parallelize(1 to numKeys, 1).map(x => (x, x))
    val shuffled: RDD[(Int, Int)] = new DummyShuffledRDD(data, new HashPartitioner(numPartitions))
    val startTime = System.currentTimeMillis()
    shuffled.count()
    val endTime = System.currentTimeMillis()
    System.gc()
    endTime - startTime
  }

  def main(args: Array[String]): Unit = {
    val commonSettings = new SparkConf()
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.io.compression.codec", "lzf")
      .set("spark.shuffle.memoryFraction", "0.25")
      .set("spark.shuffle.safetyFraction", "1")
      //.set("spark.shuffle.compress", "false")
      //.set("spark.shuffle.spill.batchSize", "1000")
    val configurations = Map(
      "tungsten-sort" -> commonSettings.clone
        .set("spark.shuffle.manager", "tungsten-sort")
        .set("spark.unsafe.offHeap", "false"),
      "sort_serialize" -> commonSettings.clone
        .set("spark.shuffle.manager", "sort")
        .set("spark.shuffle.sort.serializeMapOutputs", "true")
      // The 1.2 default sort shuffle just collapses under GC pressure, so it's disabled here:
      // "sort_no_serialize" -> commonSettings.clone
      //   .set("spark.shuffle.manager", "sort")
      //   .set("spark.shuffle.sort.serializeMapOutputs", "false")
    )
    val NUM_WARMUPS = 3
    val NUM_ITERATIONS = 5
    // Append results so output from multiple runs accumulates in one file.
    val outputFileWriter =
      new PrintWriter(new FileOutputStream(new File("ShuffleWriteBenchmark.txt"), true))
    outputFileWriter.println("mode,numKeys,numPartitions,timeMs,shuffleWriteTimeMs,gcTimeMs,diskBytesSpilled,shuffleBytesWritten,recordsWritten")
    for (
      numKeys <- Seq(10, 20, 30, 40).map(_ * 1000000);
      numPartitions <- Seq(1000);
      (confName, conf) <- configurations
    ) {
      val sc = new SparkContext(conf.setMaster("local").setAppName("test"))
      // Capture the metrics of the single shuffle map task from each run.
      @volatile var metrics: TaskMetrics = null
      @volatile var taskTime = 0L
      val metricsListener = new SparkListener {
        override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
          if (taskEnd.taskType == "ShuffleMapTask") {
            metrics = taskEnd.taskMetrics
            taskTime = taskEnd.taskInfo.duration
          }
        }
        override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
          assert(stageSubmitted.stageInfo.numTasks == 1)
        }
      }
      sc.addSparkListener(metricsListener)
      try {
        for (_ <- 1 to NUM_WARMUPS) {
          runBenchmark(numKeys, numPartitions, sc)
        }
        // val controller = new com.yourkit.api.Controller
        // controller.startCPUSampling(null)
        for (_ <- 1 to NUM_ITERATIONS) {
          val rawTime = runBenchmark(numKeys, numPartitions, sc)
          val gcTime = metrics.jvmGCTime
          val shuffleMetrics = metrics.shuffleWriteMetrics.get
          outputFileWriter.println(
            Seq(
              confName,
              numKeys,
              numPartitions,
              metrics.executorRunTime,  // reported as the timeMs column
              shuffleMetrics.shuffleWriteTime / 1000000.0,  // nanoseconds -> milliseconds
              gcTime,
              metrics.diskBytesSpilled,
              shuffleMetrics.shuffleBytesWritten,
              shuffleMetrics.shuffleRecordsWritten
            ).map(_.toString).mkString(",")
          )
          outputFileWriter.flush()
        }
        // controller.stopCPUProfiling()
      } finally {
        sc.stop()
      }
    }
    outputFileWriter.close()
  }
}
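For eyeballing the raw CSV below, a short post-processing sketch can average each configuration's runs. This helper is hypothetical (not part of the gist) and assumes only the column layout written above, including the header line repeated by appended runs:

import scala.io.Source

// Hypothetical post-processing helper, not part of the original gist:
// averages the timeMs column per (mode, numKeys) pair.
object SummarizeResults extends App {
  val rows = Source.fromFile("ShuffleWriteBenchmark.txt").getLines()
    .filterNot(_.startsWith("mode,"))  // drop header lines (the file is appended to)
    .map(_.split(','))
    .toSeq
  rows.groupBy(r => (r(0), r(1))).toSeq.sortBy(_._1).foreach {
    case ((mode, numKeys), runs) =>
      val avgTimeMs = runs.map(_(3).toDouble).sum / runs.size
      println(f"$mode%-15s numKeys=$numKeys%-9s avg timeMs=$avgTimeMs%.0f")
  }
}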
ShuffleWriteBenchmark.txt

mode,numKeys,numPartitions,timeMs,shuffleWriteTimeMs,gcTimeMs,diskBytesSpilled,shuffleBytesWritten,recordsWritten
tungsten-sort,10000000,1000,5171,90.227,857,0,66969588,10000000
tungsten-sort,10000000,1000,4531,72.326,646,0,66969588,10000000
tungsten-sort,10000000,1000,3722,85.819,125,0,66969588,10000000
tungsten-sort,10000000,1000,3623,70.659,95,0,66969588,10000000
tungsten-sort,10000000,1000,3991,99.786,255,0,66969588,10000000
sort_serialize,10000000,1000,5271,78.69,539,0,66969593,10000000
sort_serialize,10000000,1000,5213,80.115,224,0,66969593,10000000
sort_serialize,10000000,1000,4914,67.3,195,0,66969593,10000000
sort_serialize,10000000,1000,4887,68.207,164,0,66969593,10000000
sort_serialize,10000000,1000,5003,86.585,245,0,66969593,10000000
tungsten-sort,20000000,1000,9509,1261.319,175,113599278,135780836,20000000
tungsten-sort,20000000,1000,10506,1635.94,470,113599278,135780836,20000000
tungsten-sort,20000000,1000,11373,1713.395,842,113599278,135780836,20000000
tungsten-sort,20000000,1000,11807,1737.068,1049,113599278,135780836,20000000
tungsten-sort,20000000,1000,10408,1319.711,905,113599278,135780836,20000000
sort_serialize,20000000,1000,17544,145.444,614,113607985,135779484,20000000
sort_serialize,20000000,1000,17841,149.417,680,113607985,135779484,20000000
sort_serialize,20000000,1000,18260,148.065,1558,113607985,135779484,20000000
sort_serialize,20000000,1000,17725,142.514,955,113607985,135779484,20000000
sort_serialize,20000000,1000,17986,149.294,1134,113607985,135779484,20000000
tungsten-sort,30000000,1000,17609,2808.605,983,113599284,204591901,30000000
tungsten-sort,30000000,1000,19352,2770.783,2349,113599284,204591901,30000000
tungsten-sort,30000000,1000,27407,4561.633,3720,113599284,204591901,30000000
tungsten-sort,30000000,1000,29308,3116.432,4593,113599284,204591901,30000000
tungsten-sort,30000000,1000,18396,3596.082,2071,113599284,204591901,30000000
sort_serialize,30000000,1000,26102,251.966,1332,113607985,204577992,30000000
sort_serialize,30000000,1000,25478,209.738,1146,113607985,204577992,30000000
sort_serialize,30000000,1000,26578,212.958,1849,113607985,204577992,30000000
sort_serialize,30000000,1000,27616,276.484,2461,113607985,204577992,30000000
sort_serialize,30000000,1000,31066,257.589,3199,113607985,204577992,30000000
tungsten-sort,40000000,1000,31062,4407.55,2827,229043276,273393812,40000000
tungsten-sort,40000000,1000,24061,3624.849,1954,229043276,273393812,40000000
tungsten-sort,40000000,1000,22380,3462.363,1981,229043276,273393812,40000000
tungsten-sort,40000000,1000,20958,3504.671,1892,229043276,273393812,40000000
tungsten-sort,40000000,1000,21369,3643.242,1801,229043276,273393812,40000000
sort_serialize,40000000,1000,35383,312.952,2028,229053192,273388402,40000000
sort_serialize,40000000,1000,39778,330.203,4735,229053192,273388402,40000000
sort_serialize,40000000,1000,39350,300.134,2959,229053192,273388402,40000000
sort_serialize,40000000,1000,37383,310.98,2659,229053192,273388402,40000000
sort_serialize,40000000,1000,39116,301.807,3819,229053192,273388402,40000000