第七章 Rocketmq--消息驱动

接上文,本文主要介绍了MQ是什么,及它的应用场景,消息发送和接收演示以及相关的案例。

第一章:微服务的架构介绍发展
第二章:微服务环境搭建
第三章:Nacos Discovery--服务治理
第四章:Sentinel--服务容错
第五章:Gateway--服务网关
第六章:Sleuth--链路追踪
第七章:Rocketmq--消息驱动

7.1 MQ 简介

7.1.1 什么是 MQ

MQ(Message Queue)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。

image

7.1.2 MQ 的应用场景

7.1.2.1 异步解耦

最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:

image

此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。

所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

image

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。

7.1.2.2 流量削峰

流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。

在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。

image

秒杀处理流程如下所述:

  1. 用户发起海量秒杀请求到秒杀业务处理系统。

  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。

  3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。

  4. 用户收到秒杀成功的通知。

7.1.3 常见的 MQ 产品

目前业界有很多MQ产品,比较出名的有下面这些:

ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用 C语言 实现,实际上只是一个 socket 库的重新封装,如果做为消息队列使用,需要开发大量的代码。ZeroMQ 仅提供非持久性的队列,也就是说如果 down机,数据将会丢失。

RabbitMQ:使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。

ActiveMQ:历史悠久的 Apache 开源项目。已经在很多产品中得到应用,实现了 JMS1.1 规范,可以和 spring-jms 轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。

RocketMQ:阿里巴巴的 MQ 中间件,由 java语言 开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单。

Kafka:是 Apache 下的一个子项目,是一个高性能跨语言分布式 Publish/Subscribe 消息队列系统,相对于ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

7.2 RocketMQ 入门

RocketMQ 是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转。

7.2.1 RocketMQ 环境搭建

接下来我们先在 linux 平台下安装一个 RocketMQ 的服务

7.2.1.1 环境准备

下载RocketMQ

环境要求

Linux 64位操作系统

64bit JDK 1.8+

7.2.1.2 安装 RocketMQ

1)上传文件到Linux系统

[root@spiritmark rocketmq]# ls /usr/local/src/
rocketmq-all-4.4.0-bin-release.zip

2)解压到安装目录

[root@spiritmark src] # unzip rocketmq-all-4.4.0-bin-release.zip
[root@spiritmark src] # mv rocketmq-all-4.4.0-bin-release ../rocketmq

7.2.1.3 启动 RocketMQ

1) 切换到安装目录

[root@spiritmark rocketmq]# ls 
benchmark bin conf lib LICENSE NOTICE README.md

2)启动 NameServer

[root@spiritmark rocketmq]# nohup ./bin/mqnamesrv & 
[1] 1467
# 只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@spiritmark rocketmq]# tail -f /root/logs/rocketmqlogs/namesrv.log

3)启动 Broker

# 编辑 bin/runbroker.sh 和 bin/runserver.sh 文件, 修改里面的
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
[root@spiritmark rocketmq]# nohup bin/mqbroker -n localhost:9876 &
[root@spiritmark rocketmq]# tail -f /root/logs/rocketmqlogs/broker.log

7.2.1.4 测试 RocketMQ

1)测试消息发送

[root@spiritmark rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@spiritmark rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Producer

2)测试消息接收

[root@spiritmark rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@spiritmark rocketmq]# bin/tools.sh
org.apache.rocketmq.example.quickstart.Consumer

7.2.1.5 关闭 RocketMQ

[root@heima rocketmq]# bin/mqshutdown broker
[root@heima rocketmq]# bin/mqshutdown namesrv

7.2.2 RocketMQ 的架构及概念

image

如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer。

Broker(邮递员):Broker是 RocketMQ 的核心,负责消息的接收,存储,投递等功能

NameServer(邮局):消息队列的协调者,Broker 向它注册路由信息,同时 Producer 和 Consumer 向其获取路由信息

Producer(寄件人):消息的生产者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,向 Broker 发送消息

Consumer(收件人):消息的消费者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,从 Broker 获取消息

Topic(地区):用来区分不同类型的消息,发送和接收消息前都需要先创建 Topic ,针对 Topic 来发送和接收消息

Message Queue(邮件):为了提高性能和吞吐量,引入了 Message Queue,一个 Topic 可以设置一个或多个 Message Queue,这样消息就可以并行往各个 Message Queue 发送消息,消费者也可以并行的从多个 Message Queue 读取消息 Message。Message 是消息的载体。

Producer Group(生产者组),简单来说就是多个发送同一类消息的生产者称之为一个生产者组。

Consumer Group(消费者组),消费同一类消息的多个 consumer 实例组成一个消费者组。

7.2.3 RocketMQ 控制台安装

1)下载

#在 git 上下载下面的工程 rocketmq-console-1.0.0
wget https://github.com/apache/rocketmq-externals.git

2)修改配置文件

