多线程批量同步数据到ES

需求背景:新增了ES,现在要讲数据库某张表的数据同步到ES中,百万级的数据量一次性读取同步肯定不行,所以可以用多线程同步执行同步数据。

1.线程池配置类

@Configuration
public class ThreadPoolConfig {/*** 核心线程池大小*/private static final int CORE_POOL_SIZE = 17;/*** 最大可创建的线程数*/private static final int MAX_POOL_SIZE = 50;/*** 队列最大长度*/private static final int QUEUE_CAPACITY = 1000;/*** 线程池维护线程所允许的空闲时间*/private static final int KEEP_ALIVE_SECONDS = 500;@Bean("taskExecutor")public ExecutorService executorService(){//使用原子类,保证线程命名的唯一性和连续性AtomicInteger c = new AtomicInteger(1);//创建链表结构的阻塞队列LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY);return new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_SECONDS,TimeUnit.MILLISECONDS,queue,r -> new Thread(r, "es-pool-" + c.getAndIncrement()),new ThreadPoolExecutor.DiscardPolicy());}
}

2.ES配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {private String host;private int port;@Beanpublic RestHighLevelClient client(){return new RestHighLevelClient(RestClient.builder(new HttpHost(host,port,"http")));}
}

3.主要代码逻辑

@Service
@Transactional
@Slf4j
public class TestService{@Autowiredprivate TestMapper testMapper;@Autowiredprivate RestHighLevelClient client; //ES客户端@Autowiredprivate ExecutorService executorService; //线程池private static final String ARTICLE_ES_INDEX = "test_info";//ES索引库名称private static final int PAGE_SIZE = 5000; //每页记录数/*** 批量导入逻辑*/public void importAll() {//查询数据总数int count = testMapper.selectCount();//总页数用 数据库数据总数%每页记录数int totalPageSize = count % PAGE_SIZE == 0 ? count / PAGE_SIZE : count / PAGE_SIZE + 1;//记录开始执行时间long startTime = System.currentTimeMillis();//一共有多少页,就创建多少个CountDownLatch的计数CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);int fromIndex;List<TestVo> testVoList= null;for (int i = 0; i < totalPageSize; i++) {//起始分页条数fromIndex = i * PAGE_SIZE;//查询数据库当前页数的数据  SELECT*FROM 表名 LIMIT fromIndex,PAGE_SIZEtestVoList= testMapper.selectCurrentData(fromIndex, PAGE_SIZE);//创建线程,做批量插入es数据操作TaskThread taskThread = new TaskThread(testVoList, countDownLatch);//把当前线程任务交由线程池执行executorService.execute(taskThread);}//调用await()方法,用来等待计数归零countDownLatch.await();long endTime = System.currentTimeMillis();log.info("es索引数据批量导入共:{}条,共消耗时间:{}秒", count, (endTime - startTime) / 1000);}//这里为了方便,写了线程内部类。class TaskThread implements Runnable {List<TestVo> testVoList;CountDownLatch cdl;//数据和倒计时锁public TaskThread(List<TestVo> testVoList, CountDownLatch cdl) {this.articleList = articleList;this.cdl = cdl;}@Overridepublic void run() {//创建ES对象,并指定名称BulkRequest bulkRequest = new BulkRequest(ARTICLE_ES_INDEX);for (SearchArticleVo searchArticleVo : articleList) {//存储到ESbulkRequest.add(new IndexRequest().id(searchArticleVo.getId().toString()).source(JSON.toJSONString(testVoList), XContentType.JSON));}//发送请求,批量添加数据到es索引库中client.bulk(bulkRequest, RequestOptions.DEFAULT);//添加成功后计数减一cdl.countDown();}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/644950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言学习(5)—— 数组

一、一维数组 1. 基本数据类型的数组 数组的定义&#xff1a;数据类型 数组名 [数组大小]; 数组名就代表该数组的首地址&#xff0c;即a[0]的地址 使用下标来访问数组元素 数组是多个相同类型数据的组合&#xff0c;一个数组一旦定义了&#xff0c;其长度是固定的&…

开源模型应用落地-业务整合篇(四)

一、前言 通过学习第三篇文章,我们已经成功地建立了IM与AI服务之间的数据链路。然而,我们目前面临一个紧迫需要解决的安全性问题,即非法用户可能会通过获取WebSocket的连接信息,顺利地连接到我们的服务。这不仅占用了大量的无效连接和资源,还对业务数据带来了潜在的风险。…

build.gradle标签详解

一、简介 Gradle是一个开源的构建自动化工具&#xff0c;主要用于Java、Groovy和其他JVM语言的项目。它使用一个基于Groovy或Kotlin的特定领域语言(DSL)来声明项目设置&#xff0c;从而摒弃了基于XML的繁琐配置。build.gradle是Gradle项目的核心配置文件&#xff0c;它定义了项…

系统架构设计师教程(十五)面向服务架构设计理论与实践

面向服务架构设计理论与实 15.1 SOA的相关概念15.1.1 SOA的定义15.1.2 业务流程与BPEL15.2 SOA的发展历史15.2.1 SOA的发展历史15.2.2 国内SOA的发展现状与国外对比15.2.3 SOA的微服务化发展15.3 SOA的参考架构15.4 SOA主要协议和规范15.4.1 UDDI协议15.4.2 WSDL规范15.4.3 SOA…

清理Docker环境

清理Docker环境&#xff1a;有时&#xff0c;Docker环境可能会出现一些问题&#xff0c;导致网络连接故障。您可以尝试清理Docker环境并重新启动。可以尝试运行以下命令&#xff1a; 复制 docker-compose down docker system prune -a docker-compose up docker-compose up 和…

Windows 下 TFTP 服务搭建及 U-Boot 中使用 tftp 命令实现文件下载

目录 Tftpd32/64文件下载更多内容 TFTP&#xff08;Trivial File Transfer Protocol&#xff0c;简单文件传输协议&#xff09;是 TCP/IP 协议族中的一个用来在客户机与服务器之间进行简单文件传输的协议&#xff0c;提供不复杂、开销不大的文件传输服务&#xff0c;端口号为 6…

Vue.js动画库

1、vue2-animate https://animate.style/ 地址&#xff1a;https://www.npmjs.com/package/vue2-animate一个可以在你的网站中即用型跨浏览器动画库&#xff0c;非常适合主页、滑块和动画引导提示。这是Animate.css 的一个端口&#xff0c;用于 Vue.js 2.0/3.0 和Alpines.js …

免费SSL申请和自动更新

当前是在mac下操作 安装certbot # mac下brew安装即可 brew install certbotcentos 安装 centos安装文档 申请泛解析证书 sudo certbot certonly --manual --preferred-challengesdns -d *.yourdomain.com## 输出 Saving debug log to /var/log/letsencrypt/letsencrypt.lo…

[Android] Android文件系统中存储的内容有哪些?

文章目录 前言root 文件系统/system 分区稳定性:安全性: /system/bin用来提供服务的二进制可执行文件:调试工具:UNIX 命令&#xff1a;调用 Dalvik 的脚本(upall script):/system/bin中封装的app_process脚本 厂商定制的二进制可执行文件: /system/xbin/system/lib[64]/system/…

Web前端主题色更换实现方式全解析(二)

Web前端主题色更换实现方式全解析&#xff08;一&#xff09; Web前端主题色更换实现方式全解析&#xff08;二&#xff09; 文章目录 一、基于前端框架的主题色切换1. Vue.js实现方式1.1 使用Vue的动态样式绑定1.2 结合Vuex管理主题色状态1.3 示例代码与效果展示 2. 前端框架通…

plink2R

您尝试安装的 plink2R 包与您当前的R版本不兼容。错误消息表明&#xff0c;该包可能没有为您当前的R版本提供。 为了解决这个问题&#xff0c;您可以尝试以下方法&#xff1a; 更新R版本&#xff1a;考虑升级到最新版本的R&#xff0c;因为新版本的R可能支持 plink2R 包。您可…

代码随想录算法训练营Day37|738.单调递增的数字、贪心算法总结

目录 738.单调递增的数字 方法一&#xff1a;暴力解法 方法二&#xff1a;贪心解法 贪心算法总结 738.单调递增的数字 题目链接 文章链接 方法一&#xff1a;暴力解法 class Solution { private:// 各位递增判断函数bool checkNum(int num) {int max 10;while (num) {int …

6.php开发-个人博客项目Tp框架路由访问安全写法历史漏洞

目录 知识点 php框架——TP URL访问 Index.php-放在控制器目录下 ​编辑 Test.php--要继承一下 带参数的—————— 加入数据库代码 --不过滤 --自己写过滤 --手册&#xff08;官方&#xff09;的过滤 用TP框架找漏洞&#xff1a; 如何判断网站是thinkphp&#x…

nvm安装与使用教程

目录 nvm是什么 nvm安装 配置环境变量 更换淘宝镜像 安装node.js版本 nvm list available 显示可下载版本的部分列表 nvm install 版本号 ​编辑 nvm ls 查看已经安装的版本 ​编辑 nvm use 版本号(切换想使用的版本号) nvm是什么 nvm是node.js version management的…

2023全球固态硬盘SSD总结与展望

根据有关市场研究机构的报告显示&#xff0c;全球固态硬盘&#xff08;SSD&#xff09;市场预计将以15.4%的复合年增长率增长&#xff0c;并将在2030年底从2023年的4560万美元增至12430万美元。近年来&#xff0c;由于技术进步和对高性能存储解决方案需求的增长&#xff0c;该市…

Qt安装MYSQL驱动

Qt安装MYSQL驱动 1 Qt配置MySQL驱动 在使用Qt连接数据库前需要确定当前Qt支持的数据库驱动模块有哪些。 1.1 Qt数据库驱动 Qt SQL模块是Qt提供的一个访问数据库的接口&#xff0c;支持多种平台下使用不同类型的数据库&#xff0c;在这个过程中&#xff0c;数据库驱动负责与…

mfc110.dll丢失是什么意思?全面解析mfc110.dll丢失的解决方法

在使用计算机的过程中&#xff0c;用户可能会遭遇一个常见的困扰&#xff0c;即系统提示无法找到mfc110.dll文件。这个动态链接库文件&#xff08;DLL&#xff09;是Microsoft Foundation Classes&#xff08;MFC&#xff09;库的重要组成部分&#xff0c;对于许多基于Windows的…

代码随想录刷题笔记 DAY12 | 二叉树的理论基础 | 二叉树的三种递归遍历 | 二叉树的非递归遍历 | 二叉树的广度优先搜索

Day 12 01. 二叉树的理论基础 1.1 二叉树的种类 满二叉树&#xff1a;除了叶子节点以外&#xff0c;每个节点都有两个子节点&#xff0c;整个树是被完全填满的完全二叉树&#xff1a;除了底层以外&#xff0c;其他部分是满的&#xff0c;底部可以不是满的但是必须是从左到右连…

数据结构之受限线性表

受限线性表 对于一般线性表&#xff0c;虽然必须通过遍历逐一查找再对目标位置进行增、删和查操作&#xff0c;但至少一般线性表对于可操作元素并没有限制。说到这里&#xff0c;大家应该明白了&#xff0c;所谓的受限线性表&#xff0c;就是可操作元素受到了限制。 受限线性表…