Kafka2.0消费者客户端源码分析

1 KafkaConsumer 构造器

  1. 初始化参数配置。
  2. 初始化消费者网络客户端ConsumerNetworkClient
  3. 初始化消费者协调器ConsumerCoordinator
  4. 初始化拉取器Fetcher

2 订阅主题

  1. 调用订阅方法subscribe()、assign()会将订阅信息记录到SubscriptionState,多次订阅会覆盖旧数据。
  2. 如果元数据缓存Metadata不包含订阅的主题,则设置needUpdate=true,标识需要更新元数据。

3 拉取消息

  1. poll()方法指定超时时间timeoutMs,在这个时间范围内不断轮询。
  2. 更新分配给消费者的数据,包括消费者协调器偏移量心跳等。
  3. 根据超时时间拉取消息。
  4. 如果拉取的消息不为空,立即出发下一轮的拉取,可以避免因处理消息响应,而阻塞等待。
  5. 拉取的消息会先反序列化,再调用消费者拦截器,最后返回给消费者。
  6. 拉取超时后,返回空记录。
private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
    long elapsedTime = 0L;
    do {
        final long metadataEnd;
        // 更新分配元数据,协调器、心跳、消费位置
        if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
            return ConsumerRecords.empty();
        }
        // 拉取消息
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

        if (!records.isEmpty()) {
            // 消息不为空时,立即发起下一轮的拉取消息,避免阻塞等待响应处理。
            // 注意,在消息返回之前,不能触发唤醒或其他错误。
            if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                client.pollNoWakeup();
            }
            // 回调执行消费者拦截器后返回给消费者
            return this.interceptors.onConsume(new ConsumerRecords<>(records));
        }
        final long fetchEnd = time.milliseconds();
        elapsedTime += fetchEnd - metadataEnd;
    } while (elapsedTime < timeoutMs); // 轮询拉取,知道超过输入的超时时间

    return ConsumerRecords.empty();
}

3.1 拉取消息的详细流程

  1. 如果分区记录缓存PartitionRecords存在缓存记录,则优先会从分区记录缓存队列completedFetches中拉取一部分记录,直接返回。
  2. 否则,向服务端发送拉取请求,消费者并不会立即发送请求,而是先构造 Node 和请求的缓存LinkedHashMap
  3. 遍历上述缓存,构造成可以直接发送的请求,并缓存到ConcurrentHashMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent中,同时添加处理响应的监听器。
  4. 遍历unsent,使用NetworkClient发送请求,这里和生产者使用相同的方法,处理流程相似。发送完后即清空unsent
  5. 当拉取到消息,会回调第3步中的监听器,将消息缓存到队列ConcurrentLinkedQueue<CompletedFetch> completedFetches
  6. 类似第1步,从分区记录缓存队列completedFetches中拉取一部分记录返回给消费者。
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
    final long startMs = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // 从缓存队列拉取
    if (!records.isEmpty()) { // 缓存中有数据则直接返回
        return records;
    }

    // 1.将拉取请求构造成节点和请求的映射关系,并缓存在 unsent
    // 2.添加响应处理监听器,处理发送拉取请求后,从服务端返回的消息,并缓存在队列中
    fetcher.sendFetches();

    // 用 NetworkClient 向服务端发送拉取请求
    client.poll(pollTimeout, startMs, () -> return !fetcher.hasCompletedFetches());

    return fetcher.fetchedRecords(); // 再次从缓存队列拉取
}
// 从缓存拉取队列拉取消息
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    int recordsRemaining = maxPollRecords;

    while (recordsRemaining > 0) { // 在超时时间内不断轮询
        if (nextInLineRecords == null || nextInLineRecords.isFetched) { // 分区记录为空,或者已拉取
            CompletedFetch completedFetch = completedFetches.peek(); // 从缓存队列拉取消息
            nextInLineRecords = parseCompletedFetch(completedFetch); // 将消息解析成分区消息记录 PartitionRecords
            completedFetches.poll(); // 对缓存队列移除
        } else {
            List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); // 从分区记录拉取消息
            TopicPartition partition = nextInLineRecords.partition;
            if (!records.isEmpty()) { // 拉取到消息,方法 Map,以返回给消费者
                fetched.put(partition, records);
        }
    }
    return fetched;
}

4 整体流程

整体流程
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容