【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为那时候的 CPU 以及内存占用率较低。
  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;/*** 清理ES历史数据定时任务*/
@Component
public class CleanESHistoryDataTask {private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);@Resourceprivate RestHighLevelClient restHighLevelClient;/*** 根据索引名称删除当前日期两个月前的那一天的历史文档数据* @param jobContext*/@Scheduledpublic void cleanESHistoryData(JobContext jobContext) {// jobContext为定时任务中回传数据String indexName = jobContext.getData();if (StringUtils.isBlank(indexName)) {LOGGER.warn("ES索引名称不能为空!");return;}long startTimeMillis = System.currentTimeMillis();String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);try {String startTimeStr = twoMonthsAgoDate + " 00:00:00";// 初始化时间,形如2023-08-06 00:00:00Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);// 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据for (int i = 0; i < 24; i++) {Date startTime = initialStartTime;startTime = DateUtils.addHours(startTime, i);Date endTime = DateUtils.addHours(startTime, 1);LOGGER.info("正在清理索引:[{}],时间:{} 至 {}的历史文档数据...", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));long currentStartTimeMillis = System.currentTimeMillis();// 指定操作的索引库SearchRequest searchRequest = new SearchRequest(indexName);// 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("createTimeStr.keyword").from(DateTool.format(startTime, DF_FULL)).to(DateTool.format(endTime, DF_FULL))).size(1000);// 设置滚动查询结果在内存中的过期时间为1minScroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));// 将滚动以及构造的查询条件放入查询请求searchRequest.scroll(scroll).source(searchSourceBuilder);SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);// 记录要滚动的IDString scrollId = searchResponse.getScrollId();SearchHit[] hits = searchResponse.getHits().getHits();while (hits != null && hits.length > 0) {// 创建批量处理请求对象BulkRequest bulkRequest = new BulkRequest();for (SearchHit hit : hits) {DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());bulkRequest.add(deleteRequest);}// 执行批量删除请求操作restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);// 构造滚动查询条件,继续滚动查询SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);scrollRequest.scroll(scroll);searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);scrollId = searchResponse.getScrollId();hits = searchResponse.getHits().getHits();}// 当前滚动查询结束,清除滚动,释放服务器内存资源ClearScrollRequest clearScrollRequest = new ClearScrollRequest();clearScrollRequest.addScrollId(scrollId);restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);LOGGER.info("清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));}LOGGER.info("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));} catch (Exception e) {LOGGER.error(String.format("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);}}
}

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/25651.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

编织人工智能:机器学习发展历史与关键技术全解析

文章目录 1. 引言1.1 机器学习的定义1.2 重要性和应用场景重要性应用场景 2. 机器学习的早期历史2.1 初期理论与算法感知机决策树 2.2 早期突破支持向量机神经网络初探 3. 21世纪初期的发展3.1 集成学习方法随机森林XGBoost 3.2 深度学习的崛起卷积神经网络&#xff08;CNN&…

css-4:元素水平垂直居中的方法有哪些?如果元素不定宽高呢?

1、背景 在开发中&#xff0c;经常遇到这个问题&#xff0c;即让某个元素的内容在水平和垂直方向上都居中&#xff0c;内容不仅限于文字&#xff0c;可能是图片或其他元素。 居中是一个非常基础但又是非常重要的应用场景&#xff0c;实现居中的方法存在很多&#xff0c;可以将这…

Spring IOC

◆ 传统Javaweb开发的困惑 ◆ IoC、DI和AOP思想提出 ◆ Spring框架的诞生 Spring | Home IOC控制反转&#xff1a;BeanFactory 快速入门 package com.xiaolin.service.Impl;import com.xiaolin.dao.UserDao; import com.xiaolin.service.UserService;public class UserServic…

Intel 4工艺太难了!酷睿Ultra终于突破5GHz

无论是14nm还是10nm&#xff0c;Intel这些年的新工艺都有一个通性&#xff1a;刚诞生的时候性能平平&#xff0c;高频率都上不去&#xff0c;只能用于笔记本移动端(分别对应5代酷睿、10代酷睿)&#xff0c;后期才不断成熟&#xff0c;比如到了13代酷睿就达到史无前例的6GHz。 接…

【Linux】守护进程

1 相关概念 1.1 守护进程的概念 守护进程也叫做精灵进&#xff0c;是运行在后台的一种特殊进程。它独立于控制终端并且可以周期性的执行某种任务或者处理某些发生的事件。 守护进程是非常有用的进程&#xff0c;在Linux当中大多数服务器用的就是守护进程。比如&#xff0c;web…

前端 select 标签如何创建下拉菜单?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 代码示例⭐ 代码讲解⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏…

【网络基础知识铺垫】

文章目录 1 :peach:计算机网络背景:peach:1.1 :apple:网络发展:apple: 2 :peach:协议:peach:2.1 :apple:协议分层:apple:2.2 :apple:OSI七层模型:apple:2.3 :apple:TCP/IP模型:apple:2.4 :apple:TCP/IP模型与操作系统的关系:apple: 3 :peach:网络传输基本流程:peach:4 :peach:网…

MybatisPlus存在 sql 注入漏洞(CVE-2023-25330)解决办法

