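Flink Source Code Notes --- checkpoint
Setting up the nodes that trigger checkpoints
The ExecutionGraph object lives in SchedulerBase, and SchedulerBase runs inside JobMaster.
The enableCheckpointing method of ExecutionGraph initializes the checkpointCoordinator (checkpoint coordinator) object, which runs in the JobManager.
checkpointCoordinator drives the checkpoint process across the distributed system. Its main responsibilities:
- Trigger checkpoints periodically and tell the sources to emit checkpoint barriers
- Receive each operator's acknowledgement that a given checkpoint has completed
- Once all operators have acknowledged a given checkpoint, notify every operator that the checkpoint is complete
- Keep track of completed and in-progress checkpoints
When the checkpointCoordinator is constructed, it is passed a variable tasksToTrigger, the nodes on which checkpoints must be triggered. This variable is created in the configureCheckpointing method of StreamingJobGraphGenerator.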
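The bookkeeping behind these responsibilities can be sketched with a toy tracker. This is only an illustration and not Flink's implementation: the class AckTrackerSketch, its methods, and the string task names are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the coordinator's bookkeeping. NOT Flink code; all names are hypothetical.
class AckTrackerSketch {
    private final Set<String> tasksToWaitFor;
    // checkpointId -> tasks that have not acknowledged yet
    private final Map<Long, Set<String>> pending = new HashMap<>();
    private final Set<Long> completed = new HashSet<>();
    private long nextId = 1;

    AckTrackerSketch(Set<String> tasksToWaitFor) {
        this.tasksToWaitFor = new HashSet<>(tasksToWaitFor);
    }

    // Start a checkpoint: remember which tasks still owe an acknowledgement.
    long trigger() {
        long id = nextId++;
        pending.put(id, new HashSet<>(tasksToWaitFor));
        return id;
    }

    // Record one acknowledgement; returns true only when this ack completes the checkpoint.
    boolean acknowledge(long checkpointId, String task) {
        Set<String> missing = pending.get(checkpointId);
        if (missing == null || !missing.remove(task)) {
            return false; // unknown checkpoint, unknown task, or duplicate ack
        }
        if (missing.isEmpty()) {
            pending.remove(checkpointId);
            completed.add(checkpointId);
            return true;
        }
        return false;
    }

    boolean isCompleted(long checkpointId) {
        return completed.contains(checkpointId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch(new HashSet<>(Arrays.asList("source", "sink")));
        long id = tracker.trigger();
        tracker.acknowledge(id, "source");
        System.out.println("complete after last ack: " + tracker.acknowledge(id, "sink"));
    }
}
```

The real coordinator additionally notifies the tasks once a checkpoint completes; the sketch leaves that step out.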
When the JobMaster is created, schedulerNG is initialized:
this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
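createScheduler calls createInstance on schedulerNGFactory to obtain the scheduler.
DefaultSchedulerFactory and LegacySchedulerFactory create DefaultScheduler and LegacyScheduler instances respectively. Both extend SchedulerBase, so instantiating either one invokes the SchedulerBase constructor, which builds the ExecutionGraph; scheduling then begins through startScheduling.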
private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
return schedulerNGFactory.createInstance(
log,
jobGraph,
backPressureStatsTracker,
scheduledExecutorService,
jobMasterConfiguration.getConfiguration(),
scheduler,
scheduledExecutorService,
userCodeLoader,
highAvailabilityServices.getCheckpointRecoveryFactory(),
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
jobMasterConfiguration.getSlotRequestTimeout(),
shuffleMaster,
partitionTracker);
}
Both DefaultScheduler and LegacyScheduler initialize the parent class SchedulerBase when they are constructed:
DefaultScheduler(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final ScheduledExecutorService futureExecutor,
final ScheduledExecutor delayExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
final SchedulingStrategyFactory schedulingStrategyFactory,
final FailoverStrategy.Factory failoverStrategyFactory,
final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
final ExecutionVertexOperations executionVertexOperations,
final ExecutionVertexVersioner executionVertexVersioner,
final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) throws Exception {
super(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
new ThrowingSlotProvider(), // this is not used any more in the new scheduler
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
blobWriter,
jobManagerJobMetricGroup,
Time.seconds(0), // this is not used any more in the new scheduler
shuffleMaster,
partitionTracker,
executionVertexVersioner,
false);
...
Continuing with the checkpoint process
When JobMaster triggers a savepoint, it starts the checkpoint process. Let's look at JobMaster's triggerSavepoint method:
// Trigger a savepoint
@Override
public CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory,
final boolean cancelJob,
final Time timeout) {
// Calls the method on the SchedulerBase implementation
return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
}
This method calls the same-named triggerSavepoint method on schedulerNG. The call lands in SchedulerBase, which implements the SchedulerNG interface and is itself an abstract class:
public abstract class SchedulerBase implements SchedulerNG
Now look at schedulerNG.triggerSavepoint. It fetches the checkpointCoordinator (checkpoint coordinator) and then calls the coordinator's triggerSavepoint method:
@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
// Make sure we are running in the main thread
mainThreadExecutor.assertRunningInMainThread();
// Get the CheckpointCoordinator from the executionGraph
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
if (checkpointCoordinator == null) {
throw new IllegalStateException(
String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
} else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
// Either a default savepoint directory must be configured or a target directory must be passed in
log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());
throw new IllegalStateException(
"No savepoint directory configured. You can either specify a directory " +
"while cancelling via -s :targetDirectory or configure a cluster-wide " +
"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
}
// If the job is to be cancelled, stop the periodic checkpoint scheduler
if (cancelJob) {
checkpointCoordinator.stopCheckpointScheduler();
}
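// First trigger a savepoint (which actually goes through the checkpoint path),
// then map the result to the path where the savepoint was stored.
// Finally:
// 1. If an earlier step failed: restart the checkpoint scheduler when the job was to be cancelled, then rethrow the exception
// 2. If the job is to be cancelled and no earlier step failed, cancel the job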
return checkpointCoordinator
.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer)
.handleAsync((path, throwable) -> {
if (throwable != null) {
if (cancelJob) {
startCheckpointScheduler(checkpointCoordinator);
}
throw new CompletionException(throwable);
} else if (cancelJob) {
log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
cancel();
}
return path;
}, mainThreadExecutor);
}
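The stop, savepoint, then cancel-or-restore flow above can be reproduced with plain CompletableFuture. The sketch below is a simplified model under stated assumptions: SavepointThenCancelSketch and its two flags are hypothetical stand-ins for the checkpoint scheduler and the job, and it uses handle instead of handleAsync because there is no main-thread executor here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of "savepoint, then maybe cancel". NOT Flink code; names are hypothetical.
class SavepointThenCancelSketch {
    final AtomicBoolean schedulerRunning = new AtomicBoolean(true);
    final AtomicBoolean jobCancelled = new AtomicBoolean(false);

    CompletableFuture<String> savepoint(CompletableFuture<String> savepointPath, boolean cancelJob) {
        if (cancelJob) {
            schedulerRunning.set(false); // stop periodic checkpoints while the savepoint runs
        }
        return savepointPath.handle((path, failure) -> {
            if (failure != null) {
                if (cancelJob) {
                    schedulerRunning.set(true); // savepoint failed: resume periodic checkpoints
                }
                throw new CompletionException(failure); // propagate the failure to the caller
            } else if (cancelJob) {
                jobCancelled.set(true); // savepoint succeeded: now cancel the job
            }
            return path;
        });
    }

    public static void main(String[] args) {
        SavepointThenCancelSketch sketch = new SavepointThenCancelSketch();
        String path = sketch.savepoint(CompletableFuture.completedFuture("/savepoints/sp-1"), true).join();
        System.out.println(path + " cancelled=" + sketch.jobCancelled.get());
    }
}
```

The point of the pattern is that the job is only cancelled after the savepoint path is known, and a failed savepoint leaves the job running with periodic checkpoints switched back on.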
Step into the checkpointCoordinator's triggerSavepoint method. It first obtains the CheckpointProperties (checkpoint configuration), then calls triggerSavepointInternal and returns its result:
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
final long timestamp,
@Nullable final String targetLocation) {
final CheckpointProperties properties = CheckpointProperties.forSavepoint();
return triggerSavepointInternal(timestamp, properties, false, targetLocation);
}
Following triggerSavepointInternal, the main logic lives in triggerCheckpoint:
private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
final long timestamp,
final CheckpointProperties checkpointProperties,
final boolean advanceToEndOfEventTime,
@Nullable final String targetLocation) {
checkNotNull(checkpointProperties);
// TODO, call triggerCheckpoint directly after removing timer thread
// for now, execute the trigger in timer thread to avoid competition
final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
timer.execute(() -> triggerCheckpoint(
timestamp,
checkpointProperties,
targetLocation,
false,
advanceToEndOfEventTime)
.whenComplete((completedCheckpoint, throwable) -> {
if (throwable == null) {
resultFuture.complete(completedCheckpoint);
} else {
resultFuture.completeExceptionally(throwable);
}
}));
return resultFuture;
}
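triggerSavepointInternal hands the work to the timer thread and mirrors the inner future's outcome into a resultFuture that it returns immediately. A minimal generic version of that bridging pattern might look like the following; FutureBridgeSketch and runOn are hypothetical names, and the extra try/catch for a synchronously failing action is an addition of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Bridge a future that is created on another executor into one returned right away.
// NOT Flink code; names are hypothetical.
class FutureBridgeSketch {
    static <T> CompletableFuture<T> runOn(Executor executor, Supplier<CompletableFuture<T>> action) {
        final CompletableFuture<T> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                // Start the action on the executor and copy its outcome into `result`.
                action.get().whenComplete((value, failure) -> {
                    if (failure == null) {
                        result.complete(value);
                    } else {
                        result.completeExceptionally(failure);
                    }
                });
            } catch (Throwable t) {
                result.completeExceptionally(t); // the action failed before returning a future
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Runnable::run is a direct executor: it runs the task on the calling thread.
        CompletableFuture<Integer> f = runOn(Runnable::run, () -> CompletableFuture.completedFuture(42));
        System.out.println(f.join());
    }
}
```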
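Continuing into triggerCheckpoint, the main logic of this method is:
1. First run the pre-checks before triggering the checkpoint, to decide whether the conditions are met;
2. Then obtain a checkpoint ID and create a PendingCheckpoint instance;
3. Then re-check the trigger conditions to guard against race conditions;
4. Finally add the PendingCheckpoint instance to pendingCheckpoints and send messages to the tasks to trigger their checkpoints.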
@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
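long timestamp, // timestamp at which the checkpoint is triggered
CheckpointProperties props, // checkpoint properties
@Nullable String externalSavepointLocation, // external savepoint location
boolean isPeriodic, // whether this is a periodic checkpoint
boolean advanceToEndOfTime) { // whether to advance the watermark to the end of time
// Sanity check: only a synchronous savepoint may advance the watermark to MAX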
if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
return FutureUtils.completedExceptionally(new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX."));
}
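// CompletableFuture was introduced in Java 8 and represents the eventual result of an asynchronous task
// CompletedCheckpoint is a finished checkpoint; it carries metadata such as the checkpoint ID and completion time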
final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
new CompletableFuture<>();
// make some eager pre-checks before the checkpoint
synchronized (lock) {
// If a trigger request is being processed or requests are already queued, we cannot trigger directly
if (isTriggering || !triggerRequestQueue.isEmpty()) {
// we can't trigger checkpoint directly if there is a trigger request being processed
// or queued
// so wrap this request in a new CheckpointTriggerRequest and add it to the queue
triggerRequestQueue.add(new CheckpointTriggerRequest(
timestamp,
props,
externalSavepointLocation,
isPeriodic,
advanceToEndOfTime,
onCompletionPromise));
return onCompletionPromise;
}
}
// Trigger the checkpoint
startTriggeringCheckpoint(
timestamp,
props,
externalSavepointLocation,
isPeriodic,
advanceToEndOfTime,
onCompletionPromise);
return onCompletionPromise;
}
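The "start now, or queue behind the request in flight" decision can be sketched in isolation. This is a simplified model and not Flink's code: TriggerQueueSketch is hypothetical, requests are plain strings, the in-flight flag is set inside the same lock for brevity, and onTriggerDone is an assumed hook for draining the queue (the excerpt above does not show how Flink drains it).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// At most one trigger is in flight; later requests wait in a FIFO queue.
// NOT Flink code; names are hypothetical.
class TriggerQueueSketch {
    private final Object lock = new Object();
    private final Queue<String> requests = new ArrayDeque<>();
    private boolean isTriggering = false;
    final List<String> started = new ArrayList<>(); // requests actually started, in order

    void trigger(String request) {
        synchronized (lock) {
            if (isTriggering || !requests.isEmpty()) {
                requests.add(request); // something is in flight or queued: wait in line
                return;
            }
            isTriggering = true;
        }
        started.add(request);
    }

    // Assumed hook: called when the in-flight trigger has finished; starts the next queued request.
    void onTriggerDone() {
        String next;
        synchronized (lock) {
            isTriggering = false;
            next = requests.poll();
            if (next != null) {
                isTriggering = true;
            }
        }
        if (next != null) {
            started.add(next);
        }
    }

    public static void main(String[] args) {
        TriggerQueueSketch q = new TriggerQueueSketch();
        q.trigger("periodic-1");
        q.trigger("savepoint-1"); // queued, because periodic-1 is still in flight
        q.onTriggerDone();        // periodic-1 done, savepoint-1 starts
        System.out.println(q.started);
    }
}
```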
Step into startTriggeringCheckpoint:
private void startTriggeringCheckpoint(
long timestamp,
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
boolean advanceToEndOfTime,
CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
try {
// make some eager pre-checks
// these pre-checks depend on whether the checkpoint is periodic and whether it is forced
synchronized (lock) {
preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
}
// Check that all tasks we need to trigger are running. If not, abort the checkpoint.
final Execution[] executions = getTriggerExecutions();
final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
// we will actually trigger this checkpoint!
Preconditions.checkState(!isTriggering);
isTriggering = true;
final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
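initializeCheckpoint(props, externalSavepointLocation) // initialize the checkpoint (ID and storage location)
// thenApplyAsync runs once the current stage completes normally; this stage's result becomes the input of the next stage.
// "Async" here means the callback runs on the given executor (timer) rather than on the completing thread.
.thenApplyAsync(
(checkpointIdAndStorageLocation) -> createPendingCheckpoint( // create the pending checkpoint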
timestamp,
props,
ackTasks,
isPeriodic,
checkpointIdAndStorageLocation.checkpointId,
checkpointIdAndStorageLocation.checkpointStorageLocation,
onCompletionPromise),
timer);
// completes when the master state snapshot is done
final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
// thenCompose chains a dependent stage that itself returns a future, and flattens the result
.thenCompose(this::snapshotMasterState);
// completes when the coordinator checkpoints are done
final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
// asynchronous variant, run on the timer executor
.thenComposeAsync((pendingCheckpoint) ->
// all logic for triggering and acknowledging the OperatorCoordinator checkpoints
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
// runs whether the stages above completed normally or threw an exception
.whenCompleteAsync(
(ignored, throwable) -> {
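// getWithoutException returns the future's result, or null if it completed exceptionally or has not completed yet
// fetch the PendingCheckpoint created above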
final PendingCheckpoint checkpoint =
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
// no exception, no discarding, everything is OK
// snapshot the task state
snapshotTaskState(
timestamp,
checkpoint.getCheckpointId(),
checkpoint.getCheckpointStorageLocation(),
props,
executions,
advanceToEndOfTime);
// must be called once the trigger request has succeeded
onTriggerSuccess();
} else {
// the initialization might not be finished yet
if (checkpoint == null) {
onTriggerFailure(onCompletionPromise, throwable);
} else {
onTriggerFailure(checkpoint, throwable);
}
}
},
timer);
} catch (Throwable throwable) {
onTriggerFailure(onCompletionPromise, throwable);
}
}
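The shape of this future pipeline is easier to see with the Flink types stripped away. The sketch below only mimics the structure (an async transform, two dependent stages, allOf, then a final callback on the timer); TriggerPipelineSketch, the value 7 and the returned strings are made up, and it uses handleAsync rather than whenCompleteAsync so that the final stage can return a value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Structural imitation of startTriggeringCheckpoint. NOT Flink code; names and values are hypothetical.
class TriggerPipelineSketch {
    static String run(ExecutorService timer) {
        final CompletableFuture<String> pending = CompletableFuture
                .supplyAsync(() -> 7L, timer)                   // stands in for initializeCheckpoint
                .thenApplyAsync(id -> "pending-" + id, timer);  // stands in for createPendingCheckpoint

        // thenCompose: the next stage is itself a future that depends on `pending`.
        final CompletableFuture<Void> masterDone =
                pending.thenCompose(p -> CompletableFuture.runAsync(() -> { }, timer));
        final CompletableFuture<Void> coordinatorsDone =
                pending.thenComposeAsync(p -> CompletableFuture.runAsync(() -> { }, timer), timer);

        // allOf completes when both stages are done; the callback runs on success and on failure.
        return CompletableFuture.allOf(masterDone, coordinatorsDone)
                .handleAsync((ignored, failure) -> {
                    String p = pending.getNow(null); // null if `pending` failed or is not done
                    return (failure == null && p != null) ? "triggered " + p : "failed";
                }, timer)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            System.out.println(run(timer));
        } finally {
            timer.shutdown();
        }
    }
}
```

Note the difference from thenCombine, which merges the results of two independent futures; thenCompose sequences a second asynchronous step after the first.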
The method is long, so let's look at the implementation of the key methods it calls.
preCheckBeforeTriggeringCheckpoint runs the pre-checks before a checkpoint is triggered:
private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forceCheckpoint) throws CheckpointException {
// Check the global state of checkpointing: the scheduler and the coordinator
preCheckGlobalState(isPeriodic);
// If the checkpoint is not forced
if (!forceCheckpoint) {
// check that the number of concurrent checkpoints does not exceed the configured maximum
checkConcurrentCheckpoints();
// check that the minimum pause between checkpoints has elapsed
checkMinPauseBetweenCheckpoints();
}
}
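The two checks that a forced checkpoint skips can be modelled as a pure function. This is a sketch under assumptions: the bodies of checkConcurrentCheckpoints and checkMinPauseBetweenCheckpoints are not shown in these notes, so the comparisons, parameter names and result labels below are illustrative guesses rather than Flink's actual logic.

```java
// Illustrative pre-check. NOT Flink code; comparisons and labels are assumptions.
class PreCheckSketch {
    static String check(
            boolean forceCheckpoint,
            int pendingCheckpoints,
            int maxConcurrentCheckpoints,
            long nowMillis,
            long lastCompletionMillis,
            long minPauseMillis) {
        if (!forceCheckpoint) {
            // Too many checkpoints already in flight?
            if (pendingCheckpoints >= maxConcurrentCheckpoints) {
                return "TOO_MANY_CONCURRENT";
            }
            // Has enough time passed since the last checkpoint finished?
            if (nowMillis - lastCompletionMillis < minPauseMillis) {
                return "MIN_PAUSE_NOT_ELAPSED";
            }
        }
        return "OK";
    }

    public static void main(String[] args) {
        System.out.println(check(false, 1, 1, 1_000L, 900L, 500L));
        System.out.println(check(true, 1, 1, 1_000L, 900L, 500L));
    }
}
```

A forced checkpoint bypasses both limits, which is why the global-state check sits outside the if block in the original method.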
getTriggerExecutions checks that every task we need to trigger is running. If not, it abandons the checkpoint and throws an exception.
private Execution[] getTriggerExecutions() throws CheckpointException {
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee == null) {
LOG.info(
"Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
throw new CheckpointException(
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
} else if (ee.getState() == ExecutionState.RUNNING) {
// tasksToTrigger is initialized in StreamingJobGraphGenerator;
// it holds the nodes on which checkpoints must be triggered
// if the task is in state RUNNING, add its execution to the array of executions to trigger
executions[i] = ee;
} else {
LOG.info(
"Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
throw new CheckpointException(
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
return executions;
}
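getAckTasks is similar: it checks that every task that must acknowledge the checkpoint has a current execution attempt, and abandons the checkpoint if one does not. Unlike getTriggerExecutions, it does not require the RUNNING state.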
private Map<ExecutionAttemptID, ExecutionVertex> getAckTasks() throws CheckpointException {
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
LOG.info(
"Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
ev.getTaskNameWithSubtaskIndex(),
job);
throw new CheckpointException(
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
return ackTasks;
}
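The difference between the two checks is small but easy to miss, so here is a reduced model of it. TaskReadinessSketch and its State enum are hypothetical; a null list element stands for "no current execution attempt".

```java
import java.util.Arrays;
import java.util.List;

// Reduced model of getTriggerExecutions vs. getAckTasks. NOT Flink code; names are hypothetical.
class TaskReadinessSketch {
    enum State { DEPLOYING, RUNNING, FINISHED }

    // Trigger tasks: every task needs a current attempt AND that attempt must be RUNNING.
    static boolean canTrigger(List<State> triggerTasks) {
        for (State s : triggerTasks) {
            if (s != State.RUNNING) { // also false for null, i.e. no current attempt
                return false;
            }
        }
        return true;
    }

    // Ack tasks: a current attempt must exist, but its state is not inspected.
    static boolean canCollectAcks(List<State> ackTasks) {
        for (State s : ackTasks) {
            if (s == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<State> tasks = Arrays.asList(State.RUNNING, State.DEPLOYING);
        System.out.println("canTrigger=" + canTrigger(tasks) + " canCollectAcks=" + canCollectAcks(tasks));
    }
}
```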
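createPendingCheckpoint runs a pre-check before it creates the PendingCheckpoint object. A pending checkpoint is one that has been started but not yet acknowledged by all the tasks that need to acknowledge it; once every task has acknowledged it, it becomes a CompletedCheckpoint.
After creating the PendingCheckpoint, the method sets the stats callback that tracks it. Then, while holding the lock, it adds the new PendingCheckpoint to pendingCheckpoints, the collection of checkpoints still in progress, and schedules a timer task (a CheckpointCanceller) to run after the checkpoint timeout. Finally it returns the PendingCheckpoint.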
private PendingCheckpoint createPendingCheckpoint(
long timestamp,
CheckpointProperties props,
Map<ExecutionAttemptID, ExecutionVertex> ackTasks,
boolean isPeriodic,
long checkpointID,
CheckpointStorageLocation checkpointStorageLocation,
CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
synchronized (lock) {
// run a check before creating the PendingCheckpoint
try {
// since we haven't created the PendingCheckpoint yet, we need to check the
// global state here.
preCheckGlobalState(isPeriodic);
} catch (Throwable t) {
throw new CompletionException(t);
}
}
// create the PendingCheckpoint
final PendingCheckpoint checkpoint = new PendingCheckpoint(
job,
checkpointID,
timestamp,
ackTasks,
OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
masterHooks.keySet(),
props,
checkpointStorageLocation,
executor,
onCompletionPromise);
if (statsTracker != null) {
// register the pending checkpoint with the stats tracker
PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
checkpointID,
timestamp,
props);
// set the callback that tracks this pending checkpoint
checkpoint.setStatsCallback(callback);
}
synchronized (lock) {
// add the pending checkpoint to the collection
pendingCheckpoints.put(checkpointID, checkpoint);
// run the given command after the given delay
ScheduledFuture<?> cancellerHandle = timer.schedule(
new CheckpointCanceller(checkpoint), // the task to run
checkpointTimeout, // delay from now until execution
TimeUnit.MILLISECONDS); // time unit of the delay
if (!checkpoint.setCancellerHandle(cancellerHandle)) { // store the handle
// checkpoint is already disposed!
cancellerHandle.cancel(false);
}
}
// This shows up in the logs on the Flink web monitoring page, for example:
//2020-05-14 14:44:08,203 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
// - Triggering checkpoint 1 @ 1589438648198 for job b94ef58349d8befba6412e3d85478bf5.
LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
return checkpoint;
}
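The timeout mechanism is a scheduled task whose handle is cancelled if the checkpoint finishes first. The sketch below shows that idea with java.util.concurrent only; CheckpointTimeoutSketch and abortedByTimeout are hypothetical, and a boolean flag stands in for the CheckpointCanceller aborting the checkpoint.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Timeout canceller sketch. NOT Flink code; names are hypothetical.
class CheckpointTimeoutSketch {
    // Returns true if the timeout fired, false if the checkpoint "completed" first.
    static boolean abortedByTimeout(long timeoutMillis, boolean completeFirst) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean aborted = new AtomicBoolean(false);
        try {
            // Schedule the canceller to fire after the timeout.
            ScheduledFuture<?> cancellerHandle =
                    timer.schedule(() -> aborted.set(true), timeoutMillis, TimeUnit.MILLISECONDS);
            if (completeFirst) {
                cancellerHandle.cancel(false); // checkpoint finished: the canceller is no longer needed
            } else {
                cancellerHandle.get(); // block until the timeout task has run
            }
            return aborted.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed in time -> aborted=" + abortedByTimeout(60_000L, true));
        System.out.println("never completed   -> aborted=" + abortedByTimeout(10L, false));
    }
}
```

Keeping the handle is what makes the cleanup possible: without it, a canceller scheduled for a checkpoint that has already been disposed would stay queued on the timer until the timeout elapsed.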
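The most important method is snapshotTaskState(). It first creates a CheckpointOptions object, which holds the options for performing the checkpoint: the checkpoint type and the storage location.
It then triggers the checkpoint on every task in tasksToTrigger. Depending on whether the checkpoint is synchronous, it calls one of two methods, but both end up in Execution's triggerCheckpointHelper method.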
private void snapshotTaskState(
long timestamp,
long checkpointID,
CheckpointStorageLocation checkpointStorageLocation,
CheckpointProperties props,
Execution[] executions,
boolean advanceToEndOfTime) {
// options for performing the checkpoint: the checkpoint type and the storage location
final CheckpointOptions checkpointOptions = new CheckpointOptions(
props.getCheckpointType(), // the type: checkpoint or savepoint
checkpointStorageLocation.getLocationReference(), // reference to the storage location
isExactlyOnceMode,
isUnalignedCheckpoint);
// send the messages to the tasks that trigger their checkpoint
// trigger the checkpoint on every task in tasksToTrigger
for (Execution execution : executions) {
// entry point for triggering the checkpoint
if (props.isSynchronous()) { // synchronous
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else { // not synchronous
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
}
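Day-to-day jobs mostly use asynchronous checkpoints, which go through triggerCheckpoint, while a synchronous savepoint goes through triggerSynchronousSavepoint. Either way the call lands in triggerCheckpointHelper; triggerSynchronousSavepoint, for example, simply forwards to it: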
public void triggerSynchronousSavepoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
}
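Step into triggerCheckpointHelper. It first gets the slot, a logical slot that represents a resource on a TaskManager into which a single task can be deployed.
It then gets the TaskManagerGateway object, which here is an RpcTaskManagerGateway, an implementation of TaskManagerGateway.
Finally it calls triggerCheckpoint on the taskManagerGateway to trigger the checkpoint: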
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
// only synchronous savepoints are allowed to advance the watermark to MAX
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
// get the slot
final LogicalSlot slot = assignedResource;
if (slot != null) {
// get the TaskManager gateway, which is used to talk to the TaskManager;
// here it is an RpcTaskManagerGateway
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
// trigger the checkpoint on the TaskManager
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
} else {
// the execution has no slot assigned, which means it is no longer running
LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
}
}
Step into taskManagerGateway's triggerCheckpoint method. It calls the same-named method on taskExecutorGateway, which is backed by TaskExecutor.
TaskExecutor is the implementation of the TaskExecutorGateway interface:
@Override
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
taskExecutorGateway.triggerCheckpoint(
executionAttemptID,
checkpointId,
timestamp,
checkpointOptions,
advanceToEndOfEventTime);
}
Next post:
Flink-1.10 Source Code Notes checkpoint - 2
If you spot any mistakes, corrections are welcome!