RabbitMQ学习笔记 - RabbitMQ整合SpringBoot

RabbitMQ整合SpringBoot

    你好!欢迎来到Java成长笔记,主要是用于相互交流,相互学习,也希望分享能帮到大家,如有错误之处,希望指正,谢谢!

    使用SpringBoot整合RabbitMQ能够使配置更简单,使用起来更方便,也是线上版本使用最多的配置方式。

引入相应依赖

主要依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

消息生产端

生产端配置说明

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672   # 连接地址
    username: nihao # 对应用户名称
    password: 123456 # 对应用户密码 
    virtual-host: / # 虚拟主机 默认是/
    connection-timeout: 15000 # 连接超时时间
    publisher-confirms: true # 开启监听Broker端给我们返回的确认
    publisher-returns: true # 开启不可达的消息进行后续的处理
    template:
      mandatory: true # 消息不可达不会自动删除 默认false为自动删除

生产端RabbitTemplate注入配置说明

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate () {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        final RabbitTemplate rabbitTemplate = new RabbitTemplate (connectionFactory);
        // 如果设置ReturnCallback,mandatory必须设置为true,如果为false,消息不可达会被删除
        rabbitTemplate.setMandatory(true);
        // 设置确认请求
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.error("correlationData:{}, ack:{}", JSON.toJSONString(correlationData), ack);
                if(!ack){
                    log.error("异常处理:{}", System.currentTimeMillis());
                }
            }
        });
        // 设置消息不可达 后续处理
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message,
                                        int replyCode, String replyText,
                                        String exchange, String routingKey) {
                log.error("exchange:{}, routingKey:{}, replyCode:{}, replyText:{}", exchange, routingKey, replyCode, replyText);
            }
        });
        return rabbitTemplate;
    }
}

生产端代码

// 相应接口
import com.show.model.User;
import java.util.Map;

public interface RabbitService {

    // 发送消息
    public void sendRabbitMsg(String exchange, String queue, String routingKey, Object message, Map<String, Object> properties);

    // 发送对象消息
    public void sendRabbitObjectMsg(String exchange, String queue, String routingKey, User user, Map<String, Object> properties);

}

// 接口实现
import com.show.model.User;
import com.show.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
import java.util.UUID;

@Service(value = "rabbitService")
@Slf4j
public class RabbitServiceImpl implements RabbitService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /*
     * @Description: 生成correlationDataId
     * @return java.lang.String
     * @date 2021/3/22 10:12
     */
    public String getUUID () {
        return Optional.ofNullable(UUID.randomUUID())
                .map(UUID::toString)
                .map(w->w.replaceAll("-", "")).get();
    }

    @Override
    public void sendRabbitMsg(String exchange, String queue, String routingKey, Object message, Map<String, Object> properties) {
        final MessageHeaders messageHeaders = new MessageHeaders(properties);
        final Message msg = MessageBuilder.createMessage(message, messageHeaders);
        final CorrelationData correlationData = new CorrelationData(this.getUUID());
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
    }

    @Override
    public void sendRabbitObjectMsg(String exchange, String queue, String routingKey, User user, Map<String, Object> properties) {
        final CorrelationData correlationData = new CorrelationData(this.getUUID());
        rabbitTemplate.convertAndSend(exchange, routingKey, user, correlationData);
    }

}

// 对应测试类
import com.google.common.collect.ImmutableMap;
import com.show.model.User;
import com.show.service.RabbitConfig;
import com.show.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = EProducerApplicationTests.class)
@ComponentScan(basePackages = {"com.show.*"})
@Slf4j
public class EProducerApplicationTests {

    private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Resource
    private RabbitService rabbitService;

    @Test
    public void sendRabbitMsg() throws Exception {
        final Map<String, Object> properties =
                ImmutableMap.of("number", UUID.randomUUID().toString(),"sendTime", simpleDateFormat.format(new Date()));
        rabbitService.sendRabbitMsg(RabbitConfig.PRODUCER_EXCHANGE_NAME, RabbitConfig.PRODUCER_QUEUE_NAME,
                RabbitConfig.ROUTING_KEY, "Hello RabbitMQ", properties);
    }

