RocketMq 顺序消费、分区消息、延迟发送消息、Topic、tag分类 实战 (消费者) (三)

消费端配置
如下所示:是消费者的配置类,有以下几点需要注意的地方
1、是TargetMessageListener这个监听类(下文会把这个监听类的具体代码贴出来),需要把这个监听类订阅。
2、rocketMqDcProperties.getTargetProperties()这个方法里面有相关的配置信息(这里面有绑定消费者是那个组,因为一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致),具体代码见下文
3、subscription.setTopic(rocketMqDcProperties.getOrderTopic()) 是绑定Topic,这个Topic跟上篇文章生产者的Topic是一致的,这样就能保证消费者能准确消费到生产者发送的消息
4、subscription.setExpression(MqTagEnum.target.name()); 这个是一个消息的过滤,也是一个对消息的具体分类。详细用法请参考链接:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/message-filtering?spm=a2c4g.11186623.0.i38#concept-2047069
综上所述:
这段代码主要是用于配置一个 OrderConsumerBean 对象,设置其属性和订阅关系

@Configuration
public class TargetConsumerClient {@Autowiredprivate RocketMqDcProperties rocketMqDcProperties;@Autowiredprivate TargetMessageListener messageListener;@ConditionalOnProperty(name = "rocket.mq.dc.enabled", havingValue = "true", matchIfMissing = true)@Bean(initMethod = "start", destroyMethod = "shutdown",name = {"buildTargetConsumer"})public OrderConsumerBean buildTargetConsumer() {OrderConsumerBean orderConsumerBean = new OrderConsumerBean();//配置文件orderConsumerBean.setProperties(rocketMqDcProperties.getTargetProperties());//订阅关系Map<Subscription, MessageOrderListener> subscriptionTable = new HashMap<Subscription, MessageOrderListener>();Subscription subscription = new Subscription();subscription.setTopic(rocketMqDcProperties.getOrderTopic());subscription.setExpression(MqTagEnum.target.name());subscriptionTable.put(subscription, messageListener);orderConsumerBean.setSubscriptionTable(subscriptionTable);return orderConsumerBean;}
}
	public Properties getTargetProperties() {return this.getProperties(this.targetGroupId);}private Properties getProperties(String groupId) {Properties properties = new Properties();properties.setProperty("AccessKey", this.accessKey);properties.setProperty("SecretKey", this.secretKey);properties.setProperty("NAMESRV_ADDR", this.nameSrvAddr);properties.setProperty("GROUP_ID", groupId);properties.setProperty("ConsumeThreadNums", this.getConsumeThreadNums().toString());properties.setProperty("maxReconsumeTimes", this.getMaxReconsumeTimes().toString());properties.setProperty("consumeTimeout", this.getConsumeTimeout().toString());properties.setProperty("suspendTimeMillis", this.getSuspendTimeMillis().toString());return properties;}    

消费端代码
如下所示:TargetMessageListener 实现了MessageOrderListener接口,并如上文所示,其和OrderConsumerBean也绑定了订阅关系。

@Slf4j
@Component
public class TargetMessageListener implements MessageOrderListener {@Autowiredprivate MqMessageRecordDao mqMessageRecordDao;@Autowiredprivate TargetService targetServiceImpl;@Overridepublic OrderAction consume(final Message message, final ConsumeOrderContext context) {log.info("MQ消息消费者监听消息内容:{}", message);MqMessageRecordMo mqMessageRecordMo = new MqMessageRecordMo();try {String body = new String(message.getBody());String tag = message.getTag();mqMessageRecordMo.setMsgId(message.getMsgID());mqMessageRecordMo.setOrderTopic(message.getTopic());mqMessageRecordMo.setProducerIp(message.getBornHost());mqMessageRecordMo.setTag(tag);//mqMessageRecordMo.setMessageKey(message.getKey());mqMessageRecordMo.setShardingKey(message.getShardingKey());mqMessageRecordMo.setBodyJson(body)mqMessageRecordMo.setProducerTime(LocalDateLUtils.timestampToDatetime(message.getBornTimestamp()));mqMessageRecordMo.setCreateTime(LocalDateTime.now());mqMessageRecordMo.setPMsgId(message.getUserProperties("pMsgId"));log.info("MQ消费者消息消费成功,解析并处理相应的业务逻辑, tag = {},key = {},messageId = {}", tag, message.getShardingKey(),message.getMsgID());DataBaseEnum dataBase = DataBaseEnum.getEnum(message.getShardingKey());targetServiceImpl.process(message.getMsgID(),dataBase,body);log.info("MQ消息体消费监听解析结果:{}", body);mqMessageRecordMo.setIsSuccess(true);return OrderAction.Success;} catch (Exception e) {//消费失败,挂起当前队列// 存储错误消息,重试,记录日志/*log.error("target MQ消费者消息监听消息业务逻辑处理失败:",e);mqMessageRecordMo.setIsSuccess(false);mqMessageRecordMo.setErrorMsg(e.getMessage());return OrderAction.Success;} finally {mqMessageRecordDao.save(mqMessageRecordMo);}}
}            

在这里想讲下顺序消息
顺序消息
顺序消息可以保证消息的消费顺序和发送的顺序一致,即先发送的先消费,后发送的后消费,常用于金融证券、电商业务等对消息指令顺序有严格要求的场景。本文介绍云消息队列 RocketMQ 版顺序消息的概念、适用场景、实现原理以及使用过程中的注意事项。
什么是顺序消息
顺序消息是云消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
分区顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景
适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例
用户注册需要发送验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
阿里巴巴集团内部电商系统均使用分区顺序消息,既保证业务的顺序,同时又能保证业务的高性能。
全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景
适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
示例
在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
说明
全局顺序消息实际上是一种特殊的分区顺序消息,即Topic中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高
如何实现顺序消息
在这里插入图片描述
在云消息队列 RocketMQ 版中,消息的顺序需要由以下三个阶段保证:
消息发送
如上图所示,A1、B1、A2、A3、B2、B3是订单A和订单B的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单A的消息发送和消费都按照A1、A2、A3的顺序。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时云消息队列 RocketMQ 版支持将Sharding Key相同(例如同一订单号)的消息序路由到一个队列中。
云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。
消息存储
如上图所示,顺序消息的Topic中,每个逻辑队列对应一个物理队列,当消息按照顺序发送到Topic中的逻辑队列时,每个分区的消息将按照同样的顺序存储到对应的物理队列中。
消息消费
云消息队列 RocketMQ 版按照存储的顺序将消息投递给Consumer,Consumer收到消息后也不对消息顺序做任何处理,按照接收到的顺序进行消费。
Consumer消费消息时,同一Sharding Key的消息使用单线程消费,保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致
注意事项
a、同一个Group ID只对应一种类型的Topic,即不同时用于顺序消息和无序消息的收发。
b、对于全局顺序消息,建议消息不要有阻塞。同时运行多个实例,是为了防止工作实例意外退出而导致业务中断。当工作实例退出时,其他实例可以立即接手工作,不会导致业务中断,实际工作的只会有一个实例。
c、云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致

顺序消息常见问题
a、同一条消息是否可以既是顺序消息,又是定时消息和事务消息?
不可以。顺序消息、定时消息、事务消息是不同的消息类型,三者是互斥关系,不能叠加在一起使用。

b、顺序消息支持哪些地域?
支持云消息队列 RocketMQ 版所有公共云地域和金融云地域。

c、为什么全局顺序消息性能一般?
全局顺序消息是严格按照FIFO的消息阻塞原则,即上一条消息没有被成功消费,那么下一条消息会一直被存储到Topic队列中。如果想提高全局顺序消息的TPS,可以升级实例配置,同时消息客户端应用尽量减少处理本地业务逻辑的耗时。

d、顺序消息支持哪种消息发送方式?
顺序消息只支持可靠同步发送方式,不支持异步发送方式,否则将无法严格保证顺序。

e、顺序消息是否支持集群消费和广播消费?
顺序消息暂时仅支持集群消费模式,不支持广播消费模式。
以上文档链接来源:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/ordered-messages?spm=a2c4g.11186623.0.0.34b428e5LL1Jlh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/766958.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯day9刷题日记

P8649 [蓝桥杯 2017 省 B] k 倍区间 思路&#xff1a;前缀和的题&#xff0c;对k取余相同的数就可以得到k的倍数 #include <iostream> #include <string> using namespace std; long long ans; int n,k; long long q[100010]; long long sum[100010];int main() …

Redis 教程系列之Redis 安全(六)

我们可以通过 redis 的配置文件设置密码参数&#xff0c;这样客户端连接到 redis 服务就需要密码验证&#xff0c;这样可以让你的 redis 服务更安全。 实例 我们可以通过以下命令查看是否设置了密码验证&#xff1a; 127.0.0.1:6379> CONFIG get requirepass 1) "re…

小程序返回webview h5 不刷新问题

我的场景&#xff1a;a、小程序首页-》b、webview h5活动列表-》c、小程序活动详情 c返回b b无法刷新 网上说了好多办法试过了都不行 求解啊 比如 1、先清空URL在赋值 <web-view wx:if"{{url}}" src"{{url}}" bindmessage"onMessage"&g…

【机器学习】k近邻(k-nearest neighbor )算法

文章目录 0. 前言1. 算法原理1.1 距离度量1.2 参数k的选择 2. 优缺点及适用场景3. 改进和扩展4. 案例5. 总结 0. 前言 k近邻&#xff08;k-nearest neighbors&#xff0c;KNN&#xff09;算法是一种基本的监督学习算法&#xff0c;用于分类和回归问题。k值的选择、距离度量及分…

Linux中Oracle数据库启动顺序

首先使用oracle用户登录Linux&#xff0c;用lsnrctl status查看监听状态 1、&#xff1a;进入sqlplus $ sqlplus /nolog SQL> 2&#xff1a;使用sysdab角色登录sqlplus SQL> conn /as sysdba 3&#xff1a;启动数据库 SQL> startup 4&#xff1a;打开Oracle监听 …

微信小程序 - picker-viewer实现省市选择器

简介 本文会基于微信小程序picker viewer组件实现省市选择器的功能。 实现效果 实现代码 布局 <picker-view value"{{value}}" bindchange"bindChange" indicator-style"height: 50px;" style"width: 100%; height: 300px;" &…

OCR研究背景及相关论文分享

光学字符识别&#xff08;Optical Character Recognition&#xff0c;OCR&#xff09;是指使用光学方法将图像中的文字转换为机器可编辑的文本的技术。OCR技术的研究和应用已有数十年的历史&#xff0c;其背景和发展受到多方面因素的影响。 技术需求背景 1.自动化文档处理&am…

【数据分享】2012-2023年全球范围逐年NPP/VIIRS夜间灯光数据

夜间灯光数据是我们在各项研究中经常使用的数据&#xff01;本次我们给大家分享的是2012-2023年全球范围的逐年的NPP/VIIRS夜间灯光数据&#xff0c;数据格式为栅格格式(.tif)。该数据来自于NCEI国家环境信息中心&#xff0c;近期该网站更新了2023年的夜间灯光数据&#xff0c;…

安卓开发button控件的使用

在 Android 开发中&#xff0c;Button 控件用于创建用户可点击的按钮。以下是使用 Button 控件的一般步骤&#xff1a; 1. 在布局文件中添加 Button&#xff1a;打开你的布局文件&#xff08;例如 activity_main.xml&#xff09;&#xff0c;将 Button 控件添加到布局中。可以使…

电脑如何关闭自启动应用?cmd一招解决问题

很多小伙伴说电脑刚开机就卡的和定格动画似的&#xff0c;cmd一招解决问题&#xff1a; CtrlR打开cmd,输入&#xff1a;msconfig 进入到这个界面&#xff1a; 点击启动&#xff1a; 打开任务管理器&#xff0c;禁用不要的自启动应用就ok了

Linux——进程间通信管道

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、进程间通信1、进程间通信的目的2、进程间通信发展3、进程间通信分类 二、管道1、什么是管…

tcp 协议详解

什么是 TCP 协议 TCP全称为 “传输控制协议(Transmission Control Protocol”). 人如其名, 要对数据的传输进行一个详细的控制。TCP 是一个传输层的协议。 如下图&#xff1a; 我们接下来在讲解 TCP/IP 协议栈的下三层时都会先解决这两个问题&#xff1a; 报头与有效载荷如何…

【Linux】线程控制{fork() / vfork / clone/pthread_join()/pthread_cancel()}

文章目录 1.fork() / vfork / clone2.线程等待2.1pthread_join()2.2pthread_tryjoin_np() 3.pthread_exit()4.pthread_cancel()5.一些线程相关的问题6.pthread_detach()7.pthread_self()8.认识线程标识符&#xff1a;pthread_self()获取线程标识符9.POSIX线程库 1.fork() / vfo…

【CVPR2024】CricaVPR

【CVPR2024】CricaVPR: Cross-image Correlation-aware Representation Learning for Visual Place Recognition 这个论文提出了一种具有跨图像相关性的鲁棒全局表示方法用于视觉位置识别&#xff08;VPR&#xff0c;Visual Place Recognition &#xff09;任务&#xff0c;命…

MySQL学习笔记------SQL(1)

关系型数据库&#xff08;RDBMS&#xff09; 建立在关系模型基础上&#xff0c;由多张相互连接的二维表组成的数据库 特点&#xff1a;使用表储存数据&#xff0c;格式统一&#xff0c;便于维护 使用SQL语言操作&#xff0c;标准统一&#xff0c;使用方便 SQL通用语法 SQL…

Java 开篇之 JDK 下载、安装、配置、卸载、运行及乱码处理

JDK 安装和配置 本文内容简介&#xff1a; JDK 简介JDK 正确卸载方式JDK 下载和安装环境变量配置开发过程运行结果及乱码解决 JDK 简介 JDK&#xff08;Java Development Kit&#xff09;是 Java 语言的软件开发工具包&#xff0c;它是 Sun Microsystems&#xff08;现已被 Ora…

【研发日记】C/C++开发避坑秘籍(一)——CAN接收Buffer溢出Bug

文章目录 背景介绍 问题描述 分析排查 解决方案 总结归纳 背景介绍 在一个嵌入式软件项目中&#xff0c;有一段使用C语言写的嵌入式代码&#xff0c;功能是把CAN总线上的几帧报文接收进来&#xff0c;并解析出数据。示例如下&#xff1a; 乍一看感觉挺简单&#xff0c;想着…

Python编程异步爬虫——协程的基本原理

Python编程之异步爬虫 协程的基本原理 要实现异步机制的爬虫&#xff0c;自然和协程脱不了关系。 案例引入 先看一个案例网站&#xff0c;地址为https://www.httpbin.org/delay/5&#xff0c;访问这个链接需要先等5秒钟才能得到结果&#xff0c;这是因为服务器强制等待5秒时…

cookie、session和token的区别

引言 在当今的互联网时代&#xff0c;Web 应用程序的安全性和用户体验至关重要。身份验证和状态管理是构建安全、可靠的 Web 应用的核心部分。cookie、session、token都是常用的身份验证和状态管理机制。 Cookie 什么是 Cookie&#xff1f; Cookie 是存储在用户浏览器中的小…

【算法】回溯与深搜

方法论 1.构建决策树 2.设计代码&#xff1a;全局变量、dfs函数 3.剪枝&#xff0c;回溯 全排列 给定一个不含重复数字的整数数组 nums &#xff0c;返回其 所有可能的全排列 。可以 按任意顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff…