Kafka-多线程消费处理

Kafka-多线程代码赏析

在2020年8月 13号 IGOR BUZATOVIĆ 这个人在

https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/写下了这篇博客。

以下内容,纯属学习。

源代码路径

https://github.com/inovatrend/mtc-demo

MultithreadedKafkaConsumer

package com.inovatrend.mtcdemo;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;


public class MultithreadedKafkaConsumer implements Runnable, ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executor = Executors.newFixedThreadPool(8);
    private final Map<TopicPartition, Task> activeTasks = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private long lastCommitTime = System.currentTimeMillis();
    private final Logger log = LoggerFactory.getLogger(MultithreadedKafkaConsumer.class);

    public MultithreadedKafkaConsumer(String topic) {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "multithreaded-consumer-demo");
        consumer = new KafkaConsumer<>(config);
        new Thread(this).start();
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singleton("topic-name"), this);
            while (!stopped.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
                handleFetchedRecords(records);
                checkActiveTasks();
                commitOffsets();
            }
        } catch (WakeupException we) {
            if (!stopped.get())
                throw we;
        } finally {
            consumer.close();
        }
    }

    private void handleFetchedRecords(ConsumerRecords<String, String> records) {
        if (records.count() > 0) {
            List<TopicPartition> partitionsToPause = new ArrayList<>();
             records.partitions().forEach(partition -> {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 Task task = new Task(partitionRecords);
                 partitionsToPause.add(partition);
                 executor.submit(task);
                 activeTasks.put(partition, task);
             });
            consumer.pause(partitionsToPause);
        }
    }



    private void commitOffsets() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - lastCommitTime > 5000) {
                if(!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                    offsetsToCommit.clear();
                }
                lastCommitTime = currentTimeMillis;
            }
        } catch (Exception e) {
            log.error("Failed to commit offsets!", e);
        }
    }





    private void checkActiveTasks() {
        List<TopicPartition> finishedTasksPartitions = new ArrayList<>();
        activeTasks.forEach((partition, task) -> {
            if (task.isFinished())
                finishedTasksPartitions.add(partition);
            long offset = task.getCurrentOffset();
            if (offset > 0)
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
        });
        finishedTasksPartitions.forEach(partition -> activeTasks.remove(partition));
        consumer.resume(finishedTasksPartitions);
    }





    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 1. Stop all tasks handling records from revoked partitions
        Map<TopicPartition, Task> stoppedTask = new HashMap<>();
        for (TopicPartition partition : partitions) {
            Task task = activeTasks.remove(partition);
            if (task != null) {
                task.stop();
                stoppedTask.put(partition, task);
            }
        }

        // 2. Wait for stopped tasks to complete processing of current record
        stoppedTask.forEach((partition, task) -> {
            long offset = task.waitForCompletion();
            if (offset > 0)
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
        });


        // 3. collect offsets for revoked partitions
        Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();
        partitions.forEach( partition -> {
            OffsetAndMetadata offset = offsetsToCommit.remove(partition);
            if (offset != null)
                revokedPartitionOffsets.put(partition, offset);
        });

        // 4. commit offsets for revoked partitions
        try {
            consumer.commitSync(revokedPartitionOffsets);
        } catch (Exception e) {
            log.warn("Failed to commit offsets for revoked partitions!");
        }
    }



    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.resume(partitions);
    }





    public void stopConsuming() {
        stopped.set(true);
        consumer.wakeup();
    }

}



Task

下面赏析线程代码

package com.inovatrend.mtcdemo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;


public class Task implements Runnable {
    private final List<ConsumerRecord<String, String>> records;
    private volatile boolean stopped = false;
    private volatile boolean started = false;
    private volatile boolean finished = false;
    private final CompletableFuture<Long> completion = new CompletableFuture<>();
    private final ReentrantLock startStopLock = new ReentrantLock();
    private final AtomicLong currentOffset = new AtomicLong();
    private Logger log = LoggerFactory.getLogger(Task.class);

    public Task(List<ConsumerRecord<String, String>> records) {
        this.records = records;
    }


    public void run() {
        startStopLock.lock();
        if (stopped){
            return;
        }
        started = true;
        startStopLock.unlock();
        for (ConsumerRecord<String, String> record : records) {
            if (stopped)
                break;
            // process record here and make sure you catch all exceptions;
            currentOffset.set(record.offset() + 1);
        }

        finished = true;
        completion.complete(currentOffset.get());
    }



    public long getCurrentOffset() {
        return currentOffset.get();
    }



    public void stop() {
        startStopLock.lock();
        this.stopped = true;
        if (!started) {
            finished = true;
            completion.complete(currentOffset.get());
        }
        startStopLock.unlock();
    }

    public long waitForCompletion() {
        try {
            return completion.get();
        } catch (InterruptedException | ExecutionException e) {
            return -1;
        }
    }



