RabbitMQ整合SpringBoot
你好!欢迎来到Java成长笔记,主要是用于相互交流,相互学习,也希望分享能帮到大家,如有错误之处,希望指正,谢谢!
使用SpringBoot整合RabbitMQ能够使配置更简单,使用起来更方便,也是线上版本使用最多的配置方式。
引入相应依赖
主要依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息生产端
生产端配置说明
spring:
rabbitmq:
addresses: 127.0.0.1:5672 # 连接地址
username: nihao # 对应用户名称
password: 123456 # 对应用户密码
virtual-host: / # 虚拟主机 默认是/
connection-timeout: 15000 # 连接超时时间
publisher-confirms: true # 开启监听Broker端给我们返回的确认
publisher-returns: true # 开启不可达的消息进行后续的处理
template:
mandatory: true # 消息不可达不会自动删除 默认false为自动删除
生产端RabbitTemplate注入配置说明
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate () {
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
final RabbitTemplate rabbitTemplate = new RabbitTemplate (connectionFactory);
// 如果设置ReturnCallback,mandatory必须设置为true,如果为false,消息不可达会被删除
rabbitTemplate.setMandatory(true);
// 设置确认请求
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.error("correlationData:{}, ack:{}", JSON.toJSONString(correlationData), ack);
if(!ack){
log.error("异常处理:{}", System.currentTimeMillis());
}
}
});
// 设置消息不可达 后续处理
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message,
int replyCode, String replyText,
String exchange, String routingKey) {
log.error("exchange:{}, routingKey:{}, replyCode:{}, replyText:{}", exchange, routingKey, replyCode, replyText);
}
});
return rabbitTemplate;
}
}
生产端代码
// 相应接口
import com.show.model.User;
import java.util.Map;
public interface RabbitService {
// 发送消息
public void sendRabbitMsg(String exchange, String queue, String routingKey, Object message, Map<String, Object> properties);
// 发送对象消息
public void sendRabbitObjectMsg(String exchange, String queue, String routingKey, User user, Map<String, Object> properties);
}
// 接口实现
import com.show.model.User;
import com.show.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
import java.util.UUID;
@Service(value = "rabbitService")
@Slf4j
public class RabbitServiceImpl implements RabbitService {
@Resource
private RabbitTemplate rabbitTemplate;
/*
* @Description: 生成correlationDataId
* @return java.lang.String
* @date 2021/3/22 10:12
*/
public String getUUID () {
return Optional.ofNullable(UUID.randomUUID())
.map(UUID::toString)
.map(w->w.replaceAll("-", "")).get();
}
@Override
public void sendRabbitMsg(String exchange, String queue, String routingKey, Object message, Map<String, Object> properties) {
final MessageHeaders messageHeaders = new MessageHeaders(properties);
final Message msg = MessageBuilder.createMessage(message, messageHeaders);
final CorrelationData correlationData = new CorrelationData(this.getUUID());
rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
}
@Override
public void sendRabbitObjectMsg(String exchange, String queue, String routingKey, User user, Map<String, Object> properties) {
final CorrelationData correlationData = new CorrelationData(this.getUUID());
rabbitTemplate.convertAndSend(exchange, routingKey, user, correlationData);
}
}
// 对应测试类
import com.google.common.collect.ImmutableMap;
import com.show.model.User;
import com.show.service.RabbitConfig;
import com.show.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = EProducerApplicationTests.class)
@ComponentScan(basePackages = {"com.show.*"})
@Slf4j
public class EProducerApplicationTests {
private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Resource
private RabbitService rabbitService;
@Test
public void sendRabbitMsg() throws Exception {
final Map<String, Object> properties =
ImmutableMap.of("number", UUID.randomUUID().toString(),"sendTime", simpleDateFormat.format(new Date()));
rabbitService.sendRabbitMsg(RabbitConfig.PRODUCER_EXCHANGE_NAME, RabbitConfig.PRODUCER_QUEUE_NAME,
RabbitConfig.ROUTING_KEY, "Hello RabbitMQ", properties);
}
@Test
public void sendRabbitObjMsg() throws Exception {
final Map<String, Object> properties =
ImmutableMap.of("number", UUID.randomUUID().toString(),"sendTime", simpleDateFormat.format(new Date()));
final User user = new User("simon", "simon", 22, new BigDecimal(100));
rabbitService.sendRabbitObjectMsg(RabbitConfig.PRODUCER_OBJ_EXCHANGE_NAME, RabbitConfig.PRODUCER_OBJ_QUEUE_NAME,
RabbitConfig.ROUTING_KEY, user, properties);
}
}
消息消费端
消费端配置说明
spring:
rabbitmq:
addresses: 127.0.0.1:5672 # 设置连接地址
username: nihao # 对应用户名称
password: 123456 # 对应用户密码
virtual-host: / # 虚拟主机 默认是/
connection-timeout: 15000 # 连接超时时间
listener:
simple:
acknowledge-mode: manual # 设置签收模式
concurrency: 5 # 默认处理线程数量
max-concurrency: 10 # 最大处理线程数量
消费端注解说明
// @RabbitListener 组合注解,里面注解配置如下
// @QueueBinding 开启交换机和队列绑定
// @Queue 设置绑定队列属性
// value 设置绑定队列名称
// durable 设置绑定队列是否持久化
// @Exchange 设置绑定交换机
// value 这是交换机名称
// durable 设置交换机是否持久化
// type 交换机类型
// ignoreDeclarationExceptions 是否忽略异常申明
// key 设置交换机队列key匹配规则
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
value = RabbitConfig.PRODUCER_QUEUE_NAME,
durable = RabbitConfig.DURABLE
),
exchange = @Exchange(
value = RabbitConfig.PRODUCER_EXCHANGE_NAME,
durable = RabbitConfig.DURABLE,
type = RabbitConfig.TYPE,
ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
key = RabbitConfig.PRODUCER_ROUTING_KEY
)
)
@RabbitHandler
消费端代码
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.show.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class RabbitService {
/**
* @Description: 消费端处理 Message 消息
* @Param: [message, channel]
* @return: void
* @Author: ly
* @Date: 2021/3/21 13:20
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
value = RabbitConfig.PRODUCER_QUEUE_NAME,
durable = RabbitConfig.DURABLE
),
exchange = @Exchange(
value = RabbitConfig.PRODUCER_EXCHANGE_NAME,
durable = RabbitConfig.DURABLE,
type = RabbitConfig.TYPE,
ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
key = RabbitConfig.PRODUCER_ROUTING_KEY
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
log.error("Message:{}", JSON.toJSONString(message));
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// 业务处理
// 手工ACK
channel.basicAck(deliveryTag, false);
}
/**
* @Description: 消费端处理 Object 消息
* @Param: [user, channel, headers]
* @return: void
* @Author: ly
* @Date: 2021/3/21 14:35
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
value = RabbitConfig.PRODUCER_OBJ_QUEUE_NAME,
durable = RabbitConfig.DURABLE
),
exchange = @Exchange(
value = RabbitConfig.PRODUCER_OBJ_EXCHANGE_NAME,
durable = RabbitConfig.DURABLE,
type = RabbitConfig.TYPE,
ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
key = RabbitConfig.PRODUCER_ROUTING_KEY
)
)
@RabbitHandler
public void onOrderMessage(@Payload User user, Channel channel, @Headers Map<String, Object> headers) throws Exception {
log.error("User:{}", JSON.toJSONString(user));
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// 业务处理
// 手工ACK
channel.basicAck(deliveryTag, false);
}
}
本章完结,后续还会持续更新,分享Java成长笔记,希望我们能一起成长。如果你觉得我的分享有用,记得点赞和关注哦!这对我是最好的鼓励。谢谢!
PS:转载请注明出处!