SpringBoot多线程,保证各个子线程和主线程事物一致性
- 1、第一种写法
- 1.1、TransactionalUntil工具类
- 1.2、service业务类
- 2、第二种写法
- 2.1、service业务类
1、第一种写法
1.1、TransactionalUntil工具类
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;import javax.annotation.Resource;@Component
public class TransactionalUntil {@Resourceprivate DataSourceTransactionManager dataSourceTransactionManager;/*** 开启事务*/public TransactionStatus begin() {TransactionStatus transaction = dataSourceTransactionManager.getTransaction(new DefaultTransactionAttribute());return transaction;}/*** 提交事务*/public void commit(TransactionStatus transactionStatus) {dataSourceTransactionManager.commit(transactionStatus);}/*** 回滚事务*/public void rollback(TransactionStatus transactionStatus) {dataSourceTransactionManager.rollback(transactionStatus);}}
1.2、service业务类
其实主要就是用CountDownLatch类的特性,和手动提交事物,来保证主线程和子线程事物的一致性。
@Resourceprivate TransactionalUntil transactionalUntil;private volatile Boolean is_ok = new Boolean(true);@Transactional@Overridepublic void trs1() throws Exception{// 监控子线程数据CountDownLatch childMonitor = new CountDownLatch(2);// 主线程收集子线程运行最终结果List<Boolean> childResponse = new CopyOnWriteArrayList<>();//子线程在该对象上等待主线程的通知CountDownLatch mainMonitor = new CountDownLatch(1);//业务代码查询,可以换成自己的业务场景ResVmCustom resVmCustom1 = resVmCustomMapper.selectById(1);resVmCustom1.setCpu(3);resVmCustomMapper.updateById(resVmCustom1);CompletableFuture.runAsync(() -> {TransactionStatus transactionStatus = transactionalUntil.begin();try {ResVmCustom resVmCustom2 = resVmCustomMapper.selectById(2);resVmCustom2.setCpu(3);//Db.saveOrUpdate(resVmCustom2);resVmCustomMapper.updateById(resVmCustom2);childResponse.add(Boolean.TRUE);childMonitor.countDown();mainMonitor.await();if (is_ok) {// 事务提交log.info("线程 {} 正常执行,线程事务提交", Thread.currentThread().getName());transactionalUntil.commit(transactionStatus);} else {// 事务回滚log.info("线程 {} 执行出现异常,线程事务回滚", Thread.currentThread().getName());transactionalUntil.rollback(transactionStatus);}} catch (Exception e) {// 提交失败log.info("线程 {} 执行出现异常", Thread.currentThread().getName());childMonitor.countDown();childResponse.add(Boolean.FALSE);transactionalUntil.rollback(transactionStatus);}});CompletableFuture.runAsync(() -> {TransactionStatus transactionStatus = transactionalUntil.begin();try {ResVmCustom resVmCustom3 = resVmCustomMapper.selectById(3);resVmCustom3.setCpu(3);resVmCustomMapper.updateById(resVmCustom3);childResponse.add(Boolean.TRUE);childMonitor.countDown();mainMonitor.await();if (is_ok) {// 事务提交log.info("线程 {} 正常执行,线程事务提交", Thread.currentThread().getName());transactionalUntil.commit(transactionStatus);} else {// 事务回滚log.info("线程 {} 执行出现异常,线程事务回滚", Thread.currentThread().getName());transactionalUntil.rollback(transactionStatus);}} catch (Exception e) {// 提交失败log.info("线程 {} 执行出现异常", Thread.currentThread().getName());childMonitor.countDown();childResponse.add(Boolean.FALSE);transactionalUntil.rollback(transactionStatus);}});childMonitor.await();for (Boolean res : childResponse) {while (!res) {// 如果有一个子线程只想失败,改变mainResult状态 事务回滚log.info("有线程执行失败,修改标识位,事务回滚");is_ok = false;break;}}//主线程获取结果,子线程根据主线程的结果 提交或回滚mainMonitor.countDown();/*** 下面这句话,保证了主线程和子线程一致* 如果不想主线程和子线程事物保持一致,注释下面这句话就行*/if (!is_ok) {throw new RuntimeException();}}
2、第二种写法
2.1、service业务类
TransactionalUntil 工具类还用和上面的一样就行…
/*** SpringBoot默认的自己的线程池*/@Resourceprivate ThreadPoolTaskExecutor executor;@Transactional@Overridepublic void trs2() throws Exception {// 假设三个线程要执行// 监控子线程数据CountDownLatch childMonitor = new CountDownLatch(3);// 主线程收集子线程运行最终结果List<Boolean> childResponse = new CopyOnWriteArrayList<>();//子线程在该对象上等待主线程的通知CountDownLatch mainMonitor = new CountDownLatch(1);for (int i = 1; i <= 3; i++) {int finalI = i;executor.execute(() -> {TransactionStatus transactionStatus = transactionalUntil.begin();try {//数据库增删改操作//此时只是用线程的个数i,当成业务的主键id,你自己重新声明其他变量来替代主键业务ID也行ResVmCustom resVmCustom = resVmCustomMapper.selectById(finalI);resVmCustom.setCpu(3);resVmCustomMapper.updateById(resVmCustom);childResponse.add(Boolean.TRUE);childMonitor.countDown();mainMonitor.await();if (is_ok) {// 事务提交log.info("线程 {} 正常执行,线程事务提交", Thread.currentThread().getName());transactionalUntil.commit(transactionStatus);} else {// 事务回滚log.info("线程 {} 执行出现异常,线程事务回滚", Thread.currentThread().getName());transactionalUntil.rollback(transactionStatus);}} catch (Exception e) {// 提交失败log.info("线程 {} 执行出现异常", Thread.currentThread().getName());childMonitor.countDown();childResponse.add(Boolean.FALSE);transactionalUntil.rollback(transactionStatus);}});}childMonitor.await();for (Boolean res : childResponse) {while (!res) {// 如果有一个子线程只想失败,改变mainResult状态 事务回滚log.info("有线程执行失败,修改标识位,事务回滚");is_ok = false;break;}}is_ok = false;//主线程获取结果,子线程根据主线程的结果 提交或回滚mainMonitor.countDown();/*** 下面这句话,保证了主线程和子线程一致* 如果不想主线程和子线程事物保持一致,注释下面这句话就行*/if (!is_ok) {throw new RuntimeException();}}