系列十一(实战)、发送 接收带标签的消息(Java操作RocketMQ)

一、发送 & 接收带标签的消息

1.1、概述

        消息的种类纷繁复杂,不同的业务场景需要不同的消息,基于此RocketMQ提供了消息过滤功能,通过Tag或者Key进行区分,本章介绍Tag,我们再往一个Topic里面发送消息的时候,根据业务逻辑可能需要区分,例如带有tagA的消息被A消费,带有TagB的消息被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,这时我们也需要通过过滤才能区别对待。

        其实这种场景在生活中也很常见,例如大家每天都使用的微信公众号,当关注的博主在公众号发布完消息后,你只会收到自己自己感兴趣的那部分。

1.2、订阅关系一致性

        订阅关系一致性是消息过滤中对【消费者组名-Topic-Tag】的一些要求,如果不能正确的配置,将会出现消费消息紊乱,甚至消息丢失的问题。关于订阅关系一致性问题,请参考

订阅关系一致文档,这里不再赘述。

1.3、Demo07MQTestApp 

/*** @Author : 一叶浮萍归大海* @Date: 2023/12/25 13:03* @Description: 发送 & 接收带标签的消息*/
@Slf4j
public class Demo07MQTestApp {/*** 发送带标签的消息*/@Testpublic void demo7Producer() throws Exception {// 1、创建一个生产者DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");// 2、连接NameServerproducer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、启动producer.start();// 4、创建消息String[] tags = new String[]{"NBA", "run", "star","car","mobile","tourism"};for (int i = 1; i <= 6; i++) {String tag = tags[i % tags.length];String content = "";switch (tag) {case "NBA":content = "this is a message about NBA,消息编号[" + i + "]";break;case "run":content = "this is a message about run,消息编号[" + i + "]";break;case "star":content = "this is a message about star,消息编号[" + i + "]";break;case "mobile":content = "this is a message about mobile,消息编号[" + i + "]";break;case "tourism":content = "this is a message about tourism,消息编号[" + i + "]";break;default:content = "this is a message about foods,消息编号[" + i + "]";break;}Message message = new Message("tag-topic",tag,content.getBytes(StandardCharsets.UTF_8));// 5、发送消息producer.send(message);log.info("【demo7Producer】发送消息成功,消息内容:{}",content);}// 关闭producerproducer.shutdown();}/*** 接收带标签的消息(Push方式)*/@Testpublic void demo7PushConsumer1() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupA");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer1】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否*      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队*      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer1】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}@Testpublic void demo7PushConsumer2() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupB");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("tag-topic", "NBA || star || mobile");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {log.info("我是消费者【demo7PushConsumer2】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));}/*** 返回值:消费消息成功与否*      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队*      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo7PushConsumer2】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}}

1.4、测试

        先后运行demo7PushConsumer1、demo7PushConsumer1和demo7Producer,观察控制台日志输出信息。

1.5、Topic和Tag如何选择

