// Timing helper: evaluates a by-name block and prints the elapsed wall-clock seconds.
def time[T](block: => T): T = {
  val start = System.currentTimeMillis
  val res = block
  val totalTime = System.currentTimeMillis - start
  println("Elapsed time: %d seconds".format(totalTime / 1000))
  res
}
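// Usage example: time(sc.parallelize(1 to 1000000).count)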
// spark-shell --conf spark.memory.storageFraction=0 --conf spark.memory.fraction=0.1
// spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
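// A hedged sketch (assumes building the SparkConf yourself rather than relying on
// the shell's default session): Kryo helps most when classes are registered up front.
// import org.apache.spark.SparkConf
// val conf = new SparkConf()
//   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//   .registerKryoClasses(Array(classOf[Array[Int]]))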
// The cross-join experiment below CAUSES A HUGE SHUFFLE ON macOS
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
// Build a 1M-row DataFrame (50 partitions) and a 500-row DataFrame (5 partitions);
// rename the small side's column to "by" so the joined columns are unambiguous.
val range1 = 1 to 1000000
val range2 = 1 to 500
val rdd1 = sc.parallelize(range1.toList, 50)
val rdd2 = sc.parallelize(range2.toList, 5)
val df1 = rdd1.toDF
val df2t = rdd2.toDF
val df2 = df2t.withColumn("by", df2t("value")).select("by")
//val toLong = udf[Long, Int]( _.toLong)
//val join = df1.join(df2, df1("value") === df2("value"))
//val join = df1.crossJoin(df2).withColumn("row_number", row_number().over(Window.partitionBy().orderBy(lit(1))))
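// (note: the Window variant above uses an empty partitionBy(), which moves every
// row into a single partition and therefore does not scale)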
// Synthetic unique key: 100000 > max(by) = 500, so value * 100000 + by never collides.
val join = df1.crossJoin(df2).withColumn("row_number", df1("value") * 100000L + df2("by"))
//val join2 = join.withColumn("row_number_l", toLong(join("row_number")))
//val join3 = join.withColumn("row_number_l", toLong(join("row_number"))).withColumn("extra", lit(1)).repartition(333)
val join2 = join.withColumn("row_number_l", join("row_number"))
val join3 = join.withColumn("row_number_l", join("row_number")).withColumn("extra", lit(1)).repartition(333)
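// repartition(333) forces a full shuffle of join3 and leaves the two sides
// partitioned differently ahead of the self-join below.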
//join2.persist(StorageLevel.DISK_ONLY)
//join3.persist(StorageLevel.DISK_ONLY)
//join2.cache
//join2.count
//join3.cache
//join3.count
// Self-join the 500M-row cross join on the synthetic key; this is the shuffle-heavy step.
time(join2.join(join3, join2("row_number_l") === join3("row_number_l")).count)
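// To inspect the plan behind the count above (a sketch; this join typically
// resolves to a SortMergeJoin, which shuffles both sides by row_number_l):
// join2.join(join3, join2("row_number_l") === join3("row_number_l")).explain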
// CAUSES HUGE CPU USAGE ON MacBook Pro
val range1 = 1 to 1000000
val range2 = 1 to 5000
val rdd1 = sc.parallelize(range1.toList, 50)
val rdd2 = sc.parallelize(range2.toList, 5)
val df1 = rdd1.toDF
val df2t = rdd2.toDF
val df2 = df2t.withColumn("by", df2t("value")).select("by")
// Note: "row_number" can collide here (1000 < max(value) = 1000000), but the column
// is unused downstream, so the measurement is unaffected.
val join = df1.crossJoin(df2).withColumn("row_number", df2("by") * 1000 + df1("value"))
//join.cache
//join.count
// Row-level RDD aggregation over the 5-billion-row cross join: sum "value" per "by" key.
val sum = join.rdd.map(row => (row.getAs[Int]("by"), row.getAs[Int]("value"))).reduceByKey(_ + _)
time(sum.count)
// 122 seconds on a 16-core virtual server
// 104 seconds on an Amazon c4.8xlarge (36 cores)
// 434 seconds on a MacBook Pro
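// A DataFrame-native equivalent (sketch): keeps the aggregation inside Catalyst/Tungsten
// instead of mapping every row through the RDD API. Fully qualified to avoid clashing
// with the local `sum` val above:
// time(join.groupBy("by").agg(org.apache.spark.sql.functions.sum("value")).count)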
// Collect to the driver so the per-key sums print locally (5000 keys, safe to collect).
sum.collect.foreach(println)