- This document is distributed under the Apache License 2.0.
- All source code described in this document is from the Apache Tez project.
- Apache Tez: Accelerating Hadoop Query Processing by Bikas Saha
- Hive + Tez: A Performance Deep Dive by Jitendra Pandey and Gopal V.
Quoting from http://tez.apache.org/:
- Empowering end users by:
- Expressive dataflow definition APIs
- Flexible Input-Processor-Output runtime model
- Data type agnostic
- Simplifying deployment
- Execution Performance
- Performance gains over MapReduce
- Optimal resource management
- Plan reconfiguration at runtime
- Dynamic physical data flow decisions
- DAG-style processing framework
- Apache Top Level Projects
- Layer they cover: the APIs end users write
- Tez: HiveQL, Pig, Cascading (these are applications built on Tez)
- Spark: Spark DSL, Spark SQL, other DSLs (MLlib, GraphX, and so on): all-in-one
- Features
- Tez
- Dynamic reconfiguration APIs - like a partial implementation of Optimus!
- Effective intermediate file format for checkpointing
- Spark
- Distributed memory management with RDDs
- Tez
$ find . -name "*.java" | xargs wc -l
205571
- tez-api/tez-common/tez-dag/tez-runtime-internals (core of Tez)
- tez-mapreduce/tez-mapreduce-examples (MapReduce APIs)
- Set up a YARN and HDFS cluster
- Put the Tez jar files into HDFS
- Configure environment variables
- Change properties:
- Hive: -Dhive.execution.engine=tez (default: mr)
- MapReduce: -Dmapreduce.framework.name=yarn-tez (default: yarn)
$ bin/hdfs dfs -put tez-0.5.0 /apps/
$ bin/hadoop jar wordcount.jar -Dmapreduce.framework.name=yarn-tez inputdir outputdir
- Input
- Processor
- Output
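To make the Input-Processor-Output split concrete, here is a minimal pure-Java sketch. The `Input`, `Output` and `Processor` interfaces below are hypothetical simplifications, not the Tez runtime API; the point is only that a processor's logic never depends on where its records come from or where they go, so the framework can wire the same processor to different inputs and outputs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified interfaces for illustration only; they are NOT the Tez runtime API.
interface Input<T> {
    Iterator<T> reader();
}

interface Output<T> {
    void write(T record);
}

interface Processor<I, O> {
    void run(Input<I> in, Output<O> out);
}

public class IpoSketch {
    // The processor only sees the Input/Output abstractions, so the same logic
    // can be wired to any source or sink.
    static class UpperCaseProcessor implements Processor<String, String> {
        @Override
        public void run(Input<String> in, Output<String> out) {
            Iterator<String> it = in.reader();
            while (it.hasNext()) {
                out.write(it.next().toUpperCase());
            }
        }
    }

    // Plays the framework's role: builds an Input and an Output, then runs the task.
    static List<String> runTask(List<String> data) {
        List<String> sink = new ArrayList<>();
        Input<String> input = data::iterator;  // Input backed by an in-memory list
        Output<String> output = sink::add;     // Output that collects every record
        new UpperCaseProcessor().run(input, output);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runTask(List.of("tez", "dag"))); // prints [TEZ, DAG]
    }
}
```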
- Vertex
- Overview
- Fetching data from Edge
- Processing data
- Output data to Edge
- VertexManager
- When to launch tasks for a vertex?
- ShuffleVertexManager: supports slow start, pipelining map-side processing with reduce-side fetching and sort-merge processing
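Slow start can be pictured as a linear ramp: schedule no reduce tasks until a minimum fraction of source tasks has finished, schedule all of them once a maximum fraction has finished, and schedule a proportional share in between. This is an illustrative sketch, not ShuffleVertexManager's actual code; `minFraction` and `maxFraction` are assumed stand-ins for its min/max source-fraction settings, and the 0.25/0.75 values below are only example inputs.

```java
public class SlowStartSketch {
    /**
     * Number of downstream (reduce) tasks that should be scheduled, given how
     * many upstream (source) tasks have completed. Illustrative linear ramp only.
     */
    static int tasksToSchedule(int totalTasks, int completedSources, int totalSources,
                               double minFraction, double maxFraction) {
        // With no source tasks there is nothing to wait for.
        double done = totalSources == 0 ? 1.0 : (double) completedSources / totalSources;
        double fraction;
        if (maxFraction > minFraction) {
            // Linear ramp from 0 at minFraction to 1 at maxFraction.
            fraction = (done - minFraction) / (maxFraction - minFraction);
        } else {
            // Degenerate window: all or nothing once minFraction is reached.
            fraction = done < minFraction ? 0.0 : 1.0;
        }
        fraction = Math.max(0.0, Math.min(1.0, fraction));
        return (int) (fraction * totalTasks); // truncate to whole tasks
    }

    public static void main(String[] args) {
        for (int completed : new int[] {20, 50, 75, 100}) {
            System.out.println(completed + " sources done -> "
                    + tasksToSchedule(10, completed, 100, 0.25, 0.75) + " reducers scheduled");
        }
    }
}
```

With 10 reduce tasks, 100 source tasks and a 0.25 to 0.75 window, 50 finished source tasks release 5 reduce tasks and 75 release all 10, so reducers start fetching while maps are still running.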
- Edge
- How to pass data between vertices
- Outputs
- UnorderedKVOutput
- UnorderedPartitionedKVOutput
- OrderedPartitionedKVOutput: Shuffle
- Inputs
- OrderedGroupedKVInput
- OrderedGroupedMergedKVInput: Shuffle
- EdgeProperty
- DataMovementType
- SCATTER_GATHER: Shuffle
- ONE_TO_ONE: map to map
- BROADCAST
- CUSTOM
- SchedulingType
- SEQUENTIAL: MapReduce
- CONCURRENT
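One way to read the DataMovementType values is as routing rules from one source task to the destination tasks. The sketch below is hypothetical: the enum only reuses the Tez names, and "partition d goes to destination task d" is this sketch's simplification. CUSTOM is left out because its routing is defined by a pluggable edge manager.

```java
import java.util.ArrayList;
import java.util.List;

public class EdgeRoutingSketch {
    // Reuses Tez's DataMovementType names; CUSTOM is omitted because its
    // routing comes from a user-supplied edge manager plugin.
    enum Movement { SCATTER_GATHER, ONE_TO_ONE, BROADCAST }

    /** Describes where one source task's output goes, as "what -> destination" strings. */
    static List<String> route(Movement type, int sourceTask, int numDestTasks) {
        List<String> routes = new ArrayList<>();
        switch (type) {
            case SCATTER_GATHER:
                // The source writes one partition per destination; partition d goes to task d.
                for (int d = 0; d < numDestTasks; d++) {
                    routes.add("partition " + d + " -> task " + d);
                }
                break;
            case ONE_TO_ONE:
                // Source task i feeds destination task i only (equal parallelism expected).
                if (sourceTask >= numDestTasks) {
                    throw new IllegalArgumentException("ONE_TO_ONE needs equal task counts");
                }
                routes.add("all -> task " + sourceTask);
                break;
            case BROADCAST:
                // Every destination task receives the complete output.
                for (int d = 0; d < numDestTasks; d++) {
                    routes.add("all -> task " + d);
                }
                break;
        }
        return routes;
    }

    public static void main(String[] args) {
        for (Movement m : Movement.values()) {
            System.out.println(m + ": " + route(m, 1, 3));
        }
    }
}
```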
- Let's read wordcount.java
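Before reading the real source, it may help to see the data flow a word count DAG sets up: a tokenizer stage emits (word, 1) pairs hash-partitioned by word, and a scatter-gather edge delivers partition r of every tokenizer task to summation task r. The following is a single-process simulation with hypothetical names, not Tez code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountFlowSketch {
    // Tokenizer task: splits its input and hash-partitions the words.
    // Each entry in partition p stands for one (word, 1) pair.
    static List<List<String>> tokenize(String split, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (String word : split.split("\\s+")) {
            if (word.isEmpty()) {
                continue; // leading whitespace yields an empty token
            }
            int p = Math.floorMod(word.hashCode(), numPartitions);
            partitions.get(p).add(word);
        }
        return partitions;
    }

    static Map<String, Integer> countWords(List<String> splits, int numReducers) {
        // One tokenizer task per split; outputs.get(i).get(p) is partition p of task i.
        List<List<List<String>>> outputs = new ArrayList<>();
        for (String split : splits) {
            outputs.add(tokenize(split, numReducers));
        }
        Map<String, Integer> result = new TreeMap<>();
        // Scatter-gather: summation task r gathers partition r from every tokenizer task.
        for (int r = 0; r < numReducers; r++) {
            for (List<List<String>> taskOutput : outputs) {
                for (String word : taskOutput.get(r)) {
                    // A word always hashes to the same partition, so only task r counts it.
                    result.merge(word, 1, Integer::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(countWords(List.of("a b a", "b c"), 2)); // prints {a=2, b=2, c=1}
    }
}
```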
- YarnTaskSchedulerService for distributed mode
- NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
- RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
- NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
- LocalTaskSchedulerService for local mode
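The three assigner names suggest a locality fallback: when a container is allocated, prefer a task whose data is on that node, then one on the same rack, then any pending task. Below is a simplified sketch under that assumption; the `Request` type and the string labels are hypothetical, and the real YarnTaskSchedulerService also deals with locality delays and container reuse, which are omitted here.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class LocalityAssignSketch {
    // Hypothetical pending task request: where the task's input data lives.
    static final class Request {
        final String taskId;
        final Set<String> hosts;
        final Set<String> racks;

        Request(String taskId, Set<String> hosts, Set<String> racks) {
            this.taskId = taskId;
            this.hosts = hosts;
            this.racks = racks;
        }
    }

    // Match a newly allocated container to a pending request, trying
    // node-local first, then rack-local, then any request at all.
    static Optional<String> assign(String host, String rack, List<Request> pending) {
        for (Request r : pending) {
            if (r.hosts.contains(host)) {
                return Optional.of(r.taskId + "@NODE_LOCAL");
            }
        }
        for (Request r : pending) {
            if (r.racks.contains(rack)) {
                return Optional.of(r.taskId + "@RACK_LOCAL");
            }
        }
        if (pending.isEmpty()) {
            return Optional.empty(); // no pending work for this container
        }
        return Optional.of(pending.get(0).taskId + "@NON_LOCAL");
    }

    public static void main(String[] args) {
        List<Request> pending = List.of(
                new Request("t1", Set.of("h2"), Set.of("r1")),
                new Request("t2", Set.of("h1"), Set.of("r1")));
        System.out.println(assign("h1", "r1", pending)); // Optional[t2@NODE_LOCAL]
        System.out.println(assign("h3", "r1", pending)); // Optional[t1@RACK_LOCAL]
        System.out.println(assign("h9", "r9", pending)); // Optional[t1@NON_LOCAL]
    }
}
```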
- ShuffleVertexManager#TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL
- Collecting profiling information
@Override
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
  // TODO handle duplicates from retries
  if (enableAutoParallelism) {
    // save output size: each source task reports its output size in the event
    // payload; the running total is extrapolated later to the whole vertex
    VertexManagerEventPayloadProto proto;
    try {
      proto = VertexManagerEventPayloadProto.parseFrom(
          ByteString.copyFrom(vmEvent.getUserPayload()));
    } catch (InvalidProtocolBufferException e) {
      throw new TezUncheckedException(e);
    }
    long sourceTaskOutputSize = proto.getOutputSize();
    numVertexManagerEventsReceived++;
    completedSourceTasksOutputSize += sourceTaskOutputSize;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Received info of output size: " + sourceTaskOutputSize
          + " numInfoReceived: " + numVertexManagerEventsReceived
          + " total output size: " + completedSourceTasksOutputSize);
    }
  }
}
- Determining parallelism based on profiling information
void determineParallelismAndApply() {
  // Nothing to extrapolate from until at least one source task has completed
  // and at least one output-size report has arrived.
  if (numSourceTasksCompleted == 0) {
    return;
  }
  if (numVertexManagerEventsReceived == 0) {
    return;
  }
  int currentParallelism = pendingTasks.size();
  // Extrapolate the total source output from the tasks that have reported so far.
  long expectedTotalSourceTasksOutputSize =
      (totalNumSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived;
  // Ceiling division: the task count that keeps each task's input near the desired size.
  int desiredTaskParallelism =
      (int) ((expectedTotalSourceTasksOutputSize + desiredTaskInputDataSize - 1)
          / desiredTaskInputDataSize);
  if (desiredTaskParallelism < minTaskParallelism) {
    desiredTaskParallelism = minTaskParallelism;
  }
  if (desiredTaskParallelism >= currentParallelism) {
    // auto-parallelism only ever reduces the number of tasks
    return;
  }
  // most shufflers will be assigned this many consecutive partitions
  int basePartitionRange = currentParallelism / desiredTaskParallelism;
  if (basePartitionRange <= 1) {
    // nothing to do if each task would still read a single partition;
    // the shuffler does that by default
    return;
  }
  int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
  int remainderRangeForLastShuffler = currentParallelism % basePartitionRange;
  // the last shuffler takes the leftover partitions, if any
  int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
      (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
  LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName()
      + " to " + finalTaskParallelism + " from " + pendingTasks.size()
      + " . Expected output: " + expectedTotalSourceTasksOutputSize
      + " based on actual output: " + completedSourceTasksOutputSize
      + " from " + numVertexManagerEventsReceived + " vertex manager events. "
      + " desiredTaskInputSize: " + desiredTaskInputDataSize);
  if (finalTaskParallelism < currentParallelism) {
    // final parallelism is less than the current parallelism: re-route every
    // bipartite (shuffle) source edge through a CustomShuffleEdgeManager
    Map<String, EdgeManagerPluginDescriptor> edgeManagers =
        new HashMap<String, EdgeManagerPluginDescriptor>(bipartiteSources.size());
    for (String vertex : bipartiteSources.keySet()) {
      // use currentParallelism for numSourceTasks to maintain original state
      // for the source tasks
      CustomShuffleEdgeManagerConfig edgeManagerConfig =
          new CustomShuffleEdgeManagerConfig(
              currentParallelism, finalTaskParallelism,
              getContext().getVertexNumTasks(vertex), basePartitionRange,
              ((remainderRangeForLastShuffler > 0) ?
                  remainderRangeForLastShuffler : basePartitionRange));
      EdgeManagerPluginDescriptor edgeManagerDescriptor =
          EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
      edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload());
      edgeManagers.put(vertex, edgeManagerDescriptor);
    }
    getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
    updatePendingTasks();
  }
}
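The arithmetic in determineParallelismAndApply() is easier to follow with numbers. The sketch below copies its formulas into a standalone method (the parameter names are ours, and the extra lower bound of 1 only guards against division by zero in the sketch). Assuming each merged task reads a contiguous block of `basePartitionRange` original partitions, with the remainder going to the last task, it also lists the resulting ranges.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AutoParallelismSketch {
    /**
     * Mirrors the arithmetic of determineParallelismAndApply().
     * Returns {finalParallelism, basePartitionRange, lastTaskRange},
     * or null when parallelism would stay unchanged.
     */
    static int[] plan(int currentParallelism, int totalSourceTasks, int eventsReceived,
                      long completedOutputBytes, long desiredTaskInputBytes, int minParallelism) {
        if (eventsReceived == 0) {
            return null; // no size reports yet
        }
        // Extrapolate the total output from the source tasks that reported so far.
        long expectedTotal = (totalSourceTasks * completedOutputBytes) / eventsReceived;
        // Ceiling division: tasks needed to keep each input near the desired size.
        int desired = (int) ((expectedTotal + desiredTaskInputBytes - 1) / desiredTaskInputBytes);
        desired = Math.max(desired, Math.max(1, minParallelism)); // >= 1 avoids dividing by zero
        if (desired >= currentParallelism) {
            return null; // auto-parallelism only ever reduces the task count
        }
        int base = currentParallelism / desired;
        if (base <= 1) {
            return null; // every task would still read a single partition
        }
        int full = currentParallelism / base;
        int remainder = currentParallelism % base;
        int finalParallelism = remainder > 0 ? full + 1 : full;
        return new int[] {finalParallelism, base, remainder > 0 ? remainder : base};
    }

    /** Assumed contiguous layout: merged task i reads partitions [i*base, i*base + base). */
    static List<String> ranges(int currentParallelism, int base) {
        List<String> out = new ArrayList<>();
        for (int start = 0; start < currentParallelism; start += base) {
            int end = Math.min(start + base, currentParallelism);
            out.add("[" + start + "," + end + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        int[] p = plan(10, 100, 20, 20_000_000L, 40_000_000L, 1);
        System.out.println(Arrays.toString(p)); // [4, 3, 1]
        System.out.println(ranges(10, p[1]));   // [[0,3), [3,6), [6,9), [9,10)]
    }
}
```

With 10 reduce tasks, 100 source tasks and 20 size reports totalling 20,000,000 bytes, the extrapolated total is 100,000,000 bytes; a desired input of 40,000,000 bytes per task gives a desired parallelism of 3. The base range is then 10 / 3 = 3, so 4 tasks read [0,3), [3,6), [6,9) and [9,10). The final parallelism (4) can exceed the desired one (3) because the partition range is rounded down.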