消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢?
在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送”两部分进行了详细的阐述,本文将主要从消息消费为切入点简要地介绍下“RocketMQ中Pull和Push的两种消费方式”、“RocketMQ中消费者(Push模式)的启动流程”和“RocketMQ中Pull和Push两种消费方式的简要流程”。在阅读本篇之前希望读者能够先仔细阅读下关于RocketMQ分布式消息队列的前几篇文章:
(1)消息中间件—RocketMQ的RPC通信(一)
(2)消息中间件—RocketMQ的RPC通信(二)
(3)消息中间件—RocketMQ消息发送

一、如何选择消息消费的方式—Pull or Push?

1.1 MQ中Pull和Push的两种消费方式

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:
(1)Push方式:由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;
(2)Pull方式:由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

1.2 RocketMQ消息消费的长轮询机制

思考题
上面简要说明了Push和Pull两种消息消费方式的概念和各自特点。如果长时间没有消息,而消费者端又不停的发送Pull请求不就会导致RocketMQ中Broker端负载很高吗?那么在RocketMQ中如何解决以做到高效的消息消费呢?

通过研究源码可知,RocketMQ的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面Push/Pull模型的各自缺点。基本设计思路是:消费者如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求(将请求保存至pullRequestTable本地缓存变量中),然后Broker端的后台独立线程—PullRequestHoldService会从pullRequestTable本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量,如果条件成立则说明有新消息达到Broker端(这里,在RocketMQ的Broker端会有一个后台独立线程—ReputMessageService不停地构建ConsumeQueue/IndexFile数据,同时取出hold住的请求并进行二次处理),则通过重新调用一次业务处理器—PullMessageProcessor的处理请求方法—processRequest()来重新尝试拉取消息(此处,每隔5S重试一次,默认长轮询整体的时间设置为30s)。
RocketMQ消息Pull的长轮询机制的关键在于Broker端的PullRequestHoldService和ReputMessageService两个后台线程。对于RocketMQ的长轮询(LongPolling)消费模式后面会专门详细介绍。

二、RocketMQ中两种消费方式的demo代码

(1)Pull模式的Consumer端代码如下:

        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            System.out.println(pullResult.getMsgFoundList().get(0).toString());
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    //TODO
                }
            }
        }
        consumer.shutdown();

在示例代码中,可以看到业务工程在Consumer启动后,Consumer主动获取MessageQueue的Set集合,遍历该集合中的每一个队列,发送Pull的请求(参数中带有队列中的消息偏移量),同时需要Consumer端自己保存消息消费的offset偏移量至本地变量中。在Pull模式下,需要业务应用代码自身去完成比较多的事情,因此在实际应用中用的较少。
(2)Push模式的Consumer端代码如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest111", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName("consumer1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

在示例代码中,业务工程的应用程序使用Push方式进行消费时,Consumer端注册了一个监听器,Consumer在收到消息后主动调用这个监听器完成消费并进行对应的业务逻辑处理。由此可见,业务应用代码只需要完成消息消费即可,无需参与MQ本身的一些任务处理(ps:业务代码显得更为简洁一些)。

三、RocketMQ中消费者Push方式的启动流程

这一节主要先讲下RocketMQ消费者的启动流程,看下在启动的时候究竟完成了什么样的操作。由于RocketMQ的DefaultMQPushConsumer和DefaultMQPullConsumer启动流程大部分类似,而DefaultMQPushConsumer更为复杂一些,因此这一节内容主要讲的是DefaultMQPushConsumer启动流程。Push方式的Consumer启动流程的时序图如下图所示:


RocketMQ的PushConsumer启动时序图.jpg

从上面的时序图上可以看出,Push方式的Consumer启动流程完成的任务比较多,主要任务如下:
(1)设置consumerGroup、NameServer服务地址、消费起始偏移地址并根据参数Topic构建Consumer端的SubscriptionData(订阅关系值);
(2)在Consumer端注册消费者监听器,当消息到来时完成消费消息;
(3)启动defaultMQPushConsumerImpl实例,主要完成前置校验、复制订阅关系(将defaultMQPushConsumer的订阅关系复制至rebalanceImpl中,包括retryTopic(重试主题)对应的订阅关系)、创建MQClientInstance实例、设置rebalanceImpl的各个属性值、pullAPIWrapper包装类对象的初始化、初始化offsetStore实例并加载消费进度、启动消息消费服务线程以及在MQClientInstance中注册consumer等任务;
(4)启动MQClientInstance实例,其中包括完成客户端网络通信线程、拉取消息服务线程、负载均衡服务线程和若干个定时任务的启动;
(5)向所有的Broker端发送心跳(采用加锁方式);
(6)最后,唤醒负载均衡服务线程在Consumer端开始负载均衡;

四、RocketMQ中Pull和Push两种消费模式流程简析

RocketMQ提供了两种消费模式,Push和Pull,大多数场景使用的是Push模式,在源码中这两种模式分别对应的是DefaultMQPushConsumer类和DefaultMQPullConsumer类。Push模式实际上在内部还是使用的Pull方式实现的,通过Pull不断地轮询Broker获取消息,当不存在新消息时,Broker端会挂起Pull请求,直到有新消息产生才取消挂起,返回新消息。
(1)RocketMQ的Pull消费模式流程简析
RocketMQ的Pull模式相对来得简单,从上面的demo代码中可以看出,业务应用代码通过由Topic获取到的MessageQueue直接拉取消息(最后真正执行的是PullAPIWrapper的pullKernelImpl()方法,通过发送拉取消息的RPC请求给Broker端)。其中,消息消费的偏移量需要Consumer端自己去维护。
(2)RocketMQ的Push消费模式流程简析
在本文前面已经提到过了,从严格意义上说,RocketMQ并没有实现真正的消息消费的Push模式,而是对Pull模式进行了一定的优化,一方面在Consumer端开启后台独立的线程—PullMessageService不断地从阻塞队列—pullRequestQueue中获取PullRequest请求并通过网络通信模块发送Pull消息的RPC请求给Broker端。另外一方面,后台独立线程—rebalanceService根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应PullRequest实例放入阻塞队列—pullRequestQueue中。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。然后,再根据业务反馈是否成功消费来推动消费进度。
在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog获取消息。如1.2节内容所述,如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService的二次处理。

思考题
使用RocketMQ的Pull模式进行消息消费时,由上面可知该模式下无需自动拉取消息,这样在DefaultMQPullConsumerImpl启动时,消息拉取线程—PullMessageService和消息队列负载线程—RebalanceService其实也就没必要启动,但实际上却启动了,这里会有问题么?

五、总结

RocketMQ的消息消费(一)(入门篇幅)就先分析到这里了。建议读者可以将作者之前写的三篇文章—“RocketMQ的RPC通信(一)/(二)”以及“RocketMQ消息发送”结合起来读,这样会整体会更加连贯,收获更大。关于RocketMQ消息消费的内容比较多也比较复杂,涉及“Consumer端的负载均衡机制”、“RocketMQ的长轮询机制”和“RocketMQ中Pull和Push消费模式的细节内容”将在后续篇幅进行介绍和分析。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 197,368评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,941评论 2 374
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 144,369评论 0 326
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,848评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,719评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,505评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,904评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,528评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,819评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,848评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,652评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,468评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,912评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,095评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,389评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,906评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,120评论 2 339

推荐阅读更多精彩内容