系列十一(实战)、发送 接收带标签的消息(Java操作RocketMQ)

一、发送 & 接收带标签的消息

1.1、概述

        消息的种类纷繁复杂,不同的业务场景需要不同的消息,基于此RocketMQ提供了消息过滤功能,通过Tag或者Key进行区分,本章介绍Tag,我们再往一个Topic里面发送消息的时候,根据业务逻辑可能需要区分,例如带有tagA的消息被A消费,带有TagB的消息被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,这时我们也需要通过过滤才能区别对待。

        其实这种场景在生活中也很常见,例如大家每天都使用的微信公众号,当关注的博主在公众号发布完消息后,你只会收到自己自己感兴趣的那部分。

1.2、订阅关系一致性

        订阅关系一致性是消息过滤中对【消费者组名-Topic-Tag】的一些要求,如果不能正确的配置,将会出现消费消息紊乱,甚至消息丢失的问题。关于订阅关系一致性问题,请参考

订阅关系一致文档,这里不再赘述。

1.3、Demo07MQTestApp 

/*** @Author : 一叶浮萍归大海* @Date: 2023/12/25 13:03* @Description: 发送 & 接收带标签的消息*/
@Slf4j
public class Demo07MQTestApp {/*** 发送带标签的消息*/@Testpublic void demo7Producer() throws Exception {// 1、创建一个生产者DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");// 2、连接NameServerproducer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、启动producer.start();// 4、创建消息String[] tags = new String[]{"NBA", "run", "star","car","mobile","tourism"};for (int i = 1; i <= 6; i++) {String tag = tags[i % tags.length];String content = "";switch (tag) {case "NBA":content = "this is a message about NBA,消息编号[" + i + "]";break;case "run":content = "this is a message about run,消息编号[" + i + "]";break;case "star":content = "this is a message about star,消息编号[" + i + "]";break;case "mobile":content = "this is a message about mobile,消息编号[" + i + "]";break;case "tourism":content = "this is a message about tourism,消息编号[" + i + "]";break;default:content = "this is a message about foods,消息编号[" + i + "]";break;}Message message = new Message("tag-topic",tag,content.getBytes(StandardCharsets.UTF_8));// 5、发送消息producer.send(message);log.info("【demo7Producer】发送消息成功,消息内容:{}",content);}// 关闭producerproducer.shutdown();}/*** 接收带标签的消息(Push方式)*/@Testpublic void demo7PushConsumer1() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupA");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer1】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否*      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队*      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer1】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}@Testpublic void demo7PushConsumer2() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupB");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA || star || mobile");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer2】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否*      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队*      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer2】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}}

1.4、测试

        先后运行demo7PushConsumer1、demo7PushConsumer1和demo7Producer,观察控制台日志输出信息。

1.5、Topic和Tag如何选择