首先我们了解下这个漏洞是什么&#xff1f; MyBatis-Plus TenantPlugin 是 MyBatis-Plus 的一个为多租户场景而设计的插件&#xff0c;可以在 SQL 中自动添加租户 ID 来实现数据隔离功能。 MyBatis-Plus TenantPlugin 3.5.3.1及之前版本由于 TenantHandler#getTenantId 方法在…

DeviceNet主站网关转ETHERCAT连接ethercat总线伺服如何控制

大家好&#xff0c;今天要和大家分享一款自主研发的通讯网关——捷米JM-ECTM-DNT。这款产品可是解决了不同协议设备数据交换的麻烦问题&#xff0c;让我们一起来看看它的神奇之处吧&#xff01; 这款通讯网关有什么特别的呢&#xff1f;首先&#xff0c;它可以连接DEVICENET总…

火车头标题伪原创【php源码】

大家好&#xff0c;给大家分享一下python怎么读取文件中的数据&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 火车头采集ai伪原创插件截图&#xff1a; python是一门非常火爆且相对易学的编程语言&#xff0c;应用在各种场景。许多人想学…

electron+vue3全家桶+vite项目搭建【13.1】ipc通信的使用,主进程与渲染进程之间的交互

文章目录 引入IPC通信[主/渲染]进程对应渲染进程>主进程代码测试测试效果 主进程>渲染进程代码测试测试效果 双向通信代码测试测试效果 引入 electron项目常常由一个主进程和多个渲染进程构成&#xff0c;渲染进程之间是隔离的&#xff0c;而所有渲染进程都和主进程共享…

vscode 格式问题

1、EditorConfig for VS Code 插件 shift alt f 格式化文件&#xff08;VS Code格式化按键&#xff09;&#xff0c;如下图&#xff0c;每个缩进4空格 代码如下 创建文件名 .editorconfig root true [*] charset utf-8 indent_style space indent_size 2 end_of_…

Docker 启动 Nacos 报错:No DataSource set

​ &#x1f468;&#x1f3fb;‍&#x1f4bb; 热爱摄影的程序员 &#x1f468;&#x1f3fb;‍&#x1f3a8; 喜欢编码的设计师 &#x1f9d5;&#x1f3fb; 擅长设计的剪辑师 &#x1f9d1;&#x1f3fb;‍&#x1f3eb; 一位高冷无情的编码爱好者 大家好&#xff0c;我是 …

Microsoft Message Queuing Denial-of-Service Vulnerability

近期官方公布了一个MSMQ的拒绝服务漏洞&#xff0c;可能因为网络安全设备的更新&#xff0c;影响业务&#xff0c;值得大家关注。 漏洞具体描述参见如下&#xff1a; Name: Microsoft Message Queuing Denial-of-Service Vulnerability Description: Microsoft Message Queuing…

java 版本企业招标投标管理系统源码+多个行业+tbms+及时准确+全程电子化tbms

​ 功能描述 1、门户管理&#xff1a;所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含&#xff1a;招标公告、非招标公告、系统通知、政策法规。 2、立项管理&#xff1a;企业用户可对需要采购的项目进行立项申请&#xff0c;并提交审批&#xff0c;查…

静态页面与动态页面的区别及部署jpress应用

简述静态网页和动态网页的区别 静态网页&#xff1a; 1、首先是静态网页&#xff0c;静态网页每个网页中都有一个固定的URL&#xff0c;网页URL以htm、HTML、jpg、.gif、.mp4等常见形式为后缀&#xff0c;而且不含有问号&#xff1b; 2、静态网页内容一经发布到网页服务器上…

Java 8:让你的代码更简洁、高效和灵活的新特性

Java 8 ——企业中使用最普遍的版本&#xff0c;那么了解它的新特性是非常有必要的 目录 一、函数式接口 二、Lamdba表达式 三、方法引用 四、Stream API 3.1 创建 方法一&#xff1a;通过集合 方法二&#xff1a;通过数组 方法三&#xff1a;通过Stream的of() 方法四…

《JeecgBoot系列》JeecgBoot(ant-design-vue) 识别字段中指定内容并修改该行文字颜色

JeecgBoot(ant-design-vue) 识别字段中指定内容并修改该行文字颜色 需求&#xff1a;将生产工厂是配件工厂的行改变颜色标注 一、修改table组件内容 在<a-table></a-table>内添加:rowClassName"tableRowClass" <a-table>...:rowClassName"t…

【Linux】从0到1实现一个进度条小程序

个人主页&#xff1a;&#x1f35d;在肯德基吃麻辣烫 我的gitee&#xff1a;gitee仓库 分享一句喜欢的话&#xff1a;热烈的火焰&#xff0c;冰封在最沉默的火山深处 文章目录 前言一、理解回车 \r 和换行 \n二、初步认识缓冲区1. 认识第一个函数&#xff1a;sleep2.观察缓冲区…

Hive终端命令行打印很多日志时,如何设置日志级别

示例&#xff1a;use test; 切换到test数据库时&#xff0c;输出很多日志信息不方便看结果&#xff0c;如下图。 解决方法&#xff1a; 退出hive命令行界面&#xff08;ctrlC&#xff09;执行“vi /usr/local/apache-hive-3.1.2-bin/conf/log4j.properties”命令&#xff0c;创…