Kafka系列《二》-- 生产者Producer中的请求及源码详解

1. 背景

在前一篇系列文章Kafka系列《一》-- 生产者Producer流程及Partition详解中,相信已经对Kafka的整体流程有所了解了,在这篇文章中在进一步看看producer中的那些请求及相关源码解析;

有了前一篇的基础,再看这一片相信就觉得理所当然了。

2. Producer中的请求及其响应

2.1 API_VERSIONS请求

API_VERSIONS请求用于协商producer和broken之间的API版本,在初始请求时会使用最大的版本号发送请求

Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.9.0')

当broken不支持这个最新的版本号时,会返回错误,响应中errorCode=35对应的是UNSUPPORTED_VERSION错误,由于broken没有返回自己支持的最大版本号,因此producer会使用最低的版本号0重新发起一次API_VERSIONS请求

Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[], zkMigrationReady=false)

此时broken就会返回所有API支持的版本号;比如apiKey=18对应的是API_VERSIONS 请求,支持的最低版本号是0,最高版本号是2;再如apiKey=3对应的是METADATA请求,支持的最低版本号是0,最高版本号是8

ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=8),ApiVersion(apiKey=18, minVersion=0, maxVersion=2))

获取到API对应的版本后,后续producer发送请求时就按照broken支持的最高版本号发送消息了。

2.2 METADATA请求

METADATA请求用于更新原数据信息,其中使用API版本号就是最大的8,并且指定了需要获取kafka-k8s-test这个topic的原数据信息;这里的topic就是我们通过send方法发送的topic

Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=8, clientId=consumer-group1-1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='kafka-k8s-test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)

正常情况下,broken会返回broken list和topic信息

MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=4, host='1.1.1.1', port=9092, rack='b1'), MetadataResponseBroker(nodeId=1, host='2.2.2.2', port=9092, rack='b2'), MetadataResponseBroker(nodeId=2, host='3.3.3.3', port=9092, rack='b3'), MetadataResponseBroker(nodeId=3, host='4.4.4.4', port=9092, rack='b4')], clusterId='Yxxxxxxxxx_xxxxxxw', controllerId=1, topics=[MetadataResponseTopic(errorCode=0, name='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=3, leaderEpoch=1, replicaNodes=[3, 2, 1], isrNodes=[3, 1, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=2, leaderId=1, leaderEpoch=1, replicaNodes=[1, 3, 2], isrNodes=[1, 3, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=1, leaderId=2, leaderEpoch=3, replicaNodes=[2, 1, 3], isrNodes=[1, 3, 2], offlineReplicas=[])], topicAuthorizedOperations=0)], clusterAuthorizedOperations=0)

2.3 PRODUCE请求

PRODUCE请求用于发送producer的数据,最大的支持版本是7

Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=producer-1, correlationId=8, headerVersion=1) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[kafka-k8s-test-2=82]}

正常情况下broken的响应

ProduceResponseData(responses=[TopicProduceResponse(name='kafka-k8s-test', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=32, logAppendTimeMs=-1, logStartOffset=28, recordErrors=[], errorMessage=null, currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1))])], throttleTimeMs=0, nodeEndpoints=[])

3. 一些需要反复看才能懂的代码

3.1 metadata的更新

producer会缓存broken的状态在本地,对应的是一个Map集合,key是broken id,value是这个broken的状态,初始为空

private final Map<String, NodeConnectionState> nodeState;

伴随着metadata的初始化会经历的状态变化

null -> CONNECTING -> CHECKING_API_VERSIONS -> READY;

metadata的更新是由NIO线程完成的:

  • 初始为空时,会先判断到canConnect,然后开始连接broken,注册通道、OP_CONNECT事件到NIO;并将broken状态修改为CONNECTING

  • 然后再进入NIO的select方法,等待OP_CONNECT事件到来;如果连接较慢的时候,NIO超时从select方法醒来,然后继续开始下一轮的检查更新,此时会判断到isAnyNodeConnecting,存在CONNECTING状态的broken,因此直接再进入NIO的select方法,等待OP_CONNECT事件到来;

  • 监听到OP_CONNECT事件后,还需要发送API_VERSIONS请求,此时broken状态会更新为CHECKING_API_VERSIONS状态,等待API_VERSIONS请求成功返回后,才会将broken状态改为READY

  • 直达broken状态改为READY后,才会满足canSendRequest这个条件,在这个条件下才会发送METADATA请求

private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();
            // broken状态为READY
            if (canSendRequest(nodeConnectionId, now)) {
                Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
                MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
                return defaultRequestTimeoutMs;
            }
            // 存在CONNECTING 状态broken
            if (isAnyNodeConnecting()) {
                return reconnectBackoffMs;
            }
            // broken状态为空
            if (connectionStates.canConnect(nodeConnectionId, now)) {
               
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

            return Long.MAX_VALUE;
        }

3.2 send消息存入缓存

send方法发送消息,本质上就是往batches数组追加批次数据;可以看到这里队列的操作都是需要同步锁的,因为send线程和NIO线程是可能同时操作队列的;而且NIO发送失败的话,对于可以重试的错误也会直接将批次重新加入到队首,等待重试

但是对于ByteBuffer的分配却是放在同步代码块外面的,因为ByteBuffer分配是可能需要等待的,因为producer可以分配的最大内存默认是32M,无法立即分配的ByteBuffer则需要wait内存释放;如果分配ByteBuffer也放在同步锁内,那NIO线程就没机会获取到队列的锁了,进而就没机会从队列中移除批次发送后释放内存;这就形成了死锁了;send线程占用了队列的锁等着释放内存,NIO线程想要释放内存但是获取不到队列的锁了。

Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
                synchronized (dq) {
                    // After taking the lock, validate that the partition hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                        continue;

                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                    if (appendResult != null) {
                        // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                        boolean enableSwitch = allBatchesFull(dq);
                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                        return appendResult;
                    }
                }

                if (buffer == null) {
                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
                    
                    buffer = free.allocate(size, maxTimeToBlock);
                    
                    nowMs = time.milliseconds();
                }

                synchronized (dq) {
                    // After taking the lock, validate that the partition hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                        continue;

                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                    // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                    if (appendResult.newBatchCreated)
                        buffer = null;
                    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }

3.3 NIO处理写事件

producer请求broken都是通过doSend方法完成的,而请求都会先转为Send对象,这个步骤对于理解NIO处理读事件至关重要

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
       
        Send send = request.toSend(header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(new NetworkSend(clientRequest.destination(), send));
    }

在构建Send对象的时候,可以看到其中主要包括开始的4字节、请求Header(包括请求类型、版本号信息)、请求体(消息主体);而开始的4字节写入的是整个message的大小builder.writeInt(messageSize.totalSize());,因为int对应的就是4字节;接着才是写入请求头信息、请求体信息

private static Send buildSend(
        Message header,
        short headerVersion,
        Message apiMessage,
        short apiVersion
    ) {
        ObjectSerializationCache serializationCache = new ObjectSerializationCache();

        MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
        header.addSize(messageSize, serializationCache, headerVersion);
        apiMessage.addSize(messageSize, serializationCache, apiVersion);

        SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
        builder.writeInt(messageSize.totalSize());
        header.write(builder, serializationCache, headerVersion);
        apiMessage.write(builder, serializationCache, apiVersion);

        return builder.build();
    }

注意到了这个细节,再来看看NIO是怎么处理读事件的就一目了然了。

3.4 NIO处理读事件

当producer收到broken的响应后,就需要从通道读取数据,相信很多人看到这里的ByteBuffer.allocate(4)都会很迷惑,为什么是4?而不是其它的数字呢?

这就和上面的写数据刚好对应上了,因为这两处代码里使用的是魔法值,所以有点难直接联想到;这也告诫我们代码里尽量不要使用魔法值~~

public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
        this.source = source;
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
        this.memoryPool = memoryPool;
    }

然后开始从通道读取数据的时候,也是要先把size读满,也就是一定要先读4字节;只有读完4字节后!size.hasRemaining()才会开始分配ByteBuffer来接受实际的响应数据

因为写数据的时候最开始写入的4字节是整条请求数据的大小,按照Kafka的协议,响应数据的最开始4字节也必须是响应数据的大小。

public long readFrom(ScatteringByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
                if (receiveSize == 0) {
                    buffer = EMPTY_BUFFER;
                }
            }
        }
        if (buffer == null && requestedBufferSize != -1) { // we know the size we want but haven't been able to allocate it yet
            buffer = memoryPool.tryAllocate(requestedBufferSize);
            if (buffer == null)
                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
        }
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }

        return read;
    }

获取到响应数据的大小后,就可以为其分配ByteBuffer来接受实际的响应数据了;而且也不用担心TCP的粘包和拆包问题了:

  • 粘包问题:多个响应数据同时返回了,但是现在producer已经有了响应数据的实际大小了,多的数据也不会读取了,因为分配的ByteBuffer的大小就是实际响应数据的大小;多余的数据直接等待下一次处理读事件时再重头开始接收即可

  • 拆包问题:响应数据包被拆成多个了,但是现在producer已经有了响应数据的实际大小了,如果没能读到完整的ByteBuffer,那么这里的buffer.hasRemaining()就是true了,也就是本次select的complete是无法完成的,那么本次接受就会按照未完成的接收处理,等待下一次读事件再次触发时,继续往这次接收里写入数据,直到写满ByteBuffer

public NetworkReceive maybeCompleteReceive() {
        if (receive != null && receive.complete()) {
            receive.payload().rewind();
            NetworkReceive result = receive;
            receive = null;
            return result;
        }
        return null;
    }

@Override
    public boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

Kafka中教科书级别的处理粘包/拆包问题,有没有给你一些触动呢?

继续阅读:

你觉得producer里还有哪些写的很好的代码或者很难理解的代码,欢迎评论区一起学习~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容