Last active
August 29, 2015 14:20
-
-
Save JoshRosen/15ab99014fb21cbd1e65 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
scalaVersion := "2.10.4"

// Spark checkout whose `sql` and `core` projects this benchmark builds against.
// Override with the SPARK_HOME environment variable.
val SPARK_HOME = sys.env.getOrElse("SPARK_HOME", "/Users/joshrosen/Documents/spark")

// YourKit profiler installation that provides both the controller API jar and the native
// profiling agent. Override with the YOURKIT_HOME environment variable on machines where
// YourKit is installed elsewhere; the default is the original hard-coded location.
val YOURKIT_HOME =
  sys.env.getOrElse("YOURKIT_HOME", "/Applications/YourKit_Java_Profiler_2013_build_13088.app")

lazy val sparkSql = ProjectRef(file(SPARK_HOME), "sql")
lazy val sparkCore = ProjectRef(file(SPARK_HOME), "core")

lazy val root = (project in file(".")).
  settings(
    // Disables aggregation of the `update` task for this project.
    aggregate in update := false,
    // Run in a forked JVM so the `javaOptions` below (heap size, profiler agent) are applied.
    fork in run := true,
    // Unmanaged dependency on the YourKit controller API, loaded straight from the local install.
    libraryDependencies += "yourkit" % "yourkit-api" % "version" from
      s"file://$YOURKIT_HOME/lib/yjp-controller-api-redist.jar",
    // Attach the YourKit agent; `onexit=snapshot` is passed to the agent as-is.
    javaOptions in run += s"-agentpath:$YOURKIT_HOME/bin/mac/libyjpagent.jnilib=onexit=snapshot",
    // Fixed-size 2000 MB heap for the forked benchmark JVM.
    javaOptions in run ++= Seq(
      "-Xmx2000M",
      "-Xms2000M"
      // "-XX:+UnlockDiagnosticVMOptions",
      // //"-XX:+TraceClassLoading",
      // //"-XX:+LogCompilation",
      // //"-XX:+PrintAssembly",
      // "-XX:+PrintInlining"
    )
  ).dependsOn(sparkSql, sparkCore)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{PrintWriter, FileOutputStream, File} | |
import org.apache.spark._ | |
import org.apache.spark.executor.TaskMetrics | |
import org.apache.spark.rdd.{RDD, ShuffledRDD} | |
import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListener} | |
/**
 * A [[ShuffledRDD]] whose reduce side is a no-op.
 *
 * `compute` never reads shuffle blocks, and only the first reduce partition is exposed. Running
 * an action on this RDD therefore still forces the parent shuffle (map) stage to run, while the
 * benchmarking harness avoids scheduling many empty reduce tasks.
 */
class DummyShuffledRDD[K, V, C](prev: RDD[(K, V)], part: Partitioner)
  extends ShuffledRDD[K, V, C](prev, part) {

  /** Produces no records instead of fetching this partition's shuffle output. */
  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] =
    Iterator.empty

  /** Keeps only the first reduce partition; one is enough to trigger the parent stage. */
  override def getPartitions: Array[Partition] =
    super.getPartitions.slice(0, 1)
}
object ShuffleWriteBenchmark {
  import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

  /** How long to wait for the listener to deliver one run's ShuffleMapTask metrics. */
  private val MetricsTimeoutSeconds = 60L

  /**
   * Runs one shuffle-write job and returns its wall-clock duration.
   *
   * Builds `numKeys` `(x, x)` pairs in a single map partition and shuffles them through a
   * [[DummyShuffledRDD]] with `numPartitions` reduce partitions, so the job consists of the
   * map-side shuffle write plus a trivial reduce task. A `System.gc()` is requested after the
   * timed section (presumably so this run's garbage is not collected during the next timed run).
   *
   * @param numKeys       number of records to shuffle
   * @param numPartitions number of reduce partitions the records are hashed into
   * @param sc            context to run the job on
   * @return elapsed wall-clock time of the `count()` action, in milliseconds
   */
  def runBenchmark(numKeys: Int, numPartitions: Int, sc: SparkContext): Long = {
    val data = sc.parallelize(1 to numKeys, 1).map(x => (x, x))
    val shuffled: RDD[(Int, Int)] = new DummyShuffledRDD(data, new HashPartitioner(numPartitions))
    val startTime = System.currentTimeMillis()
    shuffled.count()
    val endTime = System.currentTimeMillis()
    System.gc()
    endTime - startTime
  }

  /**
   * Benchmarks shuffle writes for every (numKeys, numPartitions, configuration) combination.
   *
   * Each combination gets a fresh local-mode SparkContext, `NUM_WARMUPS` unrecorded runs and
   * `NUM_ITERATIONS` recorded runs. Every recorded run appends one CSV row, built from the
   * metrics of that run's single ShuffleMapTask, to `ShuffleWriteBenchmark.txt`.
   *
   * @throws IllegalStateException if a run's map-task metrics are not delivered to the
   *                               listener within `MetricsTimeoutSeconds`
   */
  def main(args: Array[String]): Unit = {
    val commonSettings = new SparkConf()
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.io.compression.codec", "lzf")
      .set("spark.shuffle.memoryFraction", "0.25")
      .set("spark.shuffle.safetyFraction", "1")
      //.set("spark.shuffle.compress", "false")
      //.set("spark.shuffle.spill.batchSize", "1000")
    val configurations = Map(
      "tungsten-sort" -> commonSettings.clone
        .set("spark.shuffle.manager", "tungsten-sort")
        .set("spark.unsafe.offHeap", "false"),
      "sort_serialize" -> commonSettings.clone
        .set("spark.shuffle.manager", "sort")
        .set("spark.shuffle.sort.serializeMapOutputs", "true")
      // The 1.2 default sort shuffle just collapses under GC pressure, so it's disabled here:
      // "sort_no_serialize" -> commonSettings.clone
      //   .set("spark.shuffle.manager", "sort")
      //   .set("spark.shuffle.sort.serializeMapOutputs", "false")
    )
    val NUM_WARMUPS = 3
    val NUM_ITERATIONS = 5
    // Opened in append mode, so results from successive invocations accumulate in one file.
    val outputFileWriter =
      new PrintWriter(new FileOutputStream(new File("ShuffleWriteBenchmark.txt"), true))
    // Closed in `finally` so a failing run still releases the results file.
    try {
      outputFileWriter.println("mode,numKeys,numPartitions,timeMs,shuffleWriteTimeMs,gcTimeMs,diskBytesSpilled,shuffleBytesWritten,recordsWritten")
      for {
        numKeys <- Seq(10, 20, 30, 40).map(_ * 1000000)
        numPartitions <- Seq(1000)
        (confName, conf) <- configurations
      } {
        val sc = new SparkContext(conf.setMaster("local").setAppName("test"))
        // Listener events are delivered asynchronously, so `count()` may return before the map
        // task's metrics reach the listener. Hand them over through a queue and block on it,
        // rather than reading a shared variable that could still hold the previous run's value.
        val mapTaskMetrics = new LinkedBlockingQueue[TaskMetrics]()
        val metricsListener = new SparkListener {
          override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
            if (taskEnd.taskType == "ShuffleMapTask" && taskEnd.taskMetrics != null) {
              mapTaskMetrics.put(taskEnd.taskMetrics)
            }
          }
          override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
            assert(stageSubmitted.stageInfo.numTasks == 1)
          }
        }
        sc.addSparkListener(metricsListener)

        // Runs the job once, then blocks until that run's ShuffleMapTask metrics arrive.
        def runAndAwaitMetrics(): TaskMetrics = {
          runBenchmark(numKeys, numPartitions, sc)
          val taskMetrics = mapTaskMetrics.poll(MetricsTimeoutSeconds, TimeUnit.SECONDS)
          if (taskMetrics == null) {
            throw new IllegalStateException(
              s"No ShuffleMapTask metrics received within $MetricsTimeoutSeconds seconds")
          }
          taskMetrics
        }

        try {
          // Warmups also drain their metrics so none of them are mistaken for a measured run's.
          for (_ <- 1 to NUM_WARMUPS) {
            runAndAwaitMetrics()
          }
          // val controller = new com.yourkit.api.Controller
          // controller.startCPUSampling(null)
          for (_ <- 1 to NUM_ITERATIONS) {
            val metrics = runAndAwaitMetrics()
            val shuffleMetrics = metrics.shuffleWriteMetrics.get
            outputFileWriter.println(
              Seq(
                confName,
                numKeys,
                numPartitions,
                metrics.executorRunTime, // reported as the `timeMs` column
                shuffleMetrics.shuffleWriteTime / 1000000.0, // nanoseconds -> milliseconds
                metrics.jvmGCTime,
                metrics.diskBytesSpilled,
                shuffleMetrics.shuffleBytesWritten,
                shuffleMetrics.shuffleRecordsWritten
              ).map(_.toString).mkString(",")
            )
            outputFileWriter.flush()
          }
          // controller.stopCPUProfiling()
        } finally {
          sc.stop()
        }
      }
    } finally {
      outputFileWriter.close()
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mode,numKeys,numPartitions,timeMs,shuffleWriteTimeMs,gcTimeMs,diskBytesSpilled,shuffleBytesWritten,recordsWritten | |
tungsten-sort,10000000,1000,5171,90.227,857,0,66969588,10000000 | |
tungsten-sort,10000000,1000,4531,72.326,646,0,66969588,10000000 | |
tungsten-sort,10000000,1000,3722,85.819,125,0,66969588,10000000 | |
tungsten-sort,10000000,1000,3623,70.659,95,0,66969588,10000000 | |
tungsten-sort,10000000,1000,3991,99.786,255,0,66969588,10000000 | |
sort_serialize,10000000,1000,5271,78.69,539,0,66969593,10000000 | |
sort_serialize,10000000,1000,5213,80.115,224,0,66969593,10000000 | |
sort_serialize,10000000,1000,4914,67.3,195,0,66969593,10000000 | |
sort_serialize,10000000,1000,4887,68.207,164,0,66969593,10000000 | |
sort_serialize,10000000,1000,5003,86.585,245,0,66969593,10000000 | |
tungsten-sort,20000000,1000,9509,1261.319,175,113599278,135780836,20000000 | |
tungsten-sort,20000000,1000,10506,1635.94,470,113599278,135780836,20000000 | |
tungsten-sort,20000000,1000,11373,1713.395,842,113599278,135780836,20000000 | |
tungsten-sort,20000000,1000,11807,1737.068,1049,113599278,135780836,20000000 | |
tungsten-sort,20000000,1000,10408,1319.711,905,113599278,135780836,20000000 | |
sort_serialize,20000000,1000,17544,145.444,614,113607985,135779484,20000000 | |
sort_serialize,20000000,1000,17841,149.417,680,113607985,135779484,20000000 | |
sort_serialize,20000000,1000,18260,148.065,1558,113607985,135779484,20000000 | |
sort_serialize,20000000,1000,17725,142.514,955,113607985,135779484,20000000 | |
sort_serialize,20000000,1000,17986,149.294,1134,113607985,135779484,20000000 | |
tungsten-sort,30000000,1000,17609,2808.605,983,113599284,204591901,30000000 | |
tungsten-sort,30000000,1000,19352,2770.783,2349,113599284,204591901,30000000 | |
tungsten-sort,30000000,1000,27407,4561.633,3720,113599284,204591901,30000000 | |
tungsten-sort,30000000,1000,29308,3116.432,4593,113599284,204591901,30000000 | |
tungsten-sort,30000000,1000,18396,3596.082,2071,113599284,204591901,30000000 | |
sort_serialize,30000000,1000,26102,251.966,1332,113607985,204577992,30000000 | |
sort_serialize,30000000,1000,25478,209.738,1146,113607985,204577992,30000000 | |
sort_serialize,30000000,1000,26578,212.958,1849,113607985,204577992,30000000 | |
sort_serialize,30000000,1000,27616,276.484,2461,113607985,204577992,30000000 | |
sort_serialize,30000000,1000,31066,257.589,3199,113607985,204577992,30000000 | |
tungsten-sort,40000000,1000,31062,4407.55,2827,229043276,273393812,40000000 | |
tungsten-sort,40000000,1000,24061,3624.849,1954,229043276,273393812,40000000 | |
tungsten-sort,40000000,1000,22380,3462.363,1981,229043276,273393812,40000000 | |
tungsten-sort,40000000,1000,20958,3504.671,1892,229043276,273393812,40000000 | |
tungsten-sort,40000000,1000,21369,3643.242,1801,229043276,273393812,40000000 | |
sort_serialize,40000000,1000,35383,312.952,2028,229053192,273388402,40000000 | |
sort_serialize,40000000,1000,39778,330.203,4735,229053192,273388402,40000000 | |
sort_serialize,40000000,1000,39350,300.134,2959,229053192,273388402,40000000 | |
sort_serialize,40000000,1000,37383,310.98,2659,229053192,273388402,40000000 | |
sort_serialize,40000000,1000,39116,301.807,3819,229053192,273388402,40000000 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See also: https://github.com/apache/spark/blob/master/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala