@Author Jacky Wang
日常积累,转载请注明出处,//www.greatytc.com/p/b081f1fd1480
最近新开发的一个项目是由传统的SSM架构进行开发的,之前介绍了SpringBoot集成RabbitMQ的方式,这次特地对Spring集成RabbitMQ做一次记录及介绍。
如需较详细了解RabbitMQ的相关知识,可参考我的另一篇文章:03_SpringBoot集成RabbitMQ
Spring集成RabbitMQ一般有Xml配置和注解两种方式。由于个人不喜欢过多的xml文件,因此这里主要记录注解方式,Xml方式仅在文末给出配置文件参考。
1. 声明式注解集成RabbitMQ
1.1 步骤
- 引入pom依赖
- 创建配置文件,包含mq的基础配置
- 创建RabbitMQ监听器
- 声明RabbitMQ配置类
- 创建消息生产者
- 测试
1.2 集成
1.2.1 引入Pom依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
<!-- Added to resolve a dependency conflict.
     NOTE(review): this artifact is Jackson 1.x (org.codehaus.jackson); the configuration class
     uses Jackson2JsonMessageConverter, which presumably needs Jackson 2 on the classpath as
     well - confirm it is provided elsewhere in the project. -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
1.2.2 创建配置文件 rabbitmq.properties
# Alternative broker address for a local RabbitMQ instance (disabled)
#rabbitmq.host=127.0.0.1
# RabbitMQ server connection settings
rabbitmq.host=192.168.3.171
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual.host=/
# Exchange names (one per company/user add, delete and update event)
exchange.mes.com.add=mes.fanout.com.add
exchange.mes.com.del=mes.fanout.com.delete
exchange.mes.com.update=mes.fanout.com.update
exchange.mes.user.add=mes.fanout.user.add
exchange.mes.user.del=mes.fanout.user.delete
exchange.mes.user.update=mes.fanout.user.update
# Queue names (one per exchange above)
queue.mes.com.add=mes_company_add
queue.mes.com.del=mes_company_delete
queue.mes.com.update=mes_company_update
queue.mes.user.add=mes_user_add
queue.mes.user.del=mes_user_delete
queue.mes.user.update=mes_user_update
1.2.3 创建RabbitMQ监听器
/**
 * Message consumer for the company and user synchronisation queues.
 *
 * <p>RabbitMQConfig wraps this bean in a MessageListenerAdapter and maps each queue name to one
 * of the handler methods below by method name; {@link #onMessage(byte[])} is registered as the
 * default method. Every handler currently only logs the received payload.
 */
@Component
@Transactional
public class QueueListener {

    public static final Logger logger = LoggerFactory.getLogger(QueueListener.class);

    @Autowired
    private AppInterfaceService appInterfaceService;

    @Autowired
    private RabbitMQConfig rabbitMQConfig;

    @Autowired
    private OfficeMapper officeMapper;

    @Autowired
    private SystemService systemService;

    @Autowired
    private UserMapper userMapper;

    /**
     * Default handler, used when no queue-specific method is mapped for a delivery.
     *
     * @param msg raw message body
     */
    public void onMessage(byte[] msg) {
        // decode() is null-safe and cannot throw, so no try/catch is needed here.
        logger.info("onMessage : [{}]", decode(msg));
    }

    /**
     * Handles messages from the company-add queue.
     *
     * @param message raw message body
     */
    public void addCompany(byte[] message) {
        logger.info("RabbitMQ Method addCompany Get Msg : [{}]", decode(message));
    }

    /**
     * Handles messages from the company-delete queue.
     *
     * <p>RabbitMQConfig maps the company-delete queue to a method with this name, so it has to
     * exist for those deliveries to be dispatched.
     *
     * @param message raw message body
     */
    public void delCompany(byte[] message) {
        logger.info("RabbitMQ Method delCompany Get Msg : [{}]", decode(message));
    }

    /**
     * Handles messages from the company-update queue.
     *
     * @param message raw message body
     */
    public void updateCompany(byte[] message) {
        logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", decode(message));
    }

    /**
     * Handles messages from the user-add queue.
     *
     * @param message raw message body
     */
    public void addUser(byte[] message) {
        logger.info("RabbitMQ Method addUser Get Msg : [{}]", decode(message));
    }

    /**
     * Handles messages from the user-delete queue.
     *
     * @param message raw message body
     */
    public void delUser(byte[] message) {
        logger.info("RabbitMQ Method delUser Get Msg : [{}]", decode(message));
    }

    /**
     * Handles messages from the user-update queue.
     *
     * @param message raw message body
     */
    public void updateUser(byte[] message) {
        logger.info("RabbitMQ Method updateUser Get Msg : [{}]", decode(message));
    }

    /**
     * Decodes a message body as UTF-8 so that logging does not depend on the platform charset.
     *
     * @param payload raw message body, may be {@code null}
     * @return the decoded text, or {@code null} when {@code payload} is {@code null}
     */
    private static String decode(byte[] payload) {
        if (payload == null) {
            return null;
        }
        // Fully qualified to avoid requiring an additional import in this file.
        return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }
}
1.2.4 创建RabbitMQ声明配置类
- 此次使用消费者容器进行消息消费,可支持单类多方法消费不同队列
- 若无须使用消费者容器,取消下面“声明消费者监听执行类”部分的注释即可
- 示例总共创建了六个交换机与六个队列,具体根据实际情况创建即可
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares the RabbitMQ infrastructure beans: connection factory, template, admin, six fanout
 * exchanges, six queues, their bindings, and one listener container that dispatches each queue
 * to a dedicated method of {@link QueueListener}.
 *
 * <p>All names and connection settings are read from {@code rabbitmq.properties}.
 */
@Configuration
@PropertySource(value = "classpath:/properties/rabbitmq.properties")
public class RabbitMQConfig {

    @Autowired
    private QueueListener queueListener;

    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.port}")
    private int port;

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;

    @Value("${rabbitmq.virtual.host}")
    private String vhost;

    /**
     * Prefix used when building consumer tags in {@link #listenerContainer(ConnectionFactory)}.
     *
     * <p>NOTE(review): the original code referenced {@code projectKey} without declaring it, so
     * the class did not compile. The property name {@code project.key} and the default
     * {@code mes} are assumptions - confirm against the real project configuration.
     */
    @Value("${project.key:mes}")
    private String projectKey;

    @Value("${exchange.mes.com.add}")
    private String companyAddExchangeName;

    @Value("${exchange.mes.com.del}")
    private String companyDelExchangeName;

    @Value("${exchange.mes.com.update}")
    private String companyUpdateExchangeName;

    @Value("${exchange.mes.user.add}")
    private String userAddExchangeName;

    @Value("${exchange.mes.user.del}")
    private String userDelExchangeName;

    @Value("${exchange.mes.user.update}")
    private String userUpdateExchangeName;

    @Value("${queue.mes.com.add}")
    private String companyAddQueueName;

    @Value("${queue.mes.com.del}")
    private String companyDelQueueName;

    @Value("${queue.mes.com.update}")
    private String companyUpdateQueueName;

    @Value("${queue.mes.user.add}")
    private String userAddQueueName;

    @Value("${queue.mes.user.del}")
    private String userDelQueueName;

    @Value("${queue.mes.user.update}")
    private String userUpdateQueueName;

    /**
     * Creates the RabbitMQ connection factory from the configured host, port, credentials and
     * virtual host.
     *
     * @return the connection factory
     */
    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        return connectionFactory;
    }

    /**
     * Creates the template used for publishing, configured with the JSON message converter.
     *
     * @return the RabbitMQ template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    /**
     * Creates the converter shared by the template and the listener adapter.
     *
     * @return a Jackson-based JSON message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Creates the admin used for declaring exchanges, queues and bindings.
     *
     * @return the RabbitMQ admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin admin = new RabbitAdmin(rabbitConnectionFactory());
        // Keep the web application starting even if RabbitMQ declarations fail.
        admin.setIgnoreDeclarationExceptions(true);
        return admin;
    }

    // ---- Exchange declarations: each is created with (name, true, false). ----

    /** @return the fanout exchange for company-add events */
    @Bean
    public FanoutExchange companyAddExchange() {
        return new FanoutExchange(companyAddExchangeName, true, false);
    }

    /** @return the fanout exchange for company-delete events */
    @Bean
    public FanoutExchange companyDelExchange() {
        return new FanoutExchange(companyDelExchangeName, true, false);
    }

    /** @return the fanout exchange for company-update events */
    @Bean
    public FanoutExchange companyUpdateExchange() {
        return new FanoutExchange(companyUpdateExchangeName, true, false);
    }

    /** @return the fanout exchange for user-add events */
    @Bean
    public FanoutExchange userAddExchange() {
        return new FanoutExchange(userAddExchangeName, true, false);
    }

    /** @return the fanout exchange for user-delete events */
    @Bean
    public FanoutExchange userDelExchange() {
        return new FanoutExchange(userDelExchangeName, true, false);
    }

    /** @return the fanout exchange for user-update events */
    @Bean
    public FanoutExchange userUpdateExchange() {
        return new FanoutExchange(userUpdateExchangeName, true, false);
    }

    // ---- Queue declarations: each is created with (name, true, false, false). ----

    /** @return the queue for company-add events */
    @Bean
    public Queue companyAddQueue() {
        return new Queue(companyAddQueueName, true, false, false);
    }

    /** @return the queue for company-delete events */
    @Bean
    public Queue companyDelQueue() {
        return new Queue(companyDelQueueName, true, false, false);
    }

    /** @return the queue for company-update events */
    @Bean
    public Queue companyUpdateQueue() {
        return new Queue(companyUpdateQueueName, true, false, false);
    }

    /** @return the queue for user-add events */
    @Bean
    public Queue userAddQueue() {
        return new Queue(userAddQueueName, true, false, false);
    }

    /** @return the queue for user-delete events */
    @Bean
    public Queue userDelQueue() {
        return new Queue(userDelQueueName, true, false, false);
    }

    /** @return the queue for user-update events */
    @Bean
    public Queue userUpdateQueue() {
        return new Queue(userUpdateQueueName, true, false, false);
    }

    // ---- Bindings: each queue is bound to the exchange of the same event. ----

    /**
     * Binds the company-add queue to the company-add exchange.
     *
     * @param companyAddQueue    queue to bind
     * @param companyAddExchange exchange to bind to
     * @return the binding
     */
    @Bean
    public Binding companyAddBinding(Queue companyAddQueue, FanoutExchange companyAddExchange) {
        return BindingBuilder.bind(companyAddQueue).to(companyAddExchange);
    }

    /**
     * Binds the company-delete queue to the company-delete exchange.
     *
     * @param companyDelQueue    queue to bind
     * @param companyDelExchange exchange to bind to
     * @return the binding
     */
    @Bean
    public Binding companyDelBinding(Queue companyDelQueue, FanoutExchange companyDelExchange) {
        return BindingBuilder.bind(companyDelQueue).to(companyDelExchange);
    }

    /**
     * Binds the company-update queue to the company-update exchange.
     *
     * @param companyUpdateQueue    queue to bind
     * @param companyUpdateExchange exchange to bind to
     * @return the binding
     */
    @Bean
    public Binding companyUpdateBinding(Queue companyUpdateQueue, FanoutExchange companyUpdateExchange) {
        return BindingBuilder.bind(companyUpdateQueue).to(companyUpdateExchange);
    }

    /**
     * Binds the user-add queue to the user-add exchange.
     *
     * @param userAddQueue    queue to bind
     * @param userAddExchange exchange to bind to
     * @return the binding
     */
    @Bean
    public Binding userAddBinding(Queue userAddQueue, FanoutExchange userAddExchange) {
        return BindingBuilder.bind(userAddQueue).to(userAddExchange);
    }

    /**
     * Binds the user-delete queue to the user-delete exchange.
     *
     * @param userDelQueue    queue to bind
     * @param userDelExchange exchange to bind to
     * @return the binding
     */
    @Bean
    public Binding userDelBinding(Queue userDelQueue, FanoutExchange userDelExchange) {
        return BindingBuilder.bind(userDelQueue).to(userDelExchange);
    }

    /**
     * Binds the user-update queue to the user-update exchange.
     *
     * @param userUpdateQueue    queue to bind
     * @param userUpdateExchange exchange to bind to
     * @return the binding
     */
    @Bean
    public Binding userUpdateBinding(Queue userUpdateQueue, FanoutExchange userUpdateExchange) {
        return BindingBuilder.bind(userUpdateQueue).to(userUpdateExchange);
    }

    /*
     * Alternative to the listener container below: a single listener adapter bean that sends
     * every message to one method of the listener. Uncomment to use it instead.
     *
    @Bean
    public MessageListenerAdapter listenerAdapter(QueueListener receiver) {
        MessageListenerAdapter m = new MessageListenerAdapter(receiver, "process");
        m.setMessageConverter(jsonMessageConverter());
        return m;
    }*/

    /**
     * Creates the consumer container that listens on all six queues and routes each queue to
     * its own method on the injected {@link QueueListener}, with {@code onMessage} as the
     * default method.
     *
     * @param rabbitConnectionFactory connection factory used by the container
     * @return the configured listener container
     */
    @Bean
    SimpleMessageListenerContainer listenerContainer(ConnectionFactory rabbitConnectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory);
        //container.setMessageConverter(jsonMessageConverter());
        //container.setConcurrentConsumers(1);
        //container.setMaxConcurrentConsumers(5);
        //container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tags take the form "<projectKey>_<queue name>".
        container.setConsumerTagStrategy(q -> projectKey + "_" + q);
        //container.setQueues(companyAddQueue(), companyDelQueue(), companyUpdateQueue(), userAddQueue(), userDelQueue(), userUpdateQueue());
        container.setQueueNames(companyAddQueueName, companyDelQueueName, companyUpdateQueueName, userAddQueueName, userDelQueueName, userUpdateQueueName);

        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(queueListener);
        listenerAdapter.setDefaultListenerMethod("onMessage");
        listenerAdapter.setMessageConverter(jsonMessageConverter());

        // Queue name -> name of the QueueListener method that handles it.
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(companyAddQueueName, "addCompany");
        queueOrTagToMethodName.put(companyDelQueueName, "delCompany");
        queueOrTagToMethodName.put(companyUpdateQueueName, "updateCompany");
        queueOrTagToMethodName.put(userAddQueueName, "addUser");
        queueOrTagToMethodName.put(userDelQueueName, "delUser");
        queueOrTagToMethodName.put(userUpdateQueueName, "updateUser");
        listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);

        container.setMessageListener(listenerAdapter);
        return container;
    }
}
1.2.5 创建消息生产者
/**
 * Message producer that publishes objects through the shared {@link RabbitTemplate}.
 */
