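The previous article covered the main thread: after send is called, the message and its associated future object are placed together in the RecordAccumulator, and send returns that future. From the client's point of view send has returned, but that does not mean the message has been successfully delivered to the Broker. If any subsequent action depends on the message having been sent successfully, the client must call future.get() and wait for the future to complete.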
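This article picks up from there. Once the main thread has put a message into the RecordAccumulator, it is done with it; a thread called Sender then pulls messages out of the RecordAccumulator and sends them to the Broker. The Sender thread is created and started when the KafkaProducer is constructed.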
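The hand-off described above can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions, not Kafka's code: `HandoffSketch`, `Pending`, and the fake offset counter are hypothetical names, and a single `BlockingQueue` stands in for the RecordAccumulator. It shows that `send` returns before the record is acknowledged and that only waiting on the future confirms completion.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class HandoffSketch {
    // Hypothetical stand-in for a record queued together with its future.
    static final class Pending {
        final String value;
        final CompletableFuture<Long> future = new CompletableFuture<>();
        Pending(String value) { this.value = value; }
    }

    // Stand-in for the RecordAccumulator (an assumption: one simple FIFO queue).
    private final BlockingQueue<Pending> accumulator = new LinkedBlockingQueue<>();

    // Same shape as send(): enqueue the record and return its future immediately.
    CompletableFuture<Long> send(String value) {
        Pending p = new Pending(value);
        accumulator.add(p);
        return p.future;
    }

    // Starts a daemon thread that plays the role of the Sender.
    void startSender() {
        Thread t = new Thread(() -> {
            long offset = 0;
            try {
                while (true) {
                    Pending p = accumulator.take();  // blocks until a record is queued
                    p.future.complete(offset++);     // pretend the broker acknowledged it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-sender");
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) throws Exception {
        HandoffSketch producer = new HandoffSketch();
        CompletableFuture<Long> f = producer.send("hello");
        // send() has returned, but nothing has been "sent" yet.
        System.out.println("done before sender starts: " + f.isDone()); // false
        producer.startSender();
        System.out.println("offset = " + f.get(5, TimeUnit.SECONDS));   // offset = 0
    }
}
```

With a real KafkaProducer the pattern is the same: keep the future returned by send and call get() on it only where the program actually needs the delivery guarantee.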
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              Metadata metadata,
              KafkaClient kafkaClient) {
    try {
        // ...
        this.sender = new Sender(logContext,
                client,
                this.metadata,
                this.accumulator,
                maxInflightRequests == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                retries,
                metricsRegistry.senderMetrics,
                Time.SYSTEM,
                this.requestTimeoutMs,
                config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        // ...
    } catch (Throwable t) {
        // ...
    }
}
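The Sender thread is an event loop: inside a while loop it repeatedly does two things, sendProducerData and client.poll. The rest of this article analyzes these two steps.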
// org.apache.kafka.clients.producer.internals.Sender#run(long)
void run(long now) {
    // ..
    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
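To make the "event loop" shape concrete, here is a minimal sketch of how a Runnable can wrap a single-iteration `run(long)` in a `while (running)` loop and be started on a named daemon thread, as the constructor above does with KafkaThread. `EventLoopSketch`, `iterations`, `initiateClose`, and the 1 ms poll timeout are illustrative assumptions; the body of `run(long)` is a placeholder, not the real sendProducerData or client.poll.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class EventLoopSketch implements Runnable {
    private volatile boolean running = true;
    final AtomicInteger iterations = new AtomicInteger();

    @Override
    public void run() {
        // Outer loop: keep running single iterations until asked to stop.
        while (running) {
            try {
                run(System.currentTimeMillis());
            } catch (Exception e) {
                // A failing iteration is logged and the loop continues.
                System.err.println("Uncaught error in sender loop: " + e);
            }
        }
    }

    // One iteration: stands in for sendProducerData(now) followed by client.poll(pollTimeout, now).
    void run(long now) {
        long pollTimeoutMs = 1;                            // hypothetical; really returned by sendProducerData
        iterations.incrementAndGet();                      // placeholder for building and queuing requests
        LockSupport.parkNanos(pollTimeoutMs * 1_000_000L); // placeholder for the network poll
    }

    void initiateClose() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        EventLoopSketch sender = new EventLoopSketch();
        Thread ioThread = new Thread(sender, "sketch-network-thread | client-1");
        ioThread.setDaemon(true); // a daemon thread does not keep the JVM alive
        ioThread.start();
        Thread.sleep(50);
        sender.initiateClose();
        ioThread.join();
        System.out.println("iterations: " + sender.iterations.get());
    }
}
```

The point of returning a timeout from the first step and passing it to the second is that the loop never spins: when nothing is ready to send, the poll step is where the thread waits.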
The main logic of the sendProducerData method, simplified, is as follows:
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // code
    // remove any nodes we aren't ready to send to
    // code
    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    // code
    // code
    sendProduceRequests(batches, now);
    return pollTimeout;
}
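First, this.accumulator.ready(cluster, now) is called to find out which Brokers have data that is ready to be sent. Then this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now) is called: it regroups the batches destined for those ready Brokers by node, takes them out of the Deques in the RecordAccumulator (bounded by maxRequestSize per node), and they are then sent out.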
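The ready-then-drain sequence can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not the RecordAccumulator implementation: `ReadyDrainSketch`, `Batch`, `append`, and the two maps are hypothetical, and "ready" is reduced to "the head batch of some partition is full", whereas the real check also takes other conditions such as elapsed waiting time into account. What it does show is the regrouping: batches are stored per partition but drained per Broker node, up to a size limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class ReadyDrainSketch {
    // Hypothetical batch: only a size and a "full" flag.
    static final class Batch {
        final int sizeBytes;
        final boolean full;
        Batch(int sizeBytes, boolean full) { this.sizeBytes = sizeBytes; this.full = full; }
    }

    final Map<Integer, Deque<Batch>> batchesByPartition = new TreeMap<>(); // partition -> deque of batches
    final Map<Integer, Integer> leaderByPartition = new HashMap<>();       // partition -> leader node id

    void append(int partition, int leader, int sizeBytes, boolean full) {
        leaderByPartition.put(partition, leader);
        batchesByPartition.computeIfAbsent(partition, k -> new ArrayDeque<>())
                .addLast(new Batch(sizeBytes, full));
    }

    // ready(): nodes that lead at least one partition whose head batch is sendable.
    Set<Integer> ready() {
        Set<Integer> readyNodes = new TreeSet<>();
        for (Map.Entry<Integer, Deque<Batch>> e : batchesByPartition.entrySet()) {
            Batch head = e.getValue().peekFirst();
            if (head != null && head.full) {
                readyNodes.add(leaderByPartition.get(e.getKey()));
            }
        }
        return readyNodes;
    }

    // drain(): for each ready node, take the head batch of each of its partitions until maxSize is reached.
    Map<Integer, List<Batch>> drain(Set<Integer> readyNodes, int maxSize) {
        Map<Integer, List<Batch>> result = new HashMap<>();
        for (int node : readyNodes) {
            List<Batch> out = new ArrayList<>();
            int size = 0;
            for (Map.Entry<Integer, Deque<Batch>> e : batchesByPartition.entrySet()) {
                if (leaderByPartition.get(e.getKey()) != node) continue;
                Batch head = e.getValue().peekFirst();
                if (head == null) continue;
                // Always allow at least one batch, then stop once the limit would be exceeded.
                if (size + head.sizeBytes > maxSize && !out.isEmpty()) break;
                out.add(e.getValue().pollFirst());
                size += head.sizeBytes;
            }
            result.put(node, out);
        }
        return result;
    }

    public static void main(String[] args) {
        ReadyDrainSketch acc = new ReadyDrainSketch();
        acc.append(0, 1, 400, true);   // partition 0 on node 1, full
        acc.append(1, 1, 400, false);  // partition 1 on node 1, not full
        acc.append(2, 2, 100, false);  // partition 2 on node 2, not full
        Set<Integer> readyNodes = acc.ready();
        System.out.println("ready nodes = " + readyNodes);  // ready nodes = [1]
        Map<Integer, List<Batch>> drained = acc.drain(readyNodes, 1000);
        System.out.println("batches for node 1 = " + drained.get(1).size()); // batches for node 1 = 2
    }
}
```

In this model, one full batch is enough to make node 1 ready, and draining that node also picks up the not-yet-full batch of its other partition, while the batch for node 2 stays in its Deque.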