百万数据量修改数据思路及方法

场景

公司定时任务因数据量过大运行时间太久,大约3-4个小时,需要优化代码。数据量一旦变大,普通的修改操作也会变得复杂。

原代码

OpsPriceServiceImpl

@Override
public ExpireProductPriceRefeshResponse refeshExpireProductPrice(ExpireProductPriceRefeshRequest request) {if (log.isDebugEnabled()) {log.debug("begin refeshExpireProductPrice request: {}", JsonUtil.toJson(request));}ExpireProductPriceRefeshResponse response = new ExpireProductPriceRefeshResponse();try {request.validate(Constant.SUB_SYSTEM, ErpExceptionType.VCE13008);Long tenantNumId = request.getTenantNumId();Long dataSign = request.getDataSign();Date today = DateUtils.parse(DateUtils.format(new Date()));List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> shopLogs = mdmsPProductShopPriceLogDao.queryProductShopLogByEndDay(tenantNumId, dataSign, today);if (CollectionUtils.isNotEmpty(shopLogs)) {CompletableFuture.runAsync(() -> {for (MDMS_P_PRODUCT_SHOP_PRICE_LOG log : shopLogs) {mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null:log.getOLD_PRICE(), 1L, log.getPRICE_TYPE());mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId,dataSign,1L,log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(),log.getITEM_NUM_ID(),log.getSERIES());}}, opsThreadPoolExecutor).exceptionally(ex -> {log.error("refeshExpireProductPrice failed, cause: " + ex.getMessage());return null;});}} catch (Exception ex) {ExceptionUtil.processException(ex, response);}if (log.isDebugEnabled()) {log.debug("end refeshExpireProductPrice response:{}", JsonUtil.toJson(response));}return response;
}

mdmsPProductShopDao

public void updateShopPrice(Long tenantNumId, Long dataSign, String cortNumId, String subUnitNumId, String itemNumId,Double price, Long usrNumId, int priceType) {StringBuilder sb = new StringBuilder();sb.append("update mdms_p_product_shop set last_updtme = now() ");if (priceType == 501) {sb.append(",retail_price =?");} else if (priceType == 502) {sb.append(",member_price =?");} else if (priceType == 503) {sb.append(",member_day_price =?");} else if (priceType == 504) {sb.append(",tiny_price =?");} else if (priceType == 505) {sb.append(",health_care_price =?");} else if (priceType == 506) {sb.append(",floor_price =?");}sb.append(",last_update_user_id=?  where tenant_num_id = ? and data_sign = ? and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ?");int row = jdbcTemplate.update(sb.toString(),new Object[]{price, usrNumId, tenantNumId, dataSign, cortNumId, subUnitNumId, itemNumId});if (row <= 0) {throw new DatabaseOperateException(Constant.SUB_SYSTEM, ErpExceptionType.DOE33008,"更新零售价价格失败!门店号:" + subUnitNumId + " 商品编号:" + itemNumId);}
}

mdmsPProductShopPriceLogDao

public void updateShopLogNotValid(Long tenantNumId, Long dataSign,Long usrNumId,String cortNumId,String subUnitNumId, String itemNumId , String  series) {String sql = "update mdms_p_product_shop_price_log set is_valid =0,last_updtme = now(),last_update_user_id=? "+ " where tenant_num_id = ? and data_sign = ?  and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ?  "+ " and  series=?  and cancelsign='N'";int row = jdbcTemplate.update(sql,new Object[]{ usrNumId,tenantNumId, dataSign,cortNumId, subUnitNumId, itemNumId,series});
}

问题

由以上代码可见,代码虽使用CompletableFuture.runAsync()进行异步执行,但下方通过增强for循环单个进行修改操作,且调用的方法通过了if else进行了多次判断,导致速度缓慢。

思路

第一个想法

通过将if else进行舍弃将其中sql拼接不通过priceType进行判断,认真读代码后,发现是进行不同字段的修改,放弃。

第二个想法

