Flink-分布式快照的设计-流程

Flink 通过 barrier 来协调 checkpoint 的时机,我们在介绍【Flink 网络栈】的时候已经有介绍:对于一个拓扑结构,只有上游算子 checkpoint 完成,下游算子的 checkpoint 才能开始并有意义,又因为下游算子的消费速率并不统一【有的 channel 快,有的 channel 慢】,barrier 就是这样一种协调上下游算子的机制。
stream_barriers

JobManager 统一通知 source operator 发射 barrier 事件,并向下游广播,当下游算子收到这样的事件后,它就知道自己处于两次 checkpoint 之间【一次新的 checkpoint 将被发起】:下游算子 op-1 收到了它所有的 InputChannel 的某次 checkpint 的 barrier 事件后【意味着上游算子的一次 checkpoint 已完成】,自身也可以做 checkpoint,并且在 checkpoint 之后继续将 checkpoint 事件广播到 op-1 的下游算子。

在 exactly-once 语义下,消费端会延迟处理,对齐不同 channel 的 barrier,这在【Flink 网络栈】一文有详细介绍!
stream_aligning

Checkpoint 的协调与发起

Checkpoint 统一由 JobManager 发起,中间涉及到 JobManager 和 TaskManager 的交互,一轮快照可以分为 4 个阶段:

  • JobManager checkpoint 的发起
  • barrier 的逐级传播
  • op/task 的 checkpint 以及 ack 消息回传
  • JobManager commit 消息的广播

整体的交互过程见下图:
flink_ckp_flow

CheckpointCoordinator

JobManager 中对 checkpoint 全局协调控制的核心抽象是 CheckpointCoordinator,它的功能主要包括两部分:


  1. 发起起始 checkpoint trigger-event 消息给 op/task 并收取 op/task 完成该轮 checkpoint 之后的 ack 信息
  2. 维护 op/task 上报的 ack 消息中附带的状态句柄:state-handle 的全局视图

CheckpointCoordinator 可以同时并发执行多个 checkpoint,一次发起并尚未完全 ack 的 checkpint 被抽象为 PendingCheckpoint,当 PendingCheckpoint 被完全 ack 后即可转化一个 CompletedCheckpoint,一个 CompletedCheckpoint 代表一轮成功的分布式快照,该抽象可以拿来在 JobManager 端备份做高可用容错策略。

由于一次发起的分布式快照并不一定执行的顺利【快速的执行完】,会出现各种异常情况,比如执行耗时导致后面发起的快照已执行完而自己却还没有结束、比如整体快照效率低下导致快照任务排队、比如快照期间发生了异常等。CheckpointCoordinator 就是扮演了从 checkpoint 发起到完全结束的整个生命周期的状态、策略协调的作用。

Checkpoint 属性控制

CheckpointProperties 包含 8 项属性,只介绍其中 4 项:


  1. forced:是否是强制,用户指定或者 savepoint 属性为 true
  2. externalize:是否是外部,savepoint 和 FS 快照为 true
  3. savepoint:是否是 savepoint
  4. subsumed:是否已经被包含了,如果比此快照 ckp1 更新的一个 ckp2 成功完成,那么 ckp1 就被称作 subsumed

CheckPoint trigger 策略

对于一般的 checkpoint 来说,CheckpointCoordinator 会不断发起定时任务,触发 checkpint 动作,是非强制、定时触发的;对于 savepoint 来说,快照是一次性的,强制执行的。每轮快照发起的基本策略如下:


  1. 基本的合法性检查:以下情况将取消 checkpoint:
    • CheckpointCoordinator 已 shutdown
    • scheduling 取消则停止接受 timer-task
    • 如果不是强制 checkpoint 并且在排队的 checkpoint 已超过最大限制数
    • 本次 checkpoint 的时间距离上次成功 checkpoint 的时间大于最小限制
    • 并不是所有需要 trigger 的 task 都是 RUNNING 状态
    • 并不是需要 ack 的 task 都是 RUNNING 状态
    • 获取本次全局递增的 checkpointID 失败
  2. 生成中间状态 PendingCheckpoint 并维护到内存映射关系中 checkpointID -> PendingCheckpoint
  3. 遍历所有需要 trigger 的 task 并依次发起 trigger checkpoint request
  4. 如果本次 checkpint 耗费时间超过最大限制,则取消
  5. 若以上发生任何异常,都将取消本次 checkpint

CheckPoint ack 策略

在 op/task 完成一次状态 checkpint 后便会将快照句柄以 ack 消息的形式发送给 CheckpointCoordinator,后者会维护这一轮快照的全局状态视图,具体的 ack 策略如下:


  1. 从 ack 消息中拿到 checkpointID,并从缓存中取出对应的 checkpoint:ckp1
  2. 若 ckp1 存在,则 执行 ckp1 的ack:PendingCheckpoint::acknowledgeTask,会得到 4 种不同的结果:
    • SUCCESS: check ckp1 中所有需要 ack 的 task 是否都已收到 ack 消息,满足则执行以下操作:
      • 将 PendingCheckpoint 转换为 CompletedCheckpoint:cckp1
      • 如果 cckp1 不是 savepoint,则 存储到 CompletedCheckpointStore;并且将 checkpoint 时间早于 cckp1 的 PendingCheckpoint 取消掉
      • 将 ckp1 从缓存中清除,并立即出发一次 checkpoint scheduling timer-task 的重新调度
      • 编译所有的 ack executions 列表并发送 ack 完成的消息
    • DUPLICATE:ack 消息重复接收,直接忽略
    • UNKNOWN:未知的 ack 消息,将清理对应的 上报 task 此次的 snapshort 句柄
    • DISCARDED: ckp1 已经被 discard,将清理对应的 上报 task 此次的 snapshort 句柄
  3. 若 ckp1 存在但是已经被 discard,抛异常
  4. 若 ckp1 不存在,将清理对应的 上报 task 此次的 snapshort 句柄

