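Flink's checkpoint mechanism is the foundation of its fault tolerance. It saves the state of a running stream-processing job so that, when a failure occurs, the job can be restored from the saved state. For example, when Flink reads from Kafka, it saves the consumed Kafka offsets; if the job fails, it can resume consuming from the offsets recorded in the last checkpoint.
Flink's checkpointing can be understood from the following aspects.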
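The Kafka offset example can be sketched in plain Java without any Flink dependency. This is a toy model only: the class OffsetCheckpointDemo, its parameters, and the "checkpoint every N records" rule are assumptions made for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of checkpoint-and-restore for a source that tracks a read offset. */
class OffsetCheckpointDemo {

    /**
     * Reads `log` from offset 0 and checkpoints the next offset to read after
     * every `checkpointEvery` records. If `failAt` is a valid offset, the first
     * attempt "crashes" just before processing it and the reader restarts from
     * the last checkpointed offset. Returns every record that was processed.
     */
    static List<String> run(List<String> log, int checkpointEvery, int failAt) {
        List<String> processed = new ArrayList<>();
        int checkpointedOffset = 0;  // the only state that survives the failure
        boolean failed = false;
        int offset = 0;
        while (offset < log.size()) {
            if (!failed && offset == failAt) {
                failed = true;
                offset = checkpointedOffset;  // restore from the last checkpoint
                continue;
            }
            processed.add(log.get(offset));
            offset++;
            if (offset % checkpointEvery == 0) {
                checkpointedOffset = offset;  // "save the consumed offset"
            }
        }
        return processed;
    }
}
```

With the log a, b, c, d, e, a checkpoint every 2 records and a failure at offset 3, record c is read twice: it was consumed after the last checkpoint, so it is consumed again after the restore.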
Barrier
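A barrier is a lightweight record that is inserted into the original data stream according to a schedule. It adds little overhead to normal data processing and does not change the order of the original data. Barriers divide the stream into segments, somewhat like the batches in Spark Streaming's micro-batch model.
Barriers flow through the Tasks together with the data. When an Operator receives a barrier, it treats all data that arrived before the barrier as already processed, and this is the point at which it triggers its checkpoint.
If an Operator has multiple concurrent input streams, then when it receives a barrier for a checkpoint it must wait until the barriers for that same checkpoint have arrived from all of its other inputs before proceeding. The steps of this barrier alignment are as follows.
As soon as the operator receives barrier n from one upstream input, it can no longer process any data that follows the barrier on that stream until it has received barrier n from all of its other upstream inputs.
Streams that have already delivered barrier n are temporarily set aside. Data read from these streams is not processed but is placed in the input buffer.
Once barrier n has arrived from the last upstream input, the operator emits barrier n downstream.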
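After that, the operator resumes processing data from all upstream inputs, handling the data in the input buffer first before reading new data from the streams.
The whole process is shown in the figure below.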
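The alignment steps can be simulated with a small stand-alone class. This is a minimal sketch, not Flink's BarrierBuffer: the name BarrierAligner and its methods are hypothetical, and it assumes only one checkpoint is being aligned at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy barrier aligner for an operator with several input channels. */
class BarrierAligner {
    private final int numChannels;
    // channels that have already delivered the current barrier
    private final Set<Integer> blocked = new HashSet<>();
    // records held back while their channel is blocked
    private final Deque<String> buffered = new ArrayDeque<>();
    // everything the operator processed or emitted, in order
    private final List<String> output = new ArrayList<>();

    BarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    void onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            buffered.add(record);  // arrived after the barrier: hold it back
        } else {
            output.add(record);    // arrived before the barrier: process it now
        }
    }

    void onBarrier(int channel, long checkpointId) {
        blocked.add(channel);
        if (blocked.size() == numChannels) {
            // the last barrier arrived: forward it, then snapshot local state
            output.add("barrier-" + checkpointId);
            output.add("snapshot-" + checkpointId);
            blocked.clear();
            // drain the input buffer before reading new data
            while (!buffered.isEmpty()) {
                output.add(buffered.poll());
            }
        }
    }

    List<String> output() {
        return output;
    }
}
```

Feeding a1, barrier, a2 on channel 0 and b1, barrier, b2 on channel 1 shows that a2 is held back until the barrier from channel 1 arrives, so no record from after the barrier is processed before the snapshot.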
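Barrier generation
Flink checkpoints are initiated by the JobMaster, which triggers the Source Tasks at a fixed interval to generate barriers.
The JobMaster triggers checkpoints periodically.
This happens in the startCheckpointScheduler() method of the CheckpointCoordinator class: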
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }
        // make sure all prior timers are cancelled
        stopCheckpointScheduler();
        // trigger checkpoints at the fixed period baseInterval
        periodicScheduling = true;
        long initialDelay = ThreadLocalRandom.current().nextLong(
            minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
            new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
}
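The same scheduling pattern, a random initial delay followed by a fixed-rate trigger, can be reproduced with the JDK's ScheduledExecutorService. This is a sketch under assumptions: PeriodicTriggerSketch and its methods are hypothetical names, the trigger only increments a counter, and the Math.max guard is added here so that the random range is never empty.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Toy periodic trigger modelled on startCheckpointScheduler(). */
class PeriodicTriggerSketch {

    /** Random initial delay in [minPauseMillis, baseIntervalMillis], both inclusive. */
    static long initialDelay(long minPauseMillis, long baseIntervalMillis) {
        // guard (an assumption of this sketch): keep the upper bound >= the lower bound
        long upper = Math.max(minPauseMillis, baseIntervalMillis);
        return ThreadLocalRandom.current().nextLong(minPauseMillis, upper + 1L);
    }

    /** Runs the timer until at least `n` triggers fired (or 5 s passed); returns the trigger count. */
    static long countTriggers(int n, long minPauseMillis, long baseIntervalMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicLong triggered = new AtomicLong();
        CountDownLatch seen = new CountDownLatch(n);
        try {
            timer.scheduleAtFixedRate(() -> {
                triggered.incrementAndGet();  // stands in for "trigger a checkpoint"
                seen.countDown();
            }, initialDelay(minPauseMillis, baseIntervalMillis), baseIntervalMillis, TimeUnit.MILLISECONDS);
            seen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();  // corresponds to cancelling the periodic trigger
        }
        return triggered.get();
    }
}
```

Randomizing the first delay spreads out the first checkpoint of jobs that start at the same moment, while the fixed rate keeps later checkpoints evenly spaced.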
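Following ScheduledTrigger eventually leads to triggerCheckpoint, the method with which the JobMaster triggers a checkpoint. After a series of preparatory steps, this method generates a unique checkpoint ID, creates a pending checkpoint, and calls the RPC method execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); which makes the Task side generate barriers and perform the checkpoint.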
// excerpt from CheckpointCoordinator.triggerCheckpoint
checkpointID = checkpointIdCounter.getAndIncrement();
final PendingCheckpoint checkpoint = new PendingCheckpoint(
    job,
    checkpointID,
    timestamp,
    ackTasks,
    props,
    checkpointStorageLocation,
    executor);
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}

// Execution.triggerCheckpoint
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    final LogicalSlot slot = assignedResource;
    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        // call the TaskManager RPC taskManagerGateway.triggerCheckpoint
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
            "no longer running.");
    }
}
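The Source generates a barrier after receiving the command
After the TaskManager receives the checkpoint-trigger RPC, it triggers generation of the checkpoint barrier in the Source Task. triggerCheckpointBarrier hands the work to a separate thread dedicated to generating the barrier.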
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // set safety net from the task's context for checkpointing thread
            LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
            try {
                boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                if (!success) {
                    checkpointResponder.declineCheckpoint(
                        getJobID(), getExecutionId(), checkpointID,
                        new CheckpointDeclineTaskNotReadyException(taskName));
                }
            }
            // ... (the rest of the method is omitted in this excerpt)
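The method that actually does the work in the Source Task is performCheckpoint in StreamTask. It does two things: first it creates a barrier carrying the checkpoint ID and sends it to all downstream tasks, then it saves this Task's own state. This is how barriers come to travel through the whole streaming topology.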
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {
    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint
            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments
            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
            // Step (2): Send the checkpoint barrier downstream
            operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);
            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        // ... (the rest of the method is omitted in this excerpt)
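The ordering in performCheckpoint, barrier first and snapshot second, can be shown with a single-threaded toy source. SourceTaskSketch and all of its members are hypothetical; the real method runs under a lock and takes most of the snapshot asynchronously, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy source task that inserts barriers into its own output stream. */
class SourceTaskSketch {
    private final List<String> downstream = new ArrayList<>();  // stands in for the output channels
    private final List<String> snapshots = new ArrayList<>();   // stands in for the state backend
    private long offset = 0;                                    // the source's only state
    private boolean running = true;

    void emitRecord() {
        downstream.add("record@" + offset);
        offset++;
    }

    /** Returns false if the task is no longer running, mirroring the decline path. */
    boolean triggerCheckpoint(long checkpointId) {
        if (!running) {
            return false;
        }
        // Step 2 in performCheckpoint: send the barrier downstream as early as possible
        downstream.add("barrier#" + checkpointId);
        // Step 3: snapshot the state as it was when the barrier was emitted
        snapshots.add("chk-" + checkpointId + ":offset=" + offset);
        return true;
    }

    void stop() {
        running = false;
    }

    List<String> downstream() {
        return downstream;
    }

    List<String> snapshots() {
        return snapshots;
    }
}
```

Because no record is emitted between the barrier and the snapshot, the saved offset always equals the number of records that precede the barrier in the stream.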
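Barrier propagation
As described above, a Source Task periodically inserts barriers into the original data at the JobMaster's command and passes them to the downstream Operators.
A non-Source Task does not trigger checkpoints periodically while processing data; instead, it triggers a checkpoint each time it encounters a barrier.
In the code, this is driven by getNextNonBlocked in BarrierBuffer.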
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        Optional<BufferOrEvent> next;
        if (currentBuffered == null) {
            next = inputGate.getNextBufferOrEvent();
        }
        else {
            next = Optional.ofNullable(currentBuffered.getNext());
            if (!next.isPresent()) {
                completeBufferedSequence();
                return getNextNonBlocked();
            }
        }
        if (!next.isPresent()) {
            if (!endOfStream) {
                // end of input stream. stream continues with the buffered data
                endOfStream = true;
                releaseBlocksAndResetBarriers();
                return getNextNonBlocked();
            }
            else {
                // final end of both input and buffered data
                return null;
            }
        }
        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // if the channel is blocked, we just store the BufferOrEvent
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // process barriers only if there is a chance of the checkpoint completing
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
        }
        else {
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                processEndOfPartition();
            }
            return bufferOrEvent;
        }
    }
}
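If the current stream has not ended, the barrier is handled in processBarrier, which decides whether barrier alignment is needed based on whether the Task has multiple inputs. Once the checkpoint can proceed, notifyCheckpoint is called to trigger it; that call ends up in triggerCheckpointOnBarrier, and from there the process is the same as for a Source Task.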
Operator checkpoint
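As described above, when a Task performs a checkpoint it first creates the barrier for that checkpoint and broadcasts it, and only then performs its own checkpoint. The executeCheckpointing method calls each operator's snapshotState to save state, and each operator implements snapshotState according to its own needs.
For example, in the Kafka consumer operator that Flink provides, snapshotState in FlinkKafkaConsumerBase saves the offset consumed so far for each partition of the topic.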
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();
        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        } else {
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }
            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                unionOffsetStates.add(
                    Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
            }
        }
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
}
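The bookkeeping around pendingOffsetsToCommit can be modelled in plain Java. This is a sketch only: PendingOffsetsSketch, MAX_PENDING and the String partition keys are hypothetical, and the notifyCheckpointComplete behaviour shown here (commit the offsets of the completed checkpoint, then drop it and all older entries) is an assumption about how such a map is consumed, since that method is not part of the excerpt above.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of "remember offsets per checkpoint, commit them on completion". */
class PendingOffsetsSketch {
    static final int MAX_PENDING = 3;  // hypothetical cap, plays the role of MAX_NUM_PENDING_CHECKPOINTS

    // checkpoint ID -> offsets captured for that checkpoint, in insertion order
    private final LinkedHashMap<Long, Map<String, Long>> pending = new LinkedHashMap<>();
    private Map<String, Long> committed = new HashMap<>();

    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        pending.put(checkpointId, new HashMap<>(currentOffsets));
        // truncate the oldest entries so the map cannot grow without bound
        Iterator<Long> oldest = pending.keySet().iterator();
        while (pending.size() > MAX_PENDING) {
            oldest.next();
            oldest.remove();
        }
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return;  // unknown or already truncated checkpoint: nothing to commit
        }
        committed = offsets;
        // drop the completed checkpoint and everything recorded before it
        Iterator<Long> it = pending.keySet().iterator();
        while (it.hasNext()) {
            long id = it.next();
            it.remove();
            if (id == checkpointId) {
                break;
            }
        }
    }

    Map<String, Long> committed() {
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Keeping the offsets keyed by checkpoint ID means that only offsets belonging to a checkpoint that actually completed are ever committed.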
From pending checkpoint to complete
When an operator finishes its checkpoint, it reports completion to the JobMaster. On receiving the report, the JobMaster moves that operator from the not-yet-acknowledged set to the acknowledged set. When all operators have reported completion, the JobMaster changes the checkpoint's state from pending to complete.
Task side:
executeCheckpointing -> asyncCheckpointRunnable -> reportCompletedSnapshotStates -> taskStateManager.reportTaskStateSnapshots -> TaskStateManagerImpl, which finally calls
public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {
    long checkpointId = checkpointMetaData.getCheckpointId();
    localStateStore.storeLocalState(checkpointId, localState);
    checkpointResponder.acknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        acknowledgedState);
}
This notifies the JobMaster via an actor message.
JobMaster side:
acknowledgeCheckpoint -> checkpointCoordinator.receiveAcknowledgeMessage(ackMessage) -> checkpoint.acknowledgeTask -> completePendingCheckpoint(checkpoint);
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;
    // As a first step to complete the checkpoint, we register its state with the registry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());
    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fail to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                pendingCheckpoint.abortError(e1);
            }
            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
        }
        // the pending checkpoint must be discarded after the finalization
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });
            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
        }
    } finally {
        pendingCheckpoints.remove(checkpointId);
        triggerQueuedRequests();
    }
    rememberRecentCheckpointId(checkpointId);
    // drop those pending checkpoints that are prior to the completed one
    dropSubsumedCheckpoints(checkpointId);
    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    lastCheckpointCompletionNanos = System.nanoTime();
    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);
        LOG.debug(builder.toString());
    }
    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
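The pending-to-complete transition described in this section comes down to tracking which tasks still have to acknowledge each checkpoint. The following is a minimal sketch of that bookkeeping, assuming tasks are identified by plain strings; CheckpointCoordinatorSketch and its methods are hypothetical and leave out state handles, storage, and failure handling.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/** Toy coordinator that moves checkpoints from pending to completed. */
class CheckpointCoordinatorSketch {
    private final Set<String> tasks;
    // checkpoint ID -> tasks that have not acknowledged it yet
    private final TreeMap<Long, Set<String>> pending = new TreeMap<>();
    private final List<Long> completed = new ArrayList<>();
    private long nextId = 1;

    CheckpointCoordinatorSketch(Set<String> tasks) {
        this.tasks = new HashSet<>(tasks);
    }

    /** Creates a pending checkpoint that waits for every task and returns its ID. */
    long triggerCheckpoint() {
        long id = nextId++;
        pending.put(id, new HashSet<>(tasks));
        return id;
    }

    /** Returns true if this acknowledgement completed the checkpoint. */
    boolean receiveAcknowledge(long checkpointId, String task) {
        Set<String> waiting = pending.get(checkpointId);
        if (waiting == null || !waiting.remove(task)) {
            return false;  // unknown checkpoint, unknown task, or duplicate ack
        }
        if (!waiting.isEmpty()) {
            return false;  // still pending: other tasks have not reported yet
        }
        completed.add(checkpointId);
        // remove this checkpoint and drop older pending ones that it subsumes
        pending.headMap(checkpointId, true).clear();
        return true;
    }

    List<Long> completed() {
        return completed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this sketch a late acknowledgement for a subsumed checkpoint is simply ignored, because its entry is no longer in the pending map.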