1、消费者和消费者组
消费者负责订阅Kafka中的主题,并从订阅的主题中拉取消息。与其他消息中间件不同的是:Kafka中的消费理念中还有一层消费者组(consumer group),每个消费者都属于一个消费者组。当消息发布到主题后,只会投递给订阅它的每个消费者组中的一个消费者。换言之,每一个分区只能被一个消费者组中的一个消费者消费。
消费者与消费者组的这种模式可以让整体的消费能力具有横向伸缩性,我们可以增加或减少消费者的个数来提高或降低整体的消费能力。但是,一味的增加消费者并不会使消费能力得到提升,如果消费者过多,出现了消费者的数量大于分区的数量,就会有消费者分配不到任何分区。
消息中间件,一般有两种消息投递模式:点对点和发布订阅。
1)如果所有的消费这都属于一个消费者组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
2)如果所有消费者都属于不同的消费者组,那么所有的消息都会被广播被所有的消费者,即每条消息都会被所有的消费者处理,这就相当于发布/订阅模式的应用。
2、客户端开发
一个正常的消费逻辑应该具备以下几个步骤:
1)配置消费者客户端参数以及创建相关的消费者实例
2)订阅主题
3)拉取消息并消费
4)提交消费位移
5)关闭消费者实例
2.1、消息消费
消息的消费一般有两种模式:推模式和拉模式。Kafka的消费是基于拉(poll)模式的。Kafka的消息消费是一个不断轮询的过程,消费者要做的就是不断地调用poll()方法。
消费者消费到的每条消息的类型为ConsumerRecord。
public class ConsumerRecord<K,V> (
private final String topic; // 主题
private final int partition; // 分区
private final long offset; // 消息所在分区的偏移量
private final long timestamp; // 时间戳
private final Timestamptype timestampType; // 时间戳类型
private final int serialzedKeySize; // key的序列化器
private final int serialzedValueSize; // value的序列化器
private final Headers headers; // 消息的头部内容
private final K key; // 消息的键
private final V value; // 消息的值
private volatile Long checksum;
// 省略若干方法
)
poll()方法返回的类型是ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了ConsumerRecord。
2.2、位移提交
每次调用poll()的时候,它返回的是还没有被消费的消息集,要做到这一点,就需要记录上一次消费时的消费位移。而且,这个消费位移必须要持久化保存,不能仅仅保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再比如,如果消费者组中加入了新的消费者,再均衡之后,分区有可能会被分配给新的消费者,如果不持久化保存,新的消费者会不知道之前的消费位移。
旧的消费位移是存储在zookeeper中,而新的消费者客户端中,是保存在Kafka自己的topic中,即_consumer_offsets。
对于位移提交的时机非常有讲究,有可能会造成重复消费或者消息丢失的现象。
比如:这次我拉取的是[x+1,x+5]的消息,拉取到消息之后,就提交了位移。但是当消费到x+3的时候遇到了异常,在故障恢复之后,重现拉取消息是从x+6开始的。数据发生了丢失。x+3到x+5之间的消息丢失了。
再比如:这次我拉取的是[x+1,x+5]的消息,位移提交动作是消费完所有信息之后才执行的,那么当消费到x+3的时候发生了异常。在故障恢复之后,我们重新拉取的消息是从x+1开始的,那么x+1到x+2之间的消息又重新消费了。
实际情况可能更复杂。
Kafka的默认消费位移提交方式是自动提交,由消费者客户端参数enable.auto.commit配置,默认值为true。自动提交是定期提交,定期的周期由参数auto.commit.interval.ms配置,默认值为5秒。默认情况下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动提交的动作是在poll()方法的逻辑中完成的。
自动提交虽然简便,但随之而来的是重复消费和消息丢失的问题。虽然可以通过减少提交的时间间隔来减少丢失的窗口大小,但是并不能避免重复消费的发生,而且会使位移提交更加频繁。
自动提交重复消费很容易理解。数据丢失的情况:比如有两个线程,线程A不断地拉取消息并存入本地缓存。线程B从本地缓存中读取消息并进行响应的逻辑处理。假设 线程A拉取到的消息过多,线程B还没来得及消费完,此时挂掉了。等重新恢复过来之后,线程B还没有小得到消息就丢失了。
除此之外,自动提交还无法做到精准的位移管理。
很多情况下,并不是拉取完信息就算消费完成,而是需要将信息写入数据库、写入本地缓存等等。在这些场景下,所有的业务都被完成了,才能认为消息被成功消费,手动的提交方式可以让开发人员在合适的地方进行位移提交。
手动提交分为同步提交和异步提交。
2.2.1、同步提交
pubulic void commitSync()
代码示例:
while (isRunning.get()){
ConsumerRecords<String,String> records = consumer.poll(100)
for (ConsumerRecords<String,String> record:records){
// do some logical processing
}
consumer.commitSync()
}
同步提交会阻塞消费者线程直至提交完成。这个示例依然有重复消费的问题,如果业务逻辑处理完之后,在位移提交之前,程序奔溃了,那么待恢复之后又要从上一次位移提交的地方拉取消息。
pubulic void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)
上述方法提供了offset参数,用来提交指定分区的位移。
实际应用中,很少会每消费一条消息就提交一次。因为本身commitSync就是同步执行的,会消耗一定的性能。
2.2.2、异步提交
异步提交的方式在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。
commitAsync()提交的时候同样会有失败的情况发生。重试?不可取,比如这次提交的X偏移量失败了,但是另一次X+Y成功了,假如重试成功了,那么此时位移偏移量又重新回到了X。如果此时发生再均衡,那么恢复之后又从X开始消费。
为此,我们可以设置一个递增的序号来维持异步提交的顺序。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小。如果小了,就不要重试了。
实际情况下,位移提交失败的情况很少发生,不重试也没关系,后面的提交总有成功的。重试会增加代码的复杂度,不重试会增加代码重复消费的几率。
如果消费者异常退出,其实重复消费的情况很难避免,更多可能是从业务处理侧纠正。如果消费者正常退出或者发生再均衡时,可以在退出或再均衡执行之情使用同步提交的方式做最后的把关。
思考如何尽量避免重复消费和消息丢失?
2.3、指定位移提交
在Kafka中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始消费。默认值时"latest",从分区末尾开始消费消息。还有个值"earliest",从开头开始消费。
消息拉取的poll()方法其实对于我们来说是一个黑盒,普通开发人员无法精准的掌握消费起始位置。提供的auto.offset.reset参数只有在找不到位移或位移越界的情况下才会粗略地从末尾或者从头开始消费。
KafkaConsumer中的seek()方法提供了一种功能,可以让我们呢追前消费或者回溯消费。
public void seek (TopicPartition partition, long offset)
partition表示分区,offset表示从分区的哪个位置开始消费。
seek方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法向前跳过若干消息,也可以通过这个方法回溯若干消息,为消费消息提供了很大的灵活性。
2.4、再均衡
再均衡是指分区的所属权从一个消费者转移到另一个消费者的行为,它为消费组具备高可用性和伸缩性提供保障,让我们可以既方便又安全地删除和增加消费者。不过,再均衡期间,消费者组内的消费者是无法读取消息的。换言之,消费组会变得不可用。
除此之外,当一个分区被分配到另一个消费者时,消费者之前的状态会丢失,可能会造成重复消费,因此要避免不必要的再均衡。
2.5、多线程实现
KafkaProducer是线程安全的,但是KafkaConsumer是非线程安全的。
KafkaConsumer非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。
第一种方式:线程封闭,即为每一个线程实例化一个KafkaConsumer对象,一个线程队形一个KafkaConsumer实例,我们可以称之为消费线程。一个线程可以消费一个或多个分区内的消息,所有的消费线程都隶属于一个消费者组。
第二种方法:多个线程同时对应一个分区,可以通过assign()、seek()方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提供消费能力。不过这种方式非常复杂,实际很少用到。
第三种方法:一般而言,poll()拉取消息的速度是相当快的,而整体消费瓶颈其实是现在处理消息这一块。我们可以将处理消息模块分为多线程处理。缺点就是对于消息的顺序处理就比较困难了。