多线程事务

一、业务场景

        我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。

二、解决方案

        我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

 

1、工具类代码: 

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** @author poxiao* @create 2023-01-05 22:22* <p>* 多线程事务管理器* 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议* 解决基于线程池的多线程事务一致性问题*/
@Slf4j
public class MultiThreadingTransactionManager {/*** 事务管理器*/private final PlatformTransactionManager transactionManager;/*** 超时时间*/private final long timeout;/*** 时间单位*/private final TimeUnit unit;/*** 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0*/private CountDownLatch oneStageLatch = null;/*** 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚*/private final CountDownLatch twoStageLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);/*** 构造方法* @param transactionManager 事务管理器* @param timeout 超时时间* @param unit 时间单位*/public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {this.transactionManager = transactionManager;this.timeout = timeout;this.unit = unit;}/*** 线程池方式执行任务,可保证线程间的事务一致性* @param runnableList 任务列表* @param executor 线程池* @return*/public boolean execute(List<Runnable> runnableList, Executor executor) {// 排除null值runnableList.removeAll(Collections.singleton(null));// 属性初始化innit(runnableList.size());// 遍历任务列表并放入线程池for (Runnable runnable : runnableList) {// 创建线程Thread thread = new Thread() {@Overridepublic void run() {// 如果别的线程执行失败,则该任务就不需要再执行了if (!isSubmit.get()) {log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");oneStageLatch.countDown();return;}// 开启事务TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 执行业务逻辑runnable.run();} catch (Exception e) {// 执行体发生异常,设置回滚isSubmit.set(false);log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);}// 计数器减一oneStageLatch.countDown();try {//等待所有线程任务完成,监控是否有异常,有则统一回滚twoStageLatch.await();// 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时if (isSubmit.get()) {// 提交transactionManager.commit(transactionStatus);log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);} else {// 回滚transactionManager.rollback(transactionStatus);log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);}} catch (InterruptedException e) {e.printStackTrace();}}};executor.execute(thread);}/*** 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0* 主线程发起第二阶段,执行阶段(提交或回滚),根据*/try {// 主线程等待所有线程执行完成,超时时间设置为五秒oneStageLatch.await(timeout, unit);long count = oneStageLatch.getCount();System.out.println("countDownLatch值:" + count);// 主线程等待超时,子线程可能发生长时间阻塞,死锁if (count > 0) {// 设置为回滚isSubmit.set(false);log.info("主线线程等待超时,任务即将全部回滚");}twoStageLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}/*** 初始化属性* @param size 任务数量*/private void innit(int size) {oneStageLatch = new CountDownLatch(size);}
}

2、业务代码:

(1)线程池参数

我这里采用自定义线程池,线程池参数如下:

@Configuration
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数private static final int COUR_SIZE = CPU_COUNT * 4;private static final int MAX_COUR_SIZE = CPU_COUNT * 8;// 接下来配置一个bean,配置线程池。@Beanpublic Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}}

(2)任务业务正常,无异常抛出时正常提交事务情况

public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志: 

执行成功后数据库多次了两条数据 

 (3)展示出现异常任务时回滚事务情况

 public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();//模拟任务出现异常runnableList.add(() -> {int a = 10 / 0;});users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志:  

数据库没有新增的数据 

 

参考文章: 

Spring多线程事务解决方案-CSDN博客

 两阶段VS三阶段提交协议_两阶段提交-CSDN博客

详解Spring多线程下如何保证事务的一致性-51CTO.COM 

多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/15552.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

并发编程笔记7--并发编程基础

1、线程简介 1.1、什么是线程 现代操作系统中运行一个程序&#xff0c;会为他创建一个进程。而每一个进程中又可以创建许多个线程。现代操作系统中线程是最小的调度单元。 两者关系&#xff1a;一个线程只属于一个进程&#xff0c;而一个进程可以拥有多个线程。线程是一个轻量…

CS 下载安装详解

目录 CS简介&#xff1a; CS下载地址&#xff1a; CS的安装&#xff1a; CS简介&#xff1a; CS为目前渗透中常用的一款工具&#xff0c;它的强大在于控制windows木马&#xff0c;CS主要控制windows木马。 CS下载地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/…

WordPress Country State City Dropdown CF7插件 SQL注入漏洞复现(CVE-2024-3495)

0x01 产品简介 Country State City Dropdown CF7插件是一个功能强大、易于使用的WordPress插件,它为用户在联系表单中提供国家、州/省和城市的三级下拉菜单功能,帮助用户更准确地填写地区信息。同时,插件的团队和支持也非常出色,为用户提供高质量的服务。 0x02 漏洞概述 …

【Pytorch】【MacOS】14.m1芯片使用mps进行深度模型训练

读者要先自行安装python以及anaconda&#xff0c;并且配置pytorch环境 第一步 测试环境 import torch # 判断macOS的版本是否支持 print(torch.backends.mps.is_available()) # 判断mps是否可用 print(torch.backends.mps.is_built())如果第一个语句为False&#xff0c;说明当前…

Python简介

Python简介 1. Python定义 Python 是一种简单易学并且结合了解释性、编译性、互动性和面向对象的脚本语言。Python提供了高级数据结构&#xff0c;它的语法和动态类型以及解释性使它成为广大开发者的首选编程语言。 Python 是解释型语言&#xff1a; 开发过程中没有了编译这个环…

AIGC-常见图像质量评估MSE、PSNR、SSIM、LPIPS、FID、CSFD,余弦相似度----理论+代码

