RocketMQ系列4-长轮询模式实现推送消息

1、Consumer消费消息两种模式比较

      RocketMQ提供两种方式进行消费消息pull vs push,这也是很多涉及到Client和Server之间的交互模型

1.1 pull模式

     主要是Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数。

  • 好处:是如果Broker消息特别多的话,消费端按照自身的消费能力匀速消费消息,不至于被大量消息打死。
  • 缺陷:消息超时时间可以配置,设置短则会轮训频率过快服务端会承担压力,甚至导致空转。设置长则导致消息接收不及时。
public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();
    try {
         MessageQueue mq = new MessageQueue();
         mq.setQueueId(0);
         mq.setTopic("mq-test");
         mq.setBrokerName("broker-a");
        long offset = 26;
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
        System.out.printf("%s%n", pullResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
    consumer.shutdown();
}

1.2 push模式

     Push模式服务端主动向客户端发送消息,Push方式下,消息队列RocketMQ版还支持批量消费功能,可以将批量消息统一推送至Consumer进行消费。

  • 好处:可以及时收到新的消息,消费端不会产生额外的延迟。
  • 缺陷:当有大量的推送消息会加重消费端的负载甚至将消费端打死。同时Broker会维护所有建连的客户端连接。
public static void main(String[] args) throws InterruptedException, MQClientException {
    // 构造方法
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketmq-consumer");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("mq-test", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
} 

     在现实中更多根据实际场景进行选择,大多场景更喜欢使用Push模式进行消费消息,那么Push是真正Broker端发送给Consumer的吗?答案肯定不是的,现实场景会有成百上千的Consumer对应的消息队列,Broker不会主动发送消息请求的。所以消息队列如何进行设计消息推送的呢?答案是长轮询。

2、RocketMQ如何实现长轮询

     长轮询本质上也是客户端发起定时轮训请求,会保持请求到服务端,直到设置的时长(该hold时长要小于HTTP超时时间)到期或者服务端收到消息,进行返回数据。consumer收到响应后根据状态判断是否有消息。

2.1 Consumer端处理

2.1.1 Consumer启动

     首先是Consumer启动,启动过程会执行各种定时任务和守护线程。其中一个pullMessageService 定时发起请求拉取消息服务,一个MQClientInstance 只会启动一个消息拉取线程,就是push模式使用pull封装一下。

MQClientInstance#start
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 前后省略
                this.pullMessageService.start();
                     break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

2.1.2 Consumer请求

     可以看到启动后Consumer则不断轮询 Broker 获取消息。 Rocketmq将每次请求参数放入pullRequestQueue进行缓冲。这样做的好处:consumer可能对应很多topic。当拉取到消息或者长轮询请求到期后进行回调PullCallback进行下一轮拉取消息。

PullMessageService# 客户端发起拉取消息请求
public void run() {
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();  // 将返回结果添加到Queue
            this.pullMessage(pullRequest);
        }
}

Consumer处理的逻辑包括:

  • 判断 Consumer 处于运行中状态、Consumer 处于暂停中。
  • 消息处理队列持有消息最大量和消息体最大量。
  • 根据 consumeOrderly 判断是否为顺序消息。后续在进行分析。
  • 根据topic获取订阅组信息。
  • 真正拉取消息,发起netty请求。请求参数包含 messageQueue(可以认为标识当前客户端该topic具有唯一性)、当前 Consumer 最大偏移量、每次拉取数量、拉取方式(同步||异步)、回调函数PullCallback。还有netty连接的超时时长 timeoutMillis = 30s 和 broker端hold时长 brokerSuspendMaxTimeMillis =15s
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } 
}
DefaultMQPushConsumerImpl#pullMessage
try {
    // 真正拉取消息的地方,首先获取Broker信息
    this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression,subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC, pullCallback);
}

2.1.3 Consumer响应处理

     PullCallback则根据pullStatus状态判断是否有消息。不管何种状态最终会调用 executePullRequestImmediately 将拉取请求放入队列中进行下一轮消息请求。

  • FOUND:有消息则进行处理结果和统计、更新最新的偏移量(本地或者远程),完成后将请求添加到pullRequestQueue队列里继续轮训。
  • NO_NEW_MSG:拉取请求没有新消息但超过hold时长返回, 会进行下一轮消息拉取请求。
  • NO_MATCHED_MSG:有新消息但是没有匹配。
  • OFFSET_ILLEGAL:拉取的消息队列位置不合法,需要更新消费进度再进行下一轮消息拉取。
DefaultMQPushConsumerImpl#pullMessage
 // 当拉取的请求有响应时
PullCallback pullCallback = new PullCallback() {
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                   
                    // 统计消费组下消息主题拉取耗时
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                        // 提交拉取到的消息到消息处理队列
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        // 提交消费请求  ConsumeRequest#run 拉取消息响应listener.consumeMessage最终返回给客户端,同时也包括执行前和执行后逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            // 消费者拉取完消息后,立马就有开始下一个拉取任务
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {
                    }
                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    //消费者没有消息,立马就有开始下一个拉取任务
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                default:
                    break;
            }
        }
    }

2.2 Broker收到Consumer请求

可以思考一下Broker端需要面临哪些设计?

    1. 当生产端发送消息后,如何实时将消息发给消费端?
    1. 当Consumer长轮询怎样设置hold住时长?
    1. Broker端hold连接后如何解决占用资源问题?

2.2.1 没有收到消息?如何hold请求

     如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。
     hold的请求存放在 ConcurrentHashMap<String, ManyPullRequest> 中,key 为 topic@queueId ,value 是 ManyPullRequest 实际是List<PullRequest> 可以理解对应的多个相同的topic客户端。

1. Consumer发起拉取消息请求,Broker端无消息
Broker端
PullMessageProcessor#processRequest
// broker端没有拉取到消息
case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
    
// 先将拉取请求放在this.pullRequestTable中,进行挂载起来
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}

2.2.2 hold请求超时处理

     Broker端启动线程 PullRequestHoldService 不断轮训检测hold请求是否超时,然后唤醒请求并返回给consumer端。其中轮训时间设置可以是5s一次或者设定时长,进行定期检测。

PullRequestHoldService  轮训遍历是否阻塞请求快到超时时间,进行唤醒    
public void run() {
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) { }
        }  
    }
}
// 
private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    this.notifyMessageArriving(topic, queueId, offset);
                }  
            }
        }
}

2.2.3 服务端收到Producer消息

     Producer写入消息,Broker端有消息通知Consumer端。
     当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。当拉取消息请求获取不到消息则进行阻塞。当有消息或者或者阻塞超时,重新执行获取消息逻辑,主要是NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(…) 方法通知消费端有消息到达。这时候克隆hold的请求列表,从挂起的请求列表中找到当前新的消息的匹配的,匹配到然后在reput这个操作中顺带激活了长轮询休眠的PullRequest。

DefaultMessageStore#doReput
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
NotifyMessageArrivingListener#arriving
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);
}

PullRequestHoldService
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
        long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (mpr != null) {
            List<PullRequest> requestList = mpr.cloneListAndClear();  // 克隆挂起的请求列表
            if (requestList != null) {
                List<PullRequest> replayList = new ArrayList<PullRequest>();
                for (PullRequest request : requestList) {
                    long newestOffset = maxOffset;
                    if (newestOffset <= request.getPullFromThisOffset()) {
                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    }
                    if (newestOffset > request.getPullFromThisOffset()) {
                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));                       
                       // 从挂起的请求列表中找到当前新的消息的匹配的,匹配到了则唤起请求立即给客户端返回。
                       if (match) {
                            try {
                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                            }  
                            continue;
                        }
                     }
                    // 如果列表中挂起的请求快超时了则立即唤醒返回给客户端
                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());
                        }  
                        continue;
                    }
                    replayList.add(request);
                }
                if (!replayList.isEmpty()) {
                    mpr.addPullRequest(replayList);
                }
            }
        }
}

3、总结

     当生产者发送最新消息过来后,首先持久化到commitLog文件,通过异步方式同时持久化consumerQueue和index。然后激活consumer发送来hold的请求,立即将消息通过channel写入consumer客户。
     如果没有消息到达且客户端拉取的偏移量是最新的,会hold住请求。其中hold请求超时时间 < 请求设定的超时时间。同时Broker端也定时检测是否请求超时,超时则立即将请求返回,状态code为NO_NEW_MESSAGE。


image.jpeg
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容