# 修改配置文件 rocketmq-console/src/main/resources/application.properties
# 项目启动后的端口号
server.port=7777
# nameserv 的地址,注意防火墙要开启 9876 端口
rocketmq.config.namesrvaddr=192.168.109.131:9876

3)打成 jar 包,并启动

# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true
# 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar

4)访问控制台

image

7.3 消息发送和接收演示

接下来我们使用Java代码来演示消息的发送和接收

<dependency>
<groupId> org.apache.rocketmq </groupId>
  <artifactId> rocketmq-spring-boot-starter </artifactId>
  <version>2.0.2</version>
</dependency>

7.3.1 发送消息

消息发送步骤:

  1. 创建消息生产者, 指定生产者所属的组名

  2. 指定Nameserver地址

  3. 启动生产者

  4. 创建消息对象,指定主题、标签和消息体

  5. 发送消息

  6. 关闭生产者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketMQSendMessageTest {

    // 发送消息
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 1.创建消息生产着,并且设置生产组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

        // 2.为生产者设置NameServer的地址
        producer.setNamesrvAddr("192.168.109.131:9876");

        // 3.启动生产者
        producer.start();

        // 4.构建消息对象,主要是设置消息的主题 标签 内容
        Message message = new Message("myTopic", "myTag", "Test RocketMQ Message".getBytes());

        // 5.发送消息 第二个参数代表超时时间
        SendResult result = producer.send(message, 10000);
        System.out.println(result);

        // 6.关闭客户端
        producer.shutdown();
    }

}

7.3.2 接收消息

消息接收步骤:

  1. 创建消息消费者, 指定消费者所属的组名

  2. 指定Nameserver地址

  3. 指定消费者订阅的主题和标签

  4. 设置回调函数,编写处理消息的方法

  5. 启动消息消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author Morse
 * @Date 2020-04-18 14:00
 */
public class RocketMQReceiveMessageTest {

    // 接收消息
    public static void main(String[] args) throws MQClientException {

        // 1. 创建消费者,并且为其指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
        
        // 2. 为消费者设置 NameServer 的地址
        consumer.setNamesrvAddr("192.168.109.131:9876");
        
        // 3. 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");
        
        // 4. 设置一个回调函数, 并在函数中编写接收到消息之后的处理方法
        consumer.registerMessageListener(
                new MessageListenerConcurrently() {
                    // 处理获取到的消息
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                        // 消费逻辑
                        System.out.println("Message===>" + list);
                        
                        // 返回消费成功状态
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
        );
        
        // 5. 启动消费者
        consumer.start();
        System.out.println("启动消费者成功了");
    }
    
}

7.4 案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

image

7.4.1 订单微服务发送消息

1)在 order-service 中添加 rocketmq 的依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>

2)添加配置

# rocketmq
rocketmq:
  mame-server: 192.168.109.131:9876 #rocketMQ服务的地址
  producer:
    group: order-service # 生产组

3)编写测试代码

import com.alibaba.fastjson.JSON;
import com.bxs.order.api.entity.Order;
import com.bxs.order.service.OrderService;
import com.bxs.product.api.entity.Product;
import com.bxs.product.api.feign.ProductService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author Morse
 * @Date 2020-04-18 14:23
 */
@RestController
@Slf4j
public class OrderController {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private ProductService productService;
    
    
    // 下单
    @RequestMapping(path = "/order/prod/{pid}")
    public Order order(@PathVariable(name = "pid") Integer pid){
        log.info("接收到{}号商家的下单请求,接下来调用商品微服务查询此商品信息", pid);
        
        // 调用商品微服务,查询商品信息
        Product product = productService.findByPid(pid);
        
        // 模拟调用商品微服务需要2s的时间
        try {
            Thread.sleep(2000l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product));

        // 下单( 创建订单 )
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);
        
        // 为了不产生大量的额外垃圾数据, 暂时本保存订单入库
        //orderService.createOrder(order);
        
        log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order));

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        return order;
    }
    
    // 测试高并发
    @RequestMapping("/order/message")
    public String message() {
        return "测试高并发";
    }
    
}

7.4.2 用户微服务订阅消息

1)修改shop-user 模块配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <parent>
        <artifactId>user-microservice</artifactId>
        <groupId>com.bxs</groupId>
        <version>1.0.0</version>
    </parent>
    <artifactId>user-service</artifactId>
    <name>${project.artifactId}</name>
    <description>用户服务</description>

    <dependencies>
        <dependency>
            <groupId>com.bxs</groupId>
            <artifactId>user-service-api</artifactId>
        </dependency>
        <!--springboot-web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.0</version>
        </dependency>
    </dependencies>
</project>

2)修改主类

@SpringBootApplication
@EnableDiscoveryClient
public class UserServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run( UserServiceApplication.class, args );
    }
}

3)修改配置文件

