(4) Elasticsearch Source Code: Analysis of the Indexing Flow

https://www.cnblogs.com/darcy-yuan/p/17024341.html

1. Overview

In the previous article we discussed how ES starts up. This article looks at how ES indexes a document.

Below is the flowchart of the indexing process, and we will walk through it step by step in that order.

 

The relationships between the main classes are as follows:

2. Indexing Flow (primary)

We use Postman to send a request that creates a document.

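Since we are sending an HTTP request, it goes through ES's own HTTP request-handling layer, which works much like Spring MVC. RestController dispatches the request, BaseRestHandler validates its parameters, and a concrete handler (here RestIndexAction) builds the request object.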

// org.elasticsearch.rest.RestController
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
    final int contentLength = request.content().length();
    if (contentLength > 0) {
        final XContentType xContentType = request.getXContentType(); // validate the Content-Type
        if (xContentType == null) {
            sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
            return;
        }
        if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
            channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,
                "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
            return;
        }
    }
    RestChannel responseChannel = channel;
    try {
        if (handler.canTripCircuitBreaker()) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
        handler.handleRequest(request, responseChannel, client);
    } catch (Exception e) {
        responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
    }
}

// org.elasticsearch.rest.BaseRestHandler
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    if (request.hasContent() && request.isContentConsumed() == false) {
        throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
    }

    usageCount.increment();
    // execute the action
    action.accept(channel);
}

// org.elasticsearch.rest.action.document.RestIndexAction
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    IndexRequest indexRequest;
    final String type = request.param("type");
    if (type != null && type.equals(MapperService.SINGLE_MAPPING_NAME) == false) {
        deprecationLogger.deprecatedAndMaybeLog("index_with_types", TYPES_DEPRECATION_MESSAGE); // mapping types are deprecated
        indexRequest = new IndexRequest(request.param("index"), type, request.param("id"));
    } else {
        indexRequest = new IndexRequest(request.param("index"));
        indexRequest.id(request.param("id"));
    }
    indexRequest.routing(request.param("routing"));
    indexRequest.setPipeline(request.param("pipeline"));
    indexRequest.source(request.requiredContent(), request.getXContentType());
    indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
    indexRequest.setRefreshPolicy(request.param("refresh"));
    indexRequest.version(RestActions.parseVersion(request));
    indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
    indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
    indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
    String sOpType = request.param("op_type");
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    if (sOpType != null) {
        indexRequest.opType(sOpType);
    }

    return channel ->
        client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing()))); // consumer that performs the index operation
}
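The "prepare, validate, then execute" pattern above can be shown in isolation with a minimal plain-Java sketch. None of these classes exist in Elasticsearch: `MiniRestRequest`, `MiniRestHandler` and `MiniIndexHandler` are hypothetical stand-ins for RestRequest, BaseRestHandler and RestIndexAction. Reading a parameter marks it as consumed, and any parameter left unconsumed is rejected before the deferred action runs.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical stand-in for RestRequest: tracks which parameters were read.
class MiniRestRequest {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    MiniRestRequest(Map<String, String> params) {
        this.params = params;
    }

    String param(String key) {
        consumed.add(key);              // reading a parameter marks it as consumed
        return params.get(key);
    }

    SortedSet<String> unconsumedParams() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);       // sorted, so error messages are stable
        return rest;
    }
}

// Hypothetical stand-in for BaseRestHandler.
abstract class MiniRestHandler {
    // Returns the deferred action, analogous to RestChannelConsumer.
    abstract Consumer<List<String>> prepareRequest(MiniRestRequest request);

    final void handleRequest(MiniRestRequest request, List<String> channel) {
        Consumer<List<String>> action = prepareRequest(request);
        SortedSet<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);         // run the action only after validation passed
    }
}

// Hypothetical stand-in for RestIndexAction.
class MiniIndexHandler extends MiniRestHandler {
    @Override
    Consumer<List<String>> prepareRequest(MiniRestRequest request) {
        String index = request.param("index");
        String id = request.param("id");
        String routing = request.param("routing");
        return channel -> channel.add("index " + index + "/" + id + " routing=" + routing);
    }
}
```

Because `prepareRequest` returns a consumer instead of running the action immediately, parameter validation can sit between building the request and executing it. That is the same ordering `BaseRestHandler.handleRequest` uses.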

Next, let's see how the index operation is actually processed. It is driven mainly by TransportAction:

// org.elasticsearch.action.support.TransportAction
public final Task execute(Request request, ActionListener<Response> listener) {
    /**
     * While this version of execute could delegate to the TaskListener
     * version of execute that'd add yet another layer of wrapping on the
     * listener and prevent us from using the listener bare if there isn't a
     * task. That just seems like too many objects. Thus the two versions of
     * this method.
     */
    Task task = taskManager.register("transport", actionName, request); // register with the task manager: call -> task
    execute(task, request, new ActionListener<Response>() { // wrap the ActionListener
        @Override
        public void onResponse(Response response) {
            try {
                taskManager.unregister(task);
            } finally {
                listener.onResponse(response);
            }
        }

        @Override
        public void onFailure(Exception e) {
            try {
                taskManager.unregister(task);
            } finally {
                listener.onFailure(e);
            }
        }
    });
    return task;
}
...
public final void execute(Task task, Request request, ActionListener<Response> listener) {
    ActionRequestValidationException validationException = request.validate();
    if (validationException != null) {
        listener.onFailure(validationException);
        return;
    }

    if (task != null && request.getShouldStoreResult()) {
        listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
    }

    RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger); // chain of responsibility
    requestFilterChain.proceed(task, actionName, request, listener);
}
...
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
    int i = index.getAndIncrement();
    try {
        if (i < this.action.filters.length) {
            this.action.filters[i].apply(task, actionName, request, listener, this); // run the filters first
        } else if (i == this.action.filters.length) {
            this.action.doExecute(task, request, listener); // then execute the action itself
        } else {
            listener.onFailure(new IllegalStateException("proceed was called too many times"));
        }
    } catch(Exception e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}
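The filter chain in `proceed` is an index-driven chain of responsibility. Here is a minimal sketch with hypothetical names (`MiniFilter`, `MiniFilterChain`). It uses a string log in place of real requests and listeners, but keeps the same three-way branch on the counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical filter: it decides whether to continue by calling chain.proceed().
interface MiniFilter {
    void apply(String request, List<String> log, MiniFilterChain chain);
}

class MiniFilterChain {
    private final AtomicInteger index = new AtomicInteger(0);
    private final List<MiniFilter> filters;

    MiniFilterChain(List<MiniFilter> filters) {
        this.filters = filters;
    }

    void proceed(String request, List<String> log) {
        int i = index.getAndIncrement();
        if (i < filters.size()) {
            filters.get(i).apply(request, log, this);   // hand control to the next filter
        } else if (i == filters.size()) {
            log.add("doExecute(" + request + ")");     // every filter passed: run the action
        } else {
            throw new IllegalStateException("proceed was called too many times");
        }
    }
}
```

A filter that returns without calling `proceed` stops the request before `doExecute` is ever reached. This is how security or audit filters can short-circuit an action.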

The actual work is done by TransportBulkAction (in this version, a single index request is handled as a one-item bulk request):

// org.elasticsearch.action.bulk.TransportBulkAction
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
    final long startTime = relativeTime();
    final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

    boolean hasIndexRequestsWithPipelines = false;
    final MetaData metaData = clusterService.state().getMetaData();
    ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
    for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
        IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
        if (indexRequest != null) {
            // get pipeline from request
            String pipeline = indexRequest.getPipeline();
            if (pipeline == null) { // no pipeline set on the request
                // start to look for default pipeline via settings found in the index meta data
                IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
                // check the alias for the index request (this is how normal index requests are modeled)
                if (indexMetaData == null && indexRequest.index() != null) {
                    AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); // resolve an alias
                    if (indexOrAlias != null && indexOrAlias.isAlias()) {
                        AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
                        indexMetaData = alias.getWriteIndex();
                    }
                }
                // check the alias for the action request (this is how upserts are modeled)
                if (indexMetaData == null && actionRequest.index() != null) {
                    AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
                    if (indexOrAlias != null && indexOrAlias.isAlias()) {
                        AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
                        indexMetaData = alias.getWriteIndex();
                    }
                }
                if (indexMetaData != null) {
                    // Find the default pipeline if one is defined from and existing index.
                    String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
                    indexRequest.setPipeline(defaultPipeline);
                    if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
                        hasIndexRequestsWithPipelines = true;
                    }
                } else if (indexRequest.index() != null) {
                    // No index exists yet (and is valid request), so matching index templates to look for a default pipeline
                    List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
                    assert (templates != null);
                    String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
                    // order of templates are highest order first, break if we find a default_pipeline
                    for (IndexTemplateMetaData template : templates) {
                        final Settings settings = template.settings();
                        if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
                            defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
                            break;
                        }
                    }
                    indexRequest.setPipeline(defaultPipeline);
                    if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
                        hasIndexRequestsWithPipelines = true;
                    }
                }
            } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
                hasIndexRequestsWithPipelines = true;
            }
        }
    }

    if (hasIndexRequestsWithPipelines) {
        // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
        // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
        // this path is never taken.
        try {
            if (clusterService.localNode().isIngestNode()) {
                processBulkIndexIngestRequest(task, bulkRequest, listener);
            } else {
                ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
            }
        } catch (Exception e) {
            listener.onFailure(e);
        }
        return;
    }

    if (needToCheck()) { // auto-create the indices referenced by the bulk request so the subsequent writes can succeed
        // Attempt to create all the indices that we're going to need during the bulk before we start.
        // Step 1: collect all the indices in the request
        final Set<String> indices = bulkRequest.requests.stream()
                // delete requests should not attempt to create the index (if the index does not
                // exists), unless an external versioning is used
            .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                    || request.versionType() == VersionType.EXTERNAL
                    || request.versionType() == VersionType.EXTERNAL_GTE)
            .map(DocWriteRequest::index)
            .collect(Collectors.toSet());
        /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
         * that we'll use when we try to run the requests. */
        final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
        Set<String> autoCreateIndices = new HashSet<>();
        ClusterState state = clusterService.state();
        for (String index : indices) {
            boolean shouldAutoCreate;
            try {
                shouldAutoCreate = shouldAutoCreate(index, state);
            } catch (IndexNotFoundException e) {
                shouldAutoCreate = false;
                indicesThatCannotBeCreated.put(index, e);
            }
            if (shouldAutoCreate) {
                autoCreateIndices.add(index);
            }
        }
        // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
        if (autoCreateIndices.isEmpty()) {
            executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); // index the documents
        } else {
            final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
            for (String index : autoCreateIndices) {
                createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
                    @Override
                    public void onResponse(CreateIndexResponse result) {
                        if (counter.decrementAndGet() == 0) {
                            threadPool.executor(ThreadPool.Names.WRITE).execute(
                                () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                            // fail all requests involving this index, if create didn't work
                            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                DocWriteRequest<?> request = bulkRequest.requests.get(i);
                                if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                    bulkRequest.requests.set(i, null);
                                }
                            }
                        }
                        if (counter.decrementAndGet() == 0) {
                            executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                                inner.addSuppressed(e);
                                listener.onFailure(inner);
                            }), responses, indicesThatCannotBeCreated);
                        }
                    }
                });
            }
        }
    } else {
        executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
    }
}
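The auto-create part of `doExecute` comes down to three steps: collect the target index names, keep the ones that still need creating, and start the bulk once every create call has answered. The sketch below is a simplification under assumptions. `AutoCreateSketch` and its nested types are hypothetical, and "should auto-create" is reduced to "does not exist yet", whereas the real `shouldAutoCreate` also consults cluster settings such as `action.auto_create_index`.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class AutoCreateSketch {
    enum OpType { INDEX, CREATE, UPDATE, DELETE }
    enum VersionType { INTERNAL, EXTERNAL, EXTERNAL_GTE }

    static final class DocRequest {
        final String index;
        final OpType opType;
        final VersionType versionType;

        DocRequest(String index, OpType opType, VersionType versionType) {
            this.index = index;
            this.opType = opType;
            this.versionType = versionType;
        }
    }

    // Steps 1 and 2: DELETEs only count when external versioning is used,
    // and indices that already exist are dropped (simplified "should auto-create").
    static Set<String> indicesToAutoCreate(List<DocRequest> requests, Set<String> existing) {
        return requests.stream()
                .filter(r -> r.opType != OpType.DELETE
                        || r.versionType == VersionType.EXTERNAL
                        || r.versionType == VersionType.EXTERNAL_GTE)
                .map(r -> r.index)
                .filter(index -> !existing.contains(index))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Step 3: every create callback (success or failure) calls countDown();
    // the bulk starts exactly once, when the last callback arrives.
    static final class CreateBarrier {
        private final AtomicInteger counter;
        private final Runnable executeBulk;

        CreateBarrier(int pending, Runnable executeBulk) {
            this.counter = new AtomicInteger(pending);
            this.executeBulk = executeBulk;
        }

        void countDown() {
            if (counter.decrementAndGet() == 0) {
                executeBulk.run();
            }
        }
    }
}
```

The countdown must fire on both success and failure. That is why the source calls `decrementAndGet()` in both `onResponse` and `onFailure` of the create listener.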

Next, BulkOperation converts the BulkRequest into BulkShardRequests, that is, it decides which shard each operation will run on:

// org.elasticsearch.action.bulk.TransportBulkAction (inner class BulkOperation)
protected void doRun() {
    final ClusterState clusterState = observer.setAndGetObservedState();
    if (handleBlockExceptions(clusterState)) {
        return;
    }
    final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
    MetaData metaData = clusterState.metaData();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
        //the request can only be null because we set it to null in the previous step, so it gets ignored
        if (docWriteRequest == null) {
            continue;
        }
        if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
            continue;
        }
        Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); // resolve the concrete index
        try {
            switch (docWriteRequest.opType()) {
                case CREATE:
                case INDEX:
                    IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                    final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                    MappingMetaData mappingMd = indexMetaData.mappingOrDefault();
                    Version indexCreated = indexMetaData.getCreationVersion();
                    indexRequest.resolveRouting(metaData);
                    indexRequest.process(indexCreated, mappingMd, concreteIndex.getName()); // validate the IndexRequest and auto-generate an id
                    break;
                case UPDATE:
                    TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(),
                        (UpdateRequest) docWriteRequest);
                    break;
                case DELETE:
                    docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                    // check if routing is required, if so, throw error if routing wasn't specified
                    if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName())) {
                        throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                    }
                    break;
                default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
            }
        } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
                docWriteRequest.id(), e);
            BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
            responses.set(i, bulkItemResponse);
            // make sure the request gets never processed again
            bulkRequest.requests.set(i, null);
        }
    }

    // first, go over all the requests and create a ShardId -> Operations mapping
    Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> request = bulkRequest.requests.get(i);
        if (request == null) {
            continue;
        }
        String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
        ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
            request.routing()).shardId(); // route by document id (or custom routing) to pick the shard
        List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
        shardRequests.add(new BulkItemRequest(i, request));
    }

    if (requestsByShard.isEmpty()) {
        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
            buildTookInMillis(startTimeNanos)));
        return;
    }

    final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
    String nodeId = clusterService.localNode().getId();
    for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
        final ShardId shardId = entry.getKey();
        final List<BulkItemRequest> requests = entry.getValue();
        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(), // build the BulkShardRequest
                requests.toArray(new BulkItemRequest[requests.size()]));
        bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
        bulkShardRequest.timeout(bulkRequest.timeout());
        if (task != null) {
            bulkShardRequest.setParentTask(nodeId, task.getId());
        }
        shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
            @Override
            public void onResponse(BulkShardResponse bulkShardResponse) {
                for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                    // we may have no response if item failed
                    if (bulkItemResponse.getResponse() != null) {
                        bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                    }
                    responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // create failures for all relevant requests
                for (BulkItemRequest request : requests) {
                    final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                    DocWriteRequest<?> docWriteRequest = request.request();
                    responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                            new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }

            private void finishHim() {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                    buildTookInMillis(startTimeNanos)));
            }
        });
    }
}
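Grouping items by shard is a plain `computeIfAbsent` over a map. Each item remembers its original slot, so responses can be written back into the right position of the bulk response. Here is a sketch with hypothetical names. `shardOf` uses `String.hashCode()` as a stand-in for the real routing, so it only illustrates the grouping, not actual shard numbers.

```java
import java.util.*;

class GroupByShardSketch {
    static final class Item {
        final int slot;     // position in the original bulk request
        final String id;

        Item(int slot, String id) {
            this.slot = slot;
            this.id = id;
        }
    }

    // Hypothetical stand-in for OperationRouting.indexShards(...).shardId()
    static int shardOf(String id, int numShards) {
        return Math.floorMod(id.hashCode(), numShards);
    }

    static Map<Integer, List<Item>> groupByShard(List<String> ids, int numShards) {
        Map<Integer, List<Item>> byShard = new HashMap<>();
        for (int i = 0; i < ids.size(); i++) {
            String id = ids.get(i);
            if (id == null) {
                continue;   // slot already failed earlier and is never processed again
            }
            byShard.computeIfAbsent(shardOf(id, numShards), s -> new ArrayList<>())
                   .add(new Item(i, id));
        }
        return byShard;
    }
}
```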

Let's look at how the document id (or a custom routing value) is routed to a shard:

// org.elasticsearch.cluster.routing.OperationRouting
public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset; // hash

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
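The routing arithmetic can be checked in isolation. This sketch uses `String.hashCode()` as a stand-in for `Murmur3HashFunction.hash`, so the shard numbers will not match a real cluster. The formula itself follows the code above: `floorMod(hash + partitionOffset, routingNumShards) / routingFactor`. The sketch also assumes the routing factor is `routingNumShards / numShards`, which is how `IndexMetaData` derives `getRoutingFactor()`. `RoutingSketch` is a hypothetical name.

```java
class RoutingSketch {
    // Stand-in for Murmur3HashFunction.hash(String); real shard numbers will differ.
    static int hash(String s) {
        return s.hashCode();
    }

    /**
     * @param routingNumShards number of routing shards the index was created with
     * @param numShards        current number of primary shards
     * @param partitionSize    routing partition size; 1 means "not partitioned"
     */
    static int shardId(String id, String routing, int routingNumShards, int numShards, int partitionSize) {
        String effectiveRouting = (routing == null) ? id : routing;          // custom routing wins over the id
        int partitionOffset = (partitionSize > 1) ? Math.floorMod(hash(id), partitionSize) : 0;
        int routingFactor = routingNumShards / numShards;                    // assumed getRoutingFactor()
        int h = hash(effectiveRouting) + partitionOffset;                    // int overflow is fine, floorMod handles it
        return Math.floorMod(h, routingNumShards) / routingFactor;
    }
}
```

Because documents are first hashed into `routingNumShards` slots and only then divided down, related shard counts stay consistent with each other. With 30 routing shards, a document's shard in a 10-shard index, divided by 2, equals its shard in the 5-shard index. This property is what lets split and shrink move whole shards instead of rehashing every document.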

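Next, ES checks whether the primary of this shard lives on the current node or on a remote node. This is the "routing" phase. (I started only a single node, so we follow the local-node path.)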

// org.elasticsearch.action.support.replication.TransportReplicationAction
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    final String concreteIndex = concreteIndex(state, request);
    final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
    if (blockException != null) {
        if (blockException.retryable()) {
            logger.trace("cluster is blocked, scheduling a retry", blockException);
            retry(blockException);
        } else {
            finishAsFailed(blockException);
        }
    } else {
        // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
        final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
        if (indexMetaData == null) {
            retry(new IndexNotFoundException(concreteIndex));
            return;
        }
        if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
            throw new IndexClosedException(indexMetaData.getIndex());
        }

        // resolve all derived request fields, so we can route and apply it
        resolveRequest(indexMetaData, request);
        assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
            "request waitForActiveShards must be set in resolveRequest";

        final ShardRouting primary = primary(state);
        if (retryIfUnavailable(state, primary)) {
            return;
        }
        final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
        if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { // find the node holding the primary via routing and compare it with the local node
            performLocalAction(state, primary, node, indexMetaData);
        } else {
            performRemoteAction(state, primary, node);
        }
    }
}
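The routing phase reduces to a small decision table:

- fail or retry on a cluster block, depending on whether it is retryable;
- retry if the primary is not available yet;
- otherwise run locally, or forward to the node holding the primary.

Here is a sketch of that table with a hypothetical `RoutingPhaseSketch` enum. The "unassigned or inactive" test is an assumed simplification of `retryIfUnavailable`, which may check more conditions.

```java
class RoutingPhaseSketch {
    enum Decision { FAIL, RETRY, LOCAL, REMOTE }

    static Decision decide(boolean blocked, boolean blockRetryable,
                           String primaryNodeId, boolean primaryActive, String localNodeId) {
        if (blocked) {
            return blockRetryable ? Decision.RETRY : Decision.FAIL;  // retry(...) vs finishAsFailed(...)
        }
        if (primaryNodeId == null || !primaryActive) {
            return Decision.RETRY;                                    // simplified retryIfUnavailable(...)
        }
        // performLocalAction(...) vs performRemoteAction(...)
        return primaryNodeId.equals(localNodeId) ? Decision.LOCAL : Decision.REMOTE;
    }
}
```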

Since the primary is on the current node, the request is sent as a local (in-process) transport request:

// org.elasticsearch.transport.TransportService
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                               final TransportRequest request,
                                                               final TransportRequestOptions options,
                                                               TransportResponseHandler<T> handler) {
    if (connection == null) {
        throw new IllegalStateException("can't send request to a null connection");
    }
    DiscoveryNode node = connection.getNode();

    Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
    ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
    // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
    final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
    final TimeoutHandler timeoutHandler;
    if (options.timeout() != null) {
        timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
        responseHandler.setTimeoutHandler(timeoutHandler);
    } else {
        timeoutHandler = null;
    }
    try {
        if (lifecycle.stoppedOrClosed()) {
            /*
             * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
             * caller. It will only notify if toStop hasn't done the work yet.
             *
             * Do not edit this exception message, it is currently relied upon in production code!
             */
            // TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
            throw new TransportException("TransportService is closed stopped can't send request");
        }
        if (timeoutHandler != null) {
            assert options.timeout() != null;
            timeoutHandler.scheduleTimeout(options.timeout());
        }
        connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
...
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
    final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
    try {
        onRequestSent(localNode, requestId, action, request, options);
        onRequestReceived(requestId, action);
        final RequestHandlerRegistry reg = getRequestHandler(action); // registry pattern: action name -> handler
        if (reg == null) {
            throw new ActionNotFoundTransportException("Action [" + action + "] not found");
        }
        final String executor = reg.getExecutor();
        if (ThreadPool.Names.SAME.equals(executor)) {
            //noinspection unchecked
            reg.processMessageReceived(request, channel);
        } else {
            threadPool.executor(executor).execute(new AbstractRunnable() {
                @Override
                protected void doRun() throws Exception {
                    //noinspection unchecked
                    reg.processMessageReceived(request, channel); // handle the request
                }

                @Override
                public boolean isForceExecution() {
                    return reg.isForceExecution();
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Exception inner) {
                        inner.addSuppressed(e);
                        logger.warn(() -> new ParameterizedMessage(
                                "failed to notify channel of error message for action [{}]", action), inner);
                    }
                }

                @Override
                public String toString() {
                    return "processing of [" + requestId + "][" + action + "]: " + request;
                }
            });
        }
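`sendLocalRequest` looks up a handler by action name. It then either runs the handler on the calling thread (`ThreadPool.Names.SAME`) or hands it to the named thread pool. Here is a sketch of that registry-plus-executor dispatch. The class, action names and pool names are hypothetical, and `java.util.concurrent.Executor` stands in for Elasticsearch's thread pools.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class LocalDispatchSketch {
    static final String SAME = "same";   // run on the calling thread

    private static final class Registration {
        final String executor;
        final Consumer<String> handler;

        Registration(String executor, Consumer<String> handler) {
            this.executor = executor;
            this.handler = handler;
        }
    }

    private final Map<String, Registration> handlers = new HashMap<>();
    private final Map<String, Executor> pools = new HashMap<>();

    void registerPool(String name, Executor pool) {
        pools.put(name, pool);
    }

    void registerHandler(String action, String executor, Consumer<String> handler) {
        handlers.put(action, new Registration(executor, handler));
    }

    void sendLocalRequest(String action, String request) {
        Registration reg = handlers.get(action);                // registry lookup: action -> handler
        if (reg == null) {
            throw new IllegalArgumentException("Action [" + action + "] not found");
        }
        if (SAME.equals(reg.executor)) {
            reg.handler.accept(request);                        // inline on this thread
        } else {
            pools.get(reg.executor).execute(() -> reg.handler.accept(request)); // hand off to the named pool
        }
    }
}
```

Running cheap handlers inline avoids a thread hop. Heavier work such as shard-level writes is pushed onto a dedicated pool, so it cannot stall the caller's thread.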

Next, ES acquires a permit to execute the operation on the primary shard:

// org.elasticsearch.action.support.replication.TransportReplicationAction
protected void doRun() throws Exception {
    final ShardId shardId = primaryRequest.getRequest().shardId();
    final IndexShard indexShard = getIndexShard(shardId);
    final ShardRouting shardRouting = indexShard.routingEntry();
    // we may end up here if the cluster state used to route the primary is so stale that the underlying
    // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
    // the replica will take over and a replica will be assigned to the first node.
    if (shardRouting.primary() == false) {
        throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
    }
    final String actualAllocationId = shardRouting.allocationId().getId();
    if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
        throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
            primaryRequest.getTargetAllocationID(), actualAllocationId);
    }
    final long actualTerm = indexShard.getPendingPrimaryTerm();
    if (actualTerm != primaryRequest.getPrimaryTerm()) {
        throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
            primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
    }

    acquirePrimaryOperationPermit( // acquire a permit to run the operation on the primary shard
        indexShard,
        primaryRequest.getRequest(),
        ActionListener.wrap(
            releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
            e -> {
                if (e instanceof ShardNotInPrimaryModeException) {
                    onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
                } else {
                    onFailure(e);
                }
            }));
}
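Operation permits let a shard run many writes concurrently while still being able to block all of them at once, for example during a primary relocation hand-off. Here is a minimal sketch that assumes a semaphore-based design with `Integer.MAX_VALUE` permits, loosely modeled on how `IndexShardOperationPermits` is built. The class and method names are hypothetical, and non-blocking `tryAcquire` replaces the real code's queuing of listeners.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class OperationPermitsSketch {
    interface Releasable extends AutoCloseable {
        @Override
        void close();   // no checked exception, so callers need no try/catch
    }

    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);

    // One permit per operation; returns null while operations are blocked.
    Releasable tryAcquireOperationPermit() {
        if (!semaphore.tryAcquire(1)) {
            return null;
        }
        AtomicBoolean released = new AtomicBoolean(false);
        return () -> {
            if (released.compareAndSet(false, true)) {
                semaphore.release(1);   // release at most once
            }
        };
    }

    // Succeeds only when no operation currently holds a permit.
    boolean tryBlockOperations() {
        return semaphore.tryAcquire(TOTAL_PERMITS);
    }

    void unblockOperations() {
        semaphore.release(TOTAL_PERMITS);
    }

    // Meaningful only while operations are not blocked.
    int activeOperations() {
        return TOTAL_PERMITS - semaphore.availablePermits();
    }
}
```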

Now we enter the "primary" phase:

// org.elasticsearch.action.support.replication.TransportReplicationAction
setPhase(replicationTask, "primary");

final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
    primaryShardReference.close(); // release shard operation lock before responding to caller
    setPhase(replicationTask, "finished");
    onCompletionListener.onResponse(response);
}, e -> handleException(primaryShardReference, e));

final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
    if (syncGlobalCheckpointAfterOperation) {
        final IndexShard shard = primaryShardReference.indexShard;
        try {
            shard.maybeSyncGlobalCheckpoint("post-operation");
        } catch (final Exception e) {
            // only log non-closed exceptions
            if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                // intentionally swallow, a missed global checkpoint sync should not fail this operation
                logger.info(
                    new ParameterizedMessage(
                        "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
            }
        }
    }
    referenceClosingListener.onResponse(response);
}, referenceClosingListener::onFailure);

new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
    ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
    newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
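The two listeners in the primary phase are layered so that the optional global-checkpoint sync runs first. The shard reference (the operation permit) is then released before the caller is notified. Here is a sketch of that ordering with a hypothetical `Listener` interface standing in for `ActionListener`. The event strings exist only to make the order visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ListenerChainSketch {
    // Hypothetical stand-in for ActionListener.
    interface Listener<T> {
        void onResponse(T value);

        void onFailure(Exception e);

        static <T> Listener<T> wrap(Consumer<T> onOk, Consumer<Exception> onErr) {
            return new Listener<T>() {
                @Override
                public void onResponse(T value) { onOk.accept(value); }

                @Override
                public void onFailure(Exception e) { onErr.accept(e); }
            };
        }
    }

    static Listener<String> primaryPhaseListener(List<String> events, Listener<String> caller, boolean syncCheckpoint) {
        // Inner layer: release the permit, mark the phase, then notify the caller.
        Listener<String> referenceClosing = Listener.wrap(response -> {
            events.add("release permit");
            events.add("phase=finished");
            caller.onResponse(response);
        }, e -> {
            events.add("release permit");   // failure path: release, then notify the caller
            caller.onFailure(e);
        });
        // Outer layer: optional post-operation step before handing off to the inner layer.
        return Listener.wrap(response -> {
            if (syncCheckpoint) {
                events.add("sync global checkpoint");
            }
            referenceClosing.onResponse(response);
        }, referenceClosing::onFailure);
    }
}
```

Releasing the permit before responding matches the source comment "release shard operation lock before responding to caller". A caller that immediately sends another request therefore never waits on its own previous permit.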

Skipping the intermediate calls, TransportShardBulkAction finally invokes the indexing engine:

// org.elasticsearch.action.bulk.TransportShardBulkAction
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                      MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                      ActionListener<Void> itemDoneListener) throws Exception {
    final DocWriteRequest.OpType opType = context.getCurrent().opType();

    final UpdateHelper.Result updateResult;
    if (opType == DocWriteRequest.OpType.UPDATE) {
        final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
        try {
            updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
        } catch (Exception failure) {
            // we may fail translating a update to index or delete operation
            // we use index result to communicate failure while translating update request
            final Engine.Result result =
                new Engine.IndexResult(failure, updateRequest.version());
            context.setRequestToExecute(updateRequest);
            context.markOperationAsExecuted(result);
            context.markAsCompleted(context.getExecutionResult());
            return true;
        }
        // execute translated update request
        switch (updateResult.getResponseResult()) {
            case CREATED:
            case UPDATED:
                IndexRequest indexRequest = updateResult.action();
                IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData();
                MappingMetaData mappingMd = metaData.mappingOrDefault();
                indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                context.setRequestToExecute(indexRequest);
                break;
            case DELETED:
                context.setRequestToExecute(updateResult.action());
                break;
            case NOOP:
                context.markOperationAsNoOp(updateResult.action());
                context.markAsCompleted(context.getExecutionResult());
                return true;
            default:
                throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
        }
    } else {
        context.setRequestToExecute(context.getCurrent());
        updateResult = null;
    }

    assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state

    final IndexShard primary = context.getPrimary();
    final long version = context.getRequestToExecute().version();
    final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
    final Engine.Result result;
    if (isDelete) {
        final DeleteRequest request = context.getRequestToExecute();
        result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
            request.ifSeqNo(), request.ifPrimaryTerm());
    } else {
        final IndexRequest request = context.getRequestToExecute();
        result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse( // hand off to the Lucene-based engine
                request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
            request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
    }
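The branching in `executeBulkItemRequest` can be summarized as a mapping from the request's op type to the engine call that is finally made. For updates, the mapping also depends on the result of `UpdateHelper.prepare`. The sketch below uses hypothetical enums and ignores versioning, mapping updates and failures. It only mirrors the switch in the source: CREATED/UPDATED become an index, DELETED becomes a delete, and NOOP makes no engine call.

```java
class UpdateTranslationSketch {
    enum OpType { INDEX, CREATE, UPDATE, DELETE }
    enum UpdateResult { CREATED, UPDATED, DELETED, NOOP }
    enum EngineCall { APPLY_INDEX_ON_PRIMARY, APPLY_DELETE_ON_PRIMARY, NONE }

    static EngineCall engineCallFor(OpType opType, UpdateResult updateResult) {
        OpType toExecute;
        if (opType == OpType.UPDATE) {
            switch (updateResult) {
                case CREATED:
                case UPDATED:
                    toExecute = OpType.INDEX;     // the update became a full index request
                    break;
                case DELETED:
                    toExecute = OpType.DELETE;    // e.g. an update script set ctx.op = "delete"
                    break;
                case NOOP:
                    return EngineCall.NONE;       // nothing changed: marked as no-op, engine untouched
                default:
                    throw new IllegalStateException("Illegal update operation " + updateResult);
            }
        } else {
            toExecute = opType;                   // INDEX / CREATE / DELETE run as-is
        }
        return toExecute == OpType.DELETE ? EngineCall.APPLY_DELETE_ON_PRIMARY
                                          : EngineCall.APPLY_INDEX_ON_PRIMARY;
    }
}
```

This is why an update never reaches Lucene as an "update". By the time the engine is called, it has already been rewritten into a full index or delete operation.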

3. Indexing Flow (replica)

In ReplicationOperation's execute() method, once the operation has completed on the primary shard, the listener sends the request on to the replica shards:

// org.elasticsearch.action.support.replication.ReplicationOperation
public void execute() throws Exception {
    final String activeShardCountFailure = checkActiveShardCount();
    final ShardRouting primaryRouting = primary.routingEntry();
    final ShardId primaryId = primaryRouting.shardId();
    if (activeShardCountFailure != null) {
        finishAsFailed(new UnavailableShardsException(primaryId,
            "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
        return;
    }

    totalShards.incrementAndGet();
    pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
    primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure)); // the listener invokes handlePrimaryResult
}

private void handlePrimaryResult(final PrimaryResultT primaryResult) {
    this.primaryResult = primaryResult;
    primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
    primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
    final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
    if (replicaRequest != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
        }
        // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
        // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
        // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
        // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
        // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
        // of the sampled replication group, and advanced further than what the given replication group would allow it to.
        // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
        final long globalCheckpoint = primary.computedGlobalCheckpoint();
        // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
        // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
        // on.
        final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
        assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
        final ReplicationGroup replicationGroup = primary.getReplicationGroup();
        markUnavailableShardsAsStale(replicaRequest, replicationGroup);
        performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); // perform the operation on the replica shards
    }
    successfulShards.incrementAndGet();  // mark primary as successful
    decPendingAndFinishIfNeeded();
}

...

private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                               final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
    // for total stats, add number of unassigned shards and
    // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
    totalShards.addAndGet(replicationGroup.getSkippedShards().size());

    final ShardRouting primaryRouting = primary.routingEntry();

    for (final ShardRouting shard : replicationGroup.getReplicationTargets()) { // iterate over every replication target
        if (shard.isSameAllocation(primaryRouting) == false) {
            performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
        }
    }
}

private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
                              final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
    if (logger.isTraceEnabled()) {
        logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
    }

    totalShards.incrementAndGet();
    pendingActions.incrementAndGet();
    replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, // call through the ReplicasProxy
        new ActionListener<ReplicaResponse>() {
            @Override
            public void onResponse(ReplicaResponse response) {
                successfulShards.incrementAndGet();
                try {
                    primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
                    primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                } catch (final AlreadyClosedException e) {
                    // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                } catch (final Exception e) {
                    // fail the primary but fall through and let the rest of operation processing complete
                    final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                    primary.failShard(message, e);
                }
                decPendingAndFinishIfNeeded();
            }

            @Override
            public void onFailure(Exception replicaException) {
                logger.trace(() -> new ParameterizedMessage(
                    "[{}] failure while performing [{}] on replica {}, request [{}]",
                    shard.shardId(), opType, shard, replicaRequest), replicaException);
                // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
                if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                }
                String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
                    ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
            }

            @Override
            public String toString() {
                return "[" + replicaRequest + "][" + shard + "]";
            }
        });
}
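The control flow above depends on two pieces of bookkeeping.

- **Pending actions.** `pendingActions` is raised to 1 before the primary runs. It is raised once more per replica before the request goes out, and lowered as each copy finishes. The operation therefore completes only when the last outstanding copy answers, and never before the primary itself is done.
- **Checkpoints.** The primary records every copy's local checkpoint. The global checkpoint is, in essence, the lowest local checkpoint among the in-sync copies.

Below is a minimal Java sketch of those two ideas under simplifying assumptions. `PendingTracker`, `onCopyDone` and the allocation-id strings are hypothetical names. Failures and stale shards are ignored, and the real Elasticsearch logic involves considerably more rules.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the primary-side bookkeeping; not Elasticsearch code.
class PendingTracker {
    private final AtomicInteger pendingActions = new AtomicInteger();
    private final AtomicInteger successfulShards = new AtomicInteger();
    // last reported local checkpoint per copy, keyed by a (hypothetical) allocation id
    private final Map<String, Long> localCheckpoints = new HashMap<>();
    private boolean finished = false;

    // Mirrors execute(): hold one pending slot while the primary is being processed.
    void startPrimary() {
        pendingActions.incrementAndGet();
    }

    // Mirrors performOnReplica(): reserve a slot before the request is sent.
    void sendToReplica() {
        pendingActions.incrementAndGet();
    }

    // Called once per copy (primary or replica) when it finishes successfully.
    synchronized void onCopyDone(String allocationId, long localCheckpoint) {
        successfulShards.incrementAndGet();
        // checkpoints only move forward, so keep the larger value
        localCheckpoints.merge(allocationId, localCheckpoint, Long::max);
        decPendingAndFinishIfNeeded();
    }

    // Global checkpoint = smallest local checkpoint among the tracked copies
    // (-1 is an arbitrary "nothing reported yet" sentinel in this sketch).
    synchronized long globalCheckpoint() {
        return localCheckpoints.values().stream().mapToLong(Long::longValue).min().orElse(-1L);
    }

    // The caller that brings the counter to zero completes the whole operation.
    private void decPendingAndFinishIfNeeded() {
        if (pendingActions.decrementAndGet() == 0) {
            finished = true;
        }
    }

    synchronized boolean isFinished() {
        return finished;
    }

    int successfulShards() {
        return successfulShards.get();
    }
}
```

Here is a worked example. Call `startPrimary()`, then `sendToReplica()` twice. Then call `onCopyDone` for the primary (checkpoint 5) and for two replicas (checkpoints 5 and 3). The tracker then reports finished, with a global checkpoint of 3.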

`ReplicasProxy.performOn` then sends the request to the replica's node as a transport request:

// org.elasticsearch.action.support.replication.TransportReplicationAction
public void performOn(final ShardRouting replica, final ReplicaRequest request,
                      final long primaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
                      final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
    String nodeId = replica.currentNodeId();
    final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
    if (node == null) {
        listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
        return;
    }
    final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
        request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
    final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener,
        ReplicaResponse::new);
    transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); // send the transport request
}
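What `performOn` contributes is small but typical of the codebase. It resolves the target node from the current cluster state and reports an unknown node through the listener instead of throwing. Otherwise it wraps the request together with the primary term and global checkpoint (the role `ConcreteReplicaRequest` plays) before handing it to the transport layer.

The sketch below models that shape in plain Java. `Listener`, `Transport`, `ReplicaEnvelope` and `ReplicaSender` are hypothetical stand-ins, not Elasticsearch's `ActionListener` or `TransportService`, and the node lookup is just a `Map`.

```java
import java.util.Map;

// Hypothetical callback, loosely modeled on ES's ActionListener (not the real interface).
interface Listener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

// Hypothetical envelope playing the role of ConcreteReplicaRequest:
// the original request plus the metadata the primary attaches to it.
final class ReplicaEnvelope {
    final String request;
    final String targetAllocationId;
    final long primaryTerm;
    final long globalCheckpoint;

    ReplicaEnvelope(String request, String targetAllocationId, long primaryTerm, long globalCheckpoint) {
        this.request = request;
        this.targetAllocationId = targetAllocationId;
        this.primaryTerm = primaryTerm;
        this.globalCheckpoint = globalCheckpoint;
    }
}

// Hypothetical transport: delivers an envelope to an address and answers via the listener.
interface Transport {
    void send(String address, ReplicaEnvelope envelope, Listener<Long> listener);
}

final class ReplicaSender {
    private final Map<String, String> nodeAddressesById; // stands in for clusterService.state().nodes()
    private final Transport transport;

    ReplicaSender(Map<String, String> nodeAddressesById, Transport transport) {
        this.nodeAddressesById = nodeAddressesById;
        this.transport = transport;
    }

    void performOn(String nodeId, String allocationId, String request,
                   long primaryTerm, long globalCheckpoint, Listener<Long> listener) {
        String address = nodeAddressesById.get(nodeId);
        if (address == null) {
            // Like performOn(): an unknown node is reported through the listener, not thrown.
            listener.onFailure(new IllegalStateException("unknown node [" + nodeId + "]"));
            return;
        }
        // Attach the primary's metadata, then hand off; the response arrives asynchronously via the listener.
        transport.send(address, new ReplicaEnvelope(request, allocationId, primaryTerm, globalCheckpoint), listener);
    }
}
```

Reporting errors through the listener keeps a single code path for every outcome. A missing node, a network failure and a replica-side exception all end up in `onFailure`, which is what lets `ReplicationOperation` decrement its pending counter in one place.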

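The replica handles the request much like the primary does, and finally calls into the Lucene engine. It reuses the sequence number and version the primary assigned (`primaryResponse.getSeqNo()` and `primaryResponse.getVersion()`):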

// org.elasticsearch.action.bulk.TransportShardBulkAction
private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest<?> docWriteRequest,
                                                IndexShard replica) throws Exception {
    final Engine.Result result;
    switch (docWriteRequest.opType()) {
        case CREATE:
        case INDEX:
            final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
            final ShardId shardId = replica.shardId();
            final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.type(), indexRequest.id(),
                indexRequest.source(), indexRequest.getContentType(), indexRequest.routing());
            result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), // call into the Lucene engine
                indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
            break;
        case DELETE:
            DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
            result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
                deleteRequest.type(), deleteRequest.id());
            break;
        default:
            assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;
            throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
    }
    return result;
}
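The key detail in `performOpOnReplica` is that the replica does not assign a new sequence number or version. It applies the operation with the values the primary returned.

A toy, in-memory sketch of that dispatch follows. `OpType` and `ReplicaStore` are hypothetical, and the maps stand in for the Lucene index. Things a real engine handles, such as out-of-order delivery, delete tombstones and the translog, are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical op types mirroring the CREATE / INDEX / DELETE cases in performOpOnReplica.
enum OpType { CREATE, INDEX, DELETE }

// Hypothetical in-memory replica; the maps stand in for the Lucene index.
final class ReplicaStore {
    private final Map<String, String> sources = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();
    private long maxSeqNo = -1L; // highest sequence number applied so far (-1: none yet)

    // Applies an operation using the seq no and version assigned by the primary;
    // the replica never generates its own.
    String apply(OpType op, String id, String source, long primarySeqNo, long primaryVersion) {
        final String result;
        switch (op) {
            case CREATE:
            case INDEX:
                sources.put(id, source);
                versions.put(id, primaryVersion);
                result = "indexed";
                break;
            case DELETE:
                sources.remove(id);
                versions.remove(id);
                result = "deleted";
                break;
            default:
                throw new IllegalStateException("Unexpected operation type on replica: " + op);
        }
        maxSeqNo = Math.max(maxSeqNo, primarySeqNo);
        return result;
    }

    String source(String id) {
        return sources.get(id);
    }

    Long version(String id) {
        return versions.get(id);
    }

    long maxSeqNo() {
        return maxSeqNo;
    }
}
```

Because the primary is the only place where sequence numbers are generated, every copy that applies the same operations ends up agreeing on which sequence number each operation carries. That agreement is what the checkpoint bookkeeping on the primary relies on.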

4. Summary

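This article gave a brief walk-through of the es indexing flow, covering how the HTTP request is parsed and how the target shard is determined. Plenty is still missing. For example, it does not discuss how remote nodes handle the request, or the details of the Lucene execution engine. Later posts will continue with these topics.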

