Springboot 自定义线程池 ThreadPoolTaskExecutor

场景

假设 需要使用多线程清理es中的历史数据

知识

参数解释

 
  1. corePoolSize(核心线程数):线程池中的核心线程数量,即使线程池处于空闲状态,这些核心线程也不会被销毁。
  2. maximumPoolSize(最大线程数):线程池允许创建的最大线程数量。如果阻塞队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
  3. keepAliveTime(非核心线程的空闲时间):非核心线程等待keepAliveTime时间后还没有获取到任务就会自动销毁。
  4. unit(空闲时间单位)keepAliveTime的时间单位。
  5. workQueue(任务队列):用于保存等待执行的任务的阻塞队列。
  6. ThreadFactory(线程工厂):用于创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,后期方便定位问题。
  7. RejectedExecutionHandler(拒绝策略):当线程池中的线程达到maximumPoolSize,说明线程池处于饱和状态,此时仍然有任务提交过来,那么必须采取一种策略处理提交的新任务。

拒绝策略:当线程池中的线程达到最大数量,且任务队列已满时,需要采取一种策略来处理新提交的任务。ThreadPoolTaskExecutor提供了以下几种拒绝策略:

 
  1. AbortPolicy(默认拒绝策略):当实际线程数量大于维护队列中设定的数量时,将会触发拒绝任务的处理程序,它将抛出RejectedExecutionException
  2. DiscardPolicy:当实际线程数量大于维护队列中设定的数量时,新提交的任务将被静默丢弃。
  3. DiscardOldestPolicy:当实际线程数量大于维护队列中设定的数量时,队列中最老的任务将被静默丢弃,新任务将被放入队列。
  4. CallerRunsPolicy:当实际线程数量大于维护队列中设定的数量时,多出来的任务将由调用线程(主线程)处理。

实现方案

1.一个线程安全(保证时间段连贯)的参数生产方法(产生es 清理所需的时间段,索引,时间字段),定义一个外配置的json文件去做持久化记录时间段的位置(根据个人需要,完全可以摈弃)

2.一个es 清理的方法,我们用es的_delete_by_query 去处理

3.一个自定义线程池elasticsearchClearPool,定时任务去调用es 的清理方法 定时任务的线程为es-clear-main 清理的主线程,elasticsearchClearPool为es数据的清理的线程,

线程池的拒绝策略设置为:ThreadPoolExecutor.CallerRunsPolicy() (使用该策略保证清理时间的连贯性)

4.一个定时任务定时启动清理方法 使用fixedDelay  保证单一性

代码

只贴部分与文章相关的代码(存在冗余,使用按需改动即可)

自定义线程池配置


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** @author sszdzq*/
@Configuration
public class ThreadPoolTaskConfig {@Bean(name = "elasticsearchClearPool")public Executor elasticsearchClearPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//核心线程池大小executor.setCorePoolSize(5);//最大线程数executor.setMaxPoolSize(9);//队列容量executor.setQueueCapacity(1);//活跃时间executor.setKeepAliveSeconds(60);//线程名字前缀executor.setThreadNamePrefix("es-clear-pool-");// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();return executor;}}

 参数生成+清理方法


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import fri.bhlz.bean.BetweenTime;
import fri.bhlz.bean.EsParams;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;@Service
@Slf4j
public class ClearElasticSearchService {@Autowiredprivate RestTemplate restTemplate;@Value("${elasticsearch.index}")private String CLEAR_INDEX_FIELD;@Value("${elasticsearch.retention:90}")private String ES_DATA_RETENTION;@Value("${spring.elasticsearch.hostname}")private String HOST_NAME;@Value("${elasticsearch.clear:false}")private String clear;@Synchronizedpublic EsParams getClearParams(String indexFiled) {//读取jsonJSONObject js = cjrReadJson();String index = indexFiled.split(":")[0];String field = indexFiled.split(":")[1];String record = index + "-date";js = validJson(js, record);LocalDateTime st = js.getDate(record).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();LocalDateTime et = st.plusHours(1);EsParams es = new EsParams().setIndices(index).setClearTimeField(field);es.setClearStartTime(st);es.setClearEndTime(et);//持久化jsonjs.put(record, et.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));cjrWriteJson(js);log.debug("获取es时间:{} -> {}", es.getClearStartTime(), es.getClearEndTime());return es;}private JSONObject validJson(JSONObject js, String recordField) {js = ObjectUtils.isEmpty(js) ? new JSONObject() : js;if (!js.containsKey(recordField)) {js.put(recordField, LocalDateTime.now().minusYears(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}return js;}@Async("elasticsearchClearPool")public ResponseEntity<JSONObject> deleteByQuery(EsParams j) {log.debug("开始处理{} {}->{}", j.getIndices(), j.getClearStartTime(), j.getClearEndTime());//构建查询体SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();//查询条件 时间范围RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(j.getClearTimeField());rangeQueryBuilder.lt(j.getClearEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rangeQueryBuilder.gte(j.getClearStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));searchSourceBuilder.postFilter(rangeQueryBuilder);searchSourceBuilder.toString();log.debug("请求体:{}", searchSourceBuilder);HttpHeaders httpHeaders = new HttpHeaders();HttpEntity httpEntity = new HttpEntity(searchSourceBuilder.toString(), httpHeaders);ResponseEntity<JSONObject> resp = restTemplate.postForEntity("http://" + HOST_NAME + "/" + j.getIndices() + "/_delete_by_query", httpEntity, JSONObject.class);if (resp.getStatusCode().equals(HttpStatus.OK)) {log.debug(resp.getBody().toString(SerializerFeature.DisableCircularReferenceDetect));log.info(" Elasticsearch清理 {} {}->{} {} {}", j.getIndices(), j.getClearStartTime(), j.getClearEndTime(), resp.getStatusCode().value(), resp.getBody().getString("deleted"));}return resp;}private void getBetweenTime(BetweenTime bt) {bt.setStartTime(bt.getStartTime().plusHours(1));bt.setEndTime(bt.getStartTime().plusHours(1));bt.setStartTimeStr(bt.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));bt.setEndTimeStr(bt.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}@Value("${clears.record-file}")private String recordPath;public JSONObject cjrReadJson() {createFile(recordPath);try (FileInputStream fis = new FileInputStream(recordPath)) {String jsonContent = new String(FileCopyUtils.copyToByteArray(fis), StandardCharsets.UTF_8);if (!StringUtils.hasText(jsonContent)) {return new JSONObject();}return JSON.parseObject(jsonContent);} catch (Exception e) {e.printStackTrace();return new JSONObject();}}public void cjrWriteJson(JSONObject j) {try (FileOutputStream fos = new FileOutputStream(recordPath)) {byte[] bytes = j.toString(SerializerFeature.PrettyFormat).getBytes(StandardCharsets.UTF_8);// 将字节数据写入文件FileCopyUtils.copy(bytes, fos);} catch (Exception e) {e.printStackTrace();}}private void createFile(String path) {try {File file = new File(path);if (!file.exists()) {file.createNewFile();}} catch (IOException e) {e.printStackTrace();}}
}

 定时任务

package fri.bhlz.schedule;import fri.bhlz.bean.EsParams;
import fri.bhlz.service.ClearElasticSearchService;
import fri.bhlz.service.ClearService;
import fri.bhlz.service.VerificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component
@Slf4j
public class ScheduleTask {@Autowiredprivate ClearService clearService;@Autowiredprivate VerificationService verificationService;@Value("${elasticsearch.index}")private String CLEAR_INDEX_FIELD;@Value("${elasticsearch.retention:90}")private String ES_DATA_RETENTION;@Value("${elasticsearch.clear:false}")private String clear;@Autowiredprivate ClearElasticSearchService clearElasticSearchService;@Scheduled(initialDelay = 1500, fixedDelay = 1000 * 60 * 5)public void clearEsData() {Thread.currentThread().setName("es-clear-main");if (!Boolean.valueOf(clear)) {return;}log.info("clear elasticsearch start");LocalDateTime endTime = LocalDateTime.now().minusDays(Integer.parseInt(ES_DATA_RETENTION));for (String indexFiled : CLEAR_INDEX_FIELD.split(",")) {LocalDateTime now = endTime.minusDays(1);while (now.isBefore(endTime)) {EsParams params = clearElasticSearchService.getClearParams(indexFiled);clearElasticSearchService.deleteByQuery(params);now = params.getClearEndTime();}}log.info("clear elasticsearch end");}
}
spring:elasticsearch:hostname: 127.0.0.1:9200
elasticsearch:clear: trueindex: index_aa:create_time,index_bb:createtime,index_cc:save_timeretention: 90

注意事项

调用时一定要保证非本类的方法直接调用@Async标注的方法,不然会不生效

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14962.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 获取内外SD卡路径

方法一&#xff1a;使用Environment类和反射 获取内置SD卡路径&#xff1a; 通过Environment.getExternalStorageDirectory().getAbsolutePath()或者System.getenv("EXTERNAL_STORAGE") 获取外置SD卡路径&#xff1a; String extSdcardPath System.getenv("SEC…

打造AI虚拟伴侣 - 优化方案

第一部分:框架优化概述 1、精确定位: 构建一个高度灵活且用户友好的平台,旨在通过无缝集成多种大型语言模型(LLMs)后端,为用户创造沉浸式的角色交互体验。不仅适配电脑端,还特别优化移动端体验,满足二次元AI虚拟伴侣市场的特定需求。 2、核心功能强化: 增强后端兼容…

一起学习大模型 - 从底层了解Token Embeddings的原理(1)

文章目录 前言1. Token Embeddings简介2. 实现原理2.1 步骤2.2 伪代码2.2.1 代码2.2.2 输出示例2.2.3 代码详细解释2.2.4 实际应用 3. 选用高维向量的好处是什么3.1 捕捉语义关系3.2 处理多义词3.3 提升模型性能3.4 平滑数据稀疏性 前言 大家在使用离线或在线的Token Embeddin…

每日练习之深度优先搜索——最大的湖

最大的湖 题目描述 运行代码 #include<iostream> using namespace std; bool mp[102][102]; int sum,num; int N,M,K; int dfs(int x,int y ) {if( mp[x][y] ){mp[x][y]0;sum;dfs(x1,y);dfs(x-1,y);dfs(x,y1);dfs(x,y-1);}return 0; } int main() {cin>>N>>…

【每日一题】52.20个机器学习问题 2 (模型部署、实践流程和应用问题)

在上一篇《20个机器学习问答题》中&#xff0c;问题主要围绕机器学习的基础概念和理论知识。 这次&#xff0c;本篇内容针对机器学习的实践和应用继续提出了20个不同的问题。【点击跳转原文】 在实际应用中&#xff0c;机器学习模型的建立流程是怎样的&#xff1f; 机器学习模…

使用delphi11编写一个基于xls作为数据库的照片展示程序

1、创建xls文档可以参考前一篇博客&#xff0c;并使用wps将文档保存为2003格式xls后缀。 2、在form上面放置adoconnection、adotable、datasource、spinedit、timer、checkbox、image、4个button组件。 image的设置&#xff1a; Image1.Align : alClient; Image1.Center : Tr…

2024年,企业的人才管理怎么做?这5点是关键!

当今时代&#xff0c;各行各业都面临着激烈的竞争。这些竞争归根结底都是人才的竞争。企业若想在竞争中掌握主动权&#xff0c;实现基业长青&#xff0c;就必须努力留住人才&#xff0c;并充分发挥他们的积极性、主动性和创造性。因此&#xff0c;做好人才管理是企业实现长期可…

如何找到docker的run(启动命令)

使用python三方库进行 需要安装python解释器 安装runlike安装包 pip3 install runlike 运行命令 runlike -p <container_name> # 后面可以是容器名和容器id&#xff0c;-p参数是显示自动换行实验 使用docker启动一个jenkins 启动命令为 docker run -d \ -p 9002:80…

机器学习 - 特征监控

特征监控的定义 特征监控是机器学习模型在生产环境中持续监控输入特征的过程&#xff0c;确保输入数据特征的分布和性质与模型训练时一致&#xff0c;从而保证模型在生产环境中的表现稳定和可靠。特征监控通过检测数据的漂移、变化和异常&#xff0c;帮助识别潜在的问题并采取…

无线领夹麦克风哪个品牌音质最好,揭秘无线领夹麦哪个牌子好用

​随着社交媒体和内容创作的兴起&#xff0c;清晰可靠的音频捕捉已成为打造高品质作品的关键要素。无线领夹麦克风因其轻巧设计和用户友好的接口而受到青睐&#xff0c;它能够确保你的声音在任何环境下都能被完美捕捉。经过精心测试和对比&#xff0c;以下几款无线领夹麦克风是…

Socket CAN中ctrlmode有哪些?

在Linux中,socketcan 的 ctrlmode 是一个用于配置CAN设备控制模式的标志字段。该字段的值由一组标志位组成,这些标志位控制CAN设备的各种操作模式。以下是一些常见的 ctrlmode 标志及其含义: CAN_CTRLMODE_LOOPBACK: 描述:启用回环模式。作用:设备在发送帧的同时会接收它…

大数据学习之安装并配置maven环境

什么是Maven Maven字面意&#xff1a;专家、内行Maven是一款自动化构建工具&#xff0c;专注服务于Java平台的项目构建和依赖管理。依赖管理&#xff1a;jar之间的依赖关系&#xff0c;jar包管理问题统称为依赖管理项目构建&#xff1a;项目构建不等同于项目创建 项目构建是一…

Linux服务器自动监听Web应用接口,未响应自动重启JAVA应用脚本

近期部署了一个多台负载的应用在linux服务器&#xff0c;但总有其中的某台服务器应用会出现假死&#xff0c;导致dubbo请求出现RPC调用失败。当然主要问题肯定是程序上的某些问题导致的。但无法快速定位排查&#xff0c;所以弄个脚本自动监听接口&#xff0c;当出现未响应&…

《NoSQL数据库技术与应用》 MongoDB副本集

《NoSQL数据库技术与应用》 教学设计 课程名称&#xff1a;NoSQL数据库技术与应用 授课年级&#xff1a; 20xx年级 授课学期&#xff1a; 20xx学年第一学期 教师姓名&#xff1a; 某某老师 2020年5月6日 课题 名称 第4章 MongoDB副本集 计划学时 8课时 内容 分析 独立模式可…

第四十四天 完全背包理论 | 518.零钱兑换||

1.dp[j]含义&#xff1a;容量为j的背包&#xff0c;有一些可重复放入的物品&#xff0c;放满这个背包的最大价值 完全背包中每个物品可以使用无数次&#xff1a;遍历背包时采用正序遍历 &#xff08;对于纯完全背包问题&#xff09;先遍历物品还是先遍历背包无所谓&#xff1…

C语言——⾼位优先与低位优先的不同之处是什么?

一、问题 C语⾔的最⼤特⾊就是可移植性好。根据机器类型的不同&#xff0c;⾼位优先与低位优先也不同。那么&#xff0c;最好的可移植的 C 程序应该同时适⽤这两种类型的计算机。下⾯了解⼀下⾼位优先与低位优先的不同之处。 二、解答 所谓的⾼位优先&#xff0c;就是最低的地…

GitHub的原理及应用详解(五)

本系列文章简介&#xff1a; GitHub是一个基于Git版本控制系统的代码托管平台&#xff0c;为开发者提供了一个方便的协作和版本管理的工具。它广泛应用于软件开发项目中&#xff0c;包括但不限于代码托管、协作开发、版本控制、错误追踪、持续集成等方面。 GitHub的原理可以简单…

使用docker-compose部署时序数据库InfluxDB1.8.4

背景 如今 InfluxDB 已经更新到了 2.x &#xff0c; InfluxDB 1.x 和 2.x 版本之间有几个主要的区别&#xff1a; 数据模型&#xff1a; 1.x&#xff1a;使用数据库和保留策略来组织数据。 2.x&#xff1a;引入了组织&#xff08;organizations&#xff09;和存储桶&#xff…

Mac | 关于 Mac 桌面文件无法显示

现象问题 电脑配置&#xff1a;MacBook Pro M1&#xff0c;系统 Ventura 13.6.7 最近在不知道是不是安装了什么软件&#xff0c;导致桌面上的文件看不到了&#xff0c;但是在访达里的桌面还是可以看到文件&#xff0c;而且开启台前调度的时候&#xff0c;也不会返回桌面了。检查…