@Service
public class RabbitMQSender {

    private final static Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

    /**
     * Properties converter installed on the template. It holds no state, so a single instance
     * is shared instead of allocating a new one for every send.
     */
    private static final MessagePropertiesConverter PROPERTIES_CONVERTER = new MessagePropertiesConverter() {
        @Override
        public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            messageProperties.setContentEncoding("UTF-8");
            return messageProperties;
        }

        @Override
        public AMQP.BasicProperties fromMessageProperties(MessageProperties source, String charset) {
            // Kept from the original: no AMQP properties are produced for outgoing messages.
            // NOTE(review): the byte[] handlers in QueueListener appear to rely on messages
            // arriving without a JSON content type - confirm before changing this.
            return null;
        }
    };

    private RabbitTemplate rabbitTemplate;

    /**
     * Receives the template and installs the properties converter once, rather than
     * re-configuring the shared template on every send.
     *
     * @param rabbitTemplate the shared RabbitMQ template
     */
    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setMessagePropertiesConverter(PROPERTIES_CONVERTER);
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Converts {@code object} with the template's message converter and publishes it.
     * Failures are logged and not propagated to the caller.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key passed to the template
     * @param object     payload to convert and send
     */
    public void sendDataToQueue(String exchange, String routingKey, Object object) {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, object);
        } catch (Exception e) {
            logger.error("发送mq消息异常,Cause:[]", e);
        }
    }
}
1.2.6 测试
自定义Test或自定义Controller测试调用生产者发送消息,查看消费者是否消费即可。
eg:
/**
 * Test endpoint that triggers a message through {@link RabbitMQSender}.
 */
