一、背景
数据回滚
二、难点
2.1 需要处理的数据涉及多达数万个用户,每个用户涉及的表达到10个
2.2 时间紧急,需要快速回滚,数据需要完整
2.3 数据存在重复或空缺问题
三、解决方案
3.1 数据多,使用分批处理,把大任务分割成若干个小任务
3.2 时间紧,使用多线程CompletableFuture处理,提高处理效率
3.3 mysql数据有些是重复,需要去重,使用not exist处理,保障数据完整
四、案例代码
@Slf4j
public class DataRollBackProcessTest {// 自定义线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 600,TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));@Testpublic void startTest() throws ExecutionException, InterruptedException {List<Integer> list = new ArrayList<>();for (int i = 1; i <= 100; i++) {list.add(i);}concurrentProcess(list);}/*** * 并行处理,全部异步任务执行完才一起返回** @param list* @throws ExecutionException* @throws InterruptedException*/public void concurrentProcess(List<Integer> list) throws ExecutionException, InterruptedException {// 定义一个集合切割为小任务时每个任务的大小,int taskSize = 5;List<List<Integer>> divideList = divide(list, taskSize);// 创建一个CompletableFuture数组,用于存储异步操作的结果CompletableFuture<Void>[] futures = new CompletableFuture[divideList.size()];// 循环10次,每次执行一次异步操作for (int i = 0; i < divideList.size(); i++) {int index = i;CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 异步操作,可以在这里执行你的任务try {simulateLongDurationTasks(divideList.get(index));} catch (InterruptedException e) {e.printStackTrace();}System.out.println("异步操作 " + index + " 执行完成");}, threadPoolExecutor);// 将CompletableFuture对象存储到数组中futures[i] = future;}// 使用CompletableFuture.allOf等待所有异步操作完成CompletableFuture<Void> allOf = CompletableFuture.allOf(futures);// 阻塞,等待所有异步操作完成allOf.get();System.out.println("所有异步操作执行完成");}/*** 集合切分** @param origin* @param size* @param <T>* @return*/public <T> List<List<T>> divide(List<T> origin, int size) {if (origin == null || origin.size() == 0) {return Collections.emptyList();}int block = (origin.size() + size - 1) / size;return IntStream.range(0, block).boxed().map(i -> {int start = i * size;int end = Math.min(start + size, origin.size());return origin.subList(start, end);}).collect(Collectors.toList());}/*** 模拟耗时的任务* <p>* 需求背景:* 需要把一组用户的数据复制到另一组用户,生成sql脚本如下,为了简略,* 使用Thread.sleep替换耗时任务* <p>* -- 把B用户的数据插入到A用户,且A用户不存在相同的数据* sql使用点1: INSERT INTO student from* sql使用点2: NOT EXISTS** INSERT INTO student (uid, STATUS, age, sex) SELECT* 61442, -- A用户* STATUS,* age,* sex* FROM* student t1* WHERE* t1.uid = 682801 -- B用户* AND t1. STATUS = 1* AND NOT EXISTS (* SELECT* t2.id* FROM* student t2* WHERE* t2.uid = 61442* AND t2.age = t1.age* AND t2.sex = t1.sex* );*/public void simulateLongDurationTasks(List<Integer> subList) throws InterruptedException {if (subList == null || subList.size() == 0) {return;}int sleepSeconds = subList.stream().mapToInt(e -> e).reduce(0, Integer::sum);log.info("thread id:{}, thread name:{}, thread states:{}, Thread.activeCount:{}, thread sleep:{}",Thread.currentThread().getId(),Thread.currentThread().getName(),Thread.currentThread().getState(),Thread.activeCount(),sleepSeconds);Thread.sleep(sleepSeconds);}
}
五、总结
使用分批处理,结合多线程,提高处理效率
多线程处理需要考虑系统资源竞争问题、顺序问题