Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 分布式事务

Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务

文章目录

  • Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
  • 0.前言
  • 1. 基础介绍
      • ConnectionFactory
      • AbstractRoutingDataSource 动态路由数据源的抽象类
    • DynamicLocalTransactionInterceptor 动态的本地事务拦截器
  • 3. 使用步骤示例
  • 4. 官方源码分析
  • 5. 参考资料

0.前言

背景
处理多数据源事务一直是一个复杂而棘手的问题,通常我们有两种主流的解决方法。

第一种是通过Atomikos手动创建多数据源事务,这种方法更适合数据源数量较少,参数配置不复杂,对性能要求不高的项目。然而,这种方法的最大困难在于需要手动配置大量设置,这可能会消耗大量时间。


第二种是通过使用Seata等分布式事务解决方案。这种方法的难点在于需要建立并维护像Seata-server这样的统一管理中心。

今天我们使用Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 实现分布式事务和本地多数据源事务。
每种解决方案都有其适用的场景,然而在实际操作中,我经常接到如下的问题:
“我为什么在添加了事务注解之后,数据源切换还是失败了?”
“我了解到这涉及到分布式事务,但我并不想使用Seata。我的场景比较简单,有没有不需要依赖第三方的解决方案?”
这些问题突显出在现实工作中,我们可能需要更灵活、更简便的解决方案来处理多数据源事务问题。
在这里插入图片描述
在这里插入图片描述

1. 基础介绍

自从3.3.0开始,由seata的核心贡献者https://github.com/a364176773 贡献了基于connection代理的方案。
完整代码 https://github.com/baomidou/dynamic-datasource-spring-boot-starter/commit/f0cbad193528296eeb64faa76c79743afbdd811d
建议从3.4.0版本开始使用,其修复了一个功能,老版本不加@DS只加@DSTransactional会报错。
在这里插入图片描述
核心的几处代码

    @Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false",matchIfMissing = true)@Beanpublic Advisor localTransactionAdvisor() {AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");return new DefaultPointcutAdvisor(pointcut, new DynamicTransactionAdvisor());}

我们可以看到通过spring.datasource.dynamic.seata=true配置来启用条件注解。这个是dynamic-datasource支持seata事务的开发和入口。

ConnectionFactory

ConnectionFactory 是一个工厂类,主要的作用是管理数据库连接,并提供获取和存储数据库连接的功能。

  1. 存储每个线程独立的数据库连接:ConnectionFactory 使用 ThreadLocal 为每个线程提供其自己的数据库连接池,这样可以防止在多线程环境中数据库连接的混乱。

  2. 提供获取数据库连接的方法:ConnectionFactory 提供 getConnection 方法,使得在同一个线程中的多个模块可以共享同一个数据库连接。

  3. 提供存储数据库连接的方法:ConnectionFactory 提供 putConnection 方法,可以存储新的数据库连接到当前线程的数据库连接池中。

  4. 提供通知数据库连接的方法:ConnectionFactory 提供 notify 方法,可以对当前线程的所有数据库连接进行统一的操作,比如提交或者回滚事务。

通过这些功能,ConnectionFactory 实现了数据库连接的有效管理,保证了在同一线程中对多个数据库进行操作时,可以共享同一连接,实现事务管理。核心代码如下。大家可以借鉴

