ElasticSearch初体验之使用Java进行最基本的增删改查

好久没写博文了, 最近项目中使用到了ElaticSearch相关的一些内容, 刚好自己也来做个总结。
现在自己也只能算得上入门, 总结下自己在工作中使用Java操作ES的一些小经验吧。

本文总共分为三个部分:
一:ES相关基本概念及原理
二:ES使用场景介绍
三:使用Java进行ES的增删改查及代码讲解

一:ES相关基本概念:
ElasticSearch(简称ES)是一个基于Lucene构建的开源、分布式、RESTful的全文本搜索引擎。

不过,ElasticSearch却也不仅只是一个全文本搜索引擎,它还是一个分布式实时文档存储,其中每个field均是被索引的数据且可被搜索;也是一个带实时分析功能的分布式搜索引擎,并且能够扩展至数以百计的服务器存储及处理PB级的数据。

如前所述,ElasticSearch在底层利用Lucene完成其索引功能,因此其许多基本概念源于Lucene。
我们先说说ES的基本概念。

  • 索引 Index:对数据的逻辑存储(倒排索引),不存储原
    始值。
  • 类型 Type:对索引的逻辑分类,可以有⼀个或多个分类。
  • ⽂档 Document:基本数据单元,JSON。
  • 字段 Filed

关系型数据与ES对比:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields

这里再说下ES中很重要的概念--倒排索引。这同样也是solr,lucene中所使用的索引方式。

例如我们正常的索引:

当我们在关系型数据库中,都是有id索引的, 我们通过id去查value速度是很快的。
但是如果我们想查value中包含字母b的值呢?特别是数据量很大的时候, 这种以id为索引的方式是不是就不适合了?
那么这里就适合使用倒排索引了:

这里将value进行分词, 然后将分词结果拿出来当做索引
跟正向的索引比较,也就是做了一个倒置,这就是倒排索引的思想

二,ES使用场景介绍
1、全文搜索(搜索引擎)
在一组文档中查找某一单词所在文档及位置
2、模糊匹配
通过用户的输入去匹配词库中符合条件的词条
3、商品搜索
通过商品的关键字去数据源中查找符合条件的商品

在我自己的项目中使用的情况是我有上百万的文章需要被通过各种条件检索到, 所以这里就直接使用ES, 现在线上检索速度都是10ms之内返回。

下面看看ES数据在浏览器的展示形式以及可视化界面的搜索:

三:使用Java进行ES的增删改查及代码讲解
1, 使用ES进行增加和更新操作。

//首先在项目启动的时候生成esClient, 这个我们公司自己封装好了的。
@PostConstructpublic void init() {esClient = new ESClient<>(esConfig.getAddress(), esConfig.getCluster(), esConfig.getIndex(),esConfig.getUsername(), esConfig.getPassword(), ES_TYPE_MIXEDDATA, EsMixedDataDto.class);}

上面EsMixedDataDto是自己构建的一个类, ES中保存的字段就是这个类中的所有字段。
接着是增加和更新操作了:

//同步到ES中
articleEsService.upsertDocument(esMixedDataDto);/**
 * 创建或更新索引
 *
 * @param esMixedDataDto
 * @return
 */
public boolean upsertDocument(EsMixedDataDto esMixedDataDto) {return esClient.upsertDocument(esMixedDataDto.getMixId(), esMixedDataDto);}

这个还是调用了系统封装好的esClient中的insertOrUpdate方法,最后我会把ESClient中所有封装的方法都贴出来, 其内部就是调用了ES原生的insert或者update方法的。

2,使用ES进行删除操作

/**
 * 删除索引
 */
public void deleteIndex(String id) throws Exception{esClient.deleteDocument(id);
}

同上,也是使用了esClient中的delete方法,后面我会贴上esClient中所有方法。

3,使用ES进行查询
3.1 当然ES最重要的还是多维度的查询, 这里也是我要讲的重点。
首先来个最简单的搜索一篇文章的标题:

//通过关键词来查询文章集合
public PageResponse<EsMixedDataDto> queryForKeyword(String searchText, boolean highlight, PageRequest pageRequesto) {SearchRequestBuilder searchRequestBuilder = esClient.prepareSearch().setTypes(ES_TYPE_MIXEDDATA).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(QueryBuilders.multiMatchQuery(searchText, "title").type(MultiMatchQueryBuilder.Type.BEST_FIELDS)).setFrom(pageRequest.getOffset()).setSize(pageRequest.getLimit()).setExplain(false);if (highlight) {searchRequestBuilder.addHighlightedField("title", 100, 1).setHighlighterPreTags("<font color='red'>").setHighlighterPostTags("</font>");}try {//这里就是给es发送搜索指令了return getMixedData(searchRequestBuilder);} catch (Exception e) {log.error("ES搜索异常!", e.getMessage());throw new RuntimeException(e);}
}

这里先说说search_type, 也就是上面setSearchType(SearchType.QUERY_THEN_FETCH)的内容:

  • query_then_fetch:执⾏查询得到对⽂档进⾏排序的所需信息(在所
    有分⽚上执⾏),然后在相关分⽚上查询⽂档实际内容。返回结果的
    最⼤数量等于size参数的值。
  • query_and_fetch:查询在所有分⽚上并⾏执⾏,所有分⽚返回等于
    size值的结果数,最终返回结果的最⼤数量等于size的值乘以分⽚
    数。分⽚较多时会消耗过多资源。
  • count:只返回匹配查询的⽂档数量。
  • scan:⼀般在需要返回⼤量结果时使⽤。在发送第⼀次请求后,ES
    会返回⼀个滚动标识符,类似于数据库中的游标。

我这里使用的是query_then_fetch。

3.2 紧接着说个多条件复杂的查询:

/**
 * @param jiaxiaoId: 驾校id
 * @param title 文章的title关键词
 * @param publishStatus 发布状态
 * @param stickStatus 置顶状态
 * @param pageRequest 请求的页码和条数
 * @param highlight 搜索结果是否高亮显示
 */
public PageResponse<EsMixedDataDto> queryByConditions(Long jiaxiaoId, String title, PageRequest pageRequest, int publishStatus, int stickStatus, boolean highlight) {BoolQueryBuilder booleanQueryBuilder = QueryBuilders.boolQuery();booleanQueryBuilder.must(QueryBuilders.termQuery("jiaxiaoId", jiaxiaoId));if (StringUtils.isNotBlank(title)) {booleanQueryBuilder.must(QueryBuilders.multiMatchQuery(title, "title").type(MultiMatchQueryBuilder.Type.BEST_FIELDS));}//这里是添加是否发布的搜索条件, 默认是只展示已发布的文章if (publishStatus == CommonConstants.DataStatus.INIT_STATUS) {booleanQueryBuilder.must(QueryBuilders.termQuery("publishStatus", CommonConstants.DataStatus.INIT_STATUS));} else {booleanQueryBuilder.mustNot(QueryBuilders.termQuery("publishStatus", CommonConstants.DataStatus.INIT_STATUS));}//这里是添加是否置顶的搜索条件if (stickStatus == CommonConstants.DataStatus.PUBLISH_STATUS) {booleanQueryBuilder.must(QueryBuilders.termQuery("stickStatus", CommonConstants.DataStatus.PUBLISH_STATUS));} else if(stickStatus == CommonConstants.DataStatus.INIT_STATUS){booleanQueryBuilder.mustNot(QueryBuilders.termQuery("stickStatus", CommonConstants.DataStatus.PUBLISH_STATUS));}SearchRequestBuilder searchRequestBuilder = esClient.prepareSearch().setTypes(ES_TYPE_MIXEDDATA).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(booleanQueryBuilder).setFrom(pageRequest.getOffset()).setSize(pageRequest.getLimit()).addSort("stickStatus", SortOrder.DESC).setExplain(false);if (jiaxiaoId == null) {BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter().must(FilterBuilders.missingFilter("jiaxiaoId"));searchRequestBuilder.setPostFilter(filterBuilder);}if (highlight) {searchRequestBuilder.addHighlightedField("title", 100, 1).setHighlighterPreTags("<font color='red'>").setHighlighterPostTags("</font>");} else {searchRequestBuilder.addSort("publishTime", SortOrder.DESC);}try {return getMixedData(searchRequestBuilder);} catch (Exception e) {log.error("ES搜索异常!", e.getMessage());throw new RuntimeException(e);}
}

这里不用的就是使用query和filterBuilder,searchRequestBuilder中可以设置query和postFilter。
Debug到这里, 其实写的查询语句最终还是拼接成了一个ES可读的结构化查询语句:

3.3 最后贴上最重要的一个类ESClient.java, 这是我们针对于ElasticSearch封装的一个类。

public class ESClient<T> {private static final Logger LOG = LoggerFactory.getLogger(ESClient.class);private static final String DEFAULT_ANALYZER = "ik_smart";private static final DozerBeanMapper dozerBeanMapper = new DozerBeanMapper();private final Client client;private String index;private Class<T> clazz;private String type;private BulkProcessor bulkProcessor;private List<String> serverHttpAddressList = Lists.newArrayList();private Map<String, JSONObject> sqlJsonMap = Maps.newHashMap();/**
     * 初始化一个连接ElasticSearch的客户端
     *
     * @param addresses   ES服务器的Transport地址和端口的列表,多个服务器用逗号分隔,例如 localhost:9300,localhost:9300,...
     * @param clusterName 集群名称
     * @param index       索引名称,这里应该使用项目名称
     * @param username    用户名称
     * @param password    用户密码
     * @param type        索引类型
     * @param clazz       存储类
     */public ESClient(String addresses, String clusterName, String index,String username, String password, String type, Class<T> clazz) {if (StringUtils.isBlank(addresses)) {throw new RuntimeException("没有给定的ES服务器地址。");}this.index = index;this.type = type;this.clazz = clazz;// 获得链接地址对象列表List<InetSocketTransportAddress> addressList = Lists.transform(Splitter.on(",").trimResults().omitEmptyStrings().splitToList(addresses),new Function<String, InetSocketTransportAddress>() {@Overridepublic InetSocketTransportAddress apply(String input) {String[] addressPort = input.split(":");String address = addressPort[0];Integer port = Integer.parseInt(addressPort[1]);serverHttpAddressList.add(address + ":" + 9200);return new InetSocketTransportAddress(address, port);}});// 建立关于ES的配置ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder().put("cluster.name", clusterName).put("client.transport.sniff", false);if (StringUtils.isNotBlank(username)) {builder.put("shield.user", username + ":" + password);}Settings settings = builder.build();// 生成原生客户端TransportClient transportClient = new TransportClient(settings);for (InetSocketTransportAddress address : addressList) {transportClient.addTransportAddress(address);}client = transportClient;bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long executionId, BulkRequest request) {}@Overridepublic void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}@Overridepublic void afterBulk(long executionId, BulkRequest request, Throwable failure) {throw new RuntimeException(failure);}}).build();}/**
     * 初始化连接ElasticSearch的客户端
     *
     * @param client 原生客户端
     * @param index  索引名称
     * @param type   类型
     * @param clazz  存储类
     */public ESClient(Client client, String index, String type, Class<T> clazz) {this.client = client;this.index = index;this.type = type;this.clazz = clazz;}/**
     * 向ES发送存储请求,将一个对象存储到服务器。
     *
     * @param id 该对象的id
     * @param t  存储实例
     * @return 是否存储成功
     */public boolean indexDocument(String id, T t) {return indexDocument(id, type, t);}/**
     * 向ES发送存储请求,将一个对象存储到服务器。
     *
     * @param t 存储实例
     * @return 返回存储之后在ES服务器内生成的随机ID
     */public String indexDocument(T t) {IndexResponse indexResponse = client.prepareIndex(index, type).setSource(toJSONString(t)).execute().actionGet();return indexResponse.getId();}/**
     * 向ES发送存储请求,将一个对象存储到服务器,这个方法允许用户手动指定该对象的存储类型名称
     *
     * @param id   对象id
     * @param type 存储类型
     * @param t    存储实例
     * @return 是否存储成功
     */public boolean indexDocument(String id, String type, T t) {IndexResponse indexResponse = client.prepareIndex(index, type, id).setSource(toJSONString(t)).execute().actionGet();return true;}/**
     * 向ES发送批量储存请求, 请求不会马上提交,而是会等待到达bulk设置的阈值后进行提交.<br/>
     * 最后客户端需要调用{@link #flushBulk()}方法.
     *
     * @param id 对象id
     * @param t  存储实例
     * @return 成功表示放入到bulk成功, 可能会抛出runtimeException
     */
    public boolean indexDocumentBulk(String id, T t) {
        return indexDocumentBulk(id, type, t);
    }    /**
     * 向ES发送批量存储请求,将一个对象存储到服务器,这个方法允许用户手动指定该对象的存储类型名称
     *
     * @param id   对象id
     * @param type 存储类型
     * @param t    存储实例
     * @return 成功表示放入到bulk成功, 可能会抛出runtimeException
     * @see #indexDocument(String, Object)
     */
    public boolean indexDocumentBulk(String id, String type, T t) {
        IndexRequest indexRequest = new IndexRequest(index, type, id).source(toJSONString(t));
        bulkProcessor.add(indexRequest);
        return true;
    }    /**
     * 向ES发送批量存储请求, 请求不会马上提交,而是会等待到达bulk设置的阈值后进行提交.<br/>
     * 最后客户端需要调用{@link #flushBulk()}方法.
     *
     * @param t 存储实例
     * @return 成功表示放入到bulk成功, 可能会抛出runtimeException
     */
    public boolean indexDocumentBulk(T t) {
        IndexRequest indexRequest = new IndexRequest(index, type).source(toJSONString(t));
        bulkProcessor.add(indexRequest);
        return true;
    }    public boolean indexDocumentBulk(List<T> list) {
        for (T t : list) {
            indexDocumentBulk(t);
        }
        return true;
    }    /**
     * 向ES发送批量存储请求, 允许传入一个Function, 用来从对象中获取ID.
     *
     * @param list       对象列表
     * @param idFunction 获取ID
     * @return 成功表示放入到bulk成功, 可能会抛出runtimeException
     */
    public boolean indexDocumentBulk(List<T> list, Function<T, String> idFunction) {
        for (T t : list) {
            indexDocumentBulk(idFunction.apply(t), t);
        }
        return true;
    }    /**
     * 向ES发送更新文档请求,将一个对象更新到服务器,会替换原有对应ID的数据。
     *
     * @param id id
     * @param t  存储对象
     * @return 是否更新成功
     */
    public boolean updateDocument(String id, T t) {
        return updateDocument(id, type, t);
    }    /**
     * 向ES发送更新文档请求,将一个对象更新到服务器,会替换原有对应ID的数据。
     *
     * @param id   id
     * @param type 存储类型
     * @param t    存储对象
     * @return 是否更新成功
     */
    public boolean updateDocument(String id, String type, T t) {
        client.prepareUpdate(index, type, id).setDoc(toJSONString(t))
                .execute().actionGet();
        return true;
    }    /**
     * 向ES发送批量更新请求
     *
     * @param id 索引ID
     * @param t  存储对象
     * @return 成功表示放入到bulk成功, 可能会抛出runtimeException
     */
    public boolean updateDocumentBulk(String id, T t) {
        UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(toJSONString(t));
        bulkProcessor.add(updateRequest);
        return true;
    }    /**
     * 向ES发送upsert请求, 如果该document不存在将会新建这个document, 如果存在则更新.
     *
     * @param id id
     * @param t  存储对象
     * @return 是否执行成功
     */
    public boolean upsertDocument(String id, T t) {
        return upsertDocument(id, type, t);
    }    /**
     * 向ES发送upsert请求, 如果该document不存在将会新建这个document, 如果存在则更新.
     *
     * @param id   id
     * @param type 存储类型
     * @param t    存储对象
     * @return 是否执行成功
     */
    public boolean upsertDocument(String id, String type, T t) {
        client.prepareUpdate(index, type, id).setDocAsUpsert(true).setDoc(toJSONString(t))
                .execute().actionGet();
        return true;
    }    /**
     * 向ES发送批量upsert的请求.
     *
     * @param id id
     * @param t  储存对象
     * @return 是否执行成功
     */
    public boolean upsertDocumentBulk(String id, T t) {
        UpdateRequest updateRequest = new UpdateRequest(index, type, id)
                .doc(toJSONString(t));
        updateRequest.docAsUpsert(true);
        bulkProcessor.add(updateRequest);
        return true;
    }    /**
     * 向ES发送获取指定ID文档的请求
     *
     * @param id id
     * @return 搜索引擎实例
     * @throws Exception
     */
    public T getDocument(String id) throws Exception {
        try {
            GetResponse getResponse = client.prepareGet(index, type, id)
                    .execute().actionGet();
            if (getResponse.getSource() == null) {
                return null;
            }
            JSONObject jsonObject = new JSONObject(getResponse.getSource());            T t = clazz.newInstance();
            toObject(t, jsonObject);
            return t;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }    /**
     * 向ES发送删除指定ID文档的请求
     *
     * @param id id
     * @return 是否删除成功
     * @throws Exception
     */
    public boolean deleteDocument(String id) throws Exception {
        return deleteDocument(id, type);
    }    /**
     * 向ES发送删除指定ID文档的请求
     *
     * @param id   id
     * @param type 存储类型
     * @return 是否删除成功
     * @throws Exception
     */
    public boolean deleteDocument(String id, String type) throws Exception {
        DeleteResponse deleteResponse = client.prepareDelete(index, type, id)
                .execute().actionGet();
        return deleteResponse.isFound();
    }    /**
     * 向ES发送搜索文档的请求,返回分页结果
     *
     * @param searchText 搜索内容
     * @return 分页结果
     * @throws Exception
     */
    public PageResponse<T> searchDocument(String searchText) throws Exception {
        PageRequest pageRequest = WebContext.get().page();
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText))
                .setFrom(pageRequest.getOffset())
                .setSize(pageRequest.getLimit())
                .setFetchSource(true);
        return searchDocument(searchRequestBuilder);
    }    /**
     * 向ES发送搜索文档的请求,返回列表结果
     *
     * @param searchText 搜索内容
     * @param start      起始位置
     * @param size       获取数据大小
     * @return 返回数据列表
     * @throws Exception
     */
    public List<T> searchDocument(String searchText, int start, int size) throws Exception {
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText))
                .setFrom(start)
                .setSize(size)
                .setFetchSource(true);        PageResponse<T> pageResponse = searchDocument(searchRequestBuilder);
        return pageResponse.getItemList();
    }    /**
     * 向ES发送搜索文档的请求,返回列表结果
     *
     * @param searchText 搜索内容
     * @param type       类型
     * @param start      起始位置
     * @param size       数据大小
     * @return 返回数据列表
     * @throws Exception
     */
    public List<T> searchDocument(String searchText, String type, int start, int size) throws Exception {
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText))
                .setFrom(start)
                .setSize(size)
                .setFetchSource(true);        PageResponse<T> pageResponse = searchDocument(searchRequestBuilder);
        return pageResponse.getItemList();
    }    /**
     * 向ES发送搜索文档的请求,返回分页结果
     *
     * @param searchRequestBuilder 搜索构造器
     * @return 分页结果
     * @throws Exception
     */
    public PageResponse<T> searchDocument(SearchRequestBuilder searchRequestBuilder) throws Exception {
        SearchResponse searchResponse = search(searchRequestBuilder);
        return searchResponseToPageResponse(searchResponse);
    }    /**
     * 获得scrollId对应的数据. 请查看{@link #getScrollId(SearchRequestBuilder, int, int)}.<br/>
     * 可以反复调用该方法, 直到返回数据为0.
     *
     * @param scrollId    给定的scrollId
     * @param keepSeconds scroll数据保留时间
     * @return 分页结果
     * @throws Exception
     */
    public PageResponse<T> scrollSearchDocument(String scrollId, int keepSeconds) throws Exception {
        return searchResponseToPageResponse(scrollSearch(scrollId, keepSeconds));
    }    /**
     * 向ES发送搜索请求,然后直接返回原始结果。
     *
     * @param searchRequestBuilder 搜索构造器
     * @return 返回结果
     */
    public SearchResponse search(SearchRequestBuilder searchRequestBuilder) {
        return searchRequestBuilder.setTypes(type).execute().actionGet();
    }    /**
     * 向ES发送搜索请求,然后直接返回原始结果。
     *
     * @param searchRequestBuilder 搜索构造器
     * @param type                 类型
     * @return 返回结果
     */
    @Deprecated
    public SearchResponse search(SearchRequestBuilder searchRequestBuilder, String type) {
        return searchRequestBuilder.setTypes(type).execute().actionGet();
    }    /**
     * 通过scrollId获得数据.请查看{@link #getScrollId(SearchRequestBuilder, int, int)}.<br/>
     * 可以反复调用该方法, 直到返回数据为0.
     *
     * @param scrollId    给定的scrollId
     * @param keepSeconds scroll继续保留的时间, 建议60秒
     * @return 返回获取的数据
     */
    public SearchResponse scrollSearch(String scrollId, int keepSeconds) {
        return client.prepareSearchScroll(scrollId).setScroll(new TimeValue(keepSeconds * 1000))
                .execute().actionGet();
    }    /**
     * 提供搜索构造器来获得搜索scrollId, 这个scrollId用作{@link #scrollSearch(String, int)}
     * 和{@link #scrollSearchDocument(String, int)}的参数. <br/>
     * 当需要获取大量数据的时候, 请使用scrollSearch来进行.
     *
     * @param searchRequestBuilder 搜索构造器
     * @param keepSeconds          scroll搜索保留时间, 建议60秒
     * @param sizePerShard         每次每个分片获取的尺寸
     * @return 返回scrollId, 用于scrollSearch方法.
     */
    public String getScrollId(SearchRequestBuilder searchRequestBuilder, int keepSeconds, int sizePerShard) {
        SearchResponse searchResponse = searchRequestBuilder.setSearchType(SearchType.SCAN)
                .setScroll(new TimeValue(keepSeconds * 1000))
                .setSize(sizePerShard).execute().actionGet();
        return searchResponse.getScrollId();
    }    /**
     * 返回搜索指定内容后,总共ES找到匹配的数据量。
     *
     * @param searchText 搜索内容
     * @return 搜索结果数据量
     */
    @Deprecated
    public long countSearchResult(String searchText) {
        CountRequestBuilder countRequestBuilder = client.prepareCount(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText));
        return countSearchResult(countRequestBuilder);
    }    /**
     * 返回搜索指定内容后,总共ES找到匹配的数据量。
     *
     * @param searchText 搜索内容
     * @param type       类型
     * @return 搜索结果数据量
     */
    @Deprecated
    public long countSearchResult(String searchText, String type) {
        CountRequestBuilder countRequestBuilder = client.prepareCount(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText));
        return countSearchResult(countRequestBuilder);
    }    /**
     * 返回搜索指定内容后,总共ES找到匹配的数据量。
     *
     * @param countRequestBuilder 计数请求构造器实例
     * @return 搜索结果数据量
     * @see #prepareCount()
     */
    @Deprecated
    public long countSearchResult(CountRequestBuilder countRequestBuilder) {
        return countRequestBuilder.execute().actionGet().getCount();
    }    /**
     * 用默认的分词器进行文本分词。
     *
     * @param docText 给定的文本
     * @param order   是否使用排序,如果使用排序,则相同分词会被合并,并且出现次数最高的排在返回列表最头部。
     * @return 分词器将文本分词之后的词语列表
     */
    public List<String> analyzeDocument(String docText, boolean order) {
        List<AnalyzeToken> tokenList = analyzeDocument(docText, DEFAULT_ANALYZER);
        if (order) {
            // 如果是使用排序,按照分词出现次数进行排序,并且会合并相同的分词。
            // 构造分词Map,key为分词,value为出现次数。
            Map<String, Integer> tokenMap = Maps.newHashMap();
            for (AnalyzeToken token : tokenList) {
                if (tokenMap.get(token.getTerm()) == null) {
                    tokenMap.put(token.getTerm(), 1);
                } else {
                    tokenMap.put(token.getTerm(), tokenMap.get(token.getTerm()) + 1);
                }
            }            // 将分词Map进行排序
            List<Map.Entry<String, Integer>> tokenSortList = Ordering.from(new Comparator<Map.Entry<String, Integer>>() {
                @Override
                public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                    return o2.getValue().compareTo(o1.getValue());
                }
            }).sortedCopy(tokenMap.entrySet());            // 返回分词列表。
            return Lists.transform(tokenSortList, new Function<Map.Entry<String, Integer>, String>() {
                @Override
                public String apply(Map.Entry<String, Integer> input) {
                    return input.getKey();
                }
            });
        } else {
            // 返回所有分词结果
            return Lists.transform(tokenList, new Function<AnalyzeToken, String>() {
                @Override
                public String apply(AnalyzeToken input) {
                    return input.getTerm();
                }
            });
        }
    }    /**
     * 用指定分词器来分析给定的文本
     *
     * @param docText  给定的文本
     * @param analyzer 指定的分析器
     * @return 分词器将文本分词之后的词语列表
     */
    public List<AnalyzeToken> analyzeDocument(String docText, String analyzer) {
        AnalyzeResponse analyzeResponse = client.admin().indices().prepareAnalyze(index, docText)
                .setAnalyzer(analyzer)
                .execute().actionGet();
        return analyzeResponse.getTokens();
    }    /**
     * 获得一个搜索请求构造器的实例,通过这个实例,可以进行查询相关操作。<br/>
     * 使用这个方法{@link ESClient#searchDocument(SearchRequestBuilder)}进行查询。
     * <pre>
     *     prepareSearch("telepathy")
     *          .setTypes("article")
     *          .setSearchType(SearchType.QUERY_THEN_FETCH)
     *          .setQuery(QueryBuilders.matchQuery("_all", searchText))
     *          .setFrom(pageRequest.getLimit() * (pageRequest.getPage() - 1))
     *          .setSize(pageRequest.getLimit())
     *          .setExplain(true)
     *          .addHighlightedField("title", 100, 1)
     *          .setFetchSource(new String[]{}, new String[]{});
     * </pre>
     *
     * @return 搜索请求构造器实例
     */
    public SearchRequestBuilder prepareSearch() {
        return client.prepareSearch(index);
    }    /**
     * 获得一个计数请求构造器的实例,通过这个实例可以进行查询选项的构造。
     *
     * @return 计数请求构造器实例
     * @see #prepareSearch()
     */
    @Deprecated
    public CountRequestBuilder prepareCount() {
        return client.prepareCount(index);
    }
    
    /**
     * 获得一个Document的term vector (doc frequency, positions, offsets)
     *
     * @return TermVectorResponse
     * @see #termVector()
     */
    public ActionFuture<TermVectorResponse> termVector(TermVectorRequest request) {
        return client.termVector(request);
    }    /**
     * 将SQL转换成ES的JSON查询对象.
     *
     * @param sql 给定的SQL
     * @return JSON对象
     */
    public JSONObject convertSqlToJSON(String sql) {
        if (sqlJsonMap.get(sql) != null) {
            return sqlJsonMap.get(sql);
        }        List<String> addresses = Lists.newArrayList(serverHttpAddressList);
        while (addresses.size() > 0) {
            String sqlPluginUrl = "http://" + addresses.remove(RandomUtils.nextInt(0, addresses.size())) + "/_sql/_explain";
            try {
                JSONObject json = JSONObject.parseObject(
                        MucangHttpClient.getDefault().httpPostBody(sqlPluginUrl, sql, "text/plain")
                );                sqlJsonMap.put(sql, json);
                return json;
            } catch (Exception e) {
                LOG.error("调用elasticsearch-sql插件时遇到错误, 原因:{}", e);
            }
        }
        throw new RuntimeException("调用elasticsearch-sql插件多次失败, 请检查服务器或者插件功能是否正常.");
    }    /**
     * 用SQL语句进行搜索. 使用${keyName}的方式代表需要替换的字符串(需要替换的字符串请用双引号或者单引号引起来, 否则插件不能解析)<br/>
     * 例如: select * from table where mediaId="${mediaId}"<br/>
     *
     * @param sql     指定的SQL
     * @param kvPairs 替换键值对
     * @return 搜索结果
     * @throws Exception
     */
    public SearchResponse searchSql(String sql, final Map<String, String> kvPairs) throws Exception {
        JSONObject jsonQuery = convertSqlToJSON(sql);        PropertyPlaceholderHelper propertyPlaceholderHelper = new PropertyPlaceholderHelper("${", "}");
        String queryString = propertyPlaceholderHelper.replacePlaceholders(
                jsonQuery.toJSONString(),
                new PropertyPlaceholderHelper.PlaceholderResolver() {
                    @Override
                    public String resolvePlaceholder(String placeholderName) {
                        if (StringUtils.isBlank(kvPairs.get(placeholderName))) {
                            return "";
                        } else {
                            return kvPairs.get(placeholderName);
                        }
                    }
                });
        SearchRequestBuilder searchRequestBuilder = prepareSearch()
                .setSource(XContentFactory.jsonBuilder().value(JSONObject.parseObject(queryString)));
        return search(searchRequestBuilder);
    }    /**
     * 将给定的对象转换成JSON字符串,如果有特殊需求,可以覆盖该方法。
     *
     * @param t 给定的对象
     * @return JSON字符串
     */
    public String toJSONString(T t) {
        return JSON.toJSONString(t);
    }    /**
     * 将给定的Map里面的值注入到目标对象。如果有特殊需求,可以覆盖该方法。
     *
     * @param t   目标对象
     * @param map 给定的map
     * @throws Exception
     */
    public void toObject(T t, Map<String, ?> map) throws Exception {
        dozerBeanMapper.map(map, t);
    }    /**
     * 将BulkProcessor的缓冲内容进行立即提交.
     */
    public void flushBulk() {
        this.bulkProcessor.flush();
    }    public BulkProcessor getBulkProcessor() {
        return bulkProcessor;
    }    public void setBulkProcessor(BulkProcessor bulkProcessor) {
        this.bulkProcessor = bulkProcessor;
    }    public PageResponse<T> searchResponseToPageResponse(SearchResponse searchResponse) throws Exception {
        PageResponse<T> pageResponse = new PageResponse<>();
        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
            // 将结果实例化成对应的类型实例
            T t = this.clazz.newInstance();
            Map<String, Object> hitMap;            if (searchHit.getSource() != null) {
                hitMap = searchHit.getSource();
            } else {
                hitMap = Maps.newHashMap(Maps.transformValues(searchHit.getFields(),
                        new Function<SearchHitField, Object>() {
                            @Override
                            public Object apply(SearchHitField input) {
                                return input.getValues();
                            }
                        }
                ));
            }            for (HighlightField highlightField : searchHit.getHighlightFields().values()) {
                hitMap.put(highlightField.getName(),
                        StringUtils.join(highlightField.getFragments(), "..."));
            }            // 将数据转换成对应的实例
            toObject(t, hitMap);
            pageResponse.getItemList().add(t);
        }
        pageResponse.setTotal(searchResponse.getHits().getTotalHits());
        return pageResponse;
    }    /**
     * 关闭native的链接.
     */
    public void close() {
        IOUtils.closeQuietly(bulkProcessor);
        this.client.close();
    }}

