spring 使用多线程,保证事务一致性

1、背景

最近接受到接口优化的任务,查看代码逻辑后发现在批量处理数据耗时长,想到使用多线程处理批量数据,又要保持原来的事务一致性。

2、实现方法

(1)、创建多线程事务管理

@Component
@Slf4j
public class MultiThreadingTransactionManager {/*** 数据源事务管理器*/@Autowiredprivate DataSourceTransactionManager dataSourceTransactionManager;@Autowiredprivate ThreadPoolTaskExecutor executorService;private long timeout = 120;/*** 用于判断子线程业务是否处理完成* 处理完成时threadCountDownLatch的值为0*/private CountDownLatch threadCountDownLatch;/*** 用于等待子线程全部完成后,子线程统一进行提交和回滚* 进行提交和回滚时mainCountDownLatch的值为0*/private final CountDownLatch mainCountDownLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true,当子线程有异常发生时,设置为false,回滚事务*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);public boolean execute(List<Runnable> runnableList,String factorySchema) {isSubmit.set(true);setThreadCountDownLatch(runnableList.size());runnableList.forEach(runnable -> executorService.execute(() -> executeThread(factorySchema,runnable, threadCountDownLatch, mainCountDownLatch, isSubmit)));// 等待子线程全部执行完毕try {// 若计数器变为零了,则返回 trueboolean isFinish = threadCountDownLatch.await(timeout, TimeUnit.SECONDS);if (!isFinish) {// 如果还有为执行完成的就回滚isSubmit.set(false);log.info("存在子线程在预期时间内未执行完毕,任务将全部回滚");}} catch (Exception exception) {log.info("主线程发生异常,异常为: " + exception.getMessage());} finally {// 计数器减1,代表该主线程执行完毕mainCountDownLatch.countDown();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}private void executeThread(String factorySchema,Runnable runnable, CountDownLatch threadCountDownLatch, CountDownLatch mainCountDownLatch, AtomicBoolean isSubmit) {log.info("子线程: [" + Thread.currentThread().getName() + "]");// 判断别的子线程是否已经出现错误,错误别的线程已经出现错误,那么所有的都要回滚,这个子线程就没有必要执行了if (!isSubmit.get()) {log.info("整个事务中有子线程执行失败需要回滚, 子线程: [" + Thread.currentThread().getName() + "] 终止执行");// 计数器减1,代表该子线程执行完毕threadCountDownLatch.countDown();return;}//动态数据源切换SchemaContextHolder.setSchema(factorySchema);// 开启事务DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);try {// 执行业务逻辑runnable.run();} catch (Exception exception) {// 发生异常需要进行回滚,设置isSubmit为falseisSubmit.set(false);log.info("子线程: [" + Thread.currentThread().getName() + "]执行业务发生异常,异常为: " + exception.getMessage());} finally {// 计数器减1,代表该子线程执行完毕threadCountDownLatch.countDown();}try {// 等待主线程执行mainCountDownLatch.await();} catch (Exception exception) {log.info("子线程: [" + Thread.currentThread().getName() + "]等待提交或回滚异常,异常为: " + exception.getMessage());}try {// 提交if (isSubmit.get()) {dataSourceTransactionManager.commit(transactionStatus);log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交");} else {dataSourceTransactionManager.rollback(transactionStatus);log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务回滚");}} catch (Exception exception) {log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交或回滚出现异常,异常为:" + exception.getMessage());}}private void setThreadCountDownLatch(int num) {this.threadCountDownLatch = new CountDownLatch(num);}
}

(2)、测试类

@RestController
@RequestMapping("test")
public class TestController {@AutowiredTestService testService;@AutowiredMultiThreadingTransactionManager multiThreadingTransactionManager;@RequestMapping("test")public String test(){List<TestBean> list = new ArrayList<>();list.add(new TestBean("2",1));list.add(new TestBean("3",2));List<Runnable> runnableList = new ArrayList<>();list.forEach(testBean -> runnableList.add(() -> {testService.insert(testBean);}));boolean isSuccess = multiThreadingTransactionManager.execute(runnableList,"db9771");System.out.println(isSuccess);return "ok";};
}

3、总结

大体思路,就是所有子线程在各自线程内开启事务,执行业务逻辑后,判断是否抛错,一旦抛错,会把全局AtomicBoolean置为false,因为其具有原子性所以不会有线程不安全问题。所有子线程完业务代码会等待主线程,全部子线程执行业务结束后,主线程等待结束,判断AtomicBoolean是什么状态,一旦false,所有子线程回滚,否则提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/23197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

海外BGP服务器有什么功能?

当企业选择海外的BGP服务器进行租用时&#xff0c;能够实现哪些功能呢&#xff1f; 当企业拥有海外的BGP服务器时&#xff0c;可以改善网站的访问速度&#xff0c;对于面向全球用户的网站或者是应用来说&#xff0c;能够通过在不同区域所部署的BGP服务器&#xff0c;用户可以根…

【Unity Shader入门精要 第13章】使用深度和法线纹理(一)

1. 原理 深度纹理的本质是一张RenderTexture&#xff0c;只不过其中记录的不是颜色值&#xff0c;而是一个深度值 这些深度值来自于顶点在空间变换后得到的归一化设备坐标&#xff08;NDC&#xff09;的Z值 由于NDC坐标的分量取值范围在[-1, 1]之间&#xff0c;要使颜色值能…

基于pytorch的车牌识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、导入数据 from torchvision.transforms import transforms from torch.utils.data import DataLoader from torchvision import datase…

RSA 非对称加密:

非对称加密 RSA 拥有两个密钥&#xff0c; 分别为 公钥 和 私钥&#xff0c; 服务器端拥有公钥和私钥&#xff0c; 二客户端&#xff0c;只有公钥&#xff0c; 这个公钥可以随便传&#xff0c;即使被截获也没有关系&#xff0c; 加密使用公钥&#xff0c; 而解密&#xff0c;…

Mysql时间操作

一、MySql时间戳转换 select unix_timestamp(); #获取时间戳格式时间 select FROM_UNIXTIME(1717399499); #将时间戳转换为普通格式时间二、Mysql时间相加减结果转换为秒 方法1&#xff1a;time_to_sec(timediff(endTime, startTime)) SELECTDISTINCT(column1),min(last_mo…

在Jenkins 中使用 NVM 管理 Node.js 部署项目的自动化脚本

在Jenkins 中使用 NVM 管理 Node.js 部署项目的自动化脚本 人生旅途&#xff0c;总有人不断地走来&#xff0c;有人不断地离去。当新名字变成老名字&#xff0c;当老的名字渐渐模糊&#xff0c;又是一个故事的结束和另一个故事的开始。 在现代软件开发中&#xff0c;持续集成/持…

容器化实践:DevOps环境下的容器交付流程

DevOps的兴起是为了应对市场和消费者对技术应用的不断增长的需求。它的目标是构建一个更快的开发环境&#xff0c;同时保持软件的高质量标准。DevOps还致力于在敏捷开发周期中提升软件的整体品质。这一目标的实现依赖于多种技术、平台和工具的综合运用。 结合容器化技术与DevO…

深入理解mysql中的各种超时属性

1. 前言 connectTimeout: 连接超时 loginTimeout: 登录超时 socketTimeout: Socket网络超时&#xff0c;即读超时 queryTimeout: sql执行超时 transactionTimeout:spring事务超时 innodb_lock_wait_timeout:innodb锁等待超时 wait_timeout:非交互式连接关闭前的等待时间 inter…

uniapp小程序多线程 Worker 实战【2024】

需求 最近遇到个小程序异步解码的需求&#xff0c;采用了WebAssembly&#xff0c;涉及大量的计算。由于小程序的双线程模型只有一个线程处理数据&#xff0c;因此智能寻求其它的解决方案。查看小程序的文档&#xff0c;发现小程序还提供一个异步线程的Worker方案&#xff0c;可…

代码随想录算法训练营第25天|回溯

回溯part02 216. 组合总和 III /*** param {number} k* param {number} n* return {number[][]}*/ var combinationSum3 function(k, n) {// k个数字相加为n// 只能使用1-9// 每个数字只能使用一次// 不能重复 如 1 2 4 、 4 1 2 不可以let res [];backtracking(k, n, [], …

联想Y410P跑大模型

安装vs 2017 查看GPU版本 查看支持哪个版本的cuda windows cuda更新教程_cuda 12.0-CSDN博客 下载并安装cuda tookit 10.1 CUDA Toolkit 10.1 Update 2 Archive | NVIDIA Developer 找到下载的文件&#xff0c;安装 参考安装链接 Win10 Vs2017 CUDA10.1安装&#xff08;避坑…

Due to a bug fix in https://github.com/huggingface/transformers/pull/28687

错误&#xff1a; Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking …

InnoDB存储引擎非常重要的一个机制--MVCC(多版本并发控制)

Mysql是如何实现隔离性的&#xff1f;&#xff08;锁MVCC&#xff09; 隔离性是指一个事务内部的操作以及操作的数据对正在进行的其他事务是隔离的&#xff0c;并发执行的各个事务之间不能相互干扰。隔离性可以防止多个事务并发执行时&#xff0c;可能存在交叉执行导致数据的不…

安全U盘和普通U盘有什么区别?

安全U盘&#xff08;也称为加密U盘或安全闪存驱动器&#xff09;与普通U盘肯定是有一些区别的&#xff0c;从字面意思上来看&#xff0c;就能看出&#xff0c;安全U盘是能够保护文件数据安全性的&#xff0c;普通U盘没这一些功能的&#xff0c;可随意拷贝文件&#xff0c;不防盗…

面试4:c++(数位物联)

1.const 关健字的作用 定义常量&#xff0c;防止变量被意外修改&#xff0c;增强程序的可读性和维护性。 可以用于指针&#xff0c;声明指向常量的指针或常量指针。 2.static关健字的作用 (1)在函数内&#xff0c;用于修饰局部变量&#xff0c;使其生命周期延长到整个程序运行期…

mybatisplus多数据源内置方法报Invalid bound statement (not found)

在用mybatis-plus多数据源时用mapper内置的 selectList(queryWrapper) 查询数据报org.apache.ibatis.binding.BindingException: Invalid bound statement (not found): 问题是在配置多数据源时用的是SqlSessionFactoryBean&#xff0c;改为MybatisSqlSessionFactoryBean即可…

Python怎么逐行处理文件:深度解析与实用技巧

Python怎么逐行处理文件&#xff1a;深度解析与实用技巧 在Python中&#xff0c;逐行处理文件是一项常见且重要的任务。无论是读取大型日志文件、分析文本数据还是处理配置文件&#xff0c;逐行读取都能帮助我们更有效地管理内存并提高处理速度。本文将详细介绍Python中逐行处…

一文了解UVLED线光源的应用

在机器视觉系统中&#xff0c;光源作为不可或缺的一部分&#xff0c;能够提高目标成像效果&#xff0c;增强检测效果。光源的选择至关重要&#xff0c;选到不合适的会影响成像及检测效果。针对不同的检测对象,不同的形状光源应运而生。我们来看看最UVLED线光源。 下面以CCS的光…

某红书旋转滑块验证码分析与协议算法实现

文章目录 1. 写在前面2. 接口分析3. 验证轨迹4. 算法还原【🏠作者主页】:吴秋霖 【💼作者介绍】:擅长爬虫与JS加密逆向分析!Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致力于Python与爬虫领域研究与开发工作! 【🌟作者推…

zoomeye api报错 request invalid, validate usage and try again

项目场景&#xff1a; 调用zoomeye的api接口进行数据拿取 问题描述 之前接口一直通着今天突然报错&#xff0c;以下为源代码 pip install zoomeye from zoomeye.sdk import ZoomEye zm ZoomEye(api_key"34A8B452-D874-C63E0-8471-F3D4f89766f") zm.dork_search(a…