多线程事务怎么回滚

1、背景介绍

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

2、公用的类和方法

/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n; int number=source.size()/n; int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;
}
/**  线程池配置* @version V1.0*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService =  newThreadPool();}}}return executorService;}private static  ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
/** 获取sqlSession* @author 86182* @version V1.0*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}
}

3、示例事务不成功操作

  /*** 测试多线程事务.* @param employeeDOList*/
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);threadArray[i] =  new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}

数据库中存在一条数据

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {@Resourceprivate EmployeeBO employeeBO;/***   测试多线程事务.* @throws InterruptedException*/@Testpublic  void MoreThreadTest2() throws InterruptedException {int size = 10;List<EmployeeDO> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){EmployeeDO employeeDO = new EmployeeDO();employeeDO.setEmployeeName("lol"+i);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(i+"XX");employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}}
}

测试结果:
在这里插入图片描述
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

4、使用sqlSession控制手动提交事务

 @ResourceSqlContext sqlContext;/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();//拆分listList<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}return employeeMapper.saveBatch(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}
// sql
<insert id="saveBatch" parameterType="List">INSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)values<foreach collection="list" item="item" index="index" separator=",">(#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status})</foreach></insert>

数据库中一条数据

测试结果:抛出异常,
在这里插入图片描述
删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。
成功操作示例:

@Resource
SqlContext sqlContext;
/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);for (int i =0;i<lists.size();i++){List<EmployeeDO> list  = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatch(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}
}

测试结果:
在这里插入图片描述
数据库中数据:

删除的删除了,添加的添加成功了,测试成功。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/4317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git重修系列 ------ Git的使用和常用命令总结

一、Git的安装和配置 git安装&#xff1a; Git - Downloads git首次配置用户信息&#xff1a; $ git config --global user.name "kequan" $ git config --global user.email kequanchanqq.com $ git config --global credential store 配置 Git 以使用本地存储机…

mysql主库delete一个没主键的表导致从库延迟很久问题处理

一 问题描述 发现线上环境一个从库出现延迟&#xff0c;延迟了2天了&#xff0c;还没追上主库。 查看当前运行的sql及事务&#xff0c;发现这个sql语句是在delete一个没主键的表。 二 问题模拟 这里在测试环境复现下这个问题。 2.1 在主库造数据 use baidd; CREATE TABL…

【数据库】Redis

文章目录 [toc]Redis终端操作进入Redis终端Redis服务测试切换仓库 String命令存储字符串普通存储设置存储过期时间批量存储 查询字符串查询单条批量查询 Key命令查询key查询所有根据key首字母查询判断key是否存在查询指定的key对应的value的类型 删除键值对 Hash命令存储hash查…

软件测试_v模型_w模型

v模型&#xff1a; w模型&#xff1a; 一、V模型的8个阶段及其对应关系如下&#xff1a; 1. 需求分析&#xff1a;明确项目的需求&#xff0c;为后续设计提供依据。 2. 总体设计&#xff1a;根据需求分析&#xff0c;设计系统的总体架构。 3. 详细设计&#xff1a;在总体设计的…

在no branch上commit后,再切换到其他分支,找不到no branch分支的修改怎么办?

解决办法 通过git reflog我们可以查看历史提交记录&#xff0c;这里的第二条提交&#xff08;fbd3ea8&#xff09;就是我在no branch上的提交。 再通过git checkout -b backup fbd3ea8&#xff0c;恢复到上次提交的状态&#xff0c;并且为其创建个分支backup&#xff0c;此时…

(七)Servlet教程——Idea编辑器集成Tomcat

1. 点击桌面上Idea快捷方式打开Idea编辑器&#xff0c;假如没有创建项目的话打开Idea编辑器后的界面展示如下图所示 2. 点击界面左侧菜单中的自定义 3. 然后点击界面中的“所有设置...”,然后点击“构建、执行、部署”&#xff0c;选择其中的“应用程序服务器” 4. 点击“”按钮…

C语言-动态内存分配

即使行动导致错误&#xff0c;却也带来了学习与成长;不行动则是停滞与萎缩。&#x1f493;&#x1f493;&#x1f493; •&#x1f319;知识回顾 亲爱的友友们大家好&#xff01;&#x1f496;&#x1f496;&#x1f496;&#xff0c;我们紧接着要进入一个新的内容&#xff0c;…

k8s RBAC 角色访问控制详解与生产中的实际应用案例

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《Kubernetes航线图&#xff1a;从船长到K8s掌舵者》 &#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、前言 1、k8s简介 2、RBAC简介 二、RBAC关键…

【AMBA Bus ACE 总线 6 -- ACE cache maintenance 详细介绍】

文章目录 ACE cache maintenance什么叫 cache maintenance operations呢?ACE cache line statesACE cache maintenance 什么叫 cache maintenance operations呢? 比如ARM CPU 对自己的Icache 和 Dcache会有大量的transaction操作,也即maintenance操作,如果cache 是dirty 话…

Python的历史演变与作用

目录 1.概述 2.起源 3.发展阶段 4.Python 3的诞生 5.现状与未来 6.Python的作用 6.1.Web开发 6.2.数据科学与人工智能 ​​​​​​​6.3.自动化与脚本编程 ​​​​​​​6.4.教育与学习 ​​​​​​​6.5.其他领域 7.结语 1.概述 Python&#xff0c;一门富有表…

26.统一网关Gateway

网关的功能 1.身份认证&#xff0c;权限的校验。 2.服务的路由&#xff0c;负载均衡。用户请求被分配到哪一个微服务。一个微服务可以有多个实例&#xff0c;所以使用负载均衡。 3.请求限流。 springcloud网关实现有两种&#xff1a;gateway, zuul zuul是基于servlet实现的…

JavaEE——介绍 HTTPServlet 三部分使用与 cookie 和 session 的阐述

文章目录 一、HTTPServlet介绍其中的关键 三个方法 二、HTTPServletRequest(处理请求)1.分块介绍方法作用get 为前缀的方法字段中 含有 getParameter 字段 的方法(前后端交互)&#xff1a;字段中 含有 getHeader 字段 的方法&#xff1a; 2.解释前后端的交互过程3.使用 json 格…

币圈是什么意思?币圈开发

币圈是一个涵盖了区块链、加密货币及其应用的独特领域&#xff0c;它的兴起与发展已经彻底改变了我们对金融、科技和未来的认知。 一、什么是币圈&#xff1f; 币圈可以被理解为围绕虚拟货币展开的一系列活动和产业的总称。它包括区块链技术的研发、数字货币的创造、交易、投资…

数字旅游打造个性化旅行体验,科技让旅行更精彩:借助数字技术,旅行者可以定制专属旅行计划,享受个性化的旅行体验

目录 一、引言 二、数字旅游的兴起与发展 三、数字技术助力个性化旅行体验 1、智能推荐系统&#xff1a;精准匹配旅行者需求 2、定制化旅行计划&#xff1a;满足个性化需求 3、实时互动与分享&#xff1a;增强旅行体验 四、科技提升旅行便捷性与安全性 1、移动支付与电…

PotatoPie 4.0 实验教程(30) —— FPGA实现摄像头图像中值滤波

中值滤波是什么&#xff1f; 图像的中值滤波是一种非线性图像滤波方法&#xff0c;它用于去除图像中的椒盐噪声或其他类型的噪声。中值滤波的原理是用每个像素周围的邻域中的中值来替代该像素的值。与均值滤波不同&#xff0c;中值滤波不会受到极端值的影响&#xff0c;因此在处…

数据仓库是什么

写在前面 刚接触大数据的新手小白可能会对数据仓库这个词比较陌生&#xff0c;本文将介绍数据仓库的主要特征及OLTP&OLAP的区别&#xff0c;帮助读者更好理解数据仓库。 一、什么是数据仓库 数据仓库&#xff0c;简称数仓&#xff0c;是一个对数据进行加工&#xff0c;集…

YOLOv8 的安装、使用与训练

YOLOV8 YOLOv8简介 YOLOv8是YOLO系列的最新版本&#xff0c;它融合了先进的深度学习技术和目标检测领域的最新研究成果 与其前身相比&#xff0c;YOLOv8在速度和精度方面都有了显著的提升&#xff0c;使其成为一个理想的实时目标检测解决方案。该模型结合了卷积神经网络&…

分段函数拟合-施加分段点连续约束条件|【Matlab源码+视频介绍】

专栏导读 作者简介&#xff1a;工学博士&#xff0c;高级工程师&#xff0c;专注于工业软件算法研究本文已收录于专栏&#xff1a;《复杂函数拟合案例分享》本专栏旨在提供 1.以案例的形式讲解各类复杂函数拟合的程序实现方法&#xff0c;并提供所有案例完整源码&#xff1b;2.…

PotatoPie 4.0 实验教程(27) —— FPGA实现摄像头图像拉普拉斯边缘提取

拉普拉斯边缘提取有什么作用&#xff1f; 拉普拉斯边缘检测是一种常用的图像处理技术&#xff0c;用于检测图像中的边缘和边界。它的主要作用包括&#xff1a; 边缘检测&#xff1a;拉普拉斯算子可以帮助检测图像中的边缘&#xff0c;即图像中亮度快速变化的位置。这些边缘通常…

数据集笔记:处理北大POI 数据:保留北京POI

数据来源&#xff1a;Map POI (Point of Interest) data - Official data of the contest (pku.edu.cn) windows 下载方法&#xff1a;数据集笔记&#xff1a;windows系统下载北大开放数据研究平台的POI数据-CSDN博客 1 读取数据 1.1 列出所有的文件 dir1D:/data/PKU POI/2…