I. Installing ActiveMQ
- 1. Download ActiveMQ: http://activemq.apache.org/components/classic/download/
- 2. Upload the archive to the Linux server and extract it:
tar -zxvf apache-activemq-5.15.12-bin.tar.gz
- 3. Change into the extracted directory:
cd apache-activemq-5.15.12
- 4. Start ActiveMQ:
bin/activemq start
(To stop it, use: bin/activemq stop)
- 5. Open the ActiveMQ admin console: http://x.x.x.x:8161/admin (x.x.x.x is the server IP; 8161 is the default web console port)
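Before moving on, it can help to confirm that the broker is actually listening. The following is a minimal sketch in plain Java, not part of ActiveMQ: the class name `BrokerPortCheck` and the method `isReachable` are illustrative, and it assumes the web console is on 8161 and the broker on 61616 (the port used in the broker URL later in this article).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or unknown host: all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: "localhost" is only a fallback; pass the real server IP as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("web console (8161): " + isReachable(host, 8161, 2000));
        System.out.println("broker (61616): " + isReachable(host, 61616, 2000));
    }
}
```

If either line prints `false`, a firewall rule or a broker that failed to start are the usual suspects; the broker log under the installation's data directory is a good place to look first.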
II. Creating the demo
[Figure: project file structure]
(1) producer
- 1. Required dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.14.5</version>
</dependency>
- 2. Configure application.yml
server:
  port: 8080
spring:
  activemq:
    user: admin
    password: admin
    broker-url: tcp://x.x.x.x:61616
    pool:
      enabled: true
      max-connections: 10
queueName: publish.queue
topicName: publish.topic
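The `broker-url` value is an ordinary URI: the scheme names the transport, followed by the broker host and port. As a small illustration, the sketch below splits such a URL with `java.net.URI`. The class name `BrokerUrlParts` is hypothetical, and `192.0.2.10` is a placeholder address standing in for x.x.x.x.

```java
import java.net.URI;

class BrokerUrlParts {

    /** Breaks a broker URL such as tcp://host:61616 into transport, host, and port. */
    static String describe(String brokerUrl) {
        URI uri = URI.create(brokerUrl);
        return "transport=" + uri.getScheme()
                + ", host=" + uri.getHost()
                + ", port=" + uri.getPort();
    }

    public static void main(String[] args) {
        // Placeholder address; replace with the real server IP.
        System.out.println(describe("tcp://192.0.2.10:61616"));
        // prints: transport=tcp, host=192.0.2.10, port=61616
    }
}
```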
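- 3. Create ActiveMQConfig
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration
public class ActiveMQConfig {
    @Value("${queueName}")
    private String queueName;
    @Value("${topicName}")
    private String topicName;
    @Value("${spring.activemq.user}")
    private String usrName;
    @Value("${spring.activemq.password}")
    private String password;
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    // Destination for point-to-point messages
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    // Destination for publish/subscribe messages
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    // Plain (non-pooled) factory. Because this bean is declared explicitly, Spring Boot's
    // auto-configured ConnectionFactory backs off, so spring.activemq.pool.* does not apply to it.
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    }

    // Listener container factory for queues (point-to-point is the default)
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    // Listener container factory for topics
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // Switch to publish/subscribe; the default is point-to-point (producer/consumer)
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
}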
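- 4. Create PublishController
import javax.jms.Queue;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/publish")
public class PublishController {
    @Autowired
    private JmsMessagingTemplate jms;
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;

    // Sends the request parameter messageStr to the queue
    @RequestMapping("/queue")
    public String queue(String messageStr) {
        jms.convertAndSend(queue, messageStr);
        return "queue message sent successfully";
    }

    // Prints the replies that the consumers send back to out.queue
    @JmsListener(destination = "out.queue")
    public void consumerMsg(String msg) {
        System.out.println(msg);
    }

    // Sends the request parameter messageStr to the topic
    @RequestMapping("/topic")
    public String topic(String messageStr) {
        jms.convertAndSend(topic, messageStr);
        return "topic message sent successfully";
    }
}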
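To try the two endpoints without a browser, a small client can call them over HTTP. The following is a sketch under stated assumptions rather than part of the demo: `PublishClient` and its methods are hypothetical names, it needs Java 11+ for `java.net.http`, and it assumes the producer runs on `localhost:8080` as configured in application.yml.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class PublishClient {

    /** Builds the endpoint URI, URL-encoding the message for the messageStr parameter. */
    static URI publishUri(String baseUrl, String destination, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/publish/" + destination + "?messageStr=" + encoded);
    }

    /** Sends a GET request to /publish/{destination} and returns the response body. */
    static String send(String baseUrl, String destination, String message)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(publishUri(baseUrl, destination, message))
                .GET()
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: the producer application is running locally on port 8080.
        String baseUrl = "http://localhost:8080";
        try {
            System.out.println(send(baseUrl, "queue", "hello queue"));
            System.out.println(send(baseUrl, "topic", "hello topic"));
        } catch (IOException e) {
            System.out.println("Producer not reachable at " + baseUrl + ": " + e);
        }
    }
}
```

Encoding the message matters as soon as it contains spaces or non-ASCII characters; `publishUri("http://localhost:8080", "queue", "hello world")` yields `http://localhost:8080/publish/queue?messageStr=hello+world`.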
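(2) consumer-1 and consumer-2 (identical content)
- 1. Add the same dependencies, configuration file, and ActiveMQConfig class as in the producer (if the applications run on the same host, give each one its own server.port)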
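- 2. Add the QueueListener
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class QueueListener {
    // Consumes from publish.queue; the return value is sent on to out.queue as the reply
    @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
    @SendTo("out.queue")
    public String receive(String message) {
        message = "Received a message: " + message;
        System.out.println(message);
        return message;
    }
}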
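The round trip here is: the producer sends to publish.queue, the listener's return value is forwarded to out.queue because of `@SendTo`, and the producer's `consumerMsg` listener prints it. The sketch below imitates that flow with in-memory queues only; it does not use JMS, and `ReplyFlowSketch` and `dispatchOne` are hypothetical names standing in for what the listener container does.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class ReplyFlowSketch {
    // In-memory stand-ins for the two broker queues (not real JMS destinations).
    final Queue<String> publishQueue = new ArrayDeque<>();
    final Queue<String> outQueue = new ArrayDeque<>();

    /** Plays the role of QueueListener.receive: prefix the message and return it as the reply. */
    String receive(String message) {
        return "Received a message: " + message;
    }

    /**
     * Plays the role of the listener container: take one message from publish.queue,
     * call the listener, and route its return value to out.queue.
     * Returns false when there was nothing to consume.
     */
    boolean dispatchOne() {
        String incoming = publishQueue.poll();
        if (incoming == null) {
            return false;
        }
        outQueue.add(receive(incoming));
        return true;
    }

    public static void main(String[] args) {
        ReplyFlowSketch flow = new ReplyFlowSketch();
        flow.publishQueue.add("hello");           // producer: convertAndSend(queue, "hello")
        flow.dispatchOne();                       // consumer: receive + @SendTo("out.queue")
        System.out.println(flow.outQueue.poll()); // producer's out.queue listener
        // prints: Received a message: hello
    }
}
```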
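- 3. Add the TopicListener
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicListener {
    // Subscribes to publish.topic through the pub/sub container factory
    @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
    public void receive(String message) {
        System.out.println("You received a message: " + message);
    }
}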
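With consumer-1 and consumer-2 both running, the two destinations behave differently: a message on the queue is handled by only one of the consumers, while a message on the topic reaches both. The following in-memory sketch illustrates that difference. It is a simplification, not broker code: `DeliverySketch` is a hypothetical name, and the strict round-robin used for the queue is an assumption, since the real distribution depends on the broker's dispatch settings and timing.

```java
import java.util.ArrayList;
import java.util.List;

class DeliverySketch {

    /** Point-to-point: each message goes to exactly one consumer (round-robin here). */
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Publish/subscribe: every subscriber gets its own copy of every message. */
    static List<List<String>> deliverTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        // Index 0 stands for consumer-1, index 1 for consumer-2.
        System.out.println(deliverQueue(List.of("m1", "m2", "m3", "m4"), 2));
        // prints: [[m1, m3], [m2, m4]]
        System.out.println(deliverTopic(List.of("m1", "m2"), 2));
        // prints: [[m1, m2], [m1, m2]]
    }
}
```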