😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CyclicBarrier实战应用——实现异步多线程业务处理,异常情况回滚全部子线程
⏱️ @ 创作时间: 2023年12月23日
目录
- 1、概述
- 2、方法说明:
- 3、代码实例
1、概述
CyclicBarrier是一个同步器工具类,用来协调多个线程之间的同步,通过await()
进行阻塞,直到所有的线程都执行await()
后,所有的线程再继续执行。
2、方法说明:
- public viod await() /int await(long timeout,TimeUnit unit) :使当前线程一直等待,除非线程被中断或超出了指定的等待时间。
当线程会被阻塞,直到下面的情况之一发生才会返回:- 如果每执行一次
await()
计数加一,直到达到初始值。 - 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
- 如果超出了指定的等待时间,则该方法根本不会再进行阻塞。
- 如果每执行一次
3、代码实例
Controller:
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Resourceprivate CyclicService cyclicService;/*** CyclicBarrier实现异步多线程业务处理,不同service,异常情况回滚全部子线程** @return*/@ApiOperation(value = "测试CountDownLatch", notes = "测试")@ApiOperationSupport(order = 5)@GetMapping("/cyclic/handleDataBack")public String cyclicHandleDataBack() {cyclicService.handleDataBack();return "success";}
Sevice:
@Service
@Slf4j
public class CountDownService {@Resourceprivate TestMapper testMapper;@Resourceprivate ApplicationContext applicationContext;/*** 主线程同时调用多个子线程执行不同业务处理,当其中一个子线程出现异常,则全部子线程进行回滚*/@Transactional(rollbackFor = Exception.class)public void handleDataBack() {AtomicBoolean errorTag = new AtomicBoolean(false);// 设置CyclicBarrier大小,需要比实际子线程+1,业务主线程需要进行阻塞CyclicBarrier cyclicBarrier = new CyclicBarrier(2 + 1);// 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚CountDownLatch errorCountDown = new CountDownLatch(1);// 异步调用其他Service,执行业务处理CyclicService bean = applicationContext.getBean(CyclicService.class);bean.handleTestOne(cyclicBarrier, errorCountDown, errorTag);bean.handleTestTwo(cyclicBarrier, errorCountDown, errorTag);try {// 主线程阻塞cyclicBarrier.await();// 可以设置最大阻塞时间,防止线程一直挂起,当子线程时间大于当前时间后会抛出TimeOut异常// cyclicBarrier.await(1, TimeUnit.SECONDS);// 继续执行后续的操作,比如insert、update等TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(1);entity.setCommodityCode("handleTestMain");entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-handleTestMain");testMapper.insert(entity);} catch (Exception e) {log.error("主线程业务执行异常");errorTag.set(true);} finally {// 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中errorCountDown.countDown();}// 如果出现异常if (errorTag.get()) {throw new RuntimeException("异步业务执行出现异常");}log.info("主线程执行完成");}/*** 子线程具体业务处理*/@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)@Asyncpublic void handleTestOne(CyclicBarrier cyclicBarrier, CountDownLatch errorCountDown, AtomicBoolean errorTag) {log.info("开始执行handleTestOne线程");// 模拟业务耗时ThreadUtil.sleep(2000);try {// 执行数据库操作TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(1);entity.setCommodityCode("handleTestOne");entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-handleTestOne");testMapper.insert(entity);// 模拟出现异常int a = 1 / 0;} catch (Exception e) {errorTag.set(true);}// 子线程中,业务处理完成后,利用cyclicBarrier的特性,计数器加一操作try {cyclicBarrier.await();} catch (Exception e) {errorTag.set(true);}// 子阻塞,直到其他子线程完成操作try {errorCountDown.await();} catch (Exception e) {errorTag.set(true);}log.info("handleTestOne-子线程执行完成");if (errorTag.get()) {// 抛出异常,回滚数据throw new RuntimeException("handleTestOne-子线程业务执行异常");}}/*** 子线程具体业务处理*/@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)@Asyncpublic void handleTestTwo(CyclicBarrier cyclicBarrier, CountDownLatch errorCountDown, AtomicBoolean errorTag) {log.info("开始执行handleTestTwo线程");// 模拟业务耗时ThreadUtil.sleep(500);try {// 执行数据库操作TestEntity entity = new TestEntity();entity.setId(new Random().nextInt(999999999));entity.setCount(1);entity.setCommodityCode("handleTestTwo");entity.setMoney(new Random().nextInt(1000000));entity.setUserId("user-handleTestTwo");testMapper.insert(entity);} catch (Exception e) {errorTag.set(true);}// 子线程中,业务处理完成后,利用cyclicBarrier的特性,计数器加一操作try {cyclicBarrier.await();} catch (Exception e) {errorTag.set(true);}// 子阻塞,直到其他子线程完成操作try {errorCountDown.await();} catch (Exception e) {errorTag.set(true);}log.info("handleTestTwo-子线程执行完成");if (errorTag.get()) {// 抛出异常,回滚数据throw new RuntimeException("handleTestTwo-子线程业务执行异常");}}
}