RocketMQ教程-(5)-功能特性-事务消息

事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。

事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。

  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。

  • 积分系统状态变更:变更用户积分,更新用户积分表。

  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

传统XA事务方案:性能不足

为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。

基于普通消息方案:一致性保障困难

将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。

普通消息方案

该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:

  • 消息发送成功,订单没有执行成功,需要回滚整个事务。

  • 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。

  • 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。

                                         基于分布式事务消息:支持最终一致性

上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。

而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

Apache RocketMQ事务消息的方案,具备高性能、可扩展、业务开发简单的优势。具体事务消息的原理和流程,请参见下文的功能原理。

功能原理​

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

事务消息

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

事务消息

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制​

消息类型一致性

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

消费事务性

Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

中间状态可见性

Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

事务超时机制

Apache RocketMQ 事务消息的命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例​

创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Transaction

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.typeTRANSACTION的属性用来支持事务消息

 以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilder();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。**/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}

使用建议​

避免大量未决事务导致超时

Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

  • 程序能正确识别正在进行中的事务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/9336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kotlin与Java语法对比学习

定义变量 // Kotlin val i: Int 10 // 定义一个整数变量 val d: Double 10.0 // 定义一个双精度浮点数变量 val b: Boolean true // 定义一个布尔变量 val c: Char a // 定义一个字符变量 val s: String "Hello, World!" // 定义一个字符串变量 val arr: …

PostgreSQL查看数据库对象大小

PostgreSQL查看数据库对象大小 PostgreSQL查看数据库对象大小1、查看某个数据库大小2、查看多个数据库大小3、按顺序查看索引大小4、查看所有对象的大小 PostgreSQL查看数据库对象大小 1、查看某个数据库大小 select pg_size_pretty(pg_database_size(tzqdb));2、查看多个数据…

【已解决】VS报错C2760

在项目中增加第三方库的时候&#xff0c;出现了C2760的报错&#xff0c;常用路径都设置好了&#xff0c;判断可能是属性设置的问题&#xff0c;就查了一下&#xff0c;确实一下就改好了 操作流程&#xff1a; 项目右键后打开最下面的属性 属性——》C/C——》语言——》符合…

数据库触发器简介——修改数据的触发器、删除数据的触发器

