Mostly taken from [3]
The RDD is how Spark simplifies complex operations like join or groupBy and hides the fact that under the hood, you’re dealing with fragmented data.
The number of partitions is important because a stage in Spark will operate on one partition at a time (and load the data in that partition into memory). Consequently, if you have fewer partitions than active stages, you will wind up under-utilizing your cluster. Furthermore, since with fewer partitions there’s more data in each partition, you increase the memory pressure on your program. On the flip side, with too many partitions, your performance may degrade as you take a greater hit from network and disk I/O.
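To make the trade-off concrete, here is a minimal, dependency-free Python sketch (the key space and record counts are made up for illustration): a stage processes one partition at a time, so the largest partition bounds the working set a single task must hold in memory, while extra partitions beyond the key cardinality just sit empty and add scheduling overhead.

```python
# Toy model of the partition trade-off: a "stage" processes one
# partition at a time, holding that partition's records in memory.

def partition(records, num_partitions):
    """Hash-partition (key, value) records, as Spark's HashPartitioner does."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in records:
        parts[hash(key) % num_partitions].append((key, value))
    return parts

# 1000 records spread over only 10 distinct keys.
records = [(k % 10, k) for k in range(1000)]

for n in (2, 10, 100):
    parts = partition(records, n)
    peak = max(len(p) for p in parts)   # largest in-memory working set
    empty = sum(1 for p in parts if not p)
    print(f"{n:>3} partitions -> peak records per task: {peak}, empty partitions: {empty}")
```

With 2 partitions each task holds 500 records; with 10 the peak drops to 100; with 100 the peak stays at 100 (only 10 distinct keys exist) and 90 partitions are empty, which is pure overhead.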
- `yarn.nodemanager.resource.memory-mb` controls the maximum sum of memory used by the containers on each node.
- `yarn.nodemanager.resource.cpu-vcores` controls the maximum sum of cores used by the containers on each node.
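These properties live in `yarn-site.xml` on each NodeManager. The values below are purely illustrative, not recommendations; they must match the node's actual hardware, leaving headroom for the OS and other daemons:

```xml
<!-- yarn-site.xml (per node); example values, tune for your hardware -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>24576</value> <!-- 24 GB available to containers on this node -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>8</value>     <!-- 8 vcores available to containers on this node -->
</property>
```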
- [1 and 7] Avoid groupByKey when performing an associative reduction; use reduceByKey instead.
- [1] Avoid reduceByKey when the input and output value types are different.
- [1] Avoid the flatMap-join-groupBy pattern.
- [4] In general, if you use some data twice, cache it.
- [1] One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor.
- [4] Accumulators are a way to efficiently update a variable in parallel during execution. They differ from broadcast variables in that their value may only be read on the driver process, but they allow Spark programs to efficiently aggregate results.
- [1] An extra shuffle can be advantageous to performance when it increases parallelism.
- [4] You avoid shipping data by avoiding operations that trigger shuffles, such as repartition and coalesce, ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
- [1] Take `repartitionAndSortWithinPartitions` into account.
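The groupByKey/reduceByKey advice above comes down to map-side combining: reduceByKey reduces within each partition before shuffling, so at most one value per key per partition crosses the network, whereas groupByKey ships every record. A dependency-free Python sketch of the mechanism (a real job would use the RDD API; partitions here are just lists):

```python
# Two partitions of (key, value) pairs, as executors would hold them.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 4), ("b", 5), ("b", 6)],
]

def combine_partition(part, func):
    """Map-side combine: only one value per key leaves each partition."""
    out = {}
    for key, value in part:
        out[key] = func(out[key], value) if key in out else value
    return list(out.items())

# reduceByKey-style: combine inside each partition before "shuffling"...
shuffled = [kv for part in partitions
            for kv in combine_partition(part, lambda a, b: a + b)]

# ...then finish the reduction on the reduce side.
final = {}
for key, value in shuffled:
    final[key] = final.get(key, 0) + value

print(sorted(final.items()))            # [('a', 8), ('b', 13)]
print(len(shuffled))                    # 4 records shuffled
print(sum(len(p) for p in partitions))  # vs 6 shuffled with groupByKey
```

The result is identical either way; the difference is that only 4 records cross the shuffle instead of all 6, and the gap widens with the number of duplicate keys per partition. This only works because addition is associative, which is exactly why the advice is restricted to associative reductions.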
Taken from the Introduction to Apache Spark on Databricks.
Example:
== Physical Plan ==
*Project [avg(price)#276,carat#282]
+- *BroadcastHashJoin [color#109], [color#284], Inner, BuildRight, None
:- *TungstenAggregate(key=[cut#108,color#109], functions=[(avg(cast(price#113 as bigint)),mode=Final,isDistinct=false)], output=[color#109,avg(price)#276])
: +- Exchange hashpartitioning(cut#108, color#109, 200), None
: +- *TungstenAggregate(key=[cut#108,color#109], functions=[(avg(cast(price#113 as bigint)),mode=Partial,isDistinct=false)], output=[cut#108,color#109,sum#314,count#315L])
: +- *Project [cut#108,color#109,price#113]
: +- *Filter isnotnull(color#109)
: +- *Scan csv [cut#108,color#109,price#113] Format: CSV, InputPaths: dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv, PushedFilters: [IsNotNull(color)], ReadSchema: struct<cut:string,color:string,price:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
+- *Project [carat#282,color#284]
+- *Filter isnotnull(color#284)
+- *Scan csv [carat#282,color#284] Format: CSV, InputPaths: dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv, PushedFilters: [IsNotNull(color)], ReadSchema: struct<carat:double,color:string>
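The BroadcastHashJoin in the plan above is the shuffle-avoiding join from the bullet list: the small side (BuildRight, the carat/color projection) is collected, built into a hash table keyed on color, and broadcast to every executor, so the large side is probed partition-locally with no shuffle. A plain-Python sketch of the build/probe mechanics (the sample rows are made up; Spark SQL chooses this join automatically when one side fits under its broadcast size threshold):

```python
# Build side: the small dataset, collected and hashed by the join key.
small = [("E", 0.23), ("I", 0.31), ("E", 0.70)]  # (color, carat)
build = {}
for color, carat in small:
    build.setdefault(color, []).append(carat)

# Probe side: one partition of the large dataset joins locally against
# the broadcast hash table; the large side is never shuffled.
large_partition = [("Ideal", "E", 326), ("Premium", "I", 334), ("Good", "J", 327)]

joined = [
    (cut, color, price, carat)
    for cut, color, price in large_partition  # probe each row
    for carat in build.get(color, [])         # inner join: unmatched rows drop
]
print(joined)
```

Note the "Good"/"J" row disappears because the join is Inner, matching the plan, and the "E" row fans out once per matching build-side row.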
- Configuration.
- Tuning (serialization, parallelization...).
- Datasets.
- SparkSession API.
- YARN.
- Running on YARN.
- How to tune your Apache Spark Jobs part 1.
- How to tune your Apache Spark Jobs part 2.
- Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle.
- Spark Shuffle Introduction.
- Spark Architecture: Shuffle.
- Spark Memory Management.
- Avoid GroupByKey.
- Best Practices for YARN Resource Management.