1.状态持久化
在Flink的状态管理机制中,Flink 容错性的保障就是要对状态数据做一个持久化的保存,这样就可以在发生故障后通过持久化数据进行重启恢复。在Flink 中对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。用的最多的外部系统就是HDFS。
2.
检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉,这样重启之后就可以继续处理新数据、而不需要重新计算了。
遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。
检查点是Flink容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把checkpoint叫做“一致性检查点”。
默认情况下,检查点是被禁用的,需要在配置/代码中手动开启。
3.状态后端
检查点的保存离不开JobManager和TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由JobManager向所有TaskManager发出触发检查点的命令;TaskManger收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向JobManager返回确认信息。这个过程是分布式的,当JobManger收到所有TaskManager的返回信息后,就会确认当前检查点成功保存。而这一切工作的协调,就需要一个“专职人员”来完成。
状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。
3.1.状态后端的分类
状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。
(1)哈希表状态后端
这种方式就是把状态放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
(2)内嵌RocksDB状态后端
RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。
3.2.如何选择状态后端
HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是RocksDB本地硬盘。
- HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
- 而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。
4.检查点恢复具体流程
在运行流处理程序时,Flink会周期性地保存检查点。当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。
接下来就需要从检查点来恢复状态了。具体的步骤为:
(1)重启应用
遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空,如下图所示:
(2)读取检查点,重置状态
找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候,如下图所示
(3)重放数据
从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第4、5个数据就相当于丢掉了;这会造成计算结果的错误。
为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过Source任务向外部数据源重新提交偏移量(offset)来实现
这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。
(4)继续处理数据
接下来,我们就可以正常处理数据了。首先是重放第4、5个数据,然后继续读取后面的数据。
当处理到第5个数据时,就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样;我们既没有丢掉数据、也没有重复计算数据,这就保证了计算结果的正确性。
5.检查点算法
在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点。
5.1.检查点分界线
Flink 的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。
这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”。
5.2.分布式快照算法
watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。
具体实现上,Flink使用了Chandy-Lamport算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送barrier时,需要广播出去;而当多个上游任务向同一个下游任务传递barrier时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。
场景说明:我们有两个并行的Source任务,会分别读取两个数据流。此时第一条流的Source任务读取了3个数据,偏移量为3;而第二条流的Source任务只读取了一个“hello”数据,偏移量为1。
(1)JobManager发送指令,触发检查点的保存;Source任务保存状态,插入分界线
JobManager 会周期性地向每个 TaskManager发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。收到指令后,TaskManger会在所有Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中,如下图所示:
并行的Source任务保存的状态为3和1,表示当前的1号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现Source任务做这些的时候并不影响后面任务的处理,Sum任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。
状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后跟数据一样把barrier向下游任务传递,如下图所示:
(3)向下游多个并行子任务广播分界线,执行分界线对齐
Map任务没有状态,所以直接将barrier继续向下游传递。这时由于进行了keyBy分区,所以需要将barrier广播到下游并行的两个Sum任务。同时,Sum任务可能收到来自上游两个并行Map任务的barrier,所以需要执行“分界线对齐”操作。
(4)分界线对齐后,保存状态到持久化存储
各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将barrier向下游继续传递,并通知JobManager保存完毕
这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当Sum将当前状态保存完毕时,Source 1任务已经读取到第一条流的第五个数据了。
(5)先处理缓存数据,然后正常继续处理
当JobManager收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。
6.检查点的配置
检查点很多具体的配置项,可以跟每个作业相关,可以为每个单独的作业配置他的特色。
检查点部分可配置内容如下:
// 进行检查点配置 CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 自动存盘机制的间隔 checkpointConfig.setCheckpointInterval(200); // 检查点保存超时时间1分钟,超时后丢掉检查点 checkpointConfig.setCheckpointTimeout(10000); // 设置精确一次模式 checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 允许检查点失败的次数,默认是0,相当于检查点保存失败,会导致整个任务失败,这个默认配置比较严格 checkpointConfig.setTolerableCheckpointFailureNumber(3); // 最大同时出现的检查点数量 checkpointConfig.setMaxConcurrentCheckpoints(2); // 两个检查点之间的最小暂停时间,至少要保证两个检查点保存间隔的时间 checkpointConfig.setMinPauseBetweenCheckpoints(1000); // 启用不对齐的检查点保存方式 checkpointConfig.enableUnalignedCheckpoints(); // 设置一个对齐的超时时间 checkpointConfig.setAlignmentTimeout(Duration.ofMillis(100)); // 设置一个外部检查点清理机制 checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 设置检查点存储,可以直接传入一个String,指定文件系统的路径 checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")
转载请注明:西门飞冰的博客 » Flink 状态持久化和检查点