    @Test
    public void sendRabbitObjMsg() throws Exception {
        final Map<String, Object> properties =
                ImmutableMap.of("number", UUID.randomUUID().toString(),"sendTime", simpleDateFormat.format(new Date()));
        final User user = new User("simon", "simon", 22, new BigDecimal(100));
        rabbitService.sendRabbitObjectMsg(RabbitConfig.PRODUCER_OBJ_EXCHANGE_NAME, RabbitConfig.PRODUCER_OBJ_QUEUE_NAME,
                RabbitConfig.ROUTING_KEY, user, properties);
    }

}

消息消费端

消费端配置说明

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672 # 设置连接地址
    username: nihao # 对应用户名称
    password: 123456 # 对应用户密码 
    virtual-host: /  # 虚拟主机 默认是/
    connection-timeout: 15000 # 连接超时时间
    listener:
      simple:
        acknowledge-mode: manual  # 设置签收模式 
        concurrency: 5  # 默认处理线程数量
        max-concurrency: 10 # 最大处理线程数量

消费端注解说明

// @RabbitListener 组合注解,里面注解配置如下
// @QueueBinding 开启交换机和队列绑定
// @Queue 设置绑定队列属性 
//  value 设置绑定队列名称
//  durable 设置绑定队列是否持久化
// @Exchange 设置绑定交换机
//  value 这是交换机名称
//  durable 设置交换机是否持久化
//  type 交换机类型 
//  ignoreDeclarationExceptions 是否忽略异常申明
// key 设置交换机队列key匹配规则
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(
                value = RabbitConfig.PRODUCER_QUEUE_NAME,
                durable = RabbitConfig.DURABLE
        ),
        exchange = @Exchange(
                value = RabbitConfig.PRODUCER_EXCHANGE_NAME,
                durable = RabbitConfig.DURABLE,
                type = RabbitConfig.TYPE,
                ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
        key = RabbitConfig.PRODUCER_ROUTING_KEY
)
)
@RabbitHandler

消费端代码

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.show.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;

@Slf4j
@Component
public class RabbitService {

    /**
     * @Description: 消费端处理 Message 消息
     * @Param: [message, channel]
     * @return: void
     * @Author: ly
     * @Date: 2021/3/21 13:20
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(
                    value = RabbitConfig.PRODUCER_QUEUE_NAME,
                    durable = RabbitConfig.DURABLE
            ),
            exchange = @Exchange(
                    value = RabbitConfig.PRODUCER_EXCHANGE_NAME,
                    durable = RabbitConfig.DURABLE,
                    type = RabbitConfig.TYPE,
                    ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
            key = RabbitConfig.PRODUCER_ROUTING_KEY
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        log.error("Message:{}", JSON.toJSONString(message));
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // 业务处理
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }

    /**
     * @Description: 消费端处理 Object 消息
     * @Param: [user, channel, headers]
     * @return: void
     * @Author: ly
     * @Date: 2021/3/21 14:35
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(
                    value = RabbitConfig.PRODUCER_OBJ_QUEUE_NAME,
                    durable = RabbitConfig.DURABLE
            ),
            exchange = @Exchange(
                    value = RabbitConfig.PRODUCER_OBJ_EXCHANGE_NAME,
                    durable = RabbitConfig.DURABLE,
                    type = RabbitConfig.TYPE,
                    ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
            key = RabbitConfig.PRODUCER_ROUTING_KEY
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload User user, Channel channel, @Headers Map<String, Object> headers) throws Exception {
        log.error("User:{}", JSON.toJSONString(user));
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        // 业务处理 
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

本章完结,后续还会持续更新,分享Java成长笔记,希望我们能一起成长。如果你觉得我的分享有用,记得点赞和关注哦!这对我是最好的鼓励。谢谢!

PS:转载请注明出处!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352