多线程事务

一、业务场景

        我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。

二、解决方案

        我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

 

1、工具类代码: 

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** @author poxiao* @create 2023-01-05 22:22* <p>* 多线程事务管理器* 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议* 解决基于线程池的多线程事务一致性问题*/
@Slf4j
public class MultiThreadingTransactionManager {/*** 事务管理器*/private final PlatformTransactionManager transactionManager;/*** 超时时间*/private final long timeout;/*** 时间单位*/private final TimeUnit unit;/*** 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0*/private CountDownLatch oneStageLatch = null;/*** 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚*/private final CountDownLatch twoStageLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);/*** 构造方法* @param transactionManager 事务管理器* @param timeout 超时时间* @param unit 时间单位*/public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {this.transactionManager = transactionManager;this.timeout = timeout;this.unit = unit;}/*** 线程池方式执行任务,可保证线程间的事务一致性* @param runnableList 任务列表* @param executor 线程池* @return*/public boolean execute(List<Runnable> runnableList, Executor executor) {// 排除null值runnableList.removeAll(Collections.singleton(null));// 属性初始化innit(runnableList.size());// 遍历任务列表并放入线程池for (Runnable runnable : runnableList) {// 创建线程Thread thread = new Thread() {@Overridepublic void run() {// 如果别的线程执行失败,则该任务就不需要再执行了if (!isSubmit.get()) {log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");oneStageLatch.countDown();return;}// 开启事务TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 执行业务逻辑runnable.run();} catch (Exception e) {// 执行体发生异常,设置回滚isSubmit.set(false);log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);}// 计数器减一oneStageLatch.countDown();try {//等待所有线程任务完成,监控是否有异常,有则统一回滚twoStageLatch.await();// 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时if (isSubmit.get()) {// 提交transactionManager.commit(transactionStatus);log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);} else {// 回滚transactionManager.rollback(transactionStatus);log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);}} catch (InterruptedException e) {e.printStackTrace();}}};executor.execute(thread);}/*** 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0* 主线程发起第二阶段,执行阶段(提交或回滚),根据*/try {// 主线程等待所有线程执行完成,超时时间设置为五秒oneStageLatch.await(timeout, unit);long count = oneStageLatch.getCount();System.out.println("countDownLatch值:" + count);// 主线程等待超时,子线程可能发生长时间阻塞,死锁if (count > 0) {// 设置为回滚isSubmit.set(false);log.info("主线线程等待超时,任务即将全部回滚");}twoStageLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}/*** 初始化属性* @param size 任务数量*/private void innit(int size) {oneStageLatch = new CountDownLatch(size);}
}

2、业务代码:

(1)线程池参数

我这里采用自定义线程池,线程池参数如下:

@Configuration
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数private static final int COUR_SIZE = CPU_COUNT * 4;private static final int MAX_COUR_SIZE = CPU_COUNT * 8;// 接下来配置一个bean,配置线程池。@Beanpublic Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}}

(2)任务业务正常,无异常抛出时正常提交事务情况

public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志: 

执行成功后数据库多次了两条数据 

 (3)展示出现异常任务时回滚事务情况

 public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();//模拟任务出现异常runnableList.add(() -> {int a = 10 / 0;});users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志:  

数据库没有新增的数据 

 

参考文章: 

Spring多线程事务解决方案-CSDN博客

 两阶段VS三阶段提交协议_两阶段提交-CSDN博客

详解Spring多线程下如何保证事务的一致性-51CTO.COM 

多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/15552.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

并发编程笔记7--并发编程基础

1、线程简介 1.1、什么是线程 现代操作系统中运行一个程序&#xff0c;会为他创建一个进程。而每一个进程中又可以创建许多个线程。现代操作系统中线程是最小的调度单元。 两者关系&#xff1a;一个线程只属于一个进程&#xff0c;而一个进程可以拥有多个线程。线程是一个轻量…

微服务01-Eureka Ribbon

微服务 我的个人地址 微服务是一种架构风格&#xff0c;旨在将单一应用程序拆分为一组小型、独立部署的服务&#xff0c;每个服务都围绕特定的业务功能进行构建。这些服务 之间通过轻量级的通信机制互相通信&#xff0c;比如使用HTTP协议或消息队列。微服务架构提供了灵活性和…

CS 下载安装详解

目录 CS简介&#xff1a; CS下载地址&#xff1a; CS的安装&#xff1a; CS简介&#xff1a; CS为目前渗透中常用的一款工具&#xff0c;它的强大在于控制windows木马&#xff0c;CS主要控制windows木马。 CS下载地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/…

WordPress Country State City Dropdown CF7插件 SQL注入漏洞复现(CVE-2024-3495)

0x01 产品简介 Country State City Dropdown CF7插件是一个功能强大、易于使用的WordPress插件,它为用户在联系表单中提供国家、州/省和城市的三级下拉菜单功能,帮助用户更准确地填写地区信息。同时,插件的团队和支持也非常出色,为用户提供高质量的服务。 0x02 漏洞概述 …

内存分配算法

一、实验目的&#xff1a; 实验目的&#xff1a; 通过编写一个内存分配模拟程序&#xff0c;实现首次适应算法&#xff08;First Fit&#xff09;、循环首次适应算法&#xff08;Next Fit&#xff09;、最佳适应算法&#xff08;Best Fit&#xff09;和最差适应算法&#xff08…

【Pytorch】【MacOS】14.m1芯片使用mps进行深度模型训练

读者要先自行安装python以及anaconda&#xff0c;并且配置pytorch环境 第一步 测试环境 import torch # 判断macOS的版本是否支持 print(torch.backends.mps.is_available()) # 判断mps是否可用 print(torch.backends.mps.is_built())如果第一个语句为False&#xff0c;说明当前…

Python简介

Python简介 1. Python定义 Python 是一种简单易学并且结合了解释性、编译性、互动性和面向对象的脚本语言。Python提供了高级数据结构&#xff0c;它的语法和动态类型以及解释性使它成为广大开发者的首选编程语言。 Python 是解释型语言&#xff1a; 开发过程中没有了编译这个环…

AIGC-常见图像质量评估MSE、PSNR、SSIM、LPIPS、FID、CSFD,余弦相似度----理论+代码

持续更新和补充中…多多交流&#xff01; 参考: 图像评价指标PNSR和SSIM 函数 structural_similarity 图片相似度计算方法总结 MSE和PSNR MSE: M S E 1 m n ∑ i 0 m − 1 ∑ j 0 n − 1 [ I ( i , j ) − K ( i , j ) ] 2 MSE\frac{1}{mn}\sum_{i0}^{m-1}\sum_{j0}^{n-1}[…

汽车展厅应用客流统计,洞察客户规律,完成热门车型分析

在汽车展厅中&#xff0c;客流统计正逐渐成为一项不可或缺的重要工具&#xff0c;它帮助我们洞察客户规律&#xff0c;从而能够更好地完成热门车型分析。 一、客流统计-客户画像分析 客流统计下的客户画像构建为我们提供了深入了解客户的途径。通过对进入展厅的人群进行细致分析…

Flutter 中的 InkWell 小部件:全面指南

Flutter 中的 InkWell 小部件&#xff1a;全面指南 在 Flutter 中&#xff0c;InkWell 是一个用于添加可交互元素的 widget&#xff0c;它能够响应用户的点击操作&#xff0c;并且提供了墨水涟漪动画效果&#xff0c;这是 Material Design 中的一个标准反馈机制。InkWell 可以…

(五)Python3 接口自动化测试,pytest的使用

(五)Python3 接口自动化测试,pytest的使用 简介pytest是python的单元测试框架,用于单元测试,集成测试,功能测试。 它提供了丰富的功能和灵活的用法,使得编写和运行测试变得简单而高效。 pytest框架优点: 1)简单易用:语法简洁清晰,编写测试用例友好,几分钟内上手。 …

2007NOIP普及组真题 4. Hanoi双塔问题

线上OJ&#xff1a; 【07NOIP普及组】Hanoi双塔问题 题解分析 1、本题考的其实不是Hanoi塔&#xff0c;而是瞪眼法&#xff08;数学推导&#xff09;和高精度。 2、本题不需要输出移动的顺序&#xff0c;只是输出移动的次数即可。 核心思想&#xff1a; 1、从上述图中&#x…

常见算法(3)

1.Arrays 它是一个工具类&#xff0c;主要掌握的其中一个方法是srot&#xff08;数组&#xff0c;排序规则&#xff09;。 o1-o2是升序排列&#xff0c;o2-o1是降序排列。 package test02; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparat…

LeetCode 每日一题 2024/5/20-2024/5/26

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录 5/20 1542. 找出最长的超赞子字符串5/21 2769. 找出最大的可达成数字5/22 2225. 找出输掉零场或一场比赛的玩家5/23 2831. 找出最长等值子数组5/24 1673. 找出最具竞争力的…

PostgreSQL用户与角色简述

简述 PostgreSQL通过角色&#xff08;role&#xff09;来控制数据库的访问权限。角色可以拥有数据库对象&#xff08;比如表、函数等&#xff09;&#xff0c;并允许将这些对象的权限授予其他角色&#xff0c;从而实现对象访问的控制。角色&#xff08;role&#xff09;包含了…

19、设计模式之命令模式

命令模式 命令模式&#xff08;Command Pattern&#xff09;是一种数据驱动的设计模式&#xff0c;它属于行为型模式。请求以命令的形式包裹在对象中&#xff0c;并传给调用对象。调用对象寻找可以处理该命令的合适的对象&#xff0c;并把该命令传给相应的对象&#xff0c;该对…

虹科Pico汽车示波器 | 免拆诊断案例 | 2012 款雪佛兰科鲁兹车偶尔多个故障灯异常点亮

故障现象 一辆2012款雪佛兰科鲁兹车&#xff0c;搭载1.8 L 发动机&#xff0c;累计行驶里程约为9.6万km。该车组合仪表上的发动机故障灯、ABS故障灯及动力转向故障灯偶尔异常点亮&#xff0c;同时发动机转速表和发动机冷却液温度表的指针会突然归零&#xff0c;严重时发动机无…

独享IP是原生IP吗?二者有何区别?

原生IP&#xff1a; 原生IP是指由Internet服务提供商&#xff08;ISP&#xff09;直接分配给用户的IP地址&#xff0c;这些IP地址通常反映了用户的实际地理位置和网络连接。原生IP是用户在其所在地区或国家使用的真实IP地址&#xff0c;与用户的物理位置直接相关。在跨境电商中…

C++ 学习 关于无符号数的计算

C 学习 关于含无符号数表达式的计算 &#x1f308; 哈喽&#xff0c;失踪人口回归 这篇blog 来源于C 的学习 当然C语言同样适用 C Primer 的笔记 &#x1f33f;让我们从一个问题开始 如果你回答对了 那么就可以跳过了~ 对于下面的问题 //读程序写结果。 unsigned u 10,u2 42…

牛客NC367 第K个n的排列【困难 dfs,全排列问题 Java/Go/PHP/C++】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/1595969179464e4c940a90b36abb3c54 思路 全排列问题本文提供的答案在力扣同一道题60. 排列序列&#xff0c;超时了但是截止文章发表日&#xff0c;牛客上是能通过全部测试用例的Java代码 import java.util.*;pu…