SparkNotes
RDD::
The first abstraction in Spark is the Resilient Distributed Dataset (RDD):
- a fault-tolerant collection of elements that can be operated on in parallel.
- collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
- RDDs automatically recover from node failures.
There are two main ways to create RDDs:
- parallelizing an existing collection in your driver program, or
- referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
In addition, every transformation on an existing (parent) RDD produces a new (child) RDD.
A second abstraction in Spark is shared variables
By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task.
Spark supports two types of shared variables:
broadcast variables - which can be used to cache a value in memory on all nodes
accumulators - which are variables that are only 'added' to, such as counters and sums.
import org.apache.spark.sql.SparkSession

val ss = SparkSession.builder().master("spark://thanoojubuntu-Inspiron-3521:7077")
.config("spark.sql.hive.metastore.version", "2.3.0")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.warehouse.dir", "hdfs://user/hive/warehouse")
.config("spark.sql.hive.metastore.jars", "/home/hduser/softwares/apache-hive-2.3.3-bin/lib/*")
.config("spark.sql.catalogImplementation", "hive")
.config("spark.hadoop.hive.exec.dynamic.partition", "true")
.config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.eventLog.enabled", true)
.enableHiveSupport().getOrCreate()
import org.apache.spark.sql.types._
import ss.sqlContext.implicits._
val sc = ss.sparkContext
sc.setLogLevel("ERROR")
ss.udf.register("ageInc10", (age: Int) => { age + 10 })
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
spark-submit --class com.sparkstreamingone.core2 --verbose --master spark://thanoojubuntu-Inspiron-3521:7077 target/SparkWithScala-0.0.1-SNAPSHOT-jar-with-dependencies.jar 2020 12
To include a dependency using Maven coordinates:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.reduce((a, b) => a + b)
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.save("users_with_options.orc")
df.write.option("path", "/some/path").saveAsTable("t")
Note: When the table is dropped, the custom table path will not be removed and the table data is still there.
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.sortBy("name")
.saveAsTable("users_partitioned_bucketed")
RDDs support two types of operations:
transformations, which create a new dataset from an existing one, and
actions, which return a value to the driver program after running a computation on the dataset.
All transformations in Spark are lazy; they are only computed when an action requires a result to be returned to the driver program.
By default, each transformed RDD may be recomputed each time you run an action on it; use persist() (or cache()) to keep it across actions (see the sketch below).
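A minimal sketch of persist/cache (the input path is illustrative):
import org.apache.spark.storage.StorageLevel
val words = sc.textFile("data.txt").flatMap(_.split(" "))
words.persist(StorageLevel.MEMORY_ONLY)   // equivalent to words.cache()
words.count()                             // first action computes and caches the partitions
words.distinct().count()                  // reuses the cached data instead of re-reading the file
words.unpersist()                         // free the cached blocks when no longer needed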
collect() fetches the entire RDD to a single/driver machine
rdd.take(100).foreach(println).
PairRDDFunctions:
reduceByKey
partitionBy
join
leftOuterJoin
groupByKey:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map { case (word, counts) => (word, counts.sum) }
groupByKey can cause out-of-disk (and out-of-memory) problems, as all the data is sent over the network and collected on the reduce-side workers.
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
reduceByKey = groupByKey + combiner
aggregateByKey: similar to reduceByKey, but it also takes an initial (zero) value.
It takes three inputs: i. the zero/initial value, ii. a sequence op (merges a value into the per-partition accumulator), iii. a combine op (merges accumulators across partitions).
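A small illustrative sketch of aggregateByKey computing a per-key (sum, count) over a pair RDD (the data below is made up):
val kv = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
// zero value (0, 0); seqOp folds a value into the per-partition accumulator; combOp merges accumulators across partitions
val sumAndCount = kv.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
sumAndCount.collect()   // e.g. Array((a,(2,2)), (b,(1,1)))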
Note: repartitioning (e.g. partitionBy) can be applied to paired RDDs.
Use mapValues instead of map once an RDD has been partitioned by key;
mapValues keeps the keys untouched, so the existing partitioner is preserved.
mapValues - Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
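A quick sketch (illustrative values) showing that mapValues preserves the partitioner while map drops it:
import org.apache.spark.HashPartitioner
val byKey = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(2))
byKey.mapValues(_ + 10).partitioner                  // Some(HashPartitioner) - keys untouched, partitioning retained
byKey.map { case (k, v) => (k, v + 10) }.partitioner // None - Spark cannot assume the keys are unchanged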
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
One important parameter for parallel collections is the number of partitions/slices to cut the dataset into.
Spark will run one task for each partition of the cluster.
Typically you want 2-4 partitions for each CPU in your cluster.
Normally, Spark tries to set the number of partitions automatically based on your cluster.
sc.parallelize(data, 10)
If we request 20GB per executor, YARN will actually allocate 20GB + memoryOverhead = 20GB + 7% of 20GB ≈ 21.4GB of memory per executor container.
-Running executors with too much memory often results in excessive garbage collection delays.
-Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM.
10 Nodes
16 Cores per Node
64GB RAM per Node
1. Tiny executors [One Executor per core]:
--num-executors = 16 x 10 = 160 executors
--executor-cores = 1
--executor-memory = 64GB/16 = 4GB
With only one core per executor, as we discussed above, we will not be able to take advantage of running multiple tasks in the same JVM. Also, shared/cached variables like broadcast variables and accumulators will be replicated on every executor of a node, i.e. 16 times. Also, we are not leaving enough memory overhead for Hadoop/YARN daemon processes, and we are not accounting for the ApplicationMaster. NOT GOOD!
2. Fat executors (One Executor per node):
--num-executors = 16 executors
--executor-cores = 16
--executor-memory = 64GB/1 = 64GB
With all 16 cores per executor, apart from the ApplicationMaster and daemon processes not being accounted for, HDFS throughput will suffer and it will result in excessive garbage collection. Also NOT GOOD!
3. Balance between Fat (vs) Tiny
--executor-cores = 5 (for good HDFS throughput)
Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15
So, Total available of cores in cluster = 15 x 10 = 150
Number of available executors = (total cores/num-cores-per-executor) = 150/5 = 30
Leaving 1 executor for the ApplicationMaster => --num-executors = 29
Number of executors per node = 30/10 = 3
Memory per executor = 64GB/3 = 21GB
Counting off-heap overhead = 7% of 21GB ≈ 1.5GB; rounding down further for headroom, actual --executor-memory ≈ 18GB
--num-executors = 29 executors (30 minus 1 for the ApplicationMaster)
--executor-cores = 5
--executor-memory = 64GB/3 ≈ 21GB, minus overhead ≈ 18GB
This config achieves the parallelism of a fat executor and the best throughput of a tiny executor!
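One way to apply this sizing when building the session (a sketch using standard Spark configuration keys; the values come from the example above, adjust for your own cluster):
import org.apache.spark.sql.SparkSession
val sized = SparkSession.builder()
  .config("spark.executor.instances", "29")   // 30 executors minus 1 left for the ApplicationMaster
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "18g")     // ~21GB per executor minus memory overhead
  .getOrCreate()                              // plus the usual master()/appName() settings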
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
A broadcast join in Spark is analogous to a map-side join in Hive:
the small table is cached on every node, so the shuffle/reduce stage can be avoided.
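A DataFrame-API sketch of the same idea, using the broadcast() hint (the DataFrames below are made up):
import org.apache.spark.sql.functions.broadcast
val largeDF = ss.range(1000000).toDF("id")
val smallDF = ss.range(100).toDF("id")
// the small side is shipped to every executor, so the join needs no shuffle/reduce stage
largeDF.join(broadcast(smallDF), "id").explain()   // physical plan shows BroadcastHashJoin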
val longAcc = spark.sparkContext.longAccumulator("SumAccumulator")
val rdd = spark.sparkContext.parallelize(Array(1, 2, 3))
rdd.foreach(x => longAcc.add(x))
println(longAcc.value)
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database
DataFrame is represented by a Dataset of Rows.
Dataset[Row]
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
// assuming a case class People(name: String, age: Int) and peopleRDD: RDD[String] of "name,age" lines
val peopleDF = peopleRDD.map(_.split(","))
.map { case Array(name, age) => People(name, age.trim.toInt) }.toDF("name", "age")
peopleDF.repartition(2).persist()
peopleDF.repartitionByRange(2, col("age")).persist()
peopleDF.coalesce(1).persist()
peopleDF.createOrReplaceGlobalTempView("peopleView")
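Since a DataFrame is just Dataset[Row], it can also be converted to a typed Dataset with as[T] (a sketch reusing the People case class assumed above):
val peopleDS = peopleDF.as[People]   // needs import ss.sqlContext.implicits._ for the encoder
peopleDS.filter(_.age > 18).show()   // typed, compile-time-checked transformation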
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
The Catalyst Optimizer in Spark offers rule-based and cost-based optimization. Rule-based optimization indicates how to execute the query from a set of defined rules. Meanwhile, cost-based optimization generates multiple execution plans and compares them to choose the lowest cost one.
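A tiny sketch of a rule-based optimization at work: Catalyst's ConstantFolding rule rewrites lit(1) + lit(1) to 2, which is visible when comparing the analyzed and optimized logical plans:
import org.apache.spark.sql.functions.{col, lit}
ss.range(10).filter(col("id") > lit(1) + lit(1)).explain(true)
// the optimized plan shows the folded constant 2 where the analyzed plan still shows (1 + 1)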
ss.udf.register("ageInc10", (age: Int) => { age + 10 })
val teenagersDF = ss.sql("SELECT name, ageInc10(age) AS age FROM global_temp.peopleView WHERE age BETWEEN 13 AND 19")
scala> val rangeAlone = spark.range(5)
rangeAlone: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> rangeAlone.rdd.getNumPartitions
res19: Int = 2
scala> rangeAlone.explain(true)
== Parsed Logical Plan ==
Range (0, 5, step=1, splits=Some(2))
== Analyzed Logical Plan ==
id: bigint
Range (0, 5, step=1, splits=Some(2))
== Optimized Logical Plan ==
Range (0, 5, step=1, splits=Some(2))
== Physical Plan ==
*(1) Range (0, 5, step=1, splits=2)
scala> rangeAlone.show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
scala> rangeAlone.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true)
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 2|
| 1| 3|
+---------------+-----------------+
scala> val withRepartition = rangeAlone.repartition(numPartitions = 5)
withRepartition: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> withRepartition.rdd.getNumPartitions
res23: Int = 5
scala> withRepartition.explain(true)
== Parsed Logical Plan ==
Repartition 5, true
+- Range (0, 5, step=1, splits=Some(2))
== Analyzed Logical Plan ==
id: bigint
Repartition 5, true
+- Range (0, 5, step=1, splits=Some(2))
== Optimized Logical Plan ==
Repartition 5, true
+- Range (0, 5, step=1, splits=Some(2))
== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *(1) Range (0, 5, step=1, splits=2)
scala> withRepartition.show()
+---+
| id|
+---+
| 0|
| 3|
| 1|
| 2|
| 4|
+---+
scala> withRepartition.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true)
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 0|
| 1| 2|
| 2| 2|
| 3| 1|
| 4| 0|
+---------------+-----------------+
scala> withRepartition.printSchema()
root
|-- id: long (nullable = false)
scala> val withRepartition = rangeAlone.repartitionByRange(2, $"id")
withRepartition: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> withRepartition.rdd.getNumPartitions
res28: Int = 2
scala> withRepartition.explain(true)
== Parsed Logical Plan ==
'RepartitionByExpression ['id ASC NULLS FIRST], 2
+- Range (0, 5, step=1, splits=Some(2))
== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [id#161L ASC NULLS FIRST], 2
+- Range (0, 5, step=1, splits=Some(2))
== Optimized Logical Plan ==
RepartitionByExpression [id#161L ASC NULLS FIRST], 2
+- Range (0, 5, step=1, splits=Some(2))
== Physical Plan ==
Exchange rangepartitioning(id#161L ASC NULLS FIRST, 2)
+- *(1) Range (0, 5, step=1, splits=2)
scala> withRepartition.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true)
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 3|
| 1| 2|
+---------------+-----------------+
scala> withRepartition.show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
scala>
scala> val withRepartition = rangeAlone.coalesce(1)
withRepartition: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> withRepartition.rdd.getNumPartitions
res32: Int = 1
scala> withRepartition.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true)
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 5|
+---------------+-----------------+
scala> withRepartition.explain(true)
== Parsed Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(2))
== Analyzed Logical Plan ==
id: bigint
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(2))
== Optimized Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(2))
== Physical Plan ==
Coalesce 1
+- *(1) Range (0, 5, step=1, splits=2)
scala> withRepartition.show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
scala> val q = spark.range(100).as("a").join(spark.range(100).as("b")).where($"a.id" === $"b.id")
q: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, id: bigint]
scala> q.explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *(2) Range (0, 100, step=1, splits=2)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *(1) Range (0, 100, step=1, splits=2)
scala> q.explain(true)
== Parsed Logical Plan ==
'Filter ('a.id = 'b.id)
+- Join Inner
:- SubqueryAlias `a`
: +- Range (0, 100, step=1, splits=Some(2))
+- SubqueryAlias `b`
+- Range (0, 100, step=1, splits=Some(2))
== Analyzed Logical Plan ==
id: bigint, id: bigint
Filter (id#0L = id#3L)
+- Join Inner
:- SubqueryAlias `a`
: +- Range (0, 100, step=1, splits=Some(2))
+- SubqueryAlias `b`
+- Range (0, 100, step=1, splits=Some(2))
== Optimized Logical Plan ==
Join Inner, (id#0L = id#3L)
:- Range (0, 100, step=1, splits=Some(2))
+- Range (0, 100, step=1, splits=Some(2))
== Physical Plan ==
*(2) BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *(2) Range (0, 100, step=1, splits=2)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *(1) Range (0, 100, step=1, splits=2)
scala>
------------------------------------------------------------------------------------------------------------------------------------
Spark's primary building block is the RDD - Resilient Distributed Dataset
Resilient - fault-tolerant
Distributed - data gets partitioned and distributed across cluster worker nodes
We know Spark supports lazy evaluation and fault tolerance.
Having said that,
when we apply multiple transformations on RDDs, since RDDs are immutable in nature, each transformation on an existing RDD creates a new RDD.
Each RDD maintains a reference to its parent RDD (including the data type and the location of the partitions of the RDD's data), which forms a lineage graph.
RDD - no built-in optimization engine is available; developers have to take care of optimizing each RDD on the basis of its attributes.
DF - catalyst optimizer
DS - catalyst optimizer and Tungsten execution engine
This lineage graph is used to build a DAG, which is submitted to the DAGScheduler; the DAGScheduler further divides it into stages (sets of tasks) and executes them.
DAG scheduler submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster.
Task schedulers get sets of tasks submitted to them from the DAGScheduler for each stage.
Each TaskScheduler schedules tasks for a single SparkContext.
They return events to the DAGScheduler.
It handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage.
TaskSets - A set of tasks submitted together to the low-level TaskScheduler
scala> val data = sc.parallelize(List((1,2)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21
scala> val joinedData = data join data
joinedData: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[11] at join at <console>:23
scala> joinedData.toDebugString
res4: String =
(8) MapPartitionsRDD[11] at join at <console>:23 []
| MapPartitionsRDD[10] at join at <console>:23 []
| CoGroupedRDD[9] at join at <console>:23 []
+-(8) ParallelCollectionRDD[8] at parallelize at <console>:21 []
+-(8) ParallelCollectionRDD[8] at parallelize at <console>:21 []
Note: join uses CoGroupedRDD under the hood (see the cogroup sketch below).
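For reference, a small sketch of calling cogroup directly (join above is implemented on top of it):
val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((1, "x")))
// groups the values of both RDDs by key: (key, (values-from-left, values-from-right))
left.cogroup(right).collect()
// e.g. Array((1,(CompactBuffer(a),CompactBuffer(x))), (2,(CompactBuffer(b),CompactBuffer())))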
DataFrames ::
- A DataFrame is a Dataset organized into named columns.
DataFrames run on top of Spark SQL optimized execution engine
------------------------------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------------------------------------------------------------------
1. Resilient Distributed Datasets (RDDs)
- There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
2. SparkContext vs SparkSession vs Spark SQL (SQLContext)
3. External Datasets
4. RDD Operations
Printing elements of an RDD
rdd.foreach(println) or rdd.map(println).
On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these.
rdd.collect().foreach(println)
To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node, because collect() fetches the entire RDD to a single machine;
if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).
Working with Key-Value Pairs
- class org.apache.spark.rdd.PairRDDFunctions[K, V] extends Logging with Serializable
- The key-value pair operations are available in the PairRDDFunctions class
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b).sortByKey()
counts.collect()
Transformations
-------------------------------------------------------------------------------
Spark Context vs Spark Session
SC
1. The ApplicationMaster gets created in a container on one of the worker nodes
2. Application master creates Spark Driver
3. Spark Driver creates Spark Context
- Spark Context is the main entry point into Spark functionality
- Spark Context is created by Spark Driver for each Spark application when it is first submitted by the user. It exists throughout the lifetime of the Spark application.
- Only one SparkContext can be active per JVM. You must stop() the active SparkContext before creating a new one.
SparkContext can connect to different types of Cluster Managers. Now the most popular types are YARN, Mesos, Kubernetes or even Nomad. There is also Spark's own standalone cluster manager.
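A minimal sketch of creating a SparkContext directly through SparkConf (versus obtaining it from a SparkSession, as shown earlier); the master URL is illustrative:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("sc-example")
  .setMaster("spark://thanoojubuntu-Inspiron-3521:7077")   // or "yarn", "local[4]", a k8s:// URL, etc.
val sc = new SparkContext(conf)
// with the newer API the same context is reachable via SparkSession.builder()...getOrCreate().sparkContext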
Executors
- Executors are processes running on the worker nodes; tasks are executed inside the executors, which then return the results to the Spark Driver.
"Static Allocation of Executors" - executors are started once at the beginning of the Spark application and then live for the entire lifetime of the application
UberTask :: Uber mode job run in hadoop
ResourceManager launches the ApplicationMaster
Then the ApplicationMaster retrieves the number of input splits for the job and, based on that, decides the number of mappers that have to be launched, and also the number of reducers that have to be launched as per the configuration.
ApplicationMaster has to decide whether to negotiate resources with the ResourceManager’s scheduler to run the map and reduce tasks or run the job sequentially within the same JVM where ApplicationMaster is running.
This decision making by ApplicationMaster happens only if Uber mode is set to true in Hadoop.
If uber mode is true and the ApplicationMaster decides to run the MapReduce job within the same JVM, then the job is said to be running as an uber task in YARN.
By default Uber mode is set to false in Hadoop.
Uber configuration is used for MapReduce, whenever you have a small data set.
The Uber mode runs the map and reduce tasks within its own process and avoid the overhead of launching and communicating with remote nodes.
Configurations parameters required for uber mode are set in etc/hadoop/mapred-site.xml.
mapreduce.job.ubertask.enable (Default false)
mapreduce.job.ubertask.maxmaps (Default = 9, Users can override this value, but only downward.)
mapreduce.job.ubertask.maxreduces (CURRENTLY THE CODE CANNOT SUPPORT MORE THAN ONE REDUCE - Default = 1, Users can override this value, but only downward. zero is a valid maximum value)
mapreduce.job.ubertask.maxbytes (Default = HDFS Block Size, Users can override this value, but only downward.)
Uber jobs are jobs that are executed within the MapReduce ApplicationMaster, rather than asking the RM to create the mapper and reducer containers. The AM runs the map and reduce tasks within its own process and avoids the overhead of launching and communicating with remote containers.
By default, a small job is one that has less than 10 mappers, only one reducer, and an input size that is less than the size of one HDFS block.
https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.15/bk_using-apache-hadoop/content/uber_jobs.html