1、背景
最近接受到接口优化的任务,查看代码逻辑后发现在批量处理数据耗时长,想到使用多线程处理批量数据,又要保持原来的事务一致性。
2、实现方法
(1)、创建多线程事务管理
@Component
@Slf4j
public class MultiThreadingTransactionManager {/*** 数据源事务管理器*/@Autowiredprivate DataSourceTransactionManager dataSourceTransactionManager;@Autowiredprivate ThreadPoolTaskExecutor executorService;private long timeout = 120;/*** 用于判断子线程业务是否处理完成* 处理完成时threadCountDownLatch的值为0*/private CountDownLatch threadCountDownLatch;/*** 用于等待子线程全部完成后,子线程统一进行提交和回滚* 进行提交和回滚时mainCountDownLatch的值为0*/private final CountDownLatch mainCountDownLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true,当子线程有异常发生时,设置为false,回滚事务*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);public boolean execute(List<Runnable> runnableList,String factorySchema) {isSubmit.set(true);setThreadCountDownLatch(runnableList.size());runnableList.forEach(runnable -> executorService.execute(() -> executeThread(factorySchema,runnable, threadCountDownLatch, mainCountDownLatch, isSubmit)));// 等待子线程全部执行完毕try {// 若计数器变为零了,则返回 trueboolean isFinish = threadCountDownLatch.await(timeout, TimeUnit.SECONDS);if (!isFinish) {// 如果还有为执行完成的就回滚isSubmit.set(false);log.info("存在子线程在预期时间内未执行完毕,任务将全部回滚");}} catch (Exception exception) {log.info("主线程发生异常,异常为: " + exception.getMessage());} finally {// 计数器减1,代表该主线程执行完毕mainCountDownLatch.countDown();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}private void executeThread(String factorySchema,Runnable runnable, CountDownLatch threadCountDownLatch, CountDownLatch mainCountDownLatch, AtomicBoolean isSubmit) {log.info("子线程: [" + Thread.currentThread().getName() + "]");// 判断别的子线程是否已经出现错误,错误别的线程已经出现错误,那么所有的都要回滚,这个子线程就没有必要执行了if (!isSubmit.get()) {log.info("整个事务中有子线程执行失败需要回滚, 子线程: [" + Thread.currentThread().getName() + "] 终止执行");// 计数器减1,代表该子线程执行完毕threadCountDownLatch.countDown();return;}//动态数据源切换SchemaContextHolder.setSchema(factorySchema);// 开启事务DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);try {// 执行业务逻辑runnable.run();} catch (Exception exception) {// 发生异常需要进行回滚,设置isSubmit为falseisSubmit.set(false);log.info("子线程: [" + Thread.currentThread().getName() + "]执行业务发生异常,异常为: " + exception.getMessage());} finally {// 计数器减1,代表该子线程执行完毕threadCountDownLatch.countDown();}try {// 等待主线程执行mainCountDownLatch.await();} catch (Exception exception) {log.info("子线程: [" + Thread.currentThread().getName() + "]等待提交或回滚异常,异常为: " + exception.getMessage());}try {// 提交if (isSubmit.get()) {dataSourceTransactionManager.commit(transactionStatus);log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交");} else {dataSourceTransactionManager.rollback(transactionStatus);log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务回滚");}} catch (Exception exception) {log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交或回滚出现异常,异常为:" + exception.getMessage());}}private void setThreadCountDownLatch(int num) {this.threadCountDownLatch = new CountDownLatch(num);}
}
(2)、测试类
@RestController
@RequestMapping("test")
public class TestController {@AutowiredTestService testService;@AutowiredMultiThreadingTransactionManager multiThreadingTransactionManager;@RequestMapping("test")public String test(){List<TestBean> list = new ArrayList<>();list.add(new TestBean("2",1));list.add(new TestBean("3",2));List<Runnable> runnableList = new ArrayList<>();list.forEach(testBean -> runnableList.add(() -> {testService.insert(testBean);}));boolean isSuccess = multiThreadingTransactionManager.execute(runnableList,"db9771");System.out.println(isSuccess);return "ok";};
}
3、总结
大体思路,就是所有子线程在各自线程内开启事务,执行业务逻辑后,判断是否抛错,一旦抛错,会把全局AtomicBoolean置为false,因为其具有原子性所以不会有线程不安全问题。所有子线程完业务代码会等待主线程,全部子线程执行业务结束后,主线程等待结束,判断AtomicBoolean是什么状态,一旦false,所有子线程回滚,否则提交。