1.1 搭建SpringBoot环境
我们选择基于Spring-Rabbit去操作RabbitMQ
https://github.com/spring-projects/spring-amqp
使用spring-boot-starter-amqp会自动添加spring-rabbit依赖,
1.2配置
1、配置application.yml
配置连接rabbitmq的参数
server:
port: 44000
spring:
application:
name: test‐rabbitmq‐consumer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
2、定义RabbitConfig类,配置Exchange、Queue、及绑定交换机。
本例配置Topic交换机。
@Configuration
public class RabbitConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static final String ROUTINGKEY_SMS="inform.#.sms.#";
//声明交换机
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明QUEUE_INFORM_EMAIL队列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL(){
return new Queue(QUEUE_INFORM_EMAIL);
}
//声明QUEUE_INFORM_SMS队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS(){
return new Queue(QUEUE_INFORM_SMS);
}
// 每个消费者监听自己的队列,并且设置routingkey。
//生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
1.3 生产端
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer04_topics_Boot {
@Autowired
RabbitTemplate rabbitTemplate;
//使用rabbitTemplate发送消息
@Test
public void testSendEmail(){
String message = "send email message to user";
/**
* 参数:
* 1、交换机名称
* 2、routingKey
* 3、消息内容
*/
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM,"inform.email",message);
}
1.4消费端
目录结构与生产者一样
使用@RabbitListener注解监听队列。
@Component
public class ReceiveHandler {
@RabbitListener(queues = {RabbitConfig.QUEUE_INFORM_EMAIL})
public void emil(String msg, Message message , Channel channel){
System.out.println(msg);
}
}