    public boolean isFinished() {
        return finished;
    }



}



分析

1.手动提交偏移量

属性:enable.auto.commit 设置为true ;则在轮询方法之后自动提交数据偏移量。

若设置为false ;则需要下面两种:

  • commitSync() 在记录处理完成且下一个轮询方法调用前

  • 实现 ConsumerRebalanceListener这个接口,重写其中的方法,比如当分区被撤销,此时提交偏移量

2.处理速度慢的问题

当轮询获取的消息,之后处理逻辑复杂,如果消费者未能再此时间间隔内调用轮询方法,那此消费者会被移除监听。

kafka的max.poll.interval.ms配置,默认是5分钟,当使用线程消费模型时,,你可以根据下面两个配置处理这个问题。

  • max.poll.recoreds 设置更小的值

  • max.poll.interval.ms 设置更高的值

  • 执行两者的组合;看逻辑执行时间,若轮询记录大小为50,每次逻辑处理为6秒,则是300秒(5分钟);这可以减小50,和提高时间间隔300秒以上

3.处理消息异常

对程序中的异常处理,如下三种选项:

  • 停止处理且关闭消费(在此操作之前,可以选择重试几次)

  • 将记录发送到死信队列且继续下个记录(在此操作之前,可以选择重试几次)

  • 重试,直到成功处理记录(这个可能花费很长时间)

第三种选择,无限重试,在某些场景中是可取的。列如,如果一个外部系统脱机了,且涉及到写操作,你可能想保持重试,直到外部系统可用,无论它花费多久。

当然,在kafka中,由于有max.poll.interval.ms,所以在每一个线程执行消费模型时,当个记录处理必须在一个时间限制内完成。否正会超出规定时间,被消费组移除。

对于此原因,必须为重试实现相当复杂的逻辑。

4.多线程下的不良影响

  • 1.在一个记录处理之前偏移量可能被提交

  • 2.从相同分区获取的消息可能被平行处理(出现多次相同记录处理),消息处理的顺序不能保证

我们当然希望多线程像单线程一样保持执行顺序,且不重复获取相同分区的相同记录。对于此篇文章中的taskconsumer也只是,提供了解决问题的思路,不是适合所有场景。

实现线程池、且配置好分区轮询获取的记录大小及数据量。

5.保证处理顺序

既然轮询是以多线程方式,去处理逻辑,那可以在线程模型中,分区处理完成后,对消费者暂停此记录分区集合。待所有主线程执行完成后,消费者再放开分区限制。

大体思路即如此。这里用到了KafkaConsumer两个API:

  • pause(Collection<TopicPartition> partitions)

  • resume(Collection<TopicPartition> partitions)

这里对于放开,也不是放开所有的task的分区。而是放开完成子线程的任务的分区。

6.处理组再平衡

由于是多线程,那消费者可能再平衡,且一些分区可以再分配给另外的消费者,此时仍旧有一些线程再执行那些分区的记录,这样,一些记录就可能被多个消费者处理。出现重复

数据等。

当然,通过处理撤销分区的记录完成,且再分区被重写分配之前提交相对应的偏移量,可以最小化由于组重平衡引起的重复处理。

ConsumerRebalanceListener 的实例作为参数设置KafkaConsumer.subscribe()方法,这样重写onPartiionsRevoke()方法。由于此被调用来自消费的轮询方法,发生再主线程。

所以consumer.commitSync()同步提交,不用担心报ConcurretnModificationException

如果有些线程任务当前正在处理来自撤销分区的记录。有两种选项可以处理这情况。

  • 1.等待所有线程任务完成。

  • 2.停止这些线程任务,且直等待当前被处理完成的记录。

在上述完成之后,这些分区的偏移量可以被提交。

onPartitionsRevoked() 方法等待结果正阻塞这个消费主进程,因此要意识到等太长时间会超出max.poll.interval.ms时间间隔,导致此消费被组移除。所以,

这第二种稍微好些,由于它花费较少的时间。

所以,在和其他系统交互时,应该选择比max.poll.interval.ms时间间隔更小,以防出现上面的情况。

如果一个会话超时发生,这对应的偏移量不应该被提交,因为这个请求将不被作为成功提交。这意味着这被处理的记录在分区被重新分配之后将会再次处理,那将会产生重复操作在系统中,除非写操作是等幂的(任何几次操作和一次操作的影响一样)。

总结

实现一个多线程消费模型比每个消费者线程模型提供了更重要的优点,在这些用例中。尽管有很多方法去实现,但对应的关键点总是相同的:

  • 确保从分区来的记录通过一个线程被处理仅仅执行一次

  • 在记录被处理之后,主线程提交偏移量

  • 妥善处理组在平衡

CONFLUENT的博客 中还有一些二值得推荐。比如:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容