多线程事务如何回滚?

背景介绍

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

公用的类和方法

/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n; int number=source.size()/n; int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;
}
/**  线程池配置* @version V1.0*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService =  newThreadPool();}}}return executorService;}private static  ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
/** 获取sqlSession* @author 86182* @version V1.0*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}

示例事务不成功操作

/*** 测试多线程事务.* @param employeeDOList*/
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);threadArray[i] =  new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}

数据库中存在一条数据:

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {@Resourceprivate EmployeeBO employeeBO;/***   测试多线程事务.* @throws InterruptedException*/@Testpublic  void MoreThreadTest2() throws InterruptedException {int size = 10;List<EmployeeDO> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){EmployeeDO employeeDO = new EmployeeDO();employeeDO.setEmployeeName("lol"+i);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(i+"XX");employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}}
}

测试结果:

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

使用sqlSession控制手动提交事务

 @ResourceSqlContext sqlContext;/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();//拆分listList<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}return employeeMapper.saveBatch(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}
// sql
<insert id="saveBatch" parameterType="List">INSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)values<foreach collection="list" item="item" index="index" separator=",">(#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status})</foreach></insert>

数据库中一条数据:

测试结果:抛出异常,

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

成功操作示例:

 @Resource
SqlContext sqlContext;
/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);for (int i =0;i<lists.size();i++){List<EmployeeDO> list  = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatch(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}
}

测试结果:

数据库中数据:

删除的删除了,添加的添加成功了,测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/658367.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Mixamo网站上,下载的动画导入unity给自己的模型添加后出错怎么解决

在Mixamo网站上&#xff0c;下载的动画导入unity给自己的模型添加后出错 一、在Mixamo下载的模型可以正常使用二、在自己的模型和unity自带模型上就出错1.解决方法2.解决成功 注意 一、在Mixamo下载的模型可以正常使用 二、在自己的模型和unity自带模型上就出错 1.解决方法 选…

C/C++ - 类的继承机制

目录 类继承 继承的本质 继承的作用 继承的语法 继承的权限 继承的构造析构 继承构造函数 继承析构函数 继承的基类调用 继承的对象赋值 继承与友元关系 继承与静态成员 不能被继承的类 构造函数 语法特性 单继承 单继承的特点 多继承 多继承概念 多继承语…

Linux ubuntu 20.4.6安装docker

这边需要在vm中使用docker&#xff0c;记录下 1.更新系统包列表 确保您的系统包列表是最新的 sudo apt update 2.安装依赖工具 sudo apt install apt-transport-https ca-certificates curl software-properties-common 3.添加Docker GPG密钥 curl -fsSL https://downloa…

Android Room数据库异常: Room cannot verify the data integrity.

文章目录 一、前言二、错误信息如下三、参考链接 一、前言 在Room数据库结构变动的情况下&#xff0c;如果没有进行Room数据库升级迁移&#xff0c;则会报错Room cannot verify the data integrity.。在实际开发过程中&#xff0c;数据库结构会经常变化&#xff0c;直到发版。…

Centos 7.9 在线安装 VirtualBox 7.0

1 访问 Linux_Downloads – Oracle VM VirtualBox 2 点击 ​the Oracle Linux repo file 复制 内容到 /etc/yum.repos.d/. 3 在 /etc/yum.repos.d/ 目录下新建 virtualbox.repo&#xff0c;复制内容到 virtualbox.repo 并 :wq 保存。 [rootlocalhost centos]# cd /etc/yum.rep…

【数据结构 05】双链表

一、原理 双链表又称双向链表&#xff0c;通常情况下是带头循环结构&#xff0c;在CSTL标准模板库中封装的<list.h>头文件就是带头双向循环链表。 特性&#xff1a;增删灵活且高效&#xff0c;支持随机增删但不支持随机访问 设计思路&#xff1a; 链表包含一个头节点h…

一维差分(c++题解)

题目描述 输入一个长度为n的整数序列。 接下来输入 个操作&#xff0c;每个操作包含三个整数 &#xff0c;表示将序列中 之间的每个数加上 。 请你输出进行完所有操作后的序列。 输入格式 第一行包含两个整数 n 和 m。 第二行包含n个整数&#xff0c;表示整数序列。 接下来…

【Linux】模拟实现一个简单的minishell

目录 从显示屏获取输入字符流 分割字符串 取出命令名称及选项 去除输入时多按的那个换行符 创建子进程&#xff0c;实现程序替换 如果替换失败&#xff0c;进程终止exit 查看子进程情况 实现echo $?功能 实现cd 最终代码 基本思路 让父进程创建一个子进程&#xff0c…

我用Java写了一个简单的二叉树算法

二叉树是一种常见的数据结构&#xff0c;它是由节点和连接节点的边组成的。每个节点最多有两个子节点&#xff0c;分别称为左子节点和右子节点。二叉树算法包括遍历、查找、插入、删除等操作。 class Node {int data;Node left, right;public Node(int item) {data item;left…

[NCTF2019]Fake XML cookbook(特详解)

先试了一下弱口令&#xff0c;哈哈习惯了 查看页面源码发现xml function doLogin(){var username $("#username").val();var password $("#password").val();if(username "" || password ""){alert("Please enter the usern…

微信小程序如何调整checkbox和radios选择框的大小和样式

目录 修改选中框大小 修改Checkbox样式 修改Radio样式 修改选中框大小 直接使用width和height调整checkbox和radios选择框的大小是无效的,简单的调整大小可以通过修改transform值,如下所示: .wxss .fill-checkbox{transform: scale(0.5,0.5); } scale参数分别为在长…

二分查找(c++题解)

题目描述 在一个单调递增的序列里查找X。 如果找到x&#xff0c;则返回x在数组中的位置 如果没有找到&#xff0c;则返回&#xff0d;1 输入格式 第1行&#xff1a;1个整数N(1<N<2000000), 表示元素的个数 第2行开始的若干行&#xff0c;每行10个空格分开的整数&am…

SpringMVC处理ajax请求(@RequestBody注解),ajax向后端传递的数据格式详解

目录 RequestBody注解简单介绍 RequestBody获取json格式的请求参数 Servlet方式处理ajax请求 本文讲解两种方式实现SpringMVC与Ajax交互&#xff0c;1、通过SpringMVC提供的注解RequestBody处理ajax请求&#xff1b;2、通过JavaEE时期的Servlet来处理 RequestBody注解简单介…

CISAW和CISP-PTE证书选择指南

&#x1f4e3;在信息安全领域&#xff0c;选择合适的证书可以为你的职业生涯增添光彩。很多从事信息渗透行业的朋友经常讨论CISP-PTE和CISAW之间的选择问题。今天就从4个方面带你详细了解这两张证书&#xff0c;帮你做出明智的选择&#xff01; 1️⃣证书的行业前景 &#x1f4…

【数据结构1-4】图的基本应用

一、【P5318】查找文献&#xff08;邻接表DFSBFS&#xff09; 本题是图的遍历模板题&#xff0c;需要使用DFS和BFS遍历方法。 由于本题最多有1e5个顶点&#xff0c;如果采用邻接矩阵存储图的话需要4*1e10 Byte空间&#xff0c;显然会超内存&#xff0c;因此这里采用邻接表的方法…

【Nuxt3】layouts的使用

简言 Nuxt 提供了一个布局框架&#xff0c;用于将常见的 UI 模式提取为可重用的布局。 为了获得最佳性能&#xff0c;在使用时&#xff0c;放置在此目录中的组件将通过异步导入自动加载。 layouts layouts文件夹存放的是ui布局文件&#xff0c;就是实现一个页面整体架构规则的…

线程池相关的类学习

Executor public interface Executor {//执行任务void execute(Runnable command); }ExecutorService public interface ExecutorService extends Executor {//关闭线程池&#xff0c;不能再向线程池中提交任务&#xff0c;已存在与线程池中的任务会继续执行&#xff0c;直到…

超声波风速风向仪的工作原理

TH-WQX2在我们的日常生活中&#xff0c;气象条件的影响无处不在。无论是准备户外活动&#xff0c;还是安排农业生产&#xff0c;了解当天的风速和风向都是非常关键的。随着科技的发展&#xff0c;超声波风速风向仪的出现为气象监测带来了革命性的变化。 一、超声波风速风向仪的…

聚合支付,实现支付宝微信扫二维码直接跳转支付

具体要实现的功能&#xff1a;手机支付宝或微信扫描同一个二维码&#xff0c;跳转各自的支付 微信使用&#xff1a;jsapi支付 支付宝&#xff1a;wappay 上篇已写了如何实现内网穿透调试就不多叙述 1.判断客户端类型&#xff0c;从request的中将user-agent拉取下来&#xf…

【Javaweb】【C00157】基于SSM的宠物护理预定系统(论文+PPT)

基于SSM的宠物护理预定系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的宠物护理预订系统 本系统分为前台系统模块、后台管理员模块以及后台会员用户模块 其中前台系统模块&#xff1a;当游客打开系统的网址后&…