@Controller
@RequestMapping("${adminPath}/rabbitmq")
public class RabbitMQController {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    /**
     * Publishes a payload containing the given id to the {@code mes.fanout.com.add} exchange.
     *
     * @param id identifier placed in the payload under the key {@code id}
     * @return the literal string {@code SUCCESS}
     */
    @RequestMapping("/sendToCompanyAdd")
    @ResponseBody
    public String sendToCompanyAdd(String id) {
        HashMap<String, String> payload = new HashMap<>();
        payload.put("id", id);

        // No routing key is supplied; the sender receives null for it.
        rabbitMQSender.sendDataToQueue("mes.fanout.com.add", null, payload);

        return "SUCCESS";
    }
}
1.3 注意事项
- 若配置失败,请检查Spring的注解扫描是否能扫描到配置类。
- 检查RabbitMQ基础配置信息是否有错误
2. Xml方式集成配置文件参考
spring-rabbitmq.xml:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <description>rabbitmq 连接服务配置</description>

    <!-- Load the configuration properties file -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:/properties/rabbitmq.properties"/>

    <!-- Connection settings. The virtual host key matches rabbitmq.properties
         (rabbitmq.virtual.host); the previous key rabbitmq.vhost is not defined there. -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}"
                               password="${rabbitmq.password}" port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.virtual.host}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Spring AMQP template declaration -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter"/>

    <!-- JSON message converter -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- Queue declaration.
         NOTE(review): rabbitmq.queue, rabbitmq.exchange and rabbitmq.routingKey are not defined
         in the rabbitmq.properties shown earlier; add them before using this file. -->
    <rabbit:queue id="testQueueId" name="${rabbitmq.queue}" durable="false" auto-delete="false" exclusive="false"/>

    <!-- Exchange declaration with the queue bound by routing key -->
    <rabbit:direct-exchange id="testExchangeId" name="${rabbitmq.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="testQueueId" key="${rabbitmq.routingKey}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Listener configuration.
         NOTE(review): the QueueListener class shown earlier has no method named "process";
         confirm the referenced bean provides one or change the method attribute. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="testQueueId" ref="queueListener" method="process"/>
    </rabbit:listener-container>
</beans>