解析 RocketMQ 业务消息 - “顺序消息”

引言

Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。

简介

顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,同一 MessageGroup 的消息按照严格的先进先出(FIFO)原则进行发布和消费,即先发布的消息先消费,后发布的消息后消费,服务端严格按照发送顺序进行存储、消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求,因此需做到两点,发送的顺序性和消费的顺序性。

功能原理

在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?接下来就会围绕这个问题,对比普通消息和顺序消息进行阐述。

顺序发送

在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条消息 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。

同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。

因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式相比较普通消息,实际上降低了消息的最大吞吐。

顺序消费

与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。当然也需要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的消息之间并不存在顺序性的关联,是可以进行并发消费的。因此全文中提到的顺序实际上是一种偏序。

小结

无论对于发送还是消费,我们通过 MessageGroup 的方式将消息分组,即并发的基本单元是 MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度具备了可拓展性,支持多队列存储、水平拆分、并发消费,且不受影响。回顾普通消息,站在顺序消息的视角,可以认为普通消息的并发基本单元是单条消息,即每条消息均拥有不同的 MessageGroup。

我们回到开头那个问题:

既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?

现在大家对于这个问题可能有一个基本的印象了,消息的顺序性当然很好,但是为了实现顺序性也是有代价的。

下述是一个表格,简要对比了顺序消息和普通消息。

最佳实践

合理设置 MessageGroup

MessageGroup 会有很多错误的选择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为部分规模较大的商家会产出较多订单,由于下游消费能力的限制,因此这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背后的业务逻辑上来说,同一订单才有顺序性的要求。即选择 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量应当尽量相同且均匀。

同步发送和发送重试

如之前章节所述,需使用同步发送和发送重试来保证发送的顺序性。

消费幂等

消息传输链路在异常场景下会有少量重复,业务消费是需要做消费幂等,避免重复处理带来的风险。

应用案例

  • 用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
  • 电商的订单创建,以订单 ID 作为 MessageGroup,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

实战

发送

可以看到,该发送案例设置了 MessageGroup 并且使用了同步发送,发送的代码如下:

public class ProducerFifoMessageExample {private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);private ProducerFifoMessageExample() {}public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// Credential provider is optional for client configuration.String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);String endpoints = "foobar.com:8080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).setCredentialProvider(sessionCredentialsProvider).build();String topic = "yourFifoTopic";final Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration)// Set the topic name(s), which is optional. It makes producer could prefetch the topic route before // message publishing..setTopics(topic)// May throw {@link ClientException} if the producer is not initialized..build();// Define your message body.byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);String tag = "yourMessageTagA";final Message message = provider.newMessageBuilder()// Set topic for the current message..setTopic(topic)// Message secondary classifier of message besides topic..setTag(tag)// Key(s) of the message, another way to mark message besides message id..setKeys("yourMessageKey-1ff69ada8e0e")// Message group decides the message delivery order..setMessageGroup("youMessageGroup0").setBody(body).build();try {final SendReceipt sendReceipt = producer.send(message);LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {LOGGER.error("Failed to send message", t);}// Close the producer when you don't need it anymore.producer.close();}
}

消费

消费的代码如下:

public class SimpleConsumerExample {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);private SimpleConsumerExample() {}public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// Credential provider is optional for client configuration.String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);String endpoints = "foobar.com:8080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).setCredentialProvider(sessionCredentialsProvider).build();String consumerGroup = "yourConsumerGroup";Duration awaitDuration = Duration.ofSeconds(30);String tag = "yourMessageTagA";String topic = "yourTopic";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)// Set the consumer group name..setConsumerGroup(consumerGroup)// set await duration for long-polling..setAwaitDuration(awaitDuration)// Set the subscription for the consumer..setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();// Max message num for each long polling.int maxMessageNum = 16;// Set message invisible duration after it is received.Duration invisibleDuration = Duration.ofSeconds(5);final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);for (MessageView message : messages) {try {consumer.ack(message);} catch (Throwable t) {LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);}}// Close the simple consumer when you don't need it anymore.consumer.close();}
}

作者:绍舒

原文链接

本文为阿里云原创内容,未经允许不得转载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510505.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ Lists(链表)

