SQL表达式过滤:
消费者将收到包含TAGA或TAGB消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。
在这种情况下,可以使用SQL表达式筛选出消息.
配置:
在`broker.conf `中添加配置
enablePropertyFilter = true
启动broker 加载指定配置文件
../bin/mqbroker -n 127.0.0.1:9876 -c broker.conf
语法:
RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.
1. 数字比较, 像 `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
2. 字符比较, 像 `=`, `<>`, `IN`;
3. `IS NULL` 或者 `IS NOT NULL`;
4. 逻辑运算`AND`, `OR`, `NOT`;
常量类型是:
1. 数字, 像123, 3.1415;
2. 字符串, 像‘abc’,必须使用单引号;
3. `NULL`, 特殊常数;
4. 布尔常量, `TRUE` 或`FALSE`;
1、生产者样例
发送消息时,你能通过putUserProperty来设置消息的属性
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("rocketMq1"); //设置nameserver地址: producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for(int i=1;i<=100;i++){ Message message1 = new Message("myTopic001","TAG-B","KEY-B",("rocketMq1 第一次发送 TAG-B"+i).getBytes()); //num属性会作为过滤条件 message1.putUserProperty("num",String.valueOf(i)); producer.send(message1); } producer.shutdown(); System.out.println("生产者下线!"); }
2、消费者样例
用MessageSelector.bySql来使用sql筛选消息
public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketMq1"); //设置nameserver地址: consumer.setNamesrvAddr("127.0.0.1:9876"); //每个cconsumer只能关注一个topic // tag selector 在一个group中的消费者,都不能随便变,要保持统一 //根据num过滤 MessageSelector selector = MessageSelector.bySql("num >= 18 and num <= 28"); consumer.subscribe("myTopic001",selector); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt message: list){ System.out.println(new String(message.getBody())); } // 默认情况下 这条消息只会被 一个consumer 消费到 点对点 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.start(); System.out.println("消费者 start...."); }
RocketMQ(三)——————javaAPI(5.过滤消息)
原文:https://www.cnblogs.com/lifan12589/p/14597939.html