如果有问题大家可以留言一起交流, 我也是一个es初学者。

分类: 工作经验,全文检索
标签: ElasticSearch

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/423793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mybatis学习(31):修改部分字段(有外键,先查询,再修改)

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

mybatis学习(32):删除操作

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

mybatis学习(33):动态sql if

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

小程序·云开发实战 - 体重记录小程序

前一段看到朋友圈里总是有人用txt记录体重&#xff0c;就特别想写一个记录体重的小程序&#xff0c; 现在小程序的云开发有云函数、数据库&#xff0c;真的挺好用&#xff0c;很适合个人开发者&#xff0c;服务器域名什么都不用管&#xff0c;云开发让你完全不用操心这些东西。…

mybatis学习(34):动态sql-choose

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

小程序·云开发实战 - 校园约拍小程序

创意来源于生活&#xff0c;之所以开发这个校园约拍小程序&#xff0c;是因为在摄影选修课上常听老师抱怨外出写生老找不到模特&#xff0c;许多大学生都想拥有一套专属自己记忆的摄影作品&#xff0c;记录下不会磨灭的美好回忆&#xff0c;可如何找到让自己满意的摄影师是他们…

mybatis学习(35):sql-where

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

mybatis学习(36):动态sql-set

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

从10亿光年到0.1飞米的世界 (经典!震撼!)