package com.baomidou.dynamic.datasource.tx;import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author funkye*/
public class ConnectionFactory {// 使用ThreadLocal来保存与当前线程相关的数据库连接信息,以Map形式存储,Map中的key为数据源名称,value为对应的数据库连接代理类private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =new ThreadLocal<Map<String, ConnectionProxy>>() {@Overrideprotected Map<String, ConnectionProxy> initialValue() {return new ConcurrentHashMap<>(8);}};// 存储数据库连接到当前线程的连接池中,如果当前线程的连接池中没有该数据源的连接,则新建一个并放入public static void putConnection(String ds, ConnectionProxy connection) {Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();if (!concurrentHashMap.containsKey(ds)) {try {connection.setAutoCommit(false);} catch (SQLException e) {e.printStackTrace();}concurrentHashMap.put(ds, connection);}}// 从当前线程的连接池中获取指定数据源的数据库连接public static ConnectionProxy getConnection(String ds) {return CONNECTION_HOLDER.get().get(ds);}// 对当前线程的所有数据库连接执行通知操作,根据参数state决定是提交还是回滚,如果在执行过程中发生错误,则在所有连接处理完后抛出public static void notify(Boolean state) throws Exception {Exception exception = null;try {Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {try {connectionProxy.notify(state);} catch (SQLException e) {exception = e;}}} finally {CONNECTION_HOLDER.remove(); //清除当前线程的连接池if (exception != null) {throw exception;}}}}

AbstractRoutingDataSource 动态路由数据源的抽象类

动态路由数据源的抽象类,用于根据不同的业务需要,动态地选择需要使用的数据源。关键的方法是getConnection()getConnection(String username, String password),这两个方法会根据当前是否存在全局事务来动态地选择获取原始的数据库连接还是数据库连接代理。

public abstract class AbstractRoutingDataSource extends AbstractDataSource {// 抽象方法,子类需要实现该方法以确定数据源protected abstract DataSource determineDataSource();// 抽象方法,子类需要实现该方法以确定默认的数据源名称protected abstract String getPrimary();// 获取数据库连接,根据事务上下文中是否有XID来判断是否需要获取代理连接@Overridepublic Connection getConnection() throws SQLException {String xid = TransactionContext.getXID();if (StringUtils.isEmpty(xid)) {// 如果没有XID,说明当前不处于全局事务中,直接获取原始连接return determineDataSource().getConnection();} else {// 如果有XID,说明当前处于全局事务中,需要获取代理连接String ds = DynamicDataSourceContextHolder.peek();ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;ConnectionProxy connection = ConnectionFactory.getConnection(ds);return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;}}// 与上面的方法类似,只不过这个方法可以传入用户名和密码来获取数据库连接@Overridepublic Connection getConnection(String username, String password) throws SQLException {String xid = TransactionContext.getXID();if (StringUtils.isEmpty(xid)) {return determineDataSource().getConnection(username, password);} else {String ds = DynamicDataSourceContextHolder.peek();ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;ConnectionProxy connection = ConnectionFactory.getConnection(ds);return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password)): connection;}}// 创建数据库连接代理,并将代理连接放入连接工厂private Connection getConnectionProxy(String ds, Connection connection) {ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds);ConnectionFactory.putConnection(ds, connectionProxy);return connectionProxy;}// 获取指定类型的代理对象@Override@SuppressWarnings("unchecked")public <T> T unwrap(Class<T> iface) throws SQLException {if (iface.isInstance(this)) {return (T) this;}return determineDataSource().unwrap(iface);}// 判断是否是指定类型的代理对象@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return (iface.isInstance(this) || determineDataSource().isWrapperFor(iface));}
}

DynamicLocalTransactionInterceptor 动态的本地事务拦截器

动态的本地事务拦截器。基本思想是在方法调用前后添加事务处理的逻辑。当这个拦截器被应用到某个方法时,那么在调用这个方法时,会首先检查当前是否已经存在事务,如果存在则直接调用原始方法。如果不存在,则会先开启一个新的事务,然后调用原始方法,方法结束后根据方法执行的结果来提交或回滚事务。入口在这,看一眼就懂了。
在这里插入图片描述

// 实现MethodInterceptor接口定义拦截器
public class DynamicLocalTransactionInterceptor implements MethodInterceptor {@Override// invoke方法会在原方法执行前后进行拦截public Object invoke(MethodInvocation methodInvocation) throws Throwable {// 如果当前上下文中已存在事务,则直接调用原方法,不进行拦截处理if (!StringUtils.isEmpty(TransactionContext.getXID())) {return methodInvocation.proceed();}// 定义一个状态标志,标记事务是否执行成功boolean state = true;Object o;// 开启一个新的事务LocalTxUtil.startTransaction();try {// 调用原始方法o = methodInvocation.proceed();} catch (Exception e) {// 如果原方法执行抛出异常,则标记事务执行失败state = false;throw e;} finally {// 根据事务执行状态,提交或回滚事务if (state) {LocalTxUtil.commit();} else {LocalTxUtil.rollback();}}// 返回原方法的执行结果return o;}
}

3. 使用步骤示例

官方示例:https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-local-sample

完整示例项目 数据库都已准备好,可以直接运行测试。http://localhost:8080/doc.html

示例项目A,B,C分别对应OrderService,ProductService,AccountService。分别是独立的数据库。

用户下单分别调用产品库扣库存,账户库扣余额。
如果库存不足,或用户余额不足都抛出RuntimeException,触发整体回滚。

@Slf4j
@Service
@AllArgsConstructor
public class OrderService {private final OrderMapper orderMapper;private final AccountService accountService;private final ProductService productService;//@DS("order") 这里不需要,因为order是默认库,如果开启事务的不是默认库则必须加@DSTransactional //注意这里开启事务public void placeOrder(PlaceOrderRequest request) {log.info("=============ORDER START=================");Long userId = request.getUserId();Long productId = request.getProductId();Integer amount = request.getAmount();log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);log.info("当前 XID: {}", TransactionContext.getXID());Order order = Order.builder().userId(userId).productId(productId).status(OrderStatus.INIT).amount(amount).build();orderMapper.insert(order);log.info("订单一阶段生成,等待扣库存付款中");// 扣减库存并计算总价Double totalPrice = productService.reduceStock(productId, amount);// 扣减余额accountService.reduceBalance(userId, totalPrice);order.setStatus(OrderStatus.SUCCESS);order.setTotalPrice(totalPrice);orderMapper.updateById(order);log.info("订单已成功下单");log.info("=============ORDER END=================");}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {private final ProductMapper productMapper;@DS("product")public Double reduceStock(Long productId, Integer amount) {log.info("=============PRODUCT START=================");log.info("当前 XID: {}", TransactionContext.getXID());// 检查库存Product product = productMapper.selectById(productId);Assert.notNull(product, "商品不存在");Integer stock = product.getStock();log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);if (stock < amount) {log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);throw new RuntimeException("库存不足");}log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());// 扣减库存int currentStock = stock - amount;product.setStock(currentStock);productMapper.updateById(product);double totalPrice = product.getPrice() * amount;log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);log.info("=============PRODUCT END=================");return totalPrice;}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountService {private final AccountMapper accountMapper;@DS("account")public void reduceBalance(Long userId, Double price) {log.info("=============ACCOUNT START=================");log.info("当前 XID: {}", TransactionContext.getXID());Account account = accountMapper.selectById(userId);Assert.notNull(account, "用户不存在");Double balance = account.getBalance();log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);if (balance < price) {log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);throw new RuntimeException("余额不足");}log.info("开始扣减用户 {} 余额", userId);double currentBalance = account.getBalance() - price;account.setBalance(currentBalance);accountMapper.updateById(account);log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);log.info("=============ACCOUNT END=================");}
}

4. 官方源码分析

5. 参考资料

