项目中用到了大量的client,由于client和AMQ相连,AMQ对大量queue的支持不是很好,所以在项目中用到了发布/订阅+消息选择器的方法来进行针对某个或某几个client的消息分发。
所谓的发布/订阅,指的就是多个client同时订阅一个TOPIC,生产者往这个TOPIC里发送数据的时候,所有的client都能接收到相同的消息。形式很像广播,所以在消息中间件里,这种模式也叫广播模式。
在AMQ遵循的JMS规范中,广播模式有两种形式:持久化订阅和非持久化订阅。
- 非持久化订阅:消费者只有在线的时候能接收到消息,一旦离线,服务器认为该消费者已离开,不会为其保留消息。等消费者下一次上线时,也只能收到从他上线以后生产者发送的消息。
- 持久化订阅:服务器为离线的消费者保留消息,当消费者离线时,服务器会记录该消费者的离线时间,并为其保留离线期间的所有消息(保存在磁盘上),等其上线后按顺序发给这个消费者。想起之前写过的一个聊天工具用的就是持久订阅。
项目里用到的是非持久化订阅,设计时考虑的主要原因有两个:
- 持久化订阅极度消耗资源,因为agent的数量比较多,所以一旦为每个消费者保留消息,如果出现消费者再也不上线的情况,消息就会永远保存在服务器上。AMQ的官网上也提到了这种情况:
Durable topic subscribers that are offline for a long period of time are usually not desired in the system. The reason for that is that broker needs to keep all the messages sent to those topics for the said subscribers. And this message piling can over time exhaust broker store limits for example and lead to the overall slowdown of the system.
- 非持久化订阅的前期是agent在线,在设计时考虑agent一定要上线才能执行任务,对agent异常的场景考虑不足。
最近项目里几个人都问我AMQ能不能实现为离线agent保留消息,其实是可以的,也是最近才发现的功能,目前想了以下几种方法:
1. 使用持久化订阅
AMQ的官方既然提到了持久化订阅的问题,自然也考虑了一下解决方法。在官方提供的方法里,可通过设置消息的有效期和自动清理离线的消费者的方法来降低系统负载。
- 消息有效期:每条消息可以在生产者端设置有效期,比如我们希望为离线的agent保留15分钟数据,那只要在生产者端把有效期设置为15分钟,等agent上线后就可以接收到在其上线前15分钟内的消息。
- 自动清理长期离线的消费者:可在服务器上配置该功能,主要的能力就是可以自动清理长期离线的消费者,有两个配置:
offlineDurableSubscriberTimeout 消费者离线时间
offlineDurableSubscriberTaskSchedule 服务端多长时间清理一次
但是这个方案有两个问题还待测试:
- 我们使用的是AMQ集群,持久化订阅是否会在集群内生效还不确定。因为每一次Agent重连时,连接的都可能是不同的MQ,持久化订阅的配置会否通过集群传播至所有的MQ呢?
- 平均两千个持久化订阅一台4C8G的AMQ能不能抗住,也需要测试。
- 持久化订阅要求Agent至少上线过一次,也就是在MQ上注册过一次,因此如果Agent在离线期间出现了clientID变化,MQ会认其为新的消费者,不会把离线消息发给它。
2. 使用AMQ的Retroactive Consumer功能
翻译是“可追溯”消费者,该功能只对Topic有效,如果consumer是可追溯的,那么它可以获取实例创建之前的消息。通常而言,非持久化订阅不可能获取实例创建之前的消息,因为服务器根本不知道它的存在。对于服务器而言,如果一个Topic通道创建,且有发布者发布消息,那么服务器将会在内存中(非持久化)或者磁盘中(持久化)保存已经发布的消息,直到所有的订阅者都消费者,才会清除原始消息内容。那么retroactive类型的订阅者,就可以获取这些原本不属于自己但broker上还保存的旧消息,就像我们订阅一种Feed,可以立即获取旧的内容列表一样。如果此订阅者不是持久化订阅,它可以获取最近发布的一些消息;如果是持久化订阅,它可以获取存储器中尚未删除的所有的旧消息。
“可回溯”消费者与clientID无关,就算离线时clientID变化也不会影响到消息的回溯。
此外,AMQ支持几种消息回溯的方法:
- 保留一定大小的消息
- 保留一定条数的消息
- 保留一定时间的消息
- 只保留一条消息
- 只保留消息属性中带上特定字段的消息
这个功能有一个问题,貌似不能保证所有满足条件的消息都被发送至“可追溯”消费者。且在集群模式下也有一定的问题,也是待测试的方案。
5:00pm更新:基本功能验证成功,可以收到离线前的消息,试了保留100条消息和保留1条消息都成功了,生产者和消费者在同一个集群不同机器也可以成功。但集群有个问题,消费者会收到多份同样的消息。消息数量与集群中服务器数量相同。比如3台mq的集群,设置保留10条消息,那新的agent上线就会收到3*10条消息,需要应用自己判断。
如果出现了agent大量掉线重连的场景,对网络带宽应该也有冲击。
3.由应用封装一层任务重新获取机制
在保持原有的代码不变的情况下,考虑在一个缓存(比如redis)设置一个有序集合,每次agent上线时可从该缓存中取出上一条消息或前几条消息并再次处理。
= =对架构冲击比较大,估计不靠谱。