本文章内容根据 Elastic Search Java API Client 7.17 版本官方文档 内容翻译而来,用于方便后续学习翻阅
序言
这是适用于 Elasticsearch 的官方 Java API Client 文档。该客户端针对所有 Elasticsearch API 提供强类型的请求和响应。
功能
- 所有 Elasticsearch API 的强类型请求和响应:为每个 API 的请求与响应赋予明确类型,确保数据的准确性与一致性。
- 所有 API 的阻塞式和异步式版本:开发者既可选择阻塞式操作,以顺序执行任务;也可选择异步式操作,实现非阻塞的高效处理。
- 使用流畅的构建器与函数式模式:在创建复杂嵌套结构时,能让代码简洁易读,提高开发效率。
- 通过对象映射器无缝集成应用类:如使用 Jackson 或任何 JSON-B 实现,方便将应用类与客户端进行整合。
- 将协议处理委托给 HTTP 客户端:例如 Java Low Level REST Client,它负责处理所有传输层相关事务,如 HTTP 连接池管理、重试机制、节点发现等。
Elasticsearch 服务器兼容性策略
Elasticsearch Java 客户端具有向前兼容性,这意味着客户端支持与更大或同等小版本的Elasticsearch 通信。Elasticsearch 语言客户端仅对默认发行版提供向后兼容性,且不做任何保证。
起步
安装
要求:
- Java 8 或更高版本:这意味着你需要在系统上安装 Java 运行环境,且版本必须是 8 及其后续版本,因为该 Java API Client 是基于此 Java 版本及以上开发和运行的,它可能使用了 Java 8 及其后续版本的新特性和改进,以提供更好的性能和功能。
- 一个 JSON 对象映射库:其目的是使你的应用程序类与 Elasticsearch API 实现无缝集成。Java 客户端支持使用 Jackson 或者像 Eclipse Yasson 这样的 JSON-B 库。JSON 对象映射库在将 Java 对象与 JSON 数据进行相互转换时发挥着关键作用,这对于与 Elasticsearch 这样以 JSON 作为数据传输和存储格式的系统进行交互至关重要。使用 Jackson 或 JSON-B 库,可以方便地将 Java 对象序列化为 JSON 数据发送给 Elasticsearch,也可以将从 Elasticsearch 接收到的 JSON 数据反序列化为 Java 对象,以便在 Java 程序中处理。
发布版本:
该软件的发布版本托管在 Maven Central 上。如果你需要一个快照(SNAPSHOT)版本,可以在以下地址找到 Elastic Maven 快照存储库:https://snapshots.elastic.co/maven/。Maven Central 是一个广泛使用的 Maven 仓库,包含了大量开源项目的发布版本,开发人员可以通过 Maven 构建工具轻松从这里获取所需的依赖。而快照版本通常是正在开发中的不稳定版本,存储在 Elastic Maven 快照存储库中,供开发者提前测试新功能或获取最新开发进展,但要注意其可能存在的不稳定性。
在 Gradle 项目中使用 Jackson 进行安装。
dependencies {implementation 'co.elastic.clients:elasticsearch-java:7.17.27'implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.0'
}
在 Maven 项目中使用 Jackson 进行安装。
<project><dependencies><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>7.17.27</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.17.0</version></dependency></dependencies>
</project>
如果遇到 ClassNotFoundException:jakarta. json.spi.JsonProvider
设置依赖项后,您的应用程序可能会失败,
ClassNotFoundException:jakarta. json.spi.JsonProvider
。
如果发生这种情况,您必须显式添加 jakarta. json:jakarta.json-api:2.0.1
依赖项。
Gradle:
dependencies {...implementation 'jakarta.json:jakarta.json-api:2.0.1'
}
Maven:
<project><dependencies>...<dependency><groupId>jakarta.json</groupId><artifactId>jakarta.json-api</artifactId><version>2.0.1</version></dependency></dependencies>
</project>
为什么需要这个?
一些框架,如Spring Boot或Helidon,带有Gradle和Maven插件或Maven BOM文件,以简化开发和依赖管理。这些插件和BOM定义了许多知名库使用的版本。
其中一个库可以是
jakarta. json:json-api
,它定义了标准JavaJSON API。在1.x
版中,这个库使用javax.json
包,而在2.x
版中,它在从JavaEE过渡到JakartaEE
后使用jakarta.json
包。JavaAPI Client依赖于该库的版本
2.0.1
,以便使用更新且面向未来的jakarta. json包。但是一些构建插件和BOM覆盖了 JavaAPI Client 的依赖项,以在旧的javax.json
命名空间中使用版本1.x
,导致ClassNotFoundException:jakarta.json.spi.JsonProvider
。
添加正确的版本作为 顶级项目(top-level project)依赖项可以解决问题。
如果您的应用程序还需要
javax. json
,您可以添加javax.json:javax.json-api:1.1.4
依赖项,相当于jakarta.json:jakarta.json-api:1.1.6
。
连接
Java API 客户端围绕三个主要组件构建:
- API 客户端类。这些类为 Elasticsearch API 提供强类型数据结构和方法。由于 Elasticsearch API 规模较大,它按功能组(也称为“命名空间”)进行结构化,每个功能组都有自己的客户端类。Elasticsearch 的核心功能在
ElasticsearchClient
类中实现。 - JSON对象映射器。这将你的应用程序类映射为 JSON,并与 API 客户端无缝集成。
- 传输层实现。这是所有 HTTP 请求处理发生的地方。
以下代码片段创建并连接这三个组件:
// Create the low-level client
// 创建低级别客户端
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();// Create the transport with a Jackson mapper
// 使用 Jackson 映射器创建传输
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());// And create the API client
// 创建低级别客户端
ElasticsearchClient esClient = new ElasticsearchClient(transport);// Use the client...
// 使用客户端....// Close the client, also closing the underlying transport object and network connections.
// 关闭客户端,同时关闭底层传输对象和网络连接。
esClient.close();
身份验证由 Java 低级别 REST 客户端管理。有关配置身份验证的更多详细信息,请参阅其留档。
创建首个请求
下面的代码片段表示从“product”索引中搜索所有name 字段与 “bicycle” 匹配的值,并将它们作为 Product
类的实例返回。
该示例使用流畅的函数构建器将搜索查询编写为简洁的 DSL-like 代码。API约定中会更加详细地解释这种模式。
SearchResponse<Product> search = esClient.search(s -> s.index("products").query(q -> q.term(t -> t.field("name").value(v -> v.stringValue("bicycle")))),Product.class);for (Hit<Product> hit: search.hits().hits()) {processProduct(hit.source());
}
上面示例的源代码可以在 JavaAPI 客户端测试中找到。
从高级 REST 客户端迁移。
ElasticsearchJavaAPI客户端是一个全新的客户端库,与旧的高级休息客户端(HLRC)没有关系。这是一个深思熟虑的选择,旨在提供一个独立于Elasticsearch服务器代码的库,并为所有Elasticsearch功能提供一个非常一致且更易于使用的API。
从 HLRC 迁移需要在您的应用程序中重写一些代码。然而,这种转换可以逐步发生,因为两个客户端库可以在一个应用程序中共存,而没有操作开销。
兼容模式:使用带有 Elasticsearch 8. x 的 7.17 客户端
通过启用 HLRC 的兼容模式(参见下面的代码示例),HLRC 版本 7.17
可以与 Elasticsearch 版本 8. x
一起使用。在这种模式下,HLRC 发送额外的标头来指示 Elasticsearch 8.x
像 7.x
服务器一样运行。
Java API 客户端不需要此设置,因为兼容模式始终处于启用状态。
使用具有 HLRC 和 JavaAPI 客户端的 http 客户端
为了避免在过渡阶段(应用程序同时使用HLRC和新的JavaAPI客户端)的任何操作开销,两个客户端可以共享相同的LOW Level Rest 客户端,这是管理所有连接、循环策略、节点嗅探等的网络层。
以下代码展示如何使用同一个 HTTP 客户端初始化两个客户端。
// Create the low-level client
// 创建 low-level 客户端
RestClient httpClient = RestClient.builder(new HttpHost("localhost", 9200)
).build();// Create the HLRC
// 创建HLRC
RestHighLevelClient hlrc = new RestHighLevelClientBuilder(httpClient).setApiCompatibilityMode(true) .build();// Create the Java API Client with the same low level client
// 使用相同的low-level客户端创建 Java API 客户端。
ElasticsearchTransport transport = new RestClientTransport(httpClient,new JacksonJsonpMapper()
);ElasticsearchClient esClient = new ElasticsearchClient(transport);// hlrc and esClient share the same httpClient
// hlrc 和 esClient 共享同一个 httpClient
启用允许 HLRC 7.17 与 Elasticsearch 8. x 一起使用的兼容模式。
转型策略
您可以通过不同的方式开始在应用程序代码中从HLRC转换。
例如:
- 保持现有代码不变,并为应用程序中的新功能使用新的Java API Client,然后再迁移现有代码。
- 重写应用程序中新的JavaAPI客户端比HLRC更容易使用的部分,比如与搜索相关的所有内容,
- 通过利用新的JavaAPI Client与JSON对象映射器的紧密集成,重写需要将应用程序对象映射到JSON或从JSON映射的部分。
API 约定
Java API Client使用非常一致的代码结构,使用现代代码模式,使复杂的请求更容易编写,复杂的响应更容易处理。下面的部分将详细解释这些。
包结构和命名空间客户端
ElasticSearch API很大,并且被组织成功能组,如ElasticSearch API留档中所示。
JavaAPI 客户端遵循这种结构:功能组称为 “命名空间”,每个命名空间都位于co. elastic.client.elasticsearch
的子包中。
每个命名空间客户端都可以从顶级 Elasticsearch 客户端访问。唯一的例外是 “搜索” 和 “文档” API,它们位于核心子包中,可以在主要的 Elasticsearch 客户端对象上访问。
下面的片段展示了如何使用indices命名空间客户端创建索引(lambda语法在构建API对象中解释):
// Create the "products" index
// 创建 “products” 索引
ElasticsearchClient client = ...
client.indices().create(c -> c.index("products"));
命名空间客户端是非常轻量级的对象,可以动态创建。
方法命名约定
Java API 客户端中的类包含两种方法和属性。
-
作为 API 一部分的方法和属性,例如
ElasticsearchClient.search()
或SearchResponse. maxScore()
。它们是从Elasticsearch JSON API中使用标准的JavacamelCaseNaming
约定的各自名称派生而来的。 -
构建Java API Client的框架的部分方法和属性,例如
Query._kind()
。这些方法和属性以下划线作为前缀,以避免与 API 名称的任何命名冲突,并作为区分 API 和框架的简单方法。
阻塞式和异步式客户端
API 客户端有两种类型:阻塞式和异步式。异步客户端上的所有方法都返回标准的 CompletableFuture
。
两种风格可以根据你的需求同时使用,共享同一个传输对象。
ElasticsearchTransport transport = ...// Synchronous blocking client
// 同步阻塞客户端
ElasticsearchClient client = new ElasticsearchClient(transport);if (client.exists(b -> b.index("products").id("foo")).value()) {logger.info("product exists");
}// Asynchronous non-blocking client
// 异步非阻塞客户端
ElasticsearchAsyncClient asyncClient =new ElasticsearchAsyncClient(transport);asyncClient.exists(b -> b.index("products").id("foo")).whenComplete((response, exception) -> {if (exception != null) {logger.error("Failed to index", exception);} else {logger.info("Product exists");}});
虽然我们不会在 Java 中深入介绍异步编程,但请记住处理异步任务的失败。很容易忽略它们并忽略错误。
上面示例的源代码可以在 JavaAPI 客户端测试 中找到。
构建 API 对象
构建对象
Java API Client中的所有数据类型都是不可变的。对象创建使用2008年在Effective Java中普及的构建器模式。
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices().create(new CreateIndexRequest.Builder().index("my-index").aliases("foo",new Alias.Builder().isWriteIndex(true).build()).build()
);
请注意,构建器在调用其 build()
方法后不应被重用。
生成器 lambda 表达式
尽管这代码能正常运行,但必须实例化构建器类并调用build()
方法有点冗长。因此,Java API 客户端中的每个属性设置器也接受一个lambda表达式,该表达式将新创建的构建器作为参数并返回一个已填充的构建器。上面的片段也可以写成:
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices().create(createIndexBuilder -> createIndexBuilder.index("my-index").aliases("foo", aliasBuilder -> aliasBuilder.isWriteIndex(true)));
这种方法允许更简洁的代码,并且还避免导入类(甚至记住它们的名称),因为类型是从方法参数签名中推断出来的。
请注意,在上面的示例中,构建器变量仅用于启动属性设置器链。因此,这些变量的名称并不重要,可以缩短以提高易读性:
ElasticsearchClient client = ...
CreateIndexResponse createResponse = client.indices().create(c -> c.index("my-index").aliases("foo", a -> a.isWriteIndex(true)));
构建 lambda 表达式对于复杂的嵌套查询特别有用,例如下面的查询,取自 interval 查询 API 文档。
此示例还强调了在深度嵌套结构中构建器参数的有用的命名约定。对于具有单个参数的lambda表达式,Kotlin提供了隐式it
参数,Scala允许使用_。这可以通过使用下划线或单个字母前缀后跟表示深度级别的数字(即_0、_1或b0、b1等)来近似Java。这不仅消除了创建一次性变量名的需要,而且还提高了代码的可读性。正确的缩进还允许查询的结构脱颖而出。
ElasticsearchClient client = ...
SearchResponse<SomeApplicationData> results = client.search(b0 -> b0.query(b1 -> b1.intervals(b2 -> b2.field("my_text").allOf(b3 -> b3.ordered(true).intervals(b4 -> b4.match(b5 -> b5.query("my favorite food").maxGaps(0).ordered(true))).intervals(b4 -> b4.anyOf(b5 -> b5.intervals(b6 -> b6.match(b7 -> b7.query("hot water"))).intervals(b6 -> b6.match(b7 -> b7.query("cold porridge")))))))),// 搜索结果将映射到某些 Application ationData 实例,以便应用程序随时可用。SomeApplicationData.class
);
上面示例的源代码可以在 Java API 客户端测试中找到。
Lists and maps
添加构造器设置
对象构建器将List
和Map
类型的属性公开为一组重载的仅可添加的方法,这些方法通过向列表添加新条目和向映射添加新条目(或替换现有条目)来更新属性值。
对象构建器创建不可变对象,这也适用于在对象构建时设置为不可变的列表和映射属性。
// Prepare a list of index names
// 准备一份索引名称列表
List<String> names = Arrays.asList("idx-a", "idx-b", "idx-c");// Prepare cardinality aggregations for fields "foo" and "bar"
// 为字段“foo”和“bar”准备基数聚合。
Map<String, Aggregation> cardinalities = new HashMap<>();
cardinalities.put("foo-count", Aggregation.of(a -> a.cardinality(c -> c.field("foo"))));
cardinalities.put("bar-count", Aggregation.of(a -> a.cardinality(c -> c.field("bar"))));// Prepare an aggregation that computes the average of the "size" field
// 准备一个计算 “size” 字段平均值的聚合。
final Aggregation avgSize = Aggregation.of(a -> a.avg(v -> v.field("size")));SearchRequest search = SearchRequest.of(r -> r// Index list:// - add all elements of a list.index(names)// - add a single element.index("idx-d")// - add a vararg list of elements.index("idx-e", "idx-f", "idx-g")// Sort order list: add elements defined by builder lambdas// 排序顺序列表:添加构建器lambdas定义的元素.sort(s -> s.field(f -> f.field("foo").order(SortOrder.Asc))).sort(s -> s.field(f -> f.field("bar").order(SortOrder.Desc)))// Aggregation map:// - add all entries of an existing map.aggregations(cardinalities)// - add a key/value entry.aggregations("avg-size", avgSize)// - add a key/value defined by a builder lambda.aggregations("price-histogram",a -> a.histogram(h -> h.field("price")))
);
List
和 map
的值不能为null
Elasticsearch API有很多可选属性。对于单值属性,Java API Client将缺失的可选值表示为null
。因此,应用程序必须在使用可选值之前对其进行空值检查。
然而,对于列表和集合,应用程序通常只关心它们是否为空,甚至只是迭代它们的内容。使用null
很麻烦。为了避免这种情况,Java API Client 集合属性永远不会为null
,并且缺失的可选集合将作为空集合返回。
如果您需要区分缺失(未定义)的可选集合和 Elasticsearch 返回的有效为空的集合,ApiTypeHelper
类提供了一个实用方法来区分它们:
NodeStatistics stats = NodeStatistics.of(b -> b.total(1).failed(0).successful(1)
);// The `failures` list was not provided.
// - it's not null
assertNotNull(stats.failures());
// - it's empty
assertEquals(0, stats.failures().size());
// - and if needed we can know it was actually not defined
assertFalse(ApiTypeHelper.isDefined(stats.failures()));
变体类型
Elasticsearch API 有很多变体类型:查询、聚合、字段映射、分析器等。在如此大的集合中找到正确的类名可能具有挑战性。
Java API Client 构建器使简化了这一点:变体类型的构建器 ( 如Query
) 为每个可用的实现都提供了方法。我们已经在上面的interval
(一种查询)和 allOf、match 和 anyOf(各种不同种类的interval
)中看到了这一点。
这是因为 Java API Client 中的变体对象是“标记联合”的实现:它们包含它们所持有的变体的标识符(或标签)以及该变体的值。例如,Query
对象可以包含一个带有 intervals
标签的 IntervalsQuery
、带有 term
标签的TermQuery
等。这种方法允许编写流畅的代码,您可以让IDE完成功能指导您构建和导航复杂的嵌套结构:
变体构建器对每个可用的实现都有 setter
方法。它们使用与常规属性相同的约定,并接受构建器 lambda 表达式和实际变体类型的现成对象。以下是构建 term 查询的示例:
Query query = new Query.Builder().term(t -> t // 1 .field("name") // 2.value(v -> v.stringValue("foo")) ).build(); // 3
- 选择
term
变体以构建 term 查询。 - 使用构建器 lambda 表达式构建 term 查询。
- 构建现在包含
term
类型的 TermQuery 对象的 Query。
变体对象对每个可用的实现都有getter
方法。这些方法检查对象是否实际持有该类型的变体,并返回向下转换为正确类型的值。否则它们会抛出 IllegalStateException
。这种方法允许编写流畅的代码来遍历变体。
assertEquals("foo", query.term().value().stringValue());
变体对象还提供了它们当前持有的变体类型的信息:
- 每个变体类型的方法:
isTerm()
、isIntervers()
、isFuzzy()
等。 - 使用定义所有变体类型的嵌套 Kind 枚举。
在检查特定变体的实际类型后,可以使用此信息导航到特定变体:
if (query.isTerm()) { // 1doSomething(query.term());
}switch(query._kind()) { // 2case Term:doSomething(query.term());break;case Intervals:doSomething(query.intervals());break;default:doSomething(query._kind(), query._get()); // 3
}
- 测试变体是否属于特定类型。
- 测试一组更大的变体类型。
- 获取变体对象持有的类型和值。
对象的生命周期和线程安全
Java API Client 中有五种不同生命周期的对象:
Object mapper(对象映射器)
无状态且线程安全,但创建成本很高。它通常是一个单例,在应用程序启动时创建,用于创建传输。
Transport(传输)
线程安全,通过底层 HTTP 客户端保存网络资源。传输对象与Elasticsearch集群相关联,必须显式关闭以释放底层资源(如网络连接)。
Clients(客户端)
不可变、无状态和线程安全。这些都是非常轻量级的对象,它们只是封装传输并提供API端点作为方法。关闭客户端将关闭底层传输。
Builders(构建器)
可变的,非线程安全的。构造器是临时对象,在调用build()
之后不应该被重用。
Requests & other API objects( Requests 对象和其他API对象)
不可变的,线程安全的。如果您的应用程序反复使用相同的请求或请求的相同部分,则可以提前准备好这些对象,并在具有不同传输的多个客户端的多个调用中重用它们。
从 JSON 数据创建 API 对象
此功能是在版本 7.17.2 中添加的。
在使用 Elasticsearch 开发应用程序的过程中,经常使用 Kibana 开发者平台 以交互式地准备和测试查询、聚合、索引映射以及其他复杂的 API 调用。这将产生您可能希望在应用程序中使用的有效的 JSON 片段。
由于将这些JSON片段转换为Java代码非常耗时且容易出错,因此Java API Client中的大多数数据类都可以从JSON文本加载:对象构建器具有 withJson()
方法,该方法从原始 JSON 填充构建器。这也允许你将动态加载的JSON与编程构造的对象结合起来。
在底层, withJson()
方法调用对象的反序列化器。因此,JSON 文本的结构和值类型必须与目标数据结构一致。使用 withJson()
维持了 Java API Client 的强类型保证。
示例
从资源文件中加载索引定义
准备一个包含索引定义的资源文件 sam-index. json:
{"mappings": {"properties": {"field1": { "type": "text" }}}
}
您可以根据该定义创建索引,如下所示:
InputStream input = this.getClass().getResourceAsStream("some-index.json"); // 1CreateIndexRequest req = CreateIndexRequest.of(b -> b.index("some-index").withJson(input) // 2
);boolean created = client.indices().create(req).acknowledged();
- 打开 JSON 资源文件的输入流。
- 用资源文件内容填充索引创建请求。
从JSON文件中摄取文档
同样,你可以从数据文件中读取要存储在 Elasticsearch 中的文档。
FileReader file = new FileReader(new File(dataDir, "document-1.json"));IndexRequest<JsonData> req; // 1req = IndexRequest.of(b -> b.index("some-index").withJson(file)
);client.index(req);
- 当对具有泛型类型参数的数据结构调用
withJson()
时,这些泛型类型将被认为是JsonData
。
创建一个结合JSON和编程构造的搜索请求
您可以将 withJson()
与对 setter
方法的常规调用结合起来。下面的示例从 String
加载搜索请求的查询部分,并以编程方式添加聚合。
Reader queryJson = new StringReader("{" +" \"query\": {" +" \"range\": {" +" \"@timestamp\": {" +" \"gt\": \"now-1w\"" +" }" +" }" +" }" +"}");SearchRequest aggRequest = SearchRequest.of(b -> b.withJson(queryJson) // 1.aggregations("max-cpu", a1 -> a1 // 2.dateHistogram(h -> h.field("@timestamp").calendarInterval(CalendarInterval.Hour)).aggregations("max", a2 -> a2.max(m -> m.field("host.cpu.usage")))).size(0)
);Map<String, Aggregate> aggs = client.search(aggRequest, Void.class) // 3.aggregations();
- 从 JSON 字符串加载查询。
- 添加聚合。
- 由于这是一个聚合,我们不关心结果文档并将其目标类设置为
Void
,这意味着它们将被忽略。注意,将size
设置为 0 实际上会阻止返回任何文档。
从多个 JSON 片段创建搜索请求
withJson()
方法是部分反序列化器:从 JSON 加载的属性将设置属性值或替换以前的属性,但不会重置 JSON 输入中未找到的其他属性。您可以使用它来组合多个 JSON 片段来构建复杂的搜索请求。在下面的示例中,我们将选择一些文档的查询的单独定义和对该查询的结果运行的聚合结合起来。
Reader queryJson = new StringReader("{" +" \"query\": {" +" \"range\": {" +" \"@timestamp\": {" +" \"gt\": \"now-1w\"" +" }" +" }" +" }," +" \"size\": 100" + // 1"}");Reader aggregationJson = new StringReader("{" +" \"size\": 0, " + // 2" \"aggregations\": {" +" \"hours\": {" +" \"date_histogram\": {" +" \"field\": \"@timestamp\"," +" \"interval\": \"hour\"" +" }," +" \"aggregations\": {" +" \"max-cpu\": {" +" \"max\": {" +" \"field\": \"host.cpu.usage\"" +" }" +" }" +" }" +" }" +" }" +"}");SearchRequest aggRequest = SearchRequest.of(b -> b.withJson(queryJson) // 3.withJson(aggregationJson) // 4.ignoreUnavailable(true) // 5
);Map<String, Aggregate> aggs = client.search(aggRequest, Void.class).aggregations();
- 将查询返回的最大文档数设置为100。
- 我们不希望聚合中有任何匹配的文档。
- 加载请求的查询部分。
- 加载请求的聚合部分(覆盖查询中的
size
)。 - 以编程方式设置的附加请求属性。
需要注意,当 JSON 片段具有一些共同属性时,顺序会变得尤为重要:就像以编程方式设置属性值时一样,为属性设置的最后一个值会覆盖前一个值。
异常
客户端方法可以抛出两种异常:
- Elasticsearch服务器收到请求但被拒绝(验证错误、服务器内部超时,等等),将产生
ElasticsearchException
异常。此异常包含有关错误的详细信息,由 Elasticsearch 提供。 - 未能到达服务器的请求(网络错误、服务器不可用等)将产生
TransportException
。这个异常的原因是底层实现抛出的异常。对于RestClientTransfer
,它将是包含低级HTTP响应的ResponseException
。
使用 Java API Client
下面内容将提供有关 Elasticsearch 最常用和一些不太明显的功能的教程。
有关完整的参考,请参阅 Elasticsearch 留档,特别是 REST API 部分。Java API Client 使用 Java API 约定密切遵循那里描述的 JSON 结构。
如果您是 Elasticsearch 的新手,请务必阅读 Elasticsearch 的快速入门,它提供了一个很好的介绍。
索引单个文档
Java API Client 提供了几种索引数据的方法:你可以提供将自动映射到 JSON 的应用程序对象,也可以提供原始 JSON 数据。使用应用程序对象更适合具有明确定义的领域模型(domain model)的应用程序,而原始 JSON 更适合具有半结构化数据的日志记录用例。
在下面的示例中,我们会使用到具有 sku
、name
和 price
的 Product
域对象。
有关索引请求的完整说明,请参阅 ElasticSearch API 留档。
使用流畅的DSL
构建请求最直接的方法是使用流畅的 DSL。在下面的示例中,我们在products
索引中索引产品描述,使用产品的 SKU 作为索引中的文档标识符。产品对象将使用 Elasticsearch 客户端上配置的对象映射器映射到 JSON。
Product product = new Product("bk-1", "City bike", 123.0);IndexResponse response = esClient.index(i -> i.index("products").id(product.getSku()).document(product)
);logger.info("Indexed with version " + response.version());
您还可以将使用 DSL 创建的对象分配给变量。Java API Client 类对此有一个静态的 of()
方法,该方法使用 DSL 语法创建对象。
Product product = new Product("bk-1", "City bike", 123.0);IndexRequest<Product> request = IndexRequest.of(i -> i.index("products").id(product.getSku()).document(product)
);IndexResponse response = esClient.index(request);logger.info("Indexed with version " + response.version());
使用经典构建器(classic builders)
如果您更习惯于经典的构建器模式,也是可以使用它的。构建器对象在底层由流畅的DSL语法调用。
Product product = new Product("bk-1", "City bike", 123.0);IndexRequest.Builder<Product> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index("product");
indexReqBuilder.id(product.getSku());
indexReqBuilder.document(product);IndexResponse response = esClient.index(indexReqBuilder.build());logger.info("Indexed with version " + response.version());
使用异步客户端
上面的示例使用的是 Elasticsearch 同步客户端(the synchronous Elasticsearch client.)。所有 Elasticsearch API 在异步客户端中也可用,使用相同的请求和响应类型。(有关更多详细信息,请参阅阻塞和异步客户端)。
ElasticsearchAsyncClient esAsyncClient = new ElasticsearchAsyncClient(transport);Product product = new Product("bk-1", "City bike", 123.0);esAsyncClient.index(i -> i.index("products").id(product.getSku()).document(product)
).whenComplete((response, exception) -> {if (exception != null) {logger.error("Failed to index", exception);} else {logger.info("Indexed with version " + response.version());}
});
使用原始JSON数据
当你想要索引的数据来自外部源时,对于半结构化数据,必须创建域对象( domain objects),这可能很麻烦,或者完全不可能。
您可以使用 withJson()
索引来自任意源的数据。使用此方法将读取源并将其用于索引请求的 document
属性。有关更多详细信息,请参阅从JSON数据创建API对象。
Reader input = new StringReader("{'@timestamp': '2022-04-08T13:55:32Z', 'level': 'warn', 'message': 'Some log message'}".replace('\'', '"'));IndexRequest<JsonData> request = IndexRequest.of(i -> i.index("logs").withJson(input)
);IndexResponse response = esClient.index(request);logger.info("Indexed with version " + response.version());
批量:索引多个文档
批量请求允许你在一个请求中向 Elasticsearch 发送多个与文档相关的操作。当需要操作多个文档时,这比用单独的请求发送给每个文档更有效。
批量请求可以包含多种操作:
- 创建一个文档,在确保它不存在后对其进行索引,
- 索引一个文档,如果需要就创建它,如果它已存在就替换它(index a document, creating it if needed and replacing it if it exists,)
- 更新一个已经存在的文档,可以使用脚本或部分文档进行原地更新。
- 删除一个文档。
有关批量请求的完整说明,请参阅 ElasticSearch API 留档。
索引应用程序对象
BulkRequest
包含一个操作集合,每个操作都是一个具有多个变体的类型。要创建此请求,可以为主请求使用builder对象,并为每个操作使用流畅的DSL。
下面示例展示如何索引列表或应用程序对象。
List<Product> products = fetchProducts();BulkRequest.Builder br = new BulkRequest.Builder();for (Product product : products) {br.operations(op -> op // 1 .index(idx -> idx // 2 .index("products") // 3.id(product.getSku()).document(product)));
}BulkResponse result = esClient.bulk(br.build());// Log errors, if any
if (result.errors()) {logger.error("Bulk had errors");for (BulkResponseItem item: result.items()) {if (item.error() != null) {logger.error(item.error().reason());}}
}
- 添加操作(记住列表属性是可添加的)。
op
是BulkOperation
的构建器,它是一种变体类型。这种类型具有index
、create
、update
和delete
变体。 - 选择
index
操作变体,idx
是IndexOperation
的构建器。 - 设置索引操作的属性,类似于单个文档索引:索引名称、标识符和文档。
索引原始 JSON 数据
批量索引请求的 document
属性可以是任何能够 使用 Elasticsearch 客户端的 JSON 映射器序列化为 JSON 的对象。但是,批量采集的数据通常以JSON文本形式(例如磁盘上的文件)提供,解析此 JSON 只是为了重新序列化它以发送批量请求,这将导致资源浪费。因此,批量操作中的文档也可以是逐字(无需解析)发送到 Elasticsearch 服务器的 BinaryData
类型。
在下面的示例中,我们将使用 Java API Client 的 BinaryData
从日志文件目录中读取 json 文件,并以批量请求的形式发送它们。
// List json log files in the log directory
// 在日志文件目录中列出 json 日志文件
File[] logFiles = logDir.listFiles(file -> file.getName().matches("log-.*\\.json")
);BulkRequest.Builder br = new BulkRequest.Builder();for (File file: logFiles) {FileInputStream input = new FileInputStream(file);BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);br.operations(op -> op.index(idx -> idx.index("logs").document(data)));
}
使用 Bulk Ingester 流式摄入
BulkIngester
通过提供一个工具类来简化 Bulk API 的使用,该工具类允许在批量请求中透明地分组 index/update/delete
操作。您只需向 摄取器(ingester )添加 add()
批量操作,它将根据其配置负责分组和批量发送操作。
当满足以下条件之一时,摄取器(ingester )将发送批量请求:
- 操作次数超过最大值(默认为 1000)
- 批量请求大小(以字节为单位)超过最大值(默认为5mib)。
- 上一次请求过期后的延迟(定期刷新,无默认值)
此外,您可以定义等待 Elasticsearch 执行的最大并发请求数(默认为 1)。当达到该最大值并且收集了最大操作数时,继续向索引器添加新的操作将被阻塞。这是通过给客户端应用程序增加负载压力来避免Elasticsearch服务器超载。
BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(esClient) // 1.maxOperations(100) // 2.flushInterval(1, TimeUnit.SECONDS) // 3
);for (File file: logFiles) {FileInputStream input = new FileInputStream(file);BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);ingester.add(op -> op // 4.index(idx -> idx.index("logs").document(data)));
}ingester.close(); // 5
- 设置用于发送批量请求的 Elasticsearch 客户端。
- 设置在发送批量请求之前收集的最大操作数。
- 设置刷新间隔。
- 向摄取器(ingester)添加批量操作。
- 关闭摄取器(ingester)以刷新挂起的操作并释放资源。
此外,批量摄取器( the bulk ingester )接收一个侦听器,以便您的应用程序可以收到发送的批量请求及其结果的通知。为了允许将批量操作与应用程序上下文相关联,add()
方法可以选择接收 context
参数(上下文参数)。此上下文参数的类型用作 BulkIngester
对象的通用参数。您可能已经注意到上面 BulkIngester<Void>
中的 Void
类型:这是因为我们没有注册侦听器,因此不关心上下文的值。
以下示例展示了如何使用上下文的值来实现批量摄取侦听器:与之前一样,它批量发送 JSON 日志文件,但跟踪批量请求错误和失败的操作。当操作失败时,根据错误类型,您可能希望将其重新添加到摄取器中。
BulkListener<String> listener = new BulkListener<String>() { // 1@Overridepublic void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {}@Overridepublic void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {// The request was accepted, but may contain failed items.// The "context" list gives the file name for each bulk item.logger.debug("Bulk request " + executionId + " completed");for (int i = 0; i < contexts.size(); i++) {BulkResponseItem item = response.items().get(i);if (item.error() != null) {// Inspect the failure causelogger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());}}}@Overridepublic void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {// The request could not be sentlogger.debug("Bulk request " + executionId + " failed", failure);}
};BulkIngester<String> ingester = BulkIngester.of(b -> b.client(esClient).maxOperations(100).flushInterval(1, TimeUnit.SECONDS).listener(listener) // 2
);for (File file: logFiles) {FileInputStream input = new FileInputStream(file);BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);ingester.add(op -> op.index(idx -> idx.index("logs").document(data)),file.getName() // 3);
}ingester.close();
- 创建一个监听器,其中上下文是被摄取文件名的字符串。
- 在批量摄取器( the bulk ingester)上注册监听器。
- 将文件名设置为批量操作的上下文(the context value)。
批量摄取还暴露了统计信息,允许监控摄取过程并调整其配置:
- 添加的操作数量
- 由于达到最大并发请求数而被阻止的
add()
调用数(争用), - 发送的批量请求数
- 由于达到最大并发请求数而被阻止的批量请求数。
通过 ID 读取文档
Elasticsearch 是关于搜索的,但您可能还想直接访问文档,知道它们的标识符。“get” 请求就是为此而设计的。
注意:有关 get 请求的完整说明,请参阅 ElasticSearch API 留档。
读取域对象(domain object)
下面的示例从 products
索引中读取标识符为 bk-1
的文档。
get 请求有两个参数:
- 第一个参数是实际的请求,下面示例将使用流畅的 DSL 构建它
- 第二个参数是我们希望将文档的JSON映射到的类。
GetResponse<Product> response = esClient.get(g -> g.index("products") // 1.id("bk-1"),Product.class // 2
);if (response.found()) {Product product = response.source();logger.info("Product name " + product.getName());
} else {logger.info ("Product not found");
}
- 带有索引名称和标识符的 GET 请求。
- 目标类,这里是
Product
。
读取原始 JSON
当你的索引包含半结构化数据时,或者如果你没有领域对象定义( a domain object definition),你也可以将文档作为原始 JSON 数据读取。
原始 JSON 数据只是可以用作 get 请求的结果类型的另一个类。在下面的示例中,我们使用 Jackson 的 ObjectNode
。我们还可以使用任何可以由与 Elasticsearch chClient
关联的 JSON 映射器反序列化的 JSON 表示。
GetResponse<ObjectNode> response = esClient.get(g -> g.index("products").id("bk-1"),ObjectNode.class // 1
);if (response.found()) {ObjectNode json = response.source();String name = json.get("name").asText();logger.info("Product name " + name);
} else {logger.info("Product not found");
}
- 目标类是一个原始 JSON 对象。
搜索文档
已建立索引的文档几乎可以实时进行搜索。
有关搜索请求的完整说明,请参阅 ElasticSearch 留档:搜索数据、查询 DSL 和搜索 API。
简单搜索查询
有多种类型的搜索查询,它们可以组合使用。这里我们将从简单的文本匹配查询开始,在 products
索引中搜索“bikes”。
搜索结果有一个 hits
属性,其包含查询匹配的文档和索引中存在的匹配总数信息。
匹配总数信息带有一个关系字段,表示该总数是精确的(eq
- 相等)还是近似的(gte
- 大于或等于)。
每个返回的文档都带有其相关性分数和关于其在索引中的位置的附加信息。
String searchText = "bike";SearchResponse<Product> response = esClient.search(s -> s.index("products") // 1.query(q -> q // 2.match(t -> t // 3.field("name") // 4.query(searchText))),Product.class // 5
);TotalHits total = response.hits().total();
boolean isExactResult = total.relation() == TotalHitsRelation.Eq;if (isExactResult) {logger.info("There are " + total.value() + " results");
} else {logger.info("There are more than " + total.value() + " results");
}List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {Product product = hit.source();logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
- 我们要搜索的索引的名称。
- 搜索请求的查询部分(搜索请求也可以有其他组件,如聚合)。
- 在众多可用的查询变体中选择一个,此处选择
match
(匹配)查询(全文搜索)。 - 配置匹配查询:我们在
name
字段中搜索词语。 - 匹配文档的目标类。我们在这里使用
Product
,就像在get 请求
示例中一样。
与 get 操作类似,您可以使用相应的目标类而不是 Product
来获取与查询匹配的原始JSON文档。比如如 JSON-P 的 JsonValue
或 Jackson 的 ObjectNode
嵌套搜索查询
Elasticsearch 允许将单个查询组合起来,以构建更复杂的搜索请求。在下面的示例中,我们将搜索最高价格为 200 英镑的bike
。
String searchText = "bike";
double maxPrice = 200.0;// 按名称搜索
Query byName = MatchQuery.of(m -> m // 1.field("name").query(searchText)
)._toQuery(); // 2// 按最高价格搜索
Query byMaxPrice = RangeQuery.of(r -> r.field("price").gte(JsonData.of(maxPrice)) // 3
)._toQuery();// 结合名称和价格查询来搜索 products 索引
SearchResponse<Product> response = esClient.search(s -> s.index("products").query(q -> q.bool(b -> b // 4.must(byName) // 5.must(byMaxPrice))),Product.class
);List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {Product product = hit.source();logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
- 我们分别为各个条件创建查询。
MatchQuery
是一种查询变体,我们必须将其转换为query
联合类型。有关其他详细信息,请参阅变体类型。- Elasticsearch 范围查询接受大范围的值类型。我们在这里创建最高价格的 JSON 表示。
- 搜索查询是一个布尔查询,它结合了文本搜索和最大价格查询。
- 这两个查询都添加为
must
,因为我们希望结果符合所有条件。
模板搜索
搜索模板是一个存储好的搜索,您可以使用不同的变量来运行它。搜索模板允许您在不修改应用程序代码的情况下更改搜索。
在运行模板搜索之前,必须要先创建模板。这是一个存储的脚本,它返回搜索请求主体,通常被定义为 Mustache 模板。这个存储脚本可以在应用程序之外创建,也可以使用 Java API Client:
// 脚本创建
esClient.putScript(r -> r.id("query-script") // 1.script(s -> s.lang("mustache").source("{\"query\":{\"match\":{\"{{field}}\":\"{{value}}\"}}}")));
- 要创建的模板脚本的标识符。
要使用搜索模板,可使用 searchTemplate
方法来引用脚本并为其提供参数的值:
SearchTemplateResponse<Product> response = esClient.searchTemplate(r -> r.index("some-index").id("query-script") // 1.params("field", JsonData.of("some-field")) // 2.params("value", JsonData.of("some-data")),Product.class
);List<Hit<Product>> hits = response.hits().hits();
for (Hit<Product> hit: hits) {Product product = hit.source();logger.info("Found product " + product.getSku() + ", score " + hit.score());
}
- 要使用的模板脚本的标识符。
- 模板参数值。
有关更深入的信息,请参阅 Elasticsearch 搜索模板留档。
聚合
聚合将数据汇总为指标、统计信息或其他分析结果。
有关聚合的完整说明,请参阅 ElasticSearch 留档。
一个简单的聚合
在下面的示例中,我们运行一个聚合,根据产品索引为名称与用户提供的文本匹配的产品创建价格直方图。为了实现这一点,我们使用具有查询(在搜索文档中解释)和聚合定义的搜索请求。
此示例是一个分析类型的聚合,我们不想在其中使用匹配的文档。用于分析的搜索请求的一般模式是将结果大小搜索结果的目标类设置为 Void
如果使用相同的聚合来显示产品,并将价格直方图作为向下钻取的切面,,我们将size
设置为非零值,并使用 Product
作为目标类来处理结果。
String searchText = "bike";Query query = MatchQuery.of(m -> m.field("name").query(searchText)
)._toQuery();SearchResponse<Void> response = esClient.search(b -> b.index("products").size(0) // 1.query(query) // 2.aggregations("price-histogram", a -> a // 3.histogram(h -> h // 4.field("price").interval(50.0))),Void.class // 5
);
- 将匹配文档的数量设置为零,因为我们只需要价格直方图。
- 设置 对运行聚合的产品进行的过滤的查询
- 创建一个名为 “price-histogram(价格直方图)” 的聚合。可以根据需要添加任意数量的命名聚合。
- 选择
histogram(直方图)
聚合变体。 - 我们不关心匹配到的内容(
size
已设置为零),使用Void
将忽略响应中的任何文档。
响应包含请求中每个聚合的聚合结果。
List<HistogramBucket> buckets = response.aggregations().get("price-histogram") // 1.histogram() // 2.buckets().array(); // 3for (HistogramBucket bucket: buckets) {logger.info("There are " + bucket.docCount() +" bikes under " + bucket.key());
}
- 获取 “price-histogram(价格直方图)” 聚合的结果。
- 将其转换为
histogram(直方图)
变体结果。这须与聚合中的定义一致。 - 桶可以表示为数组或映射。这将强制转换为数组变体(默认情况)。