1.业务场景
因为需要从一个返利明细表中获取大量的数据,生成返利报告,耗时相对较久,作为后台任务执行。但是后台任务如果不用多线程处理,也会要很长时间才能处理完。
另外考虑到数据量大,不能一次查询所有数据在内存中处理,为了防止内存溢出,分页查询数据,然后分批次多线程处理。
2.关键代码
//线程池配置
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,10,10L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());public String generateReport(String periodType, String monthWid, String quarterWid) {int totalNum = 0;//计时器StopWatch stopWatch = new StopWatch();stopWatch.start();try {//这里省略了一些其他的逻辑,只关注分页查询然后多线程任务处理的逻辑......//查询总数量totalNum = getReportTotalNum(periodType, monthWid, quarterWid, totalNum);int pageIndex = 0;int pageSize = 500;int pageNum = 1;StoreRebateDetailForReportQueryReq req = null;while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {、//分页查询,每页500条数据pageIndex = pageSize * (pageNum - 1);List<StoreRebateDetail> list = storeRebateDetailService.selectListForRebateReport(pageIndex, pageSize);int batchNum = list.size();//每个线程处理100条 int perThreadCount = 100;LOGGER.info("开始处理第{}页(共{}条)数据", pageNum, batchNum);final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器for (int j = 0; j < batchNum; j++) {//每100条一个线程处理if (j % perThreadCount == 0) {int start = j;int end = (batchNum - j) >= perThreadCount ? (j + perThreadCount) : batchNum;int pageNums = pageNum;poolExecutor.submit(()->{LOGGER.info("第{}页的第{}-{}条数据处理开始", pageNums, start+1, end);//处理比较复杂的业务逻辑(耗时较久)processInsert(list, start, end);LOGGER.info("第{}页的第{}-{}条数据处理结束", pageNums, start+1, end);cdl.countDown();});}}cdl.await();pageNum++;}stopWatch.stop();double totalTimeSeconds = stopWatch.getTotalTimeSeconds();result.put("syncStatus", "success");result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");return SToolUtils.convertResultJSONObj(CommonAbstractService.SUCCESS_STATUS, "处理成功", totalNum, new JSONArray().fluentAdd(result)).toString();} catch (Exception e) {stopWatch.stop();double totalTimeSeconds = stopWatch.getTotalTimeSeconds();LOGGER.error("调度处理异常:{}--{}", e.getMessage(), e);result.put("syncStatus", "fail");result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");return SToolUtils.convertResultJSONObj(CommonAbstractService.ERROR_STATUS, "处理异常", 0, new JSONArray().fluentAdd(result)).toString();} finally {//做业务需要处理的,可以没有}}
3.测试效果
原来跑一个月的数据需要40多分钟,后面通过这样处理后,采用5个线程跑,时间缩短至8分钟左右,相当于差不多时间缩短到原来的1/5。