(5) Elasticsearch Source Code: Query Flow Analysis

https://www.cnblogs.com/darcy-yuan/p/17039526.html

1. Overview

In the previous post we walked through the ES (Elasticsearch, hereafter ES) indexing flow. This post looks at the query flow, summarized in the diagram below.

(Figure: basic query flow / sequence diagram)

2. Query Flow

To make the code easier to debug, I started two nodes on my machine and created an index as follows: two primary shards, no replicas.
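The index creation appeared as a screenshot in the original post. A minimal equivalent request matching the two-shard, zero-replica setup described above would look like this (the index name test_index is my placeholder, not from the original):

PUT /test_index
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 0
  }
}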

Then I sent the following request with Postman:
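The Postman request was also a screenshot. It was presumably an ordinary _search call along these lines (index name, field, and query value are all illustrative):

POST /test_index/_search
{
  "query": {
    "match": { "content": "elasticsearch" }
  }
}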

Now let's look at the code (this series is based on source version 7.4.0). A search is a REST request like any other, so it first goes through the transport action filter chain:

// org.elasticsearch.action.support.TransportAction
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
    int i = index.getAndIncrement();
    try {
        if (i < this.action.filters.length) {
            this.action.filters[i].apply(task, actionName, request, listener, this); // run the filters first
        } else if (i == this.action.filters.length) {
            this.action.doExecute(task, request, listener); // then run the action itself
        } else {
            listener.onFailure(new IllegalStateException("proceed was called too many times"));
        }
    } catch (Exception e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}

The action that actually executes the search is TransportSearchAction, which also optimizes the order in which indices are queried. Our request uses QUERY_THEN_FETCH.

// org.elasticsearch.action.search.TransportSearchAction
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    final long relativeStartNanos = System.nanoTime();
    final SearchTimeProvider timeProvider =
        new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
    ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
        if (source != searchRequest.source()) {
            // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
            // situations when source is rewritten to null due to a bug
            searchRequest.source(source);
        }
        final ClusterState clusterState = clusterService.state();
        final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
            searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
        OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        if (remoteClusterIndices.isEmpty()) {
            executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener); // only the local cluster is involved
        } else {
            if (shouldMinimizeRoundtrips(searchRequest)) { // minimize round trips to remote clusters
                ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
                    remoteClusterService, threadPool, listener,
                    (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
            } else {
                AtomicInteger skippedClusters = new AtomicInteger(0);
                collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                    skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                    ActionListener.wrap(
                        searchShardsResponses -> {
                            List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                            Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                            BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
                                searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                            int localClusters = localIndices == null ? 0 : 1;
                            int totalClusters = remoteClusterIndices.size() + localClusters;
                            int successfulClusters = searchShardsResponses.size() + localClusters;
                            executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                                remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                        },
                        listener::onFailure));
            }
        }
    }, listener::onFailure);
    if (searchRequest.source() == null) {
        rewriteListener.onResponse(searchRequest.source());
    } else {
        Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
            rewriteListener); // the listener is called back once the source has been rewritten
    }
}
...
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                           OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
                           BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
                           Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
                           SearchResponse.Clusters clusters) {
    clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // fail if reads are globally blocked
    // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
    // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
    // of just for the _search api
    final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
    Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
    Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
        searchRequest.indices());
    routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
    Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
    if (shouldSplitIndices(searchRequest)) { // search read-only and write indices separately, giving priority to the write indices
        //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
        //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
        //indices (possibly slower) being searched at the same time.
        List<String> writeIndicesList = new ArrayList<>();
        List<String> readOnlyIndicesList = new ArrayList<>();
        splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
        String[] writeIndices = writeIndicesList.toArray(new String[0]);
        String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);
        if (readOnlyIndices.length == 0) {
            executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap,
                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
        } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
            executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap,
                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
        } else {
            //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
            //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
            CountDown countDown = new CountDown(2);
            AtomicReference<Exception> exceptions = new AtomicReference<>();
            SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
                searchService::createReduceContext);
            CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener =
                new CountDownActionListener<SearchResponse, SearchResponse>(countDown, exceptions, listener) {
                    @Override
                    void innerOnResponse(SearchResponse searchResponse) {
                        searchResponseMerger.add(searchResponse);
                    }

                    @Override
                    SearchResponse createFinalResponse() {
                        return searchResponseMerger.getMergedResponse(clusters);
                    }
                };
            //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
            //will be provided separately to executeSearch.
            SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
                RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
            executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
                SearchResponse.Clusters.EMPTY);
            //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
            //will be provided separately to executeSearch.
            SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
                RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
            executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
                aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
                SearchResponse.Clusters.EMPTY);
        }
    } else {
        String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
        executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap,
            aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
    }
}
...
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                           OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
                           Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
                           List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
                           ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {
    Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
    GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
        concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
    GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
        searchRequest.getLocalClusterAlias(), remoteShardIterators);
    failIfOverShardCountLimit(clusterService, shardIterators.size());
    // optimize search type for cases where there is only one shard group to search on
    if (shardIterators.size() == 1) {
        // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
        searchRequest.searchType(QUERY_THEN_FETCH); // a single shard never needs DFS
    }
    if (searchRequest.allowPartialSearchResults() == null) {
        // No user preference defined in search request - apply cluster service default
        searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
    }
    if (searchRequest.isSuggestOnly()) {
        // disable request cache if we have only suggest
        searchRequest.requestCache(false);
        if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
            // convert to Q_T_F if we have only suggest
            searchRequest.searchType(QUERY_THEN_FETCH);
        }
    }
    final DiscoveryNodes nodes = clusterState.nodes();
    BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
        nodes::get, remoteConnections, searchTransportService::getConnection);
    boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
    searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
        Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters)
        .start(); // starts SearchQueryThenFetchAsyncAction, which runs asynchronously
}

Next comes the QUERY_THEN_FETCH logic. As the diagram above shows, QUERY_THEN_FETCH consists of four phases: init, query, fetch, and send response.

// org.elasticsearch.action.search.AbstractSearchAsyncAction
private void executePhase(SearchPhase phase) {
    try {
        phase.run(); // run the phase
    } catch (Exception e) {
        if (logger.isDebugEnabled()) {
            logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
        }
        onPhaseFailure(phase, "", e);
    }
}

First comes the init phase:

// org.elasticsearch.action.search.AbstractSearchAsyncAction
public final void run() {
    for (final SearchShardIterator iterator : toSkipShardsIts) {
        assert iterator.skip();
        skipShard(iterator);
    }
    if (shardsIts.size() > 0) {
        assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
        if (request.allowPartialSearchResults() == false) {
            final StringBuilder missingShards = new StringBuilder();
            // Fail-fast verification of all shards being available
            for (int index = 0; index < shardsIts.size(); index++) {
                final SearchShardIterator shardRoutings = shardsIts.get(index);
                if (shardRoutings.size() == 0) {
                    if (missingShards.length() > 0) {
                        missingShards.append(", ");
                    }
                    missingShards.append(shardRoutings.shardId());
                }
            }
            if (missingShards.length() > 0) {
                //Status red - shard is missing all copies and would produce partial results for an index search
                final String msg = "Search rejected due to missing shards [" + missingShards +
                    "]. Consider using `allow_partial_search_results` setting to bypass this error.";
                throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
            }
        }
        for (int index = 0; index < shardsIts.size(); index++) { // loop over the shards and run the phase on each one
            final SearchShardIterator shardRoutings = shardsIts.get(index);
            assert shardRoutings.skip() == false;
            performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
        }
    }
}

Then comes the query phase, which uses the transport service to ask the node holding each shard, whether the local node or another one, for the documents that match the query:

// org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
                                   final SearchActionListener<SearchPhaseResult> listener) {
    getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
        buildShardSearchRequest(shardIt), getTask(), listener);
}

When a node receives the request, it dispatches it to the registered handler:

// org.elasticsearch.action.search.SearchTransportService
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, // register the handler for the query phase
    (request, channel, task) -> {
        searchService.executeQueryPhase(request, (SearchTask) task,
            new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));
    });

The node builds a search context and runs the query:

// org.elasticsearch.search.SearchService
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
    final SearchContext context = createAndPutContext(request);
    context.incRef();
    try {
        context.setTask(task);
        final long afterQueryTime;
        try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
            contextProcessing(context);
            loadOrExecuteQueryPhase(request, context); // main query logic
            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            afterQueryTime = executor.success();
        }
        if (request.numberOfShards() == 1) {
            return executeFetchPhase(context, afterQueryTime); // single shard: run the fetch phase right away
        }
        return context.queryResult();
    } catch (Exception e) {
        // execution exception can happen while loading the cache, strip it
        if (e instanceof ExecutionException) {
            e = (e.getCause() == null || e.getCause() instanceof Exception) ?
                (Exception) e.getCause() : new ElasticsearchException(e.getCause());
        }
        logger.trace("Query phase failed", e);
        processFailure(context, e);
        throw e;
    } finally {
        cleanContext(context);
    }
}
...
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
    final boolean canCache = indicesService.canCache(request, context);
    context.getQueryShardContext().freezeContext();
    if (canCache) { // serve from the request cache if possible
        indicesService.loadIntoContext(request, context, queryPhase);
    } else {
        queryPhase.execute(context);
    }
}
...
// org.elasticsearch.search.query.QueryPhase
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
    if (searchContext.hasOnlySuggest()) {
        suggestPhase.execute(searchContext);
        searchContext.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(
                new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
            new DocValueFormat[0]);
        return;
    }
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
    }
    // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
    // request, preProcess is called on the DFS phase phase, this is why we pre-process them
    // here to make sure it happens during the QUERY phase
    aggregationPhase.preProcess(searchContext);
    final ContextIndexSearcher searcher = searchContext.searcher();
    boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled); // main query logic
    if (rescore) { // only if we do a regular search
        rescorePhase.execute(searchContext); // rescoring
    }
    suggestPhase.execute(searchContext); // suggestions
    aggregationPhase.execute(searchContext); // aggregations
    if (searchContext.getProfilers() != null) {
        ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
        searchContext.queryResult().profileResults(shardResults);
    }
}
...
static boolean execute(SearchContext searchContext,
                       final IndexSearcher searcher,
                       Consumer<Runnable> checkCancellationSetter) throws QueryPhaseExecutionException {
    final IndexReader reader = searcher.getIndexReader();
    QuerySearchResult queryResult = searchContext.queryResult();
    queryResult.searchTimedOut(false);
    try {
        queryResult.from(searchContext.from());
        queryResult.size(searchContext.size());
        Query query = searchContext.query();
        assert query == searcher.rewrite(query); // already rewritten
        final ScrollContext scrollContext = searchContext.scrollContext();
        if (scrollContext != null) {
            if (scrollContext.totalHits == null) {
                // first round
                assert scrollContext.lastEmittedDoc == null;
                // there is not much that we can optimize here since we want to collect all
                // documents in order to get the total number of hits
            } else {
                final ScoreDoc after = scrollContext.lastEmittedDoc;
                if (returnsDocsInOrder(query, searchContext.sort())) {
                    // now this gets interesting: since we sort in index-order, we can directly
                    // skip to the desired doc
                    if (after != null) {
                        query = new BooleanQuery.Builder()
                            .add(query, BooleanClause.Occur.MUST)
                            .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
                            .build();
                    }
                    // ... and stop collecting after ${size} matches
                    searchContext.terminateAfter(searchContext.size());
                } else if (canEarlyTerminate(reader, searchContext.sort())) {
                    // now this gets interesting: since the search sort is a prefix of the index sort, we can directly
                    // skip to the desired doc
                    if (after != null) {
                        query = new BooleanQuery.Builder()
                            .add(query, BooleanClause.Occur.MUST)
                            .add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER)
                            .build();
                    }
                }
            }
        }
        final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
        // whether the chain contains a collector that filters documents
        boolean hasFilterCollector = false;
        if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
            // add terminate_after before the filter collectors
            // it will only be applied on documents accepted by these filter collectors
            collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
            // this collector can filter documents during the collection
            hasFilterCollector = true;
        }
        if (searchContext.parsedPostFilter() != null) {
            // add post filters before aggregations
            // it will only be applied to top hits
            collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
            // this collector can filter documents during the collection
            hasFilterCollector = true;
        }
        if (searchContext.queryCollectors().isEmpty() == false) {
            // plug in additional collectors, like aggregations
            collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));
        }
        if (searchContext.minimumScore() != null) {
            // apply the minimum score after multi collector so we filter aggs as well
            collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
            // this collector can filter documents during the collection
            hasFilterCollector = true;
        }
        boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
            searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;
        final Runnable timeoutRunnable;
        if (timeoutSet) {
            final long startTime = searchContext.getRelativeTimeInMillis();
            final long timeout = searchContext.timeout().millis();
            final long maxTime = startTime + timeout;
            timeoutRunnable = () -> {
                final long time = searchContext.getRelativeTimeInMillis();
                if (time > maxTime) {
                    throw new TimeExceededException();
                }
            };
        } else {
            timeoutRunnable = null;
        }
        final Runnable cancellationRunnable;
        if (searchContext.lowLevelCancellation()) {
            SearchTask task = searchContext.getTask();
            cancellationRunnable = () -> { if (task.isCancelled()) throw new TaskCancelledException("cancelled"); };
        } else {
            cancellationRunnable = null;
        }
        final Runnable checkCancelled;
        if (timeoutRunnable != null && cancellationRunnable != null) {
            checkCancelled = () -> {
                timeoutRunnable.run();
                cancellationRunnable.run();
            };
        } else if (timeoutRunnable != null) {
            checkCancelled = timeoutRunnable;
        } else if (cancellationRunnable != null) {
            checkCancelled = cancellationRunnable;
        } else {
            checkCancelled = null;
        }
        checkCancellationSetter.accept(checkCancelled);
        // add cancellable
        // this only performs segment-level cancellation, which is cheap and checked regardless of
        // searchContext.lowLevelCancellation()
        collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled));
        final boolean doProfile = searchContext.getProfilers() != null;
        // create the top docs collector last when the other collectors are known
        final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader, hasFilterCollector);
        // add the top docs collector, the first collector context in the chain
        collectors.addFirst(topDocsFactory);
        final Collector queryCollector;
        if (doProfile) {
            InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
            searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
            queryCollector = profileCollector;
        } else {
            queryCollector = QueryCollectorContext.createQueryCollector(collectors);
        }
        try {
            searcher.search(query, queryCollector); // hand off to the Lucene API
        } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
            queryResult.terminatedEarly(true);
        } catch (TimeExceededException e) {
            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
            if (searchContext.request().allowPartialSearchResults() == false) {
                // Can't rethrow TimeExceededException because not serializable
                throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
            }
            queryResult.searchTimedOut(true);
        } finally {
            searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
        }
        if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
                && queryResult.terminatedEarly() == null) {
            queryResult.terminatedEarly(false);
        }
        final QuerySearchResult result = searchContext.queryResult();
        for (QueryCollectorContext ctx : collectors) {
            ctx.postProcess(result);
        }
        ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
        if (executor instanceof QueueResizingEsThreadPoolExecutor) {
            QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
            queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
            queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
        }
        if (searchContext.getProfilers() != null) {
            ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
            result.profileResults(shardResults);
        }
        return topDocsFactory.shouldRescore();
    } catch (Exception e) {
        throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
    }
}

At this point the shard-level query logic is complete, and the coordinating node saves each shard's query result:

// org.elasticsearch.action.search.AbstractSearchAsyncAction
public final void onShardSuccess(Result result) {
    successfulOps.incrementAndGet();
    results.consumeResult(result); // consume the per-shard query result
    if (logger.isTraceEnabled()) {
        logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
    }
    // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
    // so its ok concurrency wise to miss potentially the shard failures being created because of another failure
    // in the #addShardFailure, because by definition, it will happen on *another* shardIndex
    AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
    if (shardFailures != null) {
        shardFailures.set(result.getShardIndex(), null);
    }
}

// org.elasticsearch.action.search.InitialSearchPhase
void consumeResult(Result result) {
    assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set";
    results.set(result.getShardIndex(), result); // store the result in the shard's slot
}

The next step is the fetch phase:

// org.elasticsearch.action.search.FetchSearchPhase
private void innerRun() throws IOException {
    final int numShards = context.getNumShards();
    final boolean isScrollSearch = context.getRequest().scroll() != null;
    List<SearchPhaseResult> phaseResults = queryResults.asList();
    String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
    final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce(); // reduce the query-phase results (mainly document ids)
    final boolean queryAndFetchOptimization = queryResults.length() == 1;
    final Runnable finishPhase = () -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase,
        queryAndFetchOptimization ? queryResults : fetchResults);
    if (queryAndFetchOptimization) {
        assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
            + "], single result: " + phaseResults.get(0).fetchResult();
        // query AND fetch optimization
        finishPhase.run();
    } else {
        ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
        final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs); // which docs to fetch from which shard
        if (scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
            phaseResults.stream()
                .map(SearchPhaseResult::queryResult)
                .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
            finishPhase.run();
        } else {
            final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
                searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
                : null;
            final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(
                r -> fetchResults.set(r.getShardIndex(), r),
                docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
                finishPhase, context);
            for (int i = 0; i < docIdsToLoad.length; i++) {
                IntArrayList entry = docIdsToLoad[i];
                SearchPhaseResult queryResult = queryResults.get(i);
                if (entry == null) { // no results for this shard ID
                    if (queryResult != null) {
                        // if we got some hits from this shard we have to release the context there
                        // we do this as we go since it will free up resources and passing on the request on the
                        // transport layer is cheap.
                        releaseIrrelevantSearchContext(queryResult.queryResult());
                    }
                    // in any case we count down this result since we don't talk to this shard anymore
                    counter.countDown();
                } else {
                    SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                    Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
                        searchShardTarget.getNodeId());
                    ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
                        lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
                    executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
                        connection); // fetch the document contents
                }
            }
        }
    }
}

Finally, the results are assembled into a response and returned:

// org.elasticsearch.action.search.AbstractSearchAsyncAction
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
    ShardSearchFailure[] failures = buildShardFailures();
    Boolean allowPartialResults = request.allowPartialSearchResults();
    assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
    if (allowPartialResults == false && failures.length > 0) {
        raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
    }
    return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
        skippedOps.get(), buildTookInMillis(), failures, clusters);
}

3. Callbacks in Elasticsearch

ES makes heavy use of listener callbacks, which can take some getting used to if you are accustomed to sequential logic. Here is an example.

As the code below shows, doExecute defines a long rewriteListener, which Rewriteable later calls back.

Note that doExecute also receives a listener as a parameter, which is in turn called back once executeLocalSearch completes. Some callbacks are nested several levels deep and unwind recursively, level by level.

// org.elasticsearch.action.search.TransportSearchAction
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    final long relativeStartNanos = System.nanoTime();
    final SearchTimeProvider timeProvider =
        new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
    ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> { // 1. define the listener first
        if (source != searchRequest.source()) {
            // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
            // situations when source is rewritten to null due to a bug
            searchRequest.source(source);
        }
        final ClusterState clusterState = clusterService.state();
        final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
            searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
        OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        if (remoteClusterIndices.isEmpty()) {
            executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener); // only the local cluster is involved
        } else {
            if (shouldMinimizeRoundtrips(searchRequest)) { // minimize round trips to remote clusters
                ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
                    remoteClusterService, threadPool, listener,
                    (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
            } else {
                AtomicInteger skippedClusters = new AtomicInteger(0);
                collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                    skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                    ActionListener.wrap(
                        searchShardsResponses -> {
                            List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                            Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                            BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
                                searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                            int localClusters = localIndices == null ? 0 : 1;
                            int totalClusters = remoteClusterIndices.size() + localClusters;
                            int successfulClusters = searchShardsResponses.size() + localClusters;
                            executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                                remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                        },
                        listener::onFailure));
            }
        }
    }, listener::onFailure);
    if (searchRequest.source() == null) {
        rewriteListener.onResponse(searchRequest.source());
    } else {
        Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
            rewriteListener); // 2. rewriteAndFetch
    }
}

// org.elasticsearch.index.query.Rewriteable
static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T> rewriteResponse,
                                                       int iteration) {
    T builder = original;
    try {
        for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
             rewrittenBuilder = builder.rewrite(context)) {
            builder = rewrittenBuilder;
            if (iteration++ >= MAX_REWRITE_ROUNDS) {
                // this is some protection against user provided queries if they don't obey the contract of rewrite we allow 16 rounds
                // and then we fail to prevent infinite loops
                throw new IllegalStateException("too many rewrite rounds, rewriteable might return new objects even if they are not " +
                    "rewritten");
            }
            if (context.hasAsyncActions()) {
                T finalBuilder = builder;
                final int currentIterationNumber = iteration;
                context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
                    currentIterationNumber), rewriteResponse::onFailure));
                return;
            }
        }
        rewriteResponse.onResponse(builder); // 3. call back into rewriteListener
    } catch (IOException | IllegalArgumentException | ParsingException ex) {
        rewriteResponse.onFailure(ex);
    }
}
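To make the pattern easier to see in isolation, here is a minimal, self-contained sketch. It is not ES code: Listener is a simplified stand-in for ActionListener, and rewriteAndFetch here only mimics the shape of the real method (define the listener first, hand it to the worker, resume inside the callback).

public class ListenerDemo {

    // Simplified stand-in for org.elasticsearch.action.ActionListener
    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    // Same shape as Rewriteable.rewriteAndFetch: do some work, then hand the
    // result to the listener instead of returning it to the caller
    static void rewriteAndFetch(String source, Listener<String> listener) {
        try {
            String rewritten = source.trim(); // pretend this is the query rewrite
            listener.onResponse(rewritten);   // the callback fires here
        } catch (Exception e) {
            listener.onFailure(e);
        }
    }

    public static void main(String[] args) {
        // 1. define the listener first; nothing runs yet
        Listener<String> rewriteListener = new Listener<String>() {
            @Override
            public void onResponse(String rewritten) {
                System.out.println("continue the search with: " + rewritten);
            }

            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        };
        // 2. kick off the work; execution continues inside the listener
        rewriteAndFetch("  match_all  ", rewriteListener);
    }
}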

4. Summary

This post gave a brief walkthrough of how ES queries documents: it first asks each shard for the ids (and related metadata) of the documents matching the query, and only then fetches the document contents. DFS was not covered here; later posts will explore these topics.
