This Spark / PySpark on YARN configuration is optimized for large-scale workloads with high executor memory (60g heap plus 6g overhead), efficient Kryo serialization, and aggressive adaptive query execution to handle skewed data and optimize partitioning, while also tuning shuffle and network settings for stability under load.
/**
* The configuration parameters outlined below are specifically optimized for execution across
* four (4) distributed workers, each provisioned with the n2d-highmem-48 machine type—offering
* 48 virtual CPUs and 384 GB of RAM per node. These settings have been carefully fine-tuned to
* support Spark workloads that process data on a monthly cadence, where each job ingests and
* computes over an entire month's worth of data in a single run on Dataproc.
*
* The cluster total resources:
* - 4 workers * 48 vCPUs/worker = 192 total vCPUs
* - 4 workers * 384 GB RAM/worker = 1,536 GB total RAM
*
*
* CONFIG EXPLANATION:
* spark:spark.executor.memory -> "60g" and spark:spark.executor.memoryOverhead -> "6g":
* - Each executor will require 60GB (heap) + 6GB (off-heap memory) = 66 GB of RAM.
* - With 384 GB per worker, we can fit floor(384 GB / 66 GB/executor) = 5 executors (Spark processes)
* per worker. This means we can run up to 4 workers * 5 executors = 20 executors across the cluster.
* - With 48 vCPUs per worker, allocating 5 executors means each executor could potentially be assigned
* floor(48 vCPUs / 5 executors) = 9 vCPUs
* - spark.executor.memoryOverhead is tuned proportionally to spark.executor.memory (often 10-20%).
* 6 GB of 60 GB is around 10%, which is a reasonable starting point.
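*
* A quick sanity check of the executor-sizing arithmetic above (illustrative Scala; the worker
* count and machine specs are the assumptions stated at the top of this comment):
*
*   val workers            = 4
*   val workerRamGb        = 384
*   val executorHeapGb     = 60                                // spark.executor.memory
*   val executorOverheadGb = 6                                 // spark.executor.memoryOverhead
*   val executorsPerWorker = workerRamGb / (executorHeapGb + executorOverheadGb)  // floor(384 / 66) = 5
*   val totalExecutors     = workers * executorsPerWorker                         // 4 * 5 = 20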
*
*
* spark:spark.executor.cores -> "6":
* - Default behavior on YARN: Spark relies on YARN's resource allocation, and the number of vCPUs
* per executor is determined by yarn.nodemanager.resource.cpu-vcores (the total vCPUs per node)
* and how YARN schedules containers. By default, YARN doesn't inherently split vCPUs evenly across
* executors unless we configure it to do so. Without spark.executor.cores, Spark/YARN assumes
* 1 vCPU per executor unless overridden by YARN's container settings or an explicit Spark config.
*
* We set 6 vCPUs per executor (despite the prior calculation allowing for 9 vCPUs) to avoid
* resource contention, especially with high memory usage per task. Fewer cores per executor allow
* better resource distribution and reduce the chance of overwhelming the node. While this might
* slightly reduce intra-executor parallelism, it can significantly reduce the peak memory footprint
* of the executor JVM. In total: 20 executors x 6 cores = 120 vCPUs across the cluster.
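*
* The resulting vCPU allocation (illustrative Scala; literals repeat the figures derived above):
*
*   val totalExecutors = 20                                    // 4 workers x 5 executors
*   val executorCores  = 6                                     // spark.executor.cores
*   val usedVcpus      = totalExecutors * executorCores        // 120 of the 192 total vCPUs
*   val idlePerWorker  = 48 - (5 * executorCores)              // 18 vCPUs per worker left unallocated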
*
*
* "spark:spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "64M",
* "spark:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "256MB"
* - Setting these to the documented defaults from the official docs (64 MB advisory partition
* size, 256 MB skew-join threshold), stated explicitly here to make the tuning visible.
*
*
* "spark.sql.shuffle.partitions" -> 400 and "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "64M"
* - spark.sql.shuffle.partitions: This defines the number of partitions Spark creates during
* shuffle operations (e.g., joins, aggregations).
* - spark.sql.adaptive.advisoryPartitionSizeInBytes: with Adaptive Query Execution (AQE) enabled
* (a runtime optimization feature that dynamically adjusts the query execution plan based on
* runtime statistics), Spark aims to coalesce small partitions toward this advisory size (64 MB)
* after a shuffle, optimizing resource usage: 400 partitions x 64 MB per partition = ~25 GB.
*
* This gives us ~25 GB of total shuffle data across all partitions if each partition hits the
* advisory 64 MB target. It's a rough approximation of the memory footprint of a shuffle stage under
* ideal AQE conditions.
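*
* The same estimate as a small illustrative calculation:
*
*   val shufflePartitions   = 400                              // spark.sql.shuffle.partitions
*   val advisoryPartitionMb = 64                               // spark.sql.adaptive.advisoryPartitionSizeInBytes
*   val shuffleSizeGb       = shufflePartitions * advisoryPartitionMb / 1024.0    // 25,600 MB ~= 25 GB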
*
* To put this in context:
* Each executor has 60 GB of heap memory (spark.executor.memory) plus 6 GB of off-heap overhead
* (spark.executor.memoryOverhead), totaling 66 GB. With 20 executors (5 per worker x 4 workers),
* the cluster uses 1,320 GB of the 1,536 GB total RAM (4 workers x 384 GB), leaving ~54 GB
* headroom per worker machine for system use: (1,536 GB - 1,320 GB) / 4 = 54 GB.
*
* A ~25 GB total shuffle workload (400 partitions x 64 MB) across the cluster fits comfortably
* within the 1,200 GB total heap (20 executors x 60 GB), averaging 1.25 GB per executor (25 / 20)
* - well within the 60 GB capacity.
*
* This configuration yields ~3 partitions per vCPU (400 partitions / 120 vCPUs across 20 executors),
* aligning with the recommended 2-4 partitions per vCPU for balanced parallelism (based on the
* official docs).
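*
* The headroom and parallelism arithmetic above, as one illustrative calculation:
*
*   val clusterRamGb       = 4 * 384                           // 1,536 GB total RAM
*   val executorsRamGb     = 20 * (60 + 6)                     // 1,320 GB claimed by executors
*   val headroomPerWorker  = (clusterRamGb - executorsRamGb) / 4   // (1,536 - 1,320) / 4 = 54 GB
*   val shufflePerExecutor = 25.0 / 20                         // ~1.25 GB vs. the 60 GB executor heap
*   val partitionsPerVcpu  = 400.0 / (20 * 6)                  // ~3.3, inside the 2-4 guideline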
*
* This ratio ensures efficient utilization of the cluster's 120 vCPUs for moderate workloads,
* with Adaptive Query Execution (AQE) optimizing for smaller or skewed data. The config aligns
* with the n2d-highmem-48 hardware for a moderate shuffle workload, while the 6 GB overhead per
* executor supports shuffle fetch buffers (e.g., ~756 MB with maxReqsInFlight = 6 and
* spark.reducer.maxSizeInFlight = 128m) and Kryo serialization (up to 6 GB max).
*
* The 756 MB represents the approximate off-heap memory used per executor for shuffle fetch buffers
* in the current Spark config. This is critical for managing network data transfers during shuffle
* operations like joins. It's important because it ensures efficient shuffle performance within
* the 6 GB spark.executor.memoryOverhead, avoiding bottlenecks or memory errors. This figure is
* computed by assuming each executor uses 6 vCPUs (explained above) with maxReqsInFlight = 6,
* allowing 6 concurrent fetch requests per vCPU, each consuming ~21 MB
* (spark.reducer.maxSizeInFlight = 128 MB / 6 requests == ~21 MB).
*
* Thus, 6 vCPUs per executor x 6 requests x ~21 MB == ~756 MB, fitting comfortably within the
* 6 GB spark.executor.memoryOverhead.
*
* This configuration allows up to 720 concurrent fetch requests (20 executors x 6 vCPUs x 6
* requests/vCPU), ensuring network stability while using ~756 MB of the 6 GB
* spark.executor.memoryOverhead per executor. Setting spark.reducer.maxReqsInFlight = 6 and
* spark.reducer.maxSizeInFlight = 128 MB (the default is 48 MB) with 400 partitions modestly
* improves shuffle performance by increasing the data in flight to ~15 GB (720 requests x ~21 MB),
* while potentially reducing the fetch time for the ~25 GB shuffle workload.
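*
* The fetch-buffer arithmetic from the paragraphs above, spelled out (illustrative):
*
*   val executorCores     = 6
*   val maxReqsInFlight   = 6                                  // spark.reducer.maxReqsInFlight
*   val maxSizeInFlightMb = 128                                // spark.reducer.maxSizeInFlight
*   val perRequestMb      = maxSizeInFlightMb / maxReqsInFlight              // ~21 MB per request
*   val perExecutorMb     = executorCores * maxReqsInFlight * perRequestMb   // ~756 MB per executor
*   val clusterRequests   = 20 * executorCores * maxReqsInFlight             // 720 concurrent requests
*   val inFlightMb        = clusterRequests * perRequestMb                   // 15,120 MB ~= 15 GB in flight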
*
*
* "spark:spark.driver.maxResultSize" -> "6g"
* - Limit on the total size of serialized results of all partitions for each Spark action
* (e.g. collect). Jobs will be aborted if the total size exceeds this limit; setting it to zero
* removes the limit, and a very high limit may cause out-of-memory errors in the driver. For now,
* this is set to 6g (the official default is 1g) to keep large collected results from aborting
* jobs while still bounding driver memory.
*
*
* "spark:spark.reducer.maxReqsInFlight" -> "6"
* "spark:spark.shuffle.io.retryWait" -> "30s",
* "spark:spark.shuffle.io.maxRetries" -> "5",
* "spark:spark.network.timeout" -> "600"
* - Setting these to reasonable values to improve shuffle performance and failure detection.
* spark.reducer.maxReqsInFlight controls how many concurrent fetch requests a reduce task is
* allowed to issue to map outputs during the shuffle phase; it is capped here at 6 (the official
* default is Int.MaxValue) because more concurrent requests (i.e. a higher maxReqsInFlight) mean
* more data held in memory at once on the executor running the reduce task. See the earlier
* explanation about maxReqsInFlight and maxSizeInFlight.
*/
Map(
"spark:spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
"spark:spark.kryoserializer.buffer.max" -> "1024M",
"spark:spark.executor.cores" -> "6",
"spark:spark.executor.instances" -> "20",
"spark:spark.executor.memory" -> "60g",
"spark:spark.executor.memoryOverhead" -> "6g",
"spark:spark.driver.memory" -> "16g",
"spark:spark.driver.maxResultSize" -> "6g",
"spark:spark.sql.adaptive.enabled" -> "true",
"spark:spark.sql.adaptive.coalescePartitions.enabled" -> "true",
"spark:spark.sql.adaptive.skewJoin.enabled" -> "true",
"spark:spark.sql.adaptive.skewJoin.skewedPartitionFactor" -> "3",
"spark:spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "64M",
"spark:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "256MB",
"spark:spark.sql.shuffle.partitions" -> "400",
"spark:spark.reducer.maxReqsInFlight" -> "6",
"spark:spark.reducer.maxSizeInFlight" -> "128m",
"spark:spark.shuffle.io.retryWait" -> "30s",
"spark:spark.shuffle.io.maxRetries" -> "5",
"spark:spark.network.timeout" -> "600"
)
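
// Illustrative local replay of the same settings (a sketch under assumptions, not the Dataproc
// submission path): Dataproc consumes the "spark:"-prefixed keys at the cluster level, but the
// values can be tried in a plain Spark session by stripping the prefix and applying them through
// SparkConf. The name `sparkOnlyProps` and the two-entry map are hypothetical stand-ins for the
// full map above.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkOnlyProps: Map[String, String] = Map(
  "spark:spark.serializer"           -> "org.apache.spark.serializer.KryoSerializer",
  "spark:spark.sql.adaptive.enabled" -> "true"
).map { case (key, value) => key.stripPrefix("spark:") -> value }

val conf = new SparkConf().setAll(sparkOnlyProps)
val spark = SparkSession
  .builder()
  .master("local[*]") // local master only for a quick sanity run outside the cluster
  .config(conf)
  .getOrCreate()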