Elasticsearch 按时间进行聚合统计
需求:
1、统计某一个小时,每分钟的数据条数
2、统计某一天,每个小时的数据条数
3、统计一周,每天的数据条数
pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency>
代码实现
package com.woodare.tsp.portal.facade;import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.woodare.tsp.common.core.context.Context;
import com.woodare.tsp.portal.enums.DateType;
import com.woodare.tsp.portal.helper.DateHelper;
import com.woodare.tsp.portal.pojo.vo.DashboardVO;
import com.woodare.tsp.portal.pojo.vo.StatisticsChartVO;
import com.woodare.tsp.service.consts.ElasticsearchIndexNameConst;
import com.woodare.tsp.service.domain.Product;
import com.woodare.tsp.service.entity.IotDataLog;
import com.woodare.tsp.service.service.DeviceOnlineStatusService;
import com.woodare.tsp.service.service.DeviceService;
import com.woodare.tsp.service.service.ProductService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.elasticsearch.core.ElasticsearchAggregations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;/*** @author WANG*/
@RequiredArgsConstructor
@Slf4j
@Service
public class DashboardFacade {final DeviceService deviceService;final ProductService productService;final DeviceOnlineStatusService deviceOnlineStatusService;final ElasticsearchRestTemplate elasticsearchRestTemplate;@Value("${device.online.time:10}")private Integer deviceOnlineTime;/*** 统计** @return 统计详情*/public DashboardVO statistics() {DashboardVO dashboardVO = new DashboardVO();List<Product> productList = productService.getListByCondition(new Product());Long totalCount = deviceService.getTotalDeviceCount();// 设备最后在线时间在最近{10}分钟内,算在线long time = DateUtil.offsetMinute(new Date(), -deviceOnlineTime).getTime();long onlineCount = 0;for (Product product : productList) {onlineCount += deviceOnlineStatusService.count(product.getUid(), time);}BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();String tenantId = Context.getTenantId();if (StrUtil.isNotBlank(tenantId)) {queryBuilder.filter(QueryBuilders.termQuery("tenantId", tenantId));}StatisticsChartVO.DateRange dateRange = this.getDateRange(DateType.DAY);queryBuilder.filter(QueryBuilders.rangeQuery("createdTime").gte(dateRange.getStart()).lte(dateRange.getEnd()));IndexCoordinates indexCoordinates = IndexCoordinates.of(ElasticsearchIndexNameConst.TEMP_IOT_DATA_LOG);NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(queryBuilder).build();SearchHits<IotDataLog> searchHits = elasticsearchRestTemplate.search(query, IotDataLog.class, indexCoordinates);long totalHits = searchHits.getTotalHits();return dashboardVO.setTotalAlertCount(0L).setTotalMessageCount(totalHits).setTotalDeviceCount(totalCount).setTotalOnlineDeviceCount(onlineCount);}public StatisticsChartVO statisticsChart(DateType dateType) {StatisticsChartVO statisticsChartVO = new StatisticsChartVO();StatisticsChartVO.DateRange dateRange = this.getDateRange(dateType);BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();String tenantId = Context.getTenantId();if (StrUtil.isNotBlank(tenantId)) 
{queryBuilder.filter(QueryBuilders.termQuery("tenantId", tenantId));}queryBuilder.filter(QueryBuilders.rangeQuery("createdTime").gte(dateRange.getStart()).lte(dateRange.getEnd()));NativeSearchQuery query = this.buildQueryByDateType(queryBuilder, dateType);SearchHits<Object> searchHits = elasticsearchRestTemplate.search(query, Object.class, IndexCoordinates.of(ElasticsearchIndexNameConst.TEMP_IOT_DATA_LOG));ElasticsearchAggregations elasticsearchAggregations = (ElasticsearchAggregations) searchHits.getAggregations();assert elasticsearchAggregations != null;Aggregations aggregations = elasticsearchAggregations.aggregations();Map<String, Long> map = new LinkedHashMap<>();for (Aggregation aggregation : aggregations) {ParsedDateHistogram dateHistogram = (ParsedDateHistogram) aggregation;for (Histogram.Bucket bucket : dateHistogram.getBuckets()) {long docCount = bucket.getDocCount();String date = bucket.getKeyAsString();map.put(date, docCount);log.info("data: {} --- {}", date, docCount);}}List<StatisticsChartVO.DataItem> dataList = this.buildDataByDateType(map, dateType, dateRange);statisticsChartVO.setDataList(dataList);return statisticsChartVO;}private List<StatisticsChartVO.DataItem> buildDataByDateType(Map<String, Long> map, DateType dateType, StatisticsChartVO.DateRange dateRange) {List<String> dateList;String timeZone = Context.getTimezone();List<StatisticsChartVO.DataItem> list = new ArrayList<>();if (DateType.HOUR.equals(dateType)) {dateList = DateHelper.getMinuteList(dateRange.getStart(), dateRange.getEnd());} else if (DateType.DAY.equals(dateType)) {dateList = DateHelper.getHourList();} else {dateList = DateHelper.getDayList(dateRange.getStart(), dateRange.getEnd());}if (DateType.HOUR.equals(dateType)) {buildHourDataList(map, dateList, list);} else if (DateType.DAY.equals(dateType)) {for (String date : dateList) {// es查询出来的是日期是0时区的日期,需转成当前时区的时间,例如:时间为14:00,这是0时区的14:00,转成+8区则为22:00String hour = date.split(StrUtil.COLON)[0];String newHour = 
getHour(Integer.parseInt(hour), Integer.parseInt(timeZone));StatisticsChartVO.DataItem item = new StatisticsChartVO.DataItem();String newDate = newHour + StrUtil.COLON + date.split(StrUtil.COLON)[1];item.setDate(newDate);item.setValue(map.containsKey(date) ? map.remove(date) : 0L);list.add(item);}} else {for (String date : dateList) {StatisticsChartVO.DataItem item = new StatisticsChartVO.DataItem();item.setDate(date);item.setValue(map.getOrDefault(date, 0L));list.add(item);}}list.sort(Comparator.comparing(StatisticsChartVO.DataItem::getDate));return list;}private void buildHourDataList(Map<String, Long> map, List<String> dateList, List<StatisticsChartVO.DataItem> list) {if (MapUtil.isEmpty(map)) {setDefaultData(dateList, list);} else {String offsetHour = dateList.get(0).split(StrUtil.COLON)[0];String currUtcHour = getHourInZeroTimeZone(System.currentTimeMillis());// 先去除其他数据Map<String, Long> stashMap = new LinkedHashMap<>();for (Map.Entry<String, Long> entry : map.entrySet()) {String date = entry.getKey();String currHour = date.split(StrUtil.COLON)[0];if (currHour.equals(currUtcHour)) {String minute = date.split(StrUtil.COLON)[1];stashMap.put(offsetHour + StrUtil.COLON + minute, entry.getValue());}}if (MapUtil.isEmpty(stashMap)) {setDefaultData(dateList, list);} else {for (String date : dateList) {StatisticsChartVO.DataItem item = new StatisticsChartVO.DataItem();item.setDate(date);item.setValue(stashMap.getOrDefault(date, 0L));list.add(item);}}}}public String getHourInZeroTimeZone(long timestamp) {LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);int hour = dateTime.getHour();return (hour < 10 ? 
"0" : "") + hour;}private void setDefaultData(List<String> dateList, List<StatisticsChartVO.DataItem> list) {for (String date : dateList) {StatisticsChartVO.DataItem item = new StatisticsChartVO.DataItem();item.setDate(date);item.setValue(0L);list.add(item);}}private String getHour(int num1, int num2) {int num = num1 + num2;if (num >= 24) {num -= 24;}if (num < 0) {num += 24;}if (num < 10) {return "0" + num;} else {return StrUtil.EMPTY + num;}}private NativeSearchQuery buildQueryByDateType(BoolQueryBuilder queryBuilder, DateType dateType) {if (DateType.HOUR.equals(dateType)) {return new NativeSearchQueryBuilder().withQuery(queryBuilder).withAggregations(AggregationBuilders.dateHistogram("minute_stats").field("createdTime").calendarInterval(DateHistogramInterval.MINUTE).format("HH:mm")).build();} else if (DateType.DAY.equals(dateType)) {return new NativeSearchQueryBuilder().withQuery(queryBuilder).withAggregations(AggregationBuilders.dateHistogram("minute_stats").field("createdTime").calendarInterval(DateHistogramInterval.HOUR).format("HH:mm")).build();} else {return new NativeSearchQueryBuilder().withQuery(queryBuilder).withAggregations(AggregationBuilders.dateHistogram("logs_per_day").field("createdTime").calendarInterval(DateHistogramInterval.DAY).format("yyyy-MM-dd")).build();}}private StatisticsChartVO.DateRange getDateRange(DateType dateType) {StatisticsChartVO.DateRange dateRange = new StatisticsChartVO.DateRange();if (DateType.HOUR.equals(dateType)) {dateRange.setStart(DateUtil.beginOfHour(new Date()).getTime());dateRange.setEnd(DateUtil.endOfHour(new Date()).getTime());} else if (DateType.DAY.equals(dateType)) {dateRange.setStart(DateUtil.beginOfDay(new Date()).getTime());dateRange.setEnd(DateUtil.endOfDay(new Date()).getTime());} else if (DateType.WEEK.equals(dateType)) {DateTime start = DateUtil.offsetDay(new Date(), -7);DateTime end = DateUtil.offsetDay(new Date(), -1);dateRange.setStart(start.getTime());dateRange.setEnd(end.getTime());}return dateRange;}
}
package com.woodare.tsp.portal.helper;import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.woodare.tsp.common.core.context.Context;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @author WANG*/
public class DateHelper {private static final int HOURS_OF_DAY = 24;private static List<String> getMinuteIntervals(long startTimestamp, long endTimestamp) {List<String> intervals = new ArrayList<>();DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm");int timezone = Integer.parseInt(Context.getTimezone());LocalDateTime current = LocalDateTime.ofEpochSecond(startTimestamp / 1000, 0, ZoneOffset.ofHours(timezone));LocalDateTime endTime = LocalDateTime.ofEpochSecond(endTimestamp / 1000, 0, ZoneOffset.ofHours(timezone));while (!current.isAfter(endTime)) {intervals.add(current.format(formatter));current = current.plusMinutes(1);}return intervals;}public static List<String> getHourList() {List<String> hourList = new ArrayList<>();for (int i = 0; i < HOURS_OF_DAY; i++) {String hour;if (i < 10) {hour = "0" + i;} else {hour = "" + i;}hourList.add(hour + ":00");}return hourList;}public static List<String> getMinuteList(Long start, Long end) {return getMinuteIntervals(start, end);}public static List<String> getDayList(Long start, Long end) {List<String> dates = new ArrayList<>();LocalDateTime startDate = LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneId.systemDefault());LocalDateTime endDate = LocalDateTime.ofInstant(Instant.ofEpochMilli(end), ZoneId.systemDefault());while (!startDate.isAfter(endDate)) {dates.add(startDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));startDate = startDate.plusDays(1);}return dates;}public static void main(String[] args) {Context.setTimezone("8");long start = DateUtil.beginOfHour(new Date()).getTime();long end = DateUtil.endOfHour(new Date()).getTime();List<String> intervals = getMinuteList(start, end);for (String interval : intervals) {System.out.println(interval);}System.out.println("==========");List<String> hourList = getHourList();for (String interval : hourList) {System.out.println(interval);}System.out.println("==========");DateTime startDate = DateUtil.offsetDay(new Date(), -7);DateTime endDate = 
DateUtil.offsetDay(new Date(), -1);List<String> dayList = getDayList(startDate.getTime(), endDate.getTime());for (String date : dayList) {System.out.println(date);}}}