RocketMQ tag过滤和sql92语法过滤
tag过滤
生产者,由于springboot没有专门对mq进行tag标记的方法,只是在topic:后面加上,所以只需 rocketMQTemplate.sendOneWay(“tagFilterBoot:TagA”,msg1);标记即可
生产者代码如下
/***生产者* tag过滤*/@Testpublic void sendTagFilterMsg(){Message msg1 = MessageBuilder.withPayload("消息A").build();rocketMQTemplate.sendOneWay("tagFilterBoot:TagA",msg1);Message msg2 = MessageBuilder.withPayload("消息B").build();rocketMQTemplate.sendOneWay("tagFilterBoot:TagB",msg2);Message msg3 = MessageBuilder.withPayload("消息C").build();rocketMQTemplate.sendOneWay("tagFilterBoot:TagC",msg3);}
消费者在注解处添加selectorExpression = "TagA || TagC"表达式选项即可。
消费者代码如下
/***消费者* tag过滤*/
@Component
@RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC")
public class TagFilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到的消息:"+new String(messageExt.getBody(), Charset.defaultCharset()));}
}
sql92过滤
生产者 setHeader就是设置属性
生产者代码如下
/*** sql92过滤*/@Testpublic void sendSQL92FilterMsg(){Message msg1 = MessageBuilder.withPayload("美女A,年龄22,体重45").setHeader("age",22).setHeader("weight",45).build();rocketMQTemplate.sendOneWay("SQL92FilterBoot",msg1);Message msg2 = MessageBuilder.withPayload("美女B,年龄33,体重65").setHeader("age",33).setHeader("weight",65).build();rocketMQTemplate.sendOneWay("SQL92FilterBoot",msg2);Message msg3 = MessageBuilder.withPayload("美女C,年龄55,体重99").setHeader("age",55).setHeader("weight",99).build();rocketMQTemplate.sendOneWay("SQL92FilterBoot",msg3);}
消费者 只需要加上如下注解即可
selectorType = SelectorType.SQL92,selectorExpression = “age>23 and weight>80”
selectorType设置类型
selectorExpression过滤条件
消费者代码如下
@Component
@RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age>23 and weight>80")
public class SQL92FilterTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到的消息:"+new String(messageExt.getBody(), Charset.defaultCharset()));}
}