CyclicBarrier实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)

😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)
⏱️ @ 创作时间: 2023年11月26日

在这里插入图片描述

目录

  • 前言
  • 1、概述
  • 2、方法说明:
  • 3、代码实例

前言

通过CyclicBarrierCountDownLatch配合开启多个子线程,由子线程完成数据的处理,子线程完成数据处理后进行等待,直到所有的子线程完成数据处理后,再判断是否进行回滚,如果需要回滚则所有线程执行回滚操作

如果需要由子线程处理完数据,但是由主线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630750

1、概述

CyclicBarrier是一个同步器工具类,用来协调多个线程之间的同步,通过await()进行阻塞,直到所有的线程都执行await()后,所有的线程再继续执行。

2、方法说明:

  • public viod await() /int await(long timeout,TimeUnit unit) :使当前线程一直等待,除非线程被中断或超出了指定的等待时间。
    当线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果每执行一次await() 计数加一,直到达到初始值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则该方法根本不会再进行阻塞。

3、代码实例

有用到hutool的工具包,pom如下:

        <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.0.7</version></dependency>

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Resourceprivate CyclicService cyclicService;/*** CyclicBarrier实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据** @return*/@ApiOperationSupport(order = 5)@GetMapping("/cyclic/handleDataSonBack")public String handleDataSonBack() {cyclicService.handleDataSonBack();return "success";}

Sevice:

