Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为在 Grafana 查看了生产环境的集群监控情况,凌晨两点至四点之间的集群、索引的查询以及写入 QPS 都比较低。

在这里插入图片描述

  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**

  • 清理ES历史数据定时任务
    */
    @Component
    public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**

    • 根据索引名称删除当前日期两个月前的那一天的历史文档数据
    • @param jobContext
      */
      @Scheduled
      public void cleanESHistoryData(JobContext jobContext) {
      // jobContext为定时任务中回传数据
      String indexName = jobContext.getData();
      if (StringUtils.isBlank(indexName)) {
      LOGGER.warn(“ES索引名称不能为空!”);
      return;
      }
      long startTimeMillis = System.currentTimeMillis();
      String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
      try {
      String startTimeStr = twoMonthsAgoDate + " 00:00:00";
      // 初始化时间,形如2023-08-06 00:00:00
      Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
      // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
      for (int i = 0; i < 24; i++) {
      Date startTime = initialStartTime;
      startTime = DateUtils.addHours(startTime, i);
      Date endTime = DateUtils.addHours(startTime, 1);
      LOGGER.info(“正在清理索引:[{}],时间:{} 至 {}的历史文档数据…”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
      long currentStartTimeMillis = System.currentTimeMillis();
      // 指定操作的索引库
      SearchRequest searchRequest = new SearchRequest(indexName);
      // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery(“createTimeStr.keyword”)
      .from(DateTool.format(startTime, DF_FULL))
      .to(DateTool.format(endTime, DF_FULL)))
      .size(1000);
      // 设置滚动查询结果在内存中的过期时间为1min
      Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
      // 将滚动以及构造的查询条件放入查询请求
      searchRequest.scroll(scroll).source(searchSourceBuilder);
      SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      // 记录要滚动的ID
      String scrollId = searchResponse.getScrollId();
      SearchHit[] hits = searchResponse.getHits().getHits();
      while (hits != null && hits.length > 0) {
      // 创建批量处理请求对象
      BulkRequest bulkRequest = new BulkRequest();
      for (SearchHit hit : hits) {
      DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
      bulkRequest.add(deleteRequest);
      }
      // 执行批量删除请求操作
      restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      // 构造滚动查询条件,继续滚动查询
      SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
      scrollRequest.scroll(scroll);
      searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
      scrollId = searchResponse.getScrollId();
      hits = searchResponse.getHits().getHits();
      }
      // 当前滚动查询结束,清除滚动,释放服务器内存资源
      ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
      clearScrollRequest.addScrollId(scrollId);
      restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
      LOGGER.info(“清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
      }
      LOGGER.info(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
      } catch (Exception e) {
      LOGGER.error(String.format(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
      }
      }
      }

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

在这里插入图片描述

🎈通过观察监控可以发现,在凌晨三点执行定时任务清理 ES 历史数据期间,集群、索引查询 QPS 以及 CPU 利用率指标都明显飙升。因此,清理 ES 数据时一定要避开流量高峰期,避免在流量高峰期清理数据时造成资源实例宕机,造成生产事故。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/49975.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MyBatid动态语句且模糊查询

目录 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f; MyBatis常用的动态标签和表达式 if标签 Choose标签 where标签 MyBatis模糊查询 #与$的区别 ​编辑 MyBatis映射 resultType resultMap 什么是MyBtais动态语句&#xff1f;&#xff1f;&#xff1f;…

视频转音频mp3怎么弄?

视频转音频mp3怎么弄&#xff1f;在很多人看来&#xff0c;音频就是视频中的一部分&#xff0c;其实这时是一定道理的&#xff0c;视频是一种包含图像和有声音的多媒体文件&#xff0c;没有声音的视频是不完美的。时代发展到现在&#xff0c;短视频已经融入了我们生活的方方面面…

【日常积累】Linux中vi/vim的使用

概述 vim是由vi发展演变过来的文本编辑器&#xff0c;因其具有语法高亮显示、多视窗编辑、代码折叠、支持插件等功能&#xff0c;由于其功能相比vi来说更加强大&#xff0c;所以在实际工作中的使用更加广泛。 vim工作模式 Vim具有多种工作模式&#xff0c;常用的工作模式有&…

微服务架构2.0--云原生时代

云原生 云原生&#xff08;Cloud Native&#xff09;是一种关注于在云环境中构建、部署和管理应用程序的方法和理念。云原生应用能够最大程度地利用云计算基础设施的优势&#xff0c;如弹性、自动化、可伸缩性和高可用性。这个概念涵盖了许多方面&#xff0c;包括架构、开发、…

Prometheus+Grafana+AlertManager监控Linux主机状态

文章目录 PrometheusGrafanaAlertManager监控平台搭建开始监控Grafana连接Prometheus数据源导入Grafana模板监控Linux主机状态 同系列文章 PrometheusGrafanaAlertManager监控平台搭建 Docker搭建并配置Prometheus Docker拉取并配置Grafana Docker安装并配置Node-Exporter …

Python系统学习1-9-类三之特征

一、封装 数据角度&#xff1a;将一些基本数据类型复合成一个自定义类型。 优势&#xff1a;将数据与对数据的操作相关联。 代码可读性更高&#xff08;类是对象的模板&#xff09;。 行为角度&#xff1a;向类外提供必要的功能&#xff0c;隐藏实现的细节。 优势&#xff…

【VRTK4.0运动专题】手柄控制物体移动和旋转

文章目录 原理预设体将两轴转化为位置向量或角度后&#xff0c;调用运动脚本的方法&#xff0c;对指定的物体进行移动或旋转 步骤1、将轴转化为位置向量或角度&#xff1a; 建轴转化预设体&#xff0c;关联两轴&#xff0c;2、准备带有要用方法的运动脚本&#xff1a; 建功能物…

在线图片怎么转换成PDF?在线图片转换成PDF步骤介绍

文件格式要转化不知道怎么办?想要网上下载文件格式转换软件&#xff0c;但是却不知道下载哪个好?今天小编小编就给大家分享一下靠谱的小圆象PDF转换器工具&#xff0c;想知道这款软件好不好用?在线图片怎么转换成PDF?那就进来看看吧。 在线图片怎么转换成PDF 小圆象PDF转换…

requests模板成功下载,但是不能在pycharm中运行

在做实验的过程中&#xff0c;需要用到requests&#xff0c;但是在pycharm中成功下载&#xff0c;仍然无法使用&#xff0c;找了很久&#xff0c;解决方法如下&#xff1a; 进入win中的命令提示符 下载requests模块 pip install requests输入python显示你的python的基本信息&…

什么是软件压力测试?软件压力测试工具和流程有哪些?

软件压力测试 一、含义&#xff1a;软件压力测试是一种测试应用程序性能的方法&#xff0c;通过模拟大量用户并发访问&#xff0c;测试应用程序在压力情况下的表现和响应能力。软件压力测试的目的是发现系统潜在的问题&#xff0c;如内存泄漏、线程锁、资源泄漏等&#xff0c;…

基于灵动微MM32F3270微控制器的监护仪

监护仪是各类医用电子仪器中应用极为普遍的一种。监护仪不仅可以提高护理工作的效率&#xff0c;更重要的是&#xff0c;它为更全面、更准确的掌握患者病情&#xff0c;提高医疗服务质量提供了更可靠的保障。 基于灵动微MM32F3270微控制器的监护仪&#xff1a; -信号采集&…

Fabric.js 元素选中状态的事件与样式

本文简介 带尬猴&#xff01; 你是否在使用 Fabric.js 时希望能在选中元素后自定义元素样式或选框&#xff08;控制角和辅助线&#xff09;的样式&#xff1f; 如果是的话&#xff0c;可以放心往下读。 本文将手把脚和你一起过一遍 Fabric.js 在对象元素选中后常用的样式设置…

Java IO流(五)Netty实战[TCP|Http|心跳检测|Websocket]

Netty入门代码示例(基于TCP服务) Server端 package com.bierce.io.netty.simple; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGro…

激活函数总结(十七):激活函数补充(PELU、Phish)

激活函数总结&#xff08;十七&#xff09;&#xff1a;激活函数补充 1 引言2 激活函数2.1 Parametric Exponential Linear Unit&#xff08;PELU&#xff09;激活函数2.2 Phish激活函数 3. 总结 1 引言 在前面的文章中已经介绍了介绍了一系列激活函数 (Sigmoid、Tanh、ReLU、…

自平衡性:保持数据结构稳定的关键

自平衡性是一种重要的数据结构属性&#xff0c;它确保在执行插入、删除等操作后&#xff0c;数据结构能够自动进行调整&#xff0c;以保持整体的平衡状态。平衡的数据结构可以提供更快的操作性能&#xff0c;避免极端情况下的低效操作&#xff0c;同时保持树或其他结构的整体稳…

Idea Maven 构建,运行Java程序,二次开发Jmeter

Idea Maven 构建 1. maven下载2. Idea 配置3. 配置Maven镜像4. 在Maven项目pom.xml中添加依赖5. 创建jar包&#xff0c;更新pom&#xff0c;执行代码 1. maven下载 【官网】https://maven.apache.org/download.cgi 【其他版本】https://dlcdn.apache.org/maven/maven-3/ 2. …

KubeSphere 社区双周报 | Java functions framework 支持 SkyWalking | 2023.8.4-8.17

KubeSphere 社区双周报主要整理展示新增的贡献者名单和证书、新增的讲师证书以及两周内提交过 commit 的贡献者&#xff0c;并对近期重要的 PR 进行解析&#xff0c;同时还包含了线上/线下活动和布道推广等一系列社区动态。 本次双周报涵盖时间为&#xff1a;2023.08.04-2023.…

Vant 4.6.4发布,增加了一些新功能,并修复了一些bug

导读Vant 4.6.4发布,增加了一些新功能&#xff0c;并修复了一些bug等。 新功能 feat(area-data): 更新芜湖的县区数据&#xff0c;由 nivin-studio 在 #12122 中贡献feat(Locale): 添加塞尔维亚语到国际化&#xff0c;由 RogerZXY 在 #12145 中贡献feat(ImagePreview): 添加 c…

matlab使用教程(22)—非线性优化函数的设置

1.设置优化选项 可以使用由 optimset 函数创建的 options 结构体来指定优化参数。然后&#xff0c;可以将 options 作为输入传递给优化函数&#xff0c;例如&#xff0c;通过使用以下语法调用 fminbnd x fminbnd(fun,x1,x2,options) 或使用以下语法调用 fminsearch x f…

C#与西门子PLC1500的ModbusTcp服务器通信4--搭建ModbusTcp客户端

1、客户端选择 客户端可以是一个程序或一个设备&#xff0c;这里我以C#WINFORM程序来实现客户机与PLC的Modbustcp服务器通信&#xff0c;开发环境是VS2019&#xff0c;.NET Framework版本是4.7.2 2、创建winform程序 3、引入Nmodbus4协议 找到项目&#xff0c;找到引用&…