- If values are integers in [0, 255], Parquet will automatically compress them to 1-byte unsigned integers, decreasing the size of the saved DataFrame by a factor of 8 (relative to 8-byte integers).
- Partition DataFrames to have evenly-distributed, ~128MB partition sizes (empirical finding). Always err on the higher side w.r.t. number of partitions.
- Pay particular attention to the number of partitions when using `flatMap`, especially if the following operation will result in high memory usage. The `flatMap` op usually results in a DataFrame with a [much] larger number of rows, yet the number of partitions will remain the same. Thus, if a subsequent op causes a large expansion of memory usage (e.g. converting a DataFrame of indices to a DataFrame of large Vectors), the memory usage per partition may become too high. In this case, it is beneficial to repartition the output of `flatMap` to a number of partitions that will safely allow for appropriate partition memory sizes, based upon the projected memory expansion of the subsequent step(s). Depending on the workflow (e.g. the presence of `filter` or other ops that may adjust the number of rows), this may have to be done empirically.
- When saving DataFrames to disk (e.g. in Parquet format), pay particular attention to the partition sizes. Spark will output one file per task (i.e. one file per partition) on writes, and will read at least one file in a task on reads. The issue here is that if the cluster/setup in which the DataFrame was saved had a larger amount of aggregate memory, and thus could handle larger partition sizes without error, a smaller cluster/setup may have trouble reading this saved DataFrame. Unfortunately, the partition sizes become a leaky abstraction even when saved to disk. A possible use case here is a large preprocessing cluster, and a smaller, leaner serving cluster. In this situation, a remedy would be to repartition the DataFrame into a larger number of partitions before writing.
  - Update: In Spark 2.2, there is now a flag that can be set while writing a DataFrame that controls the number of records per file, thus removing the need to perform a costly repartition: `df.write.option("maxRecordsPerFile", 10000).save(....)` --> http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/.
- When joining a small DataFrame with a large DataFrame, try to avoid causing a `SortMergeJoin`, as it will cause a large shuffle, and thus is quite costly (if it runs at all). Instead, if the small DataFrame is small enough to be broadcast, a broadcast join (`BroadcastHashJoin`) can be used by Spark to simply broadcast the small DataFrame to each task, removing the need to shuffle the larger DataFrame. To push Spark to use this, coalesce the smaller DataFrame to 1 partition, and then explicitly mark it as able to be broadcast with `sql.functions.broadcast(small_df)`.
- When using RDDs in PySpark, make sure to save enough memory on worker nodes for Python processes, as the "executor memory" is only for the JVM.
- When allocating memory on workers, be sure to leave enough memory for other running processes. A JVM can be started with more memory than is available; however, it will fail when it approaches the upper bound, leading to "worker lost" errors. For example, if a machine has 128GB RAM, in reality only 124GB will be available to the OS. Furthermore, several GB will be used by the OS and other processes, so a good upper limit on the Worker memory may be 100GB.
- For best performance, be sure to set up several shuffle disks per node in Spark Standalone clusters and set `SPARK_LOCAL_DIRS=/disk2/local,/disk3/local,...` in `./conf/spark-env.sh` with folders on each physical disk.
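
The partition-sizing tips above reduce to a little arithmetic. The following is a rough sketch rather than anything Spark provides: `estimate_num_partitions` and `max_records_per_file` are hypothetical helper names, the 128 MB target is the empirical figure from the tips, and `expansion_factor` is an estimate you would have to supply yourself for how much a later step (e.g. turning indices into large Vectors) inflates the data.

```python
import math

# ~128 MB per partition, the empirical target mentioned in the tips above.
TARGET_PARTITION_BYTES = 128 * 1024 ** 2


def estimate_num_partitions(total_bytes, expansion_factor=1.0,
                            target_bytes=TARGET_PARTITION_BYTES):
    """Estimate how many partitions keep each one near `target_bytes`.

    `expansion_factor` is a user-supplied guess (an assumption, not something
    Spark reports) of how much the subsequent step grows the data.
    The result is rounded up, erring toward more partitions.
    """
    if total_bytes < 0 or expansion_factor <= 0 or target_bytes <= 0:
        raise ValueError("sizes and factors must be positive")
    projected_bytes = total_bytes * expansion_factor
    return max(1, math.ceil(projected_bytes / target_bytes))


def max_records_per_file(avg_record_bytes, target_bytes=TARGET_PARTITION_BYTES):
    """Estimate a record cap for the `maxRecordsPerFile` write option."""
    if avg_record_bytes <= 0 or target_bytes <= 0:
        raise ValueError("sizes must be positive")
    return max(1, target_bytes // avg_record_bytes)
```

One possible use is `df.repartition(estimate_num_partitions(size_in_bytes, expansion_factor=20))` on the output of a `flatMap`, where both the size and the factor come from measurement; as noted above, the right numbers may still have to be found empirically.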
- You'll need to build the entire Spark project once so that the uber jar with all of the dependencies is created.
  `./build/sbt clean package`
- Then you compile all of the Spark files (not the dependencies) so that you now have both the uber jar with all of Spark AND its dependencies, as well as a set of compiled class files for just Spark. The extra trick is that you set this compilation to continuously compile just the files that have changed since the last time it was compiled. (Leave this running.)
  `./build/sbt ~compile`
- Then you use a special environment variable with any of the Spark executables (spark-shell, spark-submit, testing, etc.) that tells Spark to first look at the locally compiled class files, and then at the uber jar as needed.
  `SPARK_PREPEND_CLASSES=1`
- So, to use the spark shell:
  `SPARK_PREPEND_CLASSES=1 ./bin/spark-shell`
- You'll now have one full build that you run once, with continuous compilation running at all times.
- So, with that in place, make changes to the DataFrame Scala file, and run your tests, spark-shell, etc. No extra builds needed.
- ...
- Profit.
`spark-shell --driver-class-path="target/scala-2.11/classes"`
- Individual files w/ Python 3:
  `SPARK_PREPEND_CLASSES=1 SPARK_TESTING=1 PYSPARK_PYTHON=python3 ./bin/pyspark pyspark.mllib.linalg.distributed`
- Individual modules w/ multiple Python versions:
  `SPARK_PREPEND_CLASSES=1 ./python/run-tests --modules=pyspark-mllib --python-executables=python,python2.6,python3`
- All tests w/ default Python version:
  `SPARK_PREPEND_CLASSES=1 ./python/run-tests`
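
The commands above all follow the same pattern: a few environment variables set in front of a Spark executable. If you drive these runs from a script, that pattern can be captured once. This is only a sketch; `spark_dev_env` is a hypothetical helper, not part of Spark, and it only sets the variables already used above.

```python
import os


def spark_dev_env(base_env=None, python="python3", testing=False):
    """Return a copy of the environment with the dev-workflow variables set.

    `base_env` defaults to the current process environment; the original
    mapping is never modified.
    """
    env = dict(os.environ if base_env is None else base_env)
    env["SPARK_PREPEND_CLASSES"] = "1"  # prefer locally compiled class files
    env["PYSPARK_PYTHON"] = python      # interpreter used by PySpark
    if testing:
        env["SPARK_TESTING"] = "1"      # only set for test runs
    return env
```

From the Spark source root, this could be used as `subprocess.run(["./python/run-tests", "--modules=pyspark-mllib"], env=spark_dev_env(), check=True)`.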
PySpark Jupyter Notebook (local mode, with Python 3, loading classes from continuous compilation, and remote debugging):
`SPARK_PREPEND_CLASSES=1 PYSPARK_PYTHON=python3 PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark --master local[*] --driver-java-options=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006`
- Create docs:
  `jekyll serve -w`
- Use BrowserSync to automatically reload docs page (from generated site folder):
  `browser-sync start --server --files "*.html, css/*.css, js/*.js"`
- Set example folders as test resources (so that correct libraries are available)
- Can set Spark properties as "VM options" in the Run Configuration: "-Dspark.master=local[*]"
- Can pass arguments to the script via "Program arguments" in the Run Configuration
`./build/sbt -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package`
- Place `core-site.xml` and `hdfs-site.xml` into the `conf` folder for automatic HDFS assumptions on read/write without having to use an HDFS URL.
- Set `HADOOP_CONF_DIR=/path/to/hadoop/conf` in `./conf/spark-env.sh`.
- To remove a DataNode directory from all nodes, update the configuration (Ambari can do this), then restart each node one-by-one, waiting for the underreplicated blocks to be replicated again.
- `sudo -u hdfs hdfs fsck /` to check on the health of the HDFS filesystem.
`./build/sbt -Pnetlib-lgpl ...`
- Set the following in `./conf/spark-defaults.conf`:

      spark.eventLog.dir=/var/logs/spark-history-server
      spark.eventLog.enabled=true
      spark.history.fs.logDirectory=file:/var/logs/spark-history-server

- Launch with `./sbin/start-history-server.sh`.
- View at `http://<server-url>:18080`.
- To use a custom Python package sitting in the current directory (i.e. the main script imports from this package), it will need to be zipped up and shipped out to the worker nodes before usage.
- Given a local package `mypackage`, the following can be used in the main script to zip up the package and ship it to workers before usage of the package:

      # Ship a fresh copy of the `mypackage` package to the Spark workers.
      # (`spark` is assumed to be an existing SparkSession.)
      import shutil
      dirname = "mypackage"
      zipname = dirname + ".zip"
      shutil.make_archive(dirname, 'zip', dirname + "/..", dirname)
      spark.sparkContext.addPyFile(zipname)

- Note: The zip must include the `mypackage` directory itself, as well as all files within it, for `addPyFile` to work correctly.
- This is equivalent to `zip -r mypackage.zip mypackage`.
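
To check the note about the archive layout without a Spark cluster, the zipping step can be tried on a throwaway package. This is a minimal sketch using only the standard library; `zip_package` and `build_demo_archive` are hypothetical helper names, and the demo package contents are made up for illustration.

```python
import os
import shutil
import tempfile
import zipfile


def zip_package(package_dir, out_dir):
    """Zip `package_dir` so that the archive contains the directory itself."""
    package_dir = os.path.abspath(package_dir)
    parent, name = os.path.split(package_dir)
    base_name = os.path.join(out_dir, name)  # make_archive appends ".zip"
    # root_dir is the parent, base_dir is the package name, so every entry
    # in the archive is prefixed with "<name>/".
    return shutil.make_archive(base_name, "zip", root_dir=parent, base_dir=name)


def build_demo_archive():
    """Create a temporary `mypackage`, zip it, and return the archive's entries."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg = os.path.join(tmp, "src", "mypackage")
        os.makedirs(pkg)
        with open(os.path.join(pkg, "__init__.py"), "w") as f:
            f.write("VALUE = 1\n")
        out_dir = os.path.join(tmp, "out")
        os.makedirs(out_dir)
        zip_path = zip_package(pkg, out_dir)
        with zipfile.ZipFile(zip_path) as zf:
            return sorted(zf.namelist())


if __name__ == "__main__":
    print(build_demo_archive())
```

Every entry listed should sit under `mypackage/`, which is the layout the note describes. In a real job, the path returned by `zip_package` would be the one handed to `spark.sparkContext.addPyFile(...)`.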
- Create `conf/slaves` file with list of machines (hostnames or IP addresses) of slaves.
- Set `SPARK_HOME` on all machines.
- `sbin/start-all.sh` will start a master (UI at HOSTNAME:8080 by default) and a set of slaves as defined in `conf/slaves`.
- Sometimes, Spark will "forget" how to stop existing master & worker nodes, and it will be necessary to stop them manually on each machine. The following will help:
  `ps -ux | grep "[s]park" | awk '{ print $2 }' | xargs kill -9`
Add extra Java options:

    spark.driver.memory 200g
    spark.driver.extraJavaOptions -server -Xmn12G
    spark.executor.extraJavaOptions -server -Xmn12G -XX:+UseG1GC

- `-Xms` = initial heap size
- `-Xmx` = max heap size
- `-Xmn` = young generation size (Eden plus survivor spaces)
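
Whatever heap sizes are chosen with these flags still have to fit within what the machine can really provide, as the worker-memory tip earlier points out. That tip can be turned into a quick budget. This is a minimal sketch: `worker_memory_budget_gb` is a hypothetical helper, and the default reserves are illustrative assumptions (4 GB unavailable to the OS matches the 128GB to 124GB example; 24 GB for the OS and other processes is picked only to reproduce the 100GB figure and should be tuned per machine).

```python
def worker_memory_budget_gb(total_ram_gb, hardware_reserved_gb=4,
                            os_and_other_gb=24, python_workers_gb=0):
    """Rough upper limit (in GB) for the memory given to a Spark worker.

    All reserves are assumptions to adjust per machine. `python_workers_gb`
    covers PySpark Python processes, which live outside the JVM heap.
    """
    budget = (total_ram_gb - hardware_reserved_gb
              - os_and_other_gb - python_workers_gb)
    if budget <= 0:
        raise ValueError("reserves exceed the machine's total RAM")
    return budget
```

With the defaults, a 128GB machine comes out at the 100GB limit suggested earlier; reserving memory for Python workers lowers it further.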