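Scenario
One of our company's scheduled jobs was taking far too long to run, roughly 3 to 4 hours, because of the volume of data it processes, so the code needed optimizing. Once the data volume grows, even ordinary update operations become complicated.
Original code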
OpsPriceServiceImpl
@Override
public ExpireProductPriceRefeshResponse refeshExpireProductPrice(ExpireProductPriceRefeshRequest request) {
    if (log.isDebugEnabled()) {
        log.debug("begin refeshExpireProductPrice request: {}", JsonUtil.toJson(request));
    }
    ExpireProductPriceRefeshResponse response = new ExpireProductPriceRefeshResponse();
    try {
        request.validate(Constant.SUB_SYSTEM, ErpExceptionType.VCE13008);
        Long tenantNumId = request.getTenantNumId();
        Long dataSign = request.getDataSign();
        Date today = DateUtils.parse(DateUtils.format(new Date()));
        List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> shopLogs =
                mdmsPProductShopPriceLogDao.queryProductShopLogByEndDay(tenantNumId, dataSign, today);
        if (CollectionUtils.isNotEmpty(shopLogs)) {
            CompletableFuture.runAsync(() -> {
                for (MDMS_P_PRODUCT_SHOP_PRICE_LOG log : shopLogs) {
                    mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(),
                            log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(),
                            ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null : log.getOLD_PRICE(),
                            1L, log.getPRICE_TYPE());
                    mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId, dataSign, 1L,
                            log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(),
                            log.getSERIES());
                }
            }, opsThreadPoolExecutor).exceptionally(ex -> {
                log.error("refeshExpireProductPrice failed, cause: " + ex.getMessage());
                return null;
            });
        }
    } catch (Exception ex) {
        ExceptionUtil.processException(ex, response);
    }
    if (log.isDebugEnabled()) {
        log.debug("end refeshExpireProductPrice response:{}", JsonUtil.toJson(response));
    }
    return response;
}
mdmsPProductShopDao
public void updateShopPrice(Long tenantNumId, Long dataSign, String cortNumId, String subUnitNumId,
                            String itemNumId, Double price, Long usrNumId, int priceType) {
    StringBuilder sb = new StringBuilder();
    sb.append("update mdms_p_product_shop set last_updtme = now() ");
    // Each price type maps to a different column of mdms_p_product_shop.
    if (priceType == 501) {
        sb.append(",retail_price =?");
    } else if (priceType == 502) {
        sb.append(",member_price =?");
    } else if (priceType == 503) {
        sb.append(",member_day_price =?");
    } else if (priceType == 504) {
        sb.append(",tiny_price =?");
    } else if (priceType == 505) {
        sb.append(",health_care_price =?");
    } else if (priceType == 506) {
        sb.append(",floor_price =?");
    }
    sb.append(",last_update_user_id=? where tenant_num_id = ? and data_sign = ? and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ?");
    int row = jdbcTemplate.update(sb.toString(),
            new Object[]{price, usrNumId, tenantNumId, dataSign, cortNumId, subUnitNumId, itemNumId});
    if (row <= 0) {
        throw new DatabaseOperateException(Constant.SUB_SYSTEM, ErpExceptionType.DOE33008,
                "Failed to update the retail price! Shop number: " + subUnitNumId + ", item number: " + itemNumId);
    }
}
mdmsPProductShopPriceLogDao
public void updateShopLogNotValid(Long tenantNumId, Long dataSign, Long usrNumId, String cortNumId,
                                  String subUnitNumId, String itemNumId, String series) {
    String sql = "update mdms_p_product_shop_price_log set is_valid =0,last_updtme = now(),last_update_user_id=? "
            + " where tenant_num_id = ? and data_sign = ? and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ? "
            + " and series=? and cancelsign='N'";
    int row = jdbcTemplate.update(sql,
            new Object[]{usrNumId, tenantNumId, dataSign, cortNumId, subUnitNumId, itemNumId, series});
}
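Problem
As the code above shows, the work is handed off with CompletableFuture.runAsync(), but inside the task an enhanced for loop still updates the records one at a time, issuing two single-row UPDATE statements per record. The method it calls also walks an if-else chain on every call. The result is a slow job.
Approach
First idea
Drop the if-else chain and build the SQL without branching on priceType. After reading the code more carefully, I found that each branch updates a different column, so I abandoned this idea.
Second idea
Replace the if-else chain with a switch, and change the enhanced for loop in OpsPriceServiceImpl to a stream. The rule of thumb I was working from: below roughly 100,000 records the enhanced for loop is faster, and above that the stream form is faster.
Original OpsPriceServiceImpl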
for (MDMS_P_PRODUCT_SHOP_PRICE_LOG log : shopLogs) {
    mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(),
            log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(),
            ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null : log.getOLD_PRICE(),
            1L, log.getPRICE_TYPE());
    mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId, dataSign, 1L,
            log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), log.getSERIES());
}
Changed to:
shopLogs.stream().forEach(log -> {
    mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(),
            log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(),
            ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null : log.getOLD_PRICE(),
            1L, log.getPRICE_TYPE());
    mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId, dataSign, 1L,
            log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), log.getSERIES());
});
I then found that a stream can process elements in parallel through parallelStream, so I changed the code to:
shopLogs.parallelStream().forEach(log -> {
    mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(),
            log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(),
            ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null : log.getOLD_PRICE(),
            1L, log.getPRICE_TYPE());
    mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId, dataSign, 1L,
            log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), log.getSERIES());
});
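A few properties of parallelStream are worth keeping in mind for a loop like this one: by default the work runs on the JVM's common ForkJoinPool, forEach makes no ordering promise, and any state shared between iterations must be thread-safe. The sketch below is a JDK-only illustration of those points. The class ParallelLoopSketch and the method updateOne are hypothetical stand-ins for the two DAO calls and are not part of the original code.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ParallelLoopSketch {

    /** Hypothetical stand-in for the two DAO calls made per record. */
    static void updateOne(int recordId, Set<Integer> done, Set<String> threads) {
        done.add(recordId);                              // mark the record as handled
        threads.add(Thread.currentThread().getName());   // remember which thread did the work
    }

    /** Handles every id through parallelStream and returns the set of ids that were handled. */
    static Set<Integer> processInParallel(List<Integer> ids, Set<String> threads) {
        // Shared state touched from several threads must be thread-safe.
        Set<Integer> done = ConcurrentHashMap.newKeySet();
        // forEach on a parallel stream gives no ordering guarantee.
        ids.parallelStream().forEach(id -> updateOne(id, done, threads));
        return done;
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.rangeClosed(1, 10_000).boxed().collect(Collectors.toList());
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Set<Integer> done = processInParallel(ids, threads);
        System.out.println("records handled: " + done.size());
        System.out.println("distinct worker threads: " + threads.size());
    }
}
```

When every iteration blocks on a database call, the speed-up is limited by the number of worker threads and by how many connections the pool can hand out at once, so the gain is likely to be smaller than the record count suggests.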
Original updateShopPrice method
if (priceType == 501) {
    sb.append(",retail_price =?");
} else if (priceType == 502) {
    sb.append(",member_price =?");
} else if (priceType == 503) {
    sb.append(",member_day_price =?");
} else if (priceType == 504) {
    sb.append(",tiny_price =?");
} else if (priceType == 505) {
    sb.append(",health_care_price =?");
} else if (priceType == 506) {
    sb.append(",floor_price =?");
}
Changed to:
switch (priceType) {
    case 501:
        sb.append(",retail_price =?");
        break;
    case 502:
        sb.append(",member_price =?");
        break;
    case 503:
        sb.append(",member_day_price =?");
        break;
    case 504:
        sb.append(",tiny_price =?");
        break;
    case 505:
        sb.append(",health_care_price =?");
        break;
    case 506:
        sb.append(",floor_price =?");
        break;
}
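Since every branch only selects a column name, the same mapping can also be written as a lookup table. The following is a minimal sketch of that alternative, not the approach the original code takes. The class PriceColumnSketch and the method buildUpdateSql are hypothetical names; the price type codes, column names, and SQL shape are copied from updateShopPrice.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class PriceColumnSketch {

    /** Price type code -> column of mdms_p_product_shop, taken from updateShopPrice. */
    private static final Map<Integer, String> COLUMN_BY_PRICE_TYPE;

    static {
        Map<Integer, String> m = new LinkedHashMap<>();
        m.put(501, "retail_price");
        m.put(502, "member_price");
        m.put(503, "member_day_price");
        m.put(504, "tiny_price");
        m.put(505, "health_care_price");
        m.put(506, "floor_price");
        COLUMN_BY_PRICE_TYPE = Collections.unmodifiableMap(m);
    }

    /** Builds the UPDATE statement for one price type; rejects unknown codes. */
    static String buildUpdateSql(int priceType) {
        String column = COLUMN_BY_PRICE_TYPE.get(priceType);
        if (column == null) {
            // Assumption of this sketch: fail fast instead of building SQL without a price column.
            throw new IllegalArgumentException("unsupported priceType: " + priceType);
        }
        // Only whitelisted column names are concatenated; all values stay as ? placeholders.
        return "update mdms_p_product_shop set last_updtme = now(), " + column + " = ?,"
                + " last_update_user_id = ?"
                + " where tenant_num_id = ? and data_sign = ? and cort_num_id = ?"
                + " and sub_unit_num_id = ? and item_num_id = ?";
    }

    public static void main(String[] args) {
        System.out.println(buildUpdateSql(501));
    }
}
```

One design point: in both the if-else and the switch version, an unknown priceType appends no price placeholder while seven arguments are still passed to the statement. Failing fast, as the sketch does, makes that case explicit.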
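Third idea
This improved efficiency somewhat, but I was still not satisfied. The original code updates one row at a time through jdbcTemplate.update(), so I decided to switch to jdbcTemplate.batchUpdate() and process the rows in batches. Because each record is updated through two methods, one per table, a failure partway through could leave the data inconsistent, so I wrapped this part in a transaction that rolls back when an exception occurs (masterDataTransactionManager.getTransaction(TransactionUtil.newTransactionDefinition(300))).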
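For readers who want to see what a batched update inside a transaction amounts to underneath Spring, here is a minimal sketch in plain JDBC. It swaps JdbcTemplate and the transaction manager for java.sql calls, so it is an illustration of the idea rather than the code used in this project. The class JdbcBatchSketch and the method batchUpdateInTransaction are hypothetical names.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class JdbcBatchSketch {

    /**
     * Queues one parameterized statement per row, sends them as a single JDBC batch,
     * and commits only if the whole batch succeeds.
     */
    static int[] batchUpdateInTransaction(Connection conn, String sql, List<Object[]> rows)
            throws SQLException {
        conn.setAutoCommit(false);               // begin a manual transaction
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Object[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    // JDBC parameters are 1-based. Some drivers need setNull with an
                    // explicit SQL type for null values; setObject keeps the sketch short.
                    ps.setObject(i + 1, row[i]);
                }
                ps.addBatch();                   // queue the row instead of executing it
            }
            int[] counts = ps.executeBatch();    // send all queued rows together
            conn.commit();
            return counts;
        } catch (SQLException ex) {
            conn.rollback();                     // undo every statement of the failed batch
            throw ex;
        } finally {
            conn.setAutoCommit(true);            // assumption: auto-commit was on before
        }
    }
}
```

Calling it requires a live Connection, for example one obtained from a DataSource. Whether a batch really travels to the server in one round trip depends on the JDBC driver and its settings.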
Full code after the changes
OpsPriceServiceImpl
@Override
public ExpireProductPriceRefeshResponse refeshExpireProductPrice(ExpireProductPriceRefeshRequest request) {
    if (log.isDebugEnabled()) {
        log.debug("begin refeshExpireProductPrice request: {}", JsonUtil.toJson(request));
    }
    ExpireProductPriceRefeshResponse response = new ExpireProductPriceRefeshResponse();
    try {
        request.validate(Constant.SUB_SYSTEM, ErpExceptionType.VCE13008);
        Long tenantNumId = request.getTenantNumId();
        Long dataSign = request.getDataSign();
        Date today = DateUtils.parse(DateUtils.format(new Date()));
        List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> shopLogs =
                mdmsPProductShopPriceLogDao.queryProductShopLogByEndDay(tenantNumId, dataSign, today);
        if (CollectionUtils.isNotEmpty(shopLogs)) {
            CompletableFuture.runAsync(() -> {
                // The transaction is opened inside the async task. Assuming
                // masterDataTransactionManager is a standard Spring transaction manager,
                // a transaction is bound to the thread that starts it, so one opened on the
                // calling thread would be committed before these updates run on the pool thread.
                TransactionStatus status = masterDataTransactionManager.getTransaction(
                        TransactionUtil.newTransactionDefinition(300));
                try {
                    if (shopLogs.size() > 2000) {
                        // Split large result sets into chunks of 2000 records per batch.
                        List<List<MDMS_P_PRODUCT_SHOP_PRICE_LOG>> partition = Lists.partition(shopLogs, 2000);
                        for (List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> mdms_p_product_shop_price_logs : partition) {
                            mdmsPProductShopDao.batchUpdateShopPrice(tenantNumId, dataSign, mdms_p_product_shop_price_logs);
                            mdmsPProductShopPriceLogDao.batchUpdateShopLogNotValid(tenantNumId, dataSign, mdms_p_product_shop_price_logs);
                        }
                    } else {
                        mdmsPProductShopDao.batchUpdateShopPrice(tenantNumId, dataSign, shopLogs);
                        mdmsPProductShopPriceLogDao.batchUpdateShopLogNotValid(tenantNumId, dataSign, shopLogs);
                    }
                    masterDataTransactionManager.commit(status);
                } catch (RuntimeException ex) {
                    masterDataTransactionManager.rollback(status);
                    throw ex;
                }
            }, opsThreadPoolExecutor).exceptionally(ex -> {
                log.error("refeshExpireProductPrice failed, cause: " + ex.getMessage());
                return null;
            });
        }
    } catch (Exception ex) {
        ExceptionUtil.processException(ex, response);
    }
    if (log.isDebugEnabled()) {
        log.debug("end refeshExpireProductPrice response:{}", JsonUtil.toJson(response));
    }
    return response;
}
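The chunking step above uses Guava's Lists.partition. For reference, a JDK-only equivalent could look like the sketch below; ChunkingSketch and its partition method are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ChunkingSketch {

    /** Splits a list into consecutive chunks of at most {@code size} elements. */
    static <T> List<List<T>> partition(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            chunks.add(items.subList(from, to));   // a view on the source list, not a copy
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> records = IntStream.range(0, 4500).boxed().collect(Collectors.toList());
        for (List<Integer> chunk : partition(records, 2000)) {
            System.out.println("chunk of " + chunk.size() + " records");
        }
    }
}
```

A list that is no larger than the chunk size comes back as a single chunk, and Lists.partition behaves the same way, so the separate branch for 2000 records or fewer could be folded into the partition loop.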
mdmsPProductShopDao
public void batchUpdateShopPriceCase(Long tenantNumId, Long dataSign, StringBuilder sql,
                                     List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> mdms_p_product_shop_price_logs) {
    sql.append(",last_update_user_id=? where tenant_num_id = ? and data_sign = ? and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ?");
    // One argument array per record, in the same order as the placeholders.
    List<Object[]> batchArgs = new ArrayList<>();
    for (MDMS_P_PRODUCT_SHOP_PRICE_LOG priceLog : mdms_p_product_shop_price_logs) {
        Object[] object = new Object[]{
                ValidatorUtils.isNullOrZero(priceLog.getOLD_PRICE()) ? null : priceLog.getOLD_PRICE(),
                1L, tenantNumId, dataSign, priceLog.getCORT_NUM_ID(), priceLog.getSUB_UNIT_NUM_ID(),
                priceLog.getITEM_NUM_ID()};
        batchArgs.add(object);
    }
    int[] ints = jdbcTemplate.batchUpdate(String.valueOf(sql), batchArgs);
    int sum = DaoUtil.sum(ints);
    if (sum != batchArgs.size()) {
        throw new DatabaseOperateException(Constant.SUB_SYSTEM, ErpExceptionType.DOE33008,
                "Batch update of retail prices failed!");
    }
}

public void batchUpdateShopPrice(Long tenantNumId, Long dataSign,
                                 List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> mdms_p_product_shop_price_logs) {
    // Group the records by price type so that each group shares one SQL statement.
    Map<Integer, List<MDMS_P_PRODUCT_SHOP_PRICE_LOG>> collectMap = mdms_p_product_shop_price_logs.stream()
            .collect(Collectors.groupingBy(MDMS_P_PRODUCT_SHOP_PRICE_LOG::getPRICE_TYPE));
    for (Integer integer : collectMap.keySet()) {
        StringBuilder sb = new StringBuilder();
        sb.append("update mdms_p_product_shop set last_updtme = now() ");
        switch (integer) {
            case 501:
                sb.append(",retail_price =?");
                batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));
                break;
            case 502:
                sb.append(",member_price =?");
                batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));
                break;
            case 503:
                sb.append(",member_day_price =?");
                batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));
                break;
            case 504:
                sb.append(",tiny_price =?");
                batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));
                break;
            case 505:
                sb.append(",health_care_price =?");
                batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));
                break;
            case 506:
                sb.append(",floor_price =?");
                batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));
                break;
        }
    }
}
mdmsPProductShopPriceLogDao
public void batchUpdateShopLogNotValid(Long tenantNumId, Long dataSign, List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> logList) {
    String sql = "update mdms_p_product_shop_price_log set is_valid =0,last_updtme = now(),last_update_user_id=? "
            + " where tenant_num_id = ? and data_sign = ? and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ? "
            + " and series=? and cancelsign='N'";
    List<Object[]> batchArgs = new ArrayList<>();
    for (MDMS_P_PRODUCT_SHOP_PRICE_LOG log : logList) {
        // Argument order must match the placeholders: the user id (1L) comes first,
        // then tenant_num_id and data_sign, as in updateShopLogNotValid.
        Object[] object = new Object[]{1L, tenantNumId, dataSign, log.getCORT_NUM_ID(),
                log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), log.getSERIES()};
        batchArgs.add(object);
    }
    int[] ints = jdbcTemplate.batchUpdate(sql, batchArgs);
    int sum = DaoUtil.sum(ints);
    if (sum != batchArgs.size()) {
        throw new DatabaseOperateException(Constant.SUB_SYSTEM, ErpExceptionType.DOE33008,
                "Batch update of shop price logs failed!");
    }
}
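Both batch methods compare a sum of the update counts with the batch size. Assuming DaoUtil.sum simply adds the counts, a sum can hide a statement that matched zero rows next to one that matched two, and some JDBC drivers report Statement.SUCCESS_NO_INFO instead of a row count for batched statements. The sketch below checks each count individually. The class BatchResultSketch and the method allRowsUpdated are hypothetical names and are not part of the project's DaoUtil.

```java
import java.sql.Statement;

final class BatchResultSketch {

    /**
     * Returns true when every statement in the batch either reported at least
     * one affected row or succeeded without reporting a count.
     */
    static boolean allRowsUpdated(int[] counts) {
        for (int count : counts) {
            if (count == Statement.SUCCESS_NO_INFO) {
                continue;          // executed successfully, row count unknown
            }
            if (count <= 0) {
                return false;      // zero rows matched, or Statement.EXECUTE_FAILED
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allRowsUpdated(new int[]{1, 1, 1}));
        System.out.println(allRowsUpdated(new int[]{1, 0, 2}));
    }
}
```

With counts of 1, 0 and 2 the sum equals the batch size of 3, yet one record was not updated; the per-statement check reports that case as a failure.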