1.修改数据的触发器 修改数据的触发器 create trigger tb_user_update_triggerafter update on tb_user for each row begininsert int user_logs(id,operation,operate_time,operate_id,operate_params)VALUES(null,update,now(),new.id,concat(更新之前的数据&#xff1a;i…

如何设置Axure中文版 Mac系统下axurerp10怎么设置成中文

有许多小伙伴肯定想知道axure rp 10怎么转换为中文版&#xff0c;接下来就为大家带来最详细的汉化教程&#xff0c;大家可以根据教程一步一步操作&#xff0c;保证可以汉化成功&#xff01; 准备工作 安装好axurerp10 axurerp10汉化包 百度网盘链接: 百度网盘 请输入提取码 …

2023牛客暑期多校训练营2(D/E/F/H/I/K)

目录 D.The Game of Eating E.Square F.Link with Chess Game H.0 and 1 in BIT I.Link with Gomoku K.Box D.The Game of Eating 思路&#xff1a;倒着贪心。因为正着贪会导致一种局面&#xff1a;我选了当前喜爱值最大的菜&#xff0c;但是就算我不选这个菜&#xff0…

PostgreSQL实战-数据库迁移部署

PostgreSQL实战-数据库迁移部署 介绍 根据项目需求&#xff0c;我们需要将现有的PostgreSQL数据库重新部署到新的服务器上。由于项目本身就是基于PostgreSQL数据库构建的&#xff0c;因此数据库迁移将变得十分便捷。接下来&#xff0c;我将简要介绍我们的迁移步骤。 迁移步骤…

halcon线扫描相机平面交点检测--

目录 代码示例代码示例的逐句解释 以下是halcon中的代码示例 * This example shows the inspection of planar intersections of * 3D objects. The intersections are derived by intersecting the * reconstructed 3D object models with suitable planes, using the * o…

2.1 色彩空间

色彩发送器 色彩认知&#xff1a;光源是出生点&#xff0c;光源发射出光线&#xff0c;光线通过直射反射折射等路径最终进入人眼。但是人眼接收到光线后&#xff0c;人眼的细胞产生了一系列化学反应。由此把产生的新号传入大脑&#xff0c;最终大脑对颜色产生了认知感知。 光的…

如何把爱好变成事业?

如何把爱好变成事业&#xff1f; 目录 引言爱好与事业的关系如何把爱好变成事业 研究市场需求建立专业技能制定计划与目标 追求爱好的「变现」的利与弊 利&#xff1a;工作的乐趣与激情弊&#xff1a;压力与挑战 如何平衡爱好与事业 分清爱好与事业的界限建立时间管理和优先级…

数据结构-时间空间复杂度

目录 前言 1.什么是数据结构 2.什么是算法 3.数据结构和算法的重要性 1.算法的时间复杂度和空间复杂度 1.1算法效率 1.1.1如何衡量一个算法的好坏 1.1.2算法的复杂度 1.2时间复杂度 1.2.1时间复杂度的概念 1.2.2大O的渐进表示法 2.编程练习 2.1.1 排序遍历 2.1.2 2.1.3 单身狗解…

第一百一十六天学习记录:C++提高:STL-string(黑马教学视频)

string基本概念 string是C风格的字符串&#xff0c;而string本质上是一个类 string和char区别 1、char是一个指针 2、string是一个类&#xff0c;类内部封装了char*&#xff0c;管理这个字符串&#xff0c;是一个char型的容器。 特点&#xff1a; string类内部封装了很多成员方…

[JAVAee]wait方法与sleep方法的区别

①最明显的一个区别是,wait方法需要搭配synchronized关键字进行使用.而sleep方法的使用不需要 ②wait是一个Object类的方法,sleep是Thread类的一个静态方法 方法说明 public static void sleep(long millis) throws interruptedException 使当前正在执行的线程以指定的毫秒数…

PostMan+Jmeter工具介绍及安装

目录 一、PostMan介绍​编辑 二、下载安装 三、Postman与Jmeter的区别 一、开发语言区别&#xff1a; 二、使用范围区别&#xff1a; 三、使用区别&#xff1a; 四、Jmeter安装 附一个详细的Jmeter按照新手使用教程&#xff0c;感谢作者&#xff0c;亲测有效。 五、Jme…

windows无盘启动技术开发之使用本地镜像文件启动电脑

by fanxiushu 2023-07-26 转载或引用请注明原始作者。 其实使用本地镜像文件启动电脑&#xff0c;这个windows操作系统本身就是自带的功能。 win7以上的系统&#xff0c;制作 vhd或vhdx格式的镜像文件&#xff0c; 然后在镜像文件中安装windows操作系统&#xff0c;然后放到真实…

【数据结构】树状数组和线段树

树状数组和线段树 下文为自己的题解总结&#xff0c;参考其他题解写成&#xff0c;取其精华&#xff0c;做以笔记&#xff0c;如有描述不清楚或者错误麻烦指正&#xff0c;不胜感激&#xff0c;不喜勿喷&#xff01; 树状数组 需求&#xff1a; 能够快速计算区间和保证在修改…

fastadmin采坑之接口分页处理

其实不算fastadmin的代码而是thinkphp自带的分页代码 paginate函数就是自带的分页函数&#xff0c;开始我以为这个只能用于渲染模板不能用于接口&#xff0c;后面看到源代码发现请求参数带page就可以 /*** ApiTitle (获取协会会员)* ApiSummary (获取协会会员)* ApiMethod …

Langchain 的 Routerchain

Langchain 的 Routerchain 1. destination_chains2. LLMRouterChain3. EmbeddingRouterChain 本笔记本演示了如何使用 RouterChain 范例创建一个链&#xff0c;该链动态选择用于给定输入的下一个链。 路由器链由两个组件组成&#xff1a; RouterChain 本身&#xff08;负责选…

React 基础篇(二)

&#x1f4bb; React 基础篇&#xff08;二&#xff09;&#x1f3e0;专栏&#xff1a;React &#x1f440;个人主页&#xff1a;繁星学编程&#x1f341; &#x1f9d1;个人简介&#xff1a;一个不断提高自我的平凡人&#x1f680; &#x1f50a;分享方向&#xff1a;目前主攻…

微服务SpringCloud教程——微服务是什么

微服务&#xff08;MicroServices&#xff09;最初是由 Martin Fowler 于 2014 年发表的论文《MicroServices》中提出的名词&#xff0c;它一经提出就成为了技术圈的热门话题。 微服务&#xff0c;我们可以从字面上去理解&#xff0c;即“微小的服务”&#xff0c;下面我们从“…