JMS – ActiveMQ

JMS Java消息服务

Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的
API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的
API,绝大多数MOM提供商都对JMS提供支持。

Java消息服务的规范包括两种消息模式
  • 点对点
  • 发布者/订阅者

消息中间件

JMS API

  • ConnectionFactory

  • Connection

  • Session

  • Destination

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题

  • MessageConsumer

由会话创建的对象,用于接收发送到目标的消息

  • MessageProducer

由会话创建的对象,用于发送消息到目标

  • Message

是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序

消息的类型

  • StreamMessage Java原始数据流

  • MapMessage 键值对

  • TextMessage 字符串对象

  • ObjectMessage 序列化对象

  • ByteMessage 字节数据流


ActiveMQ安装

  • 解压文件夹,双击 home/bin/winXX/wrapper.exe 进行启动

  • 浏览器中访问 http://localhost:8161

  • 管理员账号和密码为 admin / admin

点对点消息

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:
  • • 只有一个消费者将获得消息

  • • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。

  • • 每一个成功处理的消息都由接收者签收

消息生产者

//1. 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2. 创建连接 并 开启
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 创建Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4. 创建消息目的地
Destination destination = session.createQueue("weixin-Queue");
//5. 创建生产者
MessageProducer producer = session.createProducer(destination);
//6. 发送消息
TextMessage textMessage = session.createTextMessage("Hello,MQ3");
producer.send(textMessage);
//7. 释放资源
producer.close();
session.close();
connection.close();

消息消费者

//1. 创建连接工厂
ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
//2. 创建并启动连接
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 创建Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地对象
Destination destination = session.createQueue(“weixin-Queue”);
//5. 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//6. 获取消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//7. 释放资源
consumer.close();
session.close();
connection.close()

事务

//使用事务,需要手动提交才可以发送消息
Session session = con.createSession(true,Session.AUTO_ACKNOWLEDGE);
//提交事务
session.commit();
//回滚事务
session.rollback();

签收模式

  • • Session.AUTO_ACKNOWLEDGE 自动签收
  • • Session.CLIENT_ACKNOWLEDGE 消费端手工签收,可以方便失败时记录日志或其他处理,消费端接收消息后如果不签收,那
    么该消息依然会被认为未消费
生产端:
Session session = con.createSession(true,Session.CLIENT_ACKNOWLEDGE);

消费端:
Session session = con.createSession(false,Session.CLIENT_ACKNOWLEDGE);
//手动签收
textMessage.acknowledge();


持久模式

MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
  • DeliveryMode.PERSISTENT 持久传输,MQ服务重启后,未消费的消息还会存在
  • DeliveryMode.NON_PERSISTENT 非持久传输,MQ服务重启后,未消费的消息将会消失

发布/订阅 消息

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴
趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式被概括为:

  • • 多个消费者可以获得消息

  • • 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订
    阅者必须保持持续的活动状态以接收消息。

添加Maven依赖

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>

消息发布者

//1. 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2. 创建连接 并 开启
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 创建Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4. 创建Topic对象
Topic topic = session.createTopic("weixin-Topic");
//5. 创建生产者
MessageProducer producer = session.createProducer(topic);
//6. 发送消息
TextMessage textMessage = session.createTextMessage("Hello,Topic MQ");
producer.send(textMessage);
//7. 释放资源
producer.close();
session.close();
connection.close();

消息订阅者

//1. 创建连接工厂
ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
//2. 创建并启动连接
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 创建Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地对象
Topic topic = session.createTopic("weixin-Topic");
//5. 创建消费这
MessageConsumer consumer = session.createConsumer(topic);
//6. 获取消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//7. 释放资源
consumer.close();
session.close();
connection.close();

JMS + Spring

添加Maven依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.9.RELEASE</version>
</dependency>
创建链接工厂
<!--配置ActiveMQ ConnectionFactory-->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!--Spring适配的连接工厂-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
</bean>

点对点

<!--配置JMSTemplate-->
<bean id=“jmsTemplate” class=“org.springframework.jms.core.JmsTemplate”>
<property name=“connectionFactory” ref=“connectionFactory”/>
<!--默认的目的地名称 可以省略,在发送时指定-->
<property name="defaultDestinationName" value="weixin-Queue"/>
</bean>

发送请求

@Autowired
private JmsTemplate jmsTemplate;
@Test
public void sendMessage() {
jmsTemplate.send("weixin-Queue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("Spring MQ");
}
});
}

接收请

创建MessageListener
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class WeixinListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

在Spring中配置监听器。配置监听器后,当MQ队列中有消息就会自动触发监听器的运行

<!--监听器-->
<bean id="listener" class="com.kaishengit.mq.listener.WeixinListener"/>
<!--监听器容器-->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="weixin-Queue"/>
<property name="messageListener" ref="listener"/>
</bean>


订阅发布模式

配置JmsTemplate
• 配置目的地对象
<!--配置JMSTemplate-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--Topic对象-->
<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
<!--主题名称-->
<constructor-arg name="name" value="weixin-Topic"/>
</bean>

发送消息

@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination destination;
@Test
public void sendMessage() throws IOException {
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("Spring MQ");
}
});
System.in.read();
}

接收消息

<!--监听器-->
<bean id="listener" class="com.kaishengit.mq.listener.WeixinListener"/>
<!--监听器容器-->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="listener"/>
</bean>

使用注解模式接收消息

<jms:annotation-driven container-factory="jmsListenerContainerFactory"/>
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<!--线程池的配置,控制在3-10个线程-->
<property name="concurrency" value="3-10"/>
</bean>
@Component
public class SpringAnnotationListener {
@JmsListener(destination = "spring-mq")
public void doSomething(String message) {
System.out.println("<<<<<<<<<<<<< " + message);
}
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,914评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,935评论 2 383
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,531评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,309评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,381评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,730评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,882评论 3 404
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,643评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,095评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,448评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,566评论 1 339
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,253评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,829评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,715评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,945评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,248评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,440评论 2 348

推荐阅读更多精彩内容

  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,481评论 0 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,626评论 18 139
  • 什么是JMS JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台...
    闽越布衣阅读 2,607评论 2 3
  • 自己的工作性质有时候比较闲的闲死,有时候忙的忙死,今天比较悠闲,所以想写点什么来。 我是一个双鱼座的麻麻,毕业10...
    多妞的麻麻阅读 158评论 0 0
  • 一、简单说明 线程间通信:在1个进程中,线程往往不是孤立存在的,多个线程之间需要经常进行通信 线程间通信的体现 1...
    tdwydan1阅读 603评论 0 1