  1. dynamic-datasource GitHub 仓库 ↗:dynamic-datasource 的官方 GitHub 仓库,包含源代码、文档和示例等资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/57092.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS学习笔记01

CSS笔记01 什么是CSS CSS&#xff08;Cascading Style Sheets &#xff09;&#xff1a;层叠样式表&#xff0c;也可以叫做级联样式表&#xff0c;是一种用来表现 HTML 或 XML 等文件样式的计算机语言。字体&#xff0c;颜色&#xff0c;边距&#xff0c;高度&#xff0c;宽度…

5 群起集群

1.在启动集群之前&#xff0c;先配置workers,有几个节点就配置几个 [atguiguhadoop102 hadoop]$ vim /opt/module/hadoop-3.1.3/etc/hadoop/workers在该文件中增加如下内容&#xff1a; hadoop102 hadoop103 hadoop104 注意&#xff1a;该文件中添加的内容结尾不允许有空格&a…

成都瀚网科技:抖店如何经营?

作为热门的短视频分享平台&#xff0c;抖音不仅是一种娱乐工具&#xff0c;更是一个蕴藏着无限商机的电商平台。开店、抖音下单成为很多人的选择。那么&#xff0c;抖音如何开店、下单呢&#xff1f; 1、如何在抖音上开店和下单&#xff1f; 注册账号&#xff1a;首先&#xff…

vue 后台管理系统登录 记住密码 功能(Cookies实现)

安装插件 import Cookies from js-cookie 组件引入 import Cookies from js-cookie; 存值&#xff1a; Cookies.set(username, state.account, { expires: 30 }); // username 存的值的名字&#xff0c;state.account 存的值 expires 存储的时间&#xff0c;30天Cookies…

mysql sql_mode数据验证检查

sql_mode 功能 sql_mode 会影响MySQL支持的sql语法以及执行的数据验证检查。通过设置sql_mode ,可以完成不同严格程度的数据校验&#xff0c;有效地保障数据准确性 sql_mode 严格模式 VS 宽松模式 宽松模式 比如&#xff0c;插入的数据不满足 表的数据类型&#xff0c;也可能…

2023年高教社杯 国赛数学建模思路 - 案例:ID3-决策树分类算法

文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法&#xff0c;就是频繁模…

opencv 车牌号的定位和识别+UI界面识别系统

目录 一、实现和完整UI视频效果展示 主界面&#xff1a; 识别结果界面&#xff1a;&#xff08;识别车牌颜色和车牌号&#xff09; 查看历史记录界面&#xff1a; 二、原理介绍&#xff1a; 车牌检测->图像灰度化->Canny边缘检测->膨胀与腐蚀 边缘检测及预处理…

Vue3(开发h5适配)

在开发移动端的时候需要适配各种机型&#xff0c;有大的&#xff0c;有小的&#xff0c;我们需要一套代码&#xff0c;在不同的分辨率适应各种机型。 因此我们需要设置meta标签 <meta name"viewport" content"widthdevice-width, initial-scale1.0">…

第十四课:采用 Qt 开发翻页/分页/多页窗体组件

功能描述&#xff1a;采用 Qt 开发一个翻页/分页/多页的窗体组件&#xff0c;封装为 QWidget 的子类&#xff0c;在你的应用程序中可直接使用。 一、最终演示效果 本次制作的翻页/分页/多页窗体组件是基于 Qt 开发&#xff0c;整个程序封装成 PageWidget 类&#xff0c;继承于…

【微信红包】Axure聊天发红包原型图,含流程图和PRD产品文档

作品概况 页面数量&#xff1a;共 60 页 兼容软件&#xff1a;Axure RP 9/10&#xff0c;不支持低版本 应用领域&#xff1a;聊天软件、社交软件 作品申明&#xff1a;页面内容仅用于功能演示&#xff0c;无实际功能 作品特色 本作品为「发红包」的原型设计图&#xff0c…

Linux_4_文本处理工具和正则表达式

目录 1文本编辑工具之神VIM1.1 vi和vim简介1.2使用vim1.2.1 vim 命令格式1.2.2三种主要模式和转换 1.3扩展命令模式1.3.1扩展命令模式基本命令1.3.2 地址定界1.3.3查找并替换1.3.4定制vim的工作特性1.3.4.1行号1.3.4.2忽略字符的大小写1.3.4.3白动缩进1.3.4.4复制粘贴保留格式1…

商城-学习整理-集群-K8S-集群环境部署(二十四)

目录 一、MySQL集群1、mysql集群原理2、Docker安装模拟MySQL主从复制集群1、下载mysql镜像2、创建Master实例并启动3、创建 Slave 实例并启动4、为 master 授权用户来同步数据1、进入 master 容器2、进入 mysql 内部 &#xff08;mysql –uroot -p&#xff09;3、查看 master 状…

Haproxy+Keepalive 整合rabbitmq实现高可用负载均衡

Haproxy 实现负载均衡 HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理&#xff0c;支持虚拟主机&#xff0c;它是免费、快速并且可靠的一种解决方案&#xff0c;包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。HAProxy 实现了一种…

响应式web-PC端web与移动端web(H5)兼容适配 选型方案

背景 项目需要&#xff0c;公司已经有一套PC端web&#xff0c;需要做一套手机端浏览器可用的&#xff0c;但是又想兼容pc端&#xff0c;适配的web项目。 以下是查阅到响应布局现成的开源模版。根据自己技术栈&#xff0c;vue2,js来搜索相关的开源项目。 RuoYi 使用若依快速…

【LeetCode75】第三十七题 二叉树中的最长交错路径

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 给我们一棵二叉树&#xff0c;问我们在这棵树里能找到的最长交错路径。最长交错路径就是在二叉树里一左一右一左一右这样走&#xff0c;最…

后端面试话术集锦第三篇:spring cloud 面试话术

🚗后端面试集锦目录 💖后端面试话术集锦第一篇:spring面试话术💖 💖后端面试话术集锦第二篇:spring boot面试话术💖 💖后端面试话术集锦第三篇:spring cloud面试话术💖 💖后端面试话术集锦第四篇:ElasticSearch面试话术💖 💖后端面试话术集锦第五篇:r…

利用深度蛋白质序列嵌入方法通过 Siamese neural network 对 virus-host PPIs 进行精准预测【Patterns,2022】

研究背景&#xff1a; 病毒感染可以导致多种组织特异性损伤&#xff0c;所以 virus-host PPIs 的预测有助于新的治疗方法的研究&#xff1b;目前已有的一些 virus-host PPIs 鉴定或预测方法效果有限&#xff08;传统实验方法费时费力、计算方法要么基于蛋白结构或基因&#xff…

以太网交换机高稳定性时钟系统应用方案

随着网络技术的不断发展&#xff0c;我们的生活也发生着巨大的变化&#xff0c;这离不开以太网起到的重大作用&#xff0c;全球大部分地区的以太网交换机市场都出现了增长。 那么&#xff0c;平常我们所说的以太网交换机到底是什么&#xff1f;今天小扬给大家科普科普以太网交…

ubuntu学习(六)----文件编程实现cp指令

1 思路 Linux要想复制一份文件通常指令为&#xff1a; cp src.c des.c 其中src.c为源文件&#xff0c;des.c为目标文件。 要想通过文件编程实现cp效果&#xff0c;思路如下 1 首先打开源文件 src.c 2 读src到buf 3 创建des.c 4 将buf写入到des.c 5 close两个文件 2 实现 vi …

第一次实验:Protocol Layers

第一次实验&#xff1a;Protocol Layers 捕获跟踪*Pick a URL and fetch it with* wget *or* curl*.* 检查跟踪数据包结构协议开销复用密钥*Which Ethernet header field is the demultiplexing key that tells it the next higher layer is IP?**Which IP header field is th…