spring-kafka 源码解析流程
- KafkaListenerAnnotationBeanPostProcessor
该类实现BeanPostProcessor
加载标注有KafkaListener,KafkaListeners 的方法,类,封装MethodKafkaListenerEndpoint 并注册到KafkaListenerEndpointRegistrar
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
}
}
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
- KafkaListenerAnnotationBeanPostProcessor
该类实现了SmartInitializingSingleton接口,所有bean加载完成后
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, KafkaListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
for (KafkaListenerConfigurer configurer : instances.values()) {
configurer.configureKafkaListeners(this.registrar);
}
}
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
KafkaListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
if (this.defaultContainerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
}
else {
addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
}
// Actually register all listeners
//重点是这个方法,该方法中,创建MessageListenerContainer时获取Topic设置的并发数,默认是1。
this.registrar.afterPropertiesSet();
Map<String, ContainerGroupSequencer> sequencers =
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
sequencers.values().forEach(seq -> seq.initialize());
}
- AbstractMessageListenerContainer
该类继承了Lifecycle接口,会执行重写start方法。
public final void start() {
checkGroupId();
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
doStart();
}
}
}
···
执行到下面方法
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
Object messageListener = containerProperties.getMessageListener();
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
//实例化ListenerConsumer,其中进行主题订阅
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
//该方法默认会使用SimpleAsyncTaskExecutor线程池,并开启线程
this.listenerConsumerFuture = consumerExecutor
.submitListenable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
this.logger.error("Consumer thread failed to start - does the configured task executor "
+ "have enough threads to support all containers and concurrency?");
publishConsumerFailedToStart();
}
}
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}
其中实例化ListenerConsumer 时会订阅主题,如subscribeOrAssignTopics(this.consumer);
同时该内部类ListenerConsumer 实现了runnable接口
- ListenerConsumer
线程开启后执行run方法,调用kafka-client 原生包 循环poll数据
public void run() {
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
setupSeeks();
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
publishConsumerStartedEvent();
Throwable exitThrowable = null;
while (isRunning()) {
try {
//获取方法
pollAndInvoke();
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
exitThrowable = nofpe;
break;
}
catch (AuthenticationException | AuthorizationException ae) {
if (this.authExceptionRetryInterval == null) {
ListenerConsumer.this.logger.error(ae,
"Authentication/Authorization Exception and no authExceptionRetryInterval set");
this.fatalError = true;
exitThrowable = ae;
break;
}
else {
ListenerConsumer.this.logger.error(ae,
"Authentication/Authorization Exception, retrying in "
+ this.authExceptionRetryInterval.toMillis() + " ms");
// We can't pause/resume here, as KafkaConsumer doesn't take pausing
// into account when committing, hence risk of being flooded with
// GroupAuthorizationExceptions.
// see: https://github.com/spring-projects/spring-kafka/pull/1337
sleepFor(this.authExceptionRetryInterval);
}
}
catch (FencedInstanceIdException fie) {
this.fatalError = true;
ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
+ "' has been fenced");
exitThrowable = fie;
break;
}
catch (StopAfterFenceException e) {
this.logger.error(e, "Stopping container due to fencing");
stop(false);
exitThrowable = e;
}
catch (Error e) { // NOSONAR - rethrown
this.logger.error(e, "Stopping container due to an Error");
this.fatalError = true;
wrapUp(e);
throw e;
}
catch (Exception e) {
handleConsumerException(e);
}
finally {
clearThreadState();
}
}
wrapUp(exitThrowable);
}
ConcurrentMessageListenerContainer 设置并发数
@Override
protected void doStart() {
if (!isRunning()) {
checkTopics();
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null && this.concurrency > topicPartitions.length) {
this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
setRunning(true);
//根据并发数concurrency开启订阅主题接收消息
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container =
constructContainer(containerProperties, topicPartitions, i);
configureChildContainer(i, container);
if (isPaused()) {
container.pause();
}
container.start();
this.containers.add(container);
}
}
}
注册时 AbstractKafkaListenerContainerFactory 设置批量消费
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
if (aklEndpoint.getRecordFilterStrategy() == null) {
JavaUtils.INSTANCE
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy);
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)
.acceptIfNotNull(this.batchToRecordAdapter, aklEndpoint::setBatchToRecordAdapter);
if (aklEndpoint.getBatchListener() == null) {
//设置批量消费
JavaUtils.INSTANCE
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener);
}
}
自定义设置主题消费时间
Component
@EnableScheduling
@Slf4j
public class CustomConsumeSet {
@Autowired
KafkaListenerEndpointRegistry registry;
@Scheduled(cron = "0 35 20 * * ?")
public void startConsume(){
MessageListenerContainer listenerContainer = registry.getListenerContainer("myContainerId");
if(!listenerContainer.isRunning()){
log.info("~~~~~~消费开始~~~~~~~~");
listenerContainer.start(); //消费开始
}else{
log.info("~~~~~~消费继续~~~~~~~~");
listenerContainer.resume(); //继续消费
}
}
@Scheduled(cron = "0 40 20 * * ?")
public void stopConsume(){
MessageListenerContainer listenerContainer = registry.getListenerContainer("myContainerId");
listenerContainer.stop();
log.info("~~~~~~消费停止~~~~~~~~");
}
}