When application code calls the Kafka producer's send method, each call does not immediately push the message over the network to the broker. Instead, multiple messages are accumulated into a batch, and batches are what actually get sent to the broker; the sending itself is not even performed by the producer (caller) thread. How messages are grouped into batches, and how those batches are organized, is the job of the record accumulator (RecordAccumulator).
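Batching behavior is tuned mainly through the batch.size and linger.ms producer settings. As a minimal sketch of how a producer is configured with them (the broker address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // max bytes per batch in the accumulator
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // wait up to 5 ms for more records per batch
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only hands the record to the accumulator; the sender thread does the network I/O
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}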
Let's look at how the accumulator handles messages from the source code's point of view, including how it cooperates with the partitioner. The method below is the implementation of doSend; only the accumulator-related part is quoted:
// ... preceding code omitted ...
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);
if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false);
}
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;
After the key and value have been serialized and the partitioner has chosen a partition, the producer calls the accumulator's append method, so that is the method to focus on:
/**
 * @param abortOnNewBatch whether to abort instead of creating a new batch. Each partition
 *        maps to a deque whose elements are batches. When a new message arrives, the last
 *        batch in the deque is taken and the message is appended to it; if that batch has
 *        no room left, a new batch would normally be created and added to the deque. If
 *        this parameter is true, the method gives up in that situation rather than
 *        creating the new batch.
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    // Track the number of append calls currently in progress
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Fetch the deque for this partition, creating one and putting it into the map if absent
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try to append the message to the last batch in the deque; on success, return immediately
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
        // If the append failed and abortOnNewBatch is true, return without creating a new batch.
        // The caller (doSend) passes true on its first call to append: the sender thread
        // (distinct from the producer thread) ships one batch at a time, so to pack as many
        // messages as possible into each batch, when a message no longer fits into the last
        // batch of this partition's deque, the producer prefers switching to another
        // partition's deque over starting a new batch here.
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }
        // Estimate how much memory this message needs
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Check again whether the producer was closed in the meantime
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Why try to append again here? Because multiple threads may be appending to the
            // same partition concurrently: another thread may already have created a new batch
            // and added it to the deque, so we retry the append instead of blindly creating
            // another batch.
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            // Only after that retry fails do we actually build a new batch and add it to the deque
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));
            dq.addLast(batch);
            // Each batch is also added to the "incomplete" set, which holds batches that have
            // not yet been sent and acknowledged by the broker
            incomplete.add(batch);
            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
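For context, the getOrCreateDeque call at the top performs a race-safe get-or-create on a concurrent map. A sketch close to the actual implementation (the map type is simplified to ConcurrentHashMap here; the real class uses a copy-on-write map):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: the accumulator's per-partition queue bookkeeping
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new ConcurrentHashMap<>();

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    // putIfAbsent makes the create race-safe: if another thread got there first, use its deque
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    return previous == null ? d : previous;
}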
The RecordAppendResult returned by append carries the following fields:
public final FutureRecordMetadata future;   // metadata future for the appended record
public final boolean batchIsFull;           // the batch is full, or the deque holds more than one batch
public final boolean newBatchCreated;       // whether a new batch was created
public final boolean abortForNewBatch;      // batch creation was aborted: the append failed and must be retried
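For reference, the positional arguments in calls like new RecordAppendResult(null, false, false, true) map onto those fields in order; the constructor, as implied by the usage above, is simply:

public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull,
                          boolean newBatchCreated, boolean abortForNewBatch) {
    this.future = future;
    this.batchIsFull = batchIsFull;
    this.newBatchCreated = newBatchCreated;
    this.abortForNewBatch = abortForNewBatch;
}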
Among them, abortForNewBatch decides whether doSend calls append a second time:
if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false);
}
In this block, the partitioner's onNewBatch method is invoked to pick a new sticky partition for the topic, and the data is then appended to that new partition. Why switch to a new partition here? That was explained in the append implementation above: it lets the current batch fill up as much as possible before being shipped.
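Conceptually, the sticky partitioner caches one "current" partition per topic and only picks a different random partition when a batch for the current one is closed. A simplified illustration of that idea (the real logic lives in StickyPartitionCache inside the Kafka 2.4+ DefaultPartitioner; this is only the gist):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

public class StickyPartitionSketch {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    // Called when a batch for prevPartition was closed: stick to a different partition from now on
    public int onNewBatch(String topic, int numPartitions, int prevPartition) {
        return indexCache.compute(topic, (t, cached) -> {
            if (cached == null || cached == prevPartition) {
                int next;
                do {
                    next = ThreadLocalRandom.current().nextInt(numPartitions);
                } while (numPartitions > 1 && next == prevPartition); // avoid the partition that just filled up
                return next;
            }
            return cached; // another thread already moved the sticky partition
        });
    }

    // Partition used for records without a key: the cached sticky partition
    public int partition(String topic, int numPartitions) {
        return indexCache.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }
}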
When a batch is full, or a new one has just been created, doSend wakes up the sender thread. One thing worth noting here: the Kafka producer thread and the sender thread are separate. The producer thread appends message batches to the underlying deques, while the sender thread continuously pulls batches off those deques and sends them to the broker, decoupling message construction from message transmission.
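To picture that decoupling, here is a minimal, self-contained sketch of the same threading pattern (plain Java, not Kafka code): one thread only enqueues "batches", another only dequeues and "sends" them.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AccumulatorPatternDemo {
    public static void main(String[] args) throws InterruptedException {
        // Stands in for the accumulator's deques; "batches" are just strings here
        BlockingQueue<String> readyBatches = new ArrayBlockingQueue<>(16);

        // Sender thread: drains batches and "sends" them (prints, in this demo)
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String batch = readyBatches.take();
                    System.out.println("sender thread sends: " + batch);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.setDaemon(true);
        sender.start();

        // Producer thread: only appends, never touches the network
        for (int i = 0; i < 3; i++)
            readyBatches.put("batch-" + i);
        Thread.sleep(100); // give the demo sender time to drain the queue
    }
}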