一、RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。
Direct Exchange
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
Topic Exchange
Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
topic 和 direct 类似, 只是匹配上支持了"模式", 在"点分"的 routing_key 形式中, 可以使用两个通配符:
*表示一个词.
表示零个或多个词.
Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型.
在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
Fanout Exchange
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。
二、Maven依赖
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
三、简单配置
- 配置文件
# rabbitmq 地址
spring.rabbitmq.host=192.168.77.132
# rabbitmq 端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
- 配置类
@Configuration
@Slf4j
public class RabbitmqConf {
/**
* 消息交换机的名字
* */
private static final String DIRECT_EXCHANGE = "DirectExchange"; // 直连交换机
private static final String TOPIC_EXCHANGE = "TopicExchange"; // 主题交换机
private static final String FANOUT_EXCHANGE ="FanoutExchange" ; // 扇形交换机
private static final String HEADERS_EXCHANGE ="HeadersExchange" ; // 头部交换机
/**
* 队列的名字
* */
private static final String DIRECT_QUEUE = "DirectQueue";
private static final String TOPIC_QUEUE = "TopicQueue";
private static final String FANOUT_QUEUE = "FanoutQueue";
private static final String HEADERS_QUEUE = "HeadersQueue";
/**
* key
* */
private static final String DIRECT_KEY = "DirectKey";
private static final String TOPIC_KEY = "Topic.#";
/**
* 1.队列名字
* 2.durable="true" 是否持久化 rabbitmq重启的时候不需要创建新的队列
* 3.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* 4.exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
@Bean
public Queue dirctQueue() {
return new Queue(DIRECT_QUEUE,true,false,false);
}
@Bean
public Queue topicQueue() {
return new Queue(TOPIC_QUEUE,true,false,false);
}
@Bean
public Queue fanoutQueue() {
return new Queue(FANOUT_QUEUE,true,false,false);
}
@Bean
public Queue headersQueue() {
return new Queue(HEADERS_QUEUE,true,false,false);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE,true,false);
}
/**
* 1.交换机名字
* 2.durable="true" 是否持久化 rabbitmq重启的时候不需要创建新的交换机
* 3.autoDelete 当所有消费客户端连接断开后,是否自动删除队列
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE,true,false);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE,true,false);
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADERS_EXCHANGE,true,false);
}
/**
* 将direct队列和交换机进行绑定
*/
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(dirctQueue()).to(directExchange()).with(DIRECT_KEY);
}
@Bean
public Binding bindingTopic() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_KEY);
}
@Bean
public Binding bindingFanout() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding headersBinding(){
Map<String,Object> map = new HashMap<>();
map.put("headers1","value1");
map.put("headers2","value2");
return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
}
/**
* 定义消息转换实例 转化成 JSON 传输 传输实体就可以不用实现序列化
* */
@Bean
public MessageConverter integrationEventMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- 控制器
@RestController
@Slf4j
public class RabbitmqController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @Description: 发送消息
* 直连交换机
* 1.交换机
* 2.key
* 3.消息
* 4.消息ID
* rabbitTemplate.send(message);
* 发消息;参数对象为org.springframework.amqp.core.Message
* rabbitTemplate.convertAndSend(message);
* 转换并发送消息;将参数对象转换为org.springframework.amqp.core.Message后发送,消费者不能有返回值
* rabbitTemplate.convertSendAndReceive(message)
* 转换并发送消息,且等待消息者返回响应消息.消费者可以有返回值
*/
@GetMapping("/directSend")
public void directSend() {
String message = "direct 发送消息";
rabbitTemplate.convertAndSend("DirectExchange", "DirectKey",
message, new CorrelationData(UUID.randomUUID().toString()));
}
@GetMapping("/topicSend")
public void topicSend() {
String message = "topic 发送消息";
rabbitTemplate.convertAndSend("TopicExchange", "Topic.Key",
message, new CorrelationData(UUID.randomUUID().toString()));
}
@GetMapping("/fanoutSend")
public void fanoutSend() {
String message = "fanout 发送消息";
rabbitTemplate.convertAndSend("FanoutExchange", "", message, new CorrelationData(UUID.randomUUID().toString()));
}
@GetMapping("/headersSend")
public void headersSend() {
Goods goods = new Goods();
goods.setId(1L);
goods.setName("手机");
goods.setPrice(new BigDecimal(2000.36));
String json = JSON.toJSONString(goods);
MessageProperties properties = new MessageProperties();
properties.setHeader("headers1", "value1");
properties.setHeader("headers2", "value2");
properties.setContentType("application/json");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.convertAndSend("HeadersExchange", "", message, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* @Description: 消费消息
*/
@RabbitListener(queues = "DirectQueue")
@RabbitHandler
public void directMessage(String message) {
log.info("DirectConsumer {} directMessage :" + message);
}
@RabbitListener(queues = "TopicQueue")
@RabbitHandler
public void topicMessage(String message) {
log.info("TopicConsumer {} topicMessage :" + message);
}
@RabbitListener(queues = "FanoutQueue")
@RabbitHandler
public void fanoutMessage(String message) {
log.info("FanoutConsumer {} fanoutMessage :" + message);
}
@RabbitListener(queues = "HeadersQueue")
@RabbitHandler
public void headersMessage(Message message) {
log.info("HeadersConsumer {} headersMessage :" + message);
}
}
- 测试
直连队列:
http://localhost:8081/directSend
主题队列:
http://localhost:8081/topicSend
扇形队列:
http://localhost:8081/fanoutSend
头部队列:
http://localhost:8081/headersSend
注:头部发送一定要为正确的JSON格式数据。
四、开启消息发送交换机确认、重试、同步返回
- 配置文件
# rabbitmq 地址
spring.rabbitmq.host=192.168.77.132
# rabbitmq 端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 消费者端的重试
spring.rabbitmq.listener.direct.retry.enabled=true
spring.rabbitmq.listener.simple.retry.enabled=true
# 超时时间 时间配置应该诸如: D H M S 分别为:天 小时 分钟 秒(用于同步,为10秒)
spring.rabbitmq.template.reply-timeout=10S
# 设置为true的时候RabbitTemplate(生产端)能够实现重试
spring.rabbitmq.template.retry.enabled=true
# 设置为true 才会触发returnCallback回调方法的执行。
spring.rabbitmq.template.mandatory=true
#消息确认机制 --- 是否开启手ack动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
ACK确认模式:
AcknowledgeMode.NONE :不确认
- 默认所有消息消费成功,会不断的向消费者推送消息
- 因为rabbitMq认为所有消息都被消费成功,所以队列中不在存有消息,消息存在丢失的危险
AcknowledgeMode.AUTO:自动确认
- 由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。 存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息,如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失;
- 使用自动确认模式时,需要考虑的另一件事是消费者过载
AcknowledgeMode.MANUAL:手动确认
- 手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者;
- 手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量;
#消息确认机制 --- 是否开启手ack动确认模式
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#消息确认机制 --- 是否开启手ack动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 业务类
RabbitSender.java
@Service
@Slf4j
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 实现消息发送到RabbitMQ交换器后接收ack回调,如果消息发送确认失败就进行重试.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送成功,消息ID:{}", correlationData.getId());
} else {
log.info("消息发送失败,消息ID:{}", correlationData.getId());
}
}
/**
* 实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}", replyCode, replyText, exchange, routingKey, new String(message.getBody()));
}
/**
* convertAndSend 异步,消息是否发送成功用ConfirmCallback和ReturnCallback回调函数类确认。
* 发送MQ消息
*/
public void sendMessage(String exchangeName, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* sendMessageAndReturn 当发送消息过后,该方法会一直阻塞在哪里等待返回结果,直到请求超时,
* 配置spring.rabbitmq.template.reply-timeout来配置超时时间。
* 发送MQ消息并返回结果
* 实现发送消息之后可以同步接收返回信息。
*/
public Object sendMessageAndReturn(String exchangeName, String routingKey, Object message) {
return rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
}
}
- 控制器
@RestController
@Slf4j
public class RabbitmqController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitSender rabbitSender;
/**
* @Description: 发送消息
* 1.交换机
* 2.key
* 3.消息
* 4消息ID
* rabbitTemplate.send(message);
* 发消息;参数对象为org.springframework.amqp.core.Message
* rabbitTemplate.convertAndSend(message);
* 转换并发送消息;将参数对象转换为org.springframework.amqp.core.Message
* 后发送,消费者不能有返回值
* rabbitTemplate.convertSendAndReceive(message)
* 转换并发送消息,且等待消息者返回响应消息.消费者可以有返回值
*/
@GetMapping("/directSend")
public void directSend() {
String message = "direct 发送消息";
// rabbitSender.sendMessage("DirectExchange", "DirectKey", message);
// TODO 发送消息接受返回,如果无返回,则一直等待,直到超时
Object object = rabbitSender.sendMessageAndReturn("DirectExchange", "DirectKey", message);
log.info("生产者接受到返回:"+object.toString());
}
@GetMapping("/topicSend")
public void topicSend() {
String message = "topic 发送消息";
rabbitSender.sendMessage("TopicExchange", "Topic.Key", message);
}
@GetMapping("/fanoutSend")
public void fanoutSend() {
String message = "fanout 发送消息";
rabbitSender.sendMessage("FanoutExchange", "", message);
}
@GetMapping("/headersSend")
public void headersSend() {
String msg = "headers 发送消息";
MessageProperties properties = new MessageProperties();
properties.setHeader("headers1", "value1");
properties.setHeader("headers2", "value2");
Message message = new Message(msg.getBytes(), properties);
rabbitSender.sendMessage("HeadersExchange", "", message);
}
/**
* @Description: 消费消息-无返回
*/
// @RabbitListener(queues = "DirectQueue")
// @RabbitHandler
public void directMessage(Channel channel, Message message) throws IOException {
log.info("DirectConsumer {} directMessage :" + new String(message.getBody(),"utf-8"));
// 业务处理成功后调用,消息会被确认消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 业务处理失败后调用
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
/**
* @Description: 消费消息-有返回(同步)
* 用于方法:convertSendAndReceive
*/
@RabbitListener(queues = "DirectQueue")
@RabbitHandler
public String directMessage(String msg,Channel channel, Message message) throws IOException {
log.info("DirectConsumer {} directMessage :" + msg);
// TODO 一定要有确认,否则会Unacked
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return msg;
}
@RabbitListener(queues = "TopicQueue")
@RabbitHandler
public void topicMessage(String message) {
log.info("TopicConsumer {} topicMessage :" + message);
}
@RabbitListener(queues = "FanoutQueue")
@RabbitHandler
public void fanoutMessage(String message) {
log.info("FanoutConsumer {} fanoutMessage :" + message);
}
@RabbitListener(queues = "HeadersQueue")
@RabbitHandler
public void headersMessage(Message message) {
log.info("HeadersConsumer {} headersMessage :" + message);
}
}
附:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
// 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// ack返回false,并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
五、常用配置
spring.rabbitmq.host=192.168.77.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 消费者端的重试
spring.rabbitmq.listener.direct.retry.enabled=true
spring.rabbitmq.listener.simple.retry.enabled=true
# 消费者的最小数量
spring.rabbitmq.listener.simple.concurrency=10
# 消费者的最大数量
spring.rabbitmq.listener.simple.max-concurrency=20
# 在单个请求中处理的消息个数,他应该大于等于事务数量
spring.rabbitmq.listener.simple.prefetch=5
# 启动时自动启动容器
spring.rabbitmq.listener.simple.auto-startup=true
# 投递失败时是否重新排队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
# 超时时间 10 秒
spring.rabbitmq.template.reply-timeout=10S
# 设置为true的时候RabbitTemplate(生产端)能够实现重试
spring.rabbitmq.template.retry.enabled=true
# 第一次与第二次发布消息的时间间隔 1秒
spring.rabbitmq.template.retry.initial-interval=1S
# 尝试发布消息的最大数量 3
spring.rabbitmq.template.retry.max-attempts=3
# 尝试发布消息的最大时间间隔 10000
spring.rabbitmq.template.retry.max-interval=10000S
# 上一次尝试时间间隔的乘数 1.0
# 等待间隔 的倍数。如果为2 第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0
六、常见问题:
- Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
参数有问题,不能解析成json对象 - replyCode: 312 replyText: NO_ROUTE
routingKey是一个不存在的key
由于生产端设置的是一个错误的路由key,所以消费端没有任何打印。如果我们将 Mandatory 属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印了。如果我们的路由key设置为正确的,那么消费端能够正确消费,生产端也不会进行任何打印。