【RocketMQ系列五】消息示例-顺序消息延迟消息广播消息的实现

1. 前言

上一篇文章我们介绍了简单消息的实现,本文将主要来介绍顺序消息的实现,顺序消息分为局部顺序消息和全局顺序消息。

顺序消息指的是消费者在消费消息时,按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。

顺序消息分为 全局顺序消息和局部顺序消息。

全局顺序消息就是全局使用一个queue。

局部顺序消息就是 有顺序依赖的消息放在同一个queue中,多个queue并行消费。

2. 局部顺序消息

默认情况下RocketMQ会根据轮询的方式将消息发送到某个broker中的某个队列中,这样的话就不能保证消息是有序的。

比如在购物网站下单场景下:有 1. 创建订单---->2. 订单支付---->3. 订单发货---->4. 订单完成 四条消息。这四条消息逻辑上肯定是有序的。但是如果采用RocketMQ默认的消息投递方式,那么同一个订单,有可能创建订单被投递到了 MessageQueue1,订单支付的话被投递到了MessageQueue2。 由于消息在不同的MessageQueue中,消费者在消费的时候就可能会出现订单支付的消息先于创建订单的消息。

局部顺序消息就是要保证同一笔订单4条消息都放在同一个queue中,这样的话就不会出现订单支付的消息先于创建订单的消息被消费。就像下图所示:

局部顺序消息

局部顺序消息消费者在消费某个topic的某个队列中的消息的时候是顺序的。消费者使用MessageListenerOrderly类来进行消息监听。

2.1. 定义生产者

  1. 这里定义了名为part_order_topic_test的topic。运行程序之后该topic可以路由到broker-a 以及broker-b 两个broker。

    image-20231003154231683

public class OrderProducer {// 局部顺序消费,核心就是自己选择Queue,保证需要顺序保障的消息落到同一个队列中public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();for (int i = 0; i < 10; i++) {int orderId = i;for (int j = 0; j < 5; j++) {// 构建消息体,tags和key 只是做一个简单区分Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部顺序消息处理_" + orderId + ";step_" + j).getBytes());SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {@Override//这里的arg参数就是外面的orderIdpublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg;int index = orderId % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", send);}}defaultMQProducer.shutdown();}
}
  1. 在发送消息的时候实现MessageQueueSelector接口用于在发送消息的时候指定队列。其中, public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 方法有三个参数:其中,mqs表示当前topic所路由的全部队列数,这里就是8个队列,broker-a有4个队列,broker-b有4个队列。msg就是传入的消息体,arg 就是传入的orderId。

  2. 这里根据orderId与队列数求模取余来获取消息应该发送到哪个队列中,这样就保证了相同的orderId的消息会落到同一个队列中

    Integer orderId = (Integer) arg;
    int index = orderId % mqs.size();
    return mqs.get(index);
    
生产者运行结果(部分截图)

image-20231003160039915

从运行结果可以看出相同orderId的消息被投递到了同一个MessageQueue中,而相同MessageQueue队列天然是有顺序的。

2.2.定义消费者

说完了生产者,接着来说说消费者。消费者的逻辑主要是在消费的时候需要实现 MessageListenerOrderly 类来进行消息监听。核心代码是:

	// 2.订阅消费消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println("消费得到的消息是={}" + msg);System.out.println("消息体内容是={}" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

这里启动了三个消费者,不管消费者消费的顺序如何,相同的orderId下的5条消息都是被顺序消费的。

image-20231010125014684

image-20231010125043838

image-20231010125110337

3. 碰到的问题

在首次调试的时候出现了一个 broker is full 的错误。这是由于磁盘空间不足导致的,可以通过 df -h 命令查看当前磁盘空间的占用情况,当磁盘空间使用率超过90%的话则会报此错。

image-20231003131237358

4. 全局顺序消息

全局顺序消息是指消费者消费全部消息都是顺序的,只能让所有的消息都发送到同一个MessageQueue中来实现,在高并发场景下会非常影响效率。

5. 广播消息

