RocketMQ 顺序消息

顺序消息

顺序消息为云消息队列 RocketMQ 版中的高级特性消息,本文为您介绍顺序消息的应用场景、功能原理、使用限制、使用方法和使用建议。

应用场景

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用云消息队列 RocketMQ 版的顺序消息可以有效保证数据传输的顺序性。

典型场景一:撮合交易

交易撮合

以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

典型场景二:数据实时增量同步

图 1. 普通消息

普通消息

图 2. 顺序消息

顺序消息

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过云消息队列 RocketMQ 版传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

功能原理

什么是顺序消息

顺序消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

云消息队列 RocketMQ 版顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组。

重要

只有同一消息组的消息才能保证顺序,不同消息组或未设置消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

如何保证消息的顺序性

云消息队列 RocketMQ 版的消息的顺序性分为两部分,生产顺序性和消费顺序性。

  • 生产顺序性:云消息队列 RocketMQ 版通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

    如需保证消息生产的顺序性,则必须满足以下条件:

    • 同一消息组(MessageGroup)

      消息生产顺序性的范围为消息组,生产者发送消息时可以为每条消息设置消息组,只有同一消息组内的消息可以保证顺序性,不同消息组或未设置消息组的消息之间不保证顺序。

    • 单一生产者

      消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

    • 串行发送

      云消息队列 RocketMQ 版生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    满足以上条件的生产者,将顺序消息发送至云消息队列 RocketMQ 版后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

    顺序存储逻辑

    • 相同消息组的消息按照先后顺序被存储在同一个队列。

    • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

    如上图所示,消息组1和消息组4的消息混合存储在队列1中,云消息队列 RocketMQ 版保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

  • 消费顺序性:云消息队列 RocketMQ 版通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

    如需保证消息消费的顺序性,则必须满足以下条件:

    • 投递顺序

      云消息队列 RocketMQ 版通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。

      重要

      消费者类型为PushConsumer时,云消息队列 RocketMQ 版保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类。

    • 有限重试

      云消息队列 RocketMQ 版顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

      对于需要严格保证消费顺序的场景,请务必设置合理的重试次数,避免参数不合理导致消息乱序。

生产顺序性和消费顺序性组合

如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。但一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:

生产顺序

消费顺序

顺序性效果

设置消息组,保证消息顺序发送。

顺序消费

按照消息组粒度,严格保证消息顺序。

同一消息组内的消息的消费顺序和发送顺序完全一致。

设置消息组,保证消息顺序发送。

并发消费

并发消费,尽可能按时间顺序处理。

未设置消息组,消息乱序发送。

顺序消费

按队列存储粒度,严格顺序。

基于云消息队列 RocketMQ 版本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。

未设置消息组,消息乱序发送。

并发消费

并发消费,尽可能按照时间顺序处理。

顺序消息生命周期

生命周期

  • 初始化

    消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 待消费

    消息被发送到服务端,对消费者可见,等待消费者消费的状态。

  • 消费中

    消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。

    此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,云消息队列 RocketMQ 版会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交

    消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。

    云消息队列 RocketMQ 版默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除

    云消息队列 RocketMQ 版按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

重要

  • 消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束。

  • 顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。

使用限制

顺序消息仅支持使用MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

使用示例

和普通消息发送相比,顺序消息发送必须要设置消息组。消息组的粒度建议按照业务场景,尽可能细粒度设计,以便实现业务拆分和并发扩展。

以Java语言为例,收发顺序消息的示例代码如下:

完整的消息收发示例代码请参见RocketMQ 5.x系列SDK(推荐)。

        //顺序消息发送。MessageBuilder messageBuilder = null;Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。.setMessageGroup("fifoGroup001")//消息体。.setBody("messageBody".getBytes()).build();try {//发送消息,需要关注发送结果,并捕获失败等异常SendReceipt sendReceipt = producer.send(message);System.out.println(sendReceipt.getMessageId());} catch (ClientException e) {e.printStackTrace();}//消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。//消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。MessageListener messageListener = new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {System.out.println(messageView);//根据消费结果返回状态。return ConsumeResult.SUCCESS;}};//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。List<MessageView> messageViewList = null;try {messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);//消费处理完成后,需要主动调用ACK提交消费结果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {e.printStackTrace();}});} catch (ClientException e) {//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。e.printStackTrace();}

获取顺序消息消费重试日志

PushConsumer顺序消费的重试是在消费者客户端进行,服务端无法获取消费重试的详细日志,若消息轨迹中顺序消息的投递结果为失败时,您需要在消费者客户端日志中查看消息重试的最大次数、消费者客户端等信息。

消费者客户端日志查看路径,请参见日志配置。

您可以通过搜索以下关键字在客户端日志中快速定位消费失败的相关内容:

Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times

使用建议

串行消费,避免批量消费导致乱序

消息消费建议串行处理,避免一次消费多条消息,否则可能出现乱序情况。

例如:发送顺序为1->2->3->4,消费时批量消费,消费顺序为1->23(批量处理,失败)->23(重试处理)->4,此时可能由于消息3的失败导致消息2被重复处理,最后导致消息消费乱序。

消息组尽可能打散,避免集中导致热点

云消息队列 RocketMQ 版保证相同消息组的消息存储在同一个队列中,如果不同业务场景的消息都集中在少量或一个消息组中,则这些消息存储压力都会集中到服务端的少量队列或一个队列中。容易导致性能热点,且不利于扩展。一般建议的消息组设计会采用订单ID、用户ID作为顺序参考,即同一个终端用户的消息保证顺序,不同用户的消息无需保证顺序。

因此建议将业务以消息组粒度进行拆分,例如,将订单ID、用户ID作为消息组关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/865826.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

万字详解AI开发中的数据预处理(清洗)

数据清洗&#xff08;Data Cleaning&#xff09;是通过修改、添加或删除数据的方式为数据分析做准备的过程&#xff0c;这个过程通常也被称为数据预处理&#xff08;Data Preprocessing&#xff09;。对于数据科学家和机器学习工程师来说&#xff0c;熟练掌握数据清洗全流程至关…

21_硬件电路基础

目录 组合逻辑电路 组合逻辑电路原理 真值表 布尔代数 门电路 译码器 发光二极管LED 液晶字符显示器LCD 数据选择器 数据分配器 多路开关 时序逻辑电路 时序逻辑电路原理 时钟信号 触发器 电位触发方式触发器 边沿触发方式触发器 寄存器 移位器 计数器 总线…

经营人心:Mrs. B的百年传奇

这个故事的主角是Rose Blumkin&#xff0c;也被称为Mrs. B&#xff0c;她是Nebraska Furniture Mart的创始人。 她的故事确实是一个关于用户思维、客户关系和创业精神的经典案例。 Rose Blumkin于1893年出生在俄罗斯的一个小村庄&#xff0c;她在1921年移民到美国。 1937年&…

ImportError cannot import name ‘uic‘ from ‘PyQt5‘

ImportError cannot import name ‘uic’ from ‘PyQt5’ 1、描述 使用nuitka把PyQt5打包exe文件时报错: ImportError cannot import name ‘uic’ from ‘PyQt5’ 2、原因 这个是由于无法找到uic的目录导致的,在PyQt5的目录下是有uic文件的。 3、解决方案 找到导入uic…

SQL Server 2022的组成

《SQL Server 2022从入门到精通&#xff08;视频教学超值版&#xff09;》图书介绍-CSDN博客 SQL Server 2022主要由4部分组成&#xff0c;分别是数据库引擎、分析服务、集成服务和报表服务。本节将详细介绍这些内容。 1.2.1 SQL Server 2022的数据库引擎 SQL Server 2022的…

NGINX+KEEPALIVED | 一文搞懂NG+KL负载均衡高可用架构的实操教程(详细)

文章目录 NGINXKEEPALIVED负载均衡高可用架构为什么需要多节点应用为什么需要Nginx服务为什么需要Keepalived服务NGKL简述前期准备Linux服务器公共环境配置Server1 NGKL服务器配置Server2 NGKL服务器配置Server3 HTTP服务器配置Server4 HTTP服务器配置运行测试用例 NGINXKEEPAL…

使用Keil将STM32部分程序放在RAM中运行

手动分配RAM区域,新建.sct文件,定义RAM_CODE区域,并指定其正确的起始地址和大小。 ; ************************************************************* ; *** Scatter-Loading Description File generated by uVision *** ; ************************************************…

C语言 -- 函数

C语言 -- 函数 1. 函数的概念2. 库函数2.1 标准库和头文件2.2 库函数的使用方法2.2.1 功能2.2.2 头文件包含2.2.3 实践2.2.4 库函数文档的一般格式 3. 自定义函数3.1 函数的语法形式3.2 函数的举例 4. 形参和实参4.1 实参4.2 形参4.3 实参和形参的关系 5. return 语句6. 数组做…

Element中的消息提示组件Message和弹框组件MessageBox

简述&#xff1a;在 Element UI 中&#xff0c;Message和MessageBox都是比较常用的组件&#xff0c;Message用来提示消息&#xff0c;而MessageBox是一个用于创建模态对话框的组件。它可以用于在页面上快速展示信息、警告或错误提示&#xff0c;而不会阻止用户的其他操作。简单…

