CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)

😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)
⏱️ @ 创作时间: 2023年11月26日

在这里插入图片描述

目录

  • 前言
  • 1、概述
  • 2、实现
  • 3、方法说明:
  • 4、代码实例

前言

通过CountDownLatch开启多个子线程,由子线程完成数据的处理,子线程完成数据处理后进行等待,直到所有的子线程完成数据处理后,再判断是否进行回滚,如果需要回滚则所有线程执行回滚操作

如果需要由子线程处理完数据,但是由主线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630258

1、概述

CountDownLatch是一个同步器工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行,不可重置使用。

2、实现

使用一个计数器进行实现,计数器初始值为线程的数量,当每一个线程完成自己任务后,计数器的值就会减一,当计数器的值为0时,在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

3、方法说明:

  • public void countDown():递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
  • public viod await() /boolean await(long timeout,TimeUnit unit) :使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值。当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果计数到达零,则该方法返回true值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待。参数:timeout-要等待的最长时间、unit-timeout 参数的时间单位

4、代码实例

有用到hutool的工具包,pom如下:

        <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.0.7</version></dependency>

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Resourceprivate CountDownService countDownService;/*** CountDownLatch实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据** @return*/@ApiOperation(value = "测试CountDownLatch", notes = "测试")@ApiOperationSupport(order = 5)@GetMapping("/countDown/handleDataSonBack")public String handleDataSonBack() {countDownService.handleDataSonBack();return "success";}

Sevice:

@Service
@Slf4j
public class CountDownService {@Resourceprivate TestMapper testMapper;@Resourceprivate ApplicationContext applicationContext;/*** CountDownLatch实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据** @return*/@Transactional(rollbackFor = Exception.class)public void handleDataSonBack() {List<TestEntity> testList = getData();AtomicBoolean errorTag = new AtomicBoolean(false);long start = System.currentTimeMillis();// 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定// 比如:一万条数据,每条单独处理需要50ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定// 需要使用hutool工具类进行分组// 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量List<List<TestEntity>> splitList = CollUtil.split(testList, 200);// 设置countDown大小CountDownLatch countDownLatch = new CountDownLatch(splitList.size());// 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚CountDownLatch errorCountDown = new CountDownLatch(1);// 异步调用其他Service,执行业务处理CountDownService bean = applicationContext.getBean(CountDownService.class);// 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用ExecutorService executorService = Executors.newCachedThreadPool();splitList.forEach(list -> {// 线程处理executorService.execute(() -> {bean.handleDataAsyncSonBack(list, countDownLatch, errorCountDown, errorTag);});});executorService.shutdown();try {// 主线程阻塞countDownLatch.await();// 可以设置最大阻塞时间,防止线程一直挂起/*boolean await = countDownLatch.await(1, TimeUnit.SECONDS);if (!await) {// 超过时间子线程都还没有结束,直接都回滚errorTag.set(true);}*/log.info("继续执行主线程");// 继续执行后续的操作,比如insert、update等TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(1);entity.setCommodityCode("handleTestMain");entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-handleTestMain");testMapper.insert(entity);} catch (Exception e) {log.error("主线程业务执行异常");errorTag.set(true);} finally {// 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中errorCountDown.countDown();}long end = System.currentTimeMillis();log.info("数据处理完成,耗时:{}", (end - start) / 1000);// 如果出现异常if (errorTag.get()) {throw new RuntimeException("异步业务执行出现异常");}log.info("主线程执行完成");}/*** 子线程异步处理,并且实现回滚* 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量*/@Transactional(rollbackFor = Exception.class)public void handleDataAsyncSonBack(List<TestEntity> list, CountDownLatch countDownLatch, CountDownLatch errorCountDown, AtomicBoolean errorTag) {try {log.info("开始执行子线程");for (TestEntity entity : list) {if (errorTag.get()) {break;}// 对实体类的业务处理,此处模拟业务处理,耗时50msThreadUtil.sleep(50);// 数据库查询操作testMapper.insert(entity);// 模拟数据处理中,出现了异常if (entity.getCount().equals(2000)) {throw new RuntimeException("子线程执行异常");}}} catch (Exception e) {log.error("子线程异常:{}", e.getMessage(), e);errorTag.set(true);} finally {// 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作countDownLatch.countDown();}log.info("handleDataAsyncSonBack-业务处理完成从,等待其他子线程");// 子阻塞,直到其他子线程完成操作try {errorCountDown.await();} catch (Exception e) {errorTag.set(true);}log.info("handleDataAsyncSonBack-子线程执行完成");if (errorTag.get()) {// 抛出异常,回滚数据throw new RuntimeException("handleDataAsyncSonBack-子线程业务执行异常");}}/*** 模拟解析的excel等文件的数据*/private List<TestEntity> getData() {List<TestEntity> list = new ArrayList<>();// 此处模拟一万条数据for (int i = 1; i <= 10000; i++) {TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(i);entity.setCommodityCode("code-" + i);entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-" + i);list.add(entity);}return list;}
}

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/173233.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【UnLua】在 Lua 中定义 UE 反射类型

【UnLua】在 Lua 中定义 UE 反射类型 用法 启动编辑器时遍历 Defines 目录下 lua 脚本来加载 UE 反射类型&#xff08;开个临时的 Lua VM 即可&#xff09;直接像 -- define a uenum in lua UEnum.EEnumGuestSomethingElse {Value1 1;Value2 2; }-- use it like a native …

NX二次开发UF_CURVE_ask_ocf_data 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_ocf_data Defined in: uf_curve.h int UF_CURVE_ask_ocf_data(tag_t feature, UF_CURVE_ocf_data_p_t * offset_data ) overview 概述 Returns the offset data for …

Halcon Solution Guide I basics(4.1): Blob Analysis 自主练习

文章目录 文章专栏前言自主练习题目输出电路板焊点个数解决方案:正确率&#xff1a;90 文章专栏 我的Halcon开发 CSDN 专栏 Halcon学习 练习项目gitee仓库 CSDN Major 博主Halcon文章推荐 随笔分类 - Halcon入门学习教程 前言 为了更加熟练的掌握Halcon的练习&#xff0c;我之…

基于SSM实现的叮当书城

一、系统架构 前端&#xff1a;jsp | jquery | layui 后端&#xff1a;spring | springmvc | mybatis 环境&#xff1a;jdk1.7以上 | mysql | maven 二、代码与数据库 三、功能介绍 01. 系统首页 02. 商品分类 03. 热销 04. 新品 05. 注册 06. 登录 07. 购物车 08. 后台-首页 …

Grafana采用Nginx反向代理

一、场景介绍 在常规操作中&#xff0c;一般情况下不会放开许多端口给外部访问&#xff0c;特别是直接 ip:port 的方式开放访问。但是 Grafana 的请求方式在默认情况下是没有任何规律可寻的。 为了满足业务需求&#xff08;后续通过 Nginx 统一一个接口暴露 N 个服务&#xf…

解决Vue编程式导航路由跳转不显示目标路径问题

我们配置一个编程式导航的路由跳转&#xff0c;跳转到 /search 页面&#xff0c;并且携带categoryName和categoryId两个query参数。 this.$router.push({path: "/search",query: {categoryName: dataset.categoryname,categoryId: dataset.categoryid} }) 如果我们…

霍夫丁不等式(Hoeffding‘s inequality)

参考资料&#xff1a;Hoeffdings inequality | encyclopedia article by TheFreeDictionary 霍夫丁不等式&#xff08;Hoeffdings inequality&#xff09;描述了随机变量的和、与和的期望之差的上限&#xff1b;或者表述为&#xff1a;随机变量的均值、与均值的期望之差的上限。…

2017年五一杯数学建模B题自媒体时代的消息传播问题解题全过程文档及程序

2017年五一杯数学建模 B题 自媒体时代的消息传播问题 原题再现 电视剧《人民的名义》中人物侯亮平说&#xff1a;“现在是自媒体时代&#xff0c;任何突发性事件几分钟就传播到全世界。”相对于传统媒体&#xff0c;以互联网技术为基础的自媒体以其信息传播的即时性、交往方式…

LeetCode198.打家劫舍

打家劫舍和背包问题一样是一道非常经典的动态规划问题&#xff0c;只要做过几道动态规划的题&#xff0c;这道题简直就非常容易做出来。我应该花了10来分钟左右就写出来了&#xff0c;动态规划问题最重要的就是建立状态转移方程&#xff0c;就是说如何从上一个状态转移到下一个…

【开源】基于Vue+SpringBoot的独居老人物资配送系统

项目编号&#xff1a; S 045 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S045&#xff0c;文末获取源码。} 项目编号&#xff1a;S045&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询社区4…

浏览器中实现可视化的方式有哪几种?带你盘点一下

前言 &#x1f4eb; 大家好&#xff0c;我是南木元元&#xff0c;热衷分享有趣实用的文章&#xff0c;希望大家多多支持&#xff0c;一起进步&#xff01; &#x1f345; 个人主页&#xff1a;南木元元 目录 可视化的含义 浏览器中实现可视化的4种方式 1. HTMLCSS 2. SVG …

springboot 自动配置

1. O(∩_∩)O 对应功能的starter --autoconfigure --寻找autoconfigure的META-INF/spring/org.springframework.boot.autoconfigure.Autoconfiguration.imports–加载所有自动配置类 自动配置类的作用&#xff1a;提供后续所需要的功能组件 在自动配置类中找到注解EnableConf…

A100 A800 H100 H800 模块

老美对A100、A800、H100和H800在内的多款AI芯片实施了出口限制&#xff0c; 目前&#xff0c;具体限制的时长并没有明确的公开信息。 科研人员在面对此类限制 &#xff0c;可能需要寻找替代的供应渠道&#xff0c;加强国内外合作&#xff0c; 或者加大在本土技术研发的投入&a…

如何查看电脑版Office的有效期

有时候点击Office账户看不到有效期信息&#xff0c;那么如何查看呢&#xff0c;其实用一条命令就可以查看。 首选WinR运行&#xff0c;输入cmd回车&#xff0c;然后输入下面的命令&#xff1a; cscript “C:\Program Files\Microsoft Office\Office16\ospp.vbs” /dstatus当然…

Java游戏制作——王者荣耀

一.准备工作 首先创建一个新的Java项目命名为“王者荣耀”&#xff0c;并在src下创建两个包分别命名为“com.sxt"、”com.stx.beast",在相应的包中创建所需的类。 创建一个名为“img”的文件夹来储存所需的图片素材。 二.代码呈现 package com.sxt;import javax.sw…

物联网AI 无线连接学习之蓝牙基础篇 协议概述

学物联网&#xff0c;来万物简单IoT物联网&#xff01;&#xff01; 1 蓝牙协议总体架构 1.1 Application层 应用属性层&#xff0c;通过API函数与协议栈交互&#xff1b; 1.2 Host层 Host层&#xff0c;逻辑链路控制及自适应协议层、安全管理层、属性协议层、通用访问配置…

Java8实战-总结49

Java8实战-总结49 CompletableFuture&#xff1a;组合式异步编程对多个异步任务进行流水线操作构造同步和异步操作将两个 CompletableFuture 对象整合起来&#xff0c;无论它们是否存在依赖 CompletableFuture&#xff1a;组合式异步编程 对多个异步任务进行流水线操作 构造同…

【开源】基于Vue+SpringBoot的企业项目合同信息系统

项目编号&#xff1a; S 046 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S046&#xff0c;文末获取源码。} 项目编号&#xff1a;S046&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 合同审批模块2.3 合…

深入理解 Docker 核心原理:Namespace、Cgroups 和 Rootfs

来自&#xff1a;探索云原生 https://www.lixueduan.com 原文&#xff1a;https://www.lixueduan.com/posts/docker/03-container-core/ 通过这篇文章你可以了解到 Docker 容器的核心实现原理&#xff0c;包括 Namespace、Cgroups、Rootfs 等三个核心功能。 后续文章会演示如…