xxljob分片广播+多线程实现高效定时同步elasticsearch索引库

需求:为了利用elasticsearch实现高效搜索,需要将mysql中的数据查出来,再定时同步到es里,同时在同步过程中通过分片广播+多线程提高同步数据的效率。

1. 添加映射

  • 使用kibana添加映射
PUT  /app_info_article
{"mappings":{"properties":{"id":{"type":"long"},"publishTime":{"type":"date"},"layout":{"type":"integer"},"images":{"type":"keyword","index": false},"staticUrl":{"type":"keyword","index": false},"authorId": {"type": "long"},"authorName": {"type": "keyword"},"title":{"type":"text","analyzer":"ik_max_word","copy_to": "all"},"content":{"type":"text","analyzer":"ik_max_word","copy_to": "all"},"all":{"type": "text","analyzer": "ik_max_word"}}}
}
  • 使用http请求
    PUT请求添加映射:http://192.168.200.130:9200/app_info_article
    GET请求查询映射:http://192.168.200.130:9200/app_info_article
    DELETE请求,删除索引及映射:http://192.168.200.130:9200/app_info_article
    GET请求,查询所有文档:http://192.168.200.130:9200/app_info_article/_search
    在这里插入图片描述

2. springboot测试

引入依赖

<!--elasticsearch--><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.12.1</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.12.1</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.12.1</version><exclusions><exclusion><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-smile</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></exclusion></exclusions></dependency>

elasticsearch配置

#自定义elasticsearch连接配置
elasticsearch:host: 192.168.200.131port: 9200

elasticsearch配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {private String host;private int port;@Beanpublic RestHighLevelClient client(){return new RestHighLevelClient(RestClient.builder(new HttpHost(host,port,"http")));}
}

实体类

@Data
public class SearchArticleVo {// 文章idprivate Long id;// 文章标题private String title;// 文章发布时间private Date publishTime;// 文章布局private Integer layout;// 封面private String images;// 作者idprivate Long authorId;// 作者名词private String authorName;//静态urlprivate String staticUrl;//文章内容private String content;}

测试。测试成功后别忘了删除app_info_article的所有文档

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {@Autowiredprivate ApArticleMapper apArticleMapper;@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 注意:数据量的导入,如果数据量过大,需要分页导入*  1)查询数据库数据*  2)将数据写入到ES中即可*     创建BulkRequest*            ================================*            ||A:创建XxxRequest*            ||B:向XxxRequest封装DSL语句数据*            ||                             X C:使用RestHighLevelClient执行远程请求*            ================================*            将XxxRequest添加到BulkRequest*       使用RestHighLevelClient将BulkRequest添加到索引库* @throws Exception*/@Testpublic void init() throws Exception {//1)查询数据库数据List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();//2)创建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:创建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封装DSL语句数据.id(searchArticleVo.getId().toString()).source(JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)将XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient将BulkRequest添加到索引库restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);}}

3. 核心代码

3.1 xxljob配置

xxl:job:accessToken: default_tokenadmin:addresses: http://127.0.0.1:8080/xxl-job-adminexecutor:address: ''appname: hmttip: ''logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30port: 9998
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}

3.2 elasticsearch配置

和前面一样

3.3 任务编写