        不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:

(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;

(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;

(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;

(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;

        总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/549683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Comments are not permitted in JSON

用vsCode开发uniappvue3.0TypeeScriptvite项目的时候&#xff0c;打开json文件报错Comments are not permitted in JSON。 解决办法如下&#xff1a; 1.点击右下角 2.输入JSON选择第二个JSON with Comments即可

uni-nav-bar设置height自适应高度

今天拿到ui设计的图之后&#xff0c;发现需要用到自定义navbar 给它设置height的时候&#xff0c;一开始用的iphone X的屏幕&#xff0c;像素单位用的rpx&#xff0c;发现切换到别的屏幕就不对了 然后还是用iphone X的屏幕&#xff0c;像素改成固定的88px&#xff0c;发现有一…

css文本超出容器宽度自动换行及超过行数加省略号...

css文本超过div的宽度时&#xff0c;让它进行自动换行&#xff0c;并且超过div高度时候&#xff0c;在最后一行加省略号... 废话不多说&#xff0c;上代码 display: -webkit-box;overflow: hidden;text-overflow: ellipsis;word-wrap: break-word;white-space: normal !import…

Dubbo+ZooKeeper搭建的简单示例

一、简介 基于Dubbo ZooKeeper实现的分布式架构&#xff0c;调用接口方法就像调用本地方法一样调用远程服务。 来自Dubbo官网的架构图&#xff1a; 节点角色说明 节点角色说明Provider暴露服务的服务提供方Consumer调用远程服务的服务消费方Registry服务注册与发现的注册中心M…

uni-calendar更改打点颜色实现签到和缺勤不同打点颜色效果

1.公司要实现打卡功能&#xff0c;发现uni-calendar插件不支持不同打点颜色的效果&#xff0c;所以就自己改一下源码 下图是公司ui设计师给的图 2.我们打开调试&#xff0c;可以看到红色打点的样式是.uni-calendar-item__weeks-box-circle 3.我们复制下来这个class名&#xff…

分布式 dynamips+dyangen (更新于07.3.30)

更新内容&#xff1a;很多网友都反映用此文方法行不通&#xff0c;现象为&#xff1a;分布在各计算机上的路由器能起来&#xff0c;但互联的端口是Down的&#xff0c;以至于ping 不通。在此特别感谢 flyxj 网友QQ联系到我才引起我的高度注视使问题得到解决&#xff0c;不会再给…

换了坐骑

公司本来发了个DELL D400。好大好沉啊。&#xff08;虽然加了配置&#xff09;所以一直在使用自己的lenovo。今天总算给换了一个别的本本。当然不是新本本啦。不过也不错。毕竟才来还没一年。淘汰给我的那哥们跟我关系不错。他换了新的联想的天逸系列。 也是挺高配的。双核啊。…

C++_练习—继承_构造析构

构造析构 继承与构造析构&#xff1a; 在子类对象构造时&#xff0c;需要调用父类构造函数对其继承得来的成员进行初始化 在子类对象析构时&#xff0c;需要调用父类析构函数对其继承得来的成员进行清理 1 #include <iostream>2 3 using namespace std;4 5 class info1 {…

Vista修改默认字体

装了Vista之后&#xff0c;发现访问很多网站时字体都不好看。根据网上找到的方法&#xff0c;用Windows XP中的宋体替换Vista中的宋体。1.用 Total Commander (或 WinRAR) 进入 Vista 的 Windows\Fonts 文件夹&#xff0c;simsun.ttc 文件重命名。(我在重命名时&#xff0c;遇到…

在 CCR 环境中使用 Exchange 命令行管理程序移动存储组和数据库

作为Exchange管理员或许会遇到需要对Exchange Server存储组和数据库更改存储路径的情况&#xff0c;在常规情况下&#xff0c;更改Exchange Server存储组和数据库的路径一项比较简单的操作&#xff0c;通常在图形界面下经过简单的几步操作即可&#xff0c;路径更改过程数据库会…

C++_练习—多态_证明vptr指针的存在

证明vptr指针的存在 1 // 证明vptr指针的存在2 3 #include <iostream>4 5 using namespace std;6 7 class parent {8 public:9 parent(int a) { 10 this->a a; 11 } 12 13 virtual void pri(void) { 14 cout << "parent &…

NetCore的配置管理(1)

学习NetCore的配置管理&#xff1b; 目录 命令行配置Json文件配置配置文件文本至C#对象实例的映射配置文件热更新总结命令行配置&#xff1a; 打开VS2017,新建NetCore控制台项目&#xff1b; 打开nuget包管理&#xff1b;添加Microsoft.Asp.NetCore.all&#xff0c;或者使用命令…

buffer busy waits

buffer busy waitshttp://metalink.oracle.com/metalink/plsql/ml2_documents.showDocument?p_database_idNOT&p_id34405.1当会话想要访问缓冲存储器中的数据块&#xff0c;而该数据块正在被其它会话使用时产生buffer busywaits事件。其它会话可能正在从数据文件向缓冲区存…

SetupFactory安装制作心得

很多年前&#xff0c;因为仰慕Install Shield的鼎鼎大名&#xff0c;所以很是花了些功夫研究了一番&#xff0c;最后&#xff0c;基本上也可以打出很完善的包了&#xff0c;其中也不乏一些很有难度的事情&#xff0c;比如ODBC的打包等。但它实在太难用了&#xff0c;过上一段时…

Timus 1114. Boxes

Timus 1114. Boxes 要求计算出将两种颜色的球放到盒子中的各种组合的数目。1114. Boxes Time Limit: 0.6 second Memory Limit: 16 MB N boxes are lined up in a sequence (1 ≤ N ≤ 20). You have A red balls and B blue balls (0 ≤ A ≤ 15, 0 ≤ B ≤ 15). The red bal…

Speerio Skinergy 'Image' is ambiguous 错误

使用BeyondCSS皮肤时报错&#xff1a;Could Not Load Skin: /Portals/0/Skins/beyondcss/1column_speerio.ascx Error: E:"Development"DotNetNuke"InstallArea"DotNetNuke_04.08.03_Source"Website"controls"Speerio"Skinergy"s…

Sql Server中自动序号的方法

第一种:使用identity函数增加临时表的方法 selectid identity(int,1,1),*into#tmp fromtableselect*from#tmp droptable#tmp 在SQL2005中新增了ROW_NUMBER()函数,给我们带来了很多方便,使用方法如下: SELECTid,ROW_NUMBER() OVER(orderbyid)asRowNumber FROMTable有一个方便,as…

flex白板之图形绘制函数

图形的绘制 Graphics类提供了相关的方法&#xff1a; 1&#xff0c;清空画布 graphics.clear();2&#xff0c;设置画笔 graphics.lineStyle(thickness:Number NaN, color:uint 0, alpha:Number 1.0, pixelHinting:Boolean false, scaleMode:String "normal", ca…

三层体系结构学习总结

三层架构学习总结KeyWords: 三层体系结构,DAL,BLL,USL,学习心得,三层体系结构,软件三层体系结构 By Flouse2008年7月24日三层体系结构的概念 用户界面表示层(USL)业务逻辑层(BLL)数据访问层(DAL) 图一&#xff1a;BLL将USL与DAL隔开了&#xff0c;并且加入了业务规则 各层的作用…

具有全局观的网络拓扑

近年来&#xff0c;IT技术发展迅速&#xff0c;随着各个企业IT系统的建设&#xff0c;网络架构从单一的局域网扩展到广域网&#xff0c;网络设备也是类型多种多样&#xff0c;路由器、交换机、防火墙、IDS、***等等&#xff0c;而且网络设备的供应厂商也是越来越多&#xff0c;…