概述
kafka最初提供了两种客户端API,一种high-level,一种low-level,但在使用的过程中,逐渐发现了这些API的限制,使用high-level API可以支持消费组合故障处理,但是不支持更多更复杂的场景需求,low-level的api可以支持自定义的控制,但是需要应用程序自己管理故障和错误处理。对此,kafka的开发者重新设计了客户端API,实现之前旧的客户端很难实现的场景,新的消费者API(KafkaConsumer)是基于新的消费者协调协议(group coordination protocol),新的消费者完全使用java编写,不再依赖scala运行时环境和zookeeper,并且结合了旧的API中的simple和high level消费者客户端两种功能使用户可以构建自定义的消费策略并且不需要自己管理故障和错误处理。
细节描述
上图为一个消费者组消费topic,旧的消费者依赖于zookeeper管理消费者,新的消费者组使用了消费者协调协议,对于每个消费者,会选择一个brokers作为消费者的协调者(group coordinator),协调者负责管理消费者组的状态,负责协调partition的分配,当有新成员加入,旧成员退出,或者topic的metadata发生变化(比如partitions)重新分配partition叫做消费组的平衡(group balance)。
当消费组第一次初始化时,消费者通常会读取每个partition的最早或最近的offset,然后顺序地读取每个partition log的消息,同时,在消费者读取过程中,它会提交已经成功处理的消息offset:
上图中消费者的位置在6,最近提交的offset在位置1,当一个partition被分配给消费组中其他消费者时,初始位置会设置为原消费者最近提交的offset,比如上图中消费者突然崩溃,接管该parttion的组中其他成员会从offset=1的位置开始消费,而不是offset=6。上图中还有两个log中重要的位置信息, Log End Offset, 是写入log中最后一条消息的offset+1, High Watermark是成功拷贝到log的所有副本节点最近消息的offset,实际上是partition的所有ISR节点。从消费者角度来看,只能读取到high watermark的位置,这样做是为了防止消费者读取还没有完全复制的数据(如果消费者读取了未完全复制的数据,但是这部分数据丢失了,导致读取了不该读的消息)。
我们来看一个例子:
public class KafkaConsumerTest implements Runnable {
private final KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private String topic;
private static final String GROUPID = "groupA2";
public KafkaConsumerTest(String topicName) {
Properties props = new Properties();
//kafka消费的的地址
props.put("bootstrap.servers", "server-1:9092,server-2:9092,server-3:9092");
//组名 不同组名可以重复消费
props.put("group.id", GROUPID);
//是否自动提交
props.put("enable.auto.commit", "true");
//从poll(拉)的回话处理时长
props.put("auto.commit.interval.ms", "1000");
//超时时间
props.put("session.timeout.ms", "30000");
//一次最大拉取的条数
props.put("max.poll.records", 1000);
// earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
props.put("auto.offset.reset", "earliest");
//序列化
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
//订阅主题列表topic
this.consumer.subscribe(Arrays.asList(topic));
}
消费者订阅主题之后,这个消费者会和消费组中的其他成员共同协调,来得到分配给他的partition,这都是在消费者开始消费前自动完成的,也可以手动调用assign API,手动地分配partitions,消费者需要并行地抓取数据,因为多个topics的多个partitions是分布在多个brokers上的,一旦topics注册时消费者实例上,所有将来的协调、平衡和数据获取都是通过在一个事件循环中调用一个poll方法来驱动的,在一个线程中完成所有的IO请求,消费者订阅topic之后,需要启动一个事件循环来得到partition的分配,并且开始抓取数据,这些都是消费者自己处理,应用程序只需要在循环中调用poll API 就可以。
public void run() {
int messageNo = 1;
System.out.println("---------开始消费---------");
try {
for (; ; ) {
msgList = consumer.poll(1000);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord<String, String> record : msgList) {
//消费100条就打印 ,但打印的数据不一定是这个规律的
if (messageNo % 100 == 0) {
System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
}
//当消费了1000条就退出
if (messageNo % 1000 == 0) {
break;
}
messageNo++;
}
} else {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
** 每次poll调用都会返回分配给属于这个消费者的partitions的消息集,每次poll会产生新的offset,下次抓取以新的offset为基础,当第一次创建消费组时,position的值会根据重置策略为每个partition设置为最早或最近的offset,传递给poll方法的参数控制了消费者在当前的位置因为等待消息的到来而阻塞的最长时间**。消费者被设计为在自己的线程中运行,在没有外部同步的情况下,使用多线程是不安全的。
作为消费组的一部分,每个消费者会被分配它订阅的topics的一部分partitions,就像在这些partitions上加了一个组锁,只要锁被持有,组中的其他成员就不会读取他们了(每个partition都对应唯一的消费者, partition锁只属于唯一的消费者),如果消费者失败了,就需要释放掉那个锁,将那些partitions分配给其他健康的成员,kafka消费组协调协议使用心跳机制解决了这个问题,在每次rebalance,所有当前generation的成员都会定时地发送心跳给group协调者,只要协调者持续收到心跳,就认为这个成员是健康的,每次收到心跳,协调者就重置计时器,如果时间超过了,没有收到消费者的心跳,协调者标记消费者为死亡状态,并触发组中其他的消费者重新加入,来重新分配partitions.,计时器的时间间隔就是session timeout,即客户端应用程序中配置的session.timeout.ms
session timeout确保应用程序崩溃或者partition将消费者和协调者进行了隔离的情况下锁会被释放.
注意: 应用程序的失败(进程还存在)与进程挂了有点不同,因为消费者仍然会发送心跳给协调者,并不代表应用程序是健康的. 消费者的轮询循环被设计为解决这个问题. 所有的网络IO操作在调用poll或者其他的阻塞API,都是在前台完成的.
消费者并不使用任何的后台线程. 这就意味着消费者的心跳只有在调用poll的时候才会发送给协调者. 如果应用程序停止polling(不管是处理代码抛出异常或者下游系统崩溃了),就不会再发送心跳了,最终就会导致session超时(没有收到心跳,计时器开始增加), 然后消费组就会开始平衡操作. 唯一存在的问题是如果消费者处理消息花费的时间比session timeout还要长,就会触发一个假的rebalance,可以通过设置更长的session timeout防止发生这样的情况.默认的超时时间是30秒,设置为几分钟也不是不行的.更长的session timeout的缺点是,协调者会花费较长时间才能检测到真正崩溃的消费者.
当消费组第一次创建时,初始offset会根据配置项auto.offset.reset策略设置. 一旦消费者开始处理消息,它会根据应用
程序的需要正常地提交offset(可以是设置自动提交offset,或者手动提交.可以将offset存储在kafka或者外部存储中).在之后的每一次rebalance,position都会被设置为在当前组中为这个partition最近提交的offset,如果消费者已经成功处理了一批消息,但是为这批消息提交offsets之前崩溃了,其他消费者会接着最近提交的offset处重复工作.
更加频繁地提交offsets,在发生崩溃的情况下重复消费消息的情况就越少发生(处理完消息后及时地提交offset是明智之举).
我们假设开启了自动提交offset的策略.当设置enable.auto.commit=true(这也是默认值),消费者会根据配置项auto.commit.interval.ms的值定时地触发自动提交offset的行为.通过减少提交时间间隔,你可以限制在发生崩溃事件时,消费者需要重新处理的消息数量(越经常提交,越不容易重复).
如果要使用消费者的commit API,首先需要关闭自动提交的配置项:
props.put("enable.auto.commit", "false");
commit API很容易使用,但是怎么和poll循环结合起来才是关键. 下面的示例中包含了完整的循环逻辑,以及提交细节.
手动方式处理commits最简单的方式是使用同步方式的提交API,下面的示例读取消息,处理消息,然后提交offsets.
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
使用不带参数的commitSync方法会在最近一次调用poll的返回值中提交offsets.
这个方法是阻塞的(同步嘛),直到提交成功或者出现不可恢复的错误而失败.
大部分情况下你需要关心的错误是消息处理的时间超过session timeout.
这种情况发生时,协调者会将消费者从消费组中剔除出去,结果会抛出CommitFailedException.
应用程序应该处理这种错误,比如尝试从上次成功提交的offset开始回滚任何因为消息消费引起的改变.
消费者同时还暴露了一个异步的API: commitAsync.
使用异步方式提交通常来说会获得更高的吞吐量,因为你的应用程序可以在提交返回之前开始处理下一批的消息.
不过它的代价是你只能在之后的某个时刻才能发现有些commit可能是失败的(异步+回调是一种很好的结合).
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
// application specific failure handling
}
}
});
}
} finally {
consumer.close();
}
我们提供了一个回调函数给commitAsync,它会在消费者完成提交动作之后被调用(不管是提交成功还是失败都会调用).
当然如果你不关心提交的结果,你可以使用没有参数的commitAsync.
手动分配
我们提到新的消费者针对不需要消费组的场景实现了低级API,旧的消费者使用SimpleConsumer可以实现,
但是它需要自己做很多的工作来处理错误处理. 现在使用新的消费者,你只需要分配你要读取的partitions,然后开始polling数据.
下面的示例展示了如何分配一个topic的所有partitions(当然也可以静态分配一部分partitions给消费者).
在旧的消费者中,高级API使用消费组提供的语义, 而低级API使用SimpleConsumer. 而新的消费者仍然统一使用poll方式.
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo partition : consumer.partitionsFor(topic))
partitions.add(new TopicPartition(topic, partition.partition()));
consumer.assign(partitions);
和subscribe类似,调用assign的参数必须传递你要读取的所有partitions(订阅是指定你要读取的所有topics). 一旦partitions被分配了(subscribe是让消费组动态分配partitions),poll循环和之前的方式是一模一样的.有一点要注意的是,所有offset提交请求都会经过group coordinator,不管是SimpleConsumer还是Consumer Group.
所以如果你要提交offset,你还是必须要指定正确的group.id,防止和其他的消费者实例的group id发生冲突. 如果一个simple consumer尝试提交offset,它的group id和一个活动的consumer group相同,协调者会拒绝这个提交. 但是如果另外一个simple consumer实例和当前同样是simple consumer的实例有相同的group id,则是不会有问题的.
注意: 消费组有group id,而simple consumer也会指定group id,但是simple consumer的group id不是指消费组.
消费组和simple consumer是消费者消费消息的两种不同的实现,一个是high-level,一个是low-level.