😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)
⏱️ @ 创作时间: 2023年11月26日
目录
- 前言
- 1、概述
- 2、方法说明:
- 3、代码实例
前言
通过CyclicBarrier
与CountDownLatch
配合开启多个子线程,由子线程完成数据的处理,子线程完成数据处理后进行等待,直到所有的子线程完成数据处理后,再判断是否进行回滚,如果需要回滚则所有线程执行回滚操作
如果需要由子线程处理完数据,但是由主线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630750
1、概述
CyclicBarrier是一个同步器工具类,用来协调多个线程之间的同步,通过await()
进行阻塞,直到所有的线程都执行await()
后,所有的线程再继续执行。
2、方法说明:
- public viod await() /int await(long timeout,TimeUnit unit) :使当前线程一直等待,除非线程被中断或超出了指定的等待时间。
当线程会被阻塞,直到下面的情况之一发生才会返回:- 如果每执行一次
await()
计数加一,直到达到初始值。 - 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
- 如果超出了指定的等待时间,则该方法根本不会再进行阻塞。
- 如果每执行一次
3、代码实例
有用到hutool的工具包,pom如下:
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.0.7</version></dependency>
Controller:
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Resourceprivate CyclicService cyclicService;/*** CyclicBarrier实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据** @return*/@ApiOperationSupport(order = 5)@GetMapping("/cyclic/handleDataSonBack")public String handleDataSonBack() {cyclicService.handleDataSonBack();return "success";}
Sevice:
@Service
@Slf4j
public class CountDownService {@Resourceprivate TestMapper testMapper;@Resourceprivate ApplicationContext applicationContext;/*** CyclicBarrier实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据** @return*/@Transactional(rollbackFor = Exception.class)public void handleDataSonBack() {List<TestEntity> testList = getData();AtomicBoolean errorTag = new AtomicBoolean(false);long start = System.currentTimeMillis();// 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定// 比如:一万条数据,每条单独处理需要50ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定// 需要使用hutool工具类进行分组// 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量List<List<TestEntity>> splitList = CollUtil.split(testList, 200);// 设置CyclicBarrier大小,需要比实际子线程+1,业务主线程需要进行阻塞CyclicBarrier cyclicBarrier = new CyclicBarrier(splitList.size() + 1);// 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚CountDownLatch errorCountDown = new CountDownLatch(1);// 异步调用其他Service,执行业务处理CyclicService bean = applicationContext.getBean(CyclicService.class);// 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用ExecutorService executorService = Executors.newCachedThreadPool();splitList.forEach(list -> {// 线程处理executorService.execute(() -> {bean.handleDataAsyncSonBack(list, cyclicBarrier, errorCountDown, errorTag);});});executorService.shutdown();try {// 主线程阻塞,直到子线程执行完成cyclicBarrier.await();// 可以设置最大阻塞时间,防止线程一直挂起,当子线程时间大于当前时间后会抛出TimeOut异常// cyclicBarrier.await(5, TimeUnit.SECONDS);// 模拟执行主线程业务逻辑耗时,比如insert、update等ThreadUtil.sleep(20);log.info("继续执行主线程");// 继续执行后续的操作,比如insert、update等TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(1);entity.setCommodityCode("handleTestMain");entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-handleTestMain");testMapper.insert(entity);} catch (Exception e) {log.error("主线程业务执行异常");errorTag.set(true);} finally {// 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中errorCountDown.countDown();}long end = System.currentTimeMillis();log.info("数据处理完成,耗时:{}", (end - start) / 1000);// 如果出现异常if (errorTag.get()) {throw new RuntimeException("异步业务执行出现异常");}log.info("主线程执行完成");}/*** 子线程异步处理,并且实现回滚* 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量*/@Transactional(rollbackFor = Exception.class)public void handleDataAsyncSonBack(List<TestEntity> list, CyclicBarrier cyclicBarrier, CountDownLatch errorCountDown, AtomicBoolean errorTag) {try {log.info("开始执行子线程");for (TestEntity entity : list) {if (errorTag.get()) {break;}// 对实体类的业务处理,此处模拟业务处理,耗时50msThreadUtil.sleep(50);// 数据库查询操作testMapper.insert(entity);// 模拟数据处理中,出现了异常if (entity.getCount().equals(2000)) {throw new RuntimeException("子线程执行异常");}}} catch (Exception e) {log.error("子线程异常:{}", e.getMessage(), e);errorTag.set(true);} finally {// 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作try {cyclicBarrier.await();} catch (Exception e) {errorTag.set(true);}}log.info("handleDataAsyncSonBack-业务处理完成从,等待其他子线程");// 子阻塞,直到其他子线程完成操作try {errorCountDown.await();} catch (Exception e) {errorTag.set(true);}log.info("handleDataAsyncSonBack-子线程执行完成");if (errorTag.get()) {// 抛出异常,回滚数据throw new RuntimeException("handleDataAsyncSonBack-子线程业务执行异常");}}/*** 模拟解析的excel等文件的数据*/private List<TestEntity> getData() {List<TestEntity> list = new ArrayList<>();// 此处模拟一万条数据for (int i = 1; i <= 10000; i++) {TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(i);entity.setCommodityCode("code-" + i);entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-" + i);list.add(entity);}return list;}
}