Flink Source Code: ExecutionGraph

For the full series, please see: Flink 源码分析系列文档目录 (Flink source code analysis series index).

From JobGraph to ExecutionGraph

A JobGraph is submitted through Dispatcher.submitJob, which is the entry point of everything that follows. That method calls Dispatcher.internalSubmitJob, which in turn calls Dispatcher.persistAndRunJob.

Dispatcher.persistAndRunJob persists the job and then runs it, as shown below:

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);
}

Dispatcher.runJob takes two arguments, the JobGraph and the execution type. There are two execution types: SUBMISSION (submitting a new job) and RECOVERY (recovering an existing job).

private void runJob(JobGraph jobGraph, ExecutionType executionType) {
    // Make sure no job with this JobID is currently running, to avoid duplicate submission
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    // Record the job's initialization timestamp
    long initializationTimestamp = System.currentTimeMillis();
    // Create the JobManagerRunner here;
    // the JobManagerRunner will then construct the JobManager
    CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
        createJobManagerRunner(jobGraph, initializationTimestamp);

    // Wrap the JobGraph-related information for use by the Dispatcher
    DispatcherJob dispatcherJob =
        DispatcherJob.createFor(
            jobManagerRunnerFuture,
            jobGraph.getJobID(),
            jobGraph.getName(),
            initializationTimestamp);

    // Add the job's ID to the runningJobs collection,
    // marking the job as running
    runningJobs.put(jobGraph.getJobID(), dispatcherJob);

    final JobID jobId = jobGraph.getJobID();

    // Handle the result of dispatching the job
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
        dispatcherJob
            .getResultFuture()
            .handleAsync(
                (dispatcherJobResult, throwable) -> {
                    Preconditions.checkState(
                        runningJobs.get(jobId) == dispatcherJob,
                        "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

                    if (dispatcherJobResult != null) {
                        return handleDispatcherJobResult(
                            jobId, dispatcherJobResult, executionType);
                    } else {
                        return dispatcherJobFailed(jobId, throwable);
                    }
                },
                getMainThreadExecutor());

    // When the job terminates, remove its JobID from runningJobs
    final CompletableFuture<Void> jobTerminationFuture =
        cleanupJobStateFuture
            .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
            .thenCompose(Function.identity());

    // Track the job ID and its termination future in dispatcherJobTerminationFutures
    FutureUtils.assertNoException(jobTerminationFuture);
    registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}

Next comes Dispatcher.createJobManagerRunner. The JobManagerRunner is created inside the Dispatcher and then started; the creation logic lives in createJobManagerRunner, shown below:

CompletableFuture<JobManagerRunner> createJobManagerRunner(
        JobGraph jobGraph, long initializationTimestamp) {
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
        () -> {
            try {
                // Create the JobManagerRunner via the factory,
                // passing in the JobGraph and the high-availability services
                JobManagerRunner runner =
                    jobManagerRunnerFactory.createJobManagerRunner(
                        jobGraph,
                        configuration,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        jobManagerSharedServices,
                        new DefaultJobManagerJobMetricGroupFactory(
                            jobManagerMetricGroup),
                        fatalErrorHandler,
                        initializationTimestamp);
                // Start the runner; in effect this starts the JobManager's
                // leader election service, which elects the JM leader
                runner.start();
                return runner;
            } catch (Exception e) {
                throw new CompletionException(
                    new JobInitializationException(
                        jobGraph.getJobID(),
                        "Could not instantiate JobManager.",
                        e));
            }
        },
        ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
                     // JobManager creation
}

At this point the JobManager begins leader election. To ensure the JobManager is not a single point of failure, Flink provides JobManager high availability, allowing several JobManager instances to run at the same time. In standalone deployments, leader election is implemented via ZooKeeper; in YARN cluster mode, high availability instead relies on YARN automatically restarting a failed ApplicationMaster. For the details of leader election, see Flink 源码之leader选举(Zookeeper方式) (leader election, ZooKeeper based) in this series.
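
For reference, ZooKeeper-based high availability for a standalone cluster is switched on through a handful of configuration options. Below is a minimal sketch; the option keys follow the Flink documentation, while the quorum hosts and storage path are placeholders:

import org.apache.flink.configuration.Configuration;

public class HaConfigSketch {
    public static void main(String[] args) {
        // Programmatic equivalent of the flink-conf.yaml entries of the same names
        Configuration conf = new Configuration();
        conf.setString("high-availability", "zookeeper");
        // ZooKeeper quorum used for leader election (placeholder hosts)
        conf.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
        // Durable storage for HA metadata such as submitted JobGraphs (placeholder path)
        conf.setString("high-availability.storageDir", "hdfs:///flink/ha/");
        System.out.println(conf);
    }
}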

Once the leader JobManager has been elected, the election service calls grantLeadership on it. The method looks like this:

@Override
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug(
                "JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }

        leadershipOperation =
            leadershipOperation.thenRun(
                ThrowingRunnable.unchecked(
                    () -> {
                        synchronized (lock) {
                            // The main logic: verify the job's scheduling
                            // status and start the JobManager
                            verifyJobSchedulingStatusAndStartJobManager(
                                leaderSessionID);
                        }
                    }));

        handleException(leadershipOperation, "Could not start the job manager.");
    }
}

We then follow the call into verifyJobSchedulingStatusAndStartJobManager.

@GuardedBy("lock")
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
    throws FlinkException {
    // 如果JobManager已停止,直接返回
    if (shutdown) {
        log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
        return;
    }

    // 从JobRegistry中获取Job调度状态
    final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
        getJobSchedulingStatus();

    // 如果作业已执行完毕
    // 调用作业执行完毕逻辑(实际上是作业未被当前JobManager完成运行的逻辑)
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
        jobAlreadyDone();
    } else {
        // 启动JobMaster
        startJobMaster(leaderSessionId);
    }
}

The flow now arrives at JobManagerRunnerImpl.startJobMaster.

This method starts the JobMaster: it marks the job as running in the registry, starts the JobMaster services, and confirms this JobMaster as leader.

@GuardedBy("lock")
private void startJobMaster(UUID leaderSessionId) throws FlinkException {
    log.info(
        "JobManager runner for job {} ({}) was granted leadership with session id {}.",
        jobGraph.getName(),
        jobGraph.getJobID(),
        leaderSessionId);

    try {
        // 注册JobGraph
        // 根据集群部署形式(Standalone,Zookeeper或K8s),采用不同的方式存储JobID
        runningJobsRegistry.setJobRunning(jobGraph.getJobID());
    } catch (IOException e) {
        throw new FlinkException(
            String.format(
                "Failed to set the job %s to running in the running jobs registry.",
                jobGraph.getJobID()),
            e);
    }

    // 启动JobMaster服务
    startJobMasterServiceSafely(leaderSessionId);

    // 确认该JobMaster是leader状态
    if (jobMasterService != null) {
        confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
    }
}

JobManagerRunnerImpl.startJobMasterServiceSafely then uses DefaultJobMasterServiceFactory.createJobMasterService to create the JobMaster and start its RPC service.

The JobMaster constructor contains the logic that builds Flink's job scheduler: JobMaster.createScheduler calls DefaultSlotPoolServiceSchedulerFactory.createScheduler to create the scheduler, which in turn calls the scheduler factory method DefaultSchedulerFactory.createInstance.

The flow then reaches DefaultScheduler, the default implementation of Flink's job scheduler. It extends SchedulerBase, which in turn implements the SchedulerNG interface.

The SchedulerBase constructor calls createAndRestoreExecutionGraph, shown below:

private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        JobStatusListener jobStatusListener)
        throws Exception {

    // Create the ExecutionGraph
    ExecutionGraph newExecutionGraph =
        createExecutionGraph(
            currentJobManagerJobMetricGroup,
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            shuffleMaster,
            partitionTracker,
            executionDeploymentTracker,
            initializationTimestamp);

    // Fetch the CheckpointCoordinator created inside the ExecutionGraph
    // (its creation is covered in a later section)
    final CheckpointCoordinator checkpointCoordinator =
        newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        // Check whether a recent checkpoint exists
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            // If no checkpoint was found, try to restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(
                newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    // Register the internal task failure listener
    newExecutionGraph.setInternalTaskFailuresListener(
        new UpdateSchedulerNgOnInternalFailuresListener(this));
    // Register the job status listener
    newExecutionGraph.registerJobStatusListener(jobStatusListener);
    // Attach the JobMaster's main thread executor
    newExecutionGraph.start(mainThreadExecutor);

    return newExecutionGraph;
}

SchedulerBase.createExecutionGraph delegates to DefaultExecutionGraphBuilder to build the ExecutionGraph:

private ExecutionGraph createExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws JobExecutionException, JobException {

    // Create the execution deployment listener
    ExecutionDeploymentListener executionDeploymentListener =
        new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
    // Create the execution state update listener
    ExecutionStateUpdateListener executionStateUpdateListener =
        (execution, newState) -> {
            if (newState.isTerminal()) {
                executionDeploymentTracker.stopTrackingDeploymentOf(execution);
            }
        };

    // Build the ExecutionGraph
    return DefaultExecutionGraphBuilder.buildGraph(
        jobGraph,
        jobMasterConfiguration,
        futureExecutor,
        ioExecutor,
        userCodeLoader,
        completedCheckpointStore,
        checkpointsCleaner,
        checkpointIdCounter,
        rpcTimeout,
        currentJobManagerJobMetricGroup,
        blobWriter,
        log,
        shuffleMaster,
        partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
            jobGraph.getJobType()),
        executionDeploymentListener,
        executionStateUpdateListener,
        initializationTimestamp,
        new DefaultVertexAttemptNumberStore());
}

ExecutionGraph concepts

The ExecutionGraph is the physical execution plan of a Flink job. It is used to coordinate the distributed execution of the dataflow.

Unlike the StreamGraph and the JobGraph, the ExecutionGraph is generated on the JobManager.

The ExecutionGraph also has a notion of vertices: its vertex type is ExecutionJobVertex, which corresponds to a JobVertex of the JobGraph. Going from the JobGraph to the ExecutionGraph introduces parallelism. An ExecutionJobVertex contains all the parallel tasks of its corresponding JobVertex, and each of those parallel tasks is represented by an ExecutionVertex; in other words, an ExecutionJobVertex contains as many ExecutionVertexes as its parallelism.

An ExecutionVertex may be executed one or more times (due to failure recovery, recomputation or reconfiguration). Every execution of an ExecutionVertex produces an Execution object, which tracks the state changes and resource usage of that execution attempt.

An IntermediateResult corresponds to an IntermediateDataSet of a JobVertex in the JobGraph; it represents the staging point for data exchanged between two adjacent ExecutionJobVertexes. IntermediateResults are created when their producing ExecutionJobVertex is constructed, one per IntermediateDataSet the vertex produces; each of them is in turn split into IntermediateResultPartitions, one per parallel subtask.
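
To make these relationships concrete, here is a minimal sketch (not part of the Flink code base) that walks an already-built ExecutionGraph and prints its structure. It only uses accessors that appear elsewhere in this walkthrough or on the corresponding runtime classes (getAllVertices, getTaskVertices, getCurrentExecutionAttempt, getProducedDataSets); treat it as an illustration rather than production code:

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;

public class ExecutionGraphDump {
    // Walks the graph: one ExecutionJobVertex per JobVertex, one ExecutionVertex
    // per parallel subtask, one IntermediateResult per produced IntermediateDataSet
    static void dumpStructure(ExecutionGraph graph) {
        for (ExecutionJobVertex ejv : graph.getAllVertices().values()) {
            System.out.printf("%s (parallelism=%d)%n", ejv.getName(), ejv.getParallelism());

            for (ExecutionVertex ev : ejv.getTaskVertices()) {
                // Each run of an ExecutionVertex is represented by an Execution
                Execution attempt = ev.getCurrentExecutionAttempt();
                System.out.printf("  subtask %s -> state %s%n",
                    ev.getTaskNameWithSubtaskIndex(), attempt.getState());
            }

            for (IntermediateResult result : ejv.getProducedDataSets()) {
                System.out.printf("  produces %s with %d partitions%n",
                    result.getId(), result.getPartitions().length);
            }
        }
    }
}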

The buildGraph method of DefaultExecutionGraphBuilder

public static ExecutionGraph buildGraph(
    JobGraph jobGraph,
    Configuration jobManagerConfig,
    ScheduledExecutorService futureExecutor,
    Executor ioExecutor,
    ClassLoader classLoader,
    CompletedCheckpointStore completedCheckpointStore,
    CheckpointsCleaner checkpointsCleaner,
    CheckpointIDCounter checkpointIdCounter,
    Time rpcTimeout,
    MetricGroup metrics,
    BlobWriter blobWriter,
    Logger log,
    ShuffleMaster<?> shuffleMaster,
    JobMasterPartitionTracker partitionTracker,
    TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
    ExecutionDeploymentListener executionDeploymentListener,
    ExecutionStateUpdateListener executionStateUpdateListener,
    long initializationTimestamp,
    VertexAttemptNumberStore vertexAttemptNumberStore)
    throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    // Grab the job name and job ID
    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    // Create the JobInformation,
    // a wrapper around the job-related configuration used by the ExecutionGraph
    final JobInformation jobInformation =
        new JobInformation(
            jobId,
            jobName,
            jobGraph.getSerializedExecutionConfig(),
            jobGraph.getJobConfiguration(),
            jobGraph.getUserJarBlobKeys(),
            jobGraph.getClasspaths());

    // Maximum number of prior execution attempts kept in the history
    final int maxPriorAttemptsHistoryLength =
        jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

    // Load the factory for the IntermediateResultPartition release strategy
    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
        PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
            jobManagerConfig);

    // create a new execution graph, if none exists so far
    // Create the ExecutionGraph itself (analyzed in a later section)
    final DefaultExecutionGraph executionGraph;
    try {
            executionGraph =
                    new DefaultExecutionGraph(
                            jobInformation,
                            futureExecutor,
                            ioExecutor,
                            rpcTimeout,
                            maxPriorAttemptsHistoryLength,
                            classLoader,
                            blobWriter,
                            partitionReleaseStrategyFactory,
                            shuffleMaster,
                            partitionTracker,
                            partitionLocationConstraint,
                            executionDeploymentListener,
                            executionStateUpdateListener,
                            initializationTimestamp,
                            vertexAttemptNumberStore);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    // set the basic properties

    try {
        // Set the JSON execution plan
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    } catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        // If generating the JSON plan from the JobGraph fails, fall back to an empty plan
        executionGraph.setJsonPlan("{}");
    }

    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits

    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job {} ({}).", jobName, jobId);

    for (JobVertex vertex : jobGraph.getVertices()) {
        // Get the invokable class name of the vertex, i.e. the vertex's task
        String executableClass = vertex.getInvokableClassName();
        // Every vertex must have an invokable class
        if (executableClass == null || executableClass.isEmpty()) {
            throw new JobSubmissionException(
                jobId,
                "The vertex "
                    + vertex.getID()
                    + " ("
                    + vertex.getName()
                    + ") has no invokable class.");
        }

        try {
            // Run the vertex-type-specific initialization on the master at job startup
            vertex.initializeOnMaster(classLoader);
        } catch (Throwable t) {
            throw new JobExecutionException(
                jobId,
                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                t);
        }
    }

    log.info(
        "Successfully ran initialization on master in {} ms.",
        (System.nanoTime() - initMasterStart) / 1_000_000);

    // topologically sort the job vertices and attach the graph to the existing one
    // Get all job vertices, sorted topologically (i.e. in dataflow order)
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) {
        log.debug(
            "Adding {} vertices from job graph {} ({}).",
            sortedTopology.size(),
            jobName,
            jobId);
    }
    // Attach all job vertices to the executionGraph
    executionGraph.attachJobGraph(sortedTopology);

    if (log.isDebugEnabled()) {
        log.debug(
            "Successfully created execution graph from job graph {} ({}).", jobName, jobId);
    }

    // configure the state checkpointing
    // Configure checkpointing, if it is enabled
    if (isCheckpointingEnabled(jobGraph)) {
        // Read the checkpoint settings from the JobGraph;
        // snapshotSettings is populated by StreamingJobGraphGenerator
        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

        // The vertices that trigger checkpoints, i.e. all source (input) vertices
        List<ExecutionJobVertex> triggerVertices =
            idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        // The vertices that must acknowledge checkpoints, i.e. all vertices
        List<ExecutionJobVertex> ackVertices =
            idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        // The vertices notified when a checkpoint is committed, i.e. all vertices
        List<ExecutionJobVertex> confirmVertices =
            idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        // Maximum number of remembered checkpoints
        // Maximum number of checkpoints kept in the history
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        // Create the checkpoint stats tracker, which works together with the
        // CheckpointCoordinator
        CheckpointStatsTracker checkpointStatsTracker =
            new CheckpointStatsTracker(
                historySize,
                ackVertices,
                snapshotSettings.getCheckpointCoordinatorConfiguration(),
                metrics);

        // load the state backend from the application settings
        // Read the state backend configured by the application
        final StateBackend applicationConfiguredBackend;
        final SerializedValue<StateBackend> serializedAppConfigured =
            snapshotSettings.getDefaultStateBackend();

        if (serializedAppConfigured == null) {
            applicationConfiguredBackend = null;
        } else {
            try {
                // Deserialize the application-configured state backend
                applicationConfiguredBackend =
                    serializedAppConfigured.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                    jobId, "Could not deserialize application-defined state backend.", e);
            }
        }

        final StateBackend rootBackend;
        try {
            // Resolve the effective state backend:
            // if the application did not configure one, use the one from the
            // configuration file; if that is absent too, use the default
            rootBackend =
                StateBackendLoader.fromApplicationOrConfigOrDefault(
                    applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                jobId, "Could not instantiate configured state backend", e);
        }
        
        // load the checkpoint storage from the application settings
        // Read the checkpoint storage configured by the application
        final CheckpointStorage applicationConfiguredStorage;
        final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
            snapshotSettings.getDefaultCheckpointStorage();

        if (serializedAppConfiguredStorage == null) {
            applicationConfiguredStorage = null;
        } else {
            try {
                applicationConfiguredStorage =
                    serializedAppConfiguredStorage.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                    jobId,
                    "Could not deserialize application-defined checkpoint storage.",
                    e);
            }
        }

        // As with the state backend, resolve the checkpoint storage from the
        // application settings and the Flink configuration
        final CheckpointStorage rootStorage;
        try {
            rootStorage =
                CheckpointStorageLoader.load(
                    applicationConfiguredStorage,
                    null,
                    rootBackend,
                    jobManagerConfig,
                    classLoader,
                    log);
        } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                jobId, "Could not instantiate configured checkpoint storage", e);
        }

        // instantiate the user-defined checkpoint hooks

        // Instantiate the user-defined checkpoint hooks.
        // These hooks are invoked when a snapshot is triggered or restored
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
            snapshotSettings.getMasterHooks();
        final List<MasterTriggerRestoreHook<?>> hooks;

        if (serializedHooks == null) {
            hooks = Collections.emptyList();
        } else {
            final MasterTriggerRestoreHook.Factory[] hookFactories;
            try {
                hookFactories = serializedHooks.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                    jobId, "Could not instantiate user-defined checkpoint hooks", e);
            }

            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);

            try {
                hooks = new ArrayList<>(hookFactories.length);
                for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                    hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                }
            } finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        // Read the checkpoint coordinator configuration
        final CheckpointCoordinatorConfiguration chkConfig =
            snapshotSettings.getCheckpointCoordinatorConfiguration();

        // Apply the checkpoint configuration to the executionGraph
        executionGraph.enableCheckpointing(
            chkConfig,
            triggerVertices,
            ackVertices,
            confirmVertices,
            hooks,
            checkpointIdCounter,
            completedCheckpointStore,
            rootBackend,
            checkpointStatsTracker,
            checkpointsCleaner);
    }

    // create all the metrics for the Execution Graph
    // Register gauges for the job's uptime, downtime and restart time
    metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
    metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
    metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));

    return executionGraph;
}

Building the ExecutionGraph thus involves roughly the following steps:

  • Gather the job information
  • Create the ExecutionGraph object itself
  • Attach the JobGraph vertices
  • Configure checkpointing
  • Configure the state backend
  • Configure checkpoint storage
  • Register the checkpoint hooks
  • Register the job metrics

The ExecutionGraph constructor

The DefaultExecutionGraph constructor takes the following arguments:

  • jobInformation: a wrapper around the job information, containing the job ID, name, configuration, user code and classpath, etc.
  • futureExecutor: thread pool for asynchronous operations.
  • ioExecutor: thread pool for I/O operations.
  • rpcTimeout: timeout for RPC calls.
  • maxPriorAttemptsHistoryLength: maximum number of prior execution attempts kept in the history.
  • classLoader: the user-code class loader.
  • blobWriter: writes data to the BLOB server.
  • partitionReleaseStrategyFactory: factory for the IntermediateResultPartition release strategy.
  • shuffleMaster: registers IntermediateResultPartitions (intermediate result partitions), i.e. registers each data partition and its producer with the JobMaster.
  • partitionTracker: tracks and releases partitions.
  • partitionLocationConstraint: constrains whether a partition's location may still be unknown at deployment time. In batch mode it may be unknown; in streaming mode it must be known.
  • executionDeploymentListener: listener for execution deployments
  • executionStateUpdateListener: listener for execution state updates
  • initializationTimestamp: initialization timestamp
  • vertexAttemptNumberStore: stores the attempt count of each job vertex

The attachJobGraph method

This method attaches the JobGraph's vertices to the ExecutionGraph.

@Override
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    // Must run in the JobMaster main thread
    assertRunningInJobMasterMainThread();

    LOG.debug(
        "Attaching {} topologically sorted vertices to existing job graph with {} "
        + "vertices and {} intermediate results.",
        topologiallySorted.size(),
        tasks.size(),
        intermediateResults.size());

    // Collection that will hold the new ExecutionJobVertexes
    final ArrayList<ExecutionJobVertex> newExecJobVertices =
        new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    for (JobVertex jobVertex : topologiallySorted) {

        // If any input (source) vertex is not stoppable,
        // mark the whole ExecutionGraph as not stoppable
        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        // create the execution job vertex and attach it to the graph
        // Create the ExecutionJobVertex
        ExecutionJobVertex ejv =
            new ExecutionJobVertex(
                this,
                jobVertex,
                maxPriorAttemptsHistoryLength,
                rpcTimeout,
                createTimestamp,
                this.initialAttemptCounts.getAttemptCounts(jobVertex.getID()));

        // Hand the predecessors' IntermediateResults to this ejv,
        // i.e. connect it to its predecessors, as the method name says
        ejv.connectToPredecessors(this.intermediateResults);

        // Put the job vertex ID and the ejv into the ExecutionGraph as a key-value pair
        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

        // A non-null previousTask means two JobGraph vertices share the same ID,
        // which is an error
        if (previousTask != null) {
            throw new JobException(
                String.format(
                    "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
        }

        // Iterate over the IntermediateResults created by the ejv.
        // They are created in the ExecutionJobVertex constructor,
        // from the IntermediateDataSets of the corresponding JobVertex
        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet =
                this.intermediateResults.putIfAbsent(res.getId(), res);
            // Likewise, result IDs must not repeat
            if (previousDataSet != null) {
                throw new JobException(
                    String.format(
                        "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
            }
        }

        // This collection keeps the ejvs in creation order
        this.verticesInCreationOrder.add(ejv);
        // Count the total number of vertices: at runtime one task is deployed
        // per parallel subtask, so the parallelism of each ejv is accumulated
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    // Register all ExecutionVertexes and the IntermediateResultPartitions they produce.
    // An ExecutionVertex is a physical execution node: an ExecutionJobVertex contains
    // as many ExecutionVertexes as its parallelism
    registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    // Build the execution topology, which contains all ExecutionVertexes,
    // ResultPartitions and PipelinedRegions
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    // Create the partition release strategy
    partitionReleaseStrategy =
        partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

Pipelined Region

When the executionTopology is built we encounter pipelined regions. Before explaining this concept we need the difference between a pipelined result and a blocking result.

A pipelined result is produced as a continuous stream flowing out of a pipe: the downstream can consume the data continuously, starting as soon as the upstream has produced anything. A pipelined result may keep producing data indefinitely; this type corresponds to streaming jobs.

A blocking result can only be consumed once the upstream has completely finished producing it. Blocking results are always bounded; the typical scenario is a batch job.

A pipelined region is a part of the ExecutionGraph. It contains a run of consecutive tasks connected by pipelined data exchanges (producing pipelined results), so an ExecutionGraph can be cut into several pipelined regions that are connected to each other by blocking data exchanges.

The point of a pipelined region is that every consumer inside the region must keep consuming what the upstream producers emit, so that the producers are never blocked and no backpressure builds up. Consequently, all tasks of one pipelined region must be scheduled together when they start and restarted together when any of them fails.

For the official explanation of pipelined region scheduling, see: https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html
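
To illustrate how blocking exchanges cut a graph into regions, here is a self-contained toy (deliberately simplified; it is not Flink's actual region-computation code): vertices joined by pipelined edges are merged into one region via union-find, while blocking edges act as region boundaries.

import java.util.*;

// Toy illustration of pipelined-region computation: vertices connected by
// PIPELINED edges end up in the same region, BLOCKING edges cut regions apart.
public class PipelinedRegionsToy {
    enum ExchangeType { PIPELINED, BLOCKING }

    static class Edge {
        final String from, to;
        final ExchangeType type;
        Edge(String from, String to, ExchangeType type) {
            this.from = from; this.to = to; this.type = type;
        }
    }

    public static void main(String[] args) {
        List<String> vertices = Arrays.asList("source", "map", "agg", "sink");
        List<Edge> edges = Arrays.asList(
            new Edge("source", "map", ExchangeType.PIPELINED),
            new Edge("map", "agg", ExchangeType.BLOCKING),   // region boundary
            new Edge("agg", "sink", ExchangeType.PIPELINED));

        // Union-find: initially every vertex forms its own region
        Map<String, String> parent = new HashMap<>();
        for (String v : vertices) parent.put(v, v);

        // Merge regions only across pipelined edges
        for (Edge e : edges) {
            if (e.type == ExchangeType.PIPELINED) {
                parent.put(find(parent, e.from), find(parent, e.to));
            }
        }

        // Group vertices by the root of their region
        Map<String, List<String>> regions = new LinkedHashMap<>();
        for (String v : vertices) {
            regions.computeIfAbsent(find(parent, v), k -> new ArrayList<>()).add(v);
        }
        // Prints [source, map] and [agg, sink]: two regions split by the blocking edge
        for (List<String> region : regions.values()) System.out.println(region);
    }

    static String find(Map<String, String> parent, String v) {
        while (!parent.get(v).equals(v)) v = parent.get(v);
        return v;
    }
}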

The enableCheckpointing method

This method initializes the checkpoint-related configuration of the ExecutionGraph. Its main job is to create and configure the CheckpointCoordinator:

@Override
public void enableCheckpointing(
        CheckpointCoordinatorConfiguration chkConfig,
        List<MasterTriggerRestoreHook<?>> masterHooks,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore checkpointStore,
        StateBackend checkpointStateBackend,
        CheckpointStorage checkpointStorage,
        CheckpointStatsTracker statsTracker,
        CheckpointsCleaner checkpointsCleaner) {

    // The job must still be in CREATED state
    checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
    // The CheckpointCoordinator must not exist yet, to avoid enabling twice
    checkState(checkpointCoordinator == null, "checkpointing already enabled");

    // Collect the OperatorCoordinators of all ExecutionJobVertexes.
    // An OperatorCoordinator runs on the JobManager and is associated with an
    // operator of a job vertex; it is used to interact with that operator
    final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators =
        buildOpCoordinatorCheckpointContexts();

    checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");

    // Create the checkpoint failure manager, which runs the failure-handling
    // logic whenever a checkpoint fails
    CheckpointFailureManager failureManager =
        new CheckpointFailureManager(
            chkConfig.getTolerableCheckpointFailureNumber(),
            new CheckpointFailureManager.FailJobCallback() {
                @Override
                public void failJob(Throwable cause) {
                    getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
                }

                @Override
                public void failJobDueToTaskFailure(
                        Throwable cause, ExecutionAttemptID failingTask) {
                    getJobMasterMainThreadExecutor()
                        .execute(
                            () ->
                                failGlobalIfExecutionIsStillRunning(
                                    cause, failingTask));
                }
            });

    // Create the timer the CheckpointCoordinator uses to trigger periodic checkpoints
    checkState(checkpointCoordinatorTimer == null);

    checkpointCoordinatorTimer =
        Executors.newSingleThreadScheduledExecutor(
            new DispatcherThreadFactory(
                Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

    // Create the CheckpointCoordinator, which coordinates the checkpoints of all
    // operators across the cluster: it triggers checkpoints and commits them
    // create the coordinator that triggers and commits checkpoints and holds the state
    checkpointCoordinator =
        new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            operatorCoordinators,
            checkpointIDCounter,
            checkpointStore,
            checkpointStorage,
            ioExecutor,
            checkpointsCleaner,
            new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager,
            createCheckpointPlanCalculator(),
            new ExecutionAttemptMappingProvider(getAllExecutionVertices()));

    // register the master hooks on the checkpoint coordinator
    // Register the master hooks, which are called back when a checkpoint is
    // created or restored
    for (MasterTriggerRestoreHook<?> hook : masterHooks) {
        if (!checkpointCoordinator.addMasterHook(hook)) {
            LOG.warn(
                "Trying to register multiple checkpoint hooks with the name: {}",
                hook.getIdentifier());
        }
    }

    // Attach the checkpoint stats tracker
    checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

    // interval of max long value indicates disable periodic checkpoint,
    // the CheckpointActivatorDeactivator should be created only if the interval is not max
    // value
    // If periodic checkpointing is not disabled, register a job status listener
    // used by the CheckpointCoordinator to react to job status changes
    if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
        // the periodic checkpoint scheduler is activated and deactivated as a result of
        // job status changes (running -> on, all other states -> off)
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }

    // Record the state backend name and the checkpoint storage name
    this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
    this.checkpointStorageName = checkpointStorage.getClass().getSimpleName();
}

From ExecutionGraph to deployed tasks

Every execution of an ExecutionVertex creates a new Execution object.

Once the JobManager has started up, it tells the scheduler to start scheduling (by calling SchedulerBase's startScheduling method). After several intermediate layers of calls (fairly involved, and omitted here), the flow finally reaches Execution.deploy.

Execution.deploy contains the actual deployment logic. From the task's assigned slot and the ExecutionVertex it builds a task deployment descriptor, whose job is to carry the task's execution configuration to a TaskManager via RPC so that a matching task can be created there.

public void deploy() throws JobException {
    // Must run in the JobMaster main thread
    assertRunningInJobMasterMainThread();

    // Get the slot assigned to this execution
    final LogicalSlot slot = assignedResource;

    checkNotNull(
        slot,
        "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

    // Check if the TaskManager died in the meantime
    // This only speeds up the response to TaskManagers failing concurrently to deployments.
    // The more general check is the rpcTimeout of the deployment call
    if (!slot.isAlive()) {
        throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
    }

    // make sure exactly one deployment call happens from the correct state
    // note: the transition from CREATED to DEPLOYING is for testing purposes only
    // Read the previous state and switch the state to DEPLOYING
    ExecutionState previous = this.state;
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) {
            // race condition, someone else beat us to the deploying call.
            // this should actually not happen and indicates a race somewhere else
            throw new IllegalStateException(
                "Cannot deploy task: Concurrent deployment call race.");
        }
    } else {
        // vertex may have been cancelled, or it was already scheduled
        throw new IllegalStateException(
            "The vertex must be in CREATED or SCHEDULED state to be deployed. Found state "
            + previous);
    }

    // Check that this slot has indeed been assigned to this execution
    if (this != slot.getPayload()) {
        throw new IllegalStateException(
            String.format(
                "The execution %s has not been assigned to the assigned slot.", this));
    }

    try {

        // race double check, did we fail/cancel and do we need to release the slot?
        // Double check that the state is still DEPLOYING
        if (this.state != DEPLOYING) {
            slot.releaseSlot(
                new FlinkException(
                    "Actual state of execution "
                    + this
                    + " ("
                    + state
                    + ") does not match expected state DEPLOYING."));
            return;
        }

        LOG.info(
            "Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}",
            vertex.getTaskNameWithSubtaskIndex(),
            attemptNumber,
            vertex.getCurrentExecutionAttempt().getAttemptId(),
            getAssignedResourceLocation(),
            slot.getAllocationId());

        // Create the task deployment descriptor,
        // carrying the ExecutionVertex and the resources assigned to it
        final TaskDeploymentDescriptor deployment =
            TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
                .createDeploymentDescriptor(
                    slot.getAllocationId(),
                    taskRestore,
                    producedPartitions.values());

        // null taskRestore to let it be GC'ed
        taskRestore = null;

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
            vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

        getVertex().notifyPendingDeployment(this);
        // We run the submission in the future executor so that the serialization of large TDDs
        // does not block
        // the main thread and sync back to the main thread once submission is completed.
        // Here the taskManagerGateway is invoked asynchronously: the deployment
        // descriptor is sent to the TaskManager over RPC, and the TaskManager
        // deploys the task once it receives the descriptor
        CompletableFuture.supplyAsync(
                () -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
            .thenCompose(Function.identity())
            .whenCompleteAsync(
                (ack, failure) -> {
                    if (failure == null) {
                        vertex.notifyCompletedDeployment(this);
                    } else {
                        if (failure instanceof TimeoutException) {
                            String taskname =
                                vertex.getTaskNameWithSubtaskIndex()
                                + " ("
                                + attemptId
                                + ')';

                            markFailed(
                                new Exception(
                                    "Cannot deploy task "
                                    + taskname
                                    + " - TaskManager ("
                                    + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of "
                                    + rpcTimeout,
                                    failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                jobMasterMainThreadExecutor);

    } catch (Throwable t) {
        markFailed(t);
    }
}

Through the TaskManagerGateway, the method above invokes TaskExecutor.submitTask on the TaskManager side.

@Override
public CompletableFuture<Acknowledge> submitTask(
    TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
    // ...
    // Create the Task object
    Task task =
                    new Task(
                            jobInformation,
                            taskInformation,
                            tdd.getExecutionAttemptId(),
                            tdd.getAllocationId(),
                            tdd.getSubtaskIndex(),
                            tdd.getAttemptNumber(),
                            tdd.getProducedPartitions(),
                            tdd.getInputGates(),
                            memoryManager,
                            taskExecutorServices.getIOManager(),
                            taskExecutorServices.getShuffleEnvironment(),
                            taskExecutorServices.getKvStateService(),
                            taskExecutorServices.getBroadcastVariableManager(),
                            taskExecutorServices.getTaskEventDispatcher(),
                            externalResourceInfoProvider,
                            taskStateManager,
                            taskManagerActions,
                            inputSplitProvider,
                            checkpointResponder,
                            taskOperatorEventGateway,
                            aggregateManager,
                            classLoaderHandle,
                            fileCache,
                            taskManagerConfiguration,
                            taskMetricGroup,
                            resultPartitionConsumableNotifier,
                            partitionStateChecker,
                            getRpcService().getExecutor());
// ...
}

At this point the concrete Task object has been created inside the TaskManager. Our walk from the JobGraph through the ExecutionGraph, and finally to the deployed tasks, is complete.

This post is original work by the author. Discussion, criticism and corrections are welcome. Please credit the source when reposting.
