文章目录
- 版本及环境
- 1 Maven依赖
- 2 创建索引并插入单条数据
- 3 打印所有创建的索引的名称
- 4 查询索引中数据
- 5 删除索引
- 6 创建索引,并批量插入本地csv数据
- 7 查看索引中的前10条数据
版本及环境
windows 11
ElasticSearch 5.6.2
Idea 2020
请注意,5.6.2 是已经停止维护的版本,官方文档很多页面已经 404。transport client 客户端在 7.0 版本中被官方标记为弃用(deprecated),并在 8.0 中被移除,但现在仍然有不少老服务在使用。较新的 Elasticsearch(尤其是 8.x)已不再支持 JDK 1.8,而我的服务器在 2023 年仍不能升级到高版本 Java,所以只能学习老版本。
1 Maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ES_Client</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Target JDK 1.8 explicitly (the server cannot upgrade Java); the examples use
             try-with-resources, which older compiler-plugin defaults (1.5/1.6) reject. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Sources contain non-ASCII comments; do not depend on the platform encoding
             (GBK on Chinese Windows). -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.6.2</version>
        </dependency>
        <!-- <dependency>-->
        <!-- <groupId>org.elasticsearch.client</groupId>-->
        <!-- <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
        <!-- <version>5.6.2</version>-->
        <!-- </dependency>-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- log4j 2.0-beta9 through 2.14.1 (including the previously used 2.8.2) are affected by
             CVE-2021-44228 (Log4Shell). 2.17.1 contains the fixes and still runs on Java 8;
             the 2.x API used by the transport client is backward compatible. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
2 创建索引并插入单条数据
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.net.InetAddress;
import java.net.UnknownHostException;public class IndexCreated {public static void main(String[] args) {try {// 设置集群名称Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();// 创建 TransportClienttry (TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {// 准备索引数据String jsonDocument = "{" +"\"user\":\"Marry Doe\"," +"\"postDate\":\"2023-12-01\"," +"\"message\":\"Hello, Index!\"" +"}";// 创建索引请求IndexResponse response = client.prepareIndex("twitter", "tweet", "2").setSource(jsonDocument).get();// 输出索引结果System.out.println("Index created: " + response.getId());}} catch (UnknownHostException e) {e.printStackTrace();}}
}
2.1 删除索引(同第 5 节)
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.net.InetAddress;
import java.net.UnknownHostException;public class DeleteIndexExample {public static void main(String[] args) {try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {// 删除索引请求DeleteIndexRequest request = new DeleteIndexRequest("geo_index");// 执行删除索引请求DeleteIndexResponse response = client.admin().indices().delete(request).actionGet();// 打印删除索引响应System.out.println("Index deleted: " + response.isAcknowledged());} catch (UnknownHostException e) {e.printStackTrace();}}
}
3 打印所有创建的索引的名称
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;public class ListAllIndicesExample {public static void main(String[] args) {try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {// 获取所有索引请求GetIndexRequest request = new GetIndexRequest();// 执行获取所有索引请求GetIndexResponse response = client.admin().indices().getIndex(request).actionGet();// 获取所有索引的名称String[] indices = response.getIndices();// 打印所有索引的名称System.out.println("All Indices:");for (String index : indices) {System.out.println(index);// 获取索引的设置try {Settings indexSettings = client.admin().indices().prepareGetSettings(index).get().getIndexToSettings().get(index);System.out.println("Settings: " + indexSettings);} catch (Exception e) {if (e.getMessage().contains("Index not exist")) {System.out.println("Index not found: " + index);} else {e.printStackTrace();}}// 获取索引的映射try {GetMappingsResponse mappingsResponse = client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).get();ImmutableOpenMap<String, MappingMetaData> indexMappings = mappingsResponse.mappings().get(index);System.out.println("Mappings: " + indexMappings);} catch (Exception e) {if (e.getMessage().contains("Index not exist")) {System.out.println("Index not found: " + index);} else {e.printStackTrace();}}System.out.println();}} catch (UnknownHostException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}
}
4 查询索引中数据
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.net.InetAddress;
import java.net.UnknownHostException;public class QueryIndexExample {public static void main(String[] args) {try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {// 准备查询请求SearchRequestBuilder searchRequest = client.prepareSearch("twitter").setQuery(QueryBuilders.matchAllQuery());// 执行查询请求SearchResponse searchResponse = searchRequest.get();// 处理查询结果System.out.println("Search results:");System.out.println(searchResponse.toString());} catch (UnknownHostException e) {e.printStackTrace();}}
}
5 删除索引
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.net.InetAddress;
import java.net.UnknownHostException;public class DeleteIndexExample {public static void main(String[] args) {try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {// 删除索引请求DeleteIndexRequest request = new DeleteIndexRequest("geo_index");// 执行删除索引请求DeleteIndexResponse response = client.admin().indices().delete(request).actionGet();// 打印删除索引响应System.out.println("Index deleted: " + response.isAcknowledged());} catch (UnknownHostException e) {e.printStackTrace();}}
}
6 创建索引,并批量插入本地csv数据
将以下数据存入索引 geo_index(若该索引不存在,写入时会按默认设置自动创建),csv 文件一共 20 万行。
以下为数据示例,第二列为地理数据。每行第二列的 loc 字段被双引号 " 包围,并且字段内部包含逗号 ",",因此不能简单地按逗号切分,需要更完善的解析逻辑。以下使用 Apache Commons CSV 库来处理包含引号和逗号的 CSV 数据:
12345678901234567890,"23.123456,113.123456"
23456789012345678901,"24.234567,114.234567"
34567890123456789012,"25.345678,115.345678"
45678901234567890123,"26.456789,116.456789"
代码假设csv文件位置在"D:\geo_clean.csv"
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.io.FileReader;
import java.net.InetAddress;public class BulkInsertCsvToElasticsearch {public static void main(String[] args) {try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {// 读取 CSV 文件String csvFile = "D:\\geo_clean.csv";CSVParser csvParser = CSVParser.parse(new FileReader(csvFile), CSVFormat.DEFAULT);// 创建批量请求构建器BulkRequestBuilder bulkRequest = client.prepareBulk();// 读取 CSV 文件的每一行for (CSVRecord csvRecord : csvParser) {// 获取 CSV 数据中的两列String did = csvRecord.get(0);String loc = csvRecord.get(1);// 创建索引请求IndexRequest indexRequest = client.prepareIndex("geo_index", "doc").setSource(XContentFactory.jsonBuilder().startObject().field("did", did).field("loc", loc).endObject()).request();// 添加索引请求到批量请求bulkRequest.add(indexRequest);}// 执行批量请求BulkResponse bulkResponse = bulkRequest.get();// 处理批量响应if (bulkResponse.hasFailures()) {System.out.println("Bulk request failed with failures: " + bulkResponse.buildFailureMessage());} else {System.out.println("Bulk request successfully processed.");}} catch (Exception e) {e.printStackTrace();}}
}
7 查看索引中的前10条数据
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.net.InetAddress;
import java.net.UnknownHostException;public class QueryGeoIndexExample {public static void main(String[] args) {try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {// 构建搜索请求SearchRequest searchRequest = new SearchRequest("geo_index");// 构建搜索源SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();searchSourceBuilder.query(QueryBuilders.matchAllQuery());searchSourceBuilder.size(10); // 限制返回结果的数量// 设置搜索源searchRequest.source(searchSourceBuilder);// 执行搜索请求SearchResponse searchResponse = client.search(searchRequest).actionGet();// 打印搜索结果System.out.println("Search Results:");System.out.println(searchResponse.toString());} catch (UnknownHostException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}
}