if else改为switch,同时将OpsPriceServiceImpl中的增强for循环改为stream流形式。当数据量不超过10万条数据时,增强for循环速度快,但超过时stream流形式更为快速。
原OpsPriceServiceImpl

  for (MDMS_P_PRODUCT_SHOP_PRICE_LOG log : shopLogs) {mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null:log.getOLD_PRICE(), 1L, log.getPRICE_TYPE());mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId,dataSign,1L,log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(),log.getITEM_NUM_ID(),log.getSERIES());}

改为

   shopLogs.stream().forEach(log->{mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null:log.getOLD_PRICE(), 1L, log.getPRICE_TYPE());mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId,dataSign,1L,log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(),log.getITEM_NUM_ID(),log.getSERIES());});

后发现stream流循环通过parallelStream可以进行并行操作,后将代码改为

   shopLogs.parallelStream().forEach(log->{mdmsPProductShopDao.updateShopPrice(tenantNumId, dataSign, log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), ValidatorUtils.isNullOrZero(log.getOLD_PRICE()) ? null:log.getOLD_PRICE(), 1L, log.getPRICE_TYPE());mdmsPProductShopPriceLogDao.updateShopLogNotValid(tenantNumId,dataSign,1L,log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(),log.getITEM_NUM_ID(),log.getSERIES());});

原updateShopPrice方法

if (priceType == 501) {sb.append(",retail_price =?");
} else if (priceType == 502) {sb.append(",member_price =?");
} else if (priceType == 503) {sb.append(",member_day_price =?");
} else if (priceType == 504) {sb.append(",tiny_price =?");
} else if (priceType == 505) {sb.append(",health_care_price =?");
} else if (priceType == 506) {sb.append(",floor_price =?");
}

改为

switch (priceType) {case 501:sb.append(",retail_price =?");break;case 502:sb.append(",member_price =?");break;case 503:sb.append(",member_day_price =?");break;case 504:sb.append(",tiny_price =?");break;case 505:sb.append(",health_care_price =?");break;case 506:sb.append(",floor_price =?");break;
}

第三个想法

虽效率提高了部分,但感觉还是不够满意。原代码修改操作通过jdbcTemplate.update()进行单条处理,决定通过jdbcTemplate.batchUpdate()进行批量处理。因每循环一次需要对两个方法进行修改操作,可能会出现数据不统一问题,将这部分代码加入事务,出现异常后回滚(masterDataTransactionManager.getTransaction(TransactionUtil.newTransactionDefinition(300)))。
修改后全部代码
OpsPriceServiceImpl

@Override
public ExpireProductPriceRefeshResponse refeshExpireProductPrice(ExpireProductPriceRefeshRequest request) {if (log.isDebugEnabled()) {log.debug("begin refeshExpireProductPrice request: {}", JsonUtil.toJson(request));}ExpireProductPriceRefeshResponse response = new ExpireProductPriceRefeshResponse();try {request.validate(Constant.SUB_SYSTEM, ErpExceptionType.VCE13008);Long tenantNumId = request.getTenantNumId();Long dataSign = request.getDataSign();Date today = DateUtils.parse(DateUtils.format(new Date()));List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> shopLogs = mdmsPProductShopPriceLogDao.queryProductShopLogByEndDay(tenantNumId, dataSign, today);if (CollectionUtils.isNotEmpty(shopLogs)) {TransactionStatus status = masterDataTransactionManager.getTransaction(TransactionUtil.newTransactionDefinition(300));try {CompletableFuture.runAsync(() -> {if (shopLogs.size() > 2000) {List<List<MDMS_P_PRODUCT_SHOP_PRICE_LOG>> partition = Lists.partition(shopLogs, 2000);for (List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> mdms_p_product_shop_price_logs : partition) {mdmsPProductShopDao.batchUpdateShopPrice(tenantNumId, dataSign, mdms_p_product_shop_price_logs);mdmsPProductShopPriceLogDao.batchUpdateShopLogNotValid(tenantNumId, dataSign, mdms_p_product_shop_price_logs);}} else {mdmsPProductShopDao.batchUpdateShopPrice(tenantNumId, dataSign, shopLogs);mdmsPProductShopPriceLogDao.batchUpdateShopLogNotValid(tenantNumId, dataSign, shopLogs);}}, opsThreadPoolExecutor).exceptionally(ex -> {log.error("refeshExpireProductPrice failed, cause: " + ex.getMessage());return null;});masterDataTransactionManager.commit(status);} catch (Exception ex) {masterDataTransactionManager.rollback(status);throw ex;}}} catch (Exception ex) {ExceptionUtil.processException(ex, response);}if (log.isDebugEnabled()) {log.debug("end refeshExpireProductPrice response:{}", JsonUtil.toJson(response));}return response;
}

