分布式系统事务一致性解决方案(基于事务消息)

参考:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/

文章目录

    • 概要
    • 错误的方案
    • 方案一:业务方自己实现
    • 方案二:RocketMQ 事务消息
      • 什么是事务消息
      • 事务消息处理流程
      • 事务消息生命周期
      • 使用限制
      • 使用示例
      • 使用建议


概要

说起分布式事务,就会谈到 Bob 给 Smith 账户转账的问题:2 个账号, 分别处于 A、B 两个 DB,或者说 2 个不同的子系统中,Bob 的账户要扣钱,Smith 的账户要加钱,如何保证原子性?

一般思路是通过消息中间件来实现 “最终一致性”:A系统扣钱,然后发送消息给 MQ,B系统接收此消息,进行加钱。

但这里有个问题:是先update DB后发消息?还是先发消息后update DB呢?

  • 假设先update DB成功发送消息时网络问题,重发又失败,怎么办?
  • 假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都会有问题。

错误的方案

有人可能会想到,可以把 “发送消息” 这个网络调用和 update DB 放在同一个事务中,如果发送消息失败,update DB 自动回滚。这样不就保证原子性了吗?

这个方案看似正确,但其实是错误的,原因:

  1. 把网络调用放在 DB 事务中,可能会因为网络的延时,导致 DB 长事务。严重的还会 block 整个 DB,这个风险很大。
  2. 如果发送消息失败,发送方并不知道是 MQ 没收到消息,还是已收到消息只是返回 response 的时候失败了。如果接收方已经收到消息了,而发送方认为没有收到,执行 update DB 回滚的操作,会导致 A 账户钱没有扣,B 账户的钱却增加了。

方案一:业务方自己实现

假设消息中间件没提供 “事务消息” 功能,比如 Kafka。那该如何解决这个问题?

结局方案如下:

  1. Producer 准备 1 个消息表,把 update DB 和 send message 这 2 个操作,放在一个DB事务中。
  2. 准备一个后台程序,不停地把消息表中的 message 传送给消息中间件。发送失败则不断重试,直到成功。允许消息重复,但消息不会丢。
  3. Consumer 准备 1 个判重表。处理过的消息记录在里面,实现业务的幂等性

通过以上 3 步,就基本解决了原子性问题

但此方案缺点是:需要设计 DB 消息表,同时还需要 1 个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合,额外增加业务方的负担。

方案二:RocketMQ 事务消息

目前各大知名电商平台和互联网公司,几乎都是使用 “事务消息” 这种方案来实现 “最终一致性” 的。这种方式适用的业务场景广泛,且比较靠谱。

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

在这里插入图片描述

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查(服务端回查的间隔时间和最大回查次数,请参见参数限制)。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

在这里插入图片描述

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制

  1. 消息类型一致性
    事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

  2. 消费事务性
    Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

  3. 中间状态可见性
    Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

  4. 事务超时机制
    Apache RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例

创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.type为TRANSACTION的属性用来支持事务消息

以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilderImpl();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。*/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}

使用建议

  1. 避免大量未决事务导致超时

    Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

  2. 正确处理"进行中"的事务

    消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

    • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

    • 程序能正确识别正在进行中的事务。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/4700.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MATLAB语音信号分析与合成——MATLAB语音信号分析学习资料汇总(图书、代码和视频)

教科书&#xff1a;MATLAB语音信号分析与合成&#xff08;第2版&#xff09; 链接&#xff08;含配套源代码&#xff09;&#xff1a;https://pan.baidu.com/s/1pXMPD_9TRpJmubPGaRKANw?pwd32rf 提取码&#xff1a;32rf 基础入门视频&#xff1a; 视频链接&#xff1a; 清…

急急急!微信朋友圈删除了怎么恢复?

微信朋友圈是我们与朋友分享生活点滴的重要平台&#xff0c;但有时候微信出现异常&#xff0c;导致我们编辑好的朋友圈被删除了&#xff0c;这时候该怎么办呢&#xff1f; 幸运的是&#xff0c;微信提供了一种简单的方式来恢复已删除的朋友圈内容。微信朋友圈删除了怎么恢复&a…

利用二叉检索树将文章中的单词建立索引(正则表达式)

知识储备 链接: 【二叉检索树的实现——增删改查、读取命令文件、将结果写入新文件】 1、正则表达式的处理 &#xff08;1&#xff09;r’前缀的作用 r’前缀的用于定义原始字符串&#xff0c;特点是不会处理反斜杠\作为转义字符 &#xff08;2&#xff09;正则表达式中元…

场外个股期权开户新规及操作方法

场外个股期权开户新规 场外个股期权开户新规主要涉及对投资者资产实力、专业知识、风险承受能力和诚信记录的要求。以下是根据最新规定总结的关键要点&#xff1a; 来源/&#xff1a;股指研究院 资产门槛&#xff1a;投资者需具备一定的资产实力&#xff0c;确保在申请开户前…

【Linux】文件打包解压_tar_zip

文章目录 &#x1f4d1;引言&#xff1a;一、文件打包压缩1.1 什么是文件打包压缩&#xff1f;1.2 为什么需要文件打包压缩&#xff1f; 二、打包解压2.1 zip2.2 unzip2.3 tar指令 &#x1f324;️全篇小结&#xff1a; &#x1f4d1;引言&#xff1a; 在Linux操作系统中&#…

OpenCV-Python: 强大的计算机视觉库

文章目录 OpenCV-Python: 强大的计算机视觉库背景OpenCV-Python是什么&#xff1f;安装简单的库函数使用方法场景示例人脸检测和识别图像分割目标跟踪 常见问题和解决方案总结 OpenCV-Python: 强大的计算机视觉库 背景 OpenCV (Open Source Computer Vision Library) 是一个开…

如何修改php版本

我使用的Hostease的Windows虚拟主机产品,由于网站程序需要支持高版本的PHP,程序已经上传到主机&#xff0c;但是没有找到切换PHP以及查看PHP有哪些版本的位置&#xff0c;因此咨询了Hostease的技术支持&#xff0c;寻求帮助了解到可以实现在Plesk面板上找到此切换PHP版本的按钮…

基于Springboot+Vue的Java项目-火车票订票系统开发实战(附演示视频+源码+LW)

大家好&#xff01;我是程序员一帆&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &am…

GDPU 算法分析与设计 天码行空5

一、【实验目的】 &#xff08;1&#xff09;熟悉动态规划算法的基本思想. &#xff08;2&#xff09;理解动态规划算法中子问题的划分和递推方程设计的基本方法. &#xff08;3&#xff09;熟悉矩阵链乘法的基本思想并编程实现。 二、【实验内容】 输入:矩阵链Ai…j的输入为…

Power BI:如何将文件夹批量Excel(多sheet页)文件导入?

故事背景&#xff1a; 业务同事想用Power BI分析近两年市场费用。 数据源全部是Excel文件&#xff0c;并且以每月一个Excel文件的方式&#xff0c;统一存放到同一文件夹下面。 重点&#xff0c;每张Excel文件会有多张sheet页&#xff0c;用区分每家分公司的费用信息。 目前…

Linux之进程间通信(二)

system V system V共享内存是内核中专门设计的通信的方式, 粗粒度划分操作系统分为进程管理, 内存管理, 文件系统, 驱动管理.., 粒度更细地分还有 进程间通信模块. 对于操作系统, 通信的场景有很多, 有以传送数据, 快速传送数据, 传送特定数据块, 进程间协同与控制以目的, 它…

数字信号的产生与检测——DSP学习笔记六

本专栏的博客的图片大部分来源于老师的PPT&#xff0c;本博客只是博主对于上课内容的知识结构的分析和梳理。 几种数字信号的产生 正弦波信号 多项式逼近(除了泰勒展开&#xff0c;还有一种方法是切比雪夫逼近法&#xff0c;感兴趣可以自己去了解一下&#xff09; 查找表 核心思…

<计算机网络自顶向下> Internet Protocol

互联网中的网络层 IP数据报格式 ver: 四个比特的版本号&#xff08;IPV4 0100, IPV6 0110&#xff09; headlen&#xff1a;head的长度&#xff08;头部长度字段&#xff08;IHL&#xff09;指定了头部的长度&#xff0c;以32位字&#xff08;4字节&#xff09;为单位计算。这…

Java数组深度剖析:掌握数据结构的基石

引言 在编程世界中&#xff0c;数仅仅是一种数据类型&#xff0c;它是理解内存分配、多维数据处理以及性能优组像是构建复杂数据结构的基本积木。它们简洁、高效&#xff0c;是管理元素集的首选方式。在Java中&#xff0c;数组不化的关键。 这篇文章致力于深入探讨Java数组的各…

git出错、文件无法删除、文件无法访问、文件或目录损坏且无法读取 等相关问题处理

一、错误历程与解决方案 1. 在用idea时&#xff0c;突然出现 部分git的命令无法使用&#xff0c;提示错误 2. 尝试删除项目文件夹&#xff0c;重新从git拉取代码 3.发现无法删除文件夹&#xff0c;删除操作没有任何反应&#xff0c;但是可以对文件夹重命名。 4.重新clone g…

李沐70_bert微调——自学笔记

微调BERT 1.BERT滴哦每一个词元返回抽取了上下文信息的特征向量 2.不同的任务使用不同的特性 句子分类 将cls对应的向量输入到全连接层分类 命名实体识别 1.识别应该词元是不是命名实体&#xff0c;例如人名、机构、位置 2.将非特殊词元放进全连接层分类 问题回答 1.给…

QT c++ 代码布局原则 简单例子

本文描述QT c widget代码布局遵循的原则&#xff1a;实中套虚&#xff0c;虚中套实。 本文最后列出了代码下载链接。 在QT6.2.4 msvc2019编译通过。 所谓实是实体组件&#xff1a;比如界面框、文本标签、组合框、文本框、按钮、表格、图片框等。 所谓虚是Layout组件&#x…

Redis哈希槽和一致性哈希

前言 单点的Redis有一定的局限&#xff1a; 单点发生故障&#xff0c;数据丢失&#xff0c;影响整体服务应用自身资源有限&#xff0c;无法承载更多资源分配并发访问&#xff0c;给服务器主机带来压力&#xff0c;性能瓶颈 我们想提升系统的容量、性能和可靠性&#xff0c;就…

sentinel-1.8.7与nacos-2.3.0实现动态规则配置、双向同步

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; sentinel-1.8.7与nacos-2.3.0实现动态规则配置、双向同步 ⏱️ 创作时…

unity的特性AttriBute详解

unity的特性AttriBute曾经令我大为头疼。因为不动使用的法则&#xff0c;但是教程都是直接就写&#xff0c;卡住就不能继续学下去。令我每一次看到&#xff0c;直接不敢看了。 今天使用文心一言搜索一番&#xff0c;发现&#xff0c;恐惧都是自己想象的&#xff0c;实际上这个…