Apache Spark cache and checkpoint examples
/**
 * //> spark-shell
 * 19/10/14 13:25:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 * Setting default log level to "WARN".
 * To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
 * Spark context Web UI available at http://localhost:4040
 * Spark context available as 'sc' (master = local[*], app id = local-111).
 * Spark session available as 'spark'.
 * Welcome to
 *       ____              __
 *      / __/__  ___ _____/ /__
 *     _\ \/ _ \/ _ `/ __/ '_/
 *    /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
 *       /_/
 *
 * Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
 * Type in expressions to have them evaluated.
 * Type :help for more information.
 **/
scala> sc.setCheckpointDir("file:///tmp/checkpoint")
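// Checkpoint data is written under this directory. file:///tmp is fine for a
// local demo; on a real cluster the checkpoint directory should live on a
// fault-tolerant filesystem such as HDFS so every executor can read it back.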
scala> def get(n: Int) = { println(n); n }
// get: (n: Int)Int
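// `get` is an identity function with a printing side effect: counting the
// numbers printed below tells us how many times Spark actually evaluated the
// lineage.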
scala> val rdd = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).map(get)
// rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd.collect
// 2
// 1
// 5
// 3
// 5
// res1: Array[Int] = Array(1, 2, 3, 5, 5)
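// Baseline: a plain action evaluates the map exactly once, so each of the
// five elements prints a single time (in nondeterministic partition order).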
scala> val rdd2 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).map(get)
// rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:26
scala> rdd2.checkpoint
scala> rdd2.collect()
// 5
// 2
// 3
// 1
// 5
// 2
// 1
// 3
// 5
// 5
// res3: Array[Int] = Array(1, 2, 3, 5, 5)
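// Ten prints for five elements: checkpointing is materialized by the first
// action, which launches a separate job that recomputes the entire lineage in
// addition to the job run for collect itself.
// A minimal sketch (not part of the original session) to verify:
rdd2.isCheckpointed // should now be true: the lineage has been truncated
rdd2.collect()      // no prints expected: data is read back from the checkpoint files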
scala> val rdd3 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).map(get)
// rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:26
scala> rdd3.cache()
// res6: rdd3.type = MapPartitionsRDD[6] at map at <console>:26
scala> rdd3.checkpoint
scala> rdd3.collect()
// 5
// 1
// 2
// 3
// 5
// res8: Array[Int] = Array(1, 2, 3, 5, 5)
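// Only five prints this time: rdd3 was cached first, so the action fills the
// cache and the checkpoint job reads the cached partitions instead of
// recomputing the map. Hence the usual advice: cache() before checkpoint() so
// the RDD is not computed twice. (RDD.localCheckpoint is a related
// alternative that truncates the lineage using the cache alone, trading
// fault tolerance for speed.)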
/** Datasets checkpoint and cache behavior */
scala> val ds = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get)
// ds: org.apache.spark.sql.Dataset[Int] = [value: int]
// eager = true
scala> ds.checkpoint
// 2
// 3
// 5
// 1
// 5
// 1
// 3
// 5
// 2
// 5
// res9: org.apache.spark.sql.Dataset[Int] = [value: int]
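// Ten prints again: the eager Dataset checkpoint (eager = true is the
// default) runs an internal action to materialize the data, and, as with
// rdd2 above, that action also triggers the separate checkpoint-writing job.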
scala> ds.collect()
// 2
// 1
// 5
// 3
// 5
// res10: Array[Int] = Array(1, 2, 3, 5, 5)
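// Note that collect printed again: unlike RDD.checkpoint, Dataset.checkpoint
// does not modify the receiver but returns a new Dataset backed by the
// checkpointed data (res9 above, which we discarded). A sketch (not part of
// the original session) of keeping the returned Dataset instead:
val dsCp = ds.checkpoint() // eager: evaluates the plan and writes it out
dsCp.collect()             // no prints expected: served from the checkpointed data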
scala> ds.explain(extended = true)
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#17]
+- ExternalRDD [obj#16]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#17]
+- ExternalRDD [obj#16]

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#17]
+- ExternalRDD [obj#16]

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#17]
+- Scan[obj#16]
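// Nothing checkpoint-related shows up in this plan: ds still bottoms out in
// the original ExternalRDD, consistent with the checkpointed copy living
// only in the discarded return value.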
scala> val ds2 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get)
// ds2: org.apache.spark.sql.Dataset[Int] = [value: int]
// eager = false
scala> ds2.checkpoint(false)
// res13: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds2.collect()
// 3
// 2
// 1
// 5
// 5
// res14: Array[Int] = Array(1, 2, 3, 5, 5)
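// With eager = false nothing ran at checkpoint time (no prints above), and
// the five prints here come from evaluating ds2's own plan for collect. The
// lazily checkpointed copy (res13) was again discarded; it would only be
// materialized by its first action.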
scala> ds2.explain(extended = true)
== Parsed Logical Plan ==
'SerializeFromObject [input[0, int, false] AS value#37]
+- 'MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#36: int
   +- 'DeserializeToObject unresolveddeserializer(assertnotnull(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))), obj#35: int
      +- SerializeFromObject [input[0, int, false] AS value#32]
         +- ExternalRDD [obj#31]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#37]
+- MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#36: int
   +- DeserializeToObject assertnotnull(cast(value#32 as int)), obj#35: int
      +- SerializeFromObject [input[0, int, false] AS value#32]
         +- ExternalRDD [obj#31]

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#37]
+- MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#36: int
   +- ExternalRDD [obj#31]

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#37]
+- *(1) MapElements <function1>, obj#36: int
   +- Scan[obj#31]
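// ds2's plan still carries the full MapElements lineage, confirming that
// checkpoint(false) left the original Dataset untouched as well.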
scala> val ds3 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get)
// ds3: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds3.cache()
// res15: ds3.type = [value: int]
scala> ds3.checkpoint
// 1
// 3
// 2
// 5
// 5
// res16: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds3.collect()
// res17: Array[Int] = Array(1, 2, 3, 5, 5)
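// Five prints at checkpoint time instead of ten: as in the rdd3 case, the
// cache absorbs the second computation. And collect prints nothing at all,
// because ds3's cached plan is served straight from memory.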
scala> ds3.explain(extended = true)
== Parsed Logical Plan ==
'SerializeFromObject [input[0, int, false] AS value#48]
+- 'MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#47: int
   +- 'DeserializeToObject unresolveddeserializer(assertnotnull(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))), obj#46: int
      +- SerializeFromObject [input[0, int, false] AS value#43]
         +- ExternalRDD [obj#42]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#48]
+- MapElements <function1>, int, [StructField(value,IntegerType,false)], obj#47: int
   +- DeserializeToObject assertnotnull(cast(value#43 as int)), obj#46: int
      +- SerializeFromObject [input[0, int, false] AS value#43]
         +- ExternalRDD [obj#42]

== Optimized Logical Plan ==
InMemoryRelation [value#48], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) SerializeFromObject [input[0, int, false] AS value#48]
      +- *(1) MapElements <function1>, obj#47: int
         +- Scan[obj#42]

== Physical Plan ==
*(1) InMemoryTableScan [value#48]
   +- InMemoryRelation [value#48], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) SerializeFromObject [input[0, int, false] AS value#48]
            +- *(1) MapElements <function1>, obj#47: int
               +- Scan[obj#42]
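// The optimized plan is now an InMemoryRelation: cache() rewrote ds3's plan
// in place, which is why this Dataset, unlike ds and ds2, benefits on re-use.
//
// Putting it together, a minimal sketch of the pattern this session suggests
// (not part of the original transcript; ds4/ds4Cp are illustrative names):
val ds4 = sc.parallelize(Seq(1, 2, 3, 5, 5), 5).toDS.map(get)
ds4.cache()                  // the plan will be served from memory once computed
val ds4Cp = ds4.checkpoint() // eager: one job computes the data and fills the cache
ds4Cp.collect()              // no prints expected: reads the checkpointed result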