【RabbitMQ-12】监听消息,处理业务逻辑源码分析

RabbitMQ是如何监听消息,执行业务逻辑?

【RabbitMQ-9】@RabbitListener注解生效的源码分析中分析了注解生效的原理,最终创建了一个消费者线程对象。

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    ...
    try {
        initialize();
        //消费者线程被启动后,会进行while轮询,监听本地阻塞队列的消息。
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            //消费消息(若监听方法抛出业务异常,不会抛出异常)
            mainLoop();
        }
    //后续是消费者线程出现异常时的处理逻辑
    } catch(InterruptedException e) {
        logger.debug("Consumer thread interrupted, processing stopped.");
        Thread.currentThread().interrupt();
        aborted = true;
        //发布事件,可以配置监听器来保持监听
        publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
    } ...

mainLoop()中,会消费消息,若业务方法抛出异常,那么异常会被包装为ListenerExecutionFailedException类型的异常。

private void mainLoop() throws Exception { // NOSONAR Exception
    try {
        //处理消息,并且ACK或者UNACK
        boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
        //判断是否需要动态扩容
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
            checkAdjust(receivedOk);
        }
        //判断是否触发空闲事件的监听
        long idleEventInterval = getIdleEventInterval();
        if (idleEventInterval > 0) {
            if (receivedOk) {
                updateLastReceive();
            } else {
                long now = System.currentTimeMillis();
                long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
                long lastReceive = getLastReceive();
                if (now > lastReceive + idleEventInterval && now > lastAlertAt + idleEventInterval && SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {
                    publishIdleContainerEvent(now - lastReceive);
                }
            }
        }
    } catch(ListenerExecutionFailedException ex) {
        // 监听方法出现的异常,会被捕获且不抛出。
        if (ex.getCause() instanceof NoSuchMethodException) {
            throw new FatalListenerExecutionException("Invalid listener", ex);
        }
    } catch(AmqpRejectAndDontRequeueException rejectEx) {
       //异常被捕获且不抛出。
    }
}

该类使用了MQ的事务包装。

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
    ...//事务处理,不关注  
    return doReceiveAndExecute(consumer);
}

自动ACK和UNACK是依赖consumer.commitIfNecessary(isChannelLocallyTransacted())consumer.rollbackOnExceptionIfNecessary(ex);方法实现的。

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
    Channel channel = consumer.getChannel();

    for (int i = 0; i < this.txSize; i++) {
        logger.trace("Waiting for message from consumer.");
        //在本地队列中获取消息
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (message == null) {
            break;
        }
        try {
            //执行监听的业务逻辑
            executeListener(channel, message);
        } catch(ImmediateAcknowledgeAmqpException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("User requested ack for failed delivery '" + e.getMessage() + "': " + message.getMessageProperties().getDeliveryTag());
            }
            break;
        } catch(Exception ex) {
            if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery: " + message.getMessageProperties().getDeliveryTag());
                }
                break;
            }
          //事务的处理
              ...
              else {
                //自动确认若是出现异常,发送UNACK
                consumer.rollbackOnExceptionIfNecessary(ex);
                throw ex;
            }
        }
    }
    //自动确认若是成功,发送ACK
    return consumer.commitIfNecessary(isChannelLocallyTransacted());
}

该方法执行UNACK,但是channel.basicNack可以判断是否重回队列。

public void rollbackOnExceptionIfNecessary(Throwable ex) {
    //注:isAutoAck()=NONE,既不是NONE模式也不是MANUAL模式,返回true
    boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
    try {
        if (this.transactional) {
            if (logger.isDebugEnabled()) {
                logger.debug("Initiating transaction rollback on application exception: " + ex);
            }
            RabbitUtils.rollbackIfNecessary(this.channel);
        }
        if (ackRequired) {
            OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l - >l).max();
            if (deliveryTag.isPresent()) {
                //发送UNACK,是否重回队列由ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger)决定。
                this.channel.basicNack(deliveryTag.getAsLong(), true, ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
            }
            if (this.transactional) {
                // Need to commit the reject (=nack)
                RabbitUtils.commitIfNecessary(this.channel);
            }
        }
    } catch(Exception e) {
        logger.error("Application exception overridden by rollback exception", ex);
        throw RabbitExceptionTranslator.convertRabbitAccessException(e); // NOSONAR stack trace loss
    } finally {
        this.deliveryTags.clear();
    }
}

若是业务方法抛出的是AmqpRejectAndDontRequeueException的异常,那么在自动ACK的模式下,消息将被丢弃。

public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
   //defaultRequeueRejected参数用户可以配置,其他的是判断异常是否是某个特殊异常的类型
    boolean shouldRequeue = defaultRequeueRejected || throwable instanceof MessageRejectedWhileStoppingException || throwable instanceof ImmediateRequeueAmqpException;
    Throwable t = throwable;
    //shouldRequeue为true表示将要重回队列,且存在异常信息
    while (shouldRequeue && t != null) {
        //若是AmqpRejectAndDontRequeueException异常,那么不需要重回队列。
        if (t instanceof AmqpRejectAndDontRequeueException) {
            shouldRequeue = false;
        }
        t = t.getCause();
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
    }
    return shouldRequeue;
}

