前言
对于中间件的使用都必须按照他们自己的规范和流程来使用, KafkaConsumer也对消息的负载消费定义了流程,新版本的流程跟老版本的流程有一些改变,这里只对新版本的流程来介绍,同时也会介绍这样去设计组件与流程的好处
KafkaConsumer 对于多线程访问是不安全的,通过使用
acquire()
跟release()
方法来操作AtomicLong currentThread
字段(保存当前访问线程ID), 有多个线程同时访问抛出ConcurrentModificationException
, 来防止对个线程同时访问。
核心组件
- ConsumerCoordinator: 消费者的协调者, 管理消费者的协调过程
- 维持coordinator节点信息(也就是对consumer进行assignment的节点)
- 维持当前consumerGroup的信息, 当前consumer已进入consumerGroup
- Fetcher: 数据请求类
- ConsumerNetworkClient: 消费者的网络客户端,负责网络传输的流程
- SubscriptionState: 订阅状态类
- Metadata: 集群的元数据管理类,使用租约机制
工作流程
kafka是以拉模式去消费数据,可由用户自由控制消费速度,对用户的消费位置可以选择自动异步commit,或者由用户主动同步commit, 实例代码如下:
KafkaConsumer consumer = ...
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(long timeout);
// Handle new records 用户处理消息
// consumer.commitSync 可由用户自主提交消费位置
}
时序图: