Flink源码解析之Job提交(下)

上一篇文章我们讲解了通过命令行将一个Job提交到TaskManager的整体过程,但是我们中间忽略了一些细节,比如Job提交到集群的哪些节点,JobGraph是什么,它是如何生成的?JobClient又是如何将Job提交到集群中的等等,本文会为你一一解决这些问题。

Flink运行时环境

Flink运行时主要包含两种类型的处理器:

  • JobManager: 主要负责协调分布式执行。调度任务,协调Checkpoint,协调故障时容错功能等。
  • TaskManager: 执行数据流的Task(或更具体地说,子任务),并缓冲和交换数据流。

根据JobManager和TaskManager的分工和名称,应该可以很显然的看出JobClient提交Job到JobManager节点上,并通过它将子任务分配到TaskManager上执行。

交互模式提交Job

在通过命令行提交Job时,会调用CluterClient的run方法去执行提交逻辑,而且分为两种方式,交互模式和非交互模式:

    public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) { // 如果包含入口类(非交互模式提交Job)

            // JobWithJars是一个Flink数据流计划,包含了jar中所有的类,以及用于加载用户代码的ClassLoader
            final JobWithJars jobWithJars;
            if (hasUserJarsInClassPath(prog.getAllLibraries())) {
                jobWithJars = prog.getPlanWithoutJars();
            } else {
                jobWithJars = prog.getPlanWithJars();
            }

            return run(jobWithJars, parallelism, prog.getSavepointSettings());
        } else if (prog.isUsingInteractiveMode()) { // 使用交互模式提交Job
            log.info("Starting program in interactive mode");

            final List<URL> libraries;
            if (hasUserJarsInClassPath(prog.getAllLibraries())) {
                libraries = Collections.emptyList();
            } else {
                libraries = prog.getAllLibraries();
            }

            ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
                prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
                prog.getSavepointSettings());
            ContextEnvironment.setAsContext(factory);

            try {
                // 调用main方法
                prog.invokeInteractiveModeForExecution();
                if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                    throw new ProgramMissingJobException("The program didn't contain a Flink job.");
                }
                if (isDetached()) {
                    // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
                    return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
                } else {
                    // in blocking mode, we execute all Flink jobs contained in the user code and then return here
                    return this.lastJobExecutionResult;
                }
            } finally {
                ContextEnvironment.unsetContext();
            }
        } else {
            throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
        }
    }

而实际中,大家可能都是采用交互模式提交作业,在提交的jar包中包含mainClass。以Flink的流处理示例WordCount为例:

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0).sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");
    }

ClusterClient中的prog.invokeInteractiveModeForExecution();其实就是调用WordCount的main方法。main方法的逻辑很简单,分为两部分:构建和执行数据流。本节重点讲执行数据流,也就是最后一行的env.execute("Streaming WordCount");
以本地流执行环境(LocalStreamEnvironment)来看一下execute方法执行了哪些逻辑

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        // 生成流图
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        // 将流图转换成作业图
        JobGraph jobGraph = streamGraph.getJobGraph();

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());

        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
        configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        // add (and override) the settings with what the user defined
        configuration.addAll(this.conf);

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }

        LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
        try {
            exec.start();
            // 提交作业图
            return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
        }
        finally {
            transformations.clear();
            exec.stop();
        }
    }

可以看出主要分为三步:生成流图,生成作业图和提交Job。首先看下提交Job的逻辑

  @throws(classOf[JobExecutionException])
  def submitJobAndWait(
      jobGraph: JobGraph,
      printUpdates: Boolean)
    : JobExecutionResult = {
    submitJobAndWait(jobGraph, printUpdates, timeout)
  }
  @throws(classOf[JobExecutionException])
  def submitJobAndWait(
      jobGraph: JobGraph,
      printUpdates: Boolean,
      timeout: FiniteDuration)
    : JobExecutionResult = {

    val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)

    val userCodeClassLoader =
      try {
        createUserCodeClassLoader(
          jobGraph.getUserJars,
          jobGraph.getClasspaths,
          Thread.currentThread().getContextClassLoader)
      } catch {
        case e: Exception => throw new JobExecutionException(
          jobGraph.getJobID,
          "Could not create the user code class loader.",
          e)
      }

    try {
      JobClient.submitJobAndWait(
       clientActorSystem,
       configuration,
       highAvailabilityServices,
       jobGraph,
       timeout,
       printUpdates,
       userCodeClassLoader)
    } finally {
       if(!useSingleActorSystem) {
         // we have to shutdown the just created actor system
         shutdownJobClientActorSystem(clientActorSystem)
       }
     }
  }

通过执行链,可以看出最终还是会通过上文描述过的JobClient.submitJobAndWait(...)方法提交作业,这里不再赘述。JobClient会启动一个Actor System,虽然它不是Flink运行时的一部分,但是它可以断开连接,或者保持连接以接收进度报告。一个整体的Job提交图如下所示:

上面讲了提交作业的三步,第一和第二步分别是生成流图和作业图,下面我们分别看下流图和作业图 。

流图

StreamGraph(流图)是用来表示流的拓补结构的数据结构,它包含了生成JobGraph的必要信息。
流图是由节点和边组成的,分别对应数据结构StreamNode和StreamEdge。一个StreamGraph可能如下图所示:


下面我们看下StreamGraph是如何创建的,即getStreamGraph()方法的逻辑。

    @Internal
    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }
    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }
    /**
     * This starts the actual transformation, beginning from the sinks.
     */
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }
    /**
     * Transforms one {@code StreamTransformation}.
     *
     * <p>This checks whether we already transformed it and exits early in that case. If not it
     * delegates to one of the transformation specific methods.
     */
    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from theExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() > 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

可以看出,核心的逻辑在transform(StreamTransformation<?> transform)方法中,可能大家疑惑StreamTransformation是什么?StreamTransformation是DataStream创建操作的描述信息,每一个DataStream底层都有一个StreamTransformation,它是DataStream的原始信息。通过StreamTransformation就可以构建一副整体的StreamGraph。以OneInputTransformation为例,看下是如何进行transform操作的。

    /**
     * Transforms a {@code OneInputTransformation}.
     *
     * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
     * wired the inputs to this new node.
     */
    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        // 转换当前OneInputTransformation的输入StreamTransformation
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        //添加 StreamGraph 节点
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getOperator(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        // 添加 StreamGraph 边
        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

逻辑很清晰,解析当前OneInputTransformation的输入StreamTransformation,根据OneInputTransformation的operator等信息构建StreamNode,然后根据解析的输入StreamTransformation的Id,构建StreamEdge。
在创建Stream,以及生成StreamGraph的过程中,涉及到较多的数据结构以及层次关系,以上述的WordCount示例中,通过text.flatMap(new Tokenizer())创建的流为例,具体的数据结构和层次如下图所示:

作业图

作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。

相比流图(StreamGraph)以及批处理优化计划(OptimizedPlan),JobGraph发生了一些变化,已经不完全是“静态”的数据结构了,因为它加入了中间数据集(IntermediateDataSet)这一“动态”概念。

作业顶点(JobVertex)、中间数据集(IntermediateDataSet)、作业边(JobEdge)是组成JobGraph的基本元素。这三个对象彼此之间互为依赖:

  • 一个JobVertex关联着若干个JobEdge作为输入端以及若干个IntermediateDataSet作为其生产的结果集;
  • 一个IntermediateDataSet关联着一个JobVertex作为生产者以及若干个JobEdge作为消费者;
  • 一个JobEdge关联着一个IntermediateDataSet可认为是源以及一个JobVertex可认为是目标消费者;

因此一个JobGraph可能的如下图所示:


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容