广播消息是向主题(topic)的所有订阅者发送消息,订阅同一个topic的多个消费者,都能全量收到生产者发送的所有消息。

广播消息的生产者与普通同步消息的生产者实现是一致的,不同的是消费者的消息模式不同。这里给出消费者实现的不同之处。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");// 设置消费者的模式是广播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//从第一位开始消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

6. 延迟消息

延迟消息与普通消息的不同之处在于,它们要在指定的时间之后才会被传递。生产者并不会延迟发送消息,而是发送到topic里面,消费者延迟指定的时间进行消费。

6.1. 延迟消息生产者

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");defaultMQProducer.start();for (int i = 0; i < 100; i++) {Message message = new Message("Schedule_topic", ("延迟消息测试" + i).getBytes());//设置延迟级别,默认有18个延迟级别,这个消息将延迟10秒消费message.setDelayTimeLevel(3);defaultMQProducer.send(message);}System.out.println("所有延迟消息发送完成");defaultMQProducer.shutdown();

延迟消息生产者与普通消息生产者主要的区别是延迟消息需要调用 setDelayTimeLevel 方法设置延迟级别,这里设置级别是3,则是延迟10秒。RocketMQ提供了18种延迟级别。可以在 RocketMQ的仪表板中的集群中的broker配置中找到。

image-20231003200021490

延迟消息的消费者与普通消息的消费者相同的。RocketMQ内部通过名为SCHEDULE_TOPIC_XXXX 的topic来存放延迟消息。

image-20231003201410410

7.批量消息

批量发送消息提高了传递消息的性能。官方建议批量消息的总大小不应超过1M,实际不应超过4M。如果超过4M的批量消息需要进行分批处理。同时设置broker的配置参数为4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代码如下:

	//4.创建消息List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100*100; i++) {// 创建消息,指定topic,以及消息体messageList.add(new Message("batch_topic", ("飞哥测试批量消息" + i).getBytes()));}//批量消息消息小于4M的处理SendResult send = defaultMQProducer.send(messageList);System.out.println(send);

8.过滤消息

使用tag过滤

在大多数情况下,标签是一种简单而有用的设计,可以用来选择你想要的消息。

首先是根据tag来过滤消息,生产者在发送消息的时候指定该消息的tag标签,消费者则可以根据tag来过滤消息。

8.1. 过滤消息生产者

这里定义了三个tag,分别是tagA,tagB以及tagC,生产者在生产消息的时候给每个消息指定不同的tag。

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("TagFilterTest", tags[i % tags.length], ("飞哥tag消息过滤" + tags[i % tags.length]).getBytes());SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

8.2. 过滤消息的消费者

消费者过滤出了标签带有tagA以及tagC的消息进行消费。这里其实是broker将consumer需要的消息推给消费者。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");

image-20231003212919155

使用SQL过滤

SQL 功能可以通过发送消息时输入的属性进行一些计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑。

语法

RocketMQ只定义了一些基本的语法类支持这个特性。

1. 数值比较:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比较:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 逻辑`AND`,`OR`,`NOT`;

常量类型有:

1. 数字,如 123,
2. 字符,如 'abc',必须用单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE` 或 `FALSE`;

SQL过滤生产者

生产者主要设置属性过滤 message.putUserProperty("a", String.valueOf(i)); 表示第一条消息键值对是 a=0,第二条消息键值对是a=1。

	DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飞哥sql消息过滤" + tags[i % tags.length]).getBytes());message.putUserProperty("a", String.valueOf(i));SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

SQL过滤消费者:

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息体=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消费者已经启动");

如果运行报 The broker does not support consumer to filter message by SQL92

image-20231003221207618

则需要修改 broker.conf 文件,增加如下配置:

# 开启对 propertyfilter的支持
enablePropertyFilter = true 
filterSupportRetry = true

然后重启broker。

总结

本文介绍了局部顺序消息,全局顺序消息,广播消息,延迟消息,以及如何批量发送消息和过滤消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/112684.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从实时数据库转战时序数据库,他陪伴 TDengine 从 1.0 走到 3.0

关于采访嘉宾 在关胜亮的学生时代&#xff0c;“神童”这个称号如影随形&#xff0c;很多人初听时会觉得这个称谓略显夸张&#xff0c;有些人还会认为这是不是就是一种调侃&#xff0c;但是如果你听说过他的经历&#xff0c;就会理解这一称号的意义所在了。 受到教师母亲的影…

处理sass-loader安装失败

Vue项目中安装node-sass跟sass-loader 我们在开发中,经常会使用sass语法来编写css&#xff0c;在安装node-sass和sass-loader时&#xff0c;经常会出现错误&#xff08;通常是依赖冲突&#xff09;导致安装失败。因为官方发布的版本号并不是连续的&#xff0c;有些版本与版本之…

ESRI ArcGIS Desktop 10.8.2图文安装教程及下载

ArcGIS 是由美国著名的地理信息系统公司 Esri 开发的一款地理信息系统软件&#xff0c;它是目前全球最流行的 GIS 软件之一。ArcGIS 提供了图形化用户界面和数据分析工具&#xff0c;可以帮助用户管理、分析和可视化各种空间数据。ArcGIS Desktop是一个完整的桌面GIS软件套件&a…

JVMGC复习

TLAB:默认给每一个线程开辟一块内存空间存放线程自己的对象 Class对象是存放在堆区的&#xff0c;不是方法区&#xff0c;类的元数据元数据并不是类的Class对象&#xff0c;Class对象是加载的最终产品&#xff0c;类的方法代码&#xff0c;变量名&#xff0c;方法名&#xff0c…

【黑马程序员】机器学习

&#xff08;一&#xff09;机器学习概述 一、机器学习算法分类 1、监督学习&#xff1a; &#xff08;1&#xff09;目标值是类别&#xff1a;分类问题 k-近邻算法、贝叶斯分类、决策树与随机森林、逻辑回归 &#xff08;2&#xff09;目标值是连续型的数据&#xff1a;回归…

微信小程序进阶——后台交互

目录 一、后台准备 1.1 pom.xml 1.2 配置数据源 1.3 整合mybatis 二、前后端交互 2.1 method1 2.2 method2 2.2.1 封装request 2.2.2 头部引用util 2.2.3 编写方法 2.2.4 展示效果 三、WXS的使用 3.1 会议状态 3.1.2 引入wxs 3.1.3 修改代码 3.1.4 展示效果 3…

【python】进程和线程

进程和线程 今天我们使用的计算机早已进入多CPU或多核时代&#xff0c;而我们使用的操作系统都是支持“多任务”的操作系统&#xff0c;这使得我们可以同时运行多个程序&#xff0c;也可以将一个程序分解为若干个相对独立的子任务&#xff0c;让多个子任务并发的执行&#xff…

关于 Invalid bound statement (not found): 错误的解决

关于 Invalid bound statement not found: 错误的解决 前言错误原因解决方法1. 检查SQL映射文件2. 检查MyBatis配置3. 检查SQL语句4. 检查命名约定5. 清除缓存6. 启用日志记录 重点注意 结语 我是将军我一直都在&#xff0c;。&#xff01; 前言 当开发Java Spring Boot应用程…

自然语言处理基础

自然语言 自然语言处理是人工智能能够通过图灵测试的重要工具。 自然语言处理基本的任务和应用 词性标注&#xff1a;把每句话的各个单词的词性标注出来&#xff0c;例如&#xff1a;形容词、名词、动词 named entity recognition命名实体的识别&#xff1a;识别哪些单词是真…

Service Mesh和Kubernetes:加强微服务的通信与安全性

文章目录 什么是Service Mesh&#xff1f;Service Mesh的优势1. 流量控制2. 安全性3. 可观测性 Istio&#xff1a;Service Mesh的领军者流量管理安全性可观测性 Linkerd&#xff1a;轻量级Service Mesh流量管理安全性可观测性 Istio vs. Linkerd实际应用结论 &#x1f388;个人…

小度打头阵,百度大模型能否“赋能万物”?

文 | 智能相对论 作者 | 楷楷 近日&#xff0c;百度集团副总裁、小度科技原CEO景鲲因个人原因辞任&#xff0c;百度集团副总裁、首席信息官&#xff08;CIO&#xff09;李莹轮岗出任小度科技CEO&#xff0c;并向李彦宏直接汇报。 随着“景鲲时代”落幕&#xff0c;新任CEO李…

2023年中国档案信息化发展历程、竞争格局及行业市场规模分析[图]

档案信息化是以网络、计算机、信息技术为手段&#xff0c;以档案资源为对象&#xff0c;以档案工作为依托&#xff0c;以档案管理学最新理论为指导&#xff0c;按照信息社会和国家档案行政管理部门的要求、开展档案的收集、整理、保管、开发和利用的现代化管理过程。 档案信息化…

[SQL开发笔记]INSERT INTO 语句:将新记录插入到数据库表中

目前&#xff0c;向数据库插入数据是数据管理的重要环节&#xff0c;它可以将数据长期保存、共享访问、保证数据的完整性和安全性&#xff0c;同时也是进行数据检索和分析的基础。其中&#xff0c;INSERT INTO 语句是SQL&#xff08;结构化查询语言&#xff09;中用于向数据库表…

使用guacamole进行WEB远程桌面连接

Apache Guacamole 是一个无客户端的远程桌面网关。它支持标准协议&#xff0c;如 VNC、RDP 和 SSH&#xff0c;甚至还支持k8s、telnet连接。它可以在任何有网络的地方连接上你的服务器和Windows主机。可以同时连接多个终端&#xff0c;并且能够无缝切换。本文采用docker进行部署…

数据结构与算法(十):动态规划与贪心算法

参考引用 Hello 算法 Github&#xff1a;hello-algo 1. 动态规划算法 动态规划将一个问题分解为一系列更小的子问题&#xff0c;并通过存储子问题的解来避免重复计算&#xff0c;从而大幅提升时间效率 问题&#xff1a;给定一个共有 n 阶的楼梯&#xff0c;你每步可以上 1 阶或…

【王道代码】【2.3链表】d1

关键字&#xff1a; 递归删除x&#xff1b;删除所有x&#xff1b;递归反向输出&#xff1b;删除最小结点&#xff08;2组指针&#xff09;&#xff1b;原地逆置&#xff1b;使递增有序

2008-2021年上市公司实体企业金融化程度测算数据(原始数据+stata代码)

2008-2021年上市公司实体企业金融化程度测算&#xff08;原始数据stata代码&#xff09; 1、时间&#xff1a;2008-2021年 2、指标&#xff1a;股票代码、年份、交易性金融资产、衍生金融资产、发放贷款及垫款净额、可供出售金融资产净额、持有至到期投资净额、长期债权投资净…

github 终端克隆操作,以及对 https/ssh 的理解

前言 最近瞎搞 github 的一些配置&#xff0c;结果搞得有一段时间克隆不了仓库。不过经历了这次风波后&#xff0c;我对 github 的一些原理有了更清楚的了解。所以想稍微写一小篇文章总结输出一下&#xff0c;也欢迎有疑问的读者与博主进一步交流&#xff0c;我的理解还是有限…

[1Panel]开源,现代化,新一代的 Linux 服务器运维管理面板

测评介绍 本期测评试用一下1Panel这款面板。1Panel是国内飞致云旗下开源产品。整个界面简洁清爽&#xff0c;后端使用GO开发&#xff0c;前端使用VUE的Element-Plus作为UI框架&#xff0c;整个面板的管理都是基于docker的&#xff0c;想法很先进。官方还提供了视频的使用教程&…

leetcode:2678. 老人的数目(python3解法)

难度&#xff1a;简单 给你一个下标从 0 开始的字符串 details 。details 中每个元素都是一位乘客的信息&#xff0c;信息用长度为 15 的字符串表示&#xff0c;表示方式如下&#xff1a; 前十个字符是乘客的手机号码。接下来的一个字符是乘客的性别。接下来两个字符是乘客的年…