Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

Flink从入门到放弃之源码解析系列

  • Flink组件和逻辑计划
  • Flink执行计划生成
  • JobManager中的基本组件(1)
  • JobManager中的基本组件(2)
  • JobManager中的基本组件(3)
  • TaskManager
  • 算子
  • 网络
  • 水印WaterMark
  • CheckPoint
  • 任务调度与负载均衡
  • 异常处理
  • Alibaba Blink新特性

1Checkpoint

CheckPoint 是 flink 保证消息不丢的机制,通过 Barrier 的方式来协调时机,那么什么是 Barrier 呢?

其实前一章介绍 flink 网络栈 的时候已经有介绍在消费端 flink 对于不同的 Barrier 处理,实际上,Barrier 是用来校准 checkpint 的方式。由于对于一个拓扑结构,只有上游算子 checkpoint 完,下游算子的 cehckpoint 才能开始并有意义,同时下游算子的消费速率并不统一【有的 channel 快,有的 channel 慢】,而 Barrier 就是这样一种协调上下游算子的机制。

JobManager 统一通知源算子发射 Barrier 事件,并向下游广播,当下游算子收到这样的事件后,它就知道自己处于两次 checkpoint 之间【一次新的 checkpoint 将被发起】

当下游算子收到了它所有的 InputChannel 的 Barrier 事件后,它便知道上游算子的一次 checkpoint 已完成,自己也可以做 checkpoint 了,完成之后继续将 checkpoint 事件广播到下游算子

在 Exact-once 语义下,消费端会延迟消费并校准不同 channel 的消费速率,这在 flink 网络栈一章有详细介绍!。

2Checkpoint 的协调与发起

前面提到过 checkpoint 统一由 JobManager 发起,我们来看相关逻辑:

CheckpointCoordinator

flink 的 checkpoint 统一由 CheckpointCoordinator 来协调,通过将 checkpoint 命令事件发送给相关的 tasks 【源 tasks】,它发起 checkpoint 并且收集 checkpoint 的 ack 消息。

构造参数

这里有必要了解一些关键参数,以便我们更加了解 Checkpoint 的细节策略

  • baseInternal:快照的间隔
  • checkpointTimeout:一次 checkpoint 的超时时间,超时的 checkpoint 会被取消
  • maxConcurrentCheckpointAttempts:最多可同时存在的 checkpoint 任务,是对于整个 flink job
  • tasksToTrigger:触发分布式 Checkpoint 的起始 tasks,也就是 source tasks

Checkpoint的发起

前面的章节我们介绍过 ExecutionGraph【flink物理计划抽象】,它有一个核心接口 enableSnapshotCheckpointing ,这个接口在 JobManager 提交作业的时候被执行,具体见JobManager line1238 from method submitJob。这个接口的逻辑总结如下:

  • 获取 checkpoint 的发起节点【源节点】,需要 ack 和 commit 的节点【所有节点】
  • 先关闭已有 checkpoint
  • 实例化 CheckpointCoordinator 和它的监听 Akka 系统 CheckpointCoordinatorDeActivator,并将 CheckpointCoordinatorDeActivator 注册为 EecutionGraph 的 listener,当作业的执行状态变为 RUNNING 时,会通知 CheckpointCoordinatorDeActivator 启动 CheckpointCoordinator 的 checkpoint 线程

那么 CheckpointCoordinator 在收到这样的消息后会怎么处理呢?

它会发起一个 timer task,定时执行,并且传入的时间为当前的系统时间,由于 CheckpointCoordinator 全局只有一个,这个时间也是全局递增并且唯一的:

//CheckpointCoordinator line 1228

private final class ScheduledTrigger implements Runnable {

@Override

public void run() {

try {

triggerCheckpoint(System.currentTimeMillis(), true);

}

catch (Exception e) {

LOG.error("Exception while triggering checkpoint for job {}.", job, e);

}

}

}

下面我们来具体分析 checkpoint 的一些核心动作

checkpoint 的触发

//CheckpointCoordinator line394

public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {

return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();

}

public CheckpointTriggerResult triggerCheckpoint(

long timestamp,

CheckpointProperties props,

@Nullable String externalSavepointLocation,

boolean isPeriodic) {

// make some eager pre-checks

synchronized (lock) {

// abort if the coordinator has been shutdown in the meantime

if (shutdown) {

return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);

}

// Don't allow periodic checkpoint if scheduling has been disabled

if (isPeriodic && !periodicScheduling) {

return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);

}

...

总结逻辑:

  • 如果已关闭或优先处理排队请求会总额并发任务超过限制,都会取消此次 checkpoint 的发起
  • 如果最小间隔时间未达到,也会取消此次 checkpoint
  • check 所有的发起节点【源节点】与其他节点都为 RUNNING 状态后才会发起 checkpoint
  • 发起 checkpoint 并生成一个 PendingCheckpoint 【已经发起但尚未 ack 的 checkpoint】
  • 每个源节点都会发一条消息给自己的 TaskManager 进行 checkpoint

取消 CheckPoint 消息的处理

//CheckpointCoordinator line678

public void receiveDeclineMessage(DeclineCheckpoint message) {

if (shutdown || message == null) {

return;

}

if (!job.equals(message.getJob())) {

throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +

message.getJob() + " while this coordinator handles job " + job);

}

final long checkpointId = message.getCheckpointId();

final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

PendingCheckpoint checkpoint;

synchronized (lock) {

// we need to check inside the lock for being shutdown as well, otherwise we

// get races and invalid error log messages

if (shutdown) {

return;

}

...

总结其逻辑:

  • 如果有对应的 PendingCheckpoint ,取消掉并且如果在其之后还有其它 checkpoint 的话,重新发起它们的 checkpoint 任务

Ack Checkpoint 消息的处理

//CheckpointCoordinator line727

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {

if (shutdown || message == null) {

return false;

}

if (!job.equals(message.getJob())) {

LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);

return false;

}

final long checkpointId = message.getCheckpointId();

synchronized (lock) {

// we need to check inside the lock for being shutdown as well, otherwise we

// get races and invalid error log messages

if (shutdown) {

return false;

}

...

总结其逻辑:

  • 通过消息里的 checkpoint id 找到对应的 PendingCheckpoint,记录下 对应的 JobVertex 下某个 ExecutionVertex 的 ack 状态
  • PendingCheckpoint 里维护了该次 checkpoint 需要 ack 的全部 ExecutionVertex
  • 如果全部 ack 完成,则清除 PendingCheckpoint 里维护的状态数据并将句柄转化给 CompletedCheckpoint 来维护
  • 丢弃过时的 checkpoint 任务,并重新出发新的 checkpoint
  • 如果全部 ack 完成,通知对应的 TaskManager checkpoint 已完成【checkpoint commit 阶段】,然后通过 CompletedCheckpointStore 将 CompletedCheckpoint 序列化并存储,高可用模式下为 ZK 的方式,具体细节见章节:【flink job manager 基本组件】,将来恢复时,将每个节点需要的句柄注入到状态中,之后算子启动时将状态数据附属于 TaskDeploymentDescriptor 之中分发给 TaskManager 去执行

Checkpoint 的消息流

上面我们说到 TaskManager 收到 AbstractCheckpointMessage 消息,并处理,我们来看核心逻辑:

//TaskManager line500

private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

actorMessage match {

case message: TriggerCheckpoint =>

val taskExecutionId = message.getTaskExecutionId

val checkpointId = message.getCheckpointId

val timestamp = message.getTimestamp

val checkpointOptions = message.getCheckpointOptions

log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

val task = runningTasks.get(taskExecutionId)

if (task != null) {

task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)

} else {

log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")

}

...

//Task.java line1140

public void triggerCheckpointBarrier(

final long checkpointID,

long checkpointTimestamp,

final CheckpointOptions checkpointOptions) {

final AbstractInvokable invokable = this.invokable;

final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

if (executionState == ExecutionState.RUNNING && invokable != null) {

// build a local closure

final String taskName = taskNameWithSubtask;

final SafetyNetCloseableRegistry safetyNetCloseableRegistry =

FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

...

//StreamTask line612

private boolean performCheckpoint(

CheckpointMetaData checkpointMetaData,

CheckpointOptions checkpointOptions,

CheckpointMetrics checkpointMetrics) throws Exception {

LOG.debug("Starting checkpoint ({}) {} on task {}",

checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

synchronized (lock) {

if (isRunning) {

// we can do a checkpoint

总结其逻辑:

  • 先是通过 TaskManager 进行消息路由,对于 TriggerCheckpoint 消息,会路由给相应的 Task 做处理
  • Task 会起一个异步 task 进行 checkpoint,内部是调用 StreamTask 的 performCheckpoint 方法
  • performCheckpoint 内部首先先将此次 checkpoint 的 barrier 广播到下游,以便让下游快速 checkpoint
  • 后执行具体的 checkpoint,将状态持久化,目前支持的持久化方式有:FileSystem、Memory、RocksDB,成功后通知 JobManager 进行 ack,否则取消此次 checkpoint
  • 如果是 ack 消息,依据具体情况通知对应的 KVState

附一张图描述交互过程:

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

3Checkpoint 的存储和恢复

Checkpoint 的存储和恢复均是通过 AbstractStateBackend 来完成,AbstractStateBackend 有三个实现类,FsStateBackend 是通过 HDFS 来存储 checkpoint 状态,继承关系如下:

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

我们来看最常见的一种 FsStateBackend,AbstractStateBackend 内部通过 State 来管理状态数据,依据状态数据的不同特性,状态分为三种:

  • ValueState :最简单的状态,一个 key 一个单值 value,可以跟更新和删除
  • ListState:一个 key 对应一个 value list
  • ReducingState:一个 key 对应的 value 可以进行 reduce 操作
  • FoldingState:一个key,后续添加的值都会通过 folding 函数附加到第一个值上

AbstractStateBackend 内部通过 KvState 接口来管理用户自定义的 kv 数据,我们来看 FsValueState 的继承关系:

那么如何获取这些 State 呢?flink 抽象了另一套接口:StateDescriptor 来获取 State,通过绑定特定的 StateBackend 来获取。这样一层抽象,解耦了 State 的类型和底层的具体的存储实现。我们来看 StateDescriptor 的继承关系:

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

那么这些抽象是如何协调工作的呢?

//KvState 的初始化和获取

//AbstractKeyedCEPPatternOperator

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

public void open() throws Exception {

if (keys == null) {

keys = new HashSet<>();

}

if (nfaOperatorState == null) {

nfaOperatorState = getPartitionedState(

new ValueStateDescriptor<NFA<IN>>(

NFA_OPERATOR_STATE_NAME,

new NFA.Serializer<IN>(),

null));

}

//AbstractStreamOperator

protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {

return getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, stateDescriptor);

}

//AbstractStateBackend

//具体的 kvState 由子类具体实现来决定

public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {

if (keySerializer == null) {

throw new RuntimeException("State key serializer has not been configured in the config. " +

"This operation cannot use partitioned state.");

}

if (!stateDescriptor.isSerializerInitialized()) {

stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());

}

//获取 KvState 后,用户经过一番更新...下面是快照的过程

// StateBackend 的创建

//AbstractStreamOperator

try {

TypeSerializer<Object> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

// if the keySerializer is null we still need to create the state backend

// for the non-partitioned state features it provides, such as the state output streams

String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();

stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);

} catch (Exception e) {

throw new RuntimeException("Could not initialize state backend. ", e);

}

//StreamTask

public AbstractStateBackend createStateBackend(String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception {

AbstractStateBackend stateBackend = configuration.getStateBackend(userClassLoader);

if (stateBackend != null) {

// backend has been configured on the environment

LOG.info("Using user-defined state backend: " + stateBackend);

}

//快照入口

//AbstractStreamOperator

public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {

// here, we deal with key/value state snapshots

StreamTaskState state = new StreamTaskState();

if (stateBackend != null) {

HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =

stateBackend.snapshotPartitionedState(checkpointId, timestamp);

if (partitionedSnapshots != null) {

state.setKvStates(partitionedSnapshots);

}

}

return state;

}

上面的快照之行结束后,用户会获取 KvStateSnapshot 抽象,对于 FsState 来说,起内部封装了文件句柄以及序列化元数据等信息,同时提供了恢复快照的接口,其抽象关系如下:

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

flink 进一步将每个 task 的每个 operator 快照后获取的 KvStateSnapshot 封装成 StreamTaskState,并最终获取一个 StreamTaskState List【对应一个 task 的一组 operators】,分装成 StreamTaskStateList,随后通知 JobManager 的 CheckpointCoordinator:

//RuntimeEnvironment

AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(

jobId,

executionId,

checkpointId,

serializedState,

stateSize);

jobManager.tell(message);

JobManager 再将这些句柄的数据再快照到本地和zk,具体见 JobManager 基本组件。

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

Flink从入门到放弃之源码解析系列-第7章(2) CheckPoint

相关推荐