1、pom引用
注:ES 客户端只用到了 jackson-databind;之所以把三个 jackson 包全部显式引用,是因为 actuator 还用到了另外两个包,只升级其中一个会导致版本冲突。
<!-- https://mvnrepository.com/artifact/co.elastic.clients/elasticsearch-java -->
<dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.8.2</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.15.2</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>2.15.2</version>
</dependency>
<dependency><groupId>jakarta.json</groupId><artifactId>jakarta.json-api</artifactId><version>2.1.2</version>
</dependency>
2、配置文件
注:因为没有用springboot自身的es插件所以健康检查检测不到es状态,关闭es检测
elasticsearch:
  host: 10.247.149.67
  port: 9200
  userName: elastic
  password: Zlens@2023
management:
  health:
    elasticsearch:
      enabled: false
3、连接池
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.alibaba.cloud.commons.lang.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ElasticSearchConfig {@Value("${elasticsearch.host}")private String host;@Value("${elasticsearch.port}")private Integer port;@Value("${elasticsearch.apiKey:}")private String apiKey;@Value("${elasticsearch.userName}")private String userName;@Value("${elasticsearch.password}")private String password;@Beanpublic ElasticsearchClient elasticsearchClient() {ElasticsearchTransport transport;RestClient restClient;if(StringUtils.isNotBlank(apiKey)){//apiKey验证模式restClient = RestClient.builder(new HttpHost(host, port)).setDefaultHeaders(new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKey)}).build();}else{final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();// 账号密码验证模式credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));restClient = RestClient.builder(new HttpHost(host, port)).setHttpClientConfigCallback(httpClientBuilder ->httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)).build();}transport = new RestClientTransport(restClient, new JacksonJsonpMapper());ElasticsearchClient client = new ElasticsearchClient(transport);System.out.println("es初始化完成");return client;}
}
4、操作类
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.DateProperty;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Service
@Log4j2
public class ElasticsearchService<T> {@Resourceprivate ElasticsearchClient elasticsearchClient;/*** 判断索引是否存在.** @param indexName index名称*/public boolean existIndex(String indexName) {try {BooleanResponse booleanResponse = elasticsearchClient.indices().exists(e -> e.index(indexName));return booleanResponse.value();} catch (IOException e) {log.error("向es中检测索引【{}】出错,错误信息为:{}", indexName, e.getMessage());throw new RuntimeException("向es中检测索引【"+indexName+"】出错");}}/*** 创建索引.** @param indexName index名称*/public void createIndex(String indexName) {try {Map<String, Property> documentMap = new HashMap<>();documentMap.put("createTime", Property.of(property ->property.date(DateProperty.of(dateProperty ->dateProperty.index(true).format("epoch_millis"))//textProperty.index(true)))));elasticsearchClient.indices().create(c -> c.index(indexName).mappings(mappings ->mappings.properties(documentMap)));} catch (IOException e) {log.error("向es中创建索引【{}】出错,错误信息为:{}", indexName, e.getMessage());}}/*** 添加记录.**/public void addDocument(T data, String indexName) {try {if (!this.existIndex(indexName)) {this.createIndex(indexName);}elasticsearchClient.index(i -> i.index(indexName).id(null).document(data));} catch (IOException e) {log.error("向es中添加document出错:{}", e.getMessage());}}/*** 批量添加.** @param dataList 添加的数量集合* @param indexName indexName*/public void batchAddDocument(List<T> dataList, String indexName) {if (!this.existIndex(indexName)) {this.createIndex(indexName);}BulkRequest.Builder br = new BulkRequest.Builder();dataList.forEach(data -> br.operations(op -> op.index(idx -> idx.index(indexName).id(null).document(data))));try {BulkResponse result = elasticsearchClient.bulk(br.build());if (result.errors()) {log.error("Bulk had errors");for (BulkResponseItem item : result.items()) {if (item.error() != null) {log.error(item.error().reason());}}}} catch (IOException e) {log.error("向es中添加document出错:{}",e.getMessage());}}/*** 根据索引名称和字段查询数据.** @param indexName 索引名称* @param filedValue 查询字段值* @param 
filedName 查询字段名称*/public List<T> findDataList(String indexName, String filedName, String filedValue,Class<T> className) {try {SearchResponse<T> searchResponse = elasticsearchClient.search(s -> s.index(indexName).query(q -> q.match(t -> t.field(filedName).query(filedValue))),className);List<Hit<T>> hitList = searchResponse.hits().hits();List<T> dataList = new ArrayList<>();for (Hit<T> mapHit : hitList) {dataList.add(mapHit.source());}return dataList;} catch (IOException e) {log.error("【查询 -> 失败】从es中查询分析后的日志出错,错误信息为:{}", e.getMessage());}return null;}
}
上面的 createIndex 是定制版本,额外为 createTime 字段加了特殊的 mapping;如果不需要自定义 mapping,正常的写法如下:
/**
 * Creates an index without specifying any mapping or settings.
 *
 * <p>An {@link IOException} is logged and not rethrown.
 *
 * @param indexName name of the index to create
 */
public void createIndex(String indexName) {
    try {
        elasticsearchClient.indices().create(c -> c.index(indexName));
    } catch (IOException e) {
        // The trailing throwable makes the logger record the stack trace as well.
        log.error("向es中创建索引【{}】出错,错误信息为:{}", indexName, e.getMessage(), e);
    }
}