@Component
public class SyncIndexTask {@Autowiredprivate IArticleClient articleClient;@Autowiredprivate RestHighLevelClient restHighLevelClient;//线程池public static ExecutorService pool = Executors.newFixedThreadPool(10);/**** 同步索引任务*  1)当数量大于100条的时候,才做分片导入,否则只让第1个导入即可*      A:查询所有数据量 ->searchTotal total>100 [判断当前分片不是第1个分片]*      第N个分片执行数据处理范围-要计算   确定当前分片处理的数据范围  limit #{index},#{size}*                                                                [index-范围]**      B:执行分页查询-需要根据index判断是否超过界限,如果没有超过界限,则并开启多线程,分页查询,将当前分页数据批量导入到ES**      C:在xxl-job中配置作业-策略:分片策略**/@XxlJob("syncIndex")public void syncIndex()  {//1、获取任务传入的参数   {"minSize":100,"size":10}String jobParam = XxlJobHelper.getJobParam();Map<String,Integer> jobData = JSON.parseObject(jobParam,Map.class);int minSize = jobData.get("minSize"); //分片处理的最小总数据条数int size =  jobData.get("size"); //分页查询的每页条数   小分页//2、查询需要处理的总数据量  total=IArticleClient.searchTotal()Long total = articleClient.searchTotal();//3、判断当前分片是否属于第1片,不属于,则需要判断总数量是否大于指定的数据量[minSize],大于,则执行任务处理,小于或等于,则直接结束任务int cn = XxlJobHelper.getShardIndex(); //当前节点的下标if(total<=minSize && cn!=0){//结束return;}//4、执行任务   [index-范围]   大的分片分页处理//4.1:节点个数int n = XxlJobHelper.getShardTotal();//4.2:当前节点处理的数据量int count = (int) (total % n==0? total/n :  (total/n)+1);//4.3:确定当前节点处理的数据范围//从下标为index的数据开始处理  limit #{index},#{count}int indexStart = cn*count;int indexEnd = cn*count+count-1; //最大的范围的最后一个数据的下标//5.小的分页查询和批量处理int index =indexStart; //第1页的indexSystem.out.println("分片个数是【"+n+"】,当前分片下标【"+cn+"】,处理的数据下标范围【"+indexStart+"-"+indexEnd+"】");do {//=============================================小分页================================//5.1:分页查询//5.2:将数据导入ESpush(index,size,indexEnd);//5.3:是否要查询下一页 index+sizeindex = index+size;}while (index<=indexEnd);}/*** 数据批量导入* @param index* @param size* @param indexEnd* @throws IOException*/public void push(int index,int size,int indexEnd)  {pool.execute(()->{System.out.println("当前线程处理的分页数据是【index="+index+",size="+(index+size>indexEnd? indexEnd-index+1 : size)+"】");//1)查询数据库数据List<SearchArticleVo> searchArticleVos = articleClient.searchPage(index, index+size>indexEnd? indexEnd-index+1 : size);  //size可能越界//2)创建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:创建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封装DSL语句数据.id(searchArticleVo.getId().toString()).source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)将XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient将BulkRequest添加到索引库if(searchArticleVos!=null && searchArticleVos.size()>0){try {restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}});}
}

3.4 xxl-admin新增任务

  • 新增执行器 这里的appName要和XxlJobConfig 中的appname保持一致。也可以不用新增执行器,执行器就相当于一个分组的作用。
    在这里插入图片描述
  • 新增任务
    在这里插入图片描述

4. 模拟集群,运行项目

  • 运行3个SearchApplication项目,设置xxl.job.executor.port分别为9998,9997,9996
    在这里插入图片描述
  • 执行一次任务
    在这里插入图片描述
  • 查看结果,3个application都成功了
    在这里插入图片描述
  • kibana查看是否有数据,发现有18条,mysql的数据全部被导入了

在这里插入图片描述

5. 总结

  • xxljob分片广播:假如一共有1000条数据,有3个节点上运行着SearchApplication服务。那么每个节点需要同步的数据总条数为334,334,332条。
    在这里插入图片描述

  • 分页查询:节点0的任务总条数为334条,那么需要做分页(假设分页size为20)查询的次数为17次,每查1次后,将查到的数据通过restHighLevelClient发送到es中。

do {//5.1:分页查询//5.2:将数据导入ESpush(index,size,indexEnd);  //分页查询+导入es的操作//5.3:是否要查询下一页 index+sizeindex = index+size;
}while (index<=indexEnd);
  • 多线程:上述代码,push方法包括了分页查询+导入es的操作,是低效的。并且push方法不结束的话,下一页的操作不会开始。这里可以用多线程,每次push的时候都开启一个新线程,这样每一页的操作都是独立的,可以同时查询,同时导入到es,不会互相影响。这里使用了线程池。