        不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:

(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;

(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;

(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;

(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;

        总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/549683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Comments are not permitted in JSON

用vsCode开发uniappvue3.0TypeeScriptvite项目的时候&#xff0c;打开json文件报错Comments are not permitted in JSON。 解决办法如下&#xff1a; 1.点击右下角 2.输入JSON选择第二个JSON with Comments即可

OCP-052考试题库汇总(32)-CUUG内部解答版

Which state must a database be to enable ARCHIVELOG ? A)NOMOUNT B)OPEN IN READ WRITE mode C)OPEN IN READ ONLY mode D)OPEN IN RESTRICTED mode E)MOUNT Answer: E 赵&#xff1a; 题译&#xff1a;数据库必须是哪个状态才能启用 ARCHIVELOG? oracle 启动分为三步&…

微软推中文Live地图搜索服务与Office集成

2008年1月15日&#xff0c;微软MSN在北京宣布正式推出中文版Live地图搜索。Live地图搜索基于微软Virtual Earth技术&#xff0c;目前涵盖中国114个城市信息&#xff0c;并推出为中国市场特别研发的公交换乘线路查询。 现在是进入的最好时期随着城市基础设施建设不断发展&#x…

uni-nav-bar设置height自适应高度

今天拿到ui设计的图之后&#xff0c;发现需要用到自定义navbar 给它设置height的时候&#xff0c;一开始用的iphone X的屏幕&#xff0c;像素单位用的rpx&#xff0c;发现切换到别的屏幕就不对了 然后还是用iphone X的屏幕&#xff0c;像素改成固定的88px&#xff0c;发现有一…

AD中的五种角色

AD五种操作主机的作用<?xml:namespace prefix o ns "urn:schemas-microsoft-com:office:office" />Active Directory 定义了五种操作主机角色&#xff08;又称&#xff26;&#xff33;&#xff2d;&#xff2f;&#xff09;&#xff1a; 1.架构主机 schema…

css文本超出容器宽度自动换行及超过行数加省略号...

css文本超过div的宽度时&#xff0c;让它进行自动换行&#xff0c;并且超过div高度时候&#xff0c;在最后一行加省略号... 废话不多说&#xff0c;上代码 display: -webkit-box;overflow: hidden;text-overflow: ellipsis;word-wrap: break-word;white-space: normal !import…

Dubbo+ZooKeeper搭建的简单示例

一、简介 基于Dubbo ZooKeeper实现的分布式架构&#xff0c;调用接口方法就像调用本地方法一样调用远程服务。 来自Dubbo官网的架构图&#xff1a; 节点角色说明 节点角色说明Provider暴露服务的服务提供方Consumer调用远程服务的服务消费方Registry服务注册与发现的注册中心M…

uni-calendar更改打点颜色实现签到和缺勤不同打点颜色效果

1.公司要实现打卡功能&#xff0c;发现uni-calendar插件不支持不同打点颜色的效果&#xff0c;所以就自己改一下源码 下图是公司ui设计师给的图 2.我们打开调试&#xff0c;可以看到红色打点的样式是.uni-calendar-item__weeks-box-circle 3.我们复制下来这个class名&#xff…

C++_练习—面积

面积 代码示例&#xff1a; 1 #include <iostream>2 3 using namespace std;4 5 class circle {6 7 public:8 void set_r(int rr);9 double c_circle(); 10 double s_circle(); 11 12 private: 13 int r; 14 }; 15 16 17 void circle::set_r(int rr) …

C++_练习—函数指针与函数重载

函数指针与函数重载 成员函数与普通函数区别&#xff1a; 定义一个对象时&#xff0c;系统只为数据成员分配空间。那么对于类的成员函数而言&#xff0c;它如何知道函数中提到的数据成员是哪个对象的数据成员呢&#xff1f;……实际上&#xff0c;C为每个成员函数设置了一个隐藏…

反编译工具jad的使用

from:http://www.javaresearch.org/article/55024.htm这是jad的readme文件jad -sjava example1.classjad的主页是&#xff1a;http://www.geocities.com/SiliconValley/Bridge/8617/jad.htmlCopyright 2000 Pavel Kouznetsov (kpdusyahoo.com).1. 请先读jad主页的否认声明文件2…

[转载]Redis 持久化之RDB和AOF

原文链接&#xff1a;https://www.cnblogs.com/itdragon/p/7906481.html 温馨提示 在正式数据&#xff08;当然是非生产环境啦&#xff09;练习以下操作时&#xff0c;一定一定一定记得备份dump.rdb文件。 我给自己的服务器添加了aof持久化配置&#xff0c;重启后&#xff0c;发…

C++_练习—构造与析构

构造与析构 1 #include <iostream>2 3 using namespace std;4 5 class info {6 public:7 info(int a);8 info(int a, int b);9 info(int a,int b,int c); 10 11 ~info(); // 无类型无返无参 12 13 private: 14 int age; 15 int *temp; 16 }; …

分布式 dynamips+dyangen (更新于07.3.30)

更新内容&#xff1a;很多网友都反映用此文方法行不通&#xff0c;现象为&#xff1a;分布在各计算机上的路由器能起来&#xff0c;但互联的端口是Down的&#xff0c;以至于ping 不通。在此特别感谢 flyxj 网友QQ联系到我才引起我的高度注视使问题得到解决&#xff0c;不会再给…

vs 运行的时候产生伴随cmd窗口

简介 很多程序运行的时候&#xff0c;需要输出调试信息。如果没有伴随cmd窗口的话&#xff0c;输出的信息很难被看到 参考链接 https://blog.csdn.net/weixin_39278265/article/details/81865362 转载于:https://www.cnblogs.com/eat-too-much/p/11357709.html

tcl学习---windows下安装及运行环境

以前一直用tcl&#xff0c;但是面试的时候发现很多基本的概念竟然又忘记了。有什么好说的&#xff0c;从头到尾再实践一遍吧~~~~1&#xff1a;下载windows版本不方便UNIX/LINUX,所以直接下载windows版本&#xff0c;现在版本已经到8.5了&#xff1b;下载链接&#xff1a;[url]h…

OCP-052考试题库汇总(33)-CUUG内部解答版

Which two can be exported by a non-administrative account by using Data Pump? A)directory objects B)tables C)tablespaces D)schemas E)database Answer: BD 赵&#xff1a; EXP 和 IMP 是客户端工具程序&#xff0c;它们既可以在客户端使用&#xff0c;也可以在服务端…

linux mesg 命令详解

linux mesg 命令详解功能说明&#xff1a;设置终端机的写入权限。语  法&#xff1a;mesg [ny]补充说明&#xff1a;将mesg设置y时&#xff0c;其他用户可利用write指令将信息直接显示在您的屏幕上。参  数&#xff1a;n 不允许气筒用户将信息直接显示在你的屏幕上。y 允许…

OCP-052考试题库汇总(34)-CUUG内部解答版

Where is an expdp operation tracked? A)dump files B)control file C)log files D)Automatic Diagnostic Repository(ADR) E)master table (MT) Answer: E 赵&#xff1a; 题译&#xff1a;哪里有一个 Exdp 操作跟踪? Master table 是一个临时 table&#xff0c;专门为 imp…