Queue模式
生产者
/**
 * Queue-mode producer: sends ten text messages to the {@code queue-demo} queue
 * of the ActiveMQ broker at {@code tcp://127.0.0.1:61616}.
 */
public class MyProducer {
    private static final String url = "tcp://127.0.0.1:61616";
    private static final String queueName = "queue-demo";

    /**
     * Connects to the broker, sends ten text messages and closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, creating JMS objects or sending fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Session arguments: (1) whether the session is transacted,
            // (2) the acknowledge mode (auto-acknowledge here).
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination the messages are sent to.
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 10; i++) {
                TextMessage textMessage = session.createTextMessage("发送消息--" + i);
                producer.send(textMessage);
                System.out.println("发送消息--" + i);
            }
        } finally {
            // Always release the connection, even when sending fails part-way.
            connection.close();
        }
    }
}
消费者
public class MyCustomer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String queueName = "queue-demo";
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
//创建会话
// 1.是否在事务中处理 2.连接的应答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//自动应答
//创建目的地
Destination destination = session.createQueue(queueName);
//创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//创建监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(("接收:" + textMessage.getText()));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
topic模式
topic模式只需要在前面生产者和消费者的代码上各修改一行(topicName 为自行定义的主题名称常量)
Destination destination = session.createQueue(queueName);
改为
Destination destination = session.createTopic(topicName);
spring集成
common.xml
<!-- Enable annotation-based configuration -->
<context:annotation-config/>
<!-- ConnectionFactory provided by ActiveMQ -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<!-- ConnectionFactory provided by Spring JMS; delegates to the ActiveMQ factory above -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Queue destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- Queue name -->
<constructor-arg value="queue"/>
</bean>
<!-- Topic destination (publish/subscribe mode) -->
<bean id="targetDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"/>
</bean>
consumer.xml
<import resource="common.xml"/>
<!-- Message listener bean -->
<bean id="consumerMessageListener" class="com.reige.jmsdemo.springtest.consumer.ConsumerMessageListener"/>
<!-- Message listener container configuration -->
<bean id="jsmContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<!-- Destination to listen on -->
<property name="destination" ref="queueDestination"/>
<!-- Listener that handles the received messages -->
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
producer.xml
<import resource="common.xml"/>
<!-- JmsTemplate, used for sending messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- Producer service bean; its fields are annotated for injection in ProducerServiceImpl -->
<bean id="producerServiceImpl" class="com.reige.jmsdemo.springtest.producer.ProducerServiceImpl"/>
ProducerService
/**
 * Contract for a component that sends a message supplied as a string.
 */
public interface ProducerService {
/**
 * Sends the given message.
 *
 * @param message the message content to send
 */
void sendMessage(String message);
}
/**
 * {@link ProducerService} implementation that sends text messages through a
 * {@code JmsTemplate} to the destination bean named {@code queueDestination}.
 */
public class ProducerServiceImpl implements ProducerService {
    @Autowired
    private JmsTemplate jmsTemplate;

    @Resource(name = "queueDestination")
    private Destination destination;

    /**
     * Wraps {@code message} in a JMS text message, sends it to the injected
     * destination and prints a confirmation line to standard output.
     *
     * @param message the text to send
     */
    @Override
    public void sendMessage(final String message) {
        // Builds the JMS message from whatever session the template hands in.
        MessageCreator textCreator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        };
        jmsTemplate.send(destination, textCreator);
        System.out.println("发送消息:" + message);
    }
}
ProducerService发送消息测试
/**
 * Manual test for {@code ProducerService}: loads {@code producer.xml} and sends ten messages.
 */
public class MyProducer {
    /**
     * Starts the Spring context, sends ten messages through the
     * {@code ProducerService} bean and closes the context.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        try {
            ProducerService service = context.getBean(ProducerService.class);
            for (int i = 0; i < 10; i++) {
                service.sendMessage("这是第" + i + "条消息");
            }
        } finally {
            // Close the context even if bean lookup or sending fails.
            context.close();
        }
    }
}
ConsumerMessageListener.java
/**
 * JMS listener that prints the text of every {@code TextMessage} it receives.
 */
public class ConsumerMessageListener implements MessageListener {
    /**
     * Prints the body of a text message; other message types are reported on
     * standard error and skipped.
     *
     * @param message the message delivered to this listener
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a non-text message would otherwise raise
        // ClassCastException inside the listener callback.
        if (!(message instanceof TextMessage)) {
            System.err.println("忽略非文本消息:" + message);
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(("接收消息" + textMessage.getText()));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
接收消息测试
/**
 * Manual test for message consumption: loads {@code consumer.xml}, which declares
 * the message listener container.
 */
public class MyConsumer {
    /**
     * Starts the Spring context and registers a JVM shutdown hook so the context
     * is closed when the process exits.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
        // The context is not closed here so the process can keep consuming;
        // the hook closes it on JVM shutdown instead.
        context.registerShutdownHook();
    }
}