这个帖子我们就可以使用springboot经典的配置文件的方式来实现消息的确认和失败回调
消息确认和失败回调 代码示例
配置文件
spring.application.name=rabbit-sample
server.port=63001
#rabbitMQ连接字符串
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host = /
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
消息发送类,我们在消息发送类中定义了消息的确认方法,消息的失败回调方法:
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData:"+correlationData);
System.out.println("ack:"+ack);
if(!ack){
System.out.println("异常处理机制:-----");
}
}
};
final RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("return:"+returned);
}
};
public void send(Object message, Map<String,Object> properties) throws Exception{
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<Object> msg = MessageBuilder.createMessage(message,messageHeaders);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnsCallback);
// CorrelationData cd = new CorrelationData();
// cd.setId("444555666");//全局唯一id,可以加时间戳
rabbitTemplate.convertAndSend("exchange-boot","rabbit.a",msg);
}
}
测试类:
@Test
public void send1() throws Exception{
Map<String,Object> messageProperties = new HashMap<>();
messageProperties.put("number",1234);
messageProperties.put("sendTime",new Date());
rabbitSender.send("this is a msg!",messageProperties);
}
消费端配置
消费端@RabbitListener注解使用
在前面的基础上加上下面配置:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency = 5
spring.rabbitmq.listener.simple.max-concurrency = 10
创建一个消息接收类:
@Component
public class RabbitReceiver {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "queue-boot",durable = "true"),
exchange = @Exchange(value="exchange-boot",durable = "true",
type="topic",ignoreDeclarationExceptions ="true"),
key = "spring.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception{
System.out.println("-------");
System.out.println("消费端payload:"+message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手动ACK
channel.basicAck(deliveryTag,false);
}
}
可以看到我们使用注解的方式创建了一个exchange,一个队列,然后还建立了绑定关系,然后注解的方法就监听这个队列,接收消息并手动ack。
使用配置文件配置exchange以及队列等信息
直接在代码中写死是不灵活的方式,所以我们在配置文件写好,然后以变量的方式注入到rabbit的注解中以达到创建队列的方式。
在接收端我们也可以直接接收java对象,这种方式更加便捷: