1、过期时间(TTL)
通过消息的timestamp字段和ConsumerInterface接口的onConsumer()方法可实现消息的TTL功能,但是有一个局限,就是每条消息的超时时间都是一样的。
我们可以通过消息中的headers字段来做配合,可以将TTL的值设置为键值对的形式保存在消息的headers字段中
2、延时队列
使用拦截器可以来实现延时队列,但是局限性很大。比如拉取到的消息集合中有一条消息的延时时间很长,其他的消息延时时间很短而被消费,那么该如何处理?
如果此时提交消费位移,那么延时时间很长的那条消息会丢失。
如果这时不继续拉取消息而等待这条延时时间很长的时间到达延时时间,这样又会导致消费滞后很多,而且如果这条消息后面的很多消息的延时时间很短,那么也会被这条消息无端地拉长延时时间,从而大大地降低了延时的精度。
如果这个时候不提交消费位移而继续拉取消息,等待这条延时时间很长的消息满足条件再提交消费位移,那么在此期间这条消息要驻留在内存中,而且需要一个定时机制来定时检测是否满足被消费的条件,那么这类消息很多时必定会引起内存的暴涨。
有一种方案,在发送延时消息的时候先不是投递到要发送的真实主题(real_topic)中,而是先投递到一些Kafka内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实主题中,消费者所订阅的还是真实的主题。
一般是按照不同的延时等级来划分的,比如设定5s、10s、2min、10min等等这些按照延时时间递增的延时等级,延时的消息按照延时时间投递到不同等级的主题中,投递到同一主题中的延时时间会被强制转换为与此延时等级一致的延时时间。虽然有一定的延时误差,但是误差可控。
每条消息的延时时间可借用timestamp、headers字段。
3、消息路由
Kafka默认按照主题来路由,也就是说,消息发往主题之后会被订阅的消费者全盘接收,这里没有类似消息路由的功能进行二次路由。
具体实现方法,可在消息的headers字段中加入一个键为“routingkey”、值为特定业务标示的Header,然后在消费端中使用拦截器挑选出特定业务标示的信息。多个消费者组订阅,不同的消费者组根据不同的routingkey来进行消费。
4、消息中间件的选型
1、功能维度
首先是功能维度,这个决定了能否最大限度地实现开箱即用,继而缩短项目周期、降低成本等。如果一款消息中间件达不到需求,那么就需要进行二次开发,会增加项目的技术难度、复杂度以及延长项目周期。比如优先队列、延时队列、重试队列、消费模式、广播消费、回溯消费、消息堆积+持久化、消息顺序性等等。
2、性能维度
虽然从功能维度上来说,RabbitMQ的优势要大于Kafka,但是Kafka的吞吐量要比RabbitMQ高出1至2个数量级。
3、可靠性和可用性
消息丢失是消息中间件不得不面对的一个痛点,消息可靠性是衡量消息中间件好坏的一个关键因素。消息可靠性是指对消息不丢失的保障程度,而消息中间件的可用性是指无故障运行的时间百分比,通常用几个9来衡量。
4、运维管理
5、社区力度及生态发展
消息中间件选型切记一位地追求性能或功能,性能可以优化,功能可以二次开发。如果要在功能和性能方面做一个抉择,那么首选性能,因为总体上来说性能优化的空间没有功能扩展的空间大。