一、为什么要做高可用
由于依赖Flink应用A下游服务非常重要,对应用A所提供数据的实时性、可靠性要求比较高。所以对应用A进行了高可用改造,运行在两个独立的flink上,保障不会由于某个flink集群故障而影响下游服务。在探索过程中落地经历了从双活到热备的转变,下面将对这一历程进行回顾。
二、单集群运行
Kafka Source Connector消费数据,经过计算后将计算后的数据写入kafka另外一个topic。写入topic有两个地方。其中Sink直接将数据流写入kafka。ProcessFunction注册了定时器,当一条数据过来后会注册一个定时器,当一个设备的数据一段时间没有流入将会生成一条离线数据写入到kafka。高可用要保证的就是Sink和ProcessFunction数据能够可靠得写入到kafka。
三、高可用阶段一:双活
程序分别跑在两个集群。集群1和集群2在写入kafka前,先通过REDIS的SETNX来设置值,当设置成功时才发送数据到kafka。这种方式经过测试发现存在乱序的情况。由于无法保证先抢到发送权的,一定先发送kafka。
如果在前一条记录发送kafka成功之后,记录一个发送成功状态。读取到这个状态后才允许进行下一条数据的发送可以强行保证有序。但这样做会导致数据时效性低下、程序复杂度升高、落地难,总之此路不通。
四、高可用阶段二:热备
两个集群同时发送kafka数据不行,那么就只让一个集群发送数据。当单个集群出现故障无法恢复,快速用另一个集群替代进行业务数据处理。
我们将当前负责发送数据到下游的集群称为主集群,将不发送数据处于待命状态的集群称为备集群。上图中集群1为主集群,集群2为备集群。为了保证主集群出现故障,切换到备集群的时候不丢数据,备集群的消费进度一定要晚于主集群。
有几个需要回答的问题摆在面前:
- 如何实现备集群跟随主集群处理进度?
- 备集群与主集群消费差距应该是多少?
- 切换后的的重复数据发不发?
1.如何实现备集群跟随主集群处理进度
主集群负责记录处理进度,按照topic-partition粒度记录处理的offset。备集群负责读取前者记录的进度进行降速。为保证代码的简洁和小的复杂度,将记录进度和根据进度降速放在同一个环节处理。
在消费环节还是在其他环节降速面临不同的问题。消费环节控制会导致备集群切换为主集群时,前文中ProcessFunction少量注册的离线定时器误报。在其他环节控制会导致背压,checkpoint失败,且不容易对数据进度进行控制。经过权衡,选择在消费环节控制。
2.备集群与主集群消费差距应该是多少
差距当然是在切换后不漏数据的前提下越接近越好。
下面讨论最近可以多近?
给个反例,集群2是备集群,集群1是主集群,集群2紧随集群1的进度之后。集群1从kafka读取到数据时记录offset,目前记录为9。集群1在处理完数据6时完成了一次checkpoint,目前消费到了数据9(图4),此时7、8虽然消费了,但是没有到达Sink(图5)。集群2此时读取到数据8。
假如集群1这个时候出现故障宕机,切换到集群2,数据7将会丢失。
所以要保证备集群消费的进度在已发送下游数据之后。主集群在一次完整的checkpoint做完后写入offset可以保证备集群不漏数据。
3.切换后的的重复数据发不发
在当前实现中依赖下游系统幂等处理来做到整体的EXACTLY ONCE。
4.整个主备集群协同步骤的描述
1.主集群消费kafka数据,checkpoint成功后,在提交kafka offset的同时,将kafka offset记录到redis中;
2.备集群读取redis中得到当前主集群offset消费进度。消费kafka数据,当数据中对应topic的partition的offset大于或者等于redis中的offset-1则丢弃这条数据(由于Flink中做checkpoint时提交的kafka消费offset是由source emit到下游的kafka数据驱动的),并且调用consumer的pause停止消费指定partition;
3.指定时间(目前是10秒)后会调用consumer的resume方法唤醒备集群的指定partition,当消费的数据仍然满足暂停条件则继续步骤2;
4.主备集群根据配置在apollo中的当前主集群id来知道自己是否是主集群。当主集群切换为备集群时则根据redis中的值呈现跟随状态,备集群切换为主集群则呈现记录redis状态。主集群推送数据到下游,备集群不推数据。
5.一些细节
最终落地基于
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09,
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher,
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
进行二次开发,加入热备模块。其中进度offset提交操作在:
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread 的 setOffsetsToCommit方法,此方法由上层实现了以下接口的类调用
/**
* This interface must be implemented by functions/operations that want to receive
* a commit notification once a checkpoint has been completely acknowledged by all
* participants.
*/
@PublicEvolving
public interface CheckpointListener {
/**
* This method is called as a notification once a distributed checkpoint has been completed.
*
* Note that any exception during this method will not cause the checkpoint to
* fail any more.
*
* @param checkpointId The ID of the checkpoint that has been completed.
* @throws Exception
*/
void notifyCheckpointComplete(long checkpointId) throws Exception;
}
为提高可靠性对redis的访问增加熔断降级机制。当一段时间redis出现访问异常,会暂时不访问,降级窗口结束后会继续访问。
五、结语
最终应用A高可用方案以热备落地,主要是解决单个集群长时间无法恢复的问题。目前不同集群使用同一份checkpoint技术层面已经落地,后面可以有冷备方案,当主机群宕机后,备集群读取主机群的checkpoint文件数据启动继续服务。