持续更新和补充中…多多交流&#xff01; 参考: 图像评价指标PNSR和SSIM 函数 structural_similarity 图片相似度计算方法总结 MSE和PSNR MSE: M S E 1 m n ∑ i 0 m − 1 ∑ j 0 n − 1 [ I ( i , j ) − K ( i , j ) ] 2 MSE\frac{1}{mn}\sum_{i0}^{m-1}\sum_{j0}^{n-1}[…

汽车展厅应用客流统计,洞察客户规律,完成热门车型分析

在汽车展厅中&#xff0c;客流统计正逐渐成为一项不可或缺的重要工具&#xff0c;它帮助我们洞察客户规律&#xff0c;从而能够更好地完成热门车型分析。 一、客流统计-客户画像分析 客流统计下的客户画像构建为我们提供了深入了解客户的途径。通过对进入展厅的人群进行细致分析…

2007NOIP普及组真题 4. Hanoi双塔问题

线上OJ&#xff1a; 【07NOIP普及组】Hanoi双塔问题 题解分析 1、本题考的其实不是Hanoi塔&#xff0c;而是瞪眼法&#xff08;数学推导&#xff09;和高精度。 2、本题不需要输出移动的顺序&#xff0c;只是输出移动的次数即可。 核心思想&#xff1a; 1、从上述图中&#x…

常见算法(3)

1.Arrays 它是一个工具类&#xff0c;主要掌握的其中一个方法是srot&#xff08;数组&#xff0c;排序规则&#xff09;。 o1-o2是升序排列&#xff0c;o2-o1是降序排列。 package test02; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparat…

PostgreSQL用户与角色简述

简述 PostgreSQL通过角色&#xff08;role&#xff09;来控制数据库的访问权限。角色可以拥有数据库对象&#xff08;比如表、函数等&#xff09;&#xff0c;并允许将这些对象的权限授予其他角色&#xff0c;从而实现对象访问的控制。角色&#xff08;role&#xff09;包含了…

虹科Pico汽车示波器 | 免拆诊断案例 | 2012 款雪佛兰科鲁兹车偶尔多个故障灯异常点亮

故障现象 一辆2012款雪佛兰科鲁兹车&#xff0c;搭载1.8 L 发动机&#xff0c;累计行驶里程约为9.6万km。该车组合仪表上的发动机故障灯、ABS故障灯及动力转向故障灯偶尔异常点亮&#xff0c;同时发动机转速表和发动机冷却液温度表的指针会突然归零&#xff0c;严重时发动机无…

独享IP是原生IP吗?二者有何区别?

原生IP&#xff1a; 原生IP是指由Internet服务提供商&#xff08;ISP&#xff09;直接分配给用户的IP地址&#xff0c;这些IP地址通常反映了用户的实际地理位置和网络连接。原生IP是用户在其所在地区或国家使用的真实IP地址&#xff0c;与用户的物理位置直接相关。在跨境电商中…

牛客NC367 第K个n的排列【困难 dfs,全排列问题 Java/Go/PHP/C++】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/1595969179464e4c940a90b36abb3c54 思路 全排列问题本文提供的答案在力扣同一道题60. 排列序列&#xff0c;超时了但是截止文章发表日&#xff0c;牛客上是能通过全部测试用例的Java代码 import java.util.*;pu…

Redis - 优惠卷秒杀

场景分析 为了避免对数据库造成压力&#xff0c;我们在新增优惠卷的时候&#xff0c;可以将优惠卷的信息储存在Redis中&#xff0c;这样用户抢购的时候访问优惠卷信息&#xff0c;通过Redis读取信息。 抢购流程&#xff1a; 业务分析 既然在新增优惠卷的时候&#xff0c;我…

【c语言】了解指针,爱上指针(5)

了解指针&#xff0c;爱上指针&#xff08;5&#xff09; 回调函数qsort函数冒泡排序模拟实现qsort函数 回调函数 回调函数&#xff1a;就是一个通过函数指针调用的函数。 把函数的指针作为参数传给另一个函数&#xff0c;当这个指针被用来调用指向的函数时&#xff0c;此时被…

驱动编译报error: negative width in bit-field ‘<anonymous>’错误

错误如下图所示&#xff1a; 代码如下&#xff1a; 问题点&#xff1a;module_param的其他用户的权限参数上。 在Linux中&#xff0c;文件权限由读(r)、写(w)、执行(x)权限组成&#xff0c;分别对应数值4、2、1。 第一位0是占位符&#xff0c;在这里没有意义&#xff0c;因为…

如何使用DotNet-MetaData识别.NET恶意软件源码文件元数据

关于DotNet-MetaData DotNet-MetaData是一款针对.NET恶意软件的安全分析工具&#xff0c;该工具专为蓝队研究人员设计&#xff0c;可以帮助广大研究人员轻松识别.NET恶意软件二进制源代码文件中的元数据。 工具架构 当前版本的DotNet-MetaData主要由以下两个部分组成&#xf…

java图书电子商务网站的设计与实现源码(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的图书电子商务网站的设计与实现。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 图书电子商…

基于附带Attention机制的seq2seq模型架构实现英译法的案例

模型架构 先上图 我们这里选用GRU来实现该任务&#xff0c;因此上图的十个方框框都是GRU块&#xff0c;如第二张图&#xff0c;放第一张图主要是强调编码器的输出是作用在解码器每一次输入的观点&#xff0c;具体的详细流程图将在代码实现部分给出。 编码阶段 1. 准备工作…

【C++进阶】AVL树

0.前言 前面我们已经学习过二叉搜索树了&#xff0c;但如果我们是用二叉搜索树来封装map和set等关联式容器是有缺陷的&#xff0c;很可能会退化为单分支的情况&#xff0c;那样效率就极低了&#xff0c;那么有没有方法来弥补二叉搜索树的缺陷呢&#xff1f; 那么AVL树就出现了&…