116-基于5VLX110T FPGA FMC接口功能验证6U CPCI平台

一、板卡概述 本板卡是Xilinx公司芯片V5系列芯片设计信号处理板卡。由一片Xilinx公司的XC5VLX110T-1FF1136 / XC5VSX95T-1FF1136 / XC5VFX70T-1FF1136芯片组成。FPGA接1片DDR2内存条 2GB&#xff0c;32MB Nor flash存储器&#xff0c;用于存储程序。外扩 SATA、PCI、PCI expres…

【期末复习】数据库系统概论(附带考点汇总)

第1章.绪论 目录 第1章.绪论1.1. 数据库系统概述1.1.1.基本概念1.1.2.产生和发展 1.2.概念模型1.2.1.三种模型1.2.2.概念模型1.2.3.关系模型 1.3.数据库系统结构1.3.1三级模式结构1.3.2.两级映像与数据独立性 第2章.关系型数据库2.1.关系2.2.关系操作2.2.1.基本关系操作2.2.2.关…

SALOME源码分析:View Model

作为一款开源的CAx(CAD/CAE/CAM)软件集成平台&#xff0c;为了实现各个Module支持不同的数据显示与交互方案&#xff0c;出于扩展性的考虑&#xff0c;SALOME引入了View Model&#xff0c;用以支持OpenGL、OCC、VTK、ParaView、Qwt等数据显示与交互实现。 本文将以OCCViewer、…

昇思25天学习打卡营第16天|Diffusion扩散模型

导入必要的库函数 import math from functools import partial %matplotlib inline import matplotlib.pyplot as plt from tqdm.auto import tqdm import numpy as np from multiprocessing import cpu_count from download import downloadimport mindspore as ms import mi…

Python基于卷积神经网络分类模型(CNN分类算法)实现时装类别识别项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 在深度学习领域&#xff0c;卷积神经网络&#xff08;Convolutional Neural Networks, CNNs&#xff0…

【架构-20】死锁

什么是死锁&#xff1f; 死锁(Deadlock)是指两个或多个线程/进程在执行过程中,由于资源的互相占用和等待,而陷入一种互相等待的僵局,无法继续往下执行的情况。 产生死锁的四个必要条件: &#xff08;1&#xff09;互斥条件(Mutual Exclusion)&#xff1a;至少有一个资源是非共享…

Elasticsearch:结合稀疏、密集和地理字段

作者&#xff1a;来自 Elastic Madhusudhan Konda 如何以自定义方式组合多个稀疏、密集和地理字段 Elasticsearch 是一款强大的工具&#xff0c;可用于近乎实时地搜索和分析数据。作为开发人员&#xff0c;我们经常会遇到包含各种不同字段的数据集。有些字段是必填字段&#x…

relation-graph——数据组装+鼠标移入后的详情(自定义插槽的用法)——js技能提升

最近在写后台管理系统的时候&#xff0c;遇到一个需求&#xff0c;就是给我一些节点&#xff0c;让我渲染到页面上&#xff0c;效果图如下&#xff1a; 之前写过一篇文章关于relation-graph关系图组件http://t.csdnimg.cn/7BGYm的用法 还有一篇关于relation-graph——实现右击…

回溯算法-以单位人事管理系统为例

1.回溯算法介绍 1.来源 回溯算法也叫试探法&#xff0c;它是一种系统地搜索问题的解的方法。 用回溯算法解决问题的一般步骤&#xff1a; 1、 针对所给问题&#xff0c;定义问题的解空间&#xff0c;它至少包含问题的一个&#xff08;最优&#xff09;解。 2 、确定易于搜…

rk3568 OpenHarmony 串口uart与电脑通讯开发案例

一、需求描述&#xff1a; rk3568开发板运行OpenHarmony4.0&#xff0c;通过开发板上的uart串口与电脑进行通讯&#xff0c;相互收发字符串。 二、案例展示 1、开发环境&#xff1a; &#xff08;1&#xff09;rk3568开发板 &#xff08;2&#xff09;系统&#xff1a;OpenHar…

又一个被催的相亲对象!家庭不和,是因为智慧不够?——早读(逆天打工人爬取热门微信文章解读)

你相亲过吗&#xff1f; 引言Python 代码第一篇 洞见 家庭不和&#xff0c;是因为智慧不够第二篇 口播结尾 引言 yue 昨天居然忘记了 正事&#xff1a;拍视频j 居然忘记了 别着急 让我找下理由&#xff08;借口&#xff09; 前天我妈给我介绍了个相亲对象 推给我了她的微信 我…