一、安装ActiveMQ
二、创建demo
(一)producer(生产者)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.5</version>
</dependency>
server:
port: 8080
spring:
activemq:
user: admin
password: admin
broker-url: tcp://x.x.x.x:61616
pool:
enabled: true
max-connections: 10
queueName: publish.queue
topicName: publish.topic
@Configuration
public class ActiveMQConfig {
@Value("${queueName}")
private String queueName;
@Value("${topicName}")
private String topicName;
@Value("${spring.activemq.user}")
private String usrName;
@Value("${spring.activemq.password}")
private String password;
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Bean
public Queue queue(){
return new ActiveMQQueue(queueName);
}
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//设置为发布订阅方式, 默认情况下使用的生产消费者方式
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
}
@RestController
@RequestMapping(value = "/publish")
public class PublishController {
@Autowired
private JmsMessagingTemplate jms;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@RequestMapping("/queue")
public String queue(String messageStr) {
jms.convertAndSend(queue, messageStr);
return "queue消息发送成功";
}
@JmsListener(destination = "out.queue")
public void consumerMsg(String msg){
System.out.println(msg);
}
@RequestMapping("/topic")
public String topic(String messageStr) {
jms.convertAndSend(topic, messageStr);
return "topic消息发送成功";
}
}
(二)consumer-1、consumer-2内容一致
- 1、添加依赖和配置文件以及ActiveMQConfig文件相同
- 2、添加监听QueueListener
@Component
public class QueueListener {
@JmsListener(destination = "publish.queue",containerFactory = "jmsListenerContainerQueue")
@SendTo("out.queue")
public String receive(String message){
message = "收到一条消息:"+message;
System.out.println(message);
return message;
}
}
@Component
public class TopicListener {
@JmsListener(destination = "publish.topic",containerFactory = "jmsListenerContainerTopic")
public void receive(String message){
System.out.println("您收到一个消息:"+message);
}
}
三、完成以上代码,运行起来,通过接口就可以执行这个demo了,这里就不做过多的讲解了