如何解决微服务下引起的 分布式事务问题

一、什么是分布式事务?

虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。

  • 跨数据源的分布式事务

  • 跨服务的分布式事务

二、解决方案
1、使用阿里开源的Seata框架解决分布式事务

​ 1)seata的架构

​ Seata事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - **事务协调者:**维护全局和分支事务的状态,协调全局事务提交或回滚。

  • TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
    在这里插入图片描述
    3)Seata的常用模式使用

    • XA模式

      在一阶段各个本地事务执行完成后,不提交,把执行状态给事务协调者TC,此时本地事务继续持有数据库锁

      二阶段TC基于一阶段的报告来进行判断,如果一阶段均成功则通知所有的事务参与者,提交事务,如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务。

      优点:能够实现强一致性,满足ACID原则;实现简单

      缺点:性能较差;依赖数据库的事务

      I. 在application.yml文件中开启XA模式(所有参与事务的服务都需要设置):

      seata:data-source-proxy-mode: XA  
      

      II. 在全局事务的入口方法添加@GlobalTransactional注解

      @Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
      
    • AT模式

      和xa模式一样也是二阶段提交,不同的是AT模式本地事务结束后,直接提交。但是,它会在本地事务进行数据库数据更新的时候记录一下更新前后的快照。

​ 在二阶段需要回滚的时候,根据快照进行数据的恢复,如果二阶段全局事务提交,则把记录的快照删除。

​ 优点:性能好;实现也较为简单

​ 缺点: 存在中间状态,只能达到最终的一致性;快照功能会影响一些性能,但是相对于XA模式还是要好很多

I. 在application.yml文件中开启AT模式(所有参与事务的服务都需要设置):

seata:data-source-proxy-mode: AT # 默认就是AT

II. 创建相关数据库表

