公司使用ActiveMq有一段时间了,对于一些细节性的东西这边着重介绍下, ActiveMq是什么, 能干嘛, 这些就不介绍了
原谅我的简单粗暴, 直接上代码:
ActiveMq持久化, 在conf/activeMq,xml文件中
经过测试, 配置了持久化后, 当activeMq消息队列崩溃之后, 重启后能够恢复数据
当然, 也可以使用mysql做持久化:
当消息没有消费时, 消息存在activemq_msgs表中, 消费完成后,从表中移除
持久化消息增加了消息传送的开销, 但是保证了可靠性
ActiveMq内存配置:
JMS java消息服务:
消息: 消息头 消息体 消息属性
JMSMessageId mq唯一生成的
如何避免消息调用的幂等性
消息体:
5种消息体:TextMessage MapMessage BytesMessage StreamMessage ObjectMessage
消息属性:
是以属性名和属性值对的形式制定, 可以认为是消息头的扩展, 属性指定一些消息头没有包括的附加信息, 可以在属性里指定消息选择器
message.setStringProperty();
消息的可靠性:
持久化:服务器宕机, 消息依然存在,队列默认持久化
事务
签收
topic 先启动订阅再启动生产,只要注册过一次, 不论是否在线, 都会接收到,没接收到会把没有接收到的消息接收下来
MQ的事务:
事务偏生产者, 签收偏消费者
false, 发送到队列 【自动提交】 true先执行send 再执行commit才能提交到MQ中
业务简单, 假设出现异常就不发送, 可以回滚或者回退,保证程序的高可用session.rollback();
消费者的事务:
false 直接消费【只能接收一次就OK】
true: 没有commit就会出现消息重复消费, 需要记得commit
签收:acknowledge:
Session.AUTO_ACKNOWLEDGE
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge需要手动签收 需要对每条消息做反馈,客户端调用textMessage.acknowledge()进行签收
Session.DUPS_OK_ACKNOWLEDGE 允许重复的消息【带副本的】【一般不用】
Session.SESSION_TRANSACTED
非事务:
非事务性会话中,消息何时被确认取决于创建会话时的应答模式[acknowledge mode]
事务:按照事务来提交,不管有没有ack 默认已经签收过了
事务没有提交, ack了, 会进行重复消费的
事务更大一些
在事务性会话中,当一个事务被成功提交则消息被自动签收, 如果事务回滚,则消息会被再次传送
点对点总结:
如果session关闭时有部分消息已经被收到但是还没有被签收,那么消费者下次连接到相同的队列时, 这些消息还会被再次接收
队列可以长久地保存消息知道消费者收到消息, 消费者不需要担心因为消息会丢失而时刻和队列保持激活的连接状态, 这充分提现了异步传输模式的优势
发布订阅总结
JMS pub/sub模型定义了如何向一个内容节点发布和订阅消息, 这些节点被成为topic
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题, 订阅者(subscribe)从主题订阅消息
主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送
broker:相当于ActiveMQ服务器的实例, Broker其实就是实现了用代码形式启动ActiveMQ将MQ嵌入到java代码中, 随时用随时启动, 用的时候再去启动这样节省了资源, 也保证了可靠性
ActiveMQ的传输协议
端口如何改, 生产上的链接协议是什么?
通讯协议:tcp nio vm https
在网络传输数据前, 必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流
默认情况下ActiveMQ把wire protocol叫做OpenWire, 它的目的是促使网络上的效率和数据快速交互
tcp传输的优点:可靠性高,稳定性强,高效,广泛,适用任何平台
NIO:更侧重于底层访问
可能有大量的client去连接到Broker上,一般情况下,大量的client去连接Broker是被操作系统线程所限制的,因为NIO实现比TCP需要更少的线程去运行,所以建议适用NIO协议
可能对于Broker有一个很迟钝的网络传输, NIP比TCP提供更好的性能
修改配置文件之前一定要备份
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
如果不指定ActiveMQ的网络监听端口, 那么这些端口都将使用BIO网络IO模型(openWire, STOMP, AMQP) 所以为了提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型
我们都是普通人, 需要的是勤奋,踏实,持之以恒的努力,求知若渴
如何让端口既支持nio网络模型, 又支持多种协议:
5.13之后
使用auto关键字<transport name="auto+nio" uri="auto+nio://">
使用 "+"符合为端口设置多种特性
如果我们既需要某一个端口支持NIO网络IO模型,又需要让它支持多个协议
可持久化:
5.9 可复制功能的levelDB
5.3 KahaDB
持久化机制最好和本机分开,保障高可用, 物理隔绝
ActiveMQ的消息持久化机制有JDBC, AMQ, KahaDB和LevelDB, 无论采用哪种持久化方式,消息的存储逻辑都是一致的
发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除, 失败将尝试继续发送
消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要将消息发送出去
消息中心启动以后首先要检查指定的存储位置, 如果有未发送成功的消息,则需要把消息发送出去
db-Num.log KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-Num.log,当文件已满时, 一个新的文件会随之创建 Num会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按数字编号,当不再有引用到数据文件中的任何消息时, 文件会被删除或者归档
db-1.log db.data db.free db.redo lock 四个文件一个锁
LevelDB:文件系统从ActiveMQ5.8之后引进的,基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性, 不使用自定义的B-Tree来实现索引预写日志,而是使用基于LevelDB的索引
<persistenceAdapter>
<levelDBdirectory="activemq-data"/>
</persistenceAdapter>
LevelDB基于文件的本地数据库存储形式, 但是比kahaDB更具有持久化
高级特性之异步投递:
消息重试机制:
哪些情况会导致消息重发:
1、使用了事务,在session中调用了rollback
2、commit之前没有关闭或
3、客户端在client_acknowledge 模式, session中调用了recover
消息重发时间间隔和重发次数 1s 6次
有毒消息Poison ACK谈谈你的理解
超过6次, 会进入死信队列
死信队列:DLQ
一条消息被重发多次之后,将会被mq移入死信队列,可以在queue中查找
ActiveMQ.DLQ <deadLetterStrategy>
IndividalDeadQueue
异步投递:
延时投递和定时投递: