-------------------------------------------------------------------------
Training by Sameer Farooqui (https://www.youtube.com/watch?v=7ooZ4S7Ay6Y)
-------------------------------------------------------------------------
Schedulers
----------
- YARN/Mesos: you get dynamic partitioning of resources (executors can scale up and down)
- Local/Standalone: you get static partitioning (work is under way to bring dynamic allocation to at least standalone mode)
Hadoop MR vs Spark
------------------
- Spark is essentially a replacement for MR, not for HDFS or YARN.
- In MR, each task runs as a separate process on the execution nodes. In Spark, a single executor JVM runs per node per
application and the tasks run as threads inside it.
- In MR there are fixed slots for map and reduce, so reduce slots sit idle (wasting CPUs) during the map phase. In Spark,
task slots are generic.
- Facebook's Corona scheduler fills empty slots on execution nodes more aggressively, because scheduling the next task in MR
used to take 15-20 seconds. Slot reuse is much faster in Spark: tearing down and bringing up a mapper/reducer process in MR
takes 1.5-2 seconds by itself, since process creation is slow. At Facebook, reusing a slot used to take 66 seconds; Corona
brought that down to 55 seconds.
- In MR, data is dumped to disk between the map and reduce phases. Spark keeps intermediate data in memory where it can.
Spark execution
---------------
- Task slots are referred to as "executor cores" in Spark, so essentially there is one task thread per executor core.
In general you can oversubscribe the machine cores by about 2x: if a machine has 6 cores, give the executor 12 cores,
because threads are not pinned to physical cores.
- Spark also runs other internal threads (e.g. for shuffle), so a one-to-one mapping of executor cores to machine cores
would not be meaningful anyway.
- Conf set in application code overrides conf passed to spark-submit (see the sketch after this list).
- spark-env.sh contains the SPARK_LOCAL_DIRS variable, which should be set to a list of local disks. These directories are
used for spilled RDDs and map-side shuffle spills; prefer SSDs for them.
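
A minimal Scala sketch of the precedence rule above, assuming the app is launched with spark-submit; the app name and memory
value are illustrative, not from the talk. Anything set directly on SparkConf in code wins over the same key passed via
spark-submit --conf or spark-defaults.conf.

    import org.apache.spark.{SparkConf, SparkContext}

    object ConfPrecedenceDemo {                              // hypothetical object name
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("conf-precedence-demo")
          .set("spark.executor.memory", "4g")                // wins over e.g. --conf spark.executor.memory=2g
        val sc = new SparkContext(conf)
        println(sc.getConf.get("spark.executor.memory"))     // prints 4g
        sc.stop()
      }
    }
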
Local Mode
-----------
- One JVM is started which contains both the driver and the executor
Spark standalone mode
---------------------
- Spark Master and Worker JVMs are started on the machines of the cluster
- when you run spark-submit, a driver is launched which contacts the Spark Master and tells it how many executors it needs
- the Spark Master then asks the Workers to launch executors on the different nodes
- If an executor crashes, its Worker restarts it. If a Worker JVM crashes, the Master restarts it.
- If the driver crashes, the Master can restart it provided it was started with --supervise. If the driver restarts, all the
executors have to be restarted as well.
- spark-env.sh contains another variable, SPARK_WORKER_CORES, which can be used to configure a different number of cores on
individual machines
- Spark Masters can be made highly available using ZooKeeper
- The Datastax Cassandra Spark distribution makes the Spark Master HA using Cassandra system tables instead of ZooKeeper
- One Worker can create multiple executors on a single machine for different Spark applications, but not multiple executors
for the same application
- If you want multiple executors of the same app on the same machine, start multiple Workers on each machine. SPARK_WORKER_INSTANCES
(number of Workers per machine), SPARK_WORKER_CORES, SPARK_WORKER_MEMORY and SPARK_DAEMON_MEMORY are the settings for that
- For each app you can use spark.cores.max and spark.executor.memory to cap its CPU and executor memory (see the sketch at
the end of this section). Apps are scheduled in FIFO order
- Different UIs in standalone mode: the Spark Master UI (similar to the Mesos master UI) is exposed by the Spark Master and
shows cluster-wide information such as total memory available; the Spark UI is exposed by the Spark driver and is per
application
- The Spark driver and executors are present in every kind of deployment
- A Spark application is made up of multiple jobs (every action in the app triggers a job); each job contains stages
- If your data is x on disk, the in-memory cache can be 2-3x because of the deserialized Java object format
- Precedence for configuration (increasing order):
Spark source-code defaults -> spark-env file -> spark-submit params -> settings specified in code
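
A minimal Scala sketch of capping a standalone app's resources as described above; the master URL, core count and memory
values are placeholders, not from the talk.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")   // standalone master; use "local[*]" for local mode (one JVM)
      .setAppName("standalone-resource-demo")
      .set("spark.cores.max", "12")            // app-wide CPU cap, scheduled FIFO against other apps
      .set("spark.executor.memory", "4g")      // heap per executor
    val sc = new SparkContext(conf)
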
Spark YARN Mode
---------------
- Client mode - the driver runs on your machine; an Application Master and the executors run in the cluster. The Application
Master negotiates resources with the Resource Manager.
- Cluster mode - the driver runs inside the Application Master
- Dynamic resource allocation is available in this mode
Storage
-------
- cache == persist(MEMORY_ONLY) (a sketch of these storage levels follows this list)
- cache eviction is LRU
- if an executor dies, the lost cached partitions are not recomputed eagerly; they are recomputed only when someone accesses
them again
- persist(MEMORY_ONLY_SER) ==> more space efficient ==> also reduces GC pressure because many objects are stored as a single
serialized buffer
- persist(MEMORY_AND_DISK) ==> spilled to SPARK_LOCAL_DIRS if required; deserialized in memory, serialized on disk
- persist(MEMORY_AND_DISK_SER) ==> serialized in memory, serialized on disk
- persist(DISK_ONLY) ==> only on disk
- persist(MEMORY_ONLY_2) ==> deserialized replicas on 2 JVMs; good only for very expensive computations, generally you should
trust the recomputation
- persist(MEMORY_AND_DISK_2)
- persist(OFF_HEAP) ==> the RDD is saved in Tachyon as serialized objects; Tachyon runs outside the executor process
- unpersist ==> forcefully take an RDD out of memory
- Intermediate data is automatically persisted during the shuffle operations
- Data in SPARK_LOCAL_DIRS is deleted to make room for new data
- PySpark objects will always be serialized using pickle
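
A minimal Scala sketch of the storage levels above; the data and multipliers are made up for illustration, and an existing
SparkContext `sc` (e.g. from spark-shell) is assumed.

    import org.apache.spark.storage.StorageLevel

    val nums = sc.parallelize(1 to 1000000)

    val deser = nums.map(_ * 2L)
    deser.persist(StorageLevel.MEMORY_ONLY)        // same as deser.cache(): deserialized objects, ~2-3x the on-disk size
    val ser = nums.map(_ * 3L)
    ser.persist(StorageLevel.MEMORY_ONLY_SER)      // serialized: more compact, fewer objects for the GC to track
    val spilled = nums.map(_ * 4L)
    spilled.persist(StorageLevel.MEMORY_AND_DISK)  // spills to SPARK_LOCAL_DIRS when memory runs out

    deser.count()                                  // an action materializes (and caches) the RDD
    deser.unpersist()                              // forcefully drop it from the cache
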
Serialization
-------------
- Java serialization is the default; Kryo is faster (see the sketch below)
- The cost of GC is directly proportional to the number of objects. Try to use data structures that create fewer objects,
e.g. an Array is better than a List
- There are multiple params to tune GC. Parallel GC (the default) is not recommended for streaming apps because its
stop-the-world pauses can delay the stream; use CMS or G1 instead
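
A minimal Scala sketch of switching to Kryo and nudging the executor GC, per the notes above; MyRecord and the choice of G1
are illustrative assumptions, not from the talk.

    import org.apache.spark.SparkConf

    case class MyRecord(id: Long, value: Double)   // hypothetical record type to register

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[MyRecord]))               // avoids writing full class names per object
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")      // e.g. G1 instead of the default Parallel GC
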
Narrow/Wide dependency
----------------------
- Narrow - each parent partition is used by at most one child partition
- Recreating a partition that has a wide dependency can be very costly
- Pipelining can be used for narrow transformations (see the sketch after the Lineage notes below)
Lineage
-------
- To display the lineage, use toDebugString on an RDD. Indentations mark the stage boundaries
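
A minimal Scala sketch of narrow vs wide dependencies and of reading the lineage; the word data is made up and an existing
SparkContext `sc` is assumed.

    val words = sc.parallelize(Seq("a", "b", "a", "c"))
    val counts = words
      .map(w => (w, 1))            // narrow
      .filter(_._1 != "c")         // narrow, pipelined into the same stage as the map
      .reduceByKey(_ + _)          // wide: shuffles by key, starts a new stage
    println(counts.toDebugString)  // the indentation shift marks the stage boundary at the shuffle
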
RDDs
----
- When you read from Hadoop, first a HadoopRDD is created and then a map operation runs on it automatically to convert the
Hadoop data types to Java data types
- coalesce, used to reduce the number of partitions, does not cause a shuffle. Use repartition to increase the number of partitions
- If the RDD holds key-value pairs and your map only changes the values, use mapValues (or mapPartitions with
preservesPartitioning = true) so the partitioner is kept (see the sketch below)
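
A minimal Scala sketch of the partitioning notes above; the data and partition counts are illustrative and an existing
SparkContext `sc` is assumed.

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i)).partitionBy(new HashPartitioner(10))

    val fewer = pairs.coalesce(5)       // narrow: merges partitions, no shuffle
    val more  = pairs.repartition(20)   // wide: full shuffle to grow the partition count

    val scaled = pairs.mapValues(_ * 2) // only values change, so the partitioner is kept
    println(scaled.partitioner)         // Some(HashPartitioner) -- downstream key lookups/joins stay cheap
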
Broadcast variables and accumulators
------------------------------------
- Broadcast variables are used, for example, in broadcast hash joins
- Accumulators are like counters in Hadoop: they can only be added to through an associative operation, and only the driver
can read an accumulator's value
- Broadcast uses a BitTorrent-like technique for transferring the data
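
A minimal Scala sketch of a broadcast variable and an accumulator; the lookup table and data are made up, an existing
SparkContext `sc` is assumed, and the accumulator call uses the Spark 2.x API name (longAccumulator).

    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))   // shipped to executors once, BitTorrent-style
    val misses = sc.longAccumulator("misses")                // added to by tasks, readable only on the driver

    val named = sc.parallelize(Seq(1, 2, 3, 4)).map { k =>
      val table = lookup.value                               // read the executor-local broadcast copy
      if (!table.contains(k)) misses.add(1L)
      table.getOrElse(k, "unknown")
    }
    named.count()                                            // run an action so the tasks actually execute
    println(misses.value)                                    // 2 (for keys 3 and 4)
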
PySpark
-------
- You can choose your Python implementation with the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variables
- PyPy is generally faster than CPython
Shuffle
--------
- Sort-based shuffle is the default shuffle as of Spark 1.3
- HDFS short-circuit local reads bypass the local DataNode: the client gets the block information from the DataNode but reads
the data directly from disk
To improve shuffle performance, the following things were done:
- External shuffle service: serving the map output to the reducers is no longer the responsibility of the mapper's executor.
The Worker JVM does it in standalone mode and the NodeManager does it in YARN mode. It must be enabled when dynamic scaling
is used (see the sketch below)
- Serving the map output is now done in a zero-copy fashion (local dir -> NIC buffer -> network). Earlier it was
local dir -> Linux kernel buffer -> executor -> NIC buffer -> network. This is done using Netty native transport
- Hash-based shuffle was replaced by sort-based shuffle ==> hash-based shuffle does not work well beyond ~10K reducers,
because it keeps one file handle open per reducer, which sort-based shuffle does not
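
A minimal Scala sketch (config only, illustrative executor bounds) of enabling dynamic allocation together with the external
shuffle service it requires, so that map output survives executor removal.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")       // served by the Worker (standalone) or NodeManager (YARN)
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "20")
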
General Notes
--------------
- Keep track of the different types of RDDs that are getting created
- Keep track of stages that are created