执行业务逻辑:

protected void executeListener(Channel channel, Message messageIn) {
    if (!isRunning()) {
        if (logger.isWarnEnabled()) {
            logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
        }
        throw new MessageRejectedWhileStoppingException();
    }
    try {
        //调用业务方法
        doExecuteListener(channel, messageIn);
    } catch(RuntimeException ex) {
        if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
            if (this.statefulRetryFatalWithNullMessageId) {
                throw new FatalListenerExecutionException("Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);
            } else {
                throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID", new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex), messageIn);
            }
        }
        //处理异常
        handleListenerException(ex);
        throw ex;
    }
}

异常时如何被处理的呢:默认使用org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler#handleError处理器。可以自定义配置errorHandler,即出现异常时,全局的进行配置。

private void doExecuteListener(Channel channel, Message messageIn) {
    Message message = messageIn;
    if (this.afterReceivePostProcessors != null) {
        for (MessagePostProcessor processor: this.afterReceivePostProcessors) {
            message = processor.postProcessMessage(message);
            if (message == null) {
                throw new ImmediateAcknowledgeAmqpException("Message Post Processor returned 'null', discarding message");
            }
        }
    }
    Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
     //批量处理
     else {
        //调用监听方法
        invokeListener(channel, message);
    }
}
 //调用监听方法时,实际是代理对象在执行。
protected void invokeListener(Channel channel, Message message) {
    this.proxy.invokeListener(channel, message);
}

创建代理对象:

//初始化代理对象
protected void initializeProxy(Object delegate) {
    if (getAdviceChain().length == 0) {
        return;
    }
    ProxyFactory factory = new ProxyFactory();
    //MQ的重试实现的原理就是加入了spring-retry的advice
    for (Advice advice: getAdviceChain()) {
        factory.addAdvisor(new DefaultPointcutAdvisor(advice));
    }
    factory.addInterface(ContainerDelegate.class);
    factory.setTarget(delegate);
    this.proxy = (ContainerDelegate) factory.getProxy(ContainerDelegate.class.getClassLoader());
}

实际执行的目标方法:

protected void actualInvokeListener(Channel channel, Message message) {
    Object listener = getMessageListener();
    //使用了此配置。
    if (listener instanceof ChannelAwareMessageListener) {
        doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
    } else if (listener instanceof MessageListener) {
        boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
        if (bindChannel) {
            RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
            resourceHolder.setSynchronizedWithTransaction(true);
            TransactionSynchronizationManager.bindResource(this.getConnectionFactory(), resourceHolder);
        }
        try {
            doInvokeListener((MessageListener) listener, message);
        } finally {
            if (bindChannel) {
                // unbind if we bound
                TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
            }
        }
    } else if (listener != null) {
        throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: " + listener);
    } else {
        throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
    }
}
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) {

    RabbitResourceHolder resourceHolder = null;
    Channel channelToUse = channel;
    boolean boundHere = false;
    try {
        ...
        //真正执行的方法
        try {
            listener.onMessage(message, channelToUse);
        } catch(Exception e) {
            throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
        }
    } finally {
        cleanUpAfterInvoke(resourceHolder, channelToUse, boundHere);
    }
}

@RabbitListener注解使用的listener类为:org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter#onMessage在解析注解时,缓存了bean和method对象,后续通过反射调用业务逻辑。

我们也可以在自定义的SimpleMessageListenerContainer配置setMessageListener

@Bean 
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new MySimpleMessageListenerContainer(connectionFactory);
    //同时监听多个队列
    container.setQueues(new Queue("kinson2"));
    //设置当前的消费者数量
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(2);
    //设置是否重回队列
    container.setDefaultRequeueRejected(false);
    //设置自动签收
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    //设置监听外露
    container.setExposeListenerChannel(true);
    //设置线程池
    container.setTaskExecutor(taskExecutor());
    container.setErrorHandler((ex) - >{
        //抛出异常后的后置处理器
    });
    container.setAdviceChain();
    //        container.setMessageConverter();
    //设置消费端标签策略
    //        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
    //            @Override
    //            public String createConsumerTag(String queue) {
    //                return queue + "_" + UUID.randomUUID().toString();
    //            }
    //        });
    //      //设置消息监听(手动的设置,最终依赖反射调用)
    //传入的是策略,子类选择特定的子类来运行(监听容器配置的是他,某个队列被运行时,实际上运行的是对应监听容器的配置。)
    container.setMessageListener(new ChannelAwareMessageListener() {@Override public void onMessage(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody(), "utf-8");
            log.info("队列2—消费消息:" + msg);
        }
    });
    return container;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容