Flink Source Code: Snapshots

Method call chain for periodically triggering a checkpoint

  • JobMaster.triggerSavepoint
  • SchedulerBase.startCheckpointScheduler
  • CheckpointCoordinator.startCheckpointScheduler
  • CheckpointCoordinator.scheduleTriggerWithDelay
  • CheckpointCoordinator.triggerCheckpoint
  • Execution.triggerCheckpoint
  • Execution.triggerCheckpointHelper
  • RpcTaskManagerGateway.triggerCheckpoint
  • TaskExecutor.triggerCheckpoint
  • Task.triggerCheckpointBarrier
  • invokable.triggerCheckpointAsync (we follow StreamTask here)
  • StreamTask.performCheckpoint
  • OperatorChain.broadcastCheckpointBarrier and CheckpointingOperation.executeCheckpointing

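Choosing the nodes that trigger a checkpoint

The enableCheckpointing method of ExecutionGraph creates a CheckpointCoordinator object. This object runs in the JobManager and coordinates the checkpoint process across the distributed system. Its responsibilities are:

  • Trigger checkpoints periodically by instructing the data sources to emit checkpoint barriers.
  • Receive the acknowledgement that each operator sends when it finishes a given checkpoint.
  • Once every operator has acknowledged a given checkpoint, notify all operators that the checkpoint is complete.
  • Keep track of completed and in-progress checkpoints.

Note that the CheckpointCoordinator constructor takes a variable named tasksToTrigger: the nodes on which a checkpoint must be triggered. This variable is built in the configureCheckpointing method of StreamingJobGraphGenerator. The relevant code is shown below (unrelated parts omitted):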

for (JobVertex vertex : jobVertices.values()) {
    if (vertex.isInputVertex()) {
        triggerVertices.add(vertex.getID());
    }
    commitVertices.add(vertex.getID());
    ackVertices.add(vertex.getID());
}

The triggerVertices collection holds every vertex that satisfies isInputVertex. Here is the isInputVertex method:

public boolean isInputVertex() {
    return this.inputs.isEmpty();
}

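That settles it: a JobVertex with no inputs is an input vertex. Checkpoints are therefore triggered only on input vertices. In other words, the data sources are the first nodes to start a checkpoint; the checkpoint barrier then flows downstream and triggers the checkpoint on each node in turn.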
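
The selection rule can be tried out on a toy graph. The following is a minimal sketch, not Flink code: SketchVertex and TriggerVertexSketch are hypothetical stand-ins for JobVertex and the loop in configureCheckpointing, and only the "no inputs means source" rule is taken from the source above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for JobVertex: only a name and the upstream inputs.
class SketchVertex {
    final String name;
    final List<SketchVertex> inputs = new ArrayList<>();

    SketchVertex(String name) {
        this.name = name;
    }

    // Same rule as JobVertex.isInputVertex(): no inputs means "source".
    boolean isInputVertex() {
        return inputs.isEmpty();
    }
}

class TriggerVertexSketch {
    // Collects the names of the vertices that would go into triggerVertices.
    static List<String> selectTriggerVertices(List<SketchVertex> vertices) {
        List<String> triggerVertices = new ArrayList<>();
        for (SketchVertex vertex : vertices) {
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.name);
            }
        }
        return triggerVertices;
    }

    public static void main(String[] args) {
        // Build the chain source -> map -> sink.
        SketchVertex source = new SketchVertex("source");
        SketchVertex map = new SketchVertex("map");
        SketchVertex sink = new SketchVertex("sink");
        map.inputs.add(source);
        sink.inputs.add(map);

        // Only the vertex without inputs is selected: prints [source]
        System.out.println(selectTriggerVertices(Arrays.asList(source, map, sink)));
    }
}
```

In the real code every vertex is also added to ackVertices and commitVertices, which is why all tasks acknowledge a checkpoint even though only the sources are asked to start it.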

Walking through the call chain of a periodically triggered checkpoint

JobMaster

When JobMaster triggers a savepoint, it starts a checkpoint process.
Here is the triggerSavepoint method of JobMaster:

@Override
public CompletableFuture<String> triggerSavepoint(
        @Nullable final String targetDirectory,
        final boolean cancelJob,
        final Time timeout) {

    return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
}

Next, we look at the schedulerNG.triggerSavepoint method.

SchedulerBase

The schedulerNG.triggerSavepoint call inside JobMaster's triggerSavepoint resolves to the method in SchedulerBase. Its code is:

@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
    // Make sure we are running in the main thread
    mainThreadExecutor.assertRunningInMainThread();

    // Get the checkpointCoordinator from the executionGraph
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator == null) {
        throw new IllegalStateException(
            String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
    } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
        // Require either a configured default savepoint directory or a directory passed to this method
        log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

        throw new IllegalStateException(
            "No savepoint directory configured. You can either specify a directory " +
                "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
    }

    // When the job is to be cancelled, stop the checkpoint scheduler
    if (cancelJob) {
        checkpointCoordinator.stopCheckpointScheduler();
    }

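    // First trigger a savepoint (under the hood this triggers a checkpoint),
    // then map the result to the path where the checkpoint was stored.
    // Finally: 1. if the job should be cancelled and an earlier step failed, restart the checkpoint scheduler and rethrow;
    // 2. if the job should be cancelled and no earlier step failed, cancel the job.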
    return checkpointCoordinator
        .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
        .thenApply(CompletedCheckpoint::getExternalPointer)
        .handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
                cancel();
            }
            return path;
        }, mainThreadExecutor);
}
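
The stop / savepoint / cancel-or-resume flow above can be isolated with plain CompletableFuture. This is a minimal sketch under stated assumptions: the class, the two boolean flags, and the use of handle instead of handleAsync with a main-thread executor are simplifications introduced here and are not part of SchedulerBase.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class SavepointThenCancelSketch {
    // Hypothetical flags standing in for the checkpoint scheduler and the job.
    final AtomicBoolean schedulerRunning = new AtomicBoolean(true);
    final AtomicBoolean jobCancelled = new AtomicBoolean(false);

    CompletableFuture<String> triggerSavepoint(
            CompletableFuture<String> savepointPath, boolean cancelJob) {
        if (cancelJob) {
            // No periodic checkpoints while the final savepoint is in flight.
            schedulerRunning.set(false);
        }
        return savepointPath.handle((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    // The savepoint failed, so the job keeps running: resume checkpoints.
                    schedulerRunning.set(true);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                jobCancelled.set(true);
            }
            return path;
        });
    }

    public static void main(String[] args) {
        SavepointThenCancelSketch sketch = new SavepointThenCancelSketch();
        String path = sketch
                .triggerSavepoint(CompletableFuture.completedFuture("/tmp/savepoint-1"), true)
                .join();
        System.out.println(path + " cancelled=" + sketch.jobCancelled.get());
    }
}
```

The point of the ordering is that the job is cancelled only after the savepoint path is known, and a failed savepoint leaves the job in its normal checkpointing state.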

CheckpointCoordinator

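CheckpointCoordinator coordinates the distributed snapshots and state of all operators. It sends messages to the relevant tasks to trigger their snapshots, then collects the acknowledgements that confirm each snapshot succeeded.

CheckpointCoordinator's createActivatorDeactivator method creates a job status listener. Whenever the job's status changes, the listener's jobStatusChanges method is called.

The jobStatusChanges method of CheckpointCoordinatorDeActivator: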

@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}

As the code shows, when the job transitions to RUNNING, CheckpointCoordinator's startCheckpointScheduler method is called; for any other status, stopCheckpointScheduler is called.

Next, the source of startCheckpointScheduler:

public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
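        // First stop any scheduler created earlier
        stopCheckpointScheduler();

        // Then create a new one
        periodicScheduling = true;
        // Start the periodic checkpoint trigger after an initial delay.
        // The delay is a random value between the minimum pause between checkpoints and the checkpoint interval + 1 (upper bound exclusive)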
        currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
    }
}

The scheduleTriggerWithDelay method starts a timer that triggers checkpoints periodically:

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}

This code schedules a periodic task whose logic lives in ScheduledTrigger:

private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
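
The scheduling pattern can be reproduced with the JDK alone. The sketch below is an illustration, not the coordinator's code: PeriodicTriggerSketch and its method names are hypothetical, and the random delay follows the range described in the comment of startCheckpointScheduler. It also shows why the trigger catches its own exceptions: scheduleAtFixedRate suppresses all later runs once one run throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicTriggerSketch {
    // Random initial delay in [minPauseMillis, baseIntervalMillis]; the bound passed to nextLong is exclusive.
    static long randomInitDelay(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    static ScheduledFuture<?> schedule(
            ScheduledExecutorService timer, Runnable trigger, long initDelay, long baseInterval) {
        return timer.scheduleAtFixedRate(() -> {
            try {
                trigger.run();
            } catch (Exception e) {
                // Swallow and log: an exception escaping here would stop all future runs.
                System.err.println("Exception while triggering checkpoint: " + e);
            }
        }, initDelay, baseInterval, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger triggered = new AtomicInteger();

        ScheduledFuture<?> handle =
                schedule(timer, triggered::incrementAndGet, randomInitDelay(10L, 50L), 50L);

        Thread.sleep(300L);
        handle.cancel(false); // what stopCheckpointScheduler would do with the current trigger
        timer.shutdownNow();
        System.out.println("triggered " + triggered.get() + " times");
    }
}
```

Randomizing the first delay spreads the first checkpoint of different jobs over time instead of firing them all at the same moment after a restart.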

The run method contains a single call, to triggerCheckpoint. Let's follow that method:

// When called from the previous step, timestamp is the current system time and isPeriodic is true
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, boolean isPeriodic) {
    try {
        return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
    } catch (CheckpointException e) {
        long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
        // here we can not get the failed pending checkpoint's id,
        // so we pass the negative latest generated checkpoint id as a special flag
        failureManager.handleJobLevelCheckpointException(e, -1 * latestGeneratedCheckpointId);
        return FutureUtils.completedExceptionally(e);
    }
}

This leads to the method that holds the real checkpoint triggering logic. It is fairly long:

@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic,
        boolean advanceToEndOfTime) throws CheckpointException {

    if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // make some eager pre-checks
    // Pre-trigger checks
    synchronized (lock) {
        preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
    }

    // check if all tasks that we need to trigger are running.
    // if not, abort the checkpoint
    // tasksToTrigger holds the tasks on which the checkpoint must be triggered, as analyzed at the start of this article.
    // Make sure every one of them is in the RUNNING state
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // next, check if all tasks that need to acknowledge the checkpoint are running.
    // if not, abort the checkpoint
    // Check that every task that must acknowledge the checkpoint is currently being executed
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    ev.getTaskNameWithSubtaskIndex(),
                    job);
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // we will actually trigger this checkpoint!
    // Start the checkpoint process
    final CheckpointStorageLocation checkpointStorageLocation;
    final long checkpointID;

    try {
        // this must happen outside the coordinator-wide lock, because it communicates
        // with external services (in HA mode) and may block for a while.
        // Get the checkpoint ID:
        // non-HA mode uses StandaloneCheckpointIDCounter,
        // HA mode uses ZooKeeperCheckpointIDCounter
        checkpointID = checkpointIdCounter.getAndIncrement();

        // Resolve the storage location for this checkpoint
        checkpointStorageLocation = props.isSavepoint() ?
                checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
                checkpointStorage.initializeLocationForCheckpoint(checkpointID);
    }
    catch (Throwable t) {
        int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
        LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                job,
                numUnsuccessful,
                t);
        throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
    }

    // Create a pending (in-progress) checkpoint
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks,
        masterHooks.keySet(),
        props,
        checkpointStorageLocation,
        executor);

    if (statsTracker != null) {
        // Report the pending checkpoint to the stats tracker and get its stats callback
        PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
            checkpointID,
            timestamp,
            props);

        // Attach the stats callback to the checkpoint
        checkpoint.setStatsCallback(callback);
    }

    // schedule the timer that will clean up the expired checkpoints
    // Cleanup logic for expired checkpoints
    final Runnable canceller = () -> {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            // Skip checkpoints that have already been discarded
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                // Fail the in-progress checkpoint
                failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
                // Remove it from pendingCheckpoints
                pendingCheckpoints.remove(checkpointID);
                // Remember this ID among the recent checkpoint IDs
                rememberRecentCheckpointId(checkpointID);
                // Trigger any checkpoint request that was queued.
                // The preCheckBeforeTriggeringCheckpoint method below performs the concurrent-checkpoint check:
                // when the number of pendingCheckpoints reaches maxConcurrentCheckpointAttempts,
                // triggerRequestQueued is set to true,
                // and in that case a checkpoint is triggered immediately here
                triggerQueuedRequests();
            }
        }
    };

    try {
        // re-acquire the coordinator-wide lock
        synchronized (lock) {
            // Pre-trigger checks
            preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());

            LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
            // Add the checkpoint to the pendingCheckpoints map
            pendingCheckpoints.put(checkpointID, checkpoint);

            // Register a timer task that runs the
            // cancellation logic when the checkpoint times out;
            // that logic is the canceller analyzed above
            ScheduledFuture<?> cancellerHandle = timer.schedule(
                    canceller,
                    checkpointTimeout, TimeUnit.MILLISECONDS);

            // If the canceller handle could not be set (only possible when the checkpoint was already discarded)
            if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                // checkpoint is already disposed!
                // Cancel the scheduled canceller task
                cancellerHandle.cancel(false);
            }

            // TODO, asynchronously snapshots master hook without waiting here
            // Invoke every master hook in turn.
            // MasterTriggerRestoreHook notifies external systems before a checkpoint is taken or restored
            for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
                final MasterState masterState =
                    MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
                        .get(checkpointTimeout, TimeUnit.MILLISECONDS);
                checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
            }
            
            // Verify that every master state has been acknowledged
            Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
        }
        // end of lock scope

        // Build the checkpoint options from the checkpoint type and the storage location
        final CheckpointOptions checkpointOptions = new CheckpointOptions(
                props.getCheckpointType(),
                checkpointStorageLocation.getLocationReference());

        // send the messages to the tasks that trigger their checkpoint
        // Trigger checkpoint creation on every task in tasksToTrigger
        for (Execution execution: executions) {
            if (props.isSynchronous()) {
                execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
            } else {
                execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
            }
        }

        // Reset the counter of consecutive unsuccessful trigger attempts to 0
        numUnsuccessfulCheckpointsTriggers.set(0);
        return checkpoint.getCompletionFuture();
    }
    catch (Throwable t) {
        // guard the map against concurrent modifications
        synchronized (lock) {
            pendingCheckpoints.remove(checkpointID);
        }

        int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
        LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                checkpointID, job, numUnsuccessful, t);

        if (!checkpoint.isDiscarded()) {
            failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
        }

        try {
            checkpointStorageLocation.disposeOnFailure();
        }
        catch (Throwable t2) {
            LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
        }

        // rethrow the CheckpointException directly.
        if (t instanceof CheckpointException) {
            throw (CheckpointException) t;
        }
        throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
    }
}

This method is long, but most of it checks that the tasks are running, validates parameters, and invokes the master hooks.
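
The interplay between the pendingCheckpoints map and the canceller can be seen in isolation in the following sketch. It is a simplified model under assumptions: the class and method names are hypothetical, a CompletableFuture stands in for PendingCheckpoint, and a single acknowledge call stands in for collecting acknowledgements from all tasks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingCheckpointTimeoutSketch {
    private final Object lock = new Object();
    private final Map<Long, CompletableFuture<String>> pendingCheckpoints = new HashMap<>();
    private final ScheduledExecutorService timer;

    PendingCheckpointTimeoutSketch(ScheduledExecutorService timer) {
        this.timer = timer;
    }

    CompletableFuture<String> trigger(long checkpointId, long timeoutMillis) {
        CompletableFuture<String> completion = new CompletableFuture<>();
        synchronized (lock) {
            pendingCheckpoints.put(checkpointId, completion);
        }
        // The canceller: expire the checkpoint if it is still pending at the deadline.
        ScheduledFuture<?> cancellerHandle = timer.schedule(() -> {
            synchronized (lock) {
                if (pendingCheckpoints.remove(checkpointId) != null) {
                    completion.completeExceptionally(
                            new TimeoutException("Checkpoint " + checkpointId + " expired"));
                }
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Once the checkpoint finishes either way, the timer task is no longer needed.
        completion.whenComplete((location, error) -> cancellerHandle.cancel(false));
        return completion;
    }

    // Returns false when the checkpoint is unknown, for example because it already expired.
    boolean acknowledge(long checkpointId, String location) {
        CompletableFuture<String> completion;
        synchronized (lock) {
            completion = pendingCheckpoints.remove(checkpointId);
        }
        return completion != null && completion.complete(location);
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        PendingCheckpointTimeoutSketch coordinator = new PendingCheckpointTimeoutSketch(timer);
        CompletableFuture<String> first = coordinator.trigger(1L, 10_000L);
        coordinator.acknowledge(1L, "file:///tmp/chk-1");
        System.out.println("checkpoint 1 stored at " + first.join());
        timer.shutdownNow();
    }
}
```

A late acknowledgement for an expired checkpoint finds nothing in the map and is ignored, which is the same reason the real canceller removes the entry from pendingCheckpoints under the lock.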

Next, we look at execution.triggerCheckpoint, the entry point for triggering the checkpoint on a task.

public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, false);
}

The triggerCheckpointHelper method:

private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // Get the slot
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        // Get the TaskManagerGateway,
        // which the JobManager uses to communicate with the TaskManager
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // Trigger the checkpoint
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
    }
}

Here taskManagerGateway is of type RpcTaskManagerGateway. Let's look at the triggerCheckpoint method of RpcTaskManagerGateway:

@Override
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
    taskExecutorGateway.triggerCheckpoint(
        executionAttemptID,
        checkpointId,
        timestamp,
        checkpointOptions,
        advanceToEndOfEventTime);
}

Here taskExecutorGateway is implemented by TaskExecutor. We continue with its triggerCheckpoint method:

@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // Get the Task running in the slot
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        // Have the task emit a CheckpointBarrier
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
    }
}

The triggerCheckpointBarrier method of Task:

public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions,
        final boolean advanceToEndOfEventTime) {

    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        try {
            invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
        }
        catch (RejectedExecutionException ex) {
            // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
            LOG.debug(
                "Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
                checkpointID, taskNameWithSubtask, executionId);
        }
        catch (Throwable t) {
            if (getExecutionState() == ExecutionState.RUNNING) {
                failExternally(new Exception(
                    "Error while triggering checkpoint " + checkpointID + " for " +
                        taskNameWithSubtask, t));
            } else {
                LOG.debug("Encountered error while triggering checkpoint {} for " +
                    "{} ({}) while being not in state running.", checkpointID,
                    taskNameWithSubtask, executionId, t);
            }
        }
    }
    else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
    }
}

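The heart of this method is the single call invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime). Here invokable is the concrete task type, which may be SourceStreamTask or StreamTask. SourceStreamTask extends StreamTask, and the triggerCheckpointAsync that ends up running is the one defined in the parent class. Let's look at the triggerCheckpointAsync method of StreamTask: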

@Override
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) {

    return mailboxProcessor.getMainMailboxExecutor().submit(
            () -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
            "checkpoint %s with %s",
        checkpointMetaData,
        checkpointOptions);
}
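
Submitting the checkpoint as a mail means it runs on the same thread that processes records, so it never observes a half-processed record. The sketch below imitates that idea with a single-threaded executor. This is an assumption made for illustration: Flink's mailbox is its own component rather than an ExecutorService, and MailboxSketch with its methods is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class MailboxSketch {
    // One thread plays the role of the task's main (mailbox) thread.
    private final ExecutorService mailboxThread = Executors.newSingleThreadExecutor();

    // Task state; only ever touched from the mailbox thread, so no locking is needed.
    private long recordsProcessed = 0L;

    // Normal record processing, enqueued as a mail.
    Future<?> processRecord() {
        return mailboxThread.submit(() -> {
            recordsProcessed++;
        });
    }

    // The checkpoint trigger is just another mail; it runs between two records.
    Future<Long> triggerCheckpointAsync(long checkpointId) {
        return mailboxThread.submit(() -> recordsProcessed);
    }

    void shutdown() {
        mailboxThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        MailboxSketch task = new MailboxSketch();
        task.processRecord();
        task.processRecord();
        Future<Long> snapshot = task.triggerCheckpointAsync(1L);
        task.processRecord();
        // The snapshot reflects exactly the records enqueued before the trigger: prints 2
        System.out.println(snapshot.get());
        task.shutdown();
    }
}
```

The caller gets a Future back immediately, which matches the Future<Boolean> returned by triggerCheckpointAsync above: the RPC thread does not wait for the checkpoint to be performed.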

We continue with the triggerCheckpoint method:

private boolean triggerCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        boolean advanceToEndOfEventTime) throws Exception {
    try {
        // No alignment if we inject a checkpoint
        // Initialize the checkpoint metrics
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
            .setBytesBufferedInAlignment(0L)
            .setAlignmentDurationNanos(0L);

        // Perform the checkpoint
        boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime);
        if (!success) {
            // If it did not succeed, decline this checkpoint
            declineCheckpoint(checkpointMetaData.getCheckpointId());
        }
        return success;
    } catch (Exception e) {
        // propagate exceptions only if the task is still in "running" state
        if (isRunning) {
            Exception exception = new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
                " for operator " + getName() + '.', e);
            handleCheckpointException(exception);
            throw exception;
        } else {
            LOG.debug("Could not perform checkpoint {} for operator {} while the " +
                "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
            return false;
        }
    }
}

The main logic lives in performCheckpoint. Its source is:

private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    final long checkpointId = checkpointMetaData.getCheckpointId();

    if (isRunning) {
        actionExecutor.runThrowing(() -> {

            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);

                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            // Run the operatorChain's pre-barrier preparation logic
            operatorChain.prepareSnapshotPreBarrier(checkpointId);

            // Step (2): Send the checkpoint barrier downstream
            // Broadcast the checkpoint barrier downstream.
            // operatorChain holds all of the task's outputs;
            // iterate over them and send the barrier to each one
            operatorChain.broadcastCheckpointBarrier(
                    checkpointId,
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            // The task takes its own checkpoint
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

        });

        return true;
    } else {
        actionExecutor.runThrowing(() -> {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            // If the task is not running, send a checkpoint cancellation marker downstream
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });

        return false;
    }
}

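The broadcastCheckpointBarrier method of OperatorChain sends the checkpoint barrier to every downstream output of the chain. Each downstream task that receives the barrier forwards it further downstream and takes its own checkpoint, until the barrier reaches the sinks. For a dedicated analysis, see the companion article "Flink 源码之分布式快照" (Flink Source Code: Distributed Snapshots).

The code of broadcastCheckpointBarrier: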

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
    CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
    for (RecordWriterOutput<?> streamOutput : streamOutputs) {
        streamOutput.broadcastEvent(barrier);
    }
}
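
The ordering in performCheckpoint (barrier first, snapshot second, both as one atomic step relative to record processing) can be illustrated with a toy task. This is a minimal sketch with hypothetical names: lists of strings stand in for output channels, records are distributed round-robin for simplicity, and a single counter stands in for operator state.

```java
import java.util.ArrayList;
import java.util.List;

class BarrierOrderSketch {
    // Each inner list is one downstream output channel.
    final List<List<String>> outputs = new ArrayList<>();
    // One entry per checkpoint: the state value captured at that checkpoint.
    final List<Integer> snapshots = new ArrayList<>();
    // The toy "state": how many records this task has emitted so far.
    private int emittedCount = 0;

    BarrierOrderSketch(int numOutputs) {
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    // A record goes to exactly one output (round-robin here).
    void emit(String record) {
        outputs.get(emittedCount % outputs.size()).add(record);
        emittedCount++;
    }

    void performCheckpoint(long checkpointId) {
        // Step (2) from the source: the barrier is broadcast to every output.
        for (List<String> output : outputs) {
            output.add("barrier-" + checkpointId);
        }
        // Step (3): snapshot the state as of the barrier.
        snapshots.add(emittedCount);
    }

    public static void main(String[] args) {
        BarrierOrderSketch task = new BarrierOrderSketch(2);
        task.emit("a");
        task.emit("b");
        task.performCheckpoint(1L);
        task.emit("c");
        System.out.println(task.outputs);   // [[a, barrier-1, c], [b, barrier-1]]
        System.out.println(task.snapshots); // [2]
    }
}
```

Every output sees the barrier at the same logical position in the stream, so the records before it are exactly the ones covered by the snapshot.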

The checkpoint itself is carried out in the checkpointState method. Its code is:

private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getTargetLocation());

    // Create a checkpointing operation
    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
        this,
        checkpointMetaData,
        checkpointOptions,
        storage,
        checkpointMetrics);

    // Execute the checkpoint
    checkpointingOperation.executeCheckpointing();
}

Next, the executeCheckpointing method of CheckpointingOperation:

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
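        // Checkpoint every operator
        for (StreamOperator<?> op : allOperators) {
            // This calls the snapshotState method of AbstractStreamOperator,
            // which holds the actual snapshot logic.
            // That logic is fairly involved, so it is not analyzed here; it will be covered in a separate article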
            checkpointStreamOperator(op);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                checkpointMetaData.getCheckpointId(), owner.getName());
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        // Run the logic that finishes the snapshot (its asynchronous part)
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            owner,
            operatorSnapshotsInProgress,
            checkpointMetaData,
            checkpointMetrics,
            startAsyncPartNano);

        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                owner.getName(), checkpointMetaData.getCheckpointId(),
                checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                checkpointMetrics.getSyncDurationMillis());
        }
    } catch (Exception ex) {
        // Cleanup to release resources
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (null != operatorSnapshotResult) {
                try {
                    operatorSnapshotResult.cancel();
                } catch (Exception e) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", e);
                }
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                owner.getName(), checkpointMetaData.getCheckpointId(),
                checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                checkpointMetrics.getSyncDurationMillis());
        }

        if (checkpointOptions.getCheckpointType().isSynchronous()) {
            // in the case of a synchronous checkpoint, we always rethrow the exception,
            // so that the task fails.
            // this is because the intention is always to stop the job after this checkpointing
            // operation, and without the failure, the task would go back to normal execution.
            throw ex;
        } else {
            owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
        }
    }
}

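The statement owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable) hands the runnable to a thread pool, which then invokes the run method of AsyncCheckpointRunnable. That method wraps the logic that finishes the snapshot. Its code is: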

public void run() {
    FileSystemSafetyNet.initializeSafetyNetForThread();
    try {

        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());

        TaskStateSnapshot localTaskOperatorSubtaskStates =
            new TaskStateSnapshot(operatorSnapshotsInProgress.size());

        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {

            OperatorID operatorID = entry.getKey();
            // Get the in-progress snapshot futures of each operator
            OperatorSnapshotFutures snapshotInProgress = entry.getValue();

            // finalize the async part of all by executing all snapshot runnables
            // Creating the OperatorSnapshotFinalizer runs
            // the asynchronous parts of the snapshot
            OperatorSnapshotFinalizer finalizedSnapshots =
                new OperatorSnapshotFinalizer(snapshotInProgress);

            // The copy of the operator's snapshot state that is reported to the JobManager
            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getJobManagerOwnedState());

            // The copy of the operator's snapshot state kept locally on the task for fast failure recovery
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                operatorID,
                finalizedSnapshots.getTaskLocalState());
        }

        final long asyncEndNanos = System.nanoTime();
        final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;

        // Record the duration of the asynchronous phase for the metrics system
        checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

        // Move the async checkpoint state from RUNNING to COMPLETED
        if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
            CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {

            // Report that the snapshot has completed; this method is analyzed later
            reportCompletedSnapshotStates(
                jobManagerTaskOperatorSubtaskStates,
                localTaskOperatorSubtaskStates,
                asyncDurationMillis);

        } else {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                owner.getName(),
                checkpointMetaData.getCheckpointId());
        }
    } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - asynchronous part of checkpoint {} could not be completed.",
                owner.getName(),
                checkpointMetaData.getCheckpointId(),
                e);
        }
        handleExecutionException(e);
    } finally {
        owner.cancelables.unregisterCloseable(this);
        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
    }
}
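The compareAndSet guard in run() is what keeps a completed snapshot from being reported after the runnable has already been closed. The following is a minimal sketch of that pattern only, not Flink code: the class AsyncStateSketch, its methods, and the reports counter are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the async checkpoint state handling; not Flink code.
class AsyncStateSketch {
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private int reports = 0;

    // Called by the async thread once all snapshot futures have finished.
    boolean finish() {
        // Only one transition out of RUNNING can win.
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            reports++; // stands in for reportCompletedSnapshotStates(...)
            return true;
        }
        return false; // closed before completion, so nothing is reported
    }

    // Called when the runnable is closed or cancelled.
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    int reports() { return reports; }

    State state() { return state.get(); }
}
```

Whichever of finish() and close() runs first wins the transition; the loser sees false and does nothing, which mirrors the else branch that only logs a debug message.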

Sending the snapshot acknowledgement message

Call chain of the whole process

  • AsyncCheckpointRunnable.reportCompletedSnapshotStates
  • TaskStateManagerImpl.reportTaskStateSnapshots
  • RpcCheckpointResponder.acknowledgeCheckpoint
  • JobMaster.acknowledgeCheckpoint
  • SchedulerBase.acknowledgeCheckpoint
  • CheckpointCoordinator.receiveAcknowledgeMessage
  • CheckpointCoordinator.completePendingCheckpoint
  • PendingCheckpoint.finalizeCheckpoint

AsyncCheckpointRunnable.reportCompletedSnapshotStates

We start with the reportCompletedSnapshotStates method of AsyncCheckpointRunnable, which reports that the snapshot completed successfully. The code is as follows:

private void reportCompletedSnapshotStates(
    TaskStateSnapshot acknowledgedTaskStateSnapshot,
    TaskStateSnapshot localTaskStateSnapshot,
    long asyncDurationMillis) {

    // Get the task state manager
    TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();

    boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
    boolean hasLocalState = localTaskStateSnapshot.hasState();

    Preconditions.checkState(hasAckState || !hasLocalState,
        "Found cached state but no corresponding primary state is reported to the job " +
            "manager. This indicates a problem.");

    // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
    // to stateless tasks on restore. This enables simple job modifications that only concern
    // stateless without the need to assign them uids to match their (always empty) states.
    // Report the task snapshot states
    taskStateManager.reportTaskStateSnapshots(
        checkpointMetaData,
        checkpointMetrics,
        hasAckState ? acknowledgedTaskStateSnapshot : null,
        hasLocalState ? localTaskStateSnapshot : null);

    LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
        owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);

    LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
        owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
}
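Two details of the method above are easy to miss: local state without a matching primary state is treated as an error, and stateless tasks are signalled by reporting null. The sketch below illustrates just that selection logic under simplifying assumptions: states are modelled as plain strings, an empty string means "no state", and ReportSketch and select are hypothetical names, not Flink API.

```java
// Hypothetical illustration of the state-selection rule; not Flink code.
class ReportSketch {
    // Returns {stateForJobManager, localState}; null marks "nothing to report".
    static String[] select(String ackState, String localState) {
        boolean hasAckState = ackState != null && !ackState.isEmpty();
        boolean hasLocalState = localState != null && !localState.isEmpty();

        // Same condition as the Preconditions.checkState call above:
        // local state is only legal when primary state exists as well.
        if (!(hasAckState || !hasLocalState)) {
            throw new IllegalStateException(
                "Found cached state but no corresponding primary state.");
        }

        return new String[] {
            hasAckState ? ackState : null,
            hasLocalState ? localState : null
        };
    }
}
```

For example, select("", "") yields two nulls (a stateless task), while select("", "local") throws because a local copy exists without a primary one.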

TaskStateManagerImpl.reportTaskStateSnapshots

In the previous snippet, taskStateManager is implemented by TaskStateManagerImpl. Next we look at its reportTaskStateSnapshots method.

@Override
public void reportTaskStateSnapshots(
    @Nonnull CheckpointMetaData checkpointMetaData,
    @Nonnull CheckpointMetrics checkpointMetrics,
    @Nullable TaskStateSnapshot acknowledgedState,
    @Nullable TaskStateSnapshot localState) {

    long checkpointId = checkpointMetaData.getCheckpointId();

    // Store the local snapshot state
    localStateStore.storeLocalState(checkpointId, localState);

    // Send the acknowledgement that the snapshot succeeded
    checkpointResponder.acknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        acknowledgedState);
}

RpcCheckpointResponder.acknowledgeCheckpoint

The checkpointResponder above is of type RpcCheckpointResponder. Let's look at its acknowledgeCheckpoint method:

@Override
public void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState) {

    checkpointCoordinatorGateway.acknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        subtaskState);
}

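This method calls the method of the same name on checkpointCoordinatorGateway. Here checkpointCoordinatorGateway refers to the JobMaster.
The RpcCheckpointResponder is created when the connection to the JobManager is established, after the JobManager has won leader election.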

JobMaster.acknowledgeCheckpoint

@Override
public void acknowledgeCheckpoint(
        final JobID jobID,
        final ExecutionAttemptID executionAttemptID,
        final long checkpointId,
        final CheckpointMetrics checkpointMetrics,
        final TaskStateSnapshot checkpointState) {

    schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}

SchedulerBase.acknowledgeCheckpoint

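Let's look at the source of this method. After several layers of delegation, this is where the interaction with CheckpointCoordinator finally happens. The code is shown below.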

@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
    // Make sure we are running in the main thread
    mainThreadExecutor.assertRunningInMainThread();

    // Get the checkpointCoordinator object
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    // Build a checkpoint acknowledgement message
    final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
        jobID,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        checkpointState);

    // Look up the location of the TaskManager
    final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

    if (checkpointCoordinator != null) {
        ioExecutor.execute(() -> {
            try {
                // Hand the acknowledgement message to the checkpointCoordinator
                checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
            } catch (Throwable t) {
                log.warn("Error while processing checkpoint acknowledgement message", t);
            }
        });
    } else {
        String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
        if (executionGraph.getState() == JobStatus.RUNNING) {
            log.error(errorMessage, jobGraph.getJobID());
        } else {
            log.debug(errorMessage, jobGraph.getJobID());
        }
    }
}

CheckpointCoordinator.receiveAcknowledgeMessage

Now we return to CheckpointCoordinator and look at its receiveAcknowledgeMessage method.

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
    if (shutdown || message == null) {
        return false;
    }

    // Check that the JobID in the message matches the job actually being run
    if (!job.equals(message.getJob())) {
        LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message);
        return false;
    }

    // Get the checkpoint ID
    final long checkpointId = message.getCheckpointId();

    synchronized (lock) {
        // we need to check inside the lock for being shutdown as well, otherwise we
        // get races and invalid error log messages
        // Make sure the coordinator has not been shut down
        if (shutdown) {
            return false;
        }

        // Look up the pending checkpoint
        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDiscarded()) {

            switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    // If the acknowledgement succeeded and every task has now
                    // acknowledged, run the logic that completes this PendingCheckpoint
                    if (checkpoint.areTasksFullyAcknowledged()) {
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                case DUPLICATE:
                    LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
                        message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    break;
                case UNKNOWN:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                            "because the task's execution attempt id was unknown. Discarding " +
                            "the state handle to avoid lingering state.", message.getCheckpointId(),
                        message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                    break;
                case DISCARDED:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                        "because the pending checkpoint had been discarded. Discarding the " +
                            "state handle tp avoid lingering state.",
                        message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                    discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
            }

            return true;
        }
        else if (checkpoint != null) {
            // this should not happen
            throw new IllegalStateException(
                    "Received message for discarded but non-removed checkpoint " + checkpointId);
        }
        else {
            boolean wasPendingCheckpoint;

            // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
            if (recentPendingCheckpoints.contains(checkpointId)) {
                wasPendingCheckpoint = true;
                LOG.warn("Received late message for now expired checkpoint attempt {} from task " +
                    "{} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
            }
            else {
                LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.",
                    checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                wasPendingCheckpoint = false;
            }

            // try to discard the state so that we don't have lingering state lying around
            discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

            return wasPendingCheckpoint;
        }
    }
}

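Before analyzing the method that completes a checkpoint (completePendingCheckpoint), let's first look at how PendingCheckpoint handles an acknowledgement.
The acknowledgeTask method of PendingCheckpoint is shown below: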

public TaskAcknowledgeResult acknowledgeTask(
        ExecutionAttemptID executionAttemptId,
        TaskStateSnapshot operatorSubtaskStates,
        CheckpointMetrics metrics) {

    synchronized (lock) {
        if (discarded) {
            // If the checkpoint has been discarded, return DISCARDED
            return TaskAcknowledgeResult.DISCARDED;
        }

        // Remove the acknowledging task from notYetAcknowledgedTasks,
        // which holds all tasks that have not acknowledged yet
        final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);

        if (vertex == null) {
            // The task is not in notYetAcknowledgedTasks; if it is already
            // in acknowledgedTasks (the set of acknowledged tasks),
            // this is a repeated acknowledgement, so return DUPLICATE
            if (acknowledgedTasks.contains(executionAttemptId)) {
                return TaskAcknowledgeResult.DUPLICATE;
            } else {
                // Otherwise return UNKNOWN
                return TaskAcknowledgeResult.UNKNOWN;
            }
        } else {
            // Add the task to the set of acknowledged tasks
            acknowledgedTasks.add(executionAttemptId);
        }

        List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
        int subtaskIndex = vertex.getParallelSubtaskIndex();
        long ackTimestamp = System.currentTimeMillis();

        long stateSize = 0L;

        // This block stores the snapshot state of each operator
        if (operatorSubtaskStates != null) {
            for (OperatorID operatorID : operatorIDs) {

                OperatorSubtaskState operatorSubtaskState =
                    operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);

                // if no real operatorSubtaskState was reported, we insert an empty state
                if (operatorSubtaskState == null) {
                    operatorSubtaskState = new OperatorSubtaskState();
                }

                OperatorState operatorState = operatorStates.get(operatorID);

                if (operatorState == null) {
                    operatorState = new OperatorState(
                        operatorID,
                        vertex.getTotalNumberOfParallelSubtasks(),
                        vertex.getMaxParallelism());
                    operatorStates.put(operatorID, operatorState);
                }

                operatorState.putState(subtaskIndex, operatorSubtaskState);
                stateSize += operatorSubtaskState.getStateSize();
            }
        }

        ++numAcknowledgedTasks;

        // publish the checkpoint statistics
        // to prevent null-pointers from concurrent modification, copy reference onto stack
        // This block reports the checkpoint statistics of the subtask
        final PendingCheckpointStats statsCallback = this.statsCallback;
        if (statsCallback != null) {
            // Do this in millis because the web frontend works with them
            long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;

            SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
                subtaskIndex,
                ackTimestamp,
                stateSize,
                metrics.getSyncDurationMillis(),
                metrics.getAsyncDurationMillis(),
                metrics.getBytesBufferedInAlignment(),
                alignmentDurationMillis);

            statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
        }

        // Finally, return SUCCESS
        return TaskAcknowledgeResult.SUCCESS;
    }
}

Now let's see when completePendingCheckpoint is called to complete the checkpoint: it is called when checkpoint.areTasksFullyAcknowledged() returns true. The logic of that method is as follows:

public boolean areTasksFullyAcknowledged() {
    return notYetAcknowledgedTasks.isEmpty() && !discarded;
}

As the code shows, completePendingCheckpoint is only called when the checkpoint has not been discarded and notYetAcknowledgedTasks is empty (every task has acknowledged).
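The acknowledgement bookkeeping can be condensed into a small standalone model. This is a simplified sketch, not the Flink class: PendingAckSketch is a hypothetical name, execution attempts are identified by plain strings, and the per-operator state handling and statistics are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, reduced model of the acknowledgement bookkeeping; not Flink code.
class PendingAckSketch {
    enum Result { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    // attempt id -> subtask index, for tasks that have not acknowledged yet
    private final Map<String, Integer> notYetAcknowledged = new HashMap<>();
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded = false;

    PendingAckSketch(Map<String, Integer> expectedTasks) {
        notYetAcknowledged.putAll(expectedTasks);
    }

    synchronized Result acknowledge(String attemptId) {
        if (discarded) {
            return Result.DISCARDED;
        }
        Integer subtask = notYetAcknowledged.remove(attemptId);
        if (subtask == null) {
            // Either acknowledged before, or never part of this checkpoint.
            return acknowledged.contains(attemptId) ? Result.DUPLICATE : Result.UNKNOWN;
        }
        acknowledged.add(attemptId);
        return Result.SUCCESS;
    }

    synchronized boolean fullyAcknowledged() {
        return notYetAcknowledged.isEmpty() && !discarded;
    }

    synchronized void discard() {
        discarded = true;
    }
}
```

With two expected attempts, the first acknowledgement of each returns SUCCESS, a repeat returns DUPLICATE, an attempt id that was never registered returns UNKNOWN, and fullyAcknowledged() becomes true only after both have reported.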

CheckpointCoordinator.completePendingCheckpoint

private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    // Register the state of all operators with the sharedStateRegistry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            // Finalize the pendingCheckpoint; the details are analyzed later
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            // Reset the counter of failed checkpoints
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }

            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
        }

        // the pending checkpoint must be discarded after the finalization
        // Sanity check: after finalizeCheckpoint, the pendingCheckpoint must be in the discarded state
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            // Store the completed checkpoint
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
        }
    } finally {
        // Remove this checkpoint from the set of pending checkpoints
        pendingCheckpoints.remove(checkpointId);

        triggerQueuedRequests();
    }

    // Remember the ID of this recent checkpoint
    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are at prior to the completed one
    // Drop all pending checkpoints whose id is smaller than checkpointId (forced checkpoints are not dropped)
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    // Record the time at which this checkpoint completed
    lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
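            // Send the "checkpoint complete" message to every vertex; this call is important.
            // The notification travels down to each Task and StreamTask, then to every operator and userFunction.
            // Finally, for each userFunction that implements the CheckpointListener interface,
            // its notifyCheckpointComplete method is invoked in turn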
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}

Note: the call chain for notifying each operator that the checkpoint succeeded is as follows:

  • Execution.notifyCheckpointComplete
  • RpcTaskManagerGateway.notifyCheckpointComplete
  • TaskExecutor.confirmCheckpoint
  • Task.notifyCheckpointComplete
  • StreamTask.notifyCheckpointCompleteAsync
  • StreamTask.notifyCheckpointComplete
  • AbstractUdfStreamOperator.notifyCheckpointComplete
  • CheckpointListener.notifyCheckpointComplete
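One step of completePendingCheckpoint that is only called, not shown, is dropping older pending checkpoints once a newer one completes. The sketch below illustrates the idea under stated assumptions: SubsumeSketch is a hypothetical class, and the boolean flag per checkpoint stands in for whatever rule decides that a pending checkpoint may be dropped. It is not the body of dropSubsumedCheckpoints.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of subsuming older pending checkpoints; not Flink code.
class SubsumeSketch {
    // checkpoint id -> whether this pending checkpoint may be dropped (assumed flag)
    private final TreeMap<Long, Boolean> pending = new TreeMap<>();

    void addPending(long id, boolean canBeSubsumed) {
        pending.put(id, canBeSubsumed);
    }

    // Drops every droppable pending checkpoint with id < completedId.
    // Returns how many were dropped.
    int dropSubsumed(long completedId) {
        int dropped = 0;
        // headMap(..., false) is a live view of all entries strictly below completedId.
        Iterator<Map.Entry<Long, Boolean>> it =
            pending.headMap(completedId, false).entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue()) {
                it.remove(); // removes from the backing map as well
                dropped++;
            }
        }
        return dropped;
    }

    boolean isPending(long id) {
        return pending.containsKey(id);
    }
}
```

Keeping the pending checkpoints in a sorted map makes "everything older than the completed one" a single range view, so newer pending checkpoints are never touched.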

PendingCheckpoint.finalizeCheckpoint

Finally, let's look at how a PendingCheckpoint is finalized to produce a CompletedCheckpoint object. The code is shown below:

public CompletedCheckpoint finalizeCheckpoint() throws IOException {

    synchronized (lock) {
        // Make sure all master states have been acknowledged
        checkState(areMasterStatesFullyAcknowledged(),
            "Pending checkpoint has not been fully acknowledged by master states yet.");
        // Make sure all tasks have acknowledged
        checkState(areTasksFullyAcknowledged(),
            "Pending checkpoint has not been fully acknowledged by tasks yet.");

        // make sure we fulfill the promise with an exception if something fails
        try {
            // write out the metadata
            // Create a savepoint object
            final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
            final CompletedCheckpointStorageLocation finalizedLocation;

            // Write the checkpoint metadata to the file system
            try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
                Checkpoints.storeCheckpointMetadata(savepoint, out);
                finalizedLocation = out.closeAndFinalizeCheckpoint();
            }

            CompletedCheckpoint completed = new CompletedCheckpoint(
                    jobId,
                    checkpointId,
                    checkpointTimestamp,
                    System.currentTimeMillis(),
                    operatorStates,
                    masterStates,
                    props,
                    finalizedLocation);

            // Complete the CompletableFuture with the completedCheckpoint
            onCompletionPromise.complete(completed);

            // to prevent null-pointers from concurrent modification, copy reference onto stack
            // Set the discardCallback on the completedCheckpoint
            PendingCheckpointStats statsCallback = this.statsCallback;
            if (statsCallback != null) {
                // Finalize the statsCallback and give the completed checkpoint a
                // callback for discards.
                CompletedCheckpointStats.DiscardCallback discardCallback =
                        statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
                completed.setDiscardCallback(discardCallback);
            }

            // mark this pending checkpoint as disposed, but do NOT drop the state
            // Mark this pending checkpoint as disposed
            dispose(false);

            return completed;
        }
        catch (Throwable t) {
            onCompletionPromise.completeExceptionally(t);
            ExceptionUtils.rethrowIOException(t);
            return null; // silence the compiler
        }
    }
}
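The try/catch in finalizeCheckpoint follows a common promise pattern: the future is completed on success and completed exceptionally on failure, so that whoever waits on it is always released. A minimal sketch of that pattern, with a string in place of CompletedCheckpoint and hypothetical names (FinalizePromiseSketch, finalizeWith, writeMetadata), might look like this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration of the completion-promise pattern; not Flink code.
class FinalizePromiseSketch {
    private final CompletableFuture<String> onCompletionPromise = new CompletableFuture<>();

    CompletableFuture<String> getCompletionFuture() {
        return onCompletionPromise;
    }

    // writeMetadata stands in for storing the metadata and building the result.
    String finalizeWith(Supplier<String> writeMetadata) {
        try {
            String completed = writeMetadata.get();
            onCompletionPromise.complete(completed);
            return completed;
        } catch (RuntimeException e) {
            // Fulfil the promise with the failure, then propagate it to the caller.
            onCompletionPromise.completeExceptionally(e);
            throw e;
        }
    }
}
```

A caller holding getCompletionFuture() sees the result or the exception regardless of which thread ran the finalization, which is the purpose of the onCompletionPromise calls in the listing above.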

This blog is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
