Flink 1.10 源码 -- SreamGraph
Flink的执行计划分为四层
StreamGraph
用户层面上的, 用户根据API编写的程序构造出一个代表拓补图结构的StreamGraph
相关代码在flink-streaming-java项目下StreamGraphGenerator类的generate方法中,该方法会在StreamExecutionEnvironment.execute()方法调用到,也就是说StreamGraph 是在Client端构造的,意味着我们可以通过本地调试观察StreamGraph的构造过程
在构建StreamGraph之前先了解一下Transformation
Transformation
在Flink流处理中,算子最终会将算子转换成一个Transformation, 每个Transformation表示 一个DataStream或多个DataStream转换成新DataStream的操作, 比如,map,filter,union等
我们根据map看一下转换的一个过程,这是DataStream的map方法,接收用户传入的函数,将函数包装为StreamMap类型,调用transform方法
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
跟进transform方法 这里将map封装为StreamOperatorFactory
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
//这里将operator封装入operator factory
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
进入doTransform方法,在这里将 operator封装成了一个OneInputTransformation对象,也就是transformation对象
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 检查输出类型是否为MissingTypeInfo,如果是抛出异常,
transformation.getOutputType();
//创建OneInputTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
transformation, //input --上游的 transformation
operatorName,
operatorFactory, //需要进行转换操作的
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//多个级联的map和filter操作会被transform成为一连串的OneInputTransformation。
// 后一个transformation的input指向前一个transformation
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
最终通过getExecutionEnvironment().addOperator添加到列表中
getExecutionEnvironment获取执行环境,
addOperator添加列表中
到这里算子就被封装为transformation对象,保存到了StreamExecutionEnvironment中
public StreamExecutionEnvironment getExecutionEnvironment() {
return environment;
}
protected final List<Transformation<?>> transformations = new ArrayList<>();
@Internal
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
StreamOperator
DataStream上的每一个transformation都对应一个StreamOperator,StreamOperator是运行时具体的实现,会决定User-Defined Funtion(udf)
,的调用方式,可以看出,所有实现类都继承了AbstractStreamOperator
。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator
,该类是封装了UDF的StreamOperator。UDF就是实现了Function
接口的类,如MapFunction
,FilterFunction
transformation的类型
OneInputTransformation
顾名思义OneInputTransformation只有一个输入。代表的算子(单数据流)为:map,flatMap,fliter,process,assignTimestamps等。
TwoInputTransformation
TwoInputTransformation具有两个输入。ConnectedStream的算子为双流运算,它的算子会被转换为TwoInputTransformation。
SourceTransformation
在env中配置数据源的时候会创建出一个DataStreamSource。该对象为dataStream的源头。DataStreamSource的构造函数中会创建一个SourceTransformation。
SinkTransformation
和SourceTransformation类似,在dataStream调用addSink方法的时候会生成一个DataStreamSink对象。该对象在创建的时候会同时构造一个SinkTransformation。
UnionTransformation
该transformation为合并多个input到一个流中。代表算子为union。
SplitTransformation
DataStream调用split的时候会创建SplitStream。SplitStream初始化时会构建一个SplitTransformation。
SelectTransformation
SplitStream在调用select算子的时候会创建SelectTransformation。
FeedbackTransformation
创建IterativeStream的时候会使用到该transformation。
CoFeedbackTransformation
和FeedbackTransformation类似,创建ConnectedIterativeStream的时候会使用到。
PartitionTransformation
涉及到控制数据流向的算子都属于PartitionTransformation,例如shuffle,forward,rebalance,broadcast,rescale,global,partitionCustom和keyBy等。
SideOutputTransformation
调用getSideOutput(获取旁路输出)的时候,SideOutputTransformation会发生作用。
构建StreamGraph
知道了算子转换后的操作后,进入到生成StreamGraph的逻辑,上面已经知道了,生说StreamGraph的逻辑在StreamExecutionEnvironment的execute方法中,在execute方法中调用调用的getStreamGraph方法生成StreamGraph
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
//调用 execute方法, 需要传入一个StreamGraph
// getStreamGraph 获取StreamGraph
return execute(getStreamGraph(jobName));
}
进入getStreamGraph,这里调用了同名方法
@Internal
public StreamGraph getStreamGraph(String jobName) {
return getStreamGraph(jobName, true);
}
继续追进getStreamGraph方法
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
// getStreamGraphGenerator 创建一个StreamGraphGenerator对象
// generate 生成StreamGraph
// 生成StreamGraph
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
return streamGraph;
}
getStreamGraphGenerator方法,这个方法会生成一个StreamGraphGenerator对象,StreamGraph是通过这个类进行生成的
private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
// 创建 StreamGraphGenerator对象 并设置一系列的参数
return new StreamGraphGenerator(transformations, config, checkpointCfg)
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout);
}
StreamGraphGenerator
的generate
方法,在该方法中会设置一些参数,以及调用transform方法,最终获取生成的StreamGraph进行返回
public StreamGraph generate() {
// 生成StreamGraph对象,传入执行配置和检查点配置
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
// 设置状态后端
streamGraph.setStateBackend(stateBackend);
// 设置级联配置,为一项优化配置 -- 是否可以将算子 chain在一起
streamGraph.setChaining(chaining);
// 设置调度方式,决定task延迟调度还是立刻调度
streamGraph.setScheduleMode(scheduleMode);
// StreamExecutionEnvironment的cacheFile会传入该变量
// cacheFile为需要分发到各个task manager的用户文件
// todo 用于分布式缓存的
streamGraph.setUserArtifacts(userArtifacts);
streamGraph.setTimeCharacteristic(timeCharacteristic);
streamGraph.setJobName(jobName);
// 设置各个级联之间是否采用blocking 连接
streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
// 储存已经被处理的transformation
alreadyTransformed = new HashMap<>();
// 逐个处理transformation
for (Transformation<?> transformation: transformations) {
//该方法会进行递归调用,主要逻辑也在这里
transform(transformation);
}
// 获取已生成的streamGraph
final StreamGraph builtStreamGraph = streamGraph;
// 清空中间变量
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
追进transform方法,主要处理所里都在这个方法中,
这个方法是一个递归的方法,在获取transformedIds的逻辑中会递归的调用该方法, 也就是说每一个transform都会遍历去寻找上游,保证上游全部处理完
这个方法会将用户编写代码转换成StreamGraph,在这之前已经将算子转为一个transform并添加到了env的transformas(List集合)中,这里会将遍历transforms集合,将transform构建成StreamNode以及StreamEdge
主要逻辑都在处理不同的transform方法中, 这里会对每个transform进行判断实例类型,不同的transform实例会调用不同方法进行处理
private Collection<Integer> transform(Transformation<?> transform) { //transform 为每一个算子
//如果转换后的map中包含当前要转换的 transform 那么说明已经被处理过了,直接返回
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
// 如果transformation的最大并行度没有设置,全局的最大并行度已设置,将全局最大并行度设置给transformation
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
// 检查transformation的输出类型,如果是MissingTypeInfo则程序抛出异常
transform.getOutputType();
Collection<Integer> transformedIds;
// 根据不同的 transform 调用不同的处理方法
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof AbstractMultipleInputTransformation<?>) {
transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
} else if (transform instanceof SourceTransformation) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof LegacySourceTransformation<?>) {
transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
// 如果该transformation没有被处理,则加入已处理列表
// 处理每个transformation的时候会先处理它的input(可能没有input,也可能有一个或多个),transform方法会递归调用。
// 在transform方法执行前后双重检查transformation是否已被处理可以确保在递归调用的情况下不会被重复处理
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
// 设置 transform(network)的 缓冲器超时时间, 如果没有设置则使用的默认的 100L
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
} else {
streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
// 设置TransformationUserHash
// transform.getUserProvidedNodeHash 获取uid的hash, 在用户编写代码的时候在算子后面调用setUidHash设置的
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
// 如果自动设置uid功能被关闭,同时又没有指定UserProvidedNodeHash和uid,程序抛出异常
if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
if (transform instanceof PhysicalTransformation &&
transform.getUserProvidedNodeHash() == null &&
transform.getUid() == null) {
throw new IllegalStateException("Auto generated UIDs have been disabled " +
"but no UID or hash has been assigned to operator " + transform.getName());
}
}
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
// 设置托管内存权重
// 设置transformation的最小和最佳资源要求
streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());
return transformedIds;
}
在处理transform的时候,会根据不同的transform进行不同的处理,大部分处理逻辑都是类似,这里查看一下transformOneInputTransform,SourceTransformation处理逻辑
transformOneInputTransform
该transform 方法的主要作用是在会递归的转换input,在Graph中创建一个新的StreamNode,并将input连接到这个新节点 (input -> 上游节点<StreamNode>)
在这个方法中主要的逻辑
1. 递归遍历他的上游input,确保上游处理完毕并添加edge,并判断如果递归过程中已经处理完,那么直接返回
2. 调用appOperator方法,将transform转换成StreamNode并添加到StreamNodes列表中
3. 如果该transform需要进行keySelector进行分区,则会设置keySelector和序列化器
4. 调用addEdge方法将讲个StreamNode连接
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
//首先会递归遍历他的每一个上游input,保证上游全部处理完毕。然后添加Edge
Collection<Integer> inputIds = transform(transform.getInput());
// the recursive call might have already transformed this
// 在递归的过程中,已经转换完毕, 直接返回,防止重复处理
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// 获取 slot 共享组 -- 因为有可能多个算子会被chain在一起 这样可以提高slot的资源利用率
// 用于后面调度task的
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// 在 streamGraph中 添加算子
// 方法会将 operator(transform) 转换成StreamNode,并添加到 StreamNodes列表中
streamGraph.addOperator(transform.getId(), //transform的唯一id
slotSharingGroup, // slot 共享组
transform.getCoLocationGroupKey(),
transform.getOperatorFactory(), //获取transform的StreamOperatorFactory
transform.getInputType(), //获取输入类型
transform.getOutputType(), // 获取输出类型
transform.getName()); // 获取 该转换的name
// 如果该转换需要进行keySelector进行分区 需要对key进行序列话及设置key的eKeySelector
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
//获取 transform的平行度
int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
transform.getParallelism() : executionConfig.getParallelism();
//设置 transformId的并行度
streamGraph.setParallelism(transform.getId(), parallelism);
//设置 最大并行度
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
//循环上游的input, 保证上游全部处理完
// 这一步是关键,添加一个edge对象,将此oneInputTransformation转换成的vertex和input transformation转换为的vertex连接起来
for (Integer inputId: inputIds) {
//将 input 添加到 edge
// 两个StreamNode 会通过StreamEdge连接在一起, 每一个transform会生成一个StreamNode
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
现在看一下 appOperator和addEdge方法
先看一下appOperator方法
该法最主要的是调用了addNode
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName,
Class<? extends AbstractInvokable> invokableClass) {
// 将transform 转换成StreamNode 并添加到StreamNodes列表中
addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName);
// 添加序列化
setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
//设置operatorFactory输出类型
if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
// sets the output type which must be know at StreamGraph creation time
operatorFactory.setOutputType(outTypeInfo, executionConfig);
}
//设置operatorFactory输入类型
if (operatorFactory.isInputTypeConfigurable()) {
operatorFactory.setInputType(inTypeInfo, executionConfig);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexID);
}
}
进入到addNode, 这个方法中会将transform转换成StreamNode,并添加到Nodes列表中
protected StreamNode addNode(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
//如果StreamNodes中存在该 node 抛出异常
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
//创建一个 StreamNode --> transform 转换成 StreamNode 在转换成Vertex --> JobVertex
StreamNode vertex = new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);
// 将 StreamNode添加到集合
streamNodes.put(vertexID, vertex);
return vertex;
}
在看一下addEdge方法
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(upStreamVertexID,
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>(),
null,
null);
}
进入addEdgeInternal方法,这个方法是添加Edge的逻辑
在这个方法中主要的逻辑,判断上游是否为一下几种transform ,virtualSideOutputNodes,virtualSelectNodes,virtualPartitionNodes, 如果是则将他们写入到虚拟节点中,然后创建StremaEdge,将两个StreamNode进行连接,并将虚拟节点的信息写入到StreamEdge中,否则直接将两个StreamNode通过StreamEdge进行连接
虚拟节点
这几类的transform:irtualSideOutputNodes,virtualSelectNodes和virtualPartitionNodes,这几类transform会被处理成虚拟节点, 虚拟节点是什么时候生成的,是在对不同的transform处理逻辑中生成的,在这几种的transform处理逻辑中会将transform添加到虚拟节点, 严格的说虚拟节点并不属于StreamNode,不包含逻辑逻辑转换,虚拟节点不会出现在StreamGraph图中,在进行edge过程中,会判断上游是否为虚拟节点,这里是通过递归的方式去获取上游节点信息,直到找到非虚拟节点即StreamNode,会执行edge逻辑,并将虚拟节点的信息记录在StreamEdge中
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
/**
* todo virtualSideOutputNodes,virtualSelectNodes,virtualPartitionNodes
* 这几类transform都会被处理成虚拟节点,当下游生成StreamNode后,发现上游为虚拟节点
* 会找到虚拟节点的上游,并创建StreamEdge与虚拟节点上游的transform进行连接.并把
* 虚拟节点的信息写入到StreamEdge中
*/
// 当上有是 SideOutput, 递归调用,并传入SideOutput信息, 下面同理
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else {
// 获取上下游 StreamNode
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
// 如果没有指定分区器,并且上游和下游操作符的并行度匹配使用forward,则使用rebalance
// 上游并行度 == 下游并行度
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
// 使用 ForwardPartitioner 分区
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) { //否则使用 该分区
partitioner = new RebalancePartitioner<Object>();
}
//进行判断 如果使用ForwardPartitioner分区 并且上下游并行度不相等 则不能使用该分区策略,会抛出异常
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
// 如果没有指定shuffle模式 则由框架决定shuffle模式
if (shuffleMode == null) {
shuffleMode = ShuffleMode.UNDEFINED;
}
// 创建一个新的StreamEdge
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
// 连接刚刚创建的edge到上下游节点
// getStreamNode 获取到指定id的StreamNode
// addOutEdge和 addInEdge 将edge添加到获取的StreamNode中
// 在 SteamNode类中, 由两个list 用于收集此egde
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
transformSource方法
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
// todo 如果用户指定了组名,则按原样执行。如果没有指定任何内容,
// 并且输入操作都具有相同的组名称,则使用此名称。否则,选择默认组。
String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
// 为StreamGraph 添加Source
streamGraph.addSource(source.getId(), //每个transform都会获得一个递增的id, 该id在后面会转换成 vertexID
slotSharingGroup,
source.getCoLocationGroupKey(),
source.getOperatorFactory(),
null,
source.getOutputType(),
"Source: " + source.getName());
//获取设置并行度
int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
source.getParallelism() : executionConfig.getParallelism();
streamGraph.setParallelism(source.getId(), parallelism);
streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
return Collections.singleton(source.getId());
}
进入到addSource方法,这个方法做了两件事,
一个是将Source生成StreamNode添加到StreamNodes列表
另一个是将soutce添加到了StreamGraph的sources(为set集合)列表中
可以看到addSource中并没有生成edge 因为source属于图中的顶点,所以只会通过下游去连接source节点
public <IN, OUT> void addSource(
Integer vertexID, // transform的id
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
SourceOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
// 方法会将 operator(transform) 转换成StreamNode,并添加到 StreamNodes列表中
addOperator(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
inTypeInfo,
outTypeInfo,
operatorName,
SourceOperatorStreamTask.class);
// 将source(operator -> transform)的Id添加的sources的set集合中
// todo source有可能多个 set保证了不会重复
sources.add(vertexID);
}
通过下面代码,看一下转换的过程
通过Debug断点到构建StreamGraph得时候,可以看到transformations列表存有四个transform,为map,window,process以及sink的transform,每个transform的input都指向上游的transform,其中source,keyBy的transform并没有添加到transformations列表中,等下会进行解释
在上面我们发现没有有些算子的transform并没有添加到tranfromtions列表中现在解释一下
source, 在创建source的时候,会创建一个sourceOperator,根据sourceOperator创建DataStreamSouce,在DataStramSource中会将sourceOperator构建成SourceTransformation,最终构建到DataStream,将transform赋值给DataStream的成员变量transform,在下游算子中,会根据成员变量的transform构建自己的transform,该transform的input就是source,最终将构建好的transform添加到env的transformations列表中
keyBy, 在keyBy中创建KeyedStream中,最终将keySelector赋值给了 keyedStream的变量中, KeySelector不会被添加到env的transformations中, 会将信息记录在下游的算子中
keyBy算子,会创建一个keyedStream,并将keyBy传入的值转换成keyedSelector,赋值给keyedStream对象的成员变量中,在创建KyedStream的时候会创建一个PartitionTransformation,该transform会被赋值到DataStream的transformation变量中,在调用timeWindow的时候会将WindowAssigner以及this(即keyedStream)传入创建WindowedStream,在aggregate算子中,会创建一个windowOperator,通过input.transform,将windowOperator转换transform,添加到env的transformations中
keyBy会生成一个 PartitionTransformation的 transform, 但是并不会添加到env的transformations的列表中,而是将信息记录在了下游的transform中即window中,最终将window生成一个transform添加到列表
window生成 通过windowAssigner,keySelector,trigger和evictor等参数构建windowOperator,最终通过transform方法转换成一个transform并添加到env的transformations列表中,
在window中会包含他的上游input(即 keyBy<PartitionTransformation>)
当所有transform都遍历完,这时候构建出StreamGraph,可以看到StreamNode中,已经存在了除了keyBy的所有StreamNode,到这里StreamGraph就已经构建好了