可靠消息
一,目的
系统间解耦,异步通知消息,削峰填谷。系统柔性可用(分区容错)。
1.1,何为可靠消息?
持久化,不丢失。努力送达,幂等消费。CA/CP最终一致性。
1.2,可靠性责任链(Producer,KAFKA,Consumers)
由于Broker解耦生产者不知道消息是否被消费。但能知道的是Broker是否接收了消息,是否把消息安全的存储起来。这里存在一条责任链,开始于生产者,移动到消息系统,最后到达消费者。每个环节都要正确执行,环节间的交接也要正确执行。这意味着应用开发者要正确的流程写程序,防止丢失消息,或者滥用消息API。
本文从三个角色的职责划分来讨论如何达到可靠。
生产侧:业务端进行业务操作发KAFKA消息异步投递解耦,消息发送失败投递补偿端。补偿端失败记录独立错误日志作为凭证依据。
消费侧:关于业务消费消息后失败异常。俩种路线第一种捕获业务异常并丢到补偿端(较难实现,开发测试遗漏难排查,对编码规范要求极高)。业务自己控group id offset.虽然看起来逻辑难度大但SDK可以封装而且口子比较少。
补偿端:失败直接丢KAFAK补偿消费者刷oracle关系数据库。定时跑批任务刷异常列表尝试3次。失败告警人工干预,管理后台可以看到补偿失败的消息列表可以进行人工干预。
二,KAFKA内部的可靠性
2.1 KAFKA基本概念
2.1.1,Broker:消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群;
2.1.2,Topic:主题是对一组消息的抽象分类,比如例如page view日志、click日志等都可以以topic的形式进行抽象划分类别。在物理上,不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可使得数据的生产者或消费者不必关心数据存于何处;
2.1.3,Partition:每个主题又被分成一个或者若干个分区(Partition)。每个分区在本地磁盘上对应一个文件夹,分区命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始至分区总数减-1;
2.1.4,LogSegment:每个分区又被划分为多个日志分段(LogSegment)组成,日志段是Kafka日志对象分片的最小单位;LogSegment算是一个逻辑概念,对应一个具体的日志文件(“.log”的数据文件)和两个索引文件(“.index”和“.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成;
2.1.5,Offset:每个partition中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到partition中。每个消息都有一个连续的序列号称之为offset—偏移量,用于在partition内唯一标识消息(并不表示消息在磁盘上的物理位置);
2.1.6,Message:消息是Kafka中存储的最小最基本的单位,即为一个commit log,由一个固定长度的消息头和一个可变长度的消息体组成;
三个部分保证可靠,KAFKA内部,生产,消费
存储的图片资源: https://cloud.tencent.com/developer/article/1421267
2.2,kafka的内部存储机制
Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。通过调节其副本相关参数,可以使得Kafka在性能和可靠性之间运转的游刃有余。Kafka从0.8.x版本开始提供partition级别的复制,replication的数量可$KAFKA_HOME/config/server.properties中配置。
Kafka中消息是以topic进行分类的,生产者通过topic向Kafka broker发送消息,消费者通过topic读取数据。然而topic在物理层面又能以partition为分组,一个topic可以分成若干个partition。Kafka中的消息以顺序的方式存储在文件中。
Kafka中的topic的partition有N个副本(replicas)。N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求,follower定期地去复制leader上的数据。
如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者说follower追赶leader数据。
TOPIC-PARTITION
kafka通过topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。
segment
在磁盘上,一个partition就是一个目录,然后每个segment由一个index文件和一个log文件组成。如下:
$ tree kafka | head -n 6
kafka
├── events-1
│ ├── 00000000003064504069.index
│ ├── 00000000003064504069.log
│ ├── 00000000003065011416.index
│ ├── 00000000003065011416.log
Segment下的log文件就是存储消息的地方
每个消息都会包含消息体、offset、timestamp、key、size、压缩编码器、校验和、消息版本号等。
在磁盘上的数据格式和producer发送到broker的数据格式一模一样,也和consumer收到的数据格式一模一样。由于磁盘格式与consumer以及producer的数据格式一模一样,这样就使得Kafka可以通过零拷贝(zero-copy)技术来提高传输效率。
所谓的分区其实就是在kafka对应存储目录下创建的文件夹,文件夹的名字是主题名加上分区编号,编号从0开始。
1、segment
所谓的segment其实就是在分区对应的文件夹下产生的文件。
一个分区会被划分成大小相等的若干segment,这样一方面保证了分区的数据被划分到多个文件中保证不会产生体积过大的文件;另一方面可以基于这些segment文件进行历史数据的删除,提高效率。
一个segment又由一个.log和一个.index文件组成。
1..log
.log文件为数据文件用来存放数据分段数据。
2..index
.index为索引文件保存对对应的.log文件的索引信息。
在.index文件中,保存了对对应.log文件的索引信息,通过查找.index文件可以获知每个存储在当前segment中的offset在.log文件中的开始位置,而每条日志有其固定格式,保存了包括offset编号、日志长度、key的长度等相关信息,通过这个固定格式中的数据可以确定出当前offset的结束位置,从而对数据进行读取。
3.命名规则
这两个文件的命名规则为:
partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。
2、读取数据
开始读取指定分区中某个offset对应的数据时,先根据offset和当前分区的所有segment的名称做比较,确定出数据在哪个segment中,再查找该segment的索引文件,确定当前offset在数据文件中的开始位置,最后从该位置开始读取数据文件,在根据数据格式判断结果,获取完整数据。
2.3,多分片落盘的可靠性保证
1、AR
在Kafka中维护了一个AR列表,包括所有的分区的副本。AR又分为ISR和OSR。
AR = ISR + OSR。
AR、ISR、OSR、LEO、HW这些信息都被保存在Zookeeper中。
1.ISR
ISR中的副本都要同步leader中的数据,只有都同步完成了数据才认为是成功提交了,成功提交之后才能供外界访问。
在这个同步的过程中,数据即使已经写入也不能被外界访问,这个过程是通过LEO-HW机制来实现的。
2.OSR
OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。
最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。
3.LEO
LogEndOffset:分区的最新的数据的offset,当数据写入leader后,LEO就立即执行该最新数据。相当于最新数据标识位。
4.HW
HighWatermark:只有写入的数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到。相当于所有副本同步数据标识位。
在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。
所以LEO代表已经写入的最新数据位置,而HW表示已经同步完成的数据,只有HW之前的数据才能被外界访问。
5.HW截断机制
如果leader宕机,选出了新的leader,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。
当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的hw位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。
2.4,Kafka的ack机制。
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时,这样就变成了acks=1的情况。
Kafka中的消息以一下方式存储到文件中。
HW是HighWatermark的缩写,俗称高水位,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
LEO:LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。
当leader挂了之后,现在B成为了leader,A重新恢复之后需要进行消息的同步,如果使用追加的方式那么就会有冗余消息,所以A将自己的消息截取到HW的位置在进行同步。
三,生产者
3.1,生产者可靠性级别
在生产者向kafka集群发送时,数据经过网络传输可能因为网络延迟、Program crash等原因造成数据的丢失。
kafka为生产者提供了如下的三种可靠性级别,通过不同策略保证不同的可靠性保障。
其实此策略配置的就是leader将成功接收消息信息响应给客户端的时机。
通过request.required.acks参数配置:
1:生产者发送数据给leader,leader收到数据后发送成功信息,生产者收到后认为发送数据成功,如果一直收不到成功消息,则生产者认为发送数据失败会自动重发数据。
当leader宕机时,可能丢失数据。
0:生产者不停向leader发送数据,而不需要leader反馈成功消息。
这种模式效率最高,可靠性最低。可能在发送过程中丢失数据,也可能在leader宕机时丢失数据。
-1:生产者发送数据给leader,leader收到数据后要等到ISR列表中的所有副本都同步数据完成后,才向生产者发送成功消息,如果一只收不到成功消息,则认为发送数据失败会自动重发数据。
这种模式下可靠性很高,但是当ISR列表中只剩下leader时,当leader宕机让然有可能丢数据。
此时可以配置min.insync.replicas指定要求观察ISR中至少要有指定数量的副本,默认该值为1,需要改为大于等于2的值
这样当生产者发送数据给leader但是发现ISR中只有leader自己时,会收到异常表明数据写入失败,此时无法写入数据,保证了数据绝对不丢。
虽然不丢但是可能会产生冗余数据,例如生产者发送数据给leader,leader同步数据给ISR中的follower,同步到一半leader宕机,此时选出新的leader,可能具有部分此次提交的数据,而生产者收到失败消息重发数据,新的leader接受数据则数据重复了。
四,消费者
4.1,最少一次消费,及应用保证幂等性!
至多一次处理(At most once):消息绝对不会被重复投递,但是消息可能丢失
至少一次处理(At least once):消息绝对不会被丢失,但是有可能重复被消费
精确的一次处理(Exactly once):消息系统的圣杯。所有的消息精确的被投递一次。
“投递”貌似不是准确的语言描述,“处理”才是。无论怎么描述,我们关心的是,消费者能否处理消息,以及处理的次数。但是使用“处理”会使问题变得复杂。比如说,消息必须投递两次才能被处理一次。再比如,如果消费者在处理的过程中宕机,消息必须被第二次投递(给另一个消费者)。
其次,使用“处理”来表达会使得部分失败(partial failure)变得头疼。处理消息一般包括多个步骤。处理的开始到结束包括应用的逻辑以及应用与消息系统的通信。应用逻辑的部分失败由应用来处理。如果应用处理的逻辑是事务的,结果是all or nothing, 那么应用逻辑可以避免部分失败。但是实际上,多个步骤往往涉及不同的系统,使得事务性变得不可能。如果我们考虑到通信,应用,缓存,数据库,我们无法达到精确的一次处理(exactly-once processing).
所以,精确地一次只出现在如下情况中:消息的处理只包括消息系统本身,并且消息系统本身的处理是事务的。在该限定场景下,我们可以处理消息,写消息,发送消息被处理的ACK, 一切都在事务中。而这正是Kafka流能提供的。
但是,如果消息处理是幂等(idempotent)的,我们就可以绕过基于事务的精确一次保证。如果消息处理是幂等的,我们可以安全的处理重复的消息。当然,并不是所有的消息处理都是幂等的。
kafka 最多保证至少一次处理(At least once),可以保证不丢,但是可能会重复,为了解决重复需要引入唯一标识和去重机制,kafka提供了GUID实现了唯一标识,但是并没有提供自带的去重机制,特定应用需要基于业务规则自己去重做幂等。
五,KAFKA可靠消息实现