RocketMQ启动过程之Consumer

consumer 1.启动

有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。

Consumer消费拉取的消息的方式有两种

1.      Push方式:rocketmq已经提供了很全面的实现,consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要MessageListener完成业务逻辑即可

2.      Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现

下面介绍默认以push方式为主,因为绝大多数是由push消费方式来使用rocketmq的。 

consumer启动流程

指定group

订阅topic

注册消息监听处理器,当消息到来时消费消息

消费端Start

        复制订阅关系

        初始化rebalance变量

        构建offsetStore消费进度存储对象

        启动消费消息服务

        向mqClientFactory注册本消费者

        启动client端远程通信

        启动定时任务

                  定时获取nameserver地址

                  定时从nameserver获取topic路由信息

                  定时清理下线的borker

                  定时向所有broker发送心跳信息,(包括订阅关系)

                  定时持久化Consumer消费进度(广播存储到本地,集群存储到Broker)

                  统计信息打点

                  动态调整消费线程池

        启动拉消息服务PullMessageService

        启动消费端负载均衡服务RebalanceService

        从namesrv更新topic路由信息

        向所有broker发送心跳信息,(包括订阅关系)

        唤醒Rebalance服务线程


consumer 2.消费端负载均衡

消费端负载均衡


消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载


消费端遍历自己的所有topic,依次调rebalanceByTopic

根据topic获取此topic下的所有queue

选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息)

选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法,获取队列集合SetmqSet

1)  平均分配算法,其实是类似于分页的算法

将所有queue排好序类似于记录

将所有消费端consumer排好序,相当于页数

然后获取当前consumer所在页面应该分配到的queue

2)  按照配置来分配队列, 也就是说在consumer启动的时候指定了queue

3)  按照机房来配置队列

Consumer启动的时候会指定在哪些机房的消息

获取指定机房的queue

然后在执行如1)平均算法

根据分配队列的结果更新ProccessQueueTable1)      比对mqSet 将多余的队列删除,当broker当机或者添加,会导致分配到mqSet变化,

a)        将不在被本consumer消费的messagequeue的ProcessQueue删除,其实是设置ProcessQueue的droped属性为true

b)        将超过两份中没有拉取动作ProcessQueue删除

//TODO 为什么要删除掉,两分钟后来了消息怎么办?

//

2)      添加新增队列,比对mqSet,给新增的messagequeue

构建长轮询对象PullRequest对象,会从broker获取消费的进度

构建这个队列的ProcessQueue

将PullRequest对象派发到长轮询拉消息服务(单线程异步拉取)

注:ProcessQueue正在被消费的队列,

(1)    长轮询拉取到消息都会先存储到ProcessQueue的TreeMap集合中,消费调后会删除掉,用来控制consumer消息堆积,

TreeMap key是消息在此ConsumeQueue队列中索引

(2)    对于顺序消息消费处理

locked属性:当consumer端向broker申请锁队列成功后设置true,只有被锁定的processqueue才能被执行消费

rollback: 将消费在msgTreeMapTemp中的消息,放回msgTreeMap重新消费

commit: 将临时表msgTreeMapTemp数据清空,代表消费完成,放回最大偏移值

(3)    这里是个TreeMap,对key即消息的offset进行排序,这个样可以使得消息进行顺序消费


consumer 3.长轮询

Rocketmq的消息是由consumer端主动到broker拉取的, consumer向broker发送拉消息请求, PullMessageService服务通过一个线程将阻塞队列LinkedBlockingQueue中的PullRequest到broker拉取消息     

  DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法执行向broker拉消息动作

1.      获取ProcessQueue判读是否drop的, drop为true返回

2.      给ProcessQueue设置拉消息时间戳

3.      流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息

4.      流量控制,正在消费队列中消息的跨度超过阀值(默认2000),稍后在消费

5.      根据topic获取订阅关系

6.      构建拉消息回调对象PullBack, 从broker拉取消息(异步拉取)返回结果是回调

7.      从内存中获取commitOffsetValue  //TODO 这个值跟pullRequest.getNextOffset区别

8.      构建sysFlag  pull接口用到的flag

9.      调底层通信层向broker发送拉消息请求

如果master压力过大,会建议去slave拉取消息

如果是到broker拉取消息清楚实时提交标记位,因为slave不允许实时提交消费进度,可以定时提交

//TODO 关于master拉消息实时提交指的是什么?

10.  拉到消息后回调PullCallback

处理broker返回结果pullResult

          更新从哪个broker(master 还是slave)拉取消息

          反序列化消息

          消息过滤

          消息中放入队列最大最小offset,方便应用来感知消息堆积度

将消息加入正在处理队列ProcessQueue

将消息提交到消费消息服务ConsumeMessageService

流控处理, 如果pullInterval参数大于0 (拉消息间隔,如果为了降低拉取速度,可以设置大于0的值),延迟再执行拉消息,  如果pullInterval为0立刻在执行拉消息动作

序列图

1.      向broker发送长轮询请求

2.   Broker接收长轮询请求

3.      Consumer接收broker响应

长轮询活动图:

一张图画不下,再来一张

consumer 4.长轮询push消息-并发消息

通过长轮询拉取到消息后会提交到消息服务ConsumeMessageConcurrentlyService,

ConsumeMessageConcurrentlyServic的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。

长轮询向broker拉取消息是批量拉取的, 默认设置批量的值为pullBatchSize= 32,可配置

消费端consumer构建一个消费消息任务ConsumeRequest消费一批消息的个数是可配置的consumeMessageBatchMaxSize = 1, 默认批量个数为一个

         ConsumeRequest 任务run方法执行   

判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息

                  构建并行消费上下文

                  给消息设置消费失败时候的retrytopic,当消息发送失败的时候发送到topic为%RETRY%groupname的队列中

                  调MessageListenerConcurrently监听器的consumeMessage方法消费消息,返回消费结果

                  如果ProcessQueue的droped为true,不处理结果,不更新offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险

                  处理消费结果

消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,但是消费消息的时候一旦遇到消费消息失败直接放回,根据ackIndex来标记成功消费到哪里了

                            消费失败, ackIndex设置为-1

广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理

集群模式, 将消费失败的消息一条条的发送到broker的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在cosumer的本地线程定时5秒钟以后重试重新消费消息,在走一次上面的消费流程。

                  删除正在消费的队列processQueue中本次消费的消息,放回消费进度

                  更新消费进度,这里只是一个内存offsettable的更新,后面有定时任务更新到broker上去


consumer 5.长轮询push消息-顺序消费消息

顺序消费服务ConsumeMessageConcurrentlyService构建的时候

                  构建一个线程池来接收消费请求ConsumeRequest

                  构建一个单线程的本地线程,用来稍后定时重新消费ConsumeRequest, 用来执行定时周期性(一秒)钟锁队列任务

        周期性锁队列lockMQPeriodically

                  获取正在消费队列列表ProcessQueueTable所有MesssageQueue, 构建根据broker归类成MessageQueue集合Map>

                  遍历Map>的brokername, 获取broker的master机器地址,将brokerName的Set发送到broker请求锁定这些队列。

在broker端锁定队列,其实就是在broker的queue中标记一下消费端,表示这个queue被某个client锁定。 Broker会返回成功锁定队列的集合,

根据成功锁定的MessageQueue,设置对应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false                  通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService,ConsumeMessageOrderlyService的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。 ConsumeRequest任务的run方法        判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息        每个messagequeue都会生成一个队列锁来保证在当前consumer内,同一个队列串行消费,        判断processQueue的lock属性是否为true,lock属性是否过期,如果为false或者过期,放到本地线程稍后锁定在消费。 如果lock为true且没有过期,开始消费消息        计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue, messagequeue重新构建ConsumeRequest加到线程池10ms后在消费,这样防止个别队列被饿死        获取客户端的消费批次个数,默认一批次为一条        从proccessqueue获取批次消息, processqueue.takeMessags(batchSize), 从msgTreeMap中移除消息放到临时map中msgTreeMapTemp,这个临时map用来回滚消息和commit消息来实现事物消费        调回调接口消费消息,返回状态对象ConsumeOrderlyStatus        根据消费状态,处理结果1)  非事物方式,自动提交消息消息状态为success:调用processQueue.commit方法                  获取msgTreeMapTemp的最后一个key,表示提交的 offset                  清空msgTreeMapTemp的消息,已经成功消费2)  事物提交,由用户来控制提交回滚(精卫专用)    更新消费进度, 这里的更新只是一个内存offsetTable的更新,后面有定时任务定时更新到broker上去

consumer 6.消息消费

消费者主动拉取消息消费,客户端通过类DefaultMQPullConsumer

        客户端可以指定特定MessageQueue

        也可以通过DefaultMQPullConsumer.fetchMessageQueuesInBalance(topic) 获取消费的队列

        业务自己获取消费队列,自己到broker拉取消息,以及自己更新消费进度

因为内部实现跟push方式类似就不在啰嗦,用法也请求看示例代码去

consumer 7.shutdown

DefaultMQPushConsumerImpl  关闭消费端

        关闭消费线程

        将分配到的Set的消费进度保存到broker

利用DefaultMQPushConsumerImpl获取ProcessQueueTable的keyset的messagequeue去获取

RemoteBrokerOffsetStore.offsetTableMap中的消费进度,

offsetTable中的messagequeue的值,在update的时候如果没有对应的Messagequeue会构建, 但是也会rebalance的时候将没有分配到的messagequeue删除

rebalance会将offsettable中没有分配到messagequeue删除, 但是在从offsettable删除之前会将offset保存到broker

        Unregiser客户端

        pullMessageService关闭

        scheduledExecutorService关闭,关闭一些客户端的起的定时任务

        mqClientApi关闭

        rebalanceService关闭

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容