1生产者发送消息到broker 需要经过拦截器,序列化器,分区器。那么经过这些之后,是怎么到达broker的?
主线程,消息累加器(RecordAccumulator),sender线程,kafka cluster
主线程 追加消息到 消息累加器中, sender线程从 消息累加器中拉取消息 添加到 kafka cluster中
1、主线程中,主要是用户线程的发送流程,拦截器,序列化器,分区器
2、消息累加器:
2.1)主要用于缓存主线程发送的消息,由sender线程批量获取 发送到broker中。 (RecordAccumulator 的大小可以通过生产者参数设置buffer.memory 来设置,默认是32MB,如果生产者主线程发送速度大于sender线程拉取的速度,就会造成生产者空间不足。 要么 KafkaProducer.send()方法被阻塞或者抛出异常,这由max.block.ms 配置 默认60s)
2.2)producerBatch 不是 ProducerRecord,而是一个ProducerRecord的集合,一个producerBatch中有一个或者多个producerRecord。(多个较小的producerRecord拼接成一个较大的producerBatch 这样可以减少网络请求的数量,增大吞吐量)
2.2. 1每条消息进入消息累加器的时候 都会找到对应的队列,没有则新建,然后获取队尾的producerBatch,没有则新建,看producerBatch的剩余大小是否可以存放进入消息,不够则新增 一个合适大小的ProducerBatch加入队列尾部。
2.3.2.ProducerBatch的大小由batch.size和producerRecord来判断,如果producerRecord小于这个值,则创建一个 batch.size大小的空间来存储(默认16KB),如果大于这个值,则新增一个 producerRecord 大小相同的空间存储。
2.3.4 消息在网络发送是以字节bety的形式传送的,在发送之前需要创建一块内存来保存对应的消息。为了减少频繁的创建和释放,消息累加器内部实现的有bufferPool,他主要是用来复用Deque<ProducerBatch> 中 batch.size 大小的 空间。 不是batch.size 大小的空间将不受他管理。
2.3)消息累加器为每个分区都维护了一个 双端队列 Deque<ProducerBatch> 生产者主线程写入时放入队尾,sender线程拉取时从队列头部获取。
3、sender线程
3.1 sender线程从消息累加器中获取到消息之后,会再次将消息转化,将<分区,Deque<ProducerBatch>>转换为<Node,List<ProducerBatch>> Node表示为具体的broker节点,对于生产者客户端是与具体的broker链接,并不关心到底是哪个分区。 消息转换为<Node,List<ProducerBatch>>后会再次转换为<Node,request>(如上图第6步) , 发送给selector 发送到 kafka cluster 上
3.2 在消息转换为<Node,request>后,在传给selector的同时还会传送给InFlightRequest模块中,结构为Map<NodeId,Deque<request>> ,字符串NodeId 为节点id,request为请求,整个InFlightRequest 中 存储的都是请求了 但是未响应的消息(未第10步)。InFlightRequest 中还提供了许多管理类的方法,并且通过配置参数还可以限制每个链接 (客户端与node之间的连接)最多缓存的为获取响应的请求数量(默认为5 配置max.in.flight.requests.pre.connection可以修改),如果真的有连接超过5个未响应的请求,那么就不能再通过该连接发送更多的请求了,因为这个连接响应较慢,再继续发送的话会增大请求超时的可能。