#在分支事务所在的库里创建记录快照的表undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` longblob NOT NULL COMMENT 'rollback info',`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` datetime(6) NOT NULL COMMENT 'create datetime',`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;#在TC服务所使用的库里创建全局锁记录表lock_table
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table`  (`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`transaction_id` bigint(20) NULL DEFAULT NULL,`branch_id` bigint(20) NOT NULL,`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`gmt_create` datetime NULL DEFAULT NULL,`gmt_modified` datetime NULL DEFAULT NULL,PRIMARY KEY (`row_key`) USING BTREE,INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

III. 在全局事务的入口方法添加@GlobalTransactional注解

    @Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
2、使用RocketMQ实现可靠消息最终一致性方案 (适用于不同项目的情况)在这里插入图片描述

模拟转账 a银行向b银行转账

a银行业务代码:

减少金额,像mq发送事务消息

  1. 引入rocketmq依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq‐spring‐boot‐starter</artifactId><version>2.0.2</version>
</dependency>

2)配置rocketmq

rocketmq.producer.group = zhuoye #设置生产者组的名称
rocketmq.name‐server = 127.0.0.1:9876  #指定rocketmq的地址

3) 业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate  RocketMQTemplate rocketMQTemplate;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//向mq发送转账消息@Overridepublic void sendTransferAccountsMessagesToMq(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("transferAccountInfo",accountChangeEvent);String jsonString = jsonObject.toJSONString();//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/ rocketMQTemplate.sendMessageInTransaction("transferAccount_ABank","topic_transferAccount",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//扣减金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}

4)编写RocketMQLocalTransactionListener接口实现类

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "transferAccount_ABank")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@Autowiredprivate UserAccountService userAccountService;@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额userAccountService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//事务状态回查,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();int isExist = tansactionalRecordMapper.isExist(txNo);if(isExist>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}

b银行业务代码(前两步一样):

接收消息,增加金额

1)业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//更新账户,增加金额@Override@Transactionalpublic void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {//已更新if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//增加金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());//添加事务记录,用于幂等tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}
}

2)监听事务消息

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "transferAccount_ABank",topic = "topic_transferAccount",maxReconsumeTimes = 3)
public class TxmsgConsumer implements RocketMQListener<String> {@AutowiredUserAccountService userAccountService;//接收消息@Overridepublic void onMessage(String message) {//解析消息JSONObject jsonObject = JSONObject.parseObject(message);String accountChangeString = jsonObject.getString("transferAccountInfo");//转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//更新本地账户,增加金额userAccountService.addAccountInfoBalance(accountChangeEvent);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/48464.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

配置服务器

参考博客 1. https://blog.csdn.net/qq_31278903/article/details/83146031 2. https://blog.csdn.net/u014374826/article/details/134093409 3. https://blog.csdn.net/weixin_42728126/article/details/88887350 4. https://blog.csdn.net/Dreamhai/article/details/109…

javac详解 idea maven内部编译原理 自制编译器

起因 不知道大家在开发中&#xff0c;有没有过下面这些疑问。有的话&#xff0c;今天就一次解答清楚。 如何使用javac命令编译一个项目&#xff1f;java或者javac的一些参数到底有什么用&#xff1f;idea或者maven是如何编译java项目的&#xff1f;&#xff08;你可能猜测底层…

【一刷《剑指Offer》】面试题 47:不用加减乘除做加法

力扣对应题目链接&#xff1a;LCR 190. 加密运算 - 力扣&#xff08;LeetCode&#xff09; 牛客对应题目链接&#xff1a;不用加减乘除做加法_牛客题霸_牛客网 (nowcoder.com) 一、《剑指Offer》对应内容 二、分析题目 sumdataA⊕dataB 非进位和&#xff1a;异或运…

Unity UGUI 之 Graphic Raycaster

本文仅作学习笔记与交流&#xff0c;不作任何商业用途 本文包括但不限于unity官方手册&#xff0c;唐老狮&#xff0c;麦扣教程知识&#xff0c;引用会标记&#xff0c;如有不足还请斧正 首先手册连接如下&#xff1a; Unity - Manual: Graphic Raycaster 笔记来源于&#xff…

无人车技术浪潮真的挡不住了~

正文 无人驾驶汽车其实也不算是新鲜玩意了&#xff0c;早在十年前大家都开始纷纷投入研发&#xff0c;在那时就已经蠢蠢欲动&#xff0c;像目前大部分智驾系统和辅助驾驶系统都是无人驾驶系统的一个中间过度版本&#xff0c;就像手机进入智能机时代的中间版本。 然而前段时间突…

SpringBoot 介绍和使用(详细)

使用SpringBoot之前,我们需要了解Maven,并配置国内源(为什么要配置这些,下面会详细介绍),下面我们将创建一个SpringBoot项目"输出Hello World"介绍. 1.环境准备 ⾃检Idea版本: 社区版: 2021.1 -2022.1.4 专业版: ⽆要求 如果个⼈电脑安装的idea不在这个范围, 需要…

LeetCode 热题 HOT 100 (001/100)【宇宙最简单版】

【链表】 No. 0160 相交链表 【简单】&#x1f449;力扣对应题目指路 希望对你有帮助呀&#xff01;&#xff01;&#x1f49c;&#x1f49c; 如有更好理解的思路&#xff0c;欢迎大家留言补充 ~ 一起加油叭 &#x1f4a6; 欢迎关注、订阅专栏 【力扣详解】谢谢你的支持&#x…

搜维尔科技:【产品推荐】Euleria Health Riablo 运动功能训练与评估系统

Euleria Health Riablo 运动功能训练与评估系统 Riablo提供一种创新的康复解决方案&#xff0c;将康复和训练变得可激励、可衡量和可控制。Riablo通过激活本体感觉&#xff0c;并通过视听反馈促进神经肌肉的训练。 得益于其技术先进和易用性&#xff0c;Riablo是骨科、运动医…

jmeter部署

一、windows环境下部署 1、安装jdk并配置jdk的环境变量 (1) 安装jdk jdk下载完成后双击安装包&#xff1a;无限点击"下一步"直到完成&#xff0c;默认路径即可。 (2) jdk安装完成后配置jdk的环境变量 找到环境变量中的系统变量&#xff1a;此电脑 --> 右键属性 …

C语言:温度转换

1.题目&#xff1a;实现摄氏度&#xff08;Celsius&#xff09;和华氏度&#xff08;Fahrenheit&#xff09;之间的转换。 输入一个华氏温度&#xff0c;输出摄氏温度&#xff0c;结果保留两位小数。 2.思路&#xff1a;&#xff08;这是固定公式&#xff0c;其中 F 是华氏度&a…

【C语言】详解结构体(下)(位段)

文章目录 前言1. 位段的含义2. 位段的声明3. 位段的内存分配&#xff08;重点&#xff09;3.1 存储方向的问题3.2 剩余空间利用的问题 4. 位段的跨平台问题5. 位段的应用6. 总结 前言 相信大部分的读者在学校或者在自学时结构体的知识时&#xff0c;可能很少会听到甚至就根本没…

STM32实战篇:按键(外部输入信号)触发中断

功能要求 将两个按键分别与引脚PA0、PA1相连接&#xff0c;通过按键按下&#xff0c;能够触发中断响应程序&#xff08;不需明确功能&#xff09;。 代码流程如下&#xff1a; 实现代码 #include "stm32f10x.h" // Device headerint main() {//开…

JUC并发编程01-基础概念

概念 进程 进程可以视为程序的一个实例&#xff0c;进程就是用来加载指令、管理内存、管理I0 线程 一个进程内可以有多个线程&#xff0c;一个线程就是一个指令流。 在Java中&#xff0c;线程作为最小调度单位&#xff0c;进程作为资源分配的最小单位&#xff0c;可以说进程…

Mysql数据库第二次作业

(1)显示所有职工的基本信息。 mysql> select * from t_worker; (2)查询所有职工所属部门的部门号&#xff0c;不显示重复的部门号。 mysql> select distinct department_id from t_worker; (3)求出所有职工的人数。 mysql> select count(1) from t_worker; (4)列…

Figma 中文版指南:获取和安装汉化插件

Figma是一种主流的在线团队合作设计工具&#xff0c;也是一种基于 Web 端的设计工具。在当今的设计时代&#xff0c;Figma 的使用满足了每个人的设计需求&#xff0c;不仅可以实现在线编辑&#xff0c;还可以方便日常管理&#xff0c;有效提高工作效率。然而&#xff0c;相信很…

分页查询与分页条件查询

--------------- 无PageHelper插件分页查询 1.创建PageBean实体类 Data NoArgsConstructor AllArgsConstructor public class PageBean<T> {private Long total;//总条数private List<T> items;//当前页数据集合 }类型安全性 泛型&#xff1a;提供了编译时的类型…

【Apache Doris】周FAQ集锦:第 15 期

【Apache Doris】周FAQ集锦&#xff1a;第 15 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目&#xff01; 在这个栏目中&#xff0c;每周将筛选社区反馈的热门问题和话题&#xff0c;重点回答并进行深入探讨。旨在为广大用户…

JMeter:BeanShell到JSR223迁移中的注意事项

前言 在之前的文章JMeter&#xff1a;BeanShell向JSR223迁移过程遭遇的java标准库不可用问题-如何切换JDK版本中引用了一段使用BeanShell对入参进行加密的脚本&#xff0c;迁移到JSR223&#xff0c;虽然更换JDK后编译通过&#xff0c;看似也可以执行了&#xff0c;但是其实那段…

windows USB 设备驱动开发-开发Type C接口的驱动程序(二)

编写 USB Type C 连接器驱动程序 在以下情况下&#xff0c;需要编写 USB Type-C 连接器驱动程序&#xff1a; 如果 USB Type-C 硬件能够处理电源输送 (PD) 状态机。 否则&#xff0c;请考虑编写 USB Type C 端口控制器驱动程序&#xff1b; 如果硬件没有嵌入式控制器。 否则&…

(10)深入理解pandas的核心数据结构:DataFrame高效数据清洗技巧

目录 前言1. DataFrame数据清洗1.1 处理缺失值&#xff08;NaNs&#xff09;1.1.1 数据准备1.1.2 读取数据1.1.3 查找具有 null 值或缺失值的行和列1.1.4 计算每列缺失值的总数1.1.5 删除包含 null 值或缺失值的行1.1.6 利用 .fillna&#xff08;&#xff09; 方法用Portfolio …