

@oza
Last active May 23, 2021 05:32
Apache Tez Source code reading

Tez source code reading

LICENSE

  • This document is distributed under the Apache License 2.0.
  • All source code quoted in this document is from the Apache Tez project.

Links

What is Tez?

Quoting from http://tez.apache.org/:

  • Empowering end users by:
    • Expressive dataflow definition APIs
    • Flexible Input-Processor-Output runtime model
    • Data type agnostic
    • Simplifying deployment
  • Execution Performance
    • Performance gains over MapReduce
    • Optimal resource management
    • Plan reconfiguration at runtime
    • Dynamic physical data flow decisions

Tez and Spark

common points

  • DAG-style processing framework
  • Apache Top Level Projects

differences

Tez code base size

$ find . -name "*.java" | xargs wc -l
205571
  • tez-api, tez-common, tez-dag, tez-runtime-internals (core of Tez)
  • tez-mapreduce, tez-mapreduce-examples (MapReduce APIs)

How to deploy

  1. Set up a YARN and HDFS cluster
  2. Put the Tez jar files into HDFS
  3. Configure environment variables
  4. Change properties:
  • Hive: -Dhive.execution.engine=tez (default: mr)
  • MapReduce: -Dmapreduce.framework.name=yarn-tez (default: yarn)
$ bin/hdfs dfs -put tez-0.5.0 /apps/
$ bin/hadoop jar wordcount.jar -Dmapreduce.framework.name=yarn-tez inputdir outputdir

Processing model

  • Input
  • Processor
  • Output
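The Input-Processor-Output model can be illustrated with a minimal, self-contained sketch: a task is an Input feeding a Processor that writes to an Output. The interfaces `SimpleInput`, `SimpleProcessor` and `SimpleOutput` are hypothetical simplifications made up for this note, not Tez's runtime API (Tez's own `LogicalInput`, `LogicalIOProcessor` and `LogicalOutput` carry many more lifecycle methods). Here the processor counts words.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-ins for the Input / Processor / Output roles.
interface SimpleInput {
    Iterator<String> records();          // records fetched from an incoming edge
}

interface SimpleOutput {
    void write(String key, long value);  // records emitted to an outgoing edge
}

interface SimpleProcessor {
    void run(SimpleInput input, SimpleOutput output);
}

// A processor that tokenizes lines and emits (word, count) pairs in key order.
class WordCountProcessor implements SimpleProcessor {
    @Override
    public void run(SimpleInput input, SimpleOutput output) {
        Map<String, Long> counts = new TreeMap<>();
        Iterator<String> it = input.records();
        while (it.hasNext()) {
            for (String word : it.next().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        counts.forEach((word, n) -> output.write(word, n));
    }
}

// An in-memory output collecting "key=value" strings, standing in for an edge.
class ListOutput implements SimpleOutput {
    final List<String> written = new ArrayList<>();

    @Override
    public void write(String key, long value) {
        written.add(key + "=" + value);
    }
}

class IpoDemo {
    // Wires one Input, one Processor and one Output together, like a single task.
    static List<String> runTask(List<String> lines) {
        SimpleInput input = lines::iterator;
        ListOutput output = new ListOutput();
        new WordCountProcessor().run(input, output);
        return output.written;
    }
}
```

For example, `IpoDemo.runTask(List.of("a b a", "b c"))` yields `[a=2, b=2, c=1]`. Because the processor only sees the two interfaces, the same processor could be wired to a different input or output without changes, which is the point of keeping the three roles separate.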

APIs

  • Vertex

    • Overview
      • Fetching data from Edges
      • Processing data
      • Outputting data to Edges
    • VertexManager
      • Decides when to launch the tasks of a vertex
        • ShuffleVertexManager: supports slow start, which pipelines map-side processing with reduce-side fetching and sort-merge processing
  • Edge

    • How to pass data between Vertices
      • Outputs
        1. UnorderedKVOutput
        2. UnorderedPartitionedKVOutput
        3. OrderedPartitionedKVOutput: Shuffle
      • Inputs
        1. OrderedGroupedKVInput
        2. OrderedGroupedMergedKVInput: Shuffle
      • EdgeProperty
        • DataMovementType
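          1. SCATTER_GATHER: each source task partitions its output and each destination task gathers its partition from every source task (Shuffle)
          2. ONE_TO_ONE: the i-th source task sends its output only to the i-th destination task (e.g. map to map)
          3. BROADCAST: every source task sends its whole output to all destination tasks
          4. CUSTOM: routing is defined by a user-supplied edge manager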
        • SchedulingType
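          1. SEQUENTIAL: destination tasks are scheduled only after source tasks (as in MapReduce)
          2. CONCURRENT: source and destination tasks may be scheduled at the same time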
  • Let's read wordcount.java
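To make the DataMovementType semantics concrete, the sketch below computes where the output of one source task is delivered. The enum `Movement` and the class `EdgeRouting` are hypothetical, written for this note; in Tez the actual routing is done by edge manager classes inside the DAG. CUSTOM is left out because its routing is user-defined, and SCATTER_GATHER assumes each source task writes one partition per destination task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the built-in DataMovementType values (CUSTOM omitted).
enum Movement { SCATTER_GATHER, ONE_TO_ONE, BROADCAST }

class EdgeRouting {
    /**
     * Returns "partition->destinationTask" pairs describing where the output
     * of source task srcTask is sent.
     */
    static List<String> destinations(Movement type, int srcTask,
                                     int numSrcTasks, int numDestTasks) {
        List<String> routes = new ArrayList<>();
        switch (type) {
            case SCATTER_GATHER:
                // One partition per destination task; partition i is gathered
                // by destination task i, whichever source task wrote it.
                for (int p = 0; p < numDestTasks; p++) {
                    routes.add(p + "->" + p);
                }
                break;
            case ONE_TO_ONE:
                // Parallelism must match: task i feeds task i only.
                if (numSrcTasks != numDestTasks) {
                    throw new IllegalArgumentException(
                        "ONE_TO_ONE needs equal parallelism");
                }
                routes.add("0->" + srcTask);
                break;
            case BROADCAST:
                // The single output of the source task goes to every destination.
                for (int d = 0; d < numDestTasks; d++) {
                    routes.add("0->" + d);
                }
                break;
        }
        return routes;
    }
}
```

For instance, with 4 source tasks and 3 destination tasks, any source task under SCATTER_GATHER produces `[0->0, 1->1, 2->2]`, while under BROADCAST with 2 destinations it produces `[0->0, 0->1]`.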

Schedulers

  • YarnTaskSchedulerService for distributed mode, which uses three container assigners:
    • NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
    • RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
    • NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
  • LocalTaskSchedulerService for local mode
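The three assigners suggest a locality cascade: a newly allocated container is offered first to a task that wants that exact node, then to one that wants the same rack, and finally to any pending task. The sketch below models only that fallback order under that assumption; `TaskRequest` and `LocalityMatcher` are made up here, and the delays, priorities and container reuse that YarnTaskSchedulerService handles are ignored.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical pending task request with its preferred nodes and racks.
class TaskRequest {
    final String taskId;
    final List<String> nodes;
    final List<String> racks;

    TaskRequest(String taskId, List<String> nodes, List<String> racks) {
        this.taskId = taskId;
        this.nodes = nodes;
        this.racks = racks;
    }
}

class LocalityMatcher {
    // Picks a request for a container on (node, rack), trying node-local,
    // then rack-local, then non-local matches, in that order.
    static Optional<String> assign(String node, String rack, List<TaskRequest> pending) {
        for (TaskRequest r : pending) {       // 1. node-local
            if (r.nodes.contains(node)) {
                return Optional.of(r.taskId + " (node-local)");
            }
        }
        for (TaskRequest r : pending) {       // 2. rack-local
            if (r.racks.contains(rack)) {
                return Optional.of(r.taskId + " (rack-local)");
            }
        }
        // 3. non-local: any pending request will do; empty if none are pending.
        return pending.stream().findFirst().map(r -> r.taskId + " (non-local)");
    }
}
```

Trying the stricter match first keeps data reads local when possible, while the final non-local pass ensures a container is never left idle while tasks are waiting.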

Reducer auto-parallelism

  • ShuffleVertexManager#TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL
    • Collecting profiling information: each source task reports its output size in a VertexManagerEvent, and the sizes are summed
  @Override
  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
    // TODO handle duplicates from retries
    if (enableAutoParallelism) {
      // save output size
      VertexManagerEventPayloadProto proto;
      try {
        proto = VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
      } catch (InvalidProtocolBufferException e) {
        throw new TezUncheckedException(e);
      }
      long sourceTaskOutputSize = proto.getOutputSize();
      numVertexManagerEventsReceived++;
      completedSourceTasksOutputSize += sourceTaskOutputSize;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Received info of output size: " + sourceTaskOutputSize 
            + " numInfoReceived: " + numVertexManagerEventsReceived
            + " total output size: " + completedSourceTasksOutputSize);
      }
    }
    
  }
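    • Determining parallelism from that information: extrapolate the total source output, divide it by the desired per-task input size, and merge adjacent partitions into fewer reduce tasks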
  void determineParallelismAndApply() {
    if(numSourceTasksCompleted == 0) {
      return;
    }
    
    if(numVertexManagerEventsReceived == 0) {
      return;
    }
    
    int currentParallelism = pendingTasks.size();
    long expectedTotalSourceTasksOutputSize = 
        (totalNumSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived;
    int desiredTaskParallelism = 
        (int)(
            (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
            desiredTaskInputDataSize);
    if(desiredTaskParallelism < minTaskParallelism) {
      desiredTaskParallelism = minTaskParallelism;
    }
    
    if(desiredTaskParallelism >= currentParallelism) {
      return;
    }
    
    // most shufflers will be assigned this range
    int basePartitionRange = currentParallelism/desiredTaskParallelism;
    
    if (basePartitionRange <= 1) {
      // nothing to do if range is equal 1 partition. shuffler does it by default
      return;
    }
    
    int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
    int remainderRangeForLastShuffler = currentParallelism % basePartitionRange; 
    
    int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
          (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);

    LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName()
        + " to " + finalTaskParallelism + " from " + pendingTasks.size() 
        + " . Expected output: " + expectedTotalSourceTasksOutputSize 
        + " based on actual output: " + completedSourceTasksOutputSize
        + " from " + numVertexManagerEventsReceived + " vertex manager events. "
        + " desiredTaskInputSize: " + desiredTaskInputDataSize);
          
    if(finalTaskParallelism < currentParallelism) {
      // final parallelism is less than actual parallelism
      Map<String, EdgeManagerPluginDescriptor> edgeManagers =
          new HashMap<String, EdgeManagerPluginDescriptor>(bipartiteSources.size());
      for(String vertex : bipartiteSources.keySet()) {
        // use currentParallelism for numSourceTasks to maintain original state
        // for the source tasks
        CustomShuffleEdgeManagerConfig edgeManagerConfig =
            new CustomShuffleEdgeManagerConfig(
                currentParallelism, finalTaskParallelism, 
                getContext().getVertexNumTasks(vertex), basePartitionRange,
                ((remainderRangeForLastShuffler > 0) ?
                    remainderRangeForLastShuffler : basePartitionRange));
        EdgeManagerPluginDescriptor edgeManagerDescriptor =
            EdgeManagerPluginDescriptor.create(CustomShuffleEdgeManager.class.getName());
        edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload());
        edgeManagers.put(vertex, edgeManagerDescriptor);
      }
      
      getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);
      updatePendingTasks();      
    }
  }
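The arithmetic in determineParallelismAndApply() is easier to follow with numbers. The standalone port below reproduces only the calculation (extrapolated output size, desired parallelism, partition range, final parallelism) and drops the Tez context calls and the numSourceTasksCompleted check; `AutoParallelismMath` is a name made up for this note, and the floor of 1 on the desired parallelism is an added guard, not part of the quoted code.

```java
class AutoParallelismMath {
    /**
     * Returns the parallelism the vertex would end up with, or
     * currentParallelism when no change would be made. Mirrors the arithmetic
     * of determineParallelismAndApply() quoted above.
     */
    static int finalParallelism(int totalNumSourceTasks,
                                long completedSourceTasksOutputSize,
                                int numVertexManagerEventsReceived,
                                long desiredTaskInputDataSize,
                                int minTaskParallelism,
                                int currentParallelism) {
        if (numVertexManagerEventsReceived == 0) {
            return currentParallelism;    // no statistics yet
        }
        // Extrapolate the output of all source tasks from those that reported.
        long expectedTotal = (totalNumSourceTasks * completedSourceTasksOutputSize)
                / numVertexManagerEventsReceived;
        // Ceiling division: number of tasks needed at the desired input size.
        int desired = (int) ((expectedTotal + desiredTaskInputDataSize - 1)
                / desiredTaskInputDataSize);
        // Never go below minTaskParallelism; the extra floor of 1 is an added
        // guard (not in the quoted code) against dividing by zero below.
        desired = Math.max(desired, Math.max(1, minTaskParallelism));
        if (desired >= currentParallelism) {
            return currentParallelism;    // only decreases are applied
        }
        // Each new task fetches basePartitionRange consecutive partitions.
        int basePartitionRange = currentParallelism / desired;
        if (basePartitionRange <= 1) {
            return currentParallelism;    // one partition per task changes nothing
        }
        int numWithBaseRange = currentParallelism / basePartitionRange;
        int remainder = currentParallelism % basePartitionRange;
        // A leftover, smaller range gets one extra task.
        return remainder > 0 ? numWithBaseRange + 1 : numWithBaseRange;
    }
}
```

Example: 100 source tasks, 20 of which reported 2000 MB in total, with a desired input of 100 MB per task, a minimum parallelism of 1 and 1000 pending reducers. The expected total is 100 * 2000 / 20 = 10000 MB, so 100 tasks are desired, the base range is 10, and the vertex shrinks from 1000 to 100 reducers. If the 20 tasks had reported 6000 MB instead, 300 tasks are desired, the base range is 3, and 333 full ranges plus one leftover partition give 334 tasks.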

To be continued...
