1. Background
In the previous article of this series, Kafka Series (1) -- Producer Flow and Partitions Explained, you should already have gotten a feel for Kafka's overall producer flow. In this article we take a closer look at the requests the producer sends and the related source code.
With that foundation in place, this part should feel like a natural next step.
2. Producer Requests and Their Responses
2.1 The API_VERSIONS request
The API_VERSIONS request negotiates the API versions used between the producer and the broker. The very first request is sent with the highest version the client knows about:
Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.9.0')
If the broker does not support this latest version, it returns an error: the response carries errorCode=35, which maps to UNSUPPORTED_VERSION. Because the broker does not report the highest version it supports in this response, the producer falls back and reissues the API_VERSIONS request with the lowest version, 0:
Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[], zkMigrationReady=false)
The broker then returns the supported version range for every API. For example, apiKey=18 is the API_VERSIONS request, supported from version 0 up to version 2; apiKey=3 is the METADATA request, supported from version 0 up to version 8:
ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=8), ApiVersion(apiKey=18, minVersion=0, maxVersion=2)])
Once the producer has these version ranges, every subsequent request is sent with the highest version the broker supports.
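As a rough illustration of that negotiation, here is a minimal sketch (not the actual NetworkClient code; the class name, method, and map layout are made up for the example) that picks the highest version both sides support:

import java.util.Map;

public class VersionNegotiationSketch {

    // Hypothetical broker-advertised ranges, keyed by apiKey: value is {minVersion, maxVersion}.
    private final Map<Short, short[]> brokerRanges;

    public VersionNegotiationSketch(Map<Short, short[]> brokerRanges) {
        this.brokerRanges = brokerRanges;
    }

    // Pick the highest version supported by both client and broker for a given API.
    public short pickVersion(short apiKey, short clientMin, short clientMax) {
        short[] range = brokerRanges.get(apiKey);
        if (range == null)
            throw new IllegalStateException("API " + apiKey + " not advertised by broker");
        short usable = (short) Math.min(clientMax, range[1]);
        if (usable < clientMin || usable < range[0])
            throw new IllegalStateException("No overlapping version for API " + apiKey);
        return usable;
    }

    public static void main(String[] args) {
        // Broker advertises METADATA (apiKey=3) up to v8 and API_VERSIONS (apiKey=18) up to v2, as in the log above.
        VersionNegotiationSketch sketch = new VersionNegotiationSketch(
                Map.of((short) 3, new short[]{0, 8}, (short) 18, new short[]{0, 2}));
        System.out.println(sketch.pickVersion((short) 3, (short) 0, (short) 12)); // prints 8
    }
}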
2.2 The METADATA request
The METADATA request is used to refresh metadata. It is sent with the highest supported API version, 8, and asks for the metadata of the kafka-k8s-test topic, which is simply the topic we passed to the send() method:
Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=8, clientId=consumer-group1-1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='kafka-k8s-test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
Under normal conditions, the broker returns the broker list along with the topic information:
MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=4, host='1.1.1.1', port=9092, rack='b1'), MetadataResponseBroker(nodeId=1, host='2.2.2.2', port=9092, rack='b2'), MetadataResponseBroker(nodeId=2, host='3.3.3.3', port=9092, rack='b3'), MetadataResponseBroker(nodeId=3, host='4.4.4.4', port=9092, rack='b4')], clusterId='Yxxxxxxxxx_xxxxxxw', controllerId=1, topics=[MetadataResponseTopic(errorCode=0, name='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=3, leaderEpoch=1, replicaNodes=[3, 2, 1], isrNodes=[3, 1, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=2, leaderId=1, leaderEpoch=1, replicaNodes=[1, 3, 2], isrNodes=[1, 3, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=1, leaderId=2, leaderEpoch=3, replicaNodes=[2, 1, 3], isrNodes=[1, 3, 2], offlineReplicas=[])], topicAuthorizedOperations=0)], clusterAuthorizedOperations=0)
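If you want to inspect the same leader/replica/ISR information from client code, the producer's public partitionsFor() API surfaces the metadata it has fetched. A minimal sketch, assuming a reachable cluster behind a placeholder bootstrap address:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class MetadataPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "1.1.1.1:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // partitionsFor() blocks until metadata for the topic is available,
            // i.e. it forces exactly the METADATA round trip shown above.
            for (PartitionInfo p : producer.partitionsFor("kafka-k8s-test")) {
                System.out.printf("partition=%d leader=%s replicas=%d isr=%d%n",
                        p.partition(), p.leader(), p.replicas().length, p.inSyncReplicas().length);
            }
        }
    }
}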
2.3 The PRODUCE request
The PRODUCE request carries the producer's actual data; the highest supported version here is 7:
Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=producer-1, correlationId=8, headerVersion=1) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[kafka-k8s-test-2=82]}
A normal broker response looks like this:
ProduceResponseData(responses=[TopicProduceResponse(name='kafka-k8s-test', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=32, logAppendTimeMs=-1, logStartOffset=28, recordErrors=[], errorMessage=null, currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1))])], throttleTimeMs=0, nodeEndpoints=[])
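For reference, a request like the one above comes from a perfectly ordinary send. A minimal sketch (the bootstrap address is a placeholder; acks=all is what appears as acks=-1 on the wire, and the timeout field is assumed here to follow request.timeout.ms):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "1.1.1.1:9092"); // placeholder address
        props.put("acks", "all");                       // sent as acks=-1 in the PRODUCE request
        props.put("request.timeout.ms", "30000");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            RecordMetadata meta = producer.send(
                    new ProducerRecord<>("kafka-k8s-test", "key", "value")).get();
            // The baseOffset in the response above surfaces here as the record offset.
            System.out.printf("partition=%d offset=%d%n", meta.partition(), meta.offset());
        }
    }
}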
3. Code That Takes a Few Reads to Understand
3.1 Updating the metadata
The producer caches the broker connection states locally in a Map, keyed by the broker (node) id, where the value holds that broker's connection state; initially the map is empty:
private final Map<String, NodeConnectionState> nodeState;
As the metadata gets initialized, a broker's entry goes through the following state transitions:
null -> CONNECTING -> CHECKING_API_VERSIONS -> READY
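A rough sketch of that state machine (illustrative only; the real bookkeeping lives in ClusterConnectionStates, and the names below are made up for the example):

import java.util.HashMap;
import java.util.Map;

public class NodeStateSketch {
    enum State { CONNECTING, CHECKING_API_VERSIONS, READY }

    private final Map<String, State> nodeState = new HashMap<>(); // key: broker node id

    boolean canConnect(String node)     { return !nodeState.containsKey(node); }       // no state yet
    boolean canSendRequest(String node) { return nodeState.get(node) == State.READY; } // only READY nodes may send

    public static void main(String[] args) {
        NodeStateSketch states = new NodeStateSketch();
        String node = "1";
        System.out.println(states.canConnect(node));              // true: state is still empty
        states.nodeState.put(node, State.CONNECTING);             // channel registered with OP_CONNECT
        states.nodeState.put(node, State.CHECKING_API_VERSIONS);  // connected, API_VERSIONS sent
        states.nodeState.put(node, State.READY);                  // API_VERSIONS succeeded
        System.out.println(states.canSendRequest(node));          // true: METADATA may now be sent
    }
}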
The metadata update is carried out by the NIO (sender) thread. When the node state is still empty, canConnect evaluates to true, so the client starts connecting to the broker, registers the channel and the OP_CONNECT interest with the selector, and moves the broker's state to CONNECTING. It then enters the selector's select() call and waits for the OP_CONNECT event. If the connection is slow, select() times out, the thread wakes up and starts the next round of checks; this time isAnyNodeConnecting is true because a broker is still in the CONNECTING state, so the thread goes straight back into select() to keep waiting for OP_CONNECT. Once OP_CONNECT fires, the client still has to send an API_VERSIONS request, and the broker's state moves to CHECKING_API_VERSIONS; only after that request returns successfully does the state become READY. Only once the broker is READY does canSendRequest hold, and only then is the METADATA request sent:
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    // The broker's state is READY: the METADATA request can be sent.
    if (canSendRequest(nodeConnectionId, now)) {
        Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
        MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
        return defaultRequestTimeoutMs;
    }

    // Some broker is still in the CONNECTING state: keep waiting.
    if (isAnyNodeConnecting()) {
        return reconnectBackoffMs;
    }

    // This broker has no state yet: initiate the connection.
    if (connectionStates.canConnect(nodeConnectionId, now)) {
        initiateConnect(node, now);
        return reconnectBackoffMs;
    }

    return Long.MAX_VALUE;
}
3.2 How send() buffers records
Calling send() essentially just appends the record to a batch in the batches deques. Notice that every operation on the deque is wrapped in a synchronized block, because the send (user) thread and the NIO (sender) thread may operate on the queue at the same time; for example, when a send fails with a retriable error, the batch is put straight back at the head of the queue to wait for a retry.
The ByteBuffer allocation, however, is deliberately placed outside the synchronized block, because the allocation may have to wait: the producer's buffer pool defaults to 32 MB, and if a ByteBuffer cannot be allocated immediately, the allocating thread must wait for memory to be released. If the allocation were done while holding the queue lock, the NIO thread would never get a chance to acquire that lock, and therefore could never remove batches from the queue and free their memory after sending them. That is a deadlock: the send thread holds the queue lock while waiting for memory, and the NIO thread wants to release memory but cannot obtain the queue lock.
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
    // After taking the lock, validate that the partition hasn't changed and retry.
    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
        continue;
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
    if (appendResult != null) {
        // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
        boolean enableSwitch = allBatchesFull(dq);
        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
        return appendResult;
    }
}

if (buffer == null) {
    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
    buffer = free.allocate(size, maxTimeToBlock);
    nowMs = time.milliseconds();
}

synchronized (dq) {
    // After taking the lock, validate that the partition hasn't changed and retry.
    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
        continue;
    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
    // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
    if (appendResult.newBatchCreated)
        buffer = null;
    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
    boolean enableSwitch = allBatchesFull(dq);
    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
    return appendResult;
}
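The allocate-outside-the-lock idea can be boiled down to a small sketch (illustrative only, not the RecordAccumulator code; the semaphore stands in for the buffer pool): because the blocking allocation happens before the queue lock is taken, the draining thread can always grab the lock, remove batches, and release memory, so the waiting append thread eventually makes progress.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

public class AllocateOutsideLockSketch {
    // Pretend memory pool: permits stand in for the buffer.memory budget (32 MB by default).
    private final Semaphore memory = new Semaphore(2);
    private final Deque<byte[]> queue = new ArrayDeque<>();

    // Append path (send thread): allocate BEFORE taking the queue lock.
    void append(int size) throws InterruptedException {
        memory.acquire();               // may block until the sender frees memory
        byte[] buffer = new byte[size]; // safe: the queue lock is not held here
        synchronized (queue) {
            queue.addLast(buffer);      // short critical section, never blocks on memory
        }
    }

    // Drain path (NIO thread): needs the queue lock to remove batches and free memory.
    void drainOne() {
        byte[] buffer;
        synchronized (queue) {
            buffer = queue.pollFirst();
        }
        if (buffer != null)
            memory.release();           // memory only comes back because this lock was obtainable
    }
}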
3.3 How the NIO thread handles write events
Every request the producer makes to a broker goes through the doSend method, and every request is first turned into a Send object. This step is crucial for understanding how the NIO thread later handles read events:
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    Send send = request.toSend(header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(new NetworkSend(clientRequest.destination(), send));
}
When the Send object is built, you can see it consists of a leading 4 bytes, the request header (request type, version, and so on), and the request body (the message itself). The leading 4 bytes carry the size of the whole message, written by builder.writeInt(messageSize.totalSize()), since an int is exactly 4 bytes; only after that are the header and the body written:
private static Send buildSend(
    Message header,
    short headerVersion,
    Message apiMessage,
    short apiVersion
) {
    ObjectSerializationCache serializationCache = new ObjectSerializationCache();
    MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
    header.addSize(messageSize, serializationCache, headerVersion);
    apiMessage.addSize(messageSize, serializationCache, apiVersion);
    SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
    builder.writeInt(messageSize.totalSize());
    header.write(builder, serializationCache, headerVersion);
    apiMessage.write(builder, serializationCache, apiVersion);
    return builder.build();
}
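Stripped of the Kafka specifics, the framing is simply a 4-byte length prefix followed by the header and body bytes. A minimal sketch of writing such a frame with NIO (a hypothetical helper, not SendBuilder itself):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameWriterSketch {
    // Build a size-prefixed frame: 4-byte length, then header bytes, then body bytes.
    static ByteBuffer buildFrame(byte[] header, byte[] body) {
        ByteBuffer frame = ByteBuffer.allocate(4 + header.length + body.length);
        frame.putInt(header.length + body.length); // the leading 4 bytes = total message size
        frame.put(header);
        frame.put(body);
        frame.flip();
        return frame;
    }

    public static void main(String[] args) {
        ByteBuffer frame = buildFrame("HDR".getBytes(StandardCharsets.UTF_8),
                                      "payload".getBytes(StandardCharsets.UTF_8));
        System.out.println("size field = " + frame.getInt(0)); // prints 10
    }
}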
With this detail in mind, the way the NIO thread handles read events becomes obvious.
3.4 How the NIO thread handles read events
When the producer receives a response from the broker, it has to read the data from the channel. Many readers are puzzled by the ByteBuffer.allocate(4) here: why 4, and not some other number?
It matches exactly the 4 bytes written at the start of every request above. Because both places use a magic number, the connection is easy to miss, which is also a good reminder to avoid magic numbers in our own code.
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
    this.source = source;
    this.size = ByteBuffer.allocate(4);
    this.buffer = null;
    this.maxSize = maxSize;
    this.memoryPool = memoryPool;
}
When reading from the channel, the size buffer must be filled first, i.e. exactly 4 bytes must be read. Only once those 4 bytes are in (!size.hasRemaining()) is a ByteBuffer allocated to receive the actual response payload.
This works because the first 4 bytes written on the request side are the size of the whole request and, by Kafka's protocol, the first 4 bytes of a response are likewise the size of the whole response.
public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt();
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }
    if (buffer == null && requestedBufferSize != -1) { // we know the size we want but haven't been able to allocate it yet
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }
    return read;
}
Once the response size is known, a ByteBuffer of exactly that size can be allocated to receive the payload, and TCP sticky and split packets are no longer a concern:
Sticky packets: several responses may arrive in one read, but the producer now knows the exact size of the current response, so it never reads beyond it; the allocated ByteBuffer is exactly as large as the response, and the extra bytes simply wait to be read from the start on the next read event.
Split packets: a response may be spread across several reads, but again the expected size is known; if the ByteBuffer cannot be filled in one go, buffer.hasRemaining() stays true, so complete() returns false, this receive is treated as unfinished, and the next read event keeps appending into the same receive until the ByteBuffer is full.
public NetworkReceive maybeCompleteReceive() {
    if (receive != null && receive.complete()) {
        receive.payload().rewind();
        NetworkReceive result = receive;
        receive = null;
        return result;
    }
    return null;
}

@Override
public boolean complete() {
    return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
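To make the sticky/split-packet handling concrete, here is a toy reader built on the same idea (illustrative only, not the NetworkReceive class): it first fills a 4-byte size buffer, then allocates a payload buffer of exactly that size, so extra bytes stay behind for the next frame, and a short read simply leaves the receive incomplete until the next read event.

import java.nio.ByteBuffer;

public class FrameReaderSketch {
    private final ByteBuffer size = ByteBuffer.allocate(4); // same idea as NetworkReceive's size buffer
    private ByteBuffer payload;

    // Feed whatever bytes arrived; returns the payload once a full frame has been read.
    ByteBuffer feed(ByteBuffer incoming) {
        if (size.hasRemaining())
            copy(incoming, size);
        if (!size.hasRemaining() && payload == null) {
            size.rewind();
            payload = ByteBuffer.allocate(size.getInt()); // exactly one frame, never more
        }
        if (payload != null) {
            copy(incoming, payload);       // leftover bytes stay in 'incoming' (sticky packet)
            if (!payload.hasRemaining()) { // a split packet stays incomplete until filled
                payload.flip();
                return payload;
            }
        }
        return null; // incomplete, wait for the next read event
    }

    private static void copy(ByteBuffer src, ByteBuffer dst) {
        while (src.hasRemaining() && dst.hasRemaining())
            dst.put(src.get());
    }

    public static void main(String[] args) {
        FrameReaderSketch reader = new FrameReaderSketch();
        ByteBuffer frame = ByteBuffer.allocate(4 + 3).putInt(3).put(new byte[]{1, 2, 3});
        frame.flip();
        ByteBuffer firstHalf = frame.duplicate();
        firstHalf.limit(5);   // simulate a split packet: the size field plus one payload byte
        frame.position(5);    // the remaining two bytes "arrive" later
        System.out.println(reader.feed(firstHalf)); // null: frame not complete yet
        System.out.println(reader.feed(frame));     // the 3-byte payload, now complete
    }
}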
This is textbook handling of TCP sticky/split packets in Kafka; hopefully it sparks a few ideas for your own code.
Which other parts of the producer do you find especially well written, or especially hard to follow? Feel free to share in the comments so we can learn from them together.