SparkNotes
RDD::
The first abstraction in Spark is the Resilient Distributed Dataset (RDD)
- a fault-tolerant collection of elements that can be operated on in parallel.
- a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
- RDDs automatically recover from node failures.
RDDs can be created in three ways:
- Parallelizing an existing collection in your driver program
- Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat
- Applying a transformation to a parent RDD, which yields a new child RDD (see the sketch below)
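A minimal sketch of the three creation paths (assumes an active SparkContext `sc`; the HDFS path is just a placeholder):
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4, 5))   // 1. parallelize a driver-side collection
val fromStorage    = sc.textFile("hdfs:///tmp/input.txt") // 2. reference an external dataset (placeholder path)
val childRDD       = fromCollection.map(_ * 2)            // 3. transformation: parent RDD -> child RDD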
A second abstraction in Spark is shared variables | |
By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. | |
Spark supports two types of shared variables: | |
broadcast variables - which can be used to cache a value in memory on all nodes | |
accumulators - which are variables that are only 'added' to, such as counters and sums. | |
val ss = SparkSession.builder().master("spark://thanoojubuntu-Inspiron-3521:7077") | |
.config("spark.sql.hive.metastore.version", "2.3.0") | |
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") | |
.config("spark.sql.warehouse.dir", "hdfs://user/hive/warehouse") | |
.config("spark.sql.hive.metastore.jars", "/home/hduser/softwares/apache-hive-2.3.3-bin/lib/*") | |
.config("spark.sql.catalogImplementation", "hive") | |
.config("spark.hadoop.hive.exec.dynamic.partition", "true") | |
.config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") | |
.config("spark.eventLog.enabled", true) | |
.enableHiveSupport().getOrCreate() | |
import org.apache.spark.sql.types._ | |
import ss.sqlContext.implicits._ | |
val sc = ss.sparkContext | |
sc.setLogLevel("ERROR") | |
ss.udf.register("ageInc10", (age: Int) => { age + 10 }) | |
./bin/spark-submit \ | |
--class <main-class> \ | |
--master <master-url> \ | |
--deploy-mode <deploy-mode> \ | |
--conf <key>=<value> \ | |
... # other options | |
<application-jar> \ | |
[application-arguments] | |
spark-submit --class com.sparkstreamingone.core2 --verbose --master spark://thanoojubuntu-Inspiron-3521:7077 target/SparkWithScala-0.0.1-SNAPSHOT-jar-with-dependencies.jar 2020 12 | |
To include a dependency using Maven coordinates: | |
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1" | |
# Run on a YARN cluster | |
export HADOOP_CONF_DIR=XXX | |
./bin/spark-submit \ | |
--class org.apache.spark.examples.SparkPi \ | |
--master yarn \ | |
--deploy-mode cluster \ # can be client for client mode | |
--executor-memory 20G \ | |
--num-executors 50 \ | |
/path/to/examples.jar \ | |
1000 | |
val data = Array(1, 2, 3, 4, 5) | |
val distData = sc.parallelize(data) | |
distData.reduce((a, b) => a + b) | |
scala> val distFile = sc.textFile("data.txt") | |
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26 | |
val peopleDFCsv = spark.read.format("csv") | |
.option("sep", ";") | |
.option("inferSchema", "true") | |
.option("header", "true") | |
.load("examples/src/main/resources/people.csv") | |
usersDF.write.format("orc") | |
.option("orc.bloom.filter.columns", "favorite_color") | |
.save("users_with_options.orc") | |
df.write.option("path", "/some/path").saveAsTable("t") | |
Note: When the table is dropped, the custom table path will not be removed and the table data is still there. | |
usersDF
.write
.partitionBy("favorite_color")   // partitioning and bucketing can be combined for a persistent table
.bucketBy(42, "name")
.sortBy("name")                  // sortBy is only applicable together with bucketBy
.saveAsTable("users_partitioned_bucketed")
RDDs support two types of operations: | |
transformations, which create a new dataset from an existing one, and | |
actions, which return a value to the driver program after running a computation on the dataset. | |
All transformations in Spark are lazy; the transformations are only computed when an action requires a result to be returned to the driver program.
By default, each transformed RDD may be recomputed each time you run an action on it - use persist() (or cache()) to avoid the recomputation (see the sketch below).
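A minimal persist/cache sketch (assumes `sc`; the path is a placeholder):
val logs   = sc.textFile("hdfs:///tmp/app.log")         // placeholder path
val errors = logs.filter(_.contains("ERROR")).persist() // same as .cache() (StorageLevel.MEMORY_ONLY)
errors.count()                                          // the first action computes and caches the RDD
errors.take(10).foreach(println)                        // later actions reuse the cached partitions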
collect() fetches the entire RDD to a single/driver machine | |
rdd.take(100).foreach(println). | |
PairRDDFunctions: | |
reduceByKey | |
partitionBy | |
join | |
leftOuterJoin | |
groupByKey:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.groupByKey()
.map { case (word, counts) => (word, counts.sum) }
groupByKey can cause out-of-memory or out-of-disk problems, as all the (word, 1) pairs are sent over the network and collected on the reduce workers.
sparkContext.textFile("hdfs://") | |
.flatMap(line => line.split(" ")) | |
.map(word => (word,1)) | |
.reduceByKey((x,y)=> (x+y)) | |
reduceByKey = groupByKey + combiner | |
aggregateByKey: similar to reduceByKey, but it additionally takes an initial value.
It takes 3 parameters: i. the initial (zero) value ii. a sequence op that merges a value into the per-partition accumulator iii. a combine op that merges accumulators across partitions (see the sketch below).
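A minimal aggregateByKey sketch - word count again (assumes `sc`):
val words  = sc.parallelize(Seq("spark", "hive", "spark"))
val counts = words.map(word => (word, 1))
  .aggregateByKey(0)((acc, v) => acc + v, (a, b) => a + b) // (zero value)(seq op, combine op)
counts.collect()                                           // e.g. Array((hive,1), (spark,2))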
Note: repartitioning can happen on paired RDDs.
Use mapValues instead of map once the RDD has been repartitioned:
mapValues keeps the keys untouched.
mapValues - pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning (see the sketch below).
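A minimal mapValues vs map sketch (assumes `sc`); only mapValues retains the partitioner set by partitionBy:
import org.apache.spark.HashPartitioner
val pairs    = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(2))
val bumped   = pairs.mapValues(_ + 10)                       // keys untouched, partitioner retained
val remapped = pairs.map { case (k, v) => (k, v + 10) }      // same values, but the partitioner is dropped
println(s"${bumped.partitioner} vs ${remapped.partitioner}") // Some(org.apache.spark.HashPartitioner@...) vs None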
val lines = sc.textFile("data.txt") | |
val pairs = lines.map(s => (s, 1)) | |
val counts = pairs.reduceByKey((a, b) => a + b) | |
One important parameter for parallel collections is the number of partitions/slices to cut the dataset into. | |
Spark will run one task for each partition of the cluster. | |
Typically you want 2-4 partitions for each CPU in your cluster. | |
Normally, Spark tries to set the number of partitions automatically based on your cluster. | |
sc.parallelize(data, 10) | |
If we request 20GB per executor, YARN will actually allocate 20GB + memoryOverhead = 20GB + 7% of 20GB ≈ 21.4GB of memory for each executor container.
- Running executors with too much memory often results in excessive garbage collection delays.
- Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM.
10 Nodes | |
16 Cores per Node | |
64GB RAM per Node | |
1. Tiny executors [One Executor per core]: | |
--num-executors = 16 x 10 = 160 executors | |
--executor-cores = 1 | |
--executor-memory = 64GB/16 = 4GB | |
With only one executor per core, as discussed above, we will not be able to take advantage of running multiple tasks in the same JVM. Also, shared/cached variables like broadcast variables and accumulators will be replicated in each of the 16 executors per node, i.e. 16 times. We are also not leaving enough memory overhead for the Hadoop/YARN daemon processes, and we are not accounting for the ApplicationMaster. NOT GOOD!
2. Fat executors (One Executor per node): | |
--num-executors = 16 executors | |
--executor-cores = 16 | |
--executor-memory = 64GB/1 = 64GB | |
With all 16 cores per executor, apart from the ApplicationMaster and daemon processes not being accounted for, HDFS throughput will suffer and it will result in excessive garbage collection. Also NOT GOOD!
3. Balance between Fat (vs) Tiny | |
--executor-cores = 5 (for good HDFS throughput) | |
Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15 | |
So, total available cores in the cluster = 15 x 10 = 150
Number of available executors = (total cores/num-cores-per-executor) = 150/5 = 30 | |
Leaving 1 executor for the ApplicationMaster => --num-executors = 29
Number of executors per node = 30/10 = 3
Memory per executor = 64GB/3 ≈ 21GB
Counting off-heap overhead (spark.yarn.executor.memoryOverhead, keep ~3GB per executor aside) => actual --executor-memory = 21 - 3 = 18GB
So the final numbers:
--num-executors = 29
--executor-cores = 5
--executor-memory = 18GB (out of the ~21GB per executor, ~3GB is left for overhead)
This configuration achieves the parallelism of a fat executor and the best throughput of a tiny executor (see the back-of-the-envelope sketch below).
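The same arithmetic as plain Scala (not a Spark API; the numbers match the example cluster above):
val nodes = 10; val coresPerNode = 16; val memPerNodeGB = 64
val coresPerExecutor   = 5                                     // good HDFS throughput
val usableCoresPerNode = coresPerNode - 1                      // leave 1 core for Hadoop/YARN daemons => 15
val executorsPerNode   = usableCoresPerNode / coresPerExecutor // 15 / 5 = 3
val numExecutors       = executorsPerNode * nodes - 1          // 30 - 1 = 29 (one slot for the ApplicationMaster)
val memPerExecutorGB   = memPerNodeGB / executorsPerNode       // 64 / 3 = 21
val executorMemoryGB   = memPerExecutorGB - 3                  // keep ~3GB aside for memoryOverhead => 18
println(s"--num-executors $numExecutors --executor-cores $coresPerExecutor --executor-memory ${executorMemoryGB}G")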
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) | |
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) | |
scala> broadcastVar.value | |
res0: Array[Int] = Array(1, 2, 3) | |
Broadcasting the small side of a join in Spark is the same idea as a map-side join in Hive,
where the reducer (shuffle) task can be avoided (see the sketch below).
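A minimal broadcast-join sketch (assumes the SparkSession `ss` built above; the sample data is made up):
import org.apache.spark.sql.functions.broadcast
import ss.implicits._
val largeDF = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
val smallDF = Seq((1, "x"), (2, "y")).toDF("id", "code")
val joined  = largeDF.join(broadcast(smallDF), Seq("id")) // hint: ship the small side to every executor
joined.explain()                                          // physical plan shows BroadcastHashJoin, no shuffle of largeDF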
val longAcc = spark.sparkContext.longAccumulator("SumAccumulator") | |
val rdd = spark.sparkContext.parallelize(Array(1, 2, 3)) | |
rdd.foreach(x => longAcc.add(x)) | |
println(longAcc.value) | |
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database | |
DataFrame is represented by a Dataset of Rows. | |
Dataset[Row] | |
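A minimal sketch of a DataFrame (Dataset[Row]) vs a typed Dataset (assumes the SparkSession `ss` and its implicits from above; the sample data is made up):
import ss.implicits._
case class Person(name: String, age: Long)
val df = Seq(("sita", 28L), ("rama", 30L)).toDF("name", "age") // DataFrame = Dataset[Row]
val ds = df.as[Person]                                         // typed Dataset[Person]
ds.filter(_.age > 29).show()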
// For implicit conversions like converting RDDs to DataFrames | |
import spark.implicits._ | |
case class People(name: String, age: Int)                             // assumed schema for the rows below
val peopleRDD = sc.textFile("examples/src/main/resources/people.txt") // Spark's sample file, lines like "Michael, 29"
val peopleDF = peopleRDD.map(_.split(","))
  .map { case Array(name, age) => People(name, age.trim().toInt) }.toDF("name", "age")
peopleDF.repartition(2).persist() | |
peopleDF.repartitionByRange(2, col("age")).persist() | |
peopleDF.coalesce(1).persist() | |
peopleDF.createOrReplaceGlobalTempView("peopleView") | |
val usersDF = spark.read.load("examples/src/main/resources/users.parquet") | |
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") | |
The Catalyst Optimizer in Spark offers rule-based and cost-based optimization. Rule-based optimization indicates how to execute the query from a set of defined rules. Meanwhile, cost-based optimization generates multiple execution plans and compares them to choose the lowest cost one. | |
ss.udf.register("ageInc10", (age: Int) => { age + 10 }) | |
val teenagersDF = ss.sql("SELECT name, ageInc10(age) AS age FROM global_temp.peopleView WHERE age BETWEEN 13 AND 19") | |
scala> val rangeAlone = spark.range(5) | |
rangeAlone: org.apache.spark.sql.Dataset[Long] = [id: bigint] | |
scala> rangeAlone.rdd.getNumPartitions | |
res19: Int = 2 | |
scala> rangeAlone.explain(true) | |
== Parsed Logical Plan == | |
Range (0, 5, step=1, splits=Some(2)) | |
== Analyzed Logical Plan == | |
id: bigint | |
Range (0, 5, step=1, splits=Some(2)) | |
== Optimized Logical Plan == | |
Range (0, 5, step=1, splits=Some(2)) | |
== Physical Plan == | |
*(1) Range (0, 5, step=1, splits=2) | |
scala> rangeAlone.show() | |
+---+ | |
| id| | |
+---+ | |
| 0| | |
| 1| | |
| 2| | |
| 3| | |
| 4| | |
+---+ | |
scala> rangeAlone.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true) | |
+---------------+-----------------+ | |
|partition_index|number_of_records| | |
+---------------+-----------------+ | |
| 0| 2| | |
| 1| 3| | |
+---------------+-----------------+ | |
scala> val withRepartition = rangeAlone.repartition(numPartitions = 5) | |
withRepartition: org.apache.spark.sql.Dataset[Long] = [id: bigint] | |
scala> withRepartition.rdd.getNumPartitions | |
res23: Int = 5 | |
scala> withRepartition.explain(true) | |
== Parsed Logical Plan == | |
Repartition 5, true | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Analyzed Logical Plan == | |
id: bigint | |
Repartition 5, true | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Optimized Logical Plan == | |
Repartition 5, true | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Physical Plan == | |
Exchange RoundRobinPartitioning(5) | |
+- *(1) Range (0, 5, step=1, splits=2) | |
scala> withRepartition.show() | |
+---+ | |
| id| | |
+---+ | |
| 0| | |
| 3| | |
| 1| | |
| 2| | |
| 4| | |
+---+ | |
scala> withRepartition.explain(true) | |
== Parsed Logical Plan == | |
Repartition 5, true | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Analyzed Logical Plan == | |
id: bigint | |
Repartition 5, true | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Optimized Logical Plan == | |
Repartition 5, true | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Physical Plan == | |
Exchange RoundRobinPartitioning(5) | |
+- *(1) Range (0, 5, step=1, splits=2) | |
scala> withRepartition.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true) | |
+---------------+-----------------+ | |
|partition_index|number_of_records| | |
+---------------+-----------------+ | |
| 0| 0| | |
| 1| 2| | |
| 2| 2| | |
| 3| 1| | |
| 4| 0| | |
+---------------+-----------------+ | |
scala> withRepartition.printSchema() | |
root | |
|-- id: long (nullable = false) | |
scala> val withRepartition = rangeAlone.repartitionByRange(2, $"id") | |
withRepartition: org.apache.spark.sql.Dataset[Long] = [id: bigint] | |
scala> withRepartition.rdd.getNumPartitions | |
res28: Int = 2 | |
scala> withRepartition.explain(true) | |
== Parsed Logical Plan == | |
'RepartitionByExpression ['id ASC NULLS FIRST], 2 | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Analyzed Logical Plan == | |
id: bigint | |
RepartitionByExpression [id#161L ASC NULLS FIRST], 2 | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Optimized Logical Plan == | |
RepartitionByExpression [id#161L ASC NULLS FIRST], 2 | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Physical Plan == | |
Exchange rangepartitioning(id#161L ASC NULLS FIRST, 2) | |
+- *(1) Range (0, 5, step=1, splits=2) | |
scala> withRepartition.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true) | |
+---------------+-----------------+ | |
|partition_index|number_of_records| | |
+---------------+-----------------+ | |
| 0| 3| | |
| 1| 2| | |
+---------------+-----------------+ | |
scala> withRepartition.show() | |
+---+ | |
| id| | |
+---+ | |
| 0| | |
| 1| | |
| 2| | |
| 3| | |
| 4| | |
+---+ | |
scala> | |
scala> val withRepartition = rangeAlone.coalesce(1) | |
withRepartition: org.apache.spark.sql.Dataset[Long] = [id: bigint] | |
scala> withRepartition.rdd.getNumPartitions | |
res32: Int = 1 | |
scala> withRepartition.rdd.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}.toDF("partition_index","number_of_records").show(true) | |
+---------------+-----------------+ | |
|partition_index|number_of_records| | |
+---------------+-----------------+ | |
| 0| 5| | |
+---------------+-----------------+ | |
scala> withRepartition.explain(true) | |
== Parsed Logical Plan == | |
Repartition 1, false | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Analyzed Logical Plan == | |
id: bigint | |
Repartition 1, false | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Optimized Logical Plan == | |
Repartition 1, false | |
+- Range (0, 5, step=1, splits=Some(2)) | |
== Physical Plan == | |
Coalesce 1 | |
+- *(1) Range (0, 5, step=1, splits=2) | |
scala> withRepartition.show() | |
+---+ | |
| id| | |
+---+ | |
| 0| | |
| 1| | |
| 2| | |
| 3| | |
| 4| | |
+---+ | |
scala> val q = spark.range(100).as("a").join(spark.range(100).as("b")).where($"a.id" === $"b.id") | |
q: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, id: bigint] | |
scala> q.explain | |
== Physical Plan == | |
*(2) BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight | |
:- *(2) Range (0, 100, step=1, splits=2) | |
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) | |
+- *(1) Range (0, 100, step=1, splits=2) | |
scala> q.explain(true) | |
== Parsed Logical Plan == | |
'Filter ('a.id = 'b.id) | |
+- Join Inner | |
:- SubqueryAlias `a` | |
: +- Range (0, 100, step=1, splits=Some(2)) | |
+- SubqueryAlias `b` | |
+- Range (0, 100, step=1, splits=Some(2)) | |
== Analyzed Logical Plan == | |
id: bigint, id: bigint | |
Filter (id#0L = id#3L) | |
+- Join Inner | |
:- SubqueryAlias `a` | |
: +- Range (0, 100, step=1, splits=Some(2)) | |
+- SubqueryAlias `b` | |
+- Range (0, 100, step=1, splits=Some(2)) | |
== Optimized Logical Plan == | |
Join Inner, (id#0L = id#3L) | |
:- Range (0, 100, step=1, splits=Some(2)) | |
+- Range (0, 100, step=1, splits=Some(2)) | |
== Physical Plan == | |
*(2) BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight | |
:- *(2) Range (0, 100, step=1, splits=2) | |
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) | |
+- *(1) Range (0, 100, step=1, splits=2) | |
scala> | |
------------------------------------------------------------------------------------------------------------------------------------ | |
Spark's primary building block is the RDD - Resilient Distributed Dataset
Resilient - fault-tolerant
Distributed - data gets partitioned and distributed across the cluster's worker nodes
We know Spark supports lazy evaluation and fault tolerance.
Having said that,
when we do multiple transformations on RDDs - RDDs being immutable in nature - each transformation on an existing RDD creates a new RDD.
Each RDD maintains a reference to its parent RDD (including the data type and the location of the partitions of the RDD's data), which forms a graph.
RDD - no built-in optimization engine is available; developers have to take care of optimizing each RDD on the basis of its attributes.
DF - catalyst optimizer | |
DS - catalyst optimizer and Tungsten execution engine | |
This lineage graph is used to create a DAG, which is then submitted to the DAG scheduler, which further divides it into stages (sets of tasks) and executes them.
DAG scheduler submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. | |
Task schedulers get sets of tasks submitted to them from the DAGScheduler for each stage. | |
Each TaskScheduler schedules tasks for a single SparkContext. | |
They return events to the DAGScheduler. | |
It handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage. | |
TaskSets - A set of tasks submitted together to the low-level TaskScheduler | |
scala> val data = sc.parallelize(List((1,2))) | |
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21 | |
scala> val joinedData = data join data | |
joinedData: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[11] at join at <console>:23 | |
scala> joinedData.toDebugString | |
res4: String = | |
(8) MapPartitionsRDD[11] at join at <console>:23 [] | |
| MapPartitionsRDD[10] at join at <console>:23 [] | |
| CoGroupedRDD[9] at join at <console>:23 [] | |
+-(8) ParallelCollectionRDD[8] at parallelize at <console>:21 [] | |
+-(8) ParallelCollectionRDD[8] at parallelize at <console>:21 [] | |
Note the CoGroupedRDD in the lineage above - join is implemented on top of cogroup (see the sketch below).
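A minimal cogroup sketch (assumes `sc`), showing the operation that join is built on:
val left    = sc.parallelize(Seq((1, "a"), (2, "b")))
val right   = sc.parallelize(Seq((1, "x"), (3, "z")))
val grouped = left.cogroup(right)  // RDD[(Int, (Iterable[String], Iterable[String]))]
grouped.collect().foreach(println) // e.g. (1,(CompactBuffer(a),CompactBuffer(x)))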
DataFrames :: | |
- A DataFrame is a Dataset organized into named columns. | |
DataFrames run on top of the Spark SQL optimized execution engine.
------------------------------------------------------------------------------------------------------------------------------------ | |
------------------------------------------------------------------------------------------------------------------------------------ | |
1. Resilient Distributed Datasets (RDDs) | |
- There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat. | |
2. SparkContext (SC) vs SparkSession (SS) vs Spark SQL (SSQL)
3. External Datasets | |
4. RDD Operations | |
Printing elements of an RDD | |
rdd.foreach(println) or rdd.map(println). | |
On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these. | |
rdd.collect().foreach(println) | |
To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node, because collect() fetches the entire RDD to a single machine; | |
if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println). | |
Working with Key-Value Pairs | |
- class org.apache.spark.rdd.PairRDDFunctions[K, V] extends Logging with Serializable | |
- The key-value pair operations are available in the PairRDDFunctions class | |
val lines = sc.textFile("data.txt") | |
val pairs = lines.map(s => (s, 1)) | |
val counts = pairs.reduceByKey((a, b) => a + b).sortByKey() | |
counts.collect() | |
Transformations | |
------------------------------------------------------------------------------- | |
Spark Context vs Spark Session | |
SC | |
1. Application master will get created in any of the worker node containers | |
2. Application master creates Spark Driver | |
3. Spark Driver creates Spark Context | |
- Spark Context is the main entry point into Spark functionality | |
- Spark Context is created by Spark Driver for each Spark application when it is first submitted by the user. It exists throughout the lifetime of the Spark application. | |
- For each JVM only one SparkContext can be active. You must stop() the active SparkContext before creating a new one.
SparkContext can connect to different types of Cluster Managers. Now the most popular types are YARN, Mesos, Kubernetes or even Nomad. There is also Spark's own standalone cluster manager. | |
Executors | |
- Executors are processes launched on the worker nodes; tasks are executed inside the executors and the results are returned to the Spark Driver.
"Static Allocation of Executors" - executors are started once at the beginning of the Spark application and then run for the entire lifetime of the application
UberTask :: Uber mode job run in Hadoop
ResourceManager launches the ApplicationMaster | |
Then the ApplicationMaster retrieves the number of input splits for the job and, based on that and the configuration, decides the number of mappers and reducers that have to be launched.
The ApplicationMaster has to decide whether to negotiate resources with the ResourceManager's scheduler to run the map and reduce tasks, or to run the job sequentially within the same JVM where the ApplicationMaster is running.
This decision is made by the ApplicationMaster only if uber mode is set to true in Hadoop.
If uber mode is true and the ApplicationMaster decides to run the MapReduce job within the same JVM, then the job is said to be running as an uber task in YARN.
By default, uber mode is set to false in Hadoop.
The uber configuration is used for MapReduce whenever you have a small data set.
Uber mode runs the map and reduce tasks within the ApplicationMaster's own process and avoids the overhead of launching and communicating with remote nodes.
Configurations parameters required for uber mode are set in etc/hadoop/mapred-site.xml. | |
mapreduce.job.ubertask.enable (Default false) | |
mapreduce.job.ubertask.maxmaps (Default = 9, Users can override this value, but only downward.) | |
mapreduce.job.ubertask.maxreduces (CURRENTLY THE CODE CANNOT SUPPORT MORE THAN ONE REDUCE - Default = 1, Users can override this value, but only downward. zero is a valid maximum value) | |
mapreduce.job.ubertask.maxbytes (Default = HDFS Block Size, Users can override this value, but only downward.) | |
Uber jobs are jobs that are executed within the MapReduce ApplicationMaster. Rather than communicating with the RM to create the mapper and reducer containers, the AM runs the map and reduce tasks within its own process and avoids the overhead of launching and communicating with remote containers.
By default, a small job is one that has less than 10 mappers, only one reducer, and an input size that is less than the size of one HDFS block. | |
https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.15/bk_using-apache-hadoop/content/uber_jobs.html | |