public static ExecutorService pool = Executors.newFixedThreadPool(10);
......
public void push(int index,int size,int indexEnd)  {pool.execute(()->{//分页查询+导入到es});}
  • elasticsearch的相关api:参考我另一篇博客 项目中使用Elasticsearch的API相关介绍
//2)创建BulkRequest - 刷新策略BulkRequest bulkRequest = new BulkRequest()//刷新策略-立即刷新.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);for (SearchArticleVo searchArticleVo : searchArticleVos) {//A:创建XxxRequestIndexRequest indexRequest = new IndexRequest("app_info_article")//B:向XxxRequest封装DSL语句数据.id(searchArticleVo.getId().toString()).source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);//3)将XxxRequest添加到BulkRequestbulkRequest.add(indexRequest);}//4)使用RestHighLevelClient将BulkRequest添加到索引库if(searchArticleVos!=null && searchArticleVos.size()>0){try {restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/12720.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HL7协议

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1.介绍2.传输协议规范2.1. MLLP2.1.1. 数据头定义2.1.2. 转义字符集 2.2. 规范说明2.3. 消息格式说明 3.HL7结构介绍3.1. 患者建档&#xff08;ADT^A28&#xff09;…

人工智能领域向量化技术加速多模态大模型训练与应用

目录 前言1、TextIn文档解析技术1.1、文档解析技术1.2、目前存在的问题1.2.1、不规则的文档信息示例 1.3、合合信息的文档解析1.3.1、合合信息的TextIn文档解析技术架构1.3.2、版面分析关键技术 Layout-engine1.3.3、文档树提取关键技术 Catalog-engine1.3.4、双栏1.3.5、非对称…

计算机服务器中了locked勒索病毒怎么解决,locked勒索病毒解密恢复工具

在网络技术飞速发展的时代&#xff0c;通过网络开展各项工作业务成为众多企业的首选&#xff0c;网络也为企业的生产运营提供了极大便利&#xff0c;大大提升了企业办公效率&#xff0c;但是利用网络避免不了网络威胁的存在&#xff0c;数据安全问题一直是企业关心的主要话题。…

TikTok机房ip好还是住宅ip好?

住宅ip比较好&#xff0c;机房数据中心IP高效、低价&#xff0c;所以使用的人多且用处复杂&#xff0c;这类ip极大可能存在滥用的黑历史&#xff0c;通过此类ip访问tiktok&#xff0c;被禁止的可能性更高&#xff0c;更容易被拉入黑名单。所以我们推荐tiktok独享原生ip搭建节点…

CC工具箱使用指南:【界线导出Excel(一横)】

一、简介 群友定制工具。 这个工具的目的是将面要素的边界线的属性导出Excel。 给定的Excel模板如下&#xff1a; 结果需要输出每一段界一的起点、终点的坐标&#xff0c;这里以度分秒的方法表达。 每段界线的方位角以及方向&#xff0c;方向按16位方位角描述&#xff1a; …

高通QCS6490开发(六):连接使用摄像头

本文将会介绍如何在FV01开发板上连接摄像头和显示预览。 所用硬件有&#xff1a; 1. FV01开发板 2.Raspberry 摄像头 操作步骤如下&#xff1a; 通过FPC线和杜邦线将FV01板和摄像头连接起来&#xff0c;接线如下&#xff1a; 1、Camera设备连接&#xff0c;通过22pin转15pi…

togaf培训简介2

1.定义 2.ADM 业务下降期不要瞎折腾&#xff0c;上升期配合业务做一些改革&#xff1f; 项目交付物不能是聊天记录、PPT什么的&#xff0c;最起码是邮件。 3.架构内容框架 或者叫&#xff1a;企业统一体。 包括&#xff1a;企业连续性和解决方案连续性 方案和工具的解耦很大程…

【回溯】1255. 得分最高的单词集合

本文涉及知识点 回溯 力扣难道&#xff1a;1881 LeetCode1255. 得分最高的单词集合 你将会得到一份单词表 words&#xff0c;一个字母表 letters &#xff08;可能会有重复字母&#xff09;&#xff0c;以及每个字母对应的得分情况表 score。 请你帮忙计算玩家在单词拼写游戏…

K8s 二进制部署 上篇

一 K8S按装部署方式&#xff1a; ① Minikube Minikube是一个工具&#xff0c;可以在本地快速运行一个单节点微型K8S&#xff0c;仅用于学习、预览K8S的一些特 性使用。 部署地址&#xff1a;https://kubernetes.io/docs/setup/minikube ② Kubeadmin Kubeadmin也是一个工…

vue网页端控制台展示独有标记

效果展示 实现步骤 1. 新建js文件 定义一个类 用于提供控制台打印日志显示样式的方法 src\libs\util.log.js class Logger {// 定义静态方法static typeColor(type "default") {let color "";switch (type) {case "default":color "#3…

后台菜单数据递归展示

后台菜单数据递归展示 效果示例图aslide.vueaslideItem.vuemenu 效果示例图 aslide.vue <script setup>import {ref} from vue;const props defineProps({isCollapse: {type: Boolean,default: false}});import AslideItem from "./aslideItem.vue"const def…

MIRO时,修改页签“采购订单参考”的数量时,金额不自动计算

MIRO 发票校验时&#xff0c;进入到如下界面&#xff0c;系统参考采购订单自动带出已经收货的金额和数量。 此时如果想要修改数量时&#xff0c;有些用户账号下&#xff0c;金额不自动计算&#xff0c;但是有些用户账号下&#xff0c;数量更改时&#xff0c;系统自动计算和建议…

穷人翻身的秘诀!2024年普通人如何创业赚钱?穷人如何逆袭翻身?普通人创业新风口?

穷人的思维有一个致命的缺陷&#xff0c;就是追求确定性&#xff0c;进而失去了可能性。而赚钱的真相实际上非常残酷。世界上能够赚钱的事情必定是不确定的&#xff0c;能够赚取巨额财富的事情更是极度不确定的。只有面对不确定性&#xff0c;才能让你把竞争对手拦在门外&#…

如何在 Linux 上检查 CPU 和硬盘温度

为了更好地监测您的Linux系统的硬件健康状况&#xff0c;如CPU与硬盘温度、风扇转速等关键指标&#xff0c;采用lm_sensors与hddtemp这两款强大工具是明智之选。以下是关于这些工具的详尽指南&#xff0c;包括它们的功能介绍、安装步骤以及如何配置lm_sensors&#xff0c;旨在为…

ASCLL码表以及字符的相加减

ASCLL码表完整版及解释_acssll码-CSDN博客 #include <getopt.h> #include <stdio.h> #include <stdlib.h>#define MAX_PATH 256 char filename[MAX_PATH 5];int isdigit(int c) {if (c > 0 && c < 9)return 1;return 0; }int main(int argc…

【TypeScript】对象类型的定义

简言 在 JavaScript 中&#xff0c;我们分组和传递数据的基本方式是通过对象。在 TypeScript 中&#xff0c;我们通过对象类型来表示这些对象。 对象类型 在 JavaScript 中&#xff0c;我们分组和传递数据的基本方式是通过对象。在 TypeScript 中&#xff0c;我们通过对象类…

Blender雕刻建模_笔刷纹理和顶点绘制

笔刷纹理 主要用于皮肤&#xff0c;纹理的雕刻。 可以修改映射方式来实现不同绘制效果。 用一张纹理来定义笔刷各个点的强度。其中白色为1&#xff0c;黑色为0。 设置笔刷纹理步骤&#xff1a; -新建一套笔刷 -强度&#xff0c;设为0.15&#xff08;可以根据需求修改&#x…

ACWing471. 棋盘-DFS剪枝

题目 思路 本思路参考博客AcWing 471. 棋盘 - AcWing 约束方程&#xff1a; 代码 #include <iostream> #include <cstring> #include <algorithm>using namespace std;const int N 110, INF 0x3f3f3f3f; int g[N][N], n, m, dist[N][N]; int dx[4] {-1…

接口自动化-requests库

requests库是用来发送请求的库&#xff0c;本篇用来讲解requests库的基本使用。 1.安装requests库 pip install requests 2.requests库底层方法的调用逻辑 &#xff08;1&#xff09;get / post / put / delete 四种方法底层调用 request方法 注意&#xff1a;data和json都…

基于Java+SpringBoot+Mybaties-plus+Vue+elememt 驾校管理系统 设计与实现

一.项目介绍 系统角色&#xff1a;管理员、驾校教练、学员 管理员&#xff1a; 个人中心&#xff1a;修改密码以及个人信息修改 学员管理&#xff1a;维护学员信息&#xff0c;维护学员成绩信息 驾校教练管理&#xff1a;驾校教练信息的维护 驾校车辆管理&…