def time[T](block: => T): T = {
  val start = System.currentTimeMillis
  val res = block
  val totalTime = System.currentTimeMillis - start
  println("Elapsed time: %d seconds".format(totalTime / 1000))
  res
}
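//Usage sketch (not in the original gist): time wraps any expression, prints
//its wall-clock duration, and returns the expression's result, e.g.:
//val total = time { (1 to 1000000).sum }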
//spark-shell --conf spark.memory.storageFraction=0 --conf spark.memory.fraction=0.1
//spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
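//Added context: spark.memory.fraction (default 0.6) caps the heap share used
//for execution and storage, so 0.1 forces operators to spill early, and
//spark.memory.storageFraction=0 leaves no storage region protected from
//eviction. The second line swaps Java serialization for Kryo, which is
//typically faster and more compact for shuffle data.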
//CAUSES HUGE SHUFFLE ON OSX
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val range1 = 1 to 1000000
val range2 = 1 to 500
val rdd1 = sc.parallelize(range1.toList, 50)
val rdd2 = sc.parallelize(range2.toList, 5)
val df1 = rdd1.toDF
val df2t = rdd2.toDF
val df2 = df2t.withColumn("by", df2t("value")).select("by")
//val toLong = udf[Long, Int](_.toLong)
//val join = df1.join(df2, df1("value") === df2("value"))
//val join = df1.crossJoin(df2).withColumn("row_number", row_number().over(Window.partitionBy().orderBy(lit(1))))
val join = df1.crossJoin(df2).withColumn("row_number", df1("value") * 100000L + df2("by"))
//val join2 = join.withColumn("row_number_l", toLong(join("row_number")))
//val join3 = join.withColumn("row_number_l", toLong(join("row_number"))).withColumn("extra", lit(1)).repartition(333)
val join2 = join.withColumn("row_number_l", join("row_number"))
val join3 = join.withColumn("row_number_l", join("row_number")).withColumn("extra", lit(1)).repartition(333)
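//Added context: the cross join yields 1,000,000 x 500 = 500,000,000 rows per
//side, and row_number_l (value * 100000 + by, with by <= 500) is unique per
//row, so the self-join below matches each row exactly once across a
//500M-row shuffle.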
//join2.persist(StorageLevel.DISK_ONLY)
//join3.persist(StorageLevel.DISK_ONLY)
//join2.cache
//join2.count
//join3.cache
//join3.count
join2.join(join3, join2("row_number_l") === join3("row_number_l")).count
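//The time helper defined above is never actually called in this first script;
//a timed variant of the same count would be (sketch):
//time(join2.join(join3, join2("row_number_l") === join3("row_number_l")).count)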
def time[T](block: => T): T = {
  val start = System.currentTimeMillis
  val res = block
  val totalTime = System.currentTimeMillis - start
  println("Elapsed time: %d seconds".format(totalTime / 1000))
  res
}
//CAUSES HUGE CPU ON MacBookPro
val range1 = 1 to 1000000
val range2 = 1 to 5000
val rdd1 = sc.parallelize(range1.toList, 50)
val rdd2 = sc.parallelize(range2.toList, 5)
val df1 = rdd1.toDF
val df2t = rdd2.toDF
val df2 = df2t.withColumn("by", df2t("value")).select("by")
val join = df1.crossJoin(df2).withColumn("row_number", df2("by") * 1000 + df1("value"))
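//Added context: here the cross join produces 1,000,000 x 5,000 =
//5,000,000,000 rows, which is what drives the CPU load noted above.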
//join.cache
//join.count
//Note: each key sums the full 1..1000000 range (about 5.000005e11), which
//overflows Int, so the values are widened to Long before reducing.
val sum = join.rdd.map(row => (row.getAs[Int]("by"), row.getAs[Int]("value").toLong)).reduceByKey(_ + _)
time(sum.count)
//122 seconds on 16-core virtual server
//104 seconds on Amazon c4.8xlarge (36 cores)
//434 seconds on MacBook Pro
sum.foreach(println)
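//Expected output sketch (assuming the Long widening above): each of the 5,000
//keys sums 1..1000000, so every pair prints as (by,500000500000). Note that
//foreach(println) prints on the executors, so the lines only appear in the
//driver console when running in local mode.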