消费者有两种姿势过滤感兴趣的消息:
- byTag:简单场景,类似支持
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
如果后面的过滤表达式为空或者*
,表示不过滤,全部消费;- bySQL:支持复杂的表达式,支持的语法见
ExpressionType.java
;
Aviator 表达式引擎具有强大的表达式校验功能,RocketMQ 也提供了FilterSpi
,接入 Aviator 后,过滤功能将会更加强大。
一、消息生产者
/****** Tag 过滤模式 ******/
Message msg = new Message("TagFilterTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
/****** SQL 过滤模式 ******/
Message msg = new Message("SqlFilterTest",
"*",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置自定义参数(可用在SQL过滤参数中)
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
二、消息消费者
/****** Tag 过滤模式 ******/
consumer.subscribe("TagFilterTest", "TagA || TagC");
/****** SQL 过滤模式 ******/
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(a is not null and a between 0 and 3)"));
在消费者启动前需要在 broker.conf 中配置 enablePropertyFilter=true
。