NetworkClient是kafka的网络层,也就是真正发生网络I/O的地方,是一个通用的网络客户端实现,不只用于生产者消息的发送,也用于消费者消费消息以及服务端Broker之间的通信。
NetworkClient发送之前都会事先判断节点是否ready,判断的条件就是如果元数据没有正在更新,并且当前节点的连接状态是CONNECTED,且KafkaChannel也是ready的,并且队列InFlightRequests中可以发送更多的请求(canSendMore),下面介绍一下队列InFlightRequests。
NetworkClient中有一个重要的队列InFlightRequests,它主要作用是缓存了已经发出去但没有收到响应的ClientRequest,底层是通过一个Map<String, Deque<ClientRequest>>对象实现的,如图1所示。
下面我们通过图2来展示一下,请求是如何加入队列,以及请求完成时如何从队列中删除的。为了方便理解,我们假设有4个请求1,2,3,4,其中请求1,2,3都需要响应,请求4不需要响应,所有的请求都发送到一个节点上(node1)。
从图2可以看到,请求并不是可以随便加入到队列中的,只有当上一个请求全部发送完成(queue.peekFirst().request().completed()=true)之后才能发送第2个请求。
然后我们再来看一下一个请求发送的过程,请求被加入到队列之后,会被发送到Kafka的Selector(选择器)上,然后设置到KafkaChannel上,一个KafkaChannel只允许同时运行一个RequestSend(真正的请求),如果在setSend的时候,发现已经有RequestSend在KafkaChannel上了,直接抛出异常。我们从图3来模拟一下单个请求加入队列后的工作流程。
总结:NetworkClient就是封装了JAVA的NIO组件,并且通过一个双端队列来管理请求的一个通用的网络层。