Flink-1.10 源码笔记 StreamGraph生成

Flink 1.10 源码 -- SreamGraph

生成JobGraph

Flink的执行计划分为四层

img

StreamGraph
用户层面上的, 用户根据API编写的程序构造出一个代表拓补图结构的StreamGraph
相关代码在flink-streaming-java项目下StreamGraphGenerator类的generate方法中,该方法会在StreamExecutionEnvironment.execute()方法调用到,也就是说StreamGraph 是在Client端构造的,意味着我们可以通过本地调试观察StreamGraph的构造过程

在构建StreamGraph之前先了解一下Transformation

Transformation

在Flink流处理中,算子最终会将算子转换成一个Transformation, 每个Transformation表示 一个DataStream或多个DataStream转换成新DataStream的操作, 比如,map,filter,union等

我们根据map看一下转换的一个过程,这是DataStream的map方法,接收用户传入的函数,将函数包装为StreamMap类型,调用transform方法

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }

跟进transform方法 这里将map封装为StreamOperatorFactory

    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        //这里将operator封装入operator factory
        return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }

进入doTransform方法,在这里将 operator封装成了一个OneInputTransformation对象,也就是transformation对象

1591167501761.png
protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 检查输出类型是否为MissingTypeInfo,如果是抛出异常,
        transformation.getOutputType();

        //创建OneInputTransformation
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            transformation,          //input   --上游的 transformation
                operatorName,
                operatorFactory,     //需要进行转换操作的
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        //多个级联的map和filter操作会被transform成为一连串的OneInputTransformation。
        // 后一个transformation的input指向前一个transformation
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

最终通过getExecutionEnvironment().addOperator添加到列表中

getExecutionEnvironment获取执行环境,
addOperator添加列表中
到这里算子就被封装为transformation对象,保存到了StreamExecutionEnvironment中


    public StreamExecutionEnvironment getExecutionEnvironment() {
        return environment;
    }
    
    protected final List<Transformation<?>> transformations = new ArrayList<>();
    
    @Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }

StreamOperator

DataStream上的每一个transformation都对应一个StreamOperator,StreamOperator是运行时具体的实现,会决定User-Defined Funtion(udf),的调用方式,可以看出,所有实现类都继承了AbstractStreamOperator。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator,该类是封装了UDF的StreamOperator。UDF就是实现了Function接口的类,如MapFunction,FilterFunction

StreamOperator.png

transformation的类型

1591168455176.png

OneInputTransformation
顾名思义OneInputTransformation只有一个输入。代表的算子(单数据流)为:map,flatMap,fliter,process,assignTimestamps等。

TwoInputTransformation
TwoInputTransformation具有两个输入。ConnectedStream的算子为双流运算,它的算子会被转换为TwoInputTransformation。

SourceTransformation
在env中配置数据源的时候会创建出一个DataStreamSource。该对象为dataStream的源头。DataStreamSource的构造函数中会创建一个SourceTransformation。

SinkTransformation
和SourceTransformation类似,在dataStream调用addSink方法的时候会生成一个DataStreamSink对象。该对象在创建的时候会同时构造一个SinkTransformation。

UnionTransformation
该transformation为合并多个input到一个流中。代表算子为union。

SplitTransformation
DataStream调用split的时候会创建SplitStream。SplitStream初始化时会构建一个SplitTransformation。

SelectTransformation
SplitStream在调用select算子的时候会创建SelectTransformation。

FeedbackTransformation
创建IterativeStream的时候会使用到该transformation。

CoFeedbackTransformation
和FeedbackTransformation类似,创建ConnectedIterativeStream的时候会使用到。

PartitionTransformation
涉及到控制数据流向的算子都属于PartitionTransformation,例如shuffle,forward,rebalance,broadcast,rescale,global,partitionCustom和keyBy等。

SideOutputTransformation
调用getSideOutput(获取旁路输出)的时候,SideOutputTransformation会发生作用。

构建StreamGraph

知道了算子转换后的操作后,进入到生成StreamGraph的逻辑,上面已经知道了,生说StreamGraph的逻辑在StreamExecutionEnvironment的execute方法中,在execute方法中调用调用的getStreamGraph方法生成StreamGraph

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        
        //调用 execute方法, 需要传入一个StreamGraph 
        // getStreamGraph 获取StreamGraph
        return execute(getStreamGraph(jobName));
    }

进入getStreamGraph,这里调用了同名方法

@Internal
public StreamGraph getStreamGraph(String jobName) {   
    return getStreamGraph(jobName, true);
}

继续追进getStreamGraph方法

    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        // getStreamGraphGenerator 创建一个StreamGraphGenerator对象
        // generate 生成StreamGraph
        // 生成StreamGraph
        StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }
        return streamGraph;
    }

getStreamGraphGenerator方法,这个方法会生成一个StreamGraphGenerator对象,StreamGraph是通过这个类进行生成的

    private StreamGraphGenerator getStreamGraphGenerator() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        // 创建 StreamGraphGenerator对象 并设置一系列的参数
        return new StreamGraphGenerator(transformations, config, checkpointCfg)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
    }

StreamGraphGeneratorgenerate方法,在该方法中会设置一些参数,以及调用transform方法,最终获取生成的StreamGraph进行返回

    public StreamGraph generate() {
        // 生成StreamGraph对象,传入执行配置和检查点配置
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        // 设置状态后端
        streamGraph.setStateBackend(stateBackend);
        // 设置级联配置,为一项优化配置  -- 是否可以将算子 chain在一起
        streamGraph.setChaining(chaining);
        // 设置调度方式,决定task延迟调度还是立刻调度
        streamGraph.setScheduleMode(scheduleMode);
        // StreamExecutionEnvironment的cacheFile会传入该变量
        // cacheFile为需要分发到各个task manager的用户文件
        // todo 用于分布式缓存的
        streamGraph.setUserArtifacts(userArtifacts);
        streamGraph.setTimeCharacteristic(timeCharacteristic);
        streamGraph.setJobName(jobName);
        // 设置各个级联之间是否采用blocking 连接
        streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
        // 储存已经被处理的transformation
        alreadyTransformed = new HashMap<>();

        // 逐个处理transformation
        for (Transformation<?> transformation: transformations) {
        //该方法会进行递归调用,主要逻辑也在这里
            transform(transformation);
        }

        // 获取已生成的streamGraph
        final StreamGraph builtStreamGraph = streamGraph;

        // 清空中间变量
        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

追进transform方法,主要处理所里都在这个方法中,

这个方法是一个递归的方法,在获取transformedIds的逻辑中会递归的调用该方法, 也就是说每一个transform都会遍历去寻找上游,保证上游全部处理完
这个方法会将用户编写代码转换成StreamGraph,在这之前已经将算子转为一个transform并添加到了env的transformas(List集合)中,这里会将遍历transforms集合,将transform构建成StreamNode以及StreamEdge
主要逻辑都在处理不同的transform方法中, 这里会对每个transform进行判断实例类型,不同的transform实例会调用不同方法进行处理


private Collection<Integer> transform(Transformation<?> transform) { //transform 为每一个算子

        //如果转换后的map中包含当前要转换的 transform 那么说明已经被处理过了,直接返回
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);


        // 如果transformation的最大并行度没有设置,全局的最大并行度已设置,将全局最大并行度设置给transformation
        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        // 检查transformation的输出类型,如果是MissingTypeInfo则程序抛出异常
        transform.getOutputType();

        Collection<Integer> transformedIds;
        // 根据不同的 transform 调用不同的处理方法
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof AbstractMultipleInputTransformation<?>) {
            transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
        } else if (transform instanceof SourceTransformation) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof LegacySourceTransformation<?>) {
            transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        // 如果该transformation没有被处理,则加入已处理列表
        // 处理每个transformation的时候会先处理它的input(可能没有input,也可能有一个或多个),transform方法会递归调用。
        // 在transform方法执行前后双重检查transformation是否已被处理可以确保在递归调用的情况下不会被重复处理
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        // 设置 transform(network)的 缓冲器超时时间, 如果没有设置则使用的默认的  100L
        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
        }

        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }

        // 设置TransformationUserHash
        // transform.getUserProvidedNodeHash  获取uid的hash, 在用户编写代码的时候在算子后面调用setUidHash设置的
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        // 如果自动设置uid功能被关闭,同时又没有指定UserProvidedNodeHash和uid,程序抛出异常
        if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
            if (transform instanceof PhysicalTransformation &&
                    transform.getUserProvidedNodeHash() == null &&
                    transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
            }
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        // 设置托管内存权重
        // 设置transformation的最小和最佳资源要求
        streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());

        return transformedIds;
    }

在处理transform的时候,会根据不同的transform进行不同的处理,大部分处理逻辑都是类似,这里查看一下transformOneInputTransform,SourceTransformation处理逻辑

transformOneInputTransform 该transform 方法的主要作用是在会递归的转换input,在Graph中创建一个新的StreamNode,并将input连接到这个新节点 (input -> 上游节点<StreamNode>)