mdmsPProductShopDao

    public void batchUpdateShopPriceCase(Long tenantNumId, Long dataSign, StringBuilder sql, List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> mdms_p_product_shop_price_logs) {sql.append(",last_update_user_id=?  where tenant_num_id = ? and data_sign = ? and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ?");List<Object[]> batchArgs = new ArrayList<>();for (MDMS_P_PRODUCT_SHOP_PRICE_LOG priceLog : mdms_p_product_shop_price_logs) {Object[] object = new Object[]{ValidatorUtils.isNullOrZero(priceLog.getOLD_PRICE()) ? null : priceLog.getOLD_PRICE(), 1L, tenantNumId, dataSign, priceLog.getCORT_NUM_ID(), priceLog.getSUB_UNIT_NUM_ID(), priceLog.getITEM_NUM_ID()};batchArgs.add(object);}int[] ints = jdbcTemplate.batchUpdate(String.valueOf(sql), batchArgs);int sum = DaoUtil.sum(ints);if (sum != batchArgs.size()) {throw new DatabaseOperateException(Constant.SUB_SYSTEM, ErpExceptionType.DOE33008,"批量更新零售价价格失败!");}}public void batchUpdateShopPrice(Long tenantNumId, Long dataSign, List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> mdms_p_product_shop_price_logs) {Map<Integer, List<MDMS_P_PRODUCT_SHOP_PRICE_LOG>> collectMap = mdms_p_product_shop_price_logs.stream().collect(Collectors.groupingBy(MDMS_P_PRODUCT_SHOP_PRICE_LOG::getPRICE_TYPE));for (Integer integer : collectMap.keySet()) {StringBuilder sb = new StringBuilder();sb.append("update mdms_p_product_shop set last_updtme = now() ");switch (integer) {case 501:sb.append(",retail_price =?");batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));break;case 502:sb.append(",member_price =?");batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));break;case 503:sb.append(",member_day_price =?");batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));break;case 504:sb.append(",tiny_price =?");batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));break;case 505:sb.append(",health_care_price =?");batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));break;case 506:sb.append(",floor_price =?");batchUpdateShopPriceCase(tenantNumId, dataSign, sb, collectMap.get(integer));break;}}}