从10亿光年到0.1飞米的世界! 10亿光年,是一个什么概念?光年:光走一年的路程.光速:每秒299792458米,一秒可绕地球7圈半.在10亿光年下观测的宇宙.上面的每一个象素点所表现的事物都是无比古远的.1亿光年.把视野缩小了10倍,宇宙看起来还是星光点点,1000万光年,把眼光再降低一个数…

高效、稳定开发功能的一些心得

在开始编码前一定要足够了解案子&#xff0c;了解各种特殊情况&#xff0c;和美术、策划、服务器沟通好&#xff0c;最后写好伪代码。 一些建议 1.尽量复用&#xff0c;例如重复的对象单独抽出来做成item&#xff0c;别的模块也用到的做成通用item&#xff0c;不要写重复代码。…

Flume与Kafka整合案例详解

环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Zookeeper 3.4.5 Flume 1.6.0 Kafka 2.1.0 flume笔记 直接贴配置文件 [rootzero239 kafka_2.10-0.10.1.1]# cat /opt/hadoop/apache-flume-1.6.0-bin/conf/kafka-conf.properties # The configuration file needs to …

mybatis学习(37):动态sql-trim

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

SQL-简单查询

/*人员&#xff1a;LDH功能&#xff1a;SQL-简单查询日期&#xff1a;2018-7-18*/USE TSQLFundamentals2008; GO-- Select some columns information. SELECT empid,lastname,firstname,address,city,country FROM HR.Employees;-- GROUP BY SELECT 国家 country,COUNT(1) AS …

mybatis学习(38):动态sql-foreach

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

mybatis学习(39):动态sql片段

目录结构 com.geyao.mybatis.mapper BlogMapper类 package com.geyao.mybatis.mapper;import java.util.List; import java.util.Map;import org.apache.ibatis.annotations.Param;import com.geyao.mybatis.pojo.Blog;public interface BlogMapper {Blog selectBlog(Integer…

中国古代十三美男

一、潘安    潘岳&#xff0c;就是人所周知的潘安&#xff0c;西晋时河南人氏&#xff0c;表字安仁&#xff0c;小字檀奴。其人“姿容既好&#xff0c;神情亦佳”。潘岳年轻时&#xff0c;坐车到洛阳城外游玩&#xff0c;当时不少妙龄姑娘见了他&#xff0c;都会怦然心动给他…

mybatis学习(40):逆向工程的创建

目录 首先导入我们的jar包 链接&#xff1a;https://pan.baidu.com/s/1Ent3kAwOagOZLT0XxDLEeA 提取码&#xff1a;zqpu 建立一个com.geyao.generator的包 generator的java类 package com.geyao.generator; import java.io.File; import java.util.*;import org.mybatis.ge…

SpringBoot中Tomcat配置(学习SpringBoot实战)

1、Tomcat配置 Spring Boot默认内嵌的Tomcat为Servlet容器&#xff0c;所以本节只讲对Tomcat配置&#xff0c;其实本节的配置对Tomcat、Jetty和Undertow都是通用的。 1.1 配置Tomcat 关于Tomcat的所有属性都在org.springframework.boot.autoconfigure.web.ServerProperties配…

axios的数据请求方式及跨域

express 的三大功能&#xff1a;静态资源、路由、模板引擎 app.use(express.static(www));  只要是创建这个静态的目录&#xff0c;这个 www 的静态目录里面的文件就可以被访问 数据的请求方式 axios get 的 请求方式    axios.get(url地址).then(function(success){  //…

mybatis学习(41):使用逆向工程

新建一个项目&#xff0c;将逆向工程的生成的拷贝进来 配置文件 log4j.properties ### \u914D\u7F6E\u6839 ### log4j.rootLogger debug,console ,fileAppender,dailyRollingFile,ROLLING_FILE,MAIL,DATABASE### \u8BBE\u7F6E\u8F93\u51FAsql\u7684\u7EA7\u522B\uFF0C\u5176…