@hsaputra
Last active January 26, 2021 06:25
Apache Flink Job execution Flow Sequence
Apache Flink Job Flow from client API to JobGraph
=============================================
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
WordCount
ExecutionEnvironment#getExecutionEnvironment
ExecutionEnvironment#readTextFile => DataSet<String>
DataSet#flatMap(FlatMapFunction) => FlatMapOperator<T, R>
ExecutionEnvironment#execute
RemoteEnvironment#execute
PlanExecutor#executePlan
RemoteExecutor#executePlan
Client#run(Plan)
Client#getOptimizedPlan(Plan)
Client#run(OptimizedPlan)
Client#run(JobGraph)
JobClient#SubmitJobAndWait -> Akka Actor case handler
JobManager#SubmitJob -> Akka Actor case handler
// Build ExecutionGraph out of JobGraph
ExecutionGraph#attachJobGraph(JobGraph)
ExecutionGraph#scheduleForExecution(Scheduler)
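In `ExecutionGraph#attachJobGraph(JobGraph)`, the logical JobGraph is expanded into its parallel form. Each job vertex becomes an ExecutionJobVertex that holds one ExecutionVertex per parallel subtask. The plain-Java sketch below models only that expansion step. `JobVertexSpec`, `SubtaskSpec` and `expand` are hypothetical names, not Flink API. Edges, intermediate results and input splits are left out, and the "(i/p)" label format is only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ExecutionGraphSketch {

    // Hypothetical stand-in for a JobGraph vertex: an operator name plus its parallelism.
    record JobVertexSpec(String name, int parallelism) {}

    // Hypothetical stand-in for one parallel subtask (ExecutionVertex) of a job vertex.
    record SubtaskSpec(String vertexName, int subtaskIndex, int parallelism) {
        @Override
        public String toString() {
            return vertexName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
        }
    }

    // Expands every job vertex into `parallelism` subtasks, keeping the topological order.
    static List<SubtaskSpec> expand(List<JobVertexSpec> topologicallySorted) {
        List<SubtaskSpec> subtasks = new ArrayList<>();
        for (JobVertexSpec vertex : topologicallySorted) {
            for (int i = 0; i < vertex.parallelism(); i++) {
                subtasks.add(new SubtaskSpec(vertex.name(), i, vertex.parallelism()));
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Arbitrary example vertices, loosely shaped like the WordCount job above.
        List<JobVertexSpec> jobGraph = List.of(
                new JobVertexSpec("DataSource", 2),
                new JobVertexSpec("FlatMap", 2),
                new JobVertexSpec("Reduce", 1));
        expand(jobGraph).forEach(System.out::println);
    }
}
```

Scheduling then works on these per-subtask units, which is why the next section goes from ExecutionJobVertex down to individual ExecutionVertex and Execution objects.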
Apache Flink Runtime Flow from JobGraph to ExecutionGraph submission
===========================================================
JobManager#SubmitJob
...
ExecutionGraph#scheduleForExecution(Scheduler)
ExecutionJobVertex#scheduleAll
Scheduler#getInstancesByHost -> Only needed when input splits must be pre-assigned to hosts (locality)
ExecutionVertex#scheduleForExecution
Execution#scheduleForExecution -> A single execution (attempt) of a vertex
Scheduler#scheduleImmediately
Scheduler#scheduleTask(ScheduledUnit)
ExecutionVertex#getPreferredLocations -> Preferred TaskManager locations, used for locality-aware scheduling
Execution#deployToSlot
ExecutionVertex#createDeploymentDescriptor
TaskManager#SubmitTask -> Akka actor case handler
TaskManager#submitTask(TaskDeploymentDescriptor)
Task#startExecution
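The scheduling steps above (`Scheduler#scheduleTask` consulting `ExecutionVertex#getPreferredLocations`) come down to preferring a slot on a host where the task's inputs live, and falling back to any free slot otherwise. Here is a minimal sketch of that idea, assuming a flat list of free slots. `LocalitySchedulerSketch` and `Slot` are hypothetical names. Flink's actual Scheduler handles more than this (for example slot sharing), and those concerns are ignored here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class LocalitySchedulerSketch {

    // Hypothetical free slot: the TaskManager host it lives on plus a slot id.
    record Slot(String host, int id) {}

    private final List<Slot> freeSlots = new ArrayList<>();

    LocalitySchedulerSketch(Collection<Slot> slots) {
        freeSlots.addAll(slots);
    }

    // Prefers a free slot on one of the preferred hosts; otherwise falls back to any free slot.
    Optional<Slot> allocate(Set<String> preferredHosts) {
        Iterator<Slot> it = freeSlots.iterator();
        while (it.hasNext()) {
            Slot slot = it.next();
            if (preferredHosts.contains(slot.host())) {
                it.remove(); // local slot found, take it out of the free pool
                return Optional.of(slot);
            }
        }
        // No local slot available: take any remaining slot (a non-local assignment).
        if (freeSlots.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(freeSlots.remove(0));
    }

    public static void main(String[] args) {
        LocalitySchedulerSketch scheduler = new LocalitySchedulerSketch(
                List.of(new Slot("hostA", 0), new Slot("hostB", 1)));
        System.out.println(scheduler.allocate(Set.of("hostB"))); // Optional[Slot[host=hostB, id=1]]
        System.out.println(scheduler.allocate(Set.of("hostB"))); // Optional[Slot[host=hostA, id=0]]
        System.out.println(scheduler.allocate(Set.of("hostB"))); // Optional.empty
    }
}
```

Once a slot is chosen, the flow continues with `Execution#deployToSlot`, which ships a TaskDeploymentDescriptor to the TaskManager owning that slot.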
Apache Flink ExecutionGraph Execution
================================
TaskManager#submitTask(TaskDeploymentDescriptor)
Task#setEnvironment(RuntimeEnvironment)
BatchTask#registerInputOutput -> Sets up the Driver, i.e. the runtime operator to execute
NetworkEnvironment#registerTask
Task#startTaskThread
RuntimeEnvironment#getExecutingThread -> Returns the thread assigned to execute the user code.
RuntimeEnvironment#run
BatchTask#invoke
BatchTask#createRuntimeContext -> Create runtime UDF context
BatchTask#initInputsSerializersAndComparators
BatchTask#initBroadcastInputsSerializers
BatchTask#initLocalStrategies
BatchTask#run
MapDriver#run -> The Driver calls the user code on the task's input data.
Here the Map operation is used as the example.
TaskContext#getStub -> Returns the UDF instance; the task context is implemented by RegularPactTask
TaskContext#getInput
TaskContext#getOutputCollector
MapFunction#map -> For the Map operation, the UDF is a class that implements MapFunction
OutputCollector#collect -> Adds the processed record to the OutputCollector.
The collector is the "push" counterpart of the Iterator, which "pulls" data in.
RecordWriter#emit
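The driver loop described above pulls records from the task input, calls the UDF on each one, and pushes every result into the collector. Below is a minimal sketch of that pull/push pattern. It assumes a null-terminated input, loosely modeled on Flink's `MutableObjectIterator`. `Input`, `Collector` and `MapDriverSketch` are hypothetical local types, not the Flink classes, and object reuse is ignored.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class MapDriverSketch {

    // Hypothetical pull-style input: next() returns null once the input is exhausted.
    interface Input<T> {
        T next();
    }

    // Hypothetical push-style output, the counterpart of the pull-style input.
    interface Collector<T> {
        void collect(T record);
    }

    // Cleared by cancel() so a running loop stops after the current record.
    private volatile boolean running = true;

    // Pulls each record, applies the user function, and pushes the result downstream.
    <IN, OUT> void run(Input<IN> input, Function<IN, OUT> udf, Collector<OUT> output) {
        IN record;
        while (running && (record = input.next()) != null) {
            output.collect(udf.apply(record));
        }
    }

    void cancel() {
        running = false;
    }

    // Adapts a java.util.Iterator to the null-terminated Input style.
    static <T> Input<T> fromIterator(Iterator<T> it) {
        return () -> it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        Function<String, Integer> udf = String::length; // stands in for MapFunction#map
        Collector<Integer> sink = out::add;             // stands in for the OutputCollector
        new MapDriverSketch().run(fromIterator(List.of("flink", "job", "graph").iterator()), udf, sink);
        System.out.println(out); // [5, 3, 5]
    }
}
```

In the real flow, the collector does not store records in a list; it hands each one to its RecordWriter(s), which is where the shuffle section below begins.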
Runtime Flow of data pushed to shuffle
================================
RecordWriter#emit
ChannelSelector#selectChannels -> Returns the array of channels to which the record should be sent.
RecordSerializer#addRecord
IOReadableWritable#write
RecordSerializer#getCurrentBuffer -> A Buffer is a wrapper around pooled MemorySegment instances.
ResultPartitionWriter#writeBuffer(Buffer, targetChannel)
ResultPartition#add(Buffer, targetChannel) -> The targetChannel is the target ResultSubpartition of the Buffer.
ResultSubpartition#add -> Calls either SpillableSubpartition (blocking) or PipelinedSubpartition.
The default is ExecutionMode.PIPELINED (ExecutionConfig executionMode).
ResultPartition#notifyPipelinedConsumers -> Notifies the pipelined consumers of this partition
ResultPartitionConsumableNotifier#notifyPartitionConsumable
JobManagerResultPartitionConsumableNotifier#notifyPartitionConsumable -> implements ResultPartitionConsumableNotifier
JobManager ! ScheduleOrUpdateConsumers
JobManager#ScheduleOrUpdateConsumers -> Get ExecutionGraph for the job
ExecutionGraph#scheduleOrUpdateConsumers
ExecutionVertex#scheduleOrUpdateConsumers
Execution#scheduleOrUpdateConsumers
TaskManager#UpdateTask -> Actor message sent to the consumer's TaskManager
TaskManager#updateTask -> Runs in the consumer's TaskManager.
RuntimeEnvironment#getInputGateById
SingleInputGate#updateInputChannel
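On the producer side, the chain above does three things. It selects a target channel for each record, appends the data to that subpartition, and, for a pipelined partition, tells the JobManager that the partition is consumable so the consumers can be scheduled or updated. The sketch below compresses that into one class. It works on strings instead of serialized Buffers and uses a simple hash-modulo `selectChannel` as a stand-in for a ChannelSelector. It also assumes consumers need to be notified only once, on the first add. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PipelinedPartitionSketch {

    // One queue per subpartition (i.e. per consuming channel).
    private final List<Queue<String>> subpartitions = new ArrayList<>();
    // Stand-in for the ResultPartitionConsumableNotifier call to the JobManager.
    private final Runnable consumableNotifier;
    private boolean notifiedConsumers = false;

    PipelinedPartitionSketch(int numSubpartitions, Runnable consumableNotifier) {
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions.add(new ArrayDeque<>());
        }
        this.consumableNotifier = consumableNotifier;
    }

    // Hash-based channel selection: equal keys always map to the same subpartition.
    int selectChannel(String key) {
        return Math.floorMod(key.hashCode(), subpartitions.size());
    }

    // Appends a record to its target subpartition; only the first add notifies consumers.
    void add(String record, int targetChannel) {
        subpartitions.get(targetChannel).add(record);
        if (!notifiedConsumers) {
            notifiedConsumers = true;
            consumableNotifier.run();
        }
    }

    // Roughly the RecordWriter#emit step: pick the channel, then add to the partition.
    void emit(String record) {
        add(record, selectChannel(record));
    }

    int size(int subpartition) {
        return subpartitions.get(subpartition).size();
    }

    public static void main(String[] args) {
        int[] notifications = {0};
        PipelinedPartitionSketch partition = new PipelinedPartitionSketch(4, () -> notifications[0]++);
        for (String word : List.of("to", "be", "or", "not", "to", "be")) {
            partition.emit(word);
        }
        System.out.println("notifications = " + notifications[0]); // notifications = 1
    }
}
```

On the consumer side, the matching step is `SingleInputGate#updateInputChannel`, which points an input channel at the now-known producer location.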