一、发送 & 接收带标签的消息
1.1、概述
消息的种类纷繁复杂,不同的业务场景需要不同的消息,基于此RocketMQ提供了消息过滤功能,通过Tag或者Key进行区分,本章介绍Tag,我们再往一个Topic里面发送消息的时候,根据业务逻辑可能需要区分,例如带有tagA的消息被A消费,带有TagB的消息被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,这时我们也需要通过过滤才能区别对待。
其实这种场景在生活中也很常见,例如大家每天都使用的微信公众号,当关注的博主在公众号发布完消息后,你只会收到自己自己感兴趣的那部分。
1.2、订阅关系一致性
订阅关系一致性是消息过滤中对【消费者组名-Topic-Tag】的一些要求,如果不能正确的配置,将会出现消费消息紊乱,甚至消息丢失的问题。关于订阅关系一致性问题,请参考
订阅关系一致文档,这里不再赘述。
1.3、Demo07MQTestApp
/*** @Author : 一叶浮萍归大海* @Date: 2023/12/25 13:03* @Description: 发送 & 接收带标签的消息*/
@Slf4j
public class Demo07MQTestApp {/*** 发送带标签的消息*/@Testpublic void demo7Producer() throws Exception {// 1、创建一个生产者DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");// 2、连接NameServerproducer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、启动producer.start();// 4、创建消息String[] tags = new String[]{"NBA", "run", "star","car","mobile","tourism"};for (int i = 1; i <= 6; i++) {String tag = tags[i % tags.length];String content = "";switch (tag) {case "NBA":content = "this is a message about NBA,消息编号[" + i + "]";break;case "run":content = "this is a message about run,消息编号[" + i + "]";break;case "star":content = "this is a message about star,消息编号[" + i + "]";break;case "mobile":content = "this is a message about mobile,消息编号[" + i + "]";break;case "tourism":content = "this is a message about tourism,消息编号[" + i + "]";break;default:content = "this is a message about foods,消息编号[" + i + "]";break;}Message message = new Message("tag-topic",tag,content.getBytes(StandardCharsets.UTF_8));// 5、发送消息producer.send(message);log.info("【demo7Producer】发送消息成功,消息内容:{}",content);}// 关闭producerproducer.shutdown();}/*** 接收带标签的消息(Push方式)*/@Testpublic void demo7PushConsumer1() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupA");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer1】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否* CONSUME_SUCCESS:表明消费成功,消息会从MQ出队* RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer1】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}@Testpublic void demo7PushConsumer2() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupB");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA || star || mobile");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer2】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否* CONSUME_SUCCESS:表明消费成功,消息会从MQ出队* RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer2】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}}
1.4、测试
先后运行demo7PushConsumer1、demo7PushConsumer1和demo7Producer,观察控制台日志输出信息。
1.5、Topic和Tag如何选择
不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:
(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;
(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;
(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;
(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;
总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。