server:
  port: 8071
spring:
  application:
    name: user-service
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://39.105.167.131:3306/smile_boot?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
    username: admin
    password: admin
  jpa:
    properties:
      hibernate:
        hbm2ddl:
          #auto: create
          auto: update
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
        format_sql: true
    show-sql: true
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
# rocketmq
rocketmq:
  name-server: 192.168.109.131:9876

4)编写消息接收服务 @Slf4j

import com.alibaba.fastjson.JSON;
import com.bxs.user.api.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service(value = "user-sms-service")
// consumerGroup-消费者组名, topic-要消费的主题
@RocketMQMessageListener(
        consumerGroup = "user-service",         // 消费着组名
        topic = "order-topic",                  // 消费主题
        consumeMode = ConsumeMode.CONCURRENTLY, // 消费模式,指定是否顺序消费 CONCURRENTLY(同步,默认) ORDERLY(顺序)
        messageModel = MessageModel.CLUSTERING  // 消息模式 BROADCASTING(广播) CLUSTERING(集群,默认)
)
public class SmsService implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
    }
}

5)启动服务,执行下单操作,观看后台输出

7.5 发送不同类型的消息

7.5.1 普通消息

RocketMQ 提供三种方式来发送普通消息:可靠同步发送可靠异步发送单向发送

可靠同步发送

解释:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

可靠异步发送

解释:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送

解释:单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
</dependency>
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = MethodOrderer.OrderAnnotation.class)
public class MessageTypeTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 同步消息
    @Test
    public void testSyncSend() {
        // 参数一 : topic, 如果想添加tag, 可以使用“topic:top”的写法
        // 参数二 : 消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");
        System.out.println(sendResult);
    }

    // 异步消息
    @Test
    public void testAsyncSend() throws InterruptedException {
        // 参数一:topic:tag
        // 参数二:消息体
        // 参数三:回调
        rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {
            // 成功响应的回调
            @Override
            public void onSuccess(SendResult result) {
                System.out.println(result);
            }

            // 异步响应的回调
            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
            }
        });
        System.out.println("**************************");
        Thread.sleep(300000000);
    }

    // 单向消息
    @Test
    public void testOneWay() {
        rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
    }
}

三种发送方式的对比

image

7.5.2 顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。

image
    // 同步顺序消息[异步顺序 单向顺序写法类似
    public void testSyncSendOrderly() {
        // 第三个参数用于队列的选择
        rocketMQTemplate.syncSendOrderly("test-topic-1", "这是一条异步顺序消息", "xxxx");
    }

7.5.3 事务消息

RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致。

事务消息交互流程:

image

两个概念:

半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了 RocketMQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

事务消息发送步骤:

1)发送方将半事务消息发送至 RocketMQ 服务端。
2)RocketMQ 服务端将消息持久化之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
3)发送方开始执行本地事务逻辑。
4)发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤:

1)在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
2)发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
3)发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

//消息事物状态记录
@Entity(name = "shop_txlog")
@Data
public class TxLog {
    @Id
    private String txId;
    private Date date;
}
@RestController
@Slf4j
public class OrderController4 {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private OrderServiceImpl4 orderService;
    @Autowired
    private ProductService productService;

    //下单--fegin
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid) {
        log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid);
//调用商品微服务,查询商品信息
        Product product = productService.findByPid(pid);
        if (product.getPid() == -100) {
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }
        log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product));
//下单(创建订单)
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);
        orderService.createOrderBefore(order);
        log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order));
        return order;
    }
}
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;

    @Autowired
    private TxLogDao txLogDao;

    // 执行本地事务
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {

        String txId = (String) message.getHeaders().get("txId");

        try {
            // 本地事务
            Order order = (Order) arg;
            orderServiceImpl4.createOrder(txId, order);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // 消息回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String txId = (String) message.getHeaders().get("txId");
        TxLog txLog = txLogDao.findById(txId).get();

        if (txLog != null) {
            // 本地事务(订单)成功了
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

7.6 消息消费要注意的细节

@Slf4j
@Service(value = "user-sms-service")
// consumerGroup-消费者组名, topic-要消费的主题
@RocketMQMessageListener(
        consumerGroup = "user-service",         // 消费着组名
        topic = "order-topic",                  // 消费主题
        consumeMode = ConsumeMode.CONCURRENTLY, // 消费模式,指定是否顺序消费 CONCURRENTLY(同步,默认) ORDERLY(顺序)
        messageModel = MessageModel.CLUSTERING  // 消息模式 BROADCASTING(广播) CLUSTERING(集群,默认)
)
public class SmsService implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order order) {
        log.info("收到一个订单信息{},接下来发送短信", JSON.toJSONString(order));
    }
}

RocketMQ 支持两种消息模式:

广播消费:每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;
集群消费:一条消息只能被一个消费者实例消费

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容