Apache Spark cache and checkpoint examples
/**
* //> spark-shell
* 19/10/14 13:25:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
* Setting default log level to "WARN".
* To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
* Spark context Web UI available at http://localhost:4040
* Spark context available as 'sc' (master = local[*], app id = local-111).
* Spark session available as 'spark'.
* Welcome to
*       ____              __
*      / __/__  ___ _____/ /__
*     _\ \/ _ \/ _ `/ __/ '_/
*    /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
*       /_/
*
* Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
* Type in expressions to have them evaluated.
* Type :help for more information.
**/
scala> sc.setCheckpointDir("file:///tmp/checkpoint")
scala> def get(n: Int) = { println(n); n }
// get: (n: Int)Int
scala> val rdd = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).map(get)
// rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd.collect
// 2
// 1
// 5
// 3
// 5
// res1: Array[Int] = Array(1, 2, 3, 5, 5)
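// no cache, no checkpoint: collect() runs the lineage once, so `get` prints each element a single time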
scala> val rdd2 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).map(get)
// rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:26
scala> rdd2.checkpoint
scala> rdd2.collect()
// 5
// 2
// 3
// 1
// 5
// 2
// 1
// 3
// 5
// 5
// res3: Array[Int] = Array(1, 2, 3, 5, 5)
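// checkpoint() without cache(): the collect() job computes the RDD and a second job then
// recomputes it to write the checkpoint files, so every element prints twice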
scala> val rdd3 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).map(get)
// rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:26
scala> rdd3.cache()
// res6: rdd3.type = MapPartitionsRDD[6] at map at <console>:26
scala> rdd3.checkpoint
scala> rdd3.collect()
// 5
// 1
// 2
// 3
// 5
// res8: Array[Int] = Array(1, 2, 3, 5, 5)
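// cache() before checkpoint(): the checkpoint job reads the cached partitions instead of
// recomputing the lineage, so each element prints only once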
/** Datasets checkpoint and cache behavior */
scala> val ds = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get)
// ds: org.apache.spark.sql.Dataset[Int] = [value: int]
// eager = true
scala> ds.checkpoint
// 2
// 3
// 5
// 1
// 5
// 1
// 3
// 5
// 2
// 5
// res9: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.collect()
// 2
// 1
// 5
// 3
// 5
// res10: Array[Int] = Array(1, 2, 3, 5, 5)
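// Dataset.checkpoint is eager by default and returns a *new* checkpointed Dataset; the eager
// checkpoint above ran the plan twice (10 prints), and because its result was discarded,
// ds.collect() still re-executes the original plan (5 more prints)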
scala> ds.explain(extended = true)
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#17]
+- ExternalRDD [obj#16]
== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#17]
+- ExternalRDD [obj#16]
== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#17]
+- ExternalRDD [obj#16]
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#17]
+- Scan[obj#16]
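// the plan of ds itself is unchanged: only the Dataset returned by checkpoint would scan the checkpointed RDD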
scala> val ds2 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get)
// ds2: org.apache.spark.sql.Dataset[Int] = [value: int]
// eager = false
scala> ds2.checkpoint(false)
// res13: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds2.collect()
// 3
// 2
// 1
// 5
// 5
// res14: Array[Int] = Array(1, 2, 3, 5, 5)
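// eager = false: nothing runs at checkpoint time, and since the returned Dataset is discarded,
// collect() simply executes the original plan once (5 prints)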
scala> ds2.explain(extended = true)
== Parsed Logical Plan ==
'SerializeFromObject [input[0, int, false] AS value#37]
+- 'MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#36: int
   +- 'DeserializeToObject unresolveddeserializer(assertnotnull(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))), obj#35: int
      +- SerializeFromObject [input[0, int, false] AS value#32]
         +- ExternalRDD [obj#31]
== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#37]
+- MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#36: int
   +- DeserializeToObject assertnotnull(cast(value#32 as int)), obj#35: int
      +- SerializeFromObject [input[0, int, false] AS value#32]
         +- ExternalRDD [obj#31]
== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#37]
+- MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#36: int
   +- ExternalRDD [obj#31]
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#37]
+- *(1) MapElements <function1>, obj#36: int
   +- Scan[obj#31]
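// as with ds, the plan of ds2 itself is untouched by the discarded checkpoint result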
scala> val ds3 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get)
// ds3: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds3.cache()
// res15: ds3.type = [value: int]
scala> ds3.checkpoint
// 1
// 3
// 2
// 5
// 5
// res16: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds3.collect()
// res17: Array[Int] = Array(1, 2, 3, 5, 5)
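// cache() + eager checkpoint: the first pass fills the cache (5 prints), the checkpoint write
// reads from it, and the later collect() is served from the cache without calling `get` at all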
scala> ds3.explain(extended = true)
== Parsed Logical Plan ==
'SerializeFromObject [input[0, int, false] AS value#48]
+- 'MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#47: int
   +- 'DeserializeToObject unresolveddeserializer(assertnotnull(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))), obj#46: int
      +- SerializeFromObject [input[0, int, false] AS value#43]
         +- ExternalRDD [obj#42]
== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#48]
+- MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#47: int
   +- DeserializeToObject assertnotnull(cast(value#43 as int)), obj#46: int
      +- SerializeFromObject [input[0, int, false] AS value#43]
         +- ExternalRDD [obj#42]
== Optimized Logical Plan ==
InMemoryRelation [value#48], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) SerializeFromObject [input[0, int, false] AS value#48]
      +- *(1) MapElements <function1>, obj#47: int
         +- Scan[obj#42]
== Physical Plan ==
*(1) InMemoryTableScan [value#48]
   +- InMemoryRelation [value#48], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) SerializeFromObject [input[0, int, false] AS value#48]
            +- *(1) MapElements <function1>, obj#47: int
               +- Scan[obj#42]
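/**
 * Takeaway (a minimal sketch, not part of the recorded session; `ds4` and
 * `ds4Checkpointed` are illustrative names, while `sc`, `get` and the checkpoint
 * dir are reused from above): cache before an eager checkpoint so the lineage
 * runs only once, and keep the Dataset that checkpoint returns rather than the
 * original reference.
 */
val ds4             = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get).cache()
val ds4Checkpointed = ds4.checkpoint()  // eager by default: the first pass fills the cache, the checkpoint write reads from it
ds4Checkpointed.collect()               // reads the checkpointed data, `get` is not called again
ds4.unpersist()                         // the cache can be dropped once the checkpoint has been written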