mdmsPProductShopPriceLogDao

    public void batchUpdateShopLogNotValid(Long tenantNumId, Long dataSign, List<MDMS_P_PRODUCT_SHOP_PRICE_LOG> logList) {String sql = "update mdms_p_product_shop_price_log set is_valid =0,last_updtme = now(),last_update_user_id=? "+ " where tenant_num_id = ? and data_sign = ?  and cort_num_id=? and sub_unit_num_id = ? and item_num_id = ?  "+ " and  series=?  and cancelsign='N'";List<Object[]> batchArgs = new ArrayList<>();for (MDMS_P_PRODUCT_SHOP_PRICE_LOG log : logList) {Object[] object = new Object[]{tenantNumId, dataSign, 1L, log.getCORT_NUM_ID(), log.getSUB_UNIT_NUM_ID(), log.getITEM_NUM_ID(), log.getSERIES()};batchArgs.add(object);}int[] ints = jdbcTemplate.batchUpdate(sql, batchArgs);int sum = DaoUtil.sum(ints);if (sum != batchArgs.size()) {throw new DatabaseOperateException(Constant.SUB_SYSTEM, ErpExceptionType.DOE33008,"批量更新零售价价格失败!");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/865201.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示

章节内容 上一节完成&#xff1a; HDFS的集群启动HDFS的命令行操作HDFS 上传下载移动重命名等操作 背景介绍 这里是三台公网云服务器&#xff0c;每台 2C4G&#xff0c;搭建一个Hadoop的学习环境&#xff0c;供我学习。 之前已经在 VM 虚拟机上搭建过一次&#xff0c;但是没…

QT 绘制多阶贝塞尔曲线bezier

#include "bezierline.h"BezierLine::BezierLine(QWidget *parent) {this->setParent(parent);/*阶数 &#xff1a; 公式order 2: P (1-t)^2*P0 2(1-t)*t*P1 t^2*P2order 3: P (1-t)^3*P0 3(1-t)^2*t*P1 3(1-t)*t^2*P2 t^3*P3order 4: P (1-t)^4*P0 4…

Arthas常见使用姿势

文章目录 Arthas常见使用姿势官网基本命令通用参数解释表达式核心变量说明常用命令一些常用特殊案例举例其他技巧关于OGNLOGNL的常见使用OGNL的一些特殊用法与说明OGNL内置的虚拟属性OGNL的个人思考OGNL的杂碎&#xff0c;收集未做验证 Arthas常见使用姿势 官网 https://arth…

基于FPGA的DDS信号发生器

前言 此处仅为基于Vivado实现DDS信号发生器的仿真实现&#xff0c;Vivado的安装请看下面的文章&#xff0c;这里我只是安装了一个标准版本&#xff0c;只要能够仿真波形即可。 FPGA开发Vivado安装教程_vivado安装 csdn-CSDN博客 DDS原理 DDS技术是一种通过数字计算生成波形…

Pandas_DataFrame读写详解:案例解析(第24天)

系列文章目录 一、 读写文件数据 二、df查询数据操作 三、df增加列操作 四、df删除行列操作 五、df数据去重操作 六、df数据修改操作 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 系列文章目录前言一、 读写文…

Web 基础与 HTTP 协议

Web 基础与 HTTP 协议 一、Web 基础1.1域名和 DNS域名的概念Hosts 文件DNS&#xff08;Domain Name System 域名系统&#xff09;域名注册 1.2网页与 HTML网页概述HTML 概述网站和主页Web1.0 与 Web2.0 1.3静态网页与动态网页静态网页动态网页 二、HTTP 协议1.1HTTP 协议概述1.…

秋招——MySQL补充——MySQL是如何加行级锁

文章目录 引言正文什么SQL语句会加行级锁查询操作增加对应的行级锁事务的写法 update和delete修改操作也会增加行级锁 行级锁有哪些种类记录锁间隙锁Next-Key锁 MySQL是如何加行级锁&#xff1f;唯一索引等值查询查询记录是存在的查询记录是不存在的 唯一索引范围查找针对大于或…

《梦醒蝶飞:释放Excel函数与公式的力量》8.4 COUNTIF函数

8.4 COUNTIF函数 COUNTIF函数是Excel中常用的统计函数之一&#xff0c;用于统计指定条件下的单元格数量。通过COUNTIF函数&#xff0c;我们可以轻松地对数据进行条件筛选和统计分析。下面将从函数简介、语法、基本用法、注意事项、高级应用、实战练习和小节几个方面展开介绍。…

爬虫笔记19——代理IP的使用

访问网站时IP被阻止 有些网站会设置特定规则来限制用户的访问&#xff0c;例如频率限制、单一账户多次登录等。 网站为了保护自身安全和用户体验&#xff0c;会设置防御机制&#xff0c;将涉嫌恶意行为的IP地址加入黑名单并屏蔽访问。如果用户在使用网站时违反了这些规则&…

格式化选NTFS还是exFAT 格式化NTFS后Mac不能用怎么办 移动硬盘格式化ntfs和exfat的区别

面对硬盘、U盘或移动硬盘的格式化决策&#xff0c;NTFS与exFAT作为主流的文件系统&#xff0c;用户在选择时可以根据它们的不同特点来选择适用场景。下面我们来看看格式化选NTFS还是exFAT&#xff0c;格式化NTFS后Mac不能用怎么办的相关内容。 一、格式化选NTFS还是exFAT 在数…

十四、【源码】@Autowired、@Value、@Component

源码地址&#xff1a;https://github.com/spring-projects/spring-framework 仓库地址&#xff1a;https://gitcode.net/qq_42665745/spring/-/tree/14-auto-property Autowired、Value、Component 注解注入属性的实现分散在refresh容器的各个方法中&#xff0c;梳理&#x…

玩转springboot之springboot使用外置tomcat进行运行

使用外置tomcat进行运行 springboot中是集成了tomcat容器的&#xff0c;如果我们不想使用springboot所集成的tomcat&#xff0c;而想要使用自己的Tomcat外部容器&#xff0c;该怎么做呢&#xff1f; 首先&#xff0c;需要更改打包方式&#xff0c;之前是打成jar包&#xff0c;现…

docker 搭建 AI大数据模型 --- 使用GPU

docker 搭建 AI大数据模型 — 使用GPU方式 搭建本地大模型&#xff0c;最简单的方法&#xff01;效果直逼GPT 服务器GPU系统HP580 G8P40Rocky9.2 安装程序AnythingLLM前端界面Open WebUIChatOllamaollama 一、AnythingLLM 介绍 AnythingLLM 是 Mintplex Labs Inc. 开发的一…

面试官:Rocketmq是推消息还是拉消息

RocketMQ消息模型 核心模型&#xff1a;RocketMQ本质上是基于拉模式的。长轮询技术&#xff1a;使用长轮询技术&#xff0c;减少了拉取消息的延迟&#xff0c;同时保持了拉模式的控制优势。 长轮询技术详解 工作原理&#xff1a; 请求保持开放&#xff1a;消费者向服务器发出…

MySQL 聚集索引与非聚集索引的概念以及优缺点

概念介绍&#xff1a; 聚集索引&#xff08;Clustered Index&#xff09;&#xff1a; 定义&#xff1a;聚集索引是一种数据存储方式&#xff0c;数据表中主键记录按照索引的顺序进行物理排序。每个表只能有一个聚集索引&#xff0c;因为数据物理上只能排序一次。实现&#x…

FreeDOS 已经30岁了

1994 年 6 月&#xff0c;微软发布了其 DOS 操作系统的最后一个版本 MS-DOS 6.22。 程序员 Jim Hall 对微软的 Windows 3.x 以及后来的 Windows 95 都不满意不感兴趣&#xff0c;他希望创建一个公共领域的 DOS 兼容系统&#xff0c;在越来越多的人拥抱图形用户界面的时代维持传…

9.(vue3.x+vite)修改el-input,el-data-picker样式

效果预览 二:相关代码 <template><div style="padding: 50px"><el-input placeholder="请输入模型名称" style="width: 260px" /><br /

Java灵活用工2.0报价单微信小程序+APP+微信公众号 源码

&#x1f680;【开篇&#xff1a;解锁灵活用工的高效时代】 在人力资源市场日益灵活的今天&#xff0c;如何快速、准确地生成报价单&#xff0c;成为企业吸引并管理自由职业者的关键。而“灵活用工报价单微信小程序APP微信公众号源码”正是这样一款集高效、便捷于一体的解决方…

YOLO在目标检测与视频轨迹追踪中的应用

YOLO在目标检测与视频轨迹追踪中的应用 引言 在计算机视觉领域&#xff0c;目标检测与视频轨迹追踪是两个至关重要的研究方向。随着深度学习技术的飞速发展&#xff0c;尤其是卷积神经网络&#xff08;CNN&#xff09;的广泛应用&#xff0c;目标检测与视频轨迹追踪的性能得到…

YOLO-V2

一、V2版本细节升级 1、YOLO-V2&#xff1a; 更快&#xff01;更强 1.1 做的改进内容 1. YOLO-V2-Batch Normalization V2版本舍弃Dropout&#xff0c;卷积后每一层全部加入Batch Normalization网络的每一层的输入都做了归一化&#xff0c;收敛相对更容易经过Batch Norma…