与生产者对应的是消费者,应用程序通过KafkaConsumer来订阅主题,并从订阅的主题中拉取消息。不过我们需要先了解消费者和消费组的概念,否则无法理解如何使用KafkaConsumer。
消费者与消费组
每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。如下图所示:
主题中,共有四个分区,P0、P1、P2、P3,有两个消费组A和B都订阅了这个主题,消费组A中有4个消费者(C0、C1、C2、C3),消费者B有两个消费者(C4、C5)。按照Kafka默认的规则,消费组A中的每一个消费者分配到一个分区,消费组B中每一个消费者分配到两个分区,两个消费组之间互不影响。
每个消费者只能消费被分配到的分区中的消息。换言之,每个分区只能被一个消费组中的一个消费者所消费。
再来看一下消费组内消费者的个数变化时所对应分区分配的演变。假设目前某消费组内只有一个消费者C0,订阅了一个主题,这个主题包含7个分区,P0/P1/P2/P3/P4/P5/P6。也就是说,这个消费者订阅了7个分区:
此时消费组内又增加了新的消费者C1,按照既定的逻辑,需要将原来消费者C0的部分分区分配给消费者C1消费:
C0和C1各自消费所分配的分区,彼此间并无逻辑上的干扰。紧接着又增加消费者C2:
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩,我们可以增加(减少)消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不能让消费能力一直得到增强,如果消费者过多,出现了消费者个数大于分区个数的情况,就会有消费者分配不到任何分区。如下图所示:
以上分配策略都是基于默认的分区分配策略进行分析的,可以通过消费者客户端参数partition.assignment.strategy
来设置消费者与订阅主题之间的分区分配策略。
对于消息中间件而言,一般有两种消息投递模式:点对点模式和发布/订阅模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式以主题为内容节点,主题可以认为是消息传递的中介,使得消息订阅者和发布者保持独立,不需要进行接触即可保持消息的传递,在消息的一对多广播时采用。
- 如果消费者都属于同一消费组,那么所有的消息都会被均衡的投递给每一个消费者,即每条消息都只会被一个消费者处理,这就相当于点对点模式的应用。
- 如果所有消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息都会被所有的消费者处理,这就相当于订阅/发布应用。
可以通过消费者客户端参数group.id来配置,默认值为空字符串。消费组是逻辑上的概念,它将消费者进行归类,消费者并非逻辑上的概念,它是实际上的应用实例,它可以是一个线程,也可以是一个进程,同一个消费组内的消费者可以部署在同一台机器上,也可以部署在不同的机器上。
客户端开发
采用目前流行的新消费者(java语言编写)客户端。
一个正产的消费逻辑需要以下几个步骤
- 配置消费者客户端参数及创建响应的客户端实例。
- 订阅主题。
- 拉取消息并消费。
- 提交消费位移。
- 关闭消费者实例。
一个基本的消费者案例如下:
public class Consumer {
public static final String brokerList = "192.168.0.138:9092";
public static final String topic = "topic-demo";
public static final String group = "group-id";
public static final String client = "client-id";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
return properties;
}
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
consumer.subscribe(Collections.singletonList(topic));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record->{
System.out.println("topic="+record.topic()+", partition="+record.partition()+", offset="+record.offset());
System.out.println("key="+record.key()+", value="+record.value());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
bootstrap.servers:指定kafka集群地址,可以设置一个或多个,用逗号隔开。注意这里不需要设置集群中全部的broker地址,消费者会从现有的配置中查找全部的集群成员。如果只设置一个地址,启动时为避免该地址机器宕机连接不到集群,最好设置两个或两个以上地址。
key.deserializer和value.deserializer:与生产者客户端参数相对应。消费者从kafka获取到的消息格式都是字节数组(byte[]),所以需要执行相应的反序列化操作才能还原成原有的对象格式。
client.id:客户端id,如果不设置,会自动生成一个非空字符串,内容形式为consumer-1,consumer-2这种格式。
消费者客户端参数众多,在这里罗列讲解没有意义,之后会一一详解。
订阅主题与分区
一个消费者可以订阅一个或多个主题。如上代码示例,通过consumer.subscribe方式订阅主题,对于这个方法而言,既可以以集合的方式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe的几个重载的方法如下:
void subscribe(Collection<String> var1);
void subscribe(Collection<String> var1, ConsumerRebalanceListener var2);
void assign(Collection<TopicPartition> var1);
void subscribe(Pattern var1, ConsumerRebalanceListener var2);
void subscribe(Pattern var1);
如果消费者采用的是正则表达式的方式订阅,在之后的创建过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。在kafka和其他系统之间进行数据赋值时,这种正则表达式的方式显得很常见。
consumer.subscribe(Pattern.compile("topic-.*"));
重载方法中有一个ConsumerRebalanceListener ,这个是用来设置相应的再均衡监听器的,之后会讲。
消费者不但可以订阅主题,还可以直接订阅主题的特定分区,通过assign方法来实现这一功能。
void assign(Collection<TopicPartition> partitions);
这个方法只接受一个参数partitions,用来指定分区集合。关于TopicPartition类,用来表示分区,这个类的内部结果如下所示:
public final class TopicPartition implements Serializable {
private final int partition;
private final String topic;
其他省略
有两个属性,partition和topic,分别代表自身的分区编号和主题名称,这个类和我们所说的主题-分区概念对应起来。在案例代码清单中,我们使用assign方法替代subscribe方法,订阅主题topic-demo的分区0。
consumer.assign(Arrays.asList(new TopicPartition("topic-demo",0)));//订阅主题topic-demo的分区0
如果,我们事先不知道主题中有多少个分区怎么办?partitionsFor方法可以用来查询指定主题的元数据信息,定义如下:
List<PartitionInfo> partitionsFor(String topic);
其中,PartitionInfo类型即为主题的分区元数据信息:
public class PartitionInfo {
private final String topic;//主题名称
private final int partition;//分区编号
private final Node leader;//leader副本所在的位置
private final Node[] replicas;//分区的AR集合
private final Node[] inSyncReplicas;//分区的ISR集合
private final Node[] offlineReplicas;//分区的OSR集合
通过partitionsFor方法的协助,我们可以通过assign方法来实现订阅主题全部分区的功能:
List<TopicPartition> partitions = new ArrayList<>();
//获取主题的全部分区
consumer.partitionsFor("topic-demo").forEach(partitionInfo -> {
System.out.println("分区:"+partitionInfo.partition());
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
});
//通过assign方法来实现订阅主题全部分区
consumer.assign(partitions);
既然有订阅,那就有取消订阅,可以使用unsubscribe方法取消订阅。
//取消订阅
consumer.unsubscribe();
集合订阅的方式、正则表达式的订阅方式和指定分区的订阅方式,分别代表了3种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN、USER_ASSIGNED,如果没有订阅那么状态为NONE。这三种状态是互斥的,在一个消费者中,只能使用其中的一种。通过sbscribe方法订阅的主题具有消费者自动再均衡的功能,在多个消费者的情况下根据分区策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign方法订阅分区时,是不具备消费者自动均衡的功能。
反序列化
在「kafka」kafka-clients,java编写生产者客户端及原理剖析我们讲过了生产者的序列化与消费者的反序列化程序demo。Kafka提供的反序列器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,这些反序列化器都实现了Deserializer接口,该接口有三个方法:
void configure(Map<String, ?> var1, boolean var2);//用来配置当前类
T deserialize(String var1, byte[] var2);//用来执行反序列化
void close();//关闭当前序列化器
我们来看一下StringDeserilizer的源码:
public class StringDeserializer implements Deserializer<String> {
private String encoding = "UTF8";
public StringDeserializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("deserializer.encoding");
}
if (encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}
}
public String deserialize(String topic, byte[] data) {
try {
return data == null ? null : new String(data, this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);
}
}
public void close() {
}
}
configure方法用来定义编码格式,默认就UTF-8就好了,不用管这个。我们看一下自定义反序列化器,只要实现了Deserializer接口即可:
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public User deserialize(String s, byte[] bytes) {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
int nameLength = byteBuffer.getInt();
byte name[] = new byte[nameLength];
byteBuffer.get(name,0,nameLength);
int age = byteBuffer.getInt();
return new User().setAge(age).setName(new String(name));
}
@Override
public void close() {
}
}
public class User {
private String name;
private int age = -1;
public String getName() {
return name;
}
public User setName(String name) {
this.name = name;
return this;
}
public int getAge() {
return age;
}
public User setAge(int age) {
this.age = age;return this;
}
}
总之,就是将kafka返回的字节序列转化成你的业务对象。关于序列化,我会在之后写一篇当下流行的序列化方法汇总的博文,比如Avro、JSON、Thrif、ProtoBuf或Protostuff等,欢迎关注。
这里简单举一例,用Protostuff来实现序列化与反序列化:
//依赖
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.7.2</version>
</dependency>
序列化:
public class ProtostuffUserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, User user) {
if (user == null){
return null;
}
Schema schema = RuntimeSchema.getSchema(user.getClass());
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
byte[] protostuff = null;
protostuff = ProtostuffIOUtil.toByteArray(user,schema,buffer);
buffer.clear();
return protostuff;
}
@Override
public void close() {
}
}
反序列化
public class ProtostuffUserDesirializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public User deserialize(String s, byte[] bytes) {
if (bytes == null) {
return null;
}
Schema schema = RuntimeSchema.getSchema(User.getClass());
User user = new User();
ProtostuffIOUtil.mergeFrom(bytes, user, schema);
return user;
}
@Override
public void close() {
}
}
消息消费
Kafka的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务器主动将消息推送给消费者,拉模式是消费者向服务端发送请求拉取消息。
从代码示例中可以看出,消费是一个不断轮询的过程,消费者重复调用poll方法,返回的是所订阅主题(分区)上的一组消息。
返回的消息类型为ConsumerRecord,源码如下所示:
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;//主题
private final int partition;//分区
private final long offset;//所属分区偏移量
private final long timestamp;//时间戳
//两种类型,CreateTime 和 LogAppendTime
//分别代表消息创建的时间,追加到日志的时间
private final TimestampType timestampType;
private final int serializedKeySize;//key经过序列化后的大小,如果key为空,该值为-1
private final int serializedValueSize;//value经过序列化后的大小,如果value为空,该值为-1
private final Headers headers;//消息的头部内容
private final K key;//消息的键
private final V value;//消息的值
private final Optional<Integer> leaderEpoch;
private volatile Long checksum;//CRC32的校验值
部分省略
实例代码中,我们通过遍历消息集合处理每一条消息,除此之外,我们还可以按照分区维度来进行消费,这一点很有用,在手动提交位移时尤为明显,ConsumerRecords提供了一个records(TopicPartition)方法来获取消息中指定分区的消息,此方法的定义如下:
public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
List<ConsumerRecord<K, V>> recs = (List)this.records.get(partition);
return recs == null ? Collections.emptyList() : Collections.unmodifiableList(recs);
}
修改实例代码,将所有消息按分区处理:
//按分区处理消息
for (TopicPartition tp : records.partitions()){//获取所有分区
for (ConsumerRecord<String, String> record : records.records(tp)){//获取指定分区的消息
System.out.println("partition:"+record.partition()+"----value:"+record.value());
}
}
此外,ConsumerRecords类中还提供了几个方法来方便开发人员对消息集进行处理:
- count方法,获取消息个数
- isEmpty方法,判断返回的消息是否为空
- empty方法,获取一个空的消息集
到目前为止,可以建单人位,poll方法只是拉取一下消息而已,但就其内部逻辑而言并不简单,它涉及消费位移、消费者协调器、组协调器、消费者选举、分区分配的分发、再均衡的逻辑、心跳等内容。后续会详细介绍。
位移提交
对于Kafka的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。消费者使用offset来表示消费到分区中某个消息所在的位置。offset,顾名思义,偏移量,也可翻译为位移。在每次调用poll()方法时,它返回的是还没有消费过的消息集,要做到这一点,就需要记录上一次消费过的位移。并且这个位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知道之前的消费位移了。
当加入新的消费者的时,必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知道之前的消费位移。消费者位移存储在Kafka内部的主题_consumer_offsets中。
这种把消费位移存储起来(持久化)的动作称为“提交”,消费者再消费完消息之后需要执行消费位移的提交。
如下图,假设当前消费者已经消费了x位置的消息,那么我们就可以说消费者的消费位移为x。
不过,需要明确的是,当前消费者需要提交的消费位移并不是x,而是x+1,对应上图的position,他表示下一条需要拉取的消息的位置。在消费者中还有一个commited offset的概念,它表示已经提交过的消费位移。
KafkaConsumer类提供了position(TopicPartition)和commited(TopicPartition)两个方法来分别获取上面所说的position和commiited offset的值。
为了论证lastConsumedOffset、commited offset 和position之间的关系,我们使用上面两个方法来做相关演示。我们向主题中分区编号为0的分区发送若干消息,之后再创建一个消费者去消费其中的消息,等待消费完这些消息之后,同步提交消费位移。最后观察上面三者的值。
//定义主题topic-demo,分区编号为0
TopicPartition topicPartition = new TopicPartition("topic-demo",0);
consumer.assign(Arrays.asList(topicPartition));//订阅主题topic-demo的分区0
long lastConsumedOffset = -1;//当前消费到的位移
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
//获取主题topic-demo的分区0的消息
List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
//获取最后一条消息的偏移量,该消息是已经消费的最后一条消息
lastConsumedOffset = partitionRecords.get(partitionRecords.size()-1).offset();
consumer.commitSync();//同步提交消费位移
System.out.println("consumed Offset is "+lastConsumedOffset);
OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
System.out.println("commited Offset is "+offsetAndMetadata.offset());
long position = consumer.position(topicPartition);
System.out.println("the offset of the netxt record is "+position);
打印结果
consumed Offset is 182
commited Offset is 183
the offset of the netxt record is 183
可以看出,消费者消费到此分区的最大偏移量为182,对应的消费位移lastConsumedOffset 也就是182。在消费完之后执行同步提交,但是最终结果显示所提交的位移commited offset 为183,并且下一次所要拉取的消息的起始偏移量position为183,结论
position = commited offset = lastConsumedOffset + 1
当然,position和commited offset的值不会一直相同,这一点会在下面的示例中有所体现。
对于位移提交具体时机的把握也很有讲究,有可能造成重复消费和消息丢失的现象。参考下图所示,x代表上一次提交的消费位移,说明已经完成了x-1之前的所有消息的消费。x+3表示当前正在处理的位置。如果poll拉取到消息之后就进行了位移提交,即提交了x+7,那么当前消费x+3的时候遇到了异常,在故障恢复之后, 我们重新拉取到的消息是从x+7开始的。也就是说,x+3到x+6之间的消息并未消费,如此便发生了消息丢失的现象。
再考虑另一种情形,位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费x+3的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x开始的。也就是说 x到x+2之间的消息又重新消费了一遍,故而发生了重复消费的现象。
而实际情况可能更加复杂。在kafka中默认的消费位移的提交方式是自动提交,这个由消费客户端参数enable.auto.commit配置,默认为true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit为true。
在默认情况下,消费者客户端每隔5秒会将拉取到的每个分区中的最大的消息位移进行提交。自动位移提交的动作实在poll方法的逻辑里完成的,在每次真正向服务器发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
在kafka消费的编程逻辑中位移是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让代码更简洁。但随之而来的是重复消费和消费丢失的问题。假设刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费。我们可以通过减少位移提交的时间间隔来减少重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。
自动位移提交的方式在正常情况下不会发生消息丢失和重复消费的现象,但是在编程的世界里异常不可避免。自动提交无法做到精确的位移管理。Kafka提供了手动管理位移提交的操作,这样可以使开发人员对消费位移的管理控制更加灵活。很多时候并不是说poll拉取到消息就算消费完成,而是需要将消息写入到数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式让开发人员根据程序的逻辑在合适的地方进行位移提交。手动提交功能的前提是enable.auto.commit配置为false,手动提交分为同步提交和异步提交,对应于KafkaConsumer中的commitSync和commitAsync两个方法。
- commitSync
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// 按分区处理消息
for (TopicPartition tp : records.partitions()){
for (ConsumerRecord<String, String> record : records.records(tp)){
System.out.println("partition:"+record.partition()+"----value:"+record.value());
}
}
consumer.commitSync();
先将拉取到的每一条消息进行处理,然后对整个消息集做同步提交。针对上面的示例还可以修改为批量处理+批量提交的方式。
final int minBatchSize = 200;
List<ConsumerRecord> buffer = new Arraylist<>();
whie(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
for (TopicPartition tp : records.partitions()){
for (ConsumerRecord<String, String> record : records.records(tp)){
buffer.add(record);
}
}
if(buffer.size() >= minBatchSize){
//do local processing with buffer
consumer.commitSync();
buffer.clear;
}
}
上面的示例中,将拉取到的消息存入缓存buffer,等到累积到足够多的时候,再做相应的批量处理,之后再做批量提交。
这两个示例都有重复消费的问题,如果在业务逻辑处理完之后,并且在同步位移提交之前,程序出现了崩溃。那么恢复之后,只能从上一次位移提交的地方拉取消息。
commitSync方法会根据poll拉取到的最新位移来进行提交,即position的位置,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,如CommitFailedException/WakeupException/InterruptException/AuthenticationException/AuthorizationException等,我们可以将其捕获并做针对性的处理。
commitSync提交位移的频率和拉取批次消息、处理批次消息的频率是一致的,如果想寻求更细粒度、更准确的提交,那么就需要commitSync另一个含参的方法,
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
参数offsets用来提交指定分区的位移。无参的commitSync方法只能提交当前批次对应的position值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
records.forEach(record->{
System.out.println("topic="+record.topic()+", partition="+record.partition()+", offset="+record.offset());
long offset = record.offset();
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
//每消费一条消息提交一次位移
consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(offset+1)));
System.out.println("_______________________________");
});
在实际应用中,很少有这种每消费一条消息,就提交一次消费位移的场景。commitSync方法本身是同步进行的,会消耗一定的性能。更多的时候,是按照分区的粒度划分提交位移的界限,
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// 按分区处理消息
for (TopicPartition tp : records.partitions()){
//获取当前分区的所有消息
List<ConsumerRecord<String,String>> partitionRecords = records.records(tp);
for (ConsumerRecord<String, String> record : partitionRecords){
System.out.println("partition:"+record.partition()+"----value:"+record.value());
}
//当前分区最后一条消息的位移
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();
//按分区的粒度,进行位移提交
consumer.commitSync(Collections.singletonMap(tp,new OffsetAndMetadata(lastConsumedOffset+1)));
}
与commitSync相反,异步提交的方式commitAsync在执行的时候,消费者线程不会阻塞,可能在提交消费位移的结果返回之前就开始了新一轮的拉取操作。可以是消费者的性能增强。
void commitAsync();
void commitAsync(OffsetCommitCallback var1);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> var1, OffsetCommitCallback var2);
第一个无参的方法和第三个方法中的offsets都很好理解,对照commitSync方法即可。关键是这里第二个方法和第三个方法中的OffsetCommitCallback参数,它提供了一个异步提交的回调方法,当位移提交完成后回调OffsetCommitCallback里的onComplete方法:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// 按分区处理消息
for (TopicPartition tp : records.partitions()){
//获取当前分区的所有消息
List<ConsumerRecord<String,String>> partitionRecords = records.records(tp);
for (ConsumerRecord<String, String> record : partitionRecords){
System.out.println("partition:"+record.partition()+"----value:"+record.value());
}
//当前分区最后一条消息的位移
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();
//按分区的粒度,进行位移提交
consumer.commitAsync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1)), new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e == null){
System.out.println(map);
}else{
System.out.println("提交失败");
}
}
});
}
控制或关闭消费
KafkaConsumer提供了对消费速度进行控制的方法,有些场景,需要我们暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。pause()和resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。
void pause(Collection<TopicPartition> var1);
void resume(Collection<TopicPartition> var1);
还有一个无参的paused方法返回被暂停的分区集合
Set<TopicPartition> paused();
之前的示例展示的都是使用一个while循环来包裹住poll方法及相应的消费逻辑,如何优雅的退出这个循环也很有考究。还有一种方式是调用KafkaConsumer的wakeup方法,调用该方法可以退出poll的逻辑,并抛出WakeupException异常,我们不需要处理这个异常,它只是跳出循环的方式。
跳出循环以后一定要显示执行关闭动作以释放运行过程中占用的各种系统资源,包括内存资源,socket连接等等。KafkaConsumer提供了close方法实现关闭
指定位移消费
正是有了消费位移的持久化,才使消费者在关闭、崩溃或者遇到再均衡的时候,可以让接替的消费者能够根据存储的消费位移继续进行消费。
当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。
当消费者查找不到所记录的消费位移的时候,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为latest,表示从分区末尾开始消费。
如图,按照默认的配置,消费者会从8开始消费,更加确切的说是从8开始拉取消息。如果将auto.offset.reset设置成earliest,那么消费者会从起始处,也就是0开始消费。
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
到目前为止,我们知道消息的拉取是根据poll方法中的逻辑来处理的,这个逻辑对于普通开发人员来说是个黑盒子,无法精确的掌控其消费的起始位置。有些场景我们需要更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,seek方法正好提供了这个功能,让我们得以追前消费或回溯消费。
void seek(TopicPartition var1, long offset);
seek方法中的参数partition表示分区,而offset参数用来指定从分区的哪个位置开始消费。seek方法只能重置消费者分配到的分区的消费位置,而分区的分配是在poll方法的调用过程中实现的。也就是说在执行seek方法之前需要先执行一次poll方法,等到分配到分区之后才可以重置消费位置。
consumer.poll(Duration.ofMillis(10000));
Set<TopicPartition> assignment = consumer.assignment();
for(TopicPartition tp : assignment){
consumer.seek(tp,2);
}
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
.....
}
consumer.seek(tp,2)设置每个分区消费的位置是2。
如果消费组内的消费者在启动的时候能够找到消费位移,除非发生越界,否则auto.offset.reset参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要seek方法的帮助了,如下代码所示:
Set<TopicPartition> set = new HashSet<>();
while (set.size() == 0){
consumer.poll(Duration.ofMillis(10000));
set = consumer.assignment();
}
Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(set);
for (TopicPartition tp: set){
consumer.seek(tp,topicPartitionLongMap.get(tp));
}
endOffsets方法用来获取指定分区的末尾的消息位置,与endOffsets对应的是beginningOffsets,一个分区的起始为止起初是0,但并不代表每时每刻都是0,因为日志清理的动作会清理旧的数据,所以分区的位置会自然而然的增加。
有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,KafkaConsumer提供了一个offsetForTimes方法:
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> var1, Duration timestampsToSearch);
timestampsToSearch是一个Map类型,key为待查询的分区,而value为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于OffsetAndTimestamp中的offset和timestamp字段。
Set<TopicPartition> set = new HashSet<>();
Map<TopicPartition,Long> map = new HashMap<>();
while (set.size() == 0){
consumer.poll(Duration.ofMillis(10000));
set = consumer.assignment();
}
for (TopicPartition tp: set){
map.put(tp,System.currentTimeMillis()-1*24*3600*1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);
for (TopicPartition tp: set){
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
if (offsetAndTimestamp!=null)
consumer.seek(tp,offsetAndTimestamp.offset());
}
消费者拦截器
消费者拦截器主要在消费到消息或在提交消费位移的时候进行一些定制化的工作。
消费者拦截器需要实现ConsumerInterceptor接口,该接口有三个方法:
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
void close();
}
KafkaConsumer会在poll方法返回之前调用拦截器的onConsume方法来对消息进行相应的定制化操作,比如修改返回的内容、按照某种规则过滤消息。如果onConsume方法抛出异常,那么会被捕获并记录到日志,但是异常不会在向上传递。
KafkaConsumer会在提交完消费位移之后调用调用拦截器的onCommit方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者调用commitSync的无参方法时,我们不知道提交的具体细节,可以使用拦截器onCommit方法做到这一点。
在某些场景中,会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就被视为无效,它也不需要再被继续处理了。下面使用消费者拦截器实现一个简单的TTL功能
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
long now = System.currentTimeMillis();
//构建新的分区消息映射表
Map<TopicPartition, List<ConsumerRecord<String,String>>> newRecords = new HashMap<>();
//遍历分区
for (TopicPartition tp : consumerRecords.partitions()){
//获取分区内的消息
List<ConsumerRecord<String,String>> tpRecords = consumerRecords.records(tp);
List<ConsumerRecord<String,String>> newTpRecords = new ArrayList<>();
//遍历消息,做判断
for (ConsumerRecord<String, String> tpRecord : tpRecords) {
//拿到10秒以内的消息
if (now - tpRecord.timestamp() < EXPIRE_INTERVAL){
newTpRecords.add(tpRecord);
}
}
if (!newTpRecords.isEmpty()){
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
map.forEach((tp,offset)->{
System.out.println(tp+":"+offset.offset());
});
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
我们使用消息的timestamp字段来判定是否过期,如果消息的时间戳与当前的时间戳相差超过10秒则判定为过期,那么这条消息也就被过滤掉而不返回给消费者客户端。
自定义拦截器实现后,需要在KafkaConsumer中配置该拦截器,通过参数interceptor.classes参数实现:
properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class);
关于KafkaConsumer的多线程实现
KafkaProducer是线程安全的,然而KafkaConsumer是非线程安全的。KafkaConsumer当中定义了一个acquire方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出异常。KafkaConsumer中的每个公用方法在执行所要执行的动作之前都会调用这个方法,只有wakeup方法是个例外。
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
} else {
this.refcount.incrementAndGet();
}
}
KafkaConsumer非线程安全并不意味着我们在消费消息的时候只能以单线程的方式运行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的处理,造成一定的时延。除此之外,kafka中存在消息保留机制,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式实现消息消费,多线程的目的就是提高整体的消费能力。多线程的实现方式有多种,第一种也是最常见的方式:线程封闭,即为每个线程实例化一个KafkaConsumer对象。
一个线程对一个KafkaConsumer实例,我们可以称为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种方式实现的并发度受限于分区的实际个数,文章开头讲过,当消费者个数大于分区个数时,就会有部分消费线程一直处于空闲的状态。
第二种方式是,多个消费线程同时消费同一个分区,这个通过assign、seek等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,不过这种方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用的很少。一般而言,分区时消费线程的最小划分单位。我们通过实际编码实现第一种:
public class MultiConsumerThreadDemo {
public static final String brokerList = "192.168.3.8:9092";
public static final String topic = "topic";
public static final String group = "group-id42";
public static final String client = "client-id2";
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消费位移自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
return properties;
}
public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4;
for (int i=0;i<consumerThreadNum;i++){
new KafkaConsumerThread(props,topic).start();
}
}
public static class KafkaConsumerThread extends Thread{
private KafkaConsumer<String,String> kafkaConsumer;
public KafkaConsumerThread(Properties props,String topic){
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
try {
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
records.forEach(record->{
System.out.println("topic="+record.topic()+", partition="+record.partition()+", offset="+record.offset());
//消息处理
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
this.kafkaConsumer.close();
}
}
}
}
内部类KafkaConsumerThread代表消费线程,内部包裹着一 个独立的KafkaConsumer实例。通过main方法来启动多个消费线程,一般来讲一个主题的分区数在开发时就是确定的,可以将consumerThreadNum设置成不大于分区数的值,如果不知道主题的分区数,也可以通过之前讲的partitionsFor方法来动态获取。
这种方式的优点是每个线程可以按顺序消费各个分区中的消息。缺点是,每个消费线程都要维护一个独立的TCP链接,如果分区数和consumerThreadNum都很大,那么会造成不小的系统开销。
如果消费者对消息处理的速度很快,那么poll拉取的频次也会更高,进而整体消费性能也会提升。相反,如果客户端对消息的处理速度很慢,比如进行一个事务性操作,或者等待一个RPC的同步响应,那么poll的拉取频次也会下降,进而造成整体的性能下降。一般而言,poll拉取的速度是相当快的,而整体消费的瓶颈也正是消息处理这一块,我们可以将处理消息部分改成多线程的实现方式,如下图所示
代码如下:
public class MultiConsumerThreadDemo1 {
public static final String brokerList = "192.168.3.8:9092";
public static final String topic = "topic";
public static final String group = "group-id42";
public static final String client = "client-id2";
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消费位移自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
return properties;
}
public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4;
for (int i=0;i<consumerThreadNum;i++){
new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors()).start();
}
}
public static class KafkaConsumerThread extends Thread{
private KafkaConsumer<String,String> kafkaConsumer;
private ExecutorService executorService;
private int threadNum;
public KafkaConsumerThread(Properties props, String topic, int processorNum){
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
this.threadNum = processorNum;
executorService = new ThreadPoolExecutor(
threadNum,
threadNum,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
//消息处理
executorService.submit(new RecordsHandler(records));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
this.kafkaConsumer.close();
}
}
}
public static class RecordsHandler implements Runnable {
private final ConsumerRecords<String,String > records;
public RecordsHandler(ConsumerRecords<String, String> records) {
this.records = records;
}
@Override
public void run() {
//真正处理records的地方
}
}
}
RecordHandler类是用来处理消息的,而KafkaConsumerThread类对应的是一个消费线程,里面通过线程池的方式来调用RecordHandler处理一批批消息。注意KafkaConsumerThread类中ThreadPollExecutor里的最后一个参数设置的是CallerRunsPolicy,这样可以防止线程池的总体消费能力跟不上poll拉取的能力从而导致异常现象的发生。但是这种方式对消息的顺序处理能力就比较困难了。注意,代码中的参数配置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
,旨在说明在具体实现的时候并没有考虑位移提交的情况。对于第一种实现方式而言,如果要做具体的位移提交,直接在KafkaConsumerThread中的run方法里实现即可。我们引入一个共享变量offsets来参与提交
每一个处理消息的RecordHandler类在处理完消息之后都将对应的消费位移保存到共享变量offsets中,KafkaConsumerThread在每一次poll方法之后都读取offsets中的内容并对其进行位移提交。注意在实现过程中需要对其进行加锁操作,防止出现并发问题。并且在写入offsets的时候需要注意位移覆盖的问题
public class MultiConsumerThreadDemo1 {
public static final String brokerList = "192.168.3.8:9092";
public static final String topic = "topic";
public static final String group = "group-id42";
public static final String client = "client-id2";
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//消费位移自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
return properties;
}
static Map<TopicPartition, OffsetAndMetadata> offsets =new HashMap<>();
public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4;
for (int i=0;i<consumerThreadNum;i++){
new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors()).start();
}
}
public static class KafkaConsumerThread extends Thread{
private KafkaConsumer<String,String> kafkaConsumer;
private ExecutorService executorService;
private int threadNum;
public KafkaConsumerThread(Properties props, String topic, int processorNum){
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
this.threadNum = processorNum;
executorService = new ThreadPoolExecutor(
threadNum,
threadNum,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void run() {
try {
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
//消息处理
executorService.submit(new RecordsHandler(records));
//位移提交工作
synchronized (offsets){
if(!offsets.isEmpty()){
kafkaConsumer.commitSync(offsets);
offsets.clear();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
this.kafkaConsumer.close();
}
}
}
public static class RecordsHandler implements Runnable {
private final ConsumerRecords<String,String > records;
public RecordsHandler(ConsumerRecords<String, String> records) {
this.records = records;
}
@Override
public void run() {
//真正处理records的地方
//处理完后进行位移操作
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = this.records.records(partition);
long lastConsumedOffset = tpRecords.get(tpRecords.size()-1).offset();
synchronized (offsets){
if (!offsets.containsKey(partition)){
offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
}else{
long position = offsets.get(partition).offset();
if (position<lastConsumedOffset+1){
offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
}
}
}
}
}
}
}