一次业务的批量数据任务的处理优化

文章目录

  • 一次业务的批量数据任务的处理优化
    • 业务背景
    • 1.0版本 分批处理模式
    • 2.0版本 平衡任务队列模式
    • 3.0版本 优化调度平衡任务队列模式
    • 总结

一次业务的批量数据任务的处理优化

业务背景

一个重新生成所有客户的财务业务指标数据的批量数据处理任务。

1.0版本 分批处理模式

根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。

    private void updateForRegenerateByCustomer(List<Integer> customerIdList,SystemUserCommonDTO user, LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll():customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList,user,now);int maxSize = baseInfoList.size();//计算当前任务数量int currentMaxPoolSize = maxPoolSize<maxSize?maxPoolSize:maxSize;CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];//计算每个任务分段的数量int size = maxSize / currentMaxPoolSize;for(int i=0;i<currentMaxPoolSize;i++){final int begin = i * size;final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size;//创建异步处理的分段任务tasks[i] = CompletableFuture.runAsync(()->updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now),executorService).whenCompleteAsync((k,v)-> log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName()));}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(List<CustomerBaseInfo> baseInfoList,int begin,int end,SystemUserCommonDTO user, LocalDateTime now){//每个线程只处理自己的分段的数据for(int i=begin;i<end;i++){CustomerBaseInfo baseInfo = baseInfoList.get(i);//每个客户独立事务TransactionalUtils.runWithNewTransactional(()->updateForGenerateByCustomerId(baseInfo.getId(),user,now));}}/*** 生成指定客户的所有数据**/private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){//1、重新生成客户的所有业务类型的数据List<FinanceBiMaintainDto> maintainDtoList =financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId);if(CollectionUtils.isEmpty(maintainDtoList)){return ;}//生成每个指标的数据Map<BusinessIndicatorEnum,List<FinanceBiMaintainDto>> indicatorMaintainDtoMap = maintainDtoList.stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator));indicatorMaintainDtoMap.forEach((k,v)->{log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k);financeBiManager.updateForBiMaintain(k, v,user,now);});}

运行耗时:1420.145秒

2.0版本 平衡任务队列模式

1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。

updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。

private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);//根据线程数,构建固定的任务数量CompletableFuture<?>[] tasks = new CompletableFuture<?>[currentMaxPoolSize];//构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList()));//创建多个线程去消耗客户队列for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] =CompletableFuture.runAsync(() -> updateForGenerateByCustomerIdList(queue, user, now), executorService).whenCompleteAsync((k, v) -> {if (v != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常",Thread.currentThread().getName()), v);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName());}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue<Integer> queue, SystemUserCommonDTO user,LocalDateTime now) {Integer customerId = queue.poll();//循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。while (customerId != null) {final Integer currentCustomerId = customerId;TransactionalUtils.runWithNewTransactional(() -> updateForGenerateByCustomerId(currentCustomerId, user, now));customerId = queue.poll();}}

优化后的耗时:1037.059秒

3.0版本 优化调度平衡任务队列模式

2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。

updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。


private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);//获取客户的统计数据Map<Integer, CustomerStatisticsInfo> customerStatisticsInfoMap =customerStatisticsInfoService.listAll().stream().collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity()));int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);CompletableFuture<String>[] tasks = new CompletableFuture[currentMaxPoolSize];//根据客户的统计数据,构建待处理的客户队列ConcurrentLinkedQueue<Integer> queue =baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull)
//队列按照客户数据量倒序排列  .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys,Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId).collect(Collectors.toCollection(ConcurrentLinkedQueue::new));for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] = CompletableFuture.supplyAsync(() -> {updateForGenerateByCustomerIdList(queue, user, now);return Thread.currentThread().getName();}, executorService).whenCompleteAsync((k, ex) -> {if (ex != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k);}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}

耗时:726.725秒

总结

最终的耗时从1400多秒 降低到700多秒。降低了一半左右。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/46089.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 NumPy 及其相关库(如 pandas、scikit-learn 等)时,由于 NumPy 的版本不兼容或者某些依赖库与 NumPy 的版本不匹配

题意&#xff1a; numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject 问题背景&#xff1a; I want to call my Python module from the Matlab. I received the error: Error using numpy_ops>init thi…

Java中的List集合

一、ArrayLIst集合 ArrayList的特点 ArrayList实现了Collection接口ArrayList内部封装了一个Object类型的对象&#xff0c;初始长度为10&#xff0c;且长度可变ArrayList集合使用数组实现所以查询快&#xff0c;但是增删慢(因为需要移动元素)ArrayList是不能保证线程安全的 …

戴尔inspiron如何独显直连?

&#x1f3c6;本文收录于《CSDN问答解惑-专业版》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收…

解决安卓tv 蓝牙遥控器配对后输入法弹不出来的问题

t972在蓝牙配对后&#xff0c;自带的LatinIME 输入法会出现弹不出来的现象。 经过分析&#xff0c;主要为蓝牙的kl 文件适配存在问题。解决如下&#xff1a; 1.新建 kl文件 这个需要结合选用的遥控器来设定名称&#xff0c;我这边的遥控器是按照如下配置的 Vendor_2b54_Pr…

java基础,接口和抽象类

一&#xff1a;接口和抽象类 ①接口的定义&#xff1a; 声明方式&#xff1a; 接口使用interface关键字来声明&#xff0c;后跟接口的名称和接口体&#xff08;包含常量和方法声明的代码块&#xff09; public interface ObjectService extends IService<ObjectDO> {/…

Vue 中 v-pre、v-once、v-cloak 标签的深度解析与案例展示

目录 v-pre v-once ​​​​​​​v-cloak ​​​​​​​v-cloak介绍 ​​​​​​​插值表达式闪烁问题 v-pre 当使用 v-pre 指令时,不会进行编译操作。所有的 Vue 模板语法都将得以完整保留,并会按照其初始的形态进行渲染。其中,最为常见的应用场景便是用于展示…

用户登陆实现前后端JWT鉴权

目录 一、JWT介绍 二、前端配置 三、后端配置 四、实战 一、JWT介绍 1.1 什么是jwt JWT&#xff08;JSON Web Token&#xff09;是一种开放标准&#xff08;RFC 7519&#xff09;&#xff0c;用于在各方之间以安全的方式传输信息。JWT 是一种紧凑、自包含的信息载体&…

【Android面试八股文】组件化在项目中有什么意义?

一、没有组件化会出现什么问题? 早期的单一分层模式 问题一:无论分包怎么做,随着项目增大,项目失去层次感,后面接手的人扑街问题二:包名约束太弱,稍有不注意,就会不同业务包直接互相调用,代码高耦合问题三:多人开发在版本管理中,容易出现代码覆盖冲突等问题二、组件…

【Linux】Linux的账号和用户组

管理员的工作中&#xff0c;相当重要的一环就是【管理账号】。 因为整个系统都是你在管理&#xff0c;并且所有一般用户的账号申请&#xff0c;都必须要通过你的协助才行&#xff0c;所以你就必须要了解一下如何管理好一个服务器主机的账号。 在管理Linux主机的账号时&#xff…

Linux下mysql数据库的导入与导出以及查看端口

一&#xff1a;Linux下导出数据库 1、基础导出 要在Linux系统中将MySQL数据库导出&#xff0c;通常使用mysqldump命令行工具。以下是一个基本的命令示例&#xff0c;用于导出整个数据库&#xff1a; mysqldump -u username -p database_name > export_filename.sql 其中&a…

Django 删除单行数据

1&#xff0c;添加模型 from django.db import modelsclass Post(models.Model):title models.CharField(max_length200)content models.TextField()pub_date models.DateTimeField(date published)class Book(models.Model):title models.CharField(max_length100)author…

121. 小红的区间翻转(卡码网周赛第二十五期(23年B站笔试真题))

题目链接 121. 小红的区间翻转&#xff08;卡码网周赛第二十五期&#xff08;23年B站笔试真题&#xff09;&#xff09; 题目描述 小红拿到了两个长度为 n 的数组 a 和 b&#xff0c;她仅可以执行一次以下翻转操作&#xff1a;选择a数组中的一个区间[i, j]&#xff0c;&#x…

【spring boot starter的自定义和学习笔记】

spring boot starter的自定义和理解 author:shengfq date:2024-07-14 version:1.0 title:spring boot starter的自定义和理解 1.基本概念 Starter是Spring Boot的四大核心功能特性之一&#xff0c;除此之外&#xff0c;Spring Boot还有自动装配、Actuator监控等特性。 Sprin…

顺序表算法 - 移除元素

. - 力扣&#xff08;LeetCode&#xff09;. - 备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/remove-element/description/思路: 代码: // numsSize表示数组的长度 …

【2024年全国青少信息素养大赛c++初中复赛集训第一天编程题分享】

目录 题目 1:星际旅行者的紧急求助 题目 2:失落的文明遗迹 题目 3:时间之门的密码 题目5,输出多进制数 题目6、乒乓球 题目7、明明的随机数 题目8、烤鸡 题目9、排队接水 题目 10:魔法森林的迷宫 题目 11:校园植树节活动 题目 12:小学生数学竞赛排名 题目 1:…

力扣224.基本计算器

力扣224.基本计算器 一个栈存符号 并记录当前数的符号遍历到一个数就存入答案 class Solution {public:int calculate(string s) {stack<int> st({1});int sign 1;int res0;int number;int n s.size();int i0;while(i<n) {if(isdigit(s[i])){number 0;while(i &…

python+pygame实现五子棋人机对战之五

pythonpygame实现五子棋人机对战之一 pythonpygame实现五子棋人机对战之二 pythonpygame实现五子棋人机对战之三 pythonpygame实现五子棋人机对战之四 在之前的文章中已经完成了所有的基础工作&#xff0c;剩下的就是把空填上就可以了。 六、 完成程序 # encoding:utf-8…

网络安全——防御课实验二

在实验一的基础上&#xff0c;完成7-11题 拓扑图 7、办公区设备可以通过电信链路和移动链路上网(多对多的NAT&#xff0c;并且需要保留一个公网IP不能用来转换) 首先&#xff0c;按照之前的操作&#xff0c;创建新的安全区&#xff08;电信和移动&#xff09;分别表示两个外网…

Readiris PDF Corporate / Business v23 解锁版安装教程 (PDF管理软件)

前言 Readiris PDF Corporate / Business 是一款高性能的 OCR&#xff08;光学字符识别&#xff09;软件&#xff0c;能够帮助用户将纸质文档、PDF 文件或图像文件转换为可编辑和可搜索的电子文本。该软件提供专业级的功能和特性&#xff0c;非常适合企业和商业使用。使用 Rea…

基于lstm的股票Volume预测

LSTM&#xff08;Long Short-Term Memory&#xff09;神经网络模型是一种特殊的循环神经网络&#xff08;RNN&#xff09;&#xff0c;它在处理长期依赖关系方面表现出色&#xff0c;尤其适用于时间序列预测、自然语言处理&#xff08;NLP&#xff09;和语音识别等领域。以下是…