Classes involved in Flink JobGraph generation
StreamGraphHasherV2.java:
public class StreamGraphHasherV2 implements StreamGraphHasher {

    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV2.class);

    /**
     * Returns a map with a hash for each {@link StreamNode} of the {@link
     * StreamGraph}. The hash is used as the {@link JobVertexID} in order to
     * identify nodes across job submissions if they didn't change.
     *
     * <p>The complete {@link StreamGraph} is traversed. The hash is either
     * computed from the transformation's user-specified id (see
     * {@link StreamTransformation#getUid()}) or generated in a deterministic way.
     *
     * <p>The generated hash is deterministic with respect to: (a node's hash is built from the following parts)
     * <ul>
     * <li>node-local properties (like parallelism, UDF, node ID), (in practice only the node ID is hashed; parallelism and the user function are ignored, presumably so that changing them still yields the same task)
     * <li>chained output nodes, and (the output operators that can be chained onto this node)
     * <li>input nodes hashes (the hashes of the input nodes)
     * </ul>
     *
     * @return A map from {@link StreamNode#id} to hash as 16-byte array.
     */
    @Override
    public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
        // The hash function used to generate the hash
        final HashFunction hashFunction = Hashing.murmur3_128(0);
        final Map<Integer, byte[]> hashes = new HashMap<>();

        Set<Integer> visited = new HashSet<>();
        Queue<StreamNode> remaining = new ArrayDeque<>();

        // We need to make the source order deterministic. The source IDs are
        // not returned in the same order, which means that submitting the same
        // program twice might result in different traversal, which breaks the
        // deterministic hash assignment.
        // The IDs come back in arbitrary order, and each hash depends on the current
        // size of the hashes map, so we sort first to guarantee the same order on every run.
        List<Integer> sources = new ArrayList<>();
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            sources.add(sourceNodeId);
        }
        Collections.sort(sources);

        //
        // Breadth-first traversal, starting from all the sources.
        // Traverse the graph in a breadth-first manner. Keep in mind that
        // the graph is not a tree and multiple paths to nodes can exist.
        //

        // Start with source nodes
        for (Integer sourceNodeId : sources) {
            remaining.add(streamGraph.getStreamNode(sourceNodeId));
            visited.add(sourceNodeId);
        }

        StreamNode currentNode;
        while ((currentNode = remaining.poll()) != null) {
            // Generate the hash code. Because multiple paths exist to each
            // node, we might not have all required inputs available to
            // generate the hash code.
            if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
                // Add the child nodes
                for (StreamEdge outEdge : currentNode.getOutEdges()) {
                    StreamNode child = outEdge.getTargetVertex();

                    if (!visited.contains(child.getId())) {
                        remaining.add(child);
                        visited.add(child.getId());
                    }
                }
            } else {
                // We will revisit this later.
                // A node marked as visited may still fail hash generation, e.g. when its
                // input nodes have no hashes yet. Removing it from visited allows it to be
                // re-queued into remaining once its inputs have been processed.
                visited.remove(currentNode.getId());
            }
        }

        return hashes;
    }
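    // Illustration of the retry behavior (reader's sketch, hypothetical topology):
    // consider A -> B -> D together with A -> C -> E -> D. After A, B, and C are
    // hashed, D is polled via B before E has a hash, so generateNodeHash returns
    // false and D is dropped from 'visited'. Once E (D's last remaining input) is
    // hashed, D is no longer marked visited, gets re-enqueued, and succeeds.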
    /**
     * Generates a hash for the node and returns whether the operation was
     * successful.
     *
     * @param node The node to generate the hash for
     * @param hashFunction The hash function to use
     * @param hashes The current state of generated hashes
     * @return <code>true</code> if the node hash has been generated.
     * <code>false</code>, otherwise. If the operation is not successful, the
     * hash needs to be generated at a later point when all input is available.
     * @throws IllegalStateException If node has user-specified hash and is
     * intermediate node of a chain
     */
    private boolean generateNodeHash(
            StreamNode node,
            HashFunction hashFunction,
            Map<Integer, byte[]> hashes,
            boolean isChainingEnabled) {

        // Check for user-specified ID
        // This is the ID the user assigned to the node via uid(String).
        String userSpecifiedHash = node.getTransformationUID();

        if (userSpecifiedHash == null) {
            // Check that all input nodes have their hashes computed
            // Hash generation for this node can only proceed once all of its
            // input nodes have had their hashes stored.
            for (StreamEdge inEdge : node.getInEdges()) {
                // If the input node has not been visited yet, the current
                // node will be visited again at a later point when all input
                // nodes have been visited and their hashes set.
                if (!hashes.containsKey(inEdge.getSourceId())) {
                    return false;
                }
            }

            Hasher hasher = hashFunction.newHasher();
            byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);

            if (hashes.put(node.getId(), hash) != null) {
                // Sanity check
                throw new IllegalStateException("Unexpected state. Tried to add node hash " +
                        "twice. This is probably a bug in the JobGraph generator.");
            }

            return true;
        } else {
            Hasher hasher = hashFunction.newHasher();
            byte[] hash = generateUserSpecifiedHash(node, hasher);

            for (byte[] previousHash : hashes.values()) {
                if (Arrays.equals(previousHash, hash)) {
                    throw new IllegalArgumentException("Hash collision on user-specified ID. " +
                            "Most likely cause is a non-unique ID. Please check that all IDs " +
                            "specified via `uid(String)` are unique.");
                }
            }

            if (hashes.put(node.getId(), hash) != null) {
                // Sanity check
                throw new IllegalStateException("Unexpected state. Tried to add node hash " +
                        "twice. This is probably a bug in the JobGraph generator.");
            }

            return true;
        }
    }

    /**
     * Generates a hash from a user-specified ID.
     */
    private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
        hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
        // With a user-specified uid the hash is derived directly from it, which
        // simplifies the procedure considerably.
        return hasher.hash().asBytes();
    }
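    // Usage sketch (standard DataStream API; MySource/MyMapper/MySink are hypothetical
    // user classes): assigning explicit uids keeps JobVertexIDs stable across topology
    // changes, which matters for savepoint compatibility:
    //
    //   env.addSource(new MySource()).uid("source")   // hash derived from "source"
    //      .map(new MyMapper()).uid("mapper")         // hash derived from "mapper"
    //      .addSink(new MySink());                    // hash generated deterministically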
    /**
     * Generates a deterministic hash from node-local properties and input and
     * output edges.
     */
    private byte[] generateDeterministicHash(
            StreamNode node,
            Hasher hasher,
            Map<Integer, byte[]> hashes,
            boolean isChainingEnabled) {

        // Include stream node to hash. We use the current size of the computed
        // hashes as the ID. We cannot use the node's ID, because it is
        // assigned from a static counter. This will result in two identical
        // programs having different hashes.
        generateNodeLocalHash(node, hasher, hashes.size());

        // Include chained nodes to hash
        for (StreamEdge outEdge : node.getOutEdges()) {
            // Check whether the two ends of this edge can be chained together.
            if (isChainable(outEdge, isChainingEnabled)) {
                StreamNode chainedNode = outEdge.getTargetVertex();

                // Use the hash size again, because the nodes are chained to
                // this node. This does not add a hash for the chained nodes.
                // hashes.size() is unchanged at this point, because the current node's hash
                // has not been stored yet; the chained nodes are fused into one task, so
                // they all contribute under the same ID value.
                generateNodeLocalHash(chainedNode, hasher, hashes.size());
            }
        }

        byte[] hash = hasher.hash().asBytes();

        // Make sure that all input nodes have their hash set before entering
        // this loop (calling this method).
        for (StreamEdge inEdge : node.getInEdges()) {
            byte[] otherHash = hashes.get(inEdge.getSourceId());

            // Sanity check
            if (otherHash == null) {
                throw new IllegalStateException("Missing hash for input node "
                        + inEdge.getSourceVertex() + ". Cannot generate hash for "
                        + node + ".");
            }

            // This is where each node's hash gets mixed with the hashes of its input nodes.
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
            }
        }

        if (LOG.isDebugEnabled()) {
            String udfClassName = "";
            if (node.getOperator() instanceof AbstractUdfStreamOperator) {
                udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
                        .getUserFunction().getClass().getName();
            }

            LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
                    "'" + node.toString() + "' {id: " + node.getId() + ", " +
                    "parallelism: " + node.getParallelism() + ", " +
                    "user function: " + udfClassName + "}");
        }

        return hash;
    }

    /**
     * Applies the {@link Hasher} to the {@link StreamNode} (only node local
     * attributes are taken into account). The hasher encapsulates the current
     * state of the hash.
     *
     * <p>The specified ID is local to this node. We cannot use the
     * {@link StreamNode#id}, because it is incremented in a static counter.
     * Therefore, the IDs for identical jobs will otherwise be different.
     */
    private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
        // This resolves conflicts for otherwise identical source nodes. BUT
        // the generated hash codes depend on the ordering of the nodes in the
        // stream graph.
        // The node parameter is unused here: in the V1 hasher the computation depended
        // on node properties, but V2 no longer needs them, so the parameter could
        // arguably be removed.
        hasher.putInt(id);
    }

    private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
        StreamNode upStreamVertex = edge.getSourceVertex();
        StreamNode downStreamVertex = edge.getTargetVertex();

        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();

        // Chainability is really a property of the edge: we inspect its upstream
        // and downstream vertices.
        return downStreamVertex.getInEdges().size() == 1
                && outOperator != null
                && headOperator != null
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                    headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && isChainingEnabled;
    }
}
StreamingJobGraphGenerator.java:
/**
 * The StreamingJobGraphGenerator converts a {@link StreamGraph} into a {@link JobGraph}.
 */
@Internal
public class StreamingJobGraphGenerator {

    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);

    /**
     * Restart delay used for the FixedDelayRestartStrategy in case checkpointing was enabled but
     * no restart strategy has been specified.
     */
    private static final long DEFAULT_RESTART_DELAY = 10000L;

    // ------------------------------------------------------------------------

    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        return new StreamingJobGraphGenerator(streamGraph).createJobGraph();
    }

    // ------------------------------------------------------------------------

    private final StreamGraph streamGraph;

    private final Map<Integer, JobVertex> jobVertices;
    private final JobGraph jobGraph;
    private final Collection<Integer> builtVertices;

    private final List<StreamEdge> physicalEdgesInOrder;

    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;

    private final Map<Integer, StreamConfig> vertexConfigs;
    private final Map<Integer, String> chainedNames;

    private final Map<Integer, ResourceSpec> chainedMinResources;
    private final Map<Integer, ResourceSpec> chainedPreferredResources;

    private final StreamGraphHasher defaultStreamGraphHasher;
    private final List<StreamGraphHasher> legacyStreamGraphHashers;

    private StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());

        this.jobVertices = new HashMap<>();
        this.builtVertices = new HashSet<>();
        this.chainedConfigs = new HashMap<>();
        this.vertexConfigs = new HashMap<>();
        this.chainedNames = new HashMap<>();
        this.chainedMinResources = new HashMap<>();
        this.chainedPreferredResources = new HashMap<>();
        this.physicalEdgesInOrder = new ArrayList<>();

        jobGraph = new JobGraph(streamGraph.getJobName());
    }

    private JobGraph createJobGraph() {

        // make sure that all vertices start immediately
        jobGraph.setScheduleMode(ScheduleMode.EAGER);

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

        setChaining(hashes, legacyHashes, chainedOperatorHashes);

        setPhysicalEdges();

        setSlotSharing();

        configureCheckpointing();

        // add registered cache file into job configuration
        for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
            DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
        }

        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig. " +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

    private void setPhysicalEdges() {
        Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();

        for (StreamEdge edge : physicalEdgesInOrder) {
            int target = edge.getTargetId();

            List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);

            // create if not set
            if (inEdges == null) {
                inEdges = new ArrayList<>();
                physicalInEdgesInOrder.put(target, inEdges);
            }

            inEdges.add(edge);
        }

        for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
            int vertex = inEdges.getKey();
            List<StreamEdge> edgeList = inEdges.getValue();

            // Set each vertex's list of upstream physical in-edges on its StreamConfig.
            vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
        }
    }

    /**
     * Sets up task chains from the source {@link StreamNode} instances.
     *
     * <p>This will recursively create all {@link JobVertex} instances.
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            // As with hashing, chains are built starting from the sources.
            createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
        }
    }
    private List<StreamEdge> createChain(
            Integer startNodeId,
            Integer currentNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes,
            int chainIndex,
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

        // builtVertices holds the IDs of the stream nodes for which a JobVertex
        // has already been created.
        if (!builtVertices.contains(startNodeId)) {

            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

            for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }

            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        // Recursively string together every edge that can be chained onto this one.
                        // transitiveOutEdges collects all outgoing edges of the chain, chainable or not.
                        createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }

            // Depth-first from the sources down to the final edges. The difference from the
            // loop above is the first argument: for a non-chainable edge a new chain starts at
            // nonChainable.getTargetId(), whereas above the start node stays fixed and only
            // the current node advances.
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
            }

            // Every operator ends up in chainedOperatorHashes, chained or not. The map is keyed
            // by the chain's start node, so there are no entries under the IDs of operators that
            // were chained into another node. Because of the recursion, the sink-side operator
            // hashes are added first.
            List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId);
            if (operatorHashes == null) {
                operatorHashes = new ArrayList<>();
                chainedOperatorHashes.put(startNodeId, operatorHashes);
            }
            operatorHashes.add(new Tuple2<>(hashes.get(currentNodeId), legacyHashes.get(1).get(currentNodeId)));

            // For a chain, startNodeId is the ID of the very first node of the chain.
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // Resource calculation mainly aggregates the resources of the chained operators.
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

            // Create the JobVertex
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                    : new StreamConfig(new Configuration());

            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

            if (currentNodeId.equals(startNodeId)) {

                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

                /**
                 * When the current node equals startNodeId there are two possibilities:
                 * 1. the node is the head of a chain whose tail has been fully traversed, or
                 * 2. the node is a standalone node, in which case transitiveOutEdges simply
                 *    contains its own outgoing edges.
                 */
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }

                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {

                Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

                if (chainedConfs == null) {
                    chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
                }
                config.setChainIndex(chainIndex);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                // Store the config of every node that is part of the chain.
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            // Note that an ordinary node without any chained outputs is both chain start
            // and chain end; both flags end up true.
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // Return the edges this chain transitively produces.
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }
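    // Worked illustration (reader's sketch, hypothetical topology): for
    // Source -> Map -> keyBy/Window -> Sink with equal parallelism everywhere, the
    // edges Source->Map and Window->Sink use a ForwardPartitioner and are chainable,
    // while the keyBy edge is not. createChain therefore produces two JobVertices,
    // named "Source -> Map" and "Window -> Sink", connected by one physical JobEdge.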
    private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
        String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName();
        if (chainedOutputs.size() > 1) {
            List<String> outputChainedNames = new ArrayList<>();
            for (StreamEdge chainable : chainedOutputs) {
                outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
            }
            return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
        } else if (chainedOutputs.size() == 1) {
            return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
        } else {
            // With nothing chained, the operator just keeps its own name.
            return operatorName;
        }
    }

    private ResourceSpec createChainedMinResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
        ResourceSpec minResources = streamGraph.getStreamNode(vertexID).getMinResources();
        // A chain's resources are the merge of the resources of its members; every
        // node carries its own resource spec.
        for (StreamEdge chainable : chainedOutputs) {
            minResources = minResources.merge(chainedMinResources.get(chainable.getTargetId()));
        }
        return minResources;
    }

    private ResourceSpec createChainedPreferredResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
        ResourceSpec preferredResources = streamGraph.getStreamNode(vertexID).getPreferredResources();
        for (StreamEdge chainable : chainedOutputs) {
            preferredResources = preferredResources.merge(chainedPreferredResources.get(chainable.getTargetId()));
        }
        return preferredResources;
    }

    private StreamConfig createJobVertex(
            Integer streamNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes,
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

        JobVertex jobVertex;
        StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);

        byte[] hash = hashes.get(streamNodeId);

        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. " +
                    "Did you generate them before calling this method?");
        }

        JobVertexID jobVertexId = new JobVertexID(hash);

        List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            hash = legacyHash.get(streamNodeId);
            if (null != hash) {
                legacyJobVertexIds.add(new JobVertexID(hash));
            }
        }

        List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
        List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
        List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
        if (chainedOperators != null) {
            for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
                chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
                userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
            }
        }

        if (streamNode.getInputFormat() != null) {
            jobVertex = new InputFormatVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    legacyJobVertexIds,
                    chainedOperatorVertexIds,
                    userDefinedChainedOperatorVertexIds);
            TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
            taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
        } else {
            // Unchained operators have an entry in chainedNames as well.
            jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    legacyJobVertexIds,
                    chainedOperatorVertexIds,
                    userDefinedChainedOperatorVertexIds);
        }

        jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));

        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        int parallelism = streamNode.getParallelism();

        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }

        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
        }

        jobVertices.put(streamNodeId, jobVertex);
        builtVertices.add(streamNodeId);
        jobGraph.addVertex(jobVertex);

        return new StreamConfig(jobVertex.getConfiguration());
    }
    @SuppressWarnings("unchecked")
    private void setVertexConfig(Integer vertexID, StreamConfig config,
            List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {

        StreamNode vertex = streamGraph.getStreamNode(vertexID);

        config.setVertexID(vertexID);
        config.setBufferTimeout(vertex.getBufferTimeout());

        config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
        config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
        config.setTypeSerializerOut(vertex.getTypeSerializerOut());

        // iterate edges, find sideOutput edges, create and save serializers for each outputTag type
        for (StreamEdge edge : chainableOutputs) {
            if (edge.getOutputTag() != null) {
                config.setTypeSerializerSideOut(
                    edge.getOutputTag(),
                    edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
                );
            }
        }
        for (StreamEdge edge : nonChainableOutputs) {
            if (edge.getOutputTag() != null) {
                config.setTypeSerializerSideOut(
                    edge.getOutputTag(),
                    edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
                );
            }
        }

        config.setStreamOperator(vertex.getOperator());
        config.setOutputSelectors(vertex.getOutputSelectors());

        config.setNumberOfOutputs(nonChainableOutputs.size());
        config.setNonChainedOutputs(nonChainableOutputs);
        config.setChainedOutputs(chainableOutputs);

        config.setTimeCharacteristic(streamGraph.getEnvironment().getStreamTimeCharacteristic());

        final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();

        config.setStateBackend(streamGraph.getStateBackend());
        config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
        if (checkpointCfg.isCheckpointingEnabled()) {
            config.setCheckpointMode(checkpointCfg.getCheckpointingMode());
        }
        else {
            // the "at-least-once" input handler is slightly cheaper (in the absence of checkpoints),
            // so we use that one if checkpointing is not enabled
            config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
        }

        config.setStatePartitioner(0, vertex.getStatePartitioner1());
        config.setStatePartitioner(1, vertex.getStatePartitioner2());
        config.setStateKeySerializer(vertex.getStateKeySerializer());

        Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();

        if (vertexClass.equals(StreamIterationHead.class)
                || vertexClass.equals(StreamIterationTail.class)) {
            config.setIterationId(streamGraph.getBrokerID(vertexID));
            config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
        }

        List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
        allOutputs.addAll(nonChainableOutputs);

        vertexConfigs.put(vertexID, config);
    }
    private void connect(Integer headOfChain, StreamEdge edge) {

        physicalEdgesInOrder.add(edge);

        Integer downStreamvertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

        downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

        // The partitioner determines how upstream and downstream are wired up;
        // in effect this creates the corresponding JobEdge.
        StreamPartitioner<?> partitioner = edge.getPartitioner();
        JobEdge jobEdge;
        if (partitioner instanceof ForwardPartitioner) {
            // This also creates the IntermediateDataSet; see JobVertex for the details.
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.POINTWISE,
                ResultPartitionType.PIPELINED_BOUNDED);
        } else if (partitioner instanceof RescalePartitioner){
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.POINTWISE,
                ResultPartitionType.PIPELINED_BOUNDED);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.ALL_TO_ALL,
                ResultPartitionType.PIPELINED_BOUNDED);
        }
        // set strategy name so that web interface can show it.
        jobEdge.setShipStrategyName(partitioner.toString());

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
                    headOfChain, downStreamvertexID);
        }
    }
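    // Summary of the wiring above (reader's note): forward() and rescale() edges yield
    // a POINTWISE distribution, where each producer connects to one or a few consumers,
    // while everything else, e.g. keyBy(), rebalance(), shuffle(), or broadcast(),
    // yields an ALL_TO_ALL pattern where every producer connects to every consumer.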
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = edge.getSourceVertex();
        StreamNode downStreamVertex = edge.getTargetVertex();

        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();

        return downStreamVertex.getInEdges().size() == 1
                && outOperator != null
                && headOperator != null
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                    headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled();
    }
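    // The conditions above can be influenced from the DataStream API (reader's sketch,
    // standard Flink methods; 'f' and "g1" are hypothetical):
    //
    //   env.disableOperatorChaining();          // flips streamGraph.isChainingEnabled() to false
    //   stream.map(f).disableChaining();        // sets the operator's ChainingStrategy to NEVER
    //   stream.map(f).startNewChain();          // sets the strategy to HEAD (chainable only as head)
    //   stream.map(f).slotSharingGroup("g1");   // breaks the same-slot-sharing-group condition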
    private void setSlotSharing() {

        Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();

        for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {

            String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();

            SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
            if (group == null) {
                group = new SlotSharingGroup();
                slotSharingGroups.put(slotSharingGroup, group);
            }
            entry.getValue().setSlotSharingGroup(group);
        }

        for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {

            CoLocationGroup ccg = new CoLocationGroup();

            JobVertex source = jobVertices.get(pair.f0.getId());
            JobVertex sink = jobVertices.get(pair.f1.getId());

            ccg.addVertex(source);
            ccg.addVertex(sink);
            source.updateCoLocationGroup(ccg);
            sink.updateCoLocationGroup(ccg);
        }
    }

    private void configureCheckpointing() {
        CheckpointConfig cfg = streamGraph.getCheckpointConfig();

        long interval = cfg.getCheckpointInterval();
        if (interval > 0) {
            // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
            if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                // if the user enabled checkpointing, the default number of exec retries is infinite.
                streamGraph.getExecutionConfig().setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
            }
        } else {
            // interval of max value means disable periodic checkpoint
            interval = Long.MAX_VALUE;
        }

        // --- configure the participating vertices ---

        // collect the vertices that receive "trigger checkpoint" messages.
        // currently, these are all the sources
        // (only the source vertices receive the trigger-checkpoint signal)
        List<JobVertexID> triggerVertices = new ArrayList<>();

        // collect the vertices that need to acknowledge the checkpoint
        // currently, these are all vertices
        // (every vertex must acknowledge the checkpoint)
        List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());

        // collect the vertices that receive "commit checkpoint" messages
        // currently, these are all vertices
        List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

        for (JobVertex vertex : jobVertices.values()) {
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.getID());
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        // --- configure options ---

        ExternalizedCheckpointSettings externalizedCheckpointSettings;
        if (cfg.isExternalizedCheckpointsEnabled()) {
            CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
            // Sanity check
            if (cleanup == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
        } else {
            externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
        }

        CheckpointingMode mode = cfg.getCheckpointingMode();

        boolean isExactlyOnce;
        if (mode == CheckpointingMode.EXACTLY_ONCE) {
            isExactlyOnce = true;
        } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
            isExactlyOnce = false;
        } else {
            throw new IllegalStateException("Unexpected checkpointing mode. " +
                    "Did not expect there to be another checkpointing mode besides " +
                    "exactly-once or at-least-once.");
        }

        // --- configure the master-side checkpoint hooks ---

        final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();

        for (StreamNode node : streamGraph.getStreamNodes()) {
            StreamOperator<?> op = node.getOperator();
            if (op instanceof AbstractUdfStreamOperator) {
                Function f = ((AbstractUdfStreamOperator<?, ?>) op).getUserFunction();

                // For user functions that implement this interface, record their hooks;
                // the hooks are triggered on the master during checkpointing.
                if (f instanceof WithMasterCheckpointHook) {
                    hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
                }
            }
        }

        // because the hooks can have user-defined code, they need to be stored as
        // eagerly serialized values
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
        if (hooks.isEmpty()) {
            serializedHooks = null;
        } else {
            try {
                MasterTriggerRestoreHook.Factory[] asArray =
                        hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
                serializedHooks = new SerializedValue<>(asArray);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }

        // because the state backend can have user-defined code, it needs to be stored as
        // eagerly serialized value
        final SerializedValue<StateBackend> serializedStateBackend;
        if (streamGraph.getStateBackend() == null) {
            serializedStateBackend = null;
        } else {
            try {
                serializedStateBackend =
                        new SerializedValue<StateBackend>(streamGraph.getStateBackend());
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", e);
            }
        }

        // --- done, put it all together ---

        JobCheckpointingSettings settings = new JobCheckpointingSettings(
                triggerVertices, ackVertices, commitVertices, interval,
                cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
                cfg.getMaxConcurrentCheckpoints(),
                externalizedCheckpointSettings,
                serializedStateBackend,
                serializedHooks,
                isExactlyOnce);

        jobGraph.setSnapshotSettings(settings);
    }
}
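Putting the two classes together (a minimal sketch: the StreamGraph would normally come from StreamExecutionEnvironment#getStreamGraph(), and MySource/MyMapper are hypothetical user classes):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new MySource())   // MySource / MyMapper are hypothetical
       .map(new MyMapper())
       .print();

    // StreamGraph -> JobGraph: hashing and chaining both run inside createJobGraph()
    StreamGraph streamGraph = env.getStreamGraph();
    JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);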