Lists将元素按顺序储存在链表中. 与 向量(vectors)相比, 它允许快速的插入和删除&#xff0c;但是随机访问却比较慢. assign()给list赋值back()返回最后一个元素begin()返回指向第一个元素的迭代器clear()删除所有元素empty()如果list是空的则返回trueend()返回末尾的迭代器e…

Koordinator 0.6:企业级容器调度系统解决方案,引入 CPU 精细编排、资源预留与全新的重调度框架

阿里云原生开源的混部系统 Koordinator 基于阿里超大规模混部生产实践经验而来&#xff0c;旨在为用户打造云原生场景下接入成本最低、混部效率最佳的解决方案&#xff0c;助力用户企业实现云原生后提升计算资源利用率、降低 IT 成本。 经过社区多位成员的贡献&#xff0c;Koor…

KubeVela Maintainer 徐佳航:什么样的开源项目将具有可延续的生命力?

云原生的技术价值喻示着它就是未来&#xff0c;加入到一个具有可延续性生命力的开源社区&#xff0c;可以帮助我们更快地到达那里。——徐佳航&#xff0c;KubeVela Maintainer&#xff0c;来自招商银行基础设施研发中心云平台及运维平台开发团队。来自招商银行基础设施研发中心…

C++ Queues(队列)

C队列是一种容器适配器&#xff0c;它给予程序员一种先进先出(FIFO)的数据结构。 back()返回最后一个元素empty()如果队列空则返回真front()返回第一个元素pop()删除第一个元素push()在末尾加入一个元素size()返回队列中元素的个数

龙蜥社区首推“分层分类”顶设 发展以云为终态的开源产业创新生态

在刚刚结束的 2022 开放原子全球开源峰会 OpenAnolis 分论坛上&#xff0c;龙蜥社区技术委员会主席杨勇做了《OpenAnolis 社区技术发展报告》的主题演讲&#xff0c;分享龙蜥社区如何从 0 到 1 实现原生社区布局&#xff0c;以及发展以云为终态的开源产业创新生态。 全文整理如…

友邦人寿可观测体系设计与落地

业务场景与挑战 友邦保险是香港联合交易所上市的人寿保险集团&#xff0c;覆盖 18 个市场。截至 2021 年 12 月 31 号&#xff0c;总资产 3400 亿美元。 友邦保险于 1992 年在上海设立分公司&#xff0c;是改革开放后最早一批获发个人人身保险业务营业执照的非本土保险机构之…

如何写出有效的单元测试

什么是单元测试 《单元测试的艺术》中对单元测试的定义&#xff1a; 一个单元测试是一段自动化的代码&#xff0c;这段代码调用被测试的工作单元&#xff0c;之后对这个单元的单个最终结果的某些假设进行校验。 单元测试几乎都是用单元测试框架编写的&#xff1b;只要产品代…

测试环境不稳定复杂的必然性及其对策

这篇文章想要讲的&#xff0c;的确是两件事情&#xff1a; 为什么测试环境的不稳定是必然的&#xff0c;怎么让它尽量稳定一点&#xff1f;为什么测试环境比生产环境更复杂&#xff0c;怎么让它尽量简单一点&#xff1f; 此外&#xff0c;还会谈一谈对测试环境和生产环境的区别…

【计算几何】线段相交

问题描述&#xff1a;已知两条线段P1P2和Q1Q2&#xff0c;判断P1P2和Q1Q2是否相交&#xff0c;若相交&#xff0c;求出交点。 两条线段的位置关系可以分为三类&#xff1a;有重合部分、无重合部分但有交点、无交点。 算法的步骤如下&#xff1a; 1.快速排斥实验。 设以线段…

代码圈复杂度治理小结

网上有个段子&#xff0c;说建筑工程师不会轻易答应会给摩天大楼增加一个地下室&#xff0c;但代码开发工程师却经常在干这样的事&#xff0c;并且总有人会对你说“这个需求很简单”。到土里埋个雷&#xff0c;这确实不复杂&#xff0c;但我们往往面临的真实场景其实是“在一片…

向量的叉积