trigger/ack/commit 节点策略

既然是快照的 trigger/ack 那么就需要一个确定 trigger/ack/commit JobVertex 节点的策略,这段策略封装在 StreamingJobGraphGenerator中,具体见下面的代码逻辑:

1
2
3
4
5
6
7
for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
}
commitVertices.add(vertex.getID());
ackVertices.add(vertex.getID());
}

可以看到,输入源 JobVertex 是 checkpoint 的发起节点,而所有的节点都是接收 ack/commit 消息的节点。

CheckPoint 中间状态 PendingCheckpoint

PendingCheckpoint 代表一个已经发起,但尚未收到所有需要 ack-task ack 消息的 ckeckpoint,一旦所有的 ack 消息都已收到,它就可以转化为一个 CompletedCheckpoint。

Task ack 消息的维护

首先要说明的是:因为多个 op 可能组合成一个 chained-op 在一个 task 中执行,PendingCheckpoint 收到的每条 ack 消息中的 task 快照:TaskStateSnapshot 包含了一个 op/task 中一个/多个 op 实例的快照句柄:OperatorSubtaskState,为了维护 ack 完成情况,PendingCheckpoint 内部包含了三个映射关系:


  1. 需要 ack 但是尚未收到 ack 消息的 task 物理计划映射:ExecutionAttemptID -> ExecutionVertex/map1
  2. 已经收到 ack 消息的 task 物理计划映射:ExecutionAttemptID -> ExecutionVertex/map2
  3. 算子 ID 和算子状态的映射:OperatorID -> OperatorState

每次收到 ack 消息中的 TaskStateSnapshot ,其实就是完善 OperatorID -> OperatorState 的过程,当 map1 为空集时就代表这个快照所有需要 ack 的 task 都收到了 ack 消息,也就是可以转换为 CompletedCheckpoint。

转化为 CompletedCheckpoint

转化的具体策略:

  1. 如果是 savepoint 或者用户指定了外部 checkpoint 的目录,则首先生成
    new SavepointV2(checkpointId, operatorStates.values(), masterState); 并快照到文件系统
  2. 在内存中转化为 CompletedCheckpoint 并返回

PendingCheckpoint 转化为 CompletedCheckpoint 后就可以在 JobManager 端持久化做容错策略了。

CompletedCheckpoint JobManager 端持久化

这里我们只看高可用模式下的情形
一个 CompletedCheckpoint 代表一次成功的分布式快照,为了后续的快照恢复,在 JobManager 端需要将 CompletedCheckpoint 对象持久化。在高可用模式下的策略是将 CompletedCheckpoint 对象序列化写出到文件系统,拿到一个文件句柄后再将该文件句柄写到 zk 做 HA 等高可用策略。

高可用模式下使用 ZooKeeperCompletedCheckpointStore 存储 CompletedCheckpoint,存储的逻辑:


  1. 先将 CompletedCheckpoint instance 序列化后写入到 FS 中
  2. 将 1 中的文件句柄写入 zookeeper 中,对于一个 Job 来说,可以写入的句柄数量是有限值 N,当快照数量超过 N 之后将会把最旧的快照句柄从 zookeeper 删除【相应的分布式快照的状态也会被清理】,再写入最新的快照
  3. 恢复时将会从 2 中的最新一次 checkpint 恢复

TaskManager 端 checkpoint

在 CheckpointCoordinator 发起一次 checkpint 的 trigger 之后,TaskManager 会收到消息:

1
actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));

TaskManager 通过 executionAttemptID 拿到对应的 Task 并触发 checkpint barrier 传播,这段逻辑在 StreamTask 中,我们看一下其中的小细节:

1
2
3
4
5
6
7
8
9
10
// Since both state checkpointing and downstream barrier emission occurs in this
// lock scope, they are an atomic operation regardless of the order in which they occur.
// Given this, we immediately emit the checkpoint barriers, so the downstream operators
// can start their checkpoint work as soon as possible
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions);
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

这里 Flink 会先广播 barrier 后执行本 task 的 checkpint,广播的对象是 CheckpointBarrier,它继承自 RuntimeEvent
CheckpointBarrier 中封装了三个参数:


  1. checkpointID: 逐次递增的 checkpint 序列号
  2. timestamp:checkpoint 对应的时间戳
  3. checkpointOptions:checkpint 附带属性

我们在【Flink 网络栈】中有介绍,下游的消息接收统一经过 StreamInputProcessor,而其中的 BarrierBuffer 会依据 barrier block/阻塞 input-channel,直到所有的 input-channel 都收到了某个 checkpint 对应的 barrier 才会释放 block。为了避免死锁,BarrierBuffer 并不会 block 数据的接收,而是会 cache 到机器的 buffer-cache 或磁盘上,只是 block 数据的算子处理。

BarrierBuffer 的 block 释放后,便会触发背后的 op-task 进行 checkpint。

op/task 快照结束后,会发送一条 ack 消息给 JobManager。

Checkpoint 的恢复

CheckPoint 的恢复伴随着 op/task 的恢复过程,在 CheckpointCoordinator 中,首先将 CompletedCheckpointStore 从之前的快照中恢复过来,拿到最新的一次 CompletedCheckpoint,并将各个算子的快照状态 OperatorSubtaskState 应用到 ExecutionVertex 的当前 Execution 中。

后伴随着 TaskDeploymentDescriptor 一起下发到对应的 TaskManagerTaskManager 在启动分配的 task 时会应用对应的快照句柄进行状态恢复。

生活不止眼前的苟且!