一、说明
在项目中使用Elasticsearch的聚合与分组查询后,对于返回结果一脸懵逼,查阅各资料后,自己总结了一下参数取值的含义,不一定全面,只含常见参数
二、分组查询
2.1 参数解释
SearchResponse<Map> searchResponse = null;try {searchResponse = client.search(s -> s.index("tbanalyzelist").query(q -> q.bool(t -> {t.must(m -> m.match(b -> b.field("machineType.keyword").query(FieldValue.of(machineType))));if (ToolUtil.isNotEmpty(bizCodes))t.must(m -> m.terms(b -> b.field("bizCode.keyword").terms(f -> f.value(values))));t.must(a -> a.range(r -> r.field("duration").gt(JsonData.of(0))));t.must(a -> a.range(r -> r.field("open_time").gt(JsonData.of(startTime)).lte(JsonData.of(endTime1))));return t;}))//.size(2000000) 数据太多暂且注释.from(1) //分页查询 起始位置.size(2) // 每页两条数据 .aggregations("byOpenTime", aggregationBuilder ->aggregationBuilder.terms(termsAggregationBuilder ->termsAggregationBuilder.field("openTime"))),Map.class);} catch (IOException e) {e.printStackTrace();}//查询结果System.out.println(searchResponse);System.out.println("耗时:" + searchResponse.took());HitsMetadata<Map> hits = searchResponse.hits();System.out.println(hits.total());System.out.println("符合条件的总文档数量:" + hits.total().value());//注意:第一个hits() 与 第二个hits()含义不一样List<Hit<Map>> hitList = searchResponse.hits().hits(); //获取分组结果Map<String, Aggregate> aggregations = searchResponse.aggregations();System.out.println("aggregations:" + aggregations);Aggregate aggregate = aggregations.get("byOpenTime");System.out.println("byOpenTime分组结果 = " + aggregate);LongTermsAggregate lterms = aggregate.lterms();Buckets<LongTermsBucket> buckets = lterms.buckets();for (LongTermsBucket b : buckets.array()) {System.out.println(b.key() + " : " + b.docCount());}
- searchResponse输出结果转JSON
{"took":190, //执行整个搜索请求耗费了多少毫秒"timed_out":false,//查询是否超时。默认情况下,搜索请求不会超时。"_shards":{ // 在查询中参与分片情况"failed":0, //失败分片数量 "successful":1,//成功"total":1,//总计"skipped":0//跳过},"hits":{ //结果命中数据 "total":{ //匹配到的文档总数"relation":"gte",//是否是我们的实际的满足条件的所有文档数 "value":10000 //文档总数},"hits":[//每一个命中数据{"_index":"tbanalyzelist", //索引名相当于数据库的表名"_id":"QF2THIQBzxpesqmRtMpw","_score":3.0470734,//分数"_type":"_doc",//类型//资源,这里才是存储的我们想要的数据"_source":"{duration=317.0, //每个字段的值相当于mysql中的字段machineId=ZFB007422, bizName=wangyf, bizCode=221026172721ZBTQ, open_time=1664296386000, openTime=2022-09-27, machineType=DEV-HL}"},{"_index":"tbanalyzelist","_id":"QV2THIQBzxpesqmRtMpw","_score":3.0470734,"_type":"_doc","_source":"{duration=313.0, machineId=ZFB007422, bizName=wangyf, bizCode=221026172721ZBTQ, open_time=1664383009000, openTime=2022-09-28, machineType=DEV-HL}"}],"max_score":3.0470734 //查询所匹配文档的 _score 的最大值},"aggregations":{//聚合结果"lterms#byOpenTime":{//分组的桶名称"buckets":[ //分组桶结果{"doc_count":20144,//"key":"1664150400000","key_as_string":"2022-09-26T00:00:00.000Z"},{"doc_count":19724,"key":"1664409600000","key_as_string":"2022-09-29T00:00:00.000Z"},{"doc_count":19715,"key":"1664236800000","key_as_string":"2022-09-27T00:00:00.000Z"},{"doc_count":19653,"key":"1664323200000","key_as_string":"2022-09-28T00:00:00.000Z"},{"doc_count":19376,"key":"1664496000000","key_as_string":"2022-09-30T00:00:00.000Z"},{"doc_count":331,"key":"1664064000000","key_as_string":"2022-09-25T00:00:00.000Z"}],"doc_count_error_upper_bound":0,"sum_other_doc_count":0}}
}
- doc_count_error_upper_bound:表示没有在这次聚合中返回、但是可能存在的潜在聚合结果,
- sum_other_doc_count:表示这次聚合中没有统计到的文档数。因为ES为分布式部署,不同文档分散于多个分片,这样当聚合时,会在每个分片上分别聚合,然后由协调节点汇总结果后返回。
- doc_count:每个桶的文档数量。
- key: 分组后的key值
2.2 获取桶数据方式
Buckets<LongTermsBucket> longBuckets = aggregate.lterms().buckets();
Buckets<StringTermsBucket> stringBuckets = aggregate.sterms().buckets();
Buckets<DoubleTermsBucket> doubleBuckets = aggregate.dterms().buckets();
三、聚合查询
查询条件先忽略,这里聚合后的条件可以直接取到max,count,min,avg,sum等值
String cinemaId = "15989";SearchResponse<Map> searchResponse = null;try {searchResponse = client.search(s -> s.index("tbmaoyan").query(q -> q.bool(t -> {t.must(m -> m.match(f -> f.field("cinemaId.keyword").query(FieldValue.of(cinemaId))));//t.must(m -> m.term(f -> f.field("cinemaId.keyword").value(cinemaId)));//t.must(m -> m.match(f -> f.field("cinemaId").query("36924")));// t.must(m -> m.match(f -> f.field("bizCode").query(FieldValue.of("220104182434IIZF"))));//220104182434IIZF 220120143442CB4Creturn t;}))
// .sort(o -> o.field(f -> f.field("openTime").order(SortOrder.Asc)))//对viewInfo进行统计.aggregations("sumViewInfo", aggregationBuilder -> aggregationBuilder.stats(statsAggregationBuilder -> statsAggregationBuilder.field("viewInfo")))//对showInfo进行统计.aggregations("aggregateShowInfo", aggregationBuilder -> aggregationBuilder.stats(statsAggregationBuilder -> statsAggregationBuilder.field("showInfo"))).from(0).size(10000), Map.class);} catch (IOException e) {e.printStackTrace();}//查询结果System.out.println(searchResponse);System.out.println("耗时:" + searchResponse.took());HitsMetadata<Map> hits = searchResponse.hits();System.out.println(hits.total());System.out.println("符合条件的总文档数量:" + hits.total().value());//注意:第一个hits() 与 第二个hits()的区别List<Hit<Map>> hitList = searchResponse.hits().hits(); List<Map> hitListCopy = new ArrayList<>();for (Hit<Map> mapHit : hitList) {String source = mapHit.source().toString();System.out.println("文档原生信息:" + source);Map map = mapHit.source();hitListCopy.add(map);}//获取聚合结果Map<String, Aggregate> aggregations = searchResponse.aggregations();System.out.println("aggregations:" + aggregations);Aggregate aggregateViewInfo = aggregations.get("sumViewInfo");Aggregate aggregateShowInfo = aggregations.get("aggregateShowInfo");System.out.println("viewInfo:" + aggregateViewInfo);System.out.println("showInfo:" + aggregateShowInfo);System.out.println("统计个数:" + aggregateViewInfo.stats().count());System.out.println("最高分账票房:" + aggregateViewInfo.stats().max());System.out.println("最低分账票房:" + aggregateViewInfo.stats().min());System.out.println("平均分账票房:" + aggregateViewInfo.stats().avg());System.out.println("聚合查询的分账票房:" + aggregateViewInfo.stats().sum());Double sumViewInfoCopy = hitListCopy.stream().mapToDouble(h -> Double.parseDouble(h.get("viewInfo").toString())).sum();System.out.println("********************");System.out.println("聚合查询的分账票房:" + aggregateViewInfo.stats().sum());System.out.println("stream流查询的分账票房: " + sumViewInfoCopy);
- searchResponse.aggregations()的结果跟上面分组查询类似,不过赘述了
- aggregations.get("sumViewInfo")的取值
- aggregations.get("aggregateShowInfo")的取值
- 比对一下聚合查询跟我们自己算的数据是否一致