在这个方法中主要的逻辑
1. 递归遍历他的上游input,确保上游处理完毕并添加edge,并判断如果递归过程中已经处理完,那么直接返回
2. 调用appOperator方法,将transform转换成StreamNode并添加到StreamNodes列表中
3. 如果该transform需要进行keySelector进行分区,则会设置keySelector和序列化器
4. 调用addEdge方法将讲个StreamNode连接

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        //首先会递归遍历他的每一个上游input,保证上游全部处理完毕。然后添加Edge
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        // 在递归的过程中,已经转换完毕, 直接返回,防止重复处理
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        // 获取 slot 共享组  --  因为有可能多个算子会被chain在一起 这样可以提高slot的资源利用率
        // 用于后面调度task的
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
        // 在 streamGraph中 添加算子
        // 方法会将 operator(transform) 转换成StreamNode,并添加到 StreamNodes列表中
        streamGraph.addOperator(transform.getId(), //transform的唯一id
                slotSharingGroup,   // slot 共享组
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(), //获取transform的StreamOperatorFactory
                transform.getInputType(), //获取输入类型
                transform.getOutputType(), // 获取输出类型
                transform.getName()); // 获取 该转换的name

        // 如果该转换需要进行keySelector进行分区 需要对key进行序列话及设置key的eKeySelector
        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        //获取 transform的平行度
        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
        //设置 transformId的并行度
        streamGraph.setParallelism(transform.getId(), parallelism);
        //设置 最大并行度
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        //循环上游的input, 保证上游全部处理完
        // 这一步是关键,添加一个edge对象,将此oneInputTransformation转换成的vertex和input transformation转换为的vertex连接起来
        for (Integer inputId: inputIds) {
            //将 input 添加到 edge
            // 两个StreamNode 会通过StreamEdge连接在一起,   每一个transform会生成一个StreamNode
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

现在看一下 appOperator和addEdge方法

先看一下appOperator方法
该法最主要的是调用了addNode

            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName,
            Class<? extends AbstractInvokable> invokableClass) {
        // 将transform 转换成StreamNode 并添加到StreamNodes列表中
        addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName);
        // 添加序列化
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));

        //设置operatorFactory输出类型
        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be know at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }
        //设置operatorFactory输入类型
        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }

进入到addNode, 这个方法中会将transform转换成StreamNode,并添加到Nodes列表中

    protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName) {

        //如果StreamNodes中存在该 node 抛出异常
        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        //创建一个 StreamNode  --> transform 转换成 StreamNode 在转换成Vertex --> JobVertex
        StreamNode vertex = new StreamNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                operatorName,
                new ArrayList<OutputSelector<?>>(),
                vertexClass);

        // 将 StreamNode添加到集合
        streamNodes.put(vertexID, vertex);

        return vertex;
    }

在看一下addEdge方法

    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
        addEdgeInternal(upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                null,
                new ArrayList<String>(),
                null,
                null);

    }

进入addEdgeInternal方法,这个方法是添加Edge的逻辑

在这个方法中主要的逻辑,判断上游是否为一下几种transform ,virtualSideOutputNodes,virtualSelectNodes,virtualPartitionNodes, 如果是则将他们写入到虚拟节点中,然后创建StremaEdge,将两个StreamNode进行连接,并将虚拟节点的信息写入到StreamEdge中,否则直接将两个StreamNode通过StreamEdge进行连接

虚拟节点
这几类的transform:irtualSideOutputNodes,virtualSelectNodes和virtualPartitionNodes,这几类transform会被处理成虚拟节点, 虚拟节点是什么时候生成的,是在对不同的transform处理逻辑中生成的,在这几种的transform处理逻辑中会将transform添加到虚拟节点, 严格的说虚拟节点并不属于StreamNode,不包含逻辑逻辑转换,虚拟节点不会出现在StreamGraph图中,在进行edge过程中,会判断上游是否为虚拟节点,这里是通过递归的方式去获取上游节点信息,直到找到非虚拟节点即StreamNode,会执行edge逻辑,并将虚拟节点的信息记录在StreamEdge中

    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {

        /**
         * todo virtualSideOutputNodes,virtualSelectNodes,virtualPartitionNodes
         *  这几类transform都会被处理成虚拟节点,当下游生成StreamNode后,发现上游为虚拟节点
         *  会找到虚拟节点的上游,并创建StreamEdge与虚拟节点上游的transform进行连接.并把
         *  虚拟节点的信息写入到StreamEdge中
         */
        // 当上有是 SideOutput, 递归调用,并传入SideOutput信息, 下面同理
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
        } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
            if (outputNames.isEmpty()) {
                // selections that happen downstream override earlier selections
                outputNames = virtualSelectNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else {
            // 获取上下游 StreamNode
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            // 如果没有指定分区器,并且上游和下游操作符的并行度匹配使用forward,则使用rebalance

            //                                      上游并行度        ==   下游并行度
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                //  使用 ForwardPartitioner 分区
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) { //否则使用 该分区
                partitioner = new RebalancePartitioner<Object>();
            }


            //进行判断 如果使用ForwardPartitioner分区  并且上下游并行度不相等 则不能使用该分区策略,会抛出异常
            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            // 如果没有指定shuffle模式 则由框架决定shuffle模式
            if (shuffleMode == null) {
                shuffleMode = ShuffleMode.UNDEFINED;
            }

            // 创建一个新的StreamEdge
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

            // 连接刚刚创建的edge到上下游节点
            // getStreamNode  获取到指定id的StreamNode
            // addOutEdge和 addInEdge  将edge添加到获取的StreamNode中
            // 在 SteamNode类中, 由两个list 用于收集此egde
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }

transformSource方法

    private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
        // todo 如果用户指定了组名,则按原样执行。如果没有指定任何内容,
        //  并且输入操作都具有相同的组名称,则使用此名称。否则,选择默认组。
        String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());

        // 为StreamGraph 添加Source
        streamGraph.addSource(source.getId(),  //每个transform都会获得一个递增的id, 该id在后面会转换成 vertexID
                slotSharingGroup,
                source.getCoLocationGroupKey(),
                source.getOperatorFactory(),
                null,
                source.getOutputType(),
                "Source: " + source.getName());

        //获取设置并行度
        int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
                source.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(source.getId(), parallelism);
        streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());

        return Collections.singleton(source.getId());
    }

进入到addSource方法,这个方法做了两件事,
一个是将Source生成StreamNode添加到StreamNodes列表
另一个是将soutce添加到了StreamGraph的sources(为set集合)列表中
可以看到addSource中并没有生成edge 因为source属于图中的顶点,所以只会通过下游去连接source节点

    public <IN, OUT> void addSource(
            Integer vertexID,  // transform的id
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            SourceOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {
        // 方法会将 operator(transform) 转换成StreamNode,并添加到 StreamNodes列表中
        addOperator(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                inTypeInfo,
                outTypeInfo,
                operatorName,
                SourceOperatorStreamTask.class);
        // 将source(operator -> transform)的Id添加的sources的set集合中
        // todo source有可能多个 set保证了不会重复
        sources.add(vertexID);
    }

通过下面代码,看一下转换的过程


1591323006811.png

通过Debug断点到构建StreamGraph得时候,可以看到transformations列表存有四个transform,为map,window,process以及sink的transform,每个transform的input都指向上游的transform,其中source,keyBy的transform并没有添加到transformations列表中,等下会进行解释


transformations
在上面我们发现没有有些算子的transform并没有添加到tranfromtions列表中现在解释一下

source, 在创建source的时候,会创建一个sourceOperator,根据sourceOperator创建DataStreamSouce,在DataStramSource中会将sourceOperator构建成SourceTransformation,最终构建到DataStream,将transform赋值给DataStream的成员变量transform,在下游算子中,会根据成员变量的transform构建自己的transform,该transform的input就是source,最终将构建好的transform添加到env的transformations列表中

keyBy, 在keyBy中创建KeyedStream中,最终将keySelector赋值给了 keyedStream的变量中, KeySelector不会被添加到env的transformations中, 会将信息记录在下游的算子中

keyBy算子,会创建一个keyedStream,并将keyBy传入的值转换成keyedSelector,赋值给keyedStream对象的成员变量中,在创建KyedStream的时候会创建一个PartitionTransformation,该transform会被赋值到DataStream的transformation变量中,在调用timeWindow的时候会将WindowAssigner以及this(即keyedStream)传入创建WindowedStream,在aggregate算子中,会创建一个windowOperator,通过input.transform,将windowOperator转换transform,添加到env的transformations中

keyBy会生成一个 PartitionTransformation的 transform, 但是并不会添加到env的transformations的列表中,而是将信息记录在了下游的transform中即window中,最终将window生成一个transform添加到列表

window生成 通过windowAssigner,keySelector,trigger和evictor等参数构建windowOperator,最终通过transform方法转换成一个transform并添加到env的transformations列表中,
在window中会包含他的上游input(即 keyBy<PartitionTransformation>)

当所有transform都遍历完,这时候构建出StreamGraph,可以看到StreamNode中,已经存在了除了keyBy的所有StreamNode,到这里StreamGraph就已经构建好了


StreamGraph生成后的StreamNodes
执行图

生成JobGraph

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容