配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 配置扫描路径 -->
<context:component-scan base-package="cn.enjoyedu">
<context:exclude-filter type="annotation"
expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="" password=""/>
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100"></property>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 队列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 发布订阅模式-->
<property name="pubSubDomain" value="true"></property>
</bean>
<!--Spring JmsTemplate 的消息生产者 end-->
<!--接收消费者应答的监听器-->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
</jms:listener-container>
<!-- 消息消费者 start-->
<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
<jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
<jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消费者 end -->
</beans>
生产者 topic
@Component("topicSender")
public class TopicSender {
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTemplate;
public void send(String queueName, final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
}
}
生产者 queue
@Component("queueSender")
public class QueueSender {
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
@Autowired
private GetResponse getResponse;
//json
public void send(String queueName, final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(message);
//配置,告诉消费者如何应答
Destination tempDst = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDst);
responseConsumer.setMessageListener(getResponse);
msg.setJMSReplyTo(tempDst);
String uid = System.currentTimeMillis()+"";
msg.setJMSCorrelationID(uid);
return msg;
}
});
//发送MapMessage
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage map = session.createMapMessage();
map.setString("id", "10000");
map.setString("name", "享学学员");
return map;
}
});*/
//发送ObjectMessage,被发送的实体类必须实现Serializable 接口
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
User user = new User(10000,"享学学员");
ObjectMessage objectMessage
= session.createObjectMessage(user);
return objectMessage;
}
});*/
//发送BytesMessage
//protobuf,kyro,messgepack
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("BytesMessage类型消息".getBytes());
return bytesMessage;
}
});*/
//发送StreamMessage
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("享学学员");
streamMessage.writeInt(10000);
//streamMessage.writeString(age);
return streamMessage;
}
});*/
}
}
接受应答
@Component
public class GetResponse implements MessageListener {
public void onMessage(Message message) {
String textMsg = null;
try {
textMsg = ((TextMessage) message).getText();
System.out.println("GetResponse accept msg : " + textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者Queue应答
@Component
public class QueueReceiver1 implements MessageListener {
@Autowired
private ReplyTo replyTo;
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage) message).getText();
System.out.println("QueueReceiver1 accept msg : " + textMsg);
// do business work;
replyTo.send(textMsg,message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Component
public class QueueReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
// 接收Text消息
if (message instanceof TextMessage) {
String textMsg = ((TextMessage) message).getText();
System.out.println("QueueReceiver2 accept msg : " + textMsg);
}
// 接收Map消息
if (message instanceof MapMessage) {
MapMessage mm = (MapMessage) message;
System.out.println("获取 MapMessage: name:" + mm.getString("name")
+ " msg:" + mm.getString("msg"));
}
/* // 接收Object消息
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
User user = (User) objectMessage.getObject();
System.out.println("获取 ObjectMessage: "+user);
}*/
// 接收bytes消息
/* if (message instanceof BytesMessage) {
byte[] b = new byte[1024];
int len = -1;
BytesMessage bm = (BytesMessage) message;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}*/
/* // 接收Stream消息
if (message instanceof StreamMessage) {
StreamMessage streamMessage = (StreamMessage) message;
System.out.println(streamMessage.readString());
System.out.println(streamMessage.readInt());
}*/
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Component
public class ReplyTo {
@Autowired
@Qualifier("jmsConsumerQueueTemplate")
private JmsTemplate jmsTemplate;
public void send(final String consumerMsg, Message producerMessage)
throws JMSException {
jmsTemplate.send(producerMessage.getJMSReplyTo(),
new MessageCreator() {
public Message createMessage(Session session)
throws JMSException {
Message msg
= session.createTextMessage("ReplyTo " + consumerMsg);
return msg;
}
});
}
}
消费者topic
@Component
public class TopicReceiver1 implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}