场景
假设需要使用多线程清理 Elasticsearch(下文简称 ES)中的历史数据。
知识
参数解释:
- corePoolSize(核心线程数):线程池中的核心线程数量。即使线程池处于空闲状态,这些核心线程也不会被销毁(除非开启了 allowCoreThreadTimeOut)。
- maximumPoolSize(最大线程数):线程池允许创建的最大线程数量。如果阻塞队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
- keepAliveTime(非核心线程的空闲时间):非核心线程空闲超过 keepAliveTime 后仍未获取到新任务,就会被自动销毁。
- unit(空闲时间单位):keepAliveTime 的时间单位。
- workQueue(任务队列):用于保存等待执行的任务的阻塞队列。
- ThreadFactory(线程工厂):用于创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,后期方便定位问题。
- RejectedExecutionHandler(拒绝策略):当线程池中的线程数已达到 maximumPoolSize 且任务队列已满时,线程池处于饱和状态。此时如果仍有任务提交进来,就必须采取一种策略来处理这些新提交的任务。
ThreadPoolTaskExecutor 可以使用 JDK ThreadPoolExecutor 内置的以下几种拒绝策略:
- AbortPolicy(默认拒绝策略):线程数已达到最大线程数且任务队列已满时,直接抛出 RejectedExecutionException,拒绝新任务。
- DiscardPolicy:线程池饱和时,新提交的任务被静默丢弃。
- DiscardOldestPolicy:线程池饱和时,丢弃队列中最老(队首)的任务,然后重新尝试提交新任务。
- CallerRunsPolicy:线程池饱和时,新任务直接由提交该任务的调用线程执行(不一定是主线程);若线程池已关闭,则该任务被丢弃。
实现方案
1. 一个线程安全(保证时间段连续)的参数生成方法,用于产生 ES 清理所需的时间段、索引和时间字段。另用一个外部 JSON 文件持久化记录清理进度(即下一个时间段的位置);如不需要,完全可以去掉。
2. 一个 ES 清理方法,使用 ES 的 _delete_by_query API 删除指定时间段内的数据。
3. 一个自定义线程池 elasticsearchClearPool,由定时任务调用 ES 清理方法。定时任务所在线程命名为 es-clear-main,是清理主线程,负责生成时间段并提交清理任务;elasticsearchClearPool 中的线程负责实际执行 ES 数据清理。
线程池的拒绝策略设置为 ThreadPoolExecutor.CallerRunsPolicy():线程池饱和时,由提交任务的 es-clear-main 线程自己执行清理,任务不会被丢弃,从而保证清理时间段的连续性,同时也起到限制任务提交速度的作用。
4. 一个定时任务定时启动清理方法,使用 fixedDelay(上一次执行结束后才开始计算下一次的延迟),保证同一时刻只有一轮清理在执行。
代码
只贴出与本文相关的部分代码(存在冗余,使用时按需改动即可)
自定义线程池配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** @author sszdzq*/
@Configuration
public class ThreadPoolTaskConfig {@Bean(name = "elasticsearchClearPool")public Executor elasticsearchClearPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//核心线程池大小executor.setCorePoolSize(5);//最大线程数executor.setMaxPoolSize(9);//队列容量executor.setQueueCapacity(1);//活跃时间executor.setKeepAliveSeconds(60);//线程名字前缀executor.setThreadNamePrefix("es-clear-pool-");// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();return executor;}}
参数生成+清理方法
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import fri.bhlz.bean.BetweenTime;
import fri.bhlz.bean.EsParams;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;@Service
@Slf4j
public class ClearElasticSearchService {@Autowiredprivate RestTemplate restTemplate;@Value("${elasticsearch.index}")private String CLEAR_INDEX_FIELD;@Value("${elasticsearch.retention:90}")private String ES_DATA_RETENTION;@Value("${spring.elasticsearch.hostname}")private String HOST_NAME;@Value("${elasticsearch.clear:false}")private String clear;@Synchronizedpublic EsParams getClearParams(String indexFiled) {//读取jsonJSONObject js = cjrReadJson();String index = indexFiled.split(":")[0];String field = indexFiled.split(":")[1];String record = index + "-date";js = validJson(js, record);LocalDateTime st = js.getDate(record).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();LocalDateTime et = st.plusHours(1);EsParams es = new EsParams().setIndices(index).setClearTimeField(field);es.setClearStartTime(st);es.setClearEndTime(et);//持久化jsonjs.put(record, et.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));cjrWriteJson(js);log.debug("获取es时间:{} -> {}", es.getClearStartTime(), es.getClearEndTime());return es;}private JSONObject validJson(JSONObject js, String recordField) {js = ObjectUtils.isEmpty(js) ? 
new JSONObject() : js;if (!js.containsKey(recordField)) {js.put(recordField, LocalDateTime.now().minusYears(3).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}return js;}@Async("elasticsearchClearPool")public ResponseEntity<JSONObject> deleteByQuery(EsParams j) {log.debug("开始处理{} {}->{}", j.getIndices(), j.getClearStartTime(), j.getClearEndTime());//构建查询体SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();//查询条件 时间范围RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(j.getClearTimeField());rangeQueryBuilder.lt(j.getClearEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));rangeQueryBuilder.gte(j.getClearStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));searchSourceBuilder.postFilter(rangeQueryBuilder);searchSourceBuilder.toString();log.debug("请求体:{}", searchSourceBuilder);HttpHeaders httpHeaders = new HttpHeaders();HttpEntity httpEntity = new HttpEntity(searchSourceBuilder.toString(), httpHeaders);ResponseEntity<JSONObject> resp = restTemplate.postForEntity("http://" + HOST_NAME + "/" + j.getIndices() + "/_delete_by_query", httpEntity, JSONObject.class);if (resp.getStatusCode().equals(HttpStatus.OK)) {log.debug(resp.getBody().toString(SerializerFeature.DisableCircularReferenceDetect));log.info(" Elasticsearch清理 {} {}->{} {} {}", j.getIndices(), j.getClearStartTime(), j.getClearEndTime(), resp.getStatusCode().value(), resp.getBody().getString("deleted"));}return resp;}private void getBetweenTime(BetweenTime bt) {bt.setStartTime(bt.getStartTime().plusHours(1));bt.setEndTime(bt.getStartTime().plusHours(1));bt.setStartTimeStr(bt.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));bt.setEndTimeStr(bt.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}@Value("${clears.record-file}")private String recordPath;public JSONObject cjrReadJson() {createFile(recordPath);try (FileInputStream fis = new FileInputStream(recordPath)) {String jsonContent = new 
String(FileCopyUtils.copyToByteArray(fis), StandardCharsets.UTF_8);if (!StringUtils.hasText(jsonContent)) {return new JSONObject();}return JSON.parseObject(jsonContent);} catch (Exception e) {e.printStackTrace();return new JSONObject();}}public void cjrWriteJson(JSONObject j) {try (FileOutputStream fos = new FileOutputStream(recordPath)) {byte[] bytes = j.toString(SerializerFeature.PrettyFormat).getBytes(StandardCharsets.UTF_8);// 将字节数据写入文件FileCopyUtils.copy(bytes, fos);} catch (Exception e) {e.printStackTrace();}}private void createFile(String path) {try {File file = new File(path);if (!file.exists()) {file.createNewFile();}} catch (IOException e) {e.printStackTrace();}}
}
定时任务
package fri.bhlz.schedule;import fri.bhlz.bean.EsParams;
import fri.bhlz.service.ClearElasticSearchService;
import fri.bhlz.service.ClearService;
import fri.bhlz.service.VerificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Component
@Slf4j
/**
 * Periodically drives the Elasticsearch history clean-up.
 */
public class ScheduleTask {

    @Autowired
    private ClearService clearService;

    @Autowired
    private VerificationService verificationService;

    @Value("${elasticsearch.index}")
    private String CLEAR_INDEX_FIELD;

    @Value("${elasticsearch.retention:90}")
    private String ES_DATA_RETENTION;

    @Value("${elasticsearch.clear:false}")
    private String clear;

    @Autowired
    private ClearElasticSearchService clearElasticSearchService;

    /**
     * Submits delete windows for every configured {@code index:timeField} pair until the windows
     * reach {@code now - elasticsearch.retention days}.
     *
     * <p>Runs 1.5 s after start-up and then 5 minutes after the previous run finished
     * ({@code fixedDelay}). Does nothing unless {@code elasticsearch.clear} is {@code true}. While
     * running, the scheduler thread is named {@code es-clear-main}. Its original name is restored
     * afterwards, because the scheduler thread may be shared with other scheduled tasks.
     */
    @Scheduled(initialDelay = 1500, fixedDelay = 1000 * 60 * 5)
    public void clearEsData() {
        Thread current = Thread.currentThread();
        String previousName = current.getName();
        current.setName("es-clear-main");
        try {
            if (!Boolean.parseBoolean(clear)) {
                return;
            }
            log.info("clear elasticsearch start");
            LocalDateTime endTime = LocalDateTime.now().minusDays(Integer.parseInt(ES_DATA_RETENTION));
            for (String indexFiled : CLEAR_INDEX_FIELD.split(",")) {
                String trimmed = indexFiled.trim();
                if (!trimmed.isEmpty()) {
                    clearIndex(trimmed, endTime);
                }
            }
            log.info("clear elasticsearch end");
        } finally {
            current.setName(previousName);
        }
    }

    /** Requests and submits consecutive windows for one index until a window ends at or after {@code endTime}. */
    private void clearIndex(String indexFiled, LocalDateTime endTime) {
        // Start one day before endTime so that at least one window is requested.
        LocalDateTime now = endTime.minusDays(1);
        while (now.isBefore(endTime)) {
            EsParams params = clearElasticSearchService.getClearParams(indexFiled);
            if (!params.getClearEndTime().isAfter(params.getClearStartTime())) {
                // The window did not advance; asking again would return the same window forever.
                break;
            }
            clearElasticSearchService.deleteByQuery(params);
            now = params.getClearEndTime();
        }
    }
}
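spring:
  elasticsearch:
    hostname: 127.0.0.1:9200
elasticsearch:
  clear: true
  index: index_aa:create_time,index_bb:createtime,index_cc:save_time
  retention: 90
# ClearElasticSearchService 中的 @Value("${clears.record-file}") 没有默认值,必须配置;下面的路径仅为示例
clears:
  record-file: ./es-clear-record.json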
注意事项
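调用 @Async 标注的方法时,一定要从其他 Bean 调用(例如本文由 ScheduleTask 调用 ClearElasticSearchService.deleteByQuery)。同一个类内部的方法自调用不会经过 Spring 代理,@Async 不会生效。此外,@Async 方法的返回值只支持 void 或 Future,并且需要在配置类上开启 @EnableAsync(定时任务同理需要 @EnableScheduling)。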