向量的叉积性质都忘完了……但是它可以用来判断点在直线的某侧。进而可以解决点是否在三角形内&#xff0c;两个矩形是否重叠等问题。向量的叉积的模表示这两个向量围成的平行四边形的面积。 设矢量P ( x1, y1 )&#xff0c;Q ( x2, y2 )&#xff0c;则矢量叉积定义为由(0,0)…

MSE 治理中心重磅升级-流量治理、数据库治理、同 AZ 优先

本次 MSE 治理中心在限流降级、数据库治理及同 AZ 优先方面进行了重磅升级&#xff0c;对微服务治理的弹性、依赖中间件的稳定性及流量调度的性能进行全面增强&#xff0c;致力于打造云原生时代的微服务治理平台。 前情回顾 在介绍升级能力之前&#xff0c;先简要回顾 MSE 产…

1801 不重复的三位数

1801 不重复的三位数 Time Limit : 1000 MS | Memory Limit : 65536 KBSubmits : 16 | Solved : 4 Description 给定一个正整数n&#xff0c;则1,2,...,n这n个数字能组成多少个互不相同且无重复数字的三位数。Input 输入一个正整数n ( 3 < n < 9 )。Output 先输出三位数的…

基于阿里云 Serverless 快速部署 Function 的极致体验

1.Serverless 前世今生 1.1 Serverless 背景介绍 云计算的不断发展&#xff0c;涌现出很多改变传统IT架构和运维方式的新技术&#xff0c;而以虚拟机、容器、微服务为代表的技术更是在各个层面不断提升云服务的技术能力&#xff0c;它们将应用和环境中很多通用能力变成了一种…

分拆素数和

Description 把一个偶数拆成两个不同素数的和&#xff0c;有几种拆法呢&#xff1f; Input 输入包含一些正的偶数&#xff0c;其值不会超过10000&#xff0c;个数不会超过500&#xff0c;若遇0&#xff0c;则结束。 Output 对应每个偶数&#xff0c;输出其拆成不同素数的个数&a…

性能提升1倍,成本直降50%!基于龙蜥指令加速的下一代云原生网关

​ 技术背景 网络信息传输的可靠性、机密性和完整性要求日渐提升&#xff0c;HTTPS 协议已经广泛应用。HTTPS 的 SSL/TLS 协议涉及加解密、校验、签名等密码学计算&#xff0c;消耗较多 CPU 计算资源。因此 CPU 硬件厂商推出过多种加速卸载方案&#xff0c;如 AES-NI、QAT、KA…

TiDB、OceanBase、PolarDB-X、CockroachDB 二级索引写入性能测评

为什么要做这个测试 二级索引是关系型数据库相较于NoSQL数据库的一个关键差异。二级索引必须是强一致的&#xff0c;因此索引的写入需要与主键的写入放在一个事务当中&#xff0c;事务的性能是二级索引性能的基础。 目前市面上的分布式数据库中&#xff0c;从使用体验的角度看…

EMQX + PolarDB-X 一站式 IoT 数据解决方案

本文整理自 EMQX 产品经理李国伟&#xff0c;在PolarDB开源社区中关于EMQX与PolarDB-X构建一站式IoT数据解决方案的分享。本篇内容主要分为四个部分&#xff1a; 1. IoT数据特性 2. EMQX介绍 3. EMQX与PolarDB-X集成 4. EMQXPolarDB-X方案DEMO 一、IoT数据特性 物联网应用场景…

阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题

大家好&#xff0c;我是君哥。 今天来聊一聊阿里巴巴 Seata 新版本&#xff08;1.5.1&#xff09;是怎么解决 TCC 模式下的幂等、悬挂和空回滚问题的。 TCC 回顾 TCC 模式是最经典的分布式事务解决方案&#xff0c;它将分布式事务分为两个阶段来执行&#xff0c;try 阶段对每…

10分钟部署一个别人可以访问的在线网站(文末有礼

你是否幻想过拥有自己的个人网站&#xff1f;但是不会编程&#xff0c;没有任何网站搭建经验&#xff0c;搭建的时候也不知道怎么去选择系统…… 等等这一系列疑惑让大部分人还没开始就选择放弃&#xff0c;本期教大家用一个最简单的方式&#xff0c;在10分钟内搭建一个线上的…