【并发编程】线程池多线程异步去分页调用其他服务接口获取海量数据

文章目录

  • 场景:
  • 解决方案

场景:

前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。

刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些服务都注册在xxl-job上,而且采用的是分片广播的路由策略,那么每个服务就可以只处理请求到的所有数据中id%服务总数==分片索引的部分数据,然后交给kafka,由kafka决定这条数据应该放到哪个分区上。

解决方案

最近学了线程池后,回过头来思考,认为之前的方案还有很大的优化空间。

  • 1.当数据量很大时,一次性查询所有数据会导致数据库的负载过大,而使用分页查询,每次只查询部分数据,可以减轻数据库的负担,从而提高数据库的性能和响应速度,所以请求数据方每次分页查询少量数据,这样可以整体降低请求数据的时间。
  • 第一次优化.之前是每个服务都要把全量数据请求过来,假设全量数据1000w条,一个服务请求数据需要100s,我开5个服务,那请求数据的总时长就是500s。现在把1000w条数据均分给5个服务,那1个服务就只需要请求200w条数据,耗时20s,那所有服务的请求总时长就是100s。总体耗时缩小了5倍。上面说的分页查询就可以实现:页面大小假设10w(也就是将1000w/10w,逻辑上分成了100页),每个服务自己的分片索引作为页号,每次请求完,都给索引加上分片总数(例如:当前注册了五个服务,那分片总数=5,对于分片索引为1的服务来说,请求的页号为1,6,11,16,21。。。,对于分片索引为2的服务来说,请求的页号为2,7,12,17。。。,对于分片索引为3的服务来说,请求的页号为3,8,13,18,。。。。,对于分片索引为4的服务来说,请求的页号为4,9,14,19。。。。,对于分片索引为5的服务来说,请求的页号为5,10,15,20.。。)这样1000w条数据就均分到每个服务上了。对于每个服务都是单线程去请求数据,就可以将请求操作以及(页号+总服务数)的操作写在一个while循环里,一直请求数据,直到请求的数据为空时(也就是页号超过100了),退出while。
        //单线程情况下while(true){String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
//        logger.info("body:{}",body);//2.获取返回结果的messageJSONObject jsonObject = new JSONObject();
//        if (StrUtil.isNotBlank(body)) {jsonObject = JSONUtil.parseObj(body);
//            logger.info("name:{}",Thread.currentThread().getName());
//        }
//        logger.info("jsonObject:{}",jsonObject);//3.从body中获取dataList<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);if(CollectionUtil.isEmpty(tests)){break;}shardIndex+=shardTotal;}
  • 第二次优化: 了解了线程池后,还可以再优化。之前是一个服务单线程循环请求需要20s(假设),每次请求10w条,需要请求200w/10w,也就是20次,那一次请求就需要1s。如果使用线程池的话,那么耗时还会更小,因为当你将任务都交给线程池去执行时,多个线程会同时(并行)去请求各自页的数据,假如你只设置了4个线程,那这4个线程会同时发起请求获取数据,1s会完成4次请求,那分给服务的200w,5s就请求完了。那5个服务从总耗时500s,降到了总耗时5s*5=25s。
    这次优化,第一版代码(只展示了请求数据的代码,其他业务代码没有展示)
    一直向线程池里扔请求数据的任务,当某个任务请求到的数据是空的时候,意味着要请求的数据已经没了,那就结束循环,不再扔请求数据的任务。
    //线程共享变量static volatile boolean flag = true;@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex();
//        int shardTotal = XxlJobHelper.getShardTotal();//分片总数int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下
//        List<CompletableFuture>completableFutureList=new ArrayList<>();while (flag){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());if(CollectionUtil.isEmpty(tests)){flag=false;}},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");executorService.shutdown();

上面代码会有一个问题,就是while循环往线程池里扔任务,所有线程在执行时,会在请求数据那里”停留“一段时间,“停留期间”还会一直循环向线程池扔任务,当线程执行完某次请求得到空数据结束循环时,等待队列中还排着大堆任务等着去请求数据。

为了解决这个问题,我改用了for循环提交任务,提前根据请求数据总量、每次读取的条数,以及服务总数得到每个服务需要执行的任务数。
第二版代码

@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex()+1;int shardTotal = XxlJobHelper.getShardTotal();//分片总数
//        int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下List<CompletableFuture>completableFutureList=new ArrayList<>();//总条数double total = 10000000;//读取的条数double pageSize=1000;double tasks = Math.ceil( total / (double) shardTotal / pageSize);logger.info("任务数{}",tasks);for(double i=0;i<tasks;i++){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());String body = HttpUtil.get(url);JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");

如有问题,请求指正(^^ゞ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/31319.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Docker】配置指定大小的磁盘空间

背景 测试磁盘满时程序的运行情况 问题 如何使用 docker 来模拟磁盘满的情况 解决方法 创建指定大小的数据卷 volumedocker volume create --driver local --opt typetmpfs --opt devicetmpfs --opt osize50M my_volumn创建 docker 时&#xff0c;使用该数据卷docker run …

JS逆向系列之猿人学爬虫第14题-备而后动-勿使有变

文章目录 题目地址参数分析参考jspython 调用往期逆向文章推荐题目地址 https://match.yuanrenxue.cn/match/14题目难度标的是困难,主要难在js混淆部分。 参数分析 初始抓包有无限debugger反调试,可以直接hook 函数构造器过掉无限debugger Function.prototype.__construc…

Mirror网络库 | 说明

此篇为上文&#xff0c;下篇&#xff1a;Mirror网络库 | 实战 一、介绍 基于UNET&#xff0c;从2014年经过9年实战测试&#xff1b;服务器和客户端是一个项目&#xff1b;使用NetworkBehaviour而不是MonoBehaviour&#xff0c;还有NetworkServer和NetworkClient&#xff1b;Mi…

pdf怎么压缩到1m?这样做压缩率高!

PDF是目前使用率比较高的一种文档格式&#xff0c;因为它具有很高的安全性&#xff0c;还易于传输等&#xff0c;但有时候当文件体积过大时&#xff0c;会给我们带来不便&#xff0c;这时候简单的解决方法就是将其压缩变小。 想要将PDF文件压缩到1M&#xff0c;也要根据具体的情…

雅克比矩阵在机器人运动学中的应用

以六轴机械臂为例&#xff0c;设机械臂关节空间为q&#xff0c;位置矩阵为p&#xff0c;速度矩阵为v q [ q 0 , q 1 , q 2 , q 3 , q 4 , q 5 ] q[q_0,q_1,q_2,q_3,q_4,q_5] q[q0​,q1​,q2​,q3​,q4​,q5​] p [ x , y , z ] T [ f x ( q ) f y ( q ) f z ( q ) ] p[x,y,z…

ASP.NET Core中间件记录管道图和内置中间件

管道记录 下图显示了 ASP.NET Core MVC 和 Razor Pages 应用程序的完整请求处理管道 中间件组件在文件中添加的顺序Program.cs定义了请求时调用中间件组件的顺序以及响应的相反顺序。该顺序对于安全性、性能和功能至关重要。 内置中间件记录 内置中间件原文翻译MiddlewareDe…

微服务 云原生:基于 Gogs + Drone 实现 CI/CD 自动化

一般构建部署 以一个简单的前后端项目来说&#xff0c;分别编写前后端的 Dockerfile 文件并构建镜像&#xff0c;然后编写 docker-compose.yml 构建部署&#xff0c;启动运行。每次代码变更后都需重新手动打包、构建、推送。 一个简单的例子&#xff1a; 前端&#xff1a; 项…

【力扣每日一题】2023.8.7 反转字符串

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们一个字符数组形式的字符串&#xff0c;让我们直接原地修改反转字符串&#xff0c;不必返回。 给出的条件是使用O(1)的额外空间…

bash: sudo: command not found的解决方法 | 安装sudo

-bash: sudo: command not found的解决方法 https://www.cnblogs.com/pengpengboshi/p/16159443.html 报错 安装apt-get update报错由于没有公钥&#xff0c;无法验证下列签名&#xff1a; NO_PUBKEY A4B469963BF863CC解决办法是手动加入 &#xff08;sudo可去掉&#xff09;…

c# 全网最稳定 企业级 以太网客户端类库 具备即时更新状态,断线三次重拨,稳定收发。

源码下载,带示例 代码实现了一个基本的TCP客户端,能够连接到服务器并发送接收数据。当连接失败时,会进行重连,并在达到最大重连次数后终止连接。使用异步编程模型、实现事件模型以及重连机制。 ConnectAsync(): 这是一个异步方法,用于与服务器建立连接。在方法中,首先初始…

ARM架构银河麒麟docker,源码编译安装GDAL

docker中安装依赖 sudo apt-get update sudo apt-get install build-essential autoconf automake libtool sudo apt-get install libproj-dev libgeos-dev libjson-c-dev libpng-dev libjpeg-dev sudo apt-get install python3-dev sudo apt-get install python3.11-dev去官网…

IO密集时epoll还高效吗?

io特别密集时为什么 epoll 效率不高。原因是&#xff1a; 连接密集&#xff08;短连接特别多&#xff09;&#xff0c;使用epoll的话&#xff0c;每一次连接需要发生epoll_wait->accpet->epoll_ctl调用&#xff0c;而使用select只需要select->accpet&#xff0c;减少了…

数组和字符串-字符串

最长公共前缀 题意&#xff1a; 给多个字符串&#xff0c;找最长前缀 解&#xff1a; 暴力匹配&#xff0c;先按字典序排序字符串&#xff0c;这样长度短的优先进行匹配&#xff0c;所得字符串就可能偏小 适合a aa aaa aaaa这样的数据&#xff0c;不过对于aa aab aabc aab…

python爬虫相关

目录 初识爬虫 爬虫分类 网络爬虫原理 爬虫基本工作流程 搜索引擎获取新网站的url robots.txt HTHP协议 Resquests模块 前言&#xff1a; 安装 普通请求 会话请求 response的常用方法 简单案例 aiohttp模块 使用前安装模块 具体案例 数据解析 re解析 bs4…

数据治理内容

https://space.bilibili.com/405479587 文章内容来源b站up主&#xff0c;语兴呀 数据治理内容 一.模型&#xff1a; 由于早期业务快速扩张&#xff0c;对元数据把控不到位&#xff0c;导致成熟期出现大量不合规模型 解决&#xff1a;数据标准&#xff1a;元数据补充 建设管控&…

基于长短期神经网络LSTM时间序列回归分析

​目录 背影 摘要 LSTM的基本定义 LSTM实现的步骤 基于长短期神经网络LSTM的回归分析 MATALB代码:基于长短期神经网络的回归分析,基于LSTM的回归预测资源-CSDN文库 https://download.csdn.net/download/abc991835105/88184633 效果图 结果分析 展望 参考论文 背影 LSTM神经…

AutoDL服务器的镜像版本太高,配置python3.7 tensorflow1.15版本的框架的步骤

1.选择一个实例&#xff0c;进入后端界面 2. 更新bashrc中的环境变量 conda init bash && source /root/.bashrc查看虚拟环境 conda info --envs可以看到此时有一个base的虚拟环境 但是它的python版本为3.8.10&#xff0c;无法安装tensorflow1.15,所以我们要创建一个…

PHP面向对象面试题

1、简述面对对象六大设计原则 &#xff1f; 面向对象六大设计原则是一组指导软件设计的原则&#xff0c;它们有助于提高代码的可维护性、可扩展性和可重用性。这些原则是&#xff1a; 单一职责原则&#xff08;Single Responsibility Principle&#xff0c;SRP&#xff09;&a…

Ctfshow web入门 SSTI 模板注入篇 web361-web372 详细题解 全

CTFshow SSTI web361 笔记分享 一、代码块 变量块 {{}} 用于将表达式打印到模板输出 注释块 {##} 注释 控制块 {%%} 可以声明变量&#xff0c;也可以执行语句 {% for i in .__class__.__mro__[1].__subclasses__() %}{% if i.__name___wrap_close %}{% print i.__init__.…

Nacos服务治理—负载均衡

引入负载均衡 在消费方引入负载均衡机制&#xff0c;同时简化获取服务提供者信息的流程 Spring Cloud引入组件LoadBalance实现负载均衡 添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web<…