@Service
@Slf4j
public class CountDownService {@Resourceprivate TestMapper testMapper;@Resourceprivate ApplicationContext applicationContext;/*** CyclicBarrier实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据** @return*/@Transactional(rollbackFor = Exception.class)public void handleDataSonBack() {List<TestEntity> testList = getData();AtomicBoolean errorTag = new AtomicBoolean(false);long start = System.currentTimeMillis();// 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定// 比如:一万条数据,每条单独处理需要50ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定// 需要使用hutool工具类进行分组// 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量List<List<TestEntity>> splitList = CollUtil.split(testList, 200);// 设置CyclicBarrier大小,需要比实际子线程+1,业务主线程需要进行阻塞CyclicBarrier cyclicBarrier = new CyclicBarrier(splitList.size() + 1);// 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚CountDownLatch errorCountDown = new CountDownLatch(1);// 异步调用其他Service,执行业务处理CyclicService bean = applicationContext.getBean(CyclicService.class);// 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用ExecutorService executorService = Executors.newCachedThreadPool();splitList.forEach(list -> {// 线程处理executorService.execute(() -> {bean.handleDataAsyncSonBack(list, cyclicBarrier, errorCountDown, errorTag);});});executorService.shutdown();try {// 主线程阻塞,直到子线程执行完成cyclicBarrier.await();// 可以设置最大阻塞时间,防止线程一直挂起,当子线程时间大于当前时间后会抛出TimeOut异常// cyclicBarrier.await(5, TimeUnit.SECONDS);// 模拟执行主线程业务逻辑耗时,比如insert、update等ThreadUtil.sleep(20);log.info("继续执行主线程");// 继续执行后续的操作,比如insert、update等TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(1);entity.setCommodityCode("handleTestMain");entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-handleTestMain");testMapper.insert(entity);} catch (Exception e) {log.error("主线程业务执行异常");errorTag.set(true);} finally {// 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中errorCountDown.countDown();}long end = System.currentTimeMillis();log.info("数据处理完成,耗时:{}", (end - start) / 1000);// 如果出现异常if (errorTag.get()) {throw new RuntimeException("异步业务执行出现异常");}log.info("主线程执行完成");}/*** 子线程异步处理,并且实现回滚* 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量*/@Transactional(rollbackFor = Exception.class)public void handleDataAsyncSonBack(List<TestEntity> list, CyclicBarrier cyclicBarrier, CountDownLatch errorCountDown, AtomicBoolean errorTag) {try {log.info("开始执行子线程");for (TestEntity entity : list) {if (errorTag.get()) {break;}// 对实体类的业务处理,此处模拟业务处理,耗时50msThreadUtil.sleep(50);// 数据库查询操作testMapper.insert(entity);// 模拟数据处理中,出现了异常if (entity.getCount().equals(2000)) {throw new RuntimeException("子线程执行异常");}}} catch (Exception e) {log.error("子线程异常:{}", e.getMessage(), e);errorTag.set(true);} finally {// 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作try {cyclicBarrier.await();} catch (Exception e) {errorTag.set(true);}}log.info("handleDataAsyncSonBack-业务处理完成从,等待其他子线程");// 子阻塞,直到其他子线程完成操作try {errorCountDown.await();} catch (Exception e) {errorTag.set(true);}log.info("handleDataAsyncSonBack-子线程执行完成");if (errorTag.get()) {// 抛出异常,回滚数据throw new RuntimeException("handleDataAsyncSonBack-子线程业务执行异常");}}/*** 模拟解析的excel等文件的数据*/private List<TestEntity> getData() {List<TestEntity> list = new ArrayList<>();// 此处模拟一万条数据for (int i = 1; i <= 10000; i++) {TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(i);entity.setCommodityCode("code-" + i);entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-" + i);list.add(entity);}return list;}
}

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/192937.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【源码篇】基于SpringBoot+thymeleaf实现的大学生自习室座位预定系统

文章目录 系统介绍管理员学生 技术选型成果展示账号地址及其他说明 系统介绍 基于SpringBootthymeleaf实现的大学生自习室座位预定系统是为座位管理打造的一款在线管理平台&#xff0c;它可以实时完成信息处理&#xff0c;使其系统化和规范化。 系统功能说明 管理员 1、用户…

误用STM32串口发送标志位 “USART_FLAG_TXE” “USART_FLAG_TC”造成的BUG

当你使用串口发送数据时是否出现过这样的情况&#xff1a; 1.发送时第一个字节丢失。 2.发送时出现莫名的字节丢失。 3.各种情况字节丢失。 1.先了解一下串口发送的流程图&#xff08;手动描绘&#xff09;&#xff1a; 可以假想USART_FLAG_TXE是用于检测"弹仓"&…

【STM32F103】GPIO通用输入输出口

GPIO 简介 GPIO&#xff08;General Purpose Input Output&#xff09;通用输入输出口是微控制器&#xff08;MCU&#xff09;必备的片上外设&#xff0c;可以实现微控制器与外部设备的数字交换。 STM32F103系列的芯片最多可以提供112个多功能双向IO引脚&#xff0c;但是显然…

机器学习-回归问题(Regression)

前言 与KNN分类任务预测的输出为离散型不同. 在机器学习中&#xff0c;回归任务是用于预测连续数值型变量的任务。回归任务在很多领域都有着广泛的应用. 回归问题求解 在一个回归问题中&#xff0c;很显然模型选择和好坏会直接关系到将来预测结果的接近程度&#xff0c;举个…

规则引擎专题---2、开源规则引擎对比

开源规则引擎 开源的规则引擎整体分为下面几类&#xff1a; 通过界面配置的成熟规则引擎&#xff0c;这种规则引擎相对来说就比较重&#xff0c;但功能全&#xff0c;比较出名的有:drools, urule。 基于jvm脚本语言&#xff0c;互联网公司会觉得drools太重了&#xff0c;然后…

常见的AI安全风险(数据投毒、后门攻击、对抗样本攻击、模型窃取攻击等)

文章目录 数据投毒&#xff08;Data Poisoning&#xff09;后门攻击&#xff08;Backdoor Attacks&#xff09;对抗样本攻击&#xff08;Adversarial Examples&#xff09;模型窃取攻击&#xff08;Model Extraction Attacks&#xff09;参考资料 数据投毒&#xff08;Data Poi…

GEE:不同方向的线性检测算子

作者:CSDN @ _养乐多_ 本文将介绍在 Google Earth Engine(GEE)平台上,使用不同方向的线性检测算子进行卷积操作的代码框架、核心函数和多种卷积核,比如 E-W、NE-SW、N-S、NW-SE 方向检测算子等。 结果如下图所示, 文章目录 一、定向检测算子二、完整代码三、代码链接一…

JAVA代码优化:CommandLineRunner(项目启动之前,预先加载数据)

CommandLineRunner接口是Spring Boot框架中的一个接口&#xff0c;用于在应用程序启动后执行一些特定的代码逻辑。它是一个函数式接口&#xff0c;只包含一个run方法&#xff0c;该方法在应用程序启动后被自动调用。可以帮助我们在应用程序启动后自动执行一些代码逻辑&#xff…

Java(十)(网络编程,UDP,TCP)

目录 网络编程 两种软件架构 网络通信的三要素 IP IPv4的地址分类 特殊IP 端口号 协议 用UDP协议发送数据 用UDP接收数据 TCP接收和发送数据 TCP通信--支持与多个客户端同时通信 网络编程 可以让设备中的程序与网络上其他设备的程序进行数据交互(实现网络通信) 两…

【面试经典150 | 二分查找】搜索二维矩阵

文章目录 写在前面Tag题目来源题目解读解题思路方法一&#xff1a;二分查找 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带一些对于本题涉及到的数据结构等…

Fiddler抓包工具之fiddler的composer可以简单发送http协议的请求

一&#xff0c;composer的详解 右侧Composer区域&#xff0c;是测试接口的界面&#xff1a; 相关说明&#xff1a; 1.请求方式&#xff1a;点开可以勾选请求协议是get、post等 2.url地址栏&#xff1a;输入请求的url地址 3.请求头&#xff1a;第三块区域可以输入请求头信息…

springmvc+mybatis+mysql8+idea+jqgrid前端

一、背景 主要是为了学习jqgrid前端技术&#xff0c;熟练一下前后端交互数据 二、效果图 访问地址&#xff1a;http://localhost:8080/cr/views/jqGridDemo.jsp 三、代码展示 控制层JqGridController.java Controller RequestMapping("/jqgrid") public class Jq…

拥抱变化,良心AI工具推荐

文章目录 &#x1f4a5; 简介&#x1f344; 工具介绍&#x1f353; 功能特点&#x1f957; 使用场景&#x1f389; 用户体验&#x1f9e9; 下载地址&#x1f36d; 总结 &#x1f4a5; 简介 我是一名资深程序员&#xff0c;但薪资缺对不起资深两个字&#xff0c;为了生存&#x…

安装selenium+chrome详解

1、创建yaml文件 创建yaml文件,命名为:docker-compose-chrome.yaml,具体内容如下: version: "3.9" services:spiderdriver:image: selenium/standalone-chrome:114.0restart: alwayshostname: spiderdrivercontainer_name: spiderdriverdeploy:resources:limit…

使用Docker部署开源分布式任务调度系统DolphinScheduler

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《Linux》《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 使用Docker部署开源分布式任务调度系统DolphinScheduler 前些天发现了一个巨牛的人工智能学习网…

【iOS】Bug调试

文章目录 前言一、定位编译错误二、设置与查看断点1.文件行断点设置2.符号断点设置3.Exception Breakpoint4.Constraint Error Breakpoint 三、调试工具四、输出窗口五、变量查看窗口六、查看线程七、LLDB调试工具1.p&#xff0c;po命令2.expr命令3.bt命令 前言 在我们的编码过…

Collection的其他相关知识

前置知识&#xff1a;可变参数 就是一种特殊参数&#xff0c;定义在方法 构造器的形参列表里&#xff0c;格式是&#xff1a;数据类型...参数名称&#xff1b; 可变参数的特点和好处 特点&#xff1a;可以不传数据给它&#xff1b;可以传一个或者同时传多个数据给它&#xff…

Vue3+nuxt+ts项目引入高德地图API实现步骤

看了好多相关的文章都没有完全贴合选用Vue3nuxtts框架的&#xff0c;也不太靠谱&#xff0c;只好自己踩坑实现了 首先去高德开放平台用自己的账号申请一个key&#xff0c;位置如下&#xff0c;申请好后保存好生成的key 我们使用vuemap/vue-amap&#xff0c;一个高德地图2.0版本…

阅读软件OmniReader Pro mac功能特色

OmniReader Pro mac是一款文字识别和阅读软件&#xff0c;它可以将印刷体和手写体的文字转换为数字文本&#xff0c;并将其朗读出来。该软件适用于视力受损、阅读困难、语言障碍等用户&#xff0c;可以帮助他们更加轻松地获取信息和阅读文本。 OmniReader Pro具有简洁直观的用户…

单细胞个性化细胞注释

关于单细胞中级的课程内容&#xff0c;前面已经有了三次直播。欢迎回看&#xff5e; 单细胞直播一理解seurat数据结构与pbmc处理流程 单细胞直播二从GSE104154中理解seurat结构 单细胞直播三seurat数据结构与数据可视化 本期主要内容 本期指哪打哪&#xff0c;自己选定细胞&…