spring-kafka源码解析

spring-kafka 源码解析流程

  • KafkaListenerAnnotationBeanPostProcessor
    该类实现BeanPostProcessor

加载标注有KafkaListener,KafkaListeners 的方法,类,封装MethodKafkaListenerEndpoint 并注册到KafkaListenerEndpointRegistrar

public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
  • KafkaListenerAnnotationBeanPostProcessor
    该类实现了SmartInitializingSingleton接口,所有bean加载完成后
public void afterSingletonsInstantiated() {
        this.registrar.setBeanFactory(this.beanFactory);

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, KafkaListenerConfigurer> instances =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
            for (KafkaListenerConfigurer configurer : instances.values()) {
                configurer.configureKafkaListeners(this.registrar);
            }
        }

        if (this.registrar.getEndpointRegistry() == null) {
            if (this.endpointRegistry == null) {
                Assert.state(this.beanFactory != null,
                        "BeanFactory must be set to find endpoint registry by bean name");
                this.endpointRegistry = this.beanFactory.getBean(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                        KafkaListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }

        if (this.defaultContainerFactoryBeanName != null) {
            this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
        }

        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
        }
        else {
            addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
        }

        // Actually register all listeners
//重点是这个方法,该方法中,创建MessageListenerContainer时获取Topic设置的并发数,默认是1。
        this.registrar.afterPropertiesSet();
        Map<String, ContainerGroupSequencer> sequencers =
                this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
        sequencers.values().forEach(seq -> seq.initialize());
    }
  • AbstractMessageListenerContainer
    该类继承了Lifecycle接口,会执行重写start方法。
public final void start() {
        checkGroupId();
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                        () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
                doStart();
            }
        }
    }

···
执行到下面方法

protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) { // stand-alone container
            checkTopics();
        }
        ContainerProperties containerProperties = getContainerProperties();
        checkAckMode(containerProperties);

        Object messageListener = containerProperties.getMessageListener();
        AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
        if (consumerExecutor == null) {
            consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
        ListenerType listenerType = determineListenerType(listener);

//实例化ListenerConsumer,其中进行主题订阅
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
        setRunning(true);
        this.startLatch = new CountDownLatch(1);

//该方法默认会使用SimpleAsyncTaskExecutor线程池,并开启线程
        this.listenerConsumerFuture = consumerExecutor
                .submitListenable(this.listenerConsumer);
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Consumer thread failed to start - does the configured task executor "
                        + "have enough threads to support all containers and concurrency?");
                publishConsumerFailedToStart();
            }
        }
        catch (@SuppressWarnings(UNUSED) InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

其中实例化ListenerConsumer 时会订阅主题,如subscribeOrAssignTopics(this.consumer);

同时该内部类ListenerConsumer 实现了runnable接口

  • ListenerConsumer
    线程开启后执行run方法,调用kafka-client 原生包 循环poll数据
public void run() {
            ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
            publishConsumerStartingEvent();
            this.consumerThread = Thread.currentThread();
            setupSeeks();
            KafkaUtils.setConsumerGroupId(this.consumerGroupId);
            this.count = 0;
            this.last = System.currentTimeMillis();
            initAssignedPartitions();
            publishConsumerStartedEvent();
            Throwable exitThrowable = null;
            while (isRunning()) {
                try {

//获取方法
                    pollAndInvoke();
                }
                catch (NoOffsetForPartitionException nofpe) {
                    this.fatalError = true;
                    ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
                    exitThrowable = nofpe;
                    break;
                }
                catch (AuthenticationException | AuthorizationException ae) {
                    if (this.authExceptionRetryInterval == null) {
                        ListenerConsumer.this.logger.error(ae,
                                "Authentication/Authorization Exception and no authExceptionRetryInterval set");
                        this.fatalError = true;
                        exitThrowable = ae;
                        break;
                    }
                    else {
                        ListenerConsumer.this.logger.error(ae,
                                "Authentication/Authorization Exception, retrying in "
                                        + this.authExceptionRetryInterval.toMillis() + " ms");
                        // We can't pause/resume here, as KafkaConsumer doesn't take pausing
                        // into account when committing, hence risk of being flooded with
                        // GroupAuthorizationExceptions.
                        // see: https://github.com/spring-projects/spring-kafka/pull/1337
                        sleepFor(this.authExceptionRetryInterval);
                    }
                }
                catch (FencedInstanceIdException fie) {
                    this.fatalError = true;
                    ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
                            + "' has been fenced");
                    exitThrowable = fie;
                    break;
                }
                catch (StopAfterFenceException e) {
                    this.logger.error(e, "Stopping container due to fencing");
                    stop(false);
                    exitThrowable = e;
                }
                catch (Error e) { // NOSONAR - rethrown
                    this.logger.error(e, "Stopping container due to an Error");
                    this.fatalError = true;
                    wrapUp(e);
                    throw e;
                }
                catch (Exception e) {
                    handleConsumerException(e);
                }
                finally {
                    clearThreadState();
                }
            }
            wrapUp(exitThrowable);
        }

ConcurrentMessageListenerContainer 设置并发数

@Override
    protected void doStart() {
        if (!isRunning()) {
            checkTopics();
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);
//根据并发数concurrency开启订阅主题接收消息 
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container =
                        constructContainer(containerProperties, topicPartitions, i);
                configureChildContainer(i, container);
                if (isPaused()) {
                    container.pause();
                }
                container.start();
                this.containers.add(container);
            }
        }
    }

注册时 AbstractKafkaListenerContainerFactory 设置批量消费

private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
        if (aklEndpoint.getRecordFilterStrategy() == null) {
            JavaUtils.INSTANCE
                    .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy);
        }
        JavaUtils.INSTANCE
                .acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
                .acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
                .acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
                .acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
                .acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
                .acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)
                .acceptIfNotNull(this.batchToRecordAdapter, aklEndpoint::setBatchToRecordAdapter);
        if (aklEndpoint.getBatchListener() == null) {
//设置批量消费
            JavaUtils.INSTANCE
                    .acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener);
        }
    }

自定义设置主题消费时间

Component
@EnableScheduling
@Slf4j
public class CustomConsumeSet {

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @Scheduled(cron = "0 35 20 * * ?")
    public void startConsume(){

        MessageListenerContainer listenerContainer = registry.getListenerContainer("myContainerId");
        if(!listenerContainer.isRunning()){
            log.info("~~~~~~消费开始~~~~~~~~");
            listenerContainer.start();  //消费开始
        }else{
            log.info("~~~~~~消费继续~~~~~~~~");
            listenerContainer.resume(); //继续消费
        }
    }

    @Scheduled(cron = "0 40 20 * * ?")
    public void stopConsume(){
        MessageListenerContainer listenerContainer = registry.getListenerContainer("myContainerId");
        listenerContainer.stop();
        log.info("~~~~~~消费停止~~~~~~~~");
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355

推荐阅读更多精彩内容