Flink Watermark & Checkpoint

前言

在前面一章 flink 网络栈的讲解中,我们介绍了 Barrier 的改变以及 Barrier 在 InputGate 消费数据的过程中扮演的时间对齐作用,同时,我们介绍了 InputProcessor 负责数据读取,同时会追踪 watermark 时间并分发到下游。这里我们从 InputProcessor 开始讲起

为什么将 Watermark 和 Checkpoint 放在一起介绍,是因为它们在原理上有相似之处:上游节点逐级广播消息给下游节点来完成一次行为

WaterMark

WaterMark是什么

Watermark 是协调窗口计算的一种方式,它告诉了算子时间不大于 WaterMark 的消息不应该再被接收【如果出现意味着延迟到达】。WaterMark 从源算子开始 emit,并逐级向下游算子传递,算子需要依据自己的缓存策略在适当的时机将 WaterMark 传递到下游。当源算子关闭时,会发射一个携带 Long.MAX_VALUE 值时间戳的 WaterMark,下游算子接收到之后便知道不会再有消息到达。

Flink 提供三种消息时间特性:EventTime【消息产生的时间】、ProcessingTime【消息处理时间】 和 IngestionTime【消息流入 flink 框架的时间】,WaterMark 只在时间特性 EventTime 和 IngestionTime 起作用,并且 IngestionTime 的时间等同于消息的 ingestion 时间。

WaterMark的协调与分发

对于 watermark 的协调与分发集中在 InputProcessor 的 processInput 方法中,下面我们来详细分析其逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//StreamInputProcessor line134
public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
numRecordsIn = streamOperator.getMetricGroup().counter("numRecordsIn");
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycle();
currentRecordDeserializer = null;
}

总结其逻辑:

  • 如果消费到的消息是一个 WaterMark,获得其对应的 source channel id 并将时间更新进去,同时记录下当前所有 channel 的最小 WaterMark 时间
  • 如果当前最小 WaterMark 时间【所有的 channel 都至少消费到该时间】大于上次发射给下游的 WaterMark 时间,则更新 WaterMark 时间并将其交给算子处理
  • 通常算子在处理【尤其是涉及了窗口计算或者需要时间缓存策略的算子】后会将 WaterMark 继续往下游广播发送

WaterMark 的来源

上面我们提到 WaterMark 最初由源算子负责发射到下游,那么它的生成规则是什么呢?又是如何协调的?

我们来看一个源算子的实现便知

在第一章 flink 逻辑计划生成,我们了解了 flink 所有的源算子都继承自 SourceFunction 接口,SourceFuntion 定义了管理消息发射环境的接口 SourceContext,SourceContext 的具体实现在 StreamSource 中,一共有三种:NonTimestampContext、AutomaticWatermarkContext、ManualWatermarkContext,我们来逐一分析。

NonTimestampContext

适用于时间特性:TimeCharacteristic#ProcessingTime,顾名思义,不会 emit 任何 WaterMark

AutomaticWatermarkContext

自动发射 WaterMark,适用于 TimeCharacteristic#IngestionTime ,也就是源算子的处理时间。flink 起了一个timer task 专门以一定的 interval 间隔发射 WaterMark,一个 Interval 内所有 Record 的发射时间处于上次 emit 的 WaterMark 和下次将要 emit 的 WaterMark 之间,Interval 边界到达后会提升下一个 WaterMark 时间,计算本次的 WaterMark 时间并 emit 出去。

自动 emit watermark 的 interval 默认是 200ms ,这是写死不可配置的,具体见:

1
2
3
4
5
6
7
8
9
//StreamExecutionEnvironment line598
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}

ManualWatermarkContext

用户自己指定 WaterMark 时间,使用于 TimeCharacteristic#EventTime,用户需要提供依据源消息提取 Watermark 时间的工具 function

那么用户有哪些指定时间的逻辑呢?

TimestampAssigner

flink 通过接口 TimestampAssigner 来让用户依据消息的格式自己抽取可能被用于 WaterMark 的 timestamp,它只定义了一个接口:long extractTimestamp(T element, long previousElementTimestamp);

而 TimestampAssigner 的两个继承接口 AssignerWithPunctuatedWatermarks 以及 AssignerWithPeriodicWatermarks 定义了另种典型的时间戳生成规则:

  • AssignerWithPunctuatedWatermarks:依据消息中事件元素及自带 timestamp 来驱动 watermark 的递增
  • AssignerWithPeriodicWatermarks:依据消息中的 timestamp 周期性地驱动 watermark 的递增

各个场景可以依据业务的需求去继承和实现

CheckPoint

已经迁移到 Flink-分布式快照的设计-流程

生活不止眼前的苟且!