[原创]SpringBoot整合RocketMQ消息队列

20170712182011089.gif

题外话

什么情况下的异步操作需要使用消息队列而不是多线程?

  • 消息队列和多线程两者并不冲突,多线程可以作为队列的生产者和消费者。
    使用外部的消息队列时,第一是可以提高应用的稳定性,当程序fail后,已经写入外部消息队列的数据依旧是保存的,如果使用两步commit的队列的话,可以更加提高这个项目。
  • 用线程的话,会占用主服务器资源, 消息队列的话, 可以放到其他机器上运行, 让主服务器尽量多的服务其他请求。我个人认为, 如果用户不急着知道结果的操作, 用消息队列, 否则再考虑用不用线程。
  • 解耦更充分,架构更合理
    • 多线程是在编程语言层面解决问题
    • 消息队列是在架构层面解决问题
      我认为架构层面解决问题是“觉悟比较高的方式“,理想情况下应该限制语言层面滥用多线程,能不用就不用
  • 不关心执行结果的都可以放到消息队列,不需要及时到达,放到消息队列中慢慢消化
  • 批量发送邮件时,数据量庞大,如果使用多线程对系统不安全

消息队列能解决什么问题

  • 异步处理
  • 应用解耦
  • 流量削锋
  • 日志处理
  • 消息通讯

# 环境介绍

注意尽量将rocketmq的1.应用版本,2.jar包依赖,3.recketmq-console-ng的jar包依赖版本保持一致,不然可能会出现非常诡异的问题
此项目所使用版本: rocketmq: 4.3.0,OS: win10

  1. jar包依赖
compile group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.3.0'
  1. 下载 rocketmq应用
    http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

  2. windows下rocketmq环境配置与启动
    参考 //www.greatytc.com/p/4a275e779afa

    • 在rocketmq的bin目录下启动NAMESERVER(相当于服务注册中心)
      start mqnamesrv.cmd
    • 启动 broker(真正工作的服务器,存储消息的服务器)
      start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
      3.1 mac
      Build from Release
 > unzip rocketmq-all-4.3.2-source-release.zip
  > cd rocketmq-all-4.3.2/
  > mvn -Prelease-all -DskipTests clean install -U
  > cd distribution/target/apache-rocketmq

Start Name Server

  > nohup sh bin/mqnamesrv &
  > tail -f ~/logs/rocketmqlogs/namesrv.log
  The Name Server boot success...

Start Broker

  > nohup sh bin/mqbroker -n localhost:9876 &
  > tail -f ~/logs/rocketmqlogs/broker.log 
  The broker[%s, 172.30.30.233:10911] boot success...

  1. 可视化rocketmq管理项目下载
    https://github.com/apache/rocketmq-externals.git
    • 将这个项目里面rocketmq-console-ng里的rocketmq依赖修改成与你项目依赖的版本一致,次项目是4.3.0
      image.png
  2. 第三步已经把rocketmq的nameServer与broker启动起来
  3. 启动rocket-console-ng可视化管理项目,该项目是基于springboot的
  4. 访问rocket-console-ng的服务地址


    image.png

    到此环境搭建完成!!!
    回到自己的程序↓↓↓

# 配置信息

###producer
#该应用是否启用生产者
rocketmq:
  producer:
    isOnOff: on
    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #消息最大长度 默认1024*4(4M)
    maxMessageSize: 4096
    #发送消息超时时间,默认3000
    sendMsgTimeout: 3000
    #发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2

  ###consumer
  ##该应用是否启用消费者
  consumer:
    isOnOff: on
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
    topics: futaotopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    #设置一次消费消息的条数,默认为1条
    consumeMessageBatchMaxSize: 1

reConsumerTimes: 3

# 生产者Bean

package com.futao.springmvcdemo.service.impl

import com.futao.springmvcdemo.foundation.LogicException
import com.futao.springmvcdemo.model.entity.constvar.ErrorMessage
import com.futao.springmvcdemo.model.system.SystemConfig
import com.futao.springmvcdemo.service.RocketMqService
import org.apache.commons.lang3.StringUtils
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.consumer.ConsumeFromWhere
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Service
import java.nio.charset.Charset
/**
 * @author futao
 * Created on 2018/10/18.
 */
@Service
open class RocketMqServiceImpl : RocketMqService {
    private val logger = LoggerFactory.getLogger(RocketMqServiceImpl::class.java)
/**
     * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
     */
    @Value("\${rocketmq.producer.groupName}")
    private lateinit var producerGroupName: String

    @Value("\${rocketmq.producer.namesrvAddr}")
    private lateinit var producerNamesrvAddr: String
    /**
     * 消息最大大小,默认4M
     */
    @Value("\${rocketmq.producer.maxMessageSize}")
    private var maxMessageSize: Int = 0
    /**
     * 消息发送超时时间,默认3秒
     */
    @Value("\${rocketmq.producer.sendMsgTimeout}")
    private var sendMsgTimeout: Int = 0
    /**
     * 消息发送失败重试次数,默认2次
     */
    @Value("\${rocketmq.producer.retryTimesWhenSendFailed}")
    private var retryTimesWhenSendFailed: Int = 0


    /**
     * 生产者Bean
     */
    @Bean
    override fun producer(): DefaultMQProducer {
        if (this.producerGroupName.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_GROUP_NAME_EMPTY)
        }
        if (this.producerNamesrvAddr.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
        }
        val defaultMQProducer = DefaultMQProducer(producerGroupName)
        defaultMQProducer.namesrvAddr = producerNamesrvAddr
        defaultMQProducer.maxMessageSize = maxMessageSize
        defaultMQProducer.sendMsgTimeout = sendMsgTimeout
        defaultMQProducer.isVipChannelEnabled = false
        //消息发送到mq服务器失败重试次数
        defaultMQProducer.retryTimesWhenSendFailed = retryTimesWhenSendFailed
        try {
            defaultMQProducer.start()
            logger.info("rocketMq Producer start success; nameServer:{},producerGroupName:{}", producerNamesrvAddr, producerGroupName)
        } catch (e: Exception) {
            logger.error("rocketMq Producer start fail;{}", e.message, e)
        }
        return defaultMQProducer
    }
}

# 消费者

@Value("\${rocketmq.consumer.namesrvAddr}")
    private lateinit var consumerNamesrvAddr: String

    @Value("\${rocketmq.consumer.groupName}")
    private lateinit var consumerGroupName: String

    @Value("\${rocketmq.consumer.consumeThreadMin}")
    private var consumeThreadMin: Int = 0

    @Value("\${rocketmq.consumer.consumeThreadMax}")
    private var consumeThreadMax: Int = 0

    @Value("\${rocketmq.consumer.topics}")
    private lateinit var topics: String

    @Value("\${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private var consumeMessageBatchMaxSize: Int = 0

//    @Resource
//    private lateinit var mqMessageListenerProcessor: MQConsumeMsgListenerProcessor


    @Value("\${reConsumerTimes}")
    private var reConsumerTimes: Int = 0


    /**
     * 消费者Bean
     */
    @Bean
    override fun consumer(): DefaultMQPushConsumer {
        val topic = SystemConfig.ROCKET_MQ_TOPIC_MAIL
        val tag = SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER
        if (this.consumerGroupName.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_GROUP_NAME_EMPTY)
        }
        if (this.consumerNamesrvAddr.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
        }
        if (this.topics.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_TOPICS_EMPTY)
        }
        try {
            //DefaultMQPushConsumer DefaultMQPullConsumer
            val defaultMQPushConsumer = DefaultMQPushConsumer(consumerGroupName)
            defaultMQPushConsumer.namesrvAddr = consumerNamesrvAddr
            defaultMQPushConsumer.consumeThreadMin = consumeThreadMin
            defaultMQPushConsumer.isVipChannelEnabled = false
//        defaultMQPushConsumer.createTopic()
            defaultMQPushConsumer.consumeThreadMax = consumeThreadMax
            //消费模式 集群还是广播,默认为集群(自动负载均衡)
            //广播消费: 消息会发给Consume Group中的每一个消费者进行消费,如果设置为广播消息会导致NOT_ONLINE异常,https://github.com/apache/rocketmq/issues/296
            defaultMQPushConsumer.messageModel = MessageModel.CLUSTERING
            // 设置消费模型,
            //consumer.setMessageModel(MessageModel.CLUSTERING);

            // * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            // * 如果非第一次启动,那么按照上次消费的位置继续消费
            defaultMQPushConsumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
            //设置一次消费消息的条数,默认为1条
            defaultMQPushConsumer.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize
            //订阅topic
            defaultMQPushConsumer.subscribe(topic, tag)

            //        defaultMQPushConsumer.registerMessageListener(mqMessageListenerProcessor)
            defaultMQPushConsumer.registerMessageListener(
                    MessageListenerConcurrently { msgs, _ ->
                        if (msgs == null || msgs.isEmpty()) {
                            logger.info("接受到的消息为空,不处理,直接返回成功")
                            return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                        }
                        val msg = msgs[0]
                        logger.info("接收到的消息为:" + msg.toString())
                        if (msg.topic == topic && msg.tags == tag) {
                            //判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
                            //获取该消息重试次数
                            if (msg.reconsumeTimes >= reConsumerTimes) {
                                //消息已经重试了3次,如果不需要再次消费,则返回成功
                                //TODO("如果重试了三次还是失败则执行对于失败的业务逻辑")
                                logger.error("消息重试消费失败:", msg)
                                return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                            } else {
                                //如果失败重试次数还没到三次则继续重试
                                ConsumeConcurrentlyStatus.RECONSUME_LATER
                            }
                            //TODO("开始正常的业务逻辑")
                            println(StringUtils.repeat(":", 30) + String(msg.body, Charset.forName(SystemConfig.UTF8_ENCODE)))
                        }
                        return@MessageListenerConcurrently ConsumeConcurrentlyStatus.CONSUME_SUCCESS    //消费成功
                    }
            )
            defaultMQPushConsumer.start()
            logger.info("rocketMq Consumer start success; namesrvAddr:{},groupName:{},topics:{}", consumerNamesrvAddr, consumerGroupName, topics)
            return defaultMQPushConsumer
        } catch (e: Exception) {
            logger.error("rocketMq Consumer start fail;{}", e.message, e)
            return DefaultMQPushConsumer()
        }
    }

# 简单测试

  • 发送注册邮件的topic与tag配置
    • 个人理解的topic: 一类业务可以归为一个topic,比如所有的发邮件功能
    • 个人理解的tag: 某类业务下的细分,比如发送邮件业务下的发送注册邮件可以使用一个tag,发送忘记密码邮件可以再使用一个tag
    /**
     * rocket mq 发送邮件的 topic
     */
    public static final String ROCKET_MQ_TOPIC_MAIL = "topic_mail";

    /**
     * rocket mq 发送邮件-注册邮件的tag
     */
    public static final String ROCKET_MQ_TAG_MAIL_REGISTER = "tag_mail_register";
  • 发送邮件消息队列Service
    @Resource
    lateinit var producer: DefaultMQProducer
/**
     * 通过消息队列发送邮件
     */
    override fun sendMq(mailM: MailM) {
        val message = Message(SystemConfig.ROCKET_MQ_TOPIC_MAIL, SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER, JSON.toJSONString(mailM).toByteArray(Charset.forName(SystemConfig.UTF8_ENCODE)))
        try {
            producer.send(message)
        } catch (e: Exception) {
            logger.error(e.message, e)
        }
    }
  • 请求controller
@GetMapping("sendMailMq")
    open fun sendMailMq() {
        val mailM = MailM().apply {
            to = arrayOf("1185172056@qq.com")
            cc = arrayOf("taof@wicrenet.com")
            subject = "消息队列"
            content = "<h1>您好,RocketMq</h1>"
        }
        mailService.sendMq(mailM)
    }
  • 在请求了controller之后可以在rocketmq-console-ng控制台查看到相应的topic与消息信息
    • topic


      image.png
    • 已发送到rocketmq服务器上的消息


      image.png
    • 查看消息状态


      image.png
    • 查看控制台


      image.png

# 坑:

image.png
  • 消息不能被消费使用RocketMq控制台resend提示NOT_CONSUME_YET:检查rocketmq应用版本,rocketmq-console-ng依赖版本,自己的项目依赖jar包版本是否一致
  • Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException:检查rocketmq应用版本,rocketmq-console-ng依赖版本,自己的项目依赖jar包版本是否一致
  • Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message:尝试把消费者的消费模式改成集群模式
  • NOT_CONSUME_YET:如果还是不能解决请不要使用公司的网络,公司的网络可能会有很多的限制,用自己的手机进行测试(我被这个网络给坑惨了)

# 资源:

Windows下安装RocketMq://www.greatytc.com/p/4a275e779afa
RocketMq名词解释: https://my.oschina.net/javamaster/blog/2051703
解释Push与Pull区别: //www.greatytc.com/p/f071d5069059?utm_source=oschina-app
官网:http://rocketmq.apache.org/
windows下rocketmq的消息信息存储在 C:\Users\user\store文件夹下,删除该文件夹即可删除所有的消息

我的完整项目:地址 https://gitee.com/FutaoSmile/springboot_framework

FutaoSmile_springboot框架2.png

我在这里等你:


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容