@simicd
Forked from dusenberrymw/spark_tips_and_tricks.md
Created February 14, 2020 20:58
Tips and tricks for Apache Spark.

Spark Tips & Tricks

Misc. Tips & Tricks

  • If values are integers in [0, 255], Parquet will automatically compress them to 1-byte unsigned integers, thus decreasing the size of the saved DataFrame by a factor of 8 compared with 8-byte (64-bit) integers.
  • Partition DataFrames to have evenly-distributed, ~128MB partition sizes (empirical finding). Always err on the higher side w.r.t. number of partitions.
  • Pay particular attention to the number of partitions when using flatMap, especially if the following operation will result in high memory usage. The flatMap op usually results in a DataFrame with a [much] larger number of rows, yet the number of partitions will remain the same. Thus, if a subsequent op causes a large expansion of memory usage (e.g. converting a DataFrame of indices to a DataFrame of large Vectors), the memory usage per partition may become too high. In this case, it is beneficial to repartition the output of flatMap to a number of partitions that will safely allow for appropriate partition memory sizes, based upon the projected memory expansion of the subsequent step(s). Depending on the workflow (e.g. the presence of a filter that changes the number of rows), this may have to be done empirically.
  • When saving DataFrames to disk (e.g. in Parquet format), pay particular attention to the partition sizes. Spark will output one file per task (i.e. one file per partition) on writes, and will read at least one file in a task on reads. The issue here is that if the cluster/setup in which the DataFrame was saved had a larger amount of aggregate memory, and thus could handle larger partition sizes without error, a smaller cluster/setup may have trouble reading this saved DataFrame. Unfortunately, the partition sizes become a leaky abstraction even when saved to disk. A possible use case here is a large preprocessing cluster, and a smaller, leaner serving cluster. In this situation, a remedy would be to repartition the DataFrame into a larger number of partitions before writing.
  • When joining a small DataFrame with a large DataFrame, try to avoid causing a SortMergeJoin, as it will cause a large shuffle, and thus is quite costly (if it runs at all). Instead, if the small DataFrame is small enough to be broadcast, Spark can use a broadcast join (BroadcastHashJoin) to simply broadcast the small DataFrame to each task, removing the need to shuffle the larger DataFrame. To push Spark to use this, coalesce the smaller DataFrame to 1 partition, and then explicitly mark it as broadcastable with sql.functions.broadcast(small_df).
  • When using RDDs in PySpark, make sure to save enough memory on worker nodes for Python processes, as the "executor memory" is only for the JVM.
  • When allocating memory on workers, be sure to leave enough memory for other running processes. A JVM can be started with more memory than is available, but it will fail when it approaches the upper bound, leading to "worker lost" errors. For example, if a machine has 128GB of RAM, in reality only 124GB will be available to the OS. Furthermore, several GB will be used by the OS and other processes, so a good upper limit on the Worker memory may be 100GB.
  • For best performance, be sure to set up several shuffle disks per node in Spark Standalone clusters and set SPARK_LOCAL_DIRS=/disk2/local,/disk3/local,... in ./conf/spark-env.sh with folders on each physical disk.
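
The partition-sizing advice above can be turned into a rough calculation. The following is a minimal sketch, not a Spark API: the helper name `suggested_partitions` and its `expansion_factor` parameter are hypothetical, and the dataset size has to be estimated by the caller. It divides the projected in-memory size by the ~128MB target and rounds up, which errs on the side of more partitions.

```python
import math

# ~128MB target partition size, taken from the empirical tip above.
TARGET_PARTITION_BYTES = 128 * 1024 ** 2


def suggested_partitions(dataset_bytes, expansion_factor=1.0,
                         target_bytes=TARGET_PARTITION_BYTES):
    """Estimate a partition count for a dataset of `dataset_bytes`.

    `expansion_factor` is a hypothetical knob for the projected memory
    growth of later steps (e.g. 4.0 if rows become roughly 4x larger).
    """
    if dataset_bytes < 0 or expansion_factor <= 0 or target_bytes <= 0:
        raise ValueError("sizes and expansion factor must be positive")
    projected_bytes = dataset_bytes * expansion_factor
    # Round up so partitions never exceed the target; always return at least 1.
    return max(1, math.ceil(projected_bytes / target_bytes))


ten_gib = 10 * 1024 ** 3
print(suggested_partitions(ten_gib))                        # no expansion
print(suggested_partitions(ten_gib, expansion_factor=4.0))  # 4x expansion expected
```

The result could then be passed to `df.repartition(n)` on the output of flatMap or before a write. Since the expansion factor is only a projection, the number may still need to be adjusted empirically.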

Continuous Compilation

  • You'll need to build the entire Spark project once so that the uber jar with all of the dependencies is created.
    • ./build/sbt clean package
  • Then you compile all of the Spark files (not the dependencies) so that you now have both the uber jar with all of Spark AND its dependencies, as well as a set of compiled class files for just Spark. The extra trick is that you set this compilation to continuously compile just the files that have changed since the last time it was compiled. (Leave this running.)
    • ./build/sbt ~compile
  • Then you use a special environment variable with any of the Spark executables (spark-shell, spark-submit, testing, etc.) that tells Spark to first look at the locally compiled class files, and then at the uber jar as needed.
    • SPARK_PREPEND_CLASSES=1
  • So, to use the spark shell:
    • SPARK_PREPEND_CLASSES=1 ./bin/spark-shell
  • You'll now have one full build that you run once, with continuous compilation running at all times.
  • So, with that in place, make changes to the DataFrame Scala file, and run your tests, spark-shell, etc. No extra builds needed.
  • ...
  • Profit.

Include other Scala class files on classpath:

spark-shell --driver-class-path="target/scala-2.11/classes"

Test PySpark (from Spark folder):

  • Individual files w/ Python 3:
    • SPARK_PREPEND_CLASSES=1 SPARK_TESTING=1 PYSPARK_PYTHON=python3 ./bin/pyspark pyspark.mllib.linalg.distributed
  • Individual modules w/ multiple Python versions:
    • SPARK_PREPEND_CLASSES=1 ./python/run-tests --modules=pyspark-mllib --python-executables=python,python2.6,python3
  • All tests w/ default Python version:
    • SPARK_PREPEND_CLASSES=1 ./python/run-tests

PySpark Jupyter Notebook (local mode, with Python 3, loading classes from continuous compilation, and remote debugging):

SPARK_PREPEND_CLASSES=1 PYSPARK_PYTHON=python3 PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark --master local[*] --driver-java-options=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006

Docs

  • Create docs:
    • jekyll serve -w
  • Use BrowserSync to automatically reload docs page (from generated site folder):
    • browser-sync start --server --files "*.html, css/*.css, js/*.js"

Running Spark Examples In IntelliJ

  • Set example folders as test resources (so that correct libraries are available)
  • Can set Spark properties as "VM options" in the Run Configuration: "-Dspark.master=local[*]"
  • Can pass arguments to the script via "Program arguments" in the Run Configuration

Spark & HDFS

  • ./build/sbt -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package
  • Place core-site.xml and hdfs-site.xml into the conf folder so that reads and writes default to HDFS without having to use an HDFS URL.
  • Set HADOOP_CONF_DIR=/path/to/hadoop/conf in ./conf/spark-env.sh.
  • To remove a DataNode directory from all nodes, update the configuration (Ambari can do this), then restart each node one-by-one, waiting for the under-replicated blocks to be replicated again.
  • sudo -u hdfs hdfs fsck / to check on health of HDFS filesystem.

Build Spark With Native BLAS Support

  • ./build/sbt -Pnetlib-lgpl ...

Spark History Server

  • Set the following in ./conf/spark-defaults.conf:
    spark.eventLog.dir=/var/logs/spark-history-server
    spark.eventLog.enabled=true
    spark.history.fs.logDirectory=file:/var/logs/spark-history-server
    
  • Launch with ./sbin/start-history-server.sh.
  • View at http://<server-url>:18080.

Use A Custom Python Package On Worker Nodes

  • To use a custom Python package sitting in the current directory (i.e. the main script imports from this package), it will need to be zipped up and shipped out to the worker nodes before usage.

  • Given a local package mypackage, the following can be used in the main script to zip up the package and ship it to workers before usage of the package:

    # Ship a fresh copy of the `mypackage` package to the Spark workers.
    import shutil
    dirname = "mypackage"
    zipname = dirname + ".zip"
    shutil.make_archive(dirname, 'zip', dirname + "/..", dirname)
    spark.sparkContext.addPyFile(zipname)
    
  • Note: The zip must include the mypackage directory itself, as well as all files within it for addPyFile to work correctly.

  • This is equivalent to zip -r mypackage.zip mypackage.
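
One way to check the layout described in the note before shipping the archive is to inspect the zip's top-level entries. The following is a minimal sketch using only the standard library. The helper names `build_package_zip` and `top_level_entries` are hypothetical, and the demo builds a throwaway `mypackage` in a temporary directory rather than using a real package.

```python
import os
import shutil
import tempfile
import zipfile


def build_package_zip(parent_dir, package_name):
    """Zip parent_dir/package_name, keeping the package directory in the archive."""
    # make_archive appends ".zip" to base_name and returns the archive path.
    base_name = os.path.join(parent_dir, package_name)
    return shutil.make_archive(base_name, "zip",
                               root_dir=parent_dir, base_dir=package_name)


def top_level_entries(zip_path):
    """Return the set of top-level names stored in the archive."""
    with zipfile.ZipFile(zip_path) as zf:
        return {name.split("/")[0] for name in zf.namelist()}


# Demo with a throwaway package in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "mypackage"))
    with open(os.path.join(tmp, "mypackage", "__init__.py"), "w") as f:
        f.write("")
    zip_path = build_package_zip(tmp, "mypackage")
    entries = top_level_entries(zip_path)
    print(entries)
```

If the only top-level entry is `mypackage`, the archive has the layout the note describes, and its path could then be passed to `spark.sparkContext.addPyFile(...)`.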

Spark Standalone Cluster

  • Create conf/slaves file with list of machines (hostnames or IP addresses) of slaves.
  • Set SPARK_HOME on all machines.
  • sbin/start-all.sh will start a master (UI at HOSTNAME:8080 by default) and a set of slaves as defined in conf/slaves.
  • Sometimes, Spark will "forget" how to stop existing master & worker nodes, and it will be necessary to stop them manually on each machine. The following will help:
    ps -ux | grep "[s]park" | awk '{ print $2 }' | xargs kill -9
    

Add extra Java options:

spark.driver.memory 200g
spark.driver.extraJavaOptions -server -Xmn12G
spark.executor.extraJavaOptions -server -Xmn12G -XX:+UseG1GC

-Xms = initial heap size
-Xmx = max heap size
-Xmn = young generation size (Eden plus survivor spaces)
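
The same properties can also be supplied per job through `spark-submit --conf key=value` instead of a shared defaults file. The following is a minimal sketch that only assembles the command line. The helper `spark_submit_command` and the script name `my_job.py` are hypothetical, and the property values are the ones listed above.

```python
import shlex


def spark_submit_command(app, conf):
    """Build a spark-submit argument list with one --conf flag per property."""
    cmd = ["spark-submit"]
    for key, value in conf.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app)
    return cmd


conf = {
    "spark.driver.memory": "200g",
    "spark.driver.extraJavaOptions": "-server -Xmn12G",
    "spark.executor.extraJavaOptions": "-server -Xmn12G -XX:+UseG1GC",
}

# "my_job.py" is a placeholder application script.
command = spark_submit_command("my_job.py", conf)

# shlex.join quotes values containing spaces so the line can be pasted into a shell.
print(shlex.join(command))
```

Keeping the arguments as a list lets the command be handed to `subprocess.run(command)` without extra shell quoting.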
