【Elasticsearch源码】 分片恢复分析

带着疑问学源码,第七篇:Elasticsearch 分片恢复分析
代码分析基于:https://github.com/jiankunking/elasticsearch
Elasticsearch 8.0.0-SNAPSHOT

目的

在看源码之前先梳理一下,自己对于分片恢复的疑问点:

  • 网上对于ElasticSearch分片恢复的逻辑说法一抓一把,网上说的对不对?新版本中有没有更新?
  • 在分片恢复的时候,如果收到Api _forcemerge请求,这时候,会如何处理?(因为副本恢复的第一节点是复制segment文件)
    • 这部分等看/_forcemerge api的时候,再解答一下。
  • 分片恢复的第二阶段是同步translog,这一步会不会加锁?不加锁的话,如何确保是同步完成了?

如果说看源码有捷径的话,那么找到网上一篇写的比较权威的源码分析文章跟着看,那不失为一种好方法。
下面源码分析部分将参考腾讯云的:Elasticsearch 底层系列之分片恢复解析,一边参考,一边印证。

源码分析

目标节点请求恢复

先找到分片恢复的入口:IndicesClusterStateService.createOrUpdateShards

在这里会判断本地节点是否在routingNodes中,如果在,说明本地节点有分片创建或更新的需求,否则跳过。

private void createOrUpdateShards(final ClusterState state) {// 节点到索引分片的映射关系,主要用于分片分配、均衡决策// 具体的内容可以看下:https://jiankunking.com/elasticsearch-cluster-state.htmlRoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());if (localRoutingNode == null) {return;}DiscoveryNodes nodes = state.nodes();RoutingTable routingTable = state.routingTable();for (final ShardRouting shardRouting : localRoutingNode) {ShardId shardId = shardRouting.shardId();// failedShardsCache:https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L116// 恢复过程中失败的碎片列表;我们跟踪这些碎片,以防止在每次集群状态更新时重复恢复这些碎片if (failedShardsCache.containsKey(shardId) == false) {AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";Shard shard = indexService.getShardOrNull(shardId.id());if (shard == null) { // shard不存在则需创建assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";createShard(nodes, routingTable, shardRouting, state);} else { // 存在则更新updateShard(nodes, shardRouting, shard, routingTable, state);}}}}

副本分片恢复走的是createShard分支,在该方法中,首先获取shardRouting的类型,如果恢复类型为PEER,说明该分片需要从远端获取,则需要找到源节点,然后调用IndicesService.createShard:

RecoverySource的Type有以下几种:

EMPTY_STORE,
EXISTING_STORE,//主分片本地恢复
PEER,//副分片从远处主分片恢复
SNAPSHOT,//从快照恢复
LOCAL_SHARDS//从本节点其它分片恢复(shrink时)

createShard代码如下:

private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;DiscoveryNode sourceNode = null;// 如果恢复方式是peer,则会找到shard所在的源节点进行恢复if (shardRouting.recoverySource().getType() == Type.PEER)  {sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);if (sourceNode == null) {logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());return;}}try {final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);indicesService.createShard(shardRouting,recoveryTargetService,new RecoveryListener(shardRouting, primaryTerm),repositoriesService,failedShardHandler,this::updateGlobalCheckpointForShard,retentionLeaseSyncer,nodes.getLocalNode(),sourceNode);} catch (Exception e) {failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);}}/*** Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard* routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not.*/private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes,ShardRouting shardRouting) {DiscoveryNode sourceNode = null;if (!shardRouting.primary()) {ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();// only recover from started primary, if we can't find one, we will do it next roundif (primary.active()) {// 找到primary shard所在节点sourceNode = nodes.get(primary.currentNodeId());if (sourceNode == null) {logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", primary);}} else {logger.trace("can't find replica source node because primary shard {} is not active.", primary);}} else if (shardRouting.relocatingNodeId() != null) {// 找到搬迁的源节点sourceNode = nodes.get(shardRouting.relocatingNodeId());if (sourceNode == null) {logger.trace("can't find relocation source node for shard {} because it is assigned to an unknown node [{}].",shardRouting.shardId(), shardRouting.relocatingNodeId());}} else {throw new IllegalStateException("trying to find source node for peer recovery when routing state means no peer recovery: " +shardRouting);}return sourceNode;}

源节点的确定分两种情况,如果当前shard本身不是primary shard,则源节点为primary shard所在节点,否则,如果当前shard正在搬迁中(从其他节点搬迁到本节点),则源节点为数据搬迁的源头节点。得到源节点后调用IndicesService.createShard,在该方法中调用方法IndexShard.startRecovery开始恢复。

public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,Consumer<MappingMetadata> mappingUpdateConsumer,IndicesService indicesService) {// TODO: Create a proper object to encapsulate the recovery context// all of the current methods here follow a pattern of:// resolve context which isn't really dependent on the local shards and then async// call some external method with this pointer.// with a proper recovery context object we can simply change this to:// startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {//     markAsRecovery("from " + source.getShortDescription(), recoveryState);//     threadPool.generic().execute()  {//           onFailure () { listener.failure() };//           doRun() {//                if (source.recover(this)) {//                  recoveryListener.onRecoveryDone(recoveryState);//                }//           }//     }}// }assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());switch (recoveryState.getRecoverySource().getType()) {case EMPTY_STORE:case EXISTING_STORE:executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);break;case PEER:try {markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);} catch (Exception e) {failShard("corrupted preexisting index", e);recoveryListener.onRecoveryFailure(recoveryState,new RecoveryFailedException(recoveryState, null, e), true);}break;case SNAPSHOT:final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();executeRecovery("from snapshot",recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l));break;case LOCAL_SHARDS:final IndexMetadata indexMetadata = indexSettings().getIndexMetadata();final Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();final List<IndexShard> startedShards = new ArrayList<>();final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);final Set<ShardId> requiredShards;final int numShards;if (sourceIndexService != null) {requiredShards = IndexMetadata.selectRecoverFromShards(shardId().id(),sourceIndexService.getMetadata(), indexMetadata.getNumberOfShards());for (IndexShard shard : sourceIndexService) {if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {startedShards.add(shard);}}numShards = requiredShards.size();} else {numShards = -1;requiredShards = Collections.emptySet();}if (numShards == startedShards.size()) {assert requiredShards.isEmpty() == false;executeRecovery("from local shards", recoveryState, recoveryListener,l -> recoverFromLocalShards(mappingUpdateConsumer,startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l));} else {final RuntimeException e;if (numShards == -1) {e = new IndexNotFoundException(resizeSourceIndex);} else {e = new IllegalStateException("not all required shards of index " + resizeSourceIndex+ " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "+ shardId());}throw e;}break;default:throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());}}

对于恢复类型为PEER的任务,恢复动作的真正执行者为PeerRecoveryTargetService.doRecovery。在该方法中,首先调用getStartRecoveryRequest获取shard的metadataSnapshot,该结构中包含shard的段信息,如syncid、checksum、doc数等,然后封装为StartRecoveryRequest,通过RPC发送到源节点:

private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {final String actionName;final TransportRequest requestToSend;final StartRecoveryRequest startRequest;final RecoveryState.Timer timer;try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {if (recoveryRef == null) {logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);return;}final RecoveryTarget recoveryTarget = recoveryRef.target();timer = recoveryTarget.state().getTimer();if (preExistingRequest == null) {try {final IndexShard indexShard = recoveryTarget.indexShard();indexShard.preRecovery();assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());indexShard.prepareForIndexRecovery();final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";// 构造recovery request startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);requestToSend = startRequest;actionName = PeerRecoverySourceService.Actions.START_RECOVERY;} catch (final Exception e) {// this will be logged as warning later on...logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);onGoingRecoveries.failRecovery(recoveryId,new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);return;}logger.trace("{} starting recovery from {}", startRequest.shardId(), startRequest.sourceNode());} else {startRequest = preExistingRequest;requestToSend = new ReestablishRecoveryRequest(recoveryId, startRequest.shardId(), startRequest.targetAllocationId());actionName = PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY;logger.trace("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());}}// 向源节点发送请求,请求恢复transportService.sendRequest(startRequest.sourceNode(), actionName, requestToSend,new RecoveryResponseHandler(startRequest, timer));}/*** Prepare the start recovery request.** @param logger         the logger* @param localNode      the local node of the recovery target* @param recoveryTarget the target of the recovery* @param startingSeqNo  a sequence number that an operation-based peer recovery can start with.*                       This is the first operation after the local checkpoint of the safe commit if exists.* @return a start recovery request*/public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,RecoveryTarget recoveryTarget, long startingSeqNo) {final StartRecoveryRequest request;logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());Store.MetadataSnapshot metadataSnapshot;try {metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.try {final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;} catch (IOException | TranslogCorruptedException e) {logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +"resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);metadataSnapshot = Store.MetadataSnapshot.EMPTY;startingSeqNo = UNASSIGNED_SEQ_NO;}} catch (final org.apache.lucene.index.IndexNotFoundException e) {// happens on an empty folder. no need to logassert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;logger.trace("{} shard folder empty, recovering all files", recoveryTarget);metadataSnapshot = Store.MetadataSnapshot.EMPTY;} catch (final IOException e) {if (startingSeqNo != UNASSIGNED_SEQ_NO) {logger.warn(new ParameterizedMessage("error while listing local files, resetting the starting sequence number from {} " +"to unassigned and recovering as if there are none", startingSeqNo), e);startingSeqNo = UNASSIGNED_SEQ_NO;} else {logger.warn("error while listing local files, recovering as if there are none", e);}metadataSnapshot = Store.MetadataSnapshot.EMPTY;}logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());request = new StartRecoveryRequest(recoveryTarget.shardId(),recoveryTarget.indexShard().routingEntry().allocationId().getId(),recoveryTarget.sourceNode(),localNode,metadataSnapshot,recoveryTarget.state().getPrimary(),recoveryTarget.recoveryId(),startingSeqNo);return request;}

注意,请求的发送是异步的。

源节点处理恢复请求

源节点接收到请求后会调用恢复的入口函数PeerRecoverySourceService.messageReceived#recover:

class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {@Overridepublic void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request));}}

recover方法根据request得到shard并构造RecoverySourceHandler对象,然后调用handler.recoverToTarget进入恢复的执行体:

private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());final IndexShard shard = indexService.getShard(request.shardId().id());final ShardRouting routingEntry = shard.routingEntry();if (routingEntry.primary() == false || routingEntry.active() == false) {throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary");}if (request.isPrimaryRelocation() && (routingEntry.relocating() == false ||routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}",request.shardId(), request.targetNode());throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");}RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),request.targetNode());handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));}/*** performs the recovery from the local engine to the target*/public void recoverToTarget(ActionListener<RecoveryResponse> listener) {addListener(listener);final Closeable releaseResources = () -> IOUtils.close(resources);try {cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {final RuntimeException e;if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on use = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");} else {e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");}if (beforeCancelEx != null) {e.addSuppressed(beforeCancelEx);}IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));throw e;});final Consumer<Exception> onFailure = e -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]");IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));};final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();runUnderPrimaryPermit(() -> {final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());if (targetShardRouting == null) {logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),request.targetNode());throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");}assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",shard, cancellableThreads, logger);// 获取一个保留锁,使得translog不被清理final Closeable retentionLock = shard.acquireHistoryRetentionLock();resources.add(retentionLock);final long startingSeqNo;// 判断是否可以从SequenceNumber恢复// 除了异常检测和版本号检测,主要在shard.hasCompleteHistoryOperations()方法中判断请求的序列号是否小于主分片节点的localCheckpoint,// 以及translog中的数据是否足以恢复(有可能因为translog数据太大或者过期删除而无法恢复)final boolean isSequenceNumberBasedRecovery= request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO&& isTargetSameHistory()&& shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())&& ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) ||(retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));// NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,// because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's// possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery// without having a complete history.if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {// all the history we need is retained by an existing retention lease, so we do not need a separate retention lockretentionLock.close();logger.trace("history is retained by {}", retentionLeaseRef.get());} else {// all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations()// and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's// local checkpoint will be retained for the duration of this recovery.logger.trace("history is retained by retention lock");}final StepListener<SendFileResult> sendFileStep = new StepListener<>();final StepListener<TimeValue> prepareEngineStep = new StepListener<>();final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();final StepListener<Void> finalizeStep = new StepListener<>();// 若可以基于序列号进行恢复,则获取开始的序列号if (isSequenceNumberBasedRecovery) {// 如果基于SequenceNumber恢复,则startingSeqNo取值为恢复请求中的序列号,// 从请求的序列号开始快照translog。否则取值为0,快照完整的translog。logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());// 获取开始序列号startingSeqNo = request.startingSeqNo();if (retentionLeaseRef.get() == null) {createRetentionLease(startingSeqNo, sendFileStep.map(ignored -> SendFileResult.EMPTY));} else {// 发送的文件设置为空sendFileStep.onResponse(SendFileResult.EMPTY);}} else {final Engine.IndexCommitRef safeCommitRef;try {// Releasing a safe commit can access some commit files.safeCommitRef = acquireSafeCommit(shard);resources.add(safeCommitRef);} catch (final Exception e) {throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);}// Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being// able to recover other replicas using operations-based recoveries. If we are not using retention leases then we// conservatively copy all available operations. If we are using retention leases then "enough operations" is just the// operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains// at least as much history as anything else. The safe commit will often contain all the history retained by the current set// of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a// retention lease for some history that this primary already discarded, since we discard history when the global checkpoint// advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can// always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled// down.startingSeqNo = Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);try {final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);final Releasable releaseStore = acquireStore(shard.store());resources.add(releaseStore);sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {try {IOUtils.close(safeCommitRef, releaseStore);} catch (final IOException ex) {logger.warn("releasing snapshot caused exception", ex);}});final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();runUnderPrimaryPermit(() -> {try {// If the target previously had a copy of this shard then a file-based recovery might move its global// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a// new one later on in the recovery.shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,deleteRetentionLeaseStep, false));} catch (RetentionLeaseNotFoundException e) {logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());deleteRetentionLeaseStep.onResponse(null);}}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",shard, cancellableThreads, logger);// 第一阶段deleteRetentionLeaseStep.whenComplete(ignored -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep);}, onFailure);} catch (final Exception e) {throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);}}assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;sendFileStep.whenComplete(r -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");// 等待phase1执行完毕,主分片节点通知副分片节点启动此分片的Engine:prepareTargetForTranslog          // 该方法会阻塞处理,直到分片 Engine 启动完毕。// 待副分片启动Engine 完毕,就可以正常接收写请求了。// 注意,此时phase2尚未开始,此分片的恢复流程尚未结束。// 等待当前操作处理完成后,以startingSeqNo为起始点,对translog做快照,开始执行phase2:// For a sequence based recovery, the target can keep its local translogprepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);}, onFailure);prepareEngineStep.whenComplete(prepareEngineTime -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");/** add shard to replication group (shard will receive replication requests from this point on) now that engine is open.* This means that any document indexed into the primary after this will be replicated to this replica as well* make sure to do this before sampling the max sequence number in the next step, to ensure that we send* all documents up to maxSeqNo in phase2.*/runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();logger.trace("snapshot for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false);resources.add(phase2Snapshot);retentionLock.close();// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values// are at least as high as the corresponding values on the primary when any of these operations were executed on it.final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();final RetentionLeases retentionLeases = shard.getRetentionLeases();final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion();// 第二阶段,发送translogphase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);}, onFailure);// Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2final long trimAboveSeqNo = startingSeqNo - 1;sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);finalizeStep.whenComplete(r -> {final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle timefinal SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();final SendFileResult sendFileResult = sendFileStep.result();final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,prepareEngineStep.result().millis(), sendSnapshotResult.sentOperations, sendSnapshotResult.tookTime.millis());try {future.onResponse(response);} finally {IOUtils.close(resources);}}, onFailure);} catch (Exception e) {IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));}}/*** Checks if we have a completed history of operations since the given starting seqno (inclusive).* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}*/public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo)  {return getEngine().hasCompleteOperationHistory(reason, startingSeqNo);}

从上面的代码可以看出,恢复主要分两个阶段,第一阶段恢复segment文件,第二阶段发送translog。这里有个关键的地方,在恢复前,首先需要获取translogView及segment snapshot,translogView的作用是保证当前时间点到恢复结束时间段的translog不被删除,segment snapshot的作用是保证当前时间点之前的segment文件不被删除。接下来看看两阶段恢复的具体执行逻辑。phase1:

/*** Perform phase1 of the recovery operations. Once this {@link IndexCommit}* snapshot has been performed no commit operations (files being fsync'd)* are effectively allowed on this index until all recovery phases are done* <p>* Phase1 examines the segment files on the target node and copies over the* segments that are missing. Only segments that have the same size and* checksum can be reused*/void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {cancellableThreads.checkForCancel();//拿到shard的存储信息final Store store = shard.store();try {StopWatch stopWatch = new StopWatch().start();final Store.MetadataSnapshot recoverySourceMetadata;try {// 拿到snapshot的metadatarecoverySourceMetadata = store.getMetadata(snapshot);} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {shard.failShard("recovery", ex);throw ex;}for (String name : snapshot.getFileNames()) {final StoreFileMetadata md = recoverySourceMetadata.get(name);if (md == null) {logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " +recoverySourceMetadata.asMap().size() + " files", name);}}// 如果syncid相等,再继续比较下文档数,如果都相同则不用恢复if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {final List<String> phase1FileNames = new ArrayList<>();final List<Long> phase1FileSizes = new ArrayList<>();final List<String> phase1ExistingFileNames = new ArrayList<>();final List<Long> phase1ExistingFileSizes = new ArrayList<>();// Total size of segment files that are recoveredlong totalSizeInBytes = 0;// Total size of segment files that were able to be re-usedlong existingTotalSizeInBytes = 0;// Generate a "diff" of all the identical, different, and missing// segment files on the target node, using the existing files on// the source node// 找出target和source有差别的segment// https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/store/Store.java#L971final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());for (StoreFileMetadata md : diff.identical) {phase1ExistingFileNames.add(md.name());phase1ExistingFileSizes.add(md.length());existingTotalSizeInBytes += md.length();if (logger.isTraceEnabled()) {logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +" size [{}]", md.name(), md.checksum(), md.length());}totalSizeInBytes += md.length();}List<StoreFileMetadata> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());phase1Files.addAll(diff.different);phase1Files.addAll(diff.missing);for (StoreFileMetadata md : phase1Files) {if (request.metadataSnapshot().asMap().containsKey(md.name())) {logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",md.name(), request.metadataSnapshot().asMap().get(md.name()), md);} else {logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());}phase1FileNames.add(md.name());phase1FileSizes.add(md.length());totalSizeInBytes += md.length();}logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));final StepListener<Void> sendFileInfoStep = new StepListener<>();final StepListener<Void> sendFilesStep = new StepListener<>();final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();final StepListener<Void> cleanFilesStep = new StepListener<>();cancellableThreads.checkForCancel();recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);// 将需要恢复的文件发送到target nodesendFileInfoStep.whenComplete(r ->sendFiles(store, phase1Files.toArray(new StoreFileMetadata[0]), translogOps, sendFilesStep), listener::onFailure);sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);createRetentionLeaseStep.whenComplete(retentionLease ->{final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint: retentionLease + " vs " + lastKnownGlobalCheckpoint;// Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want// the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica// to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on// the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep);},listener::onFailure);final long totalSize = totalSizeInBytes;final long existingTotalSize = existingTotalSizeInBytes;cleanFilesStep.whenComplete(r -> {final TimeValue took = stopWatch.totalTime();logger.trace("recovery [phase1]: took [{}]", took);listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,phase1ExistingFileSizes, existingTotalSize, took));}, listener::onFailure);} else {logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());// but we must still create a retention leasefinal StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();createRetentionLease(startingSeqNo, createRetentionLeaseStep);createRetentionLeaseStep.whenComplete(retentionLease -> {final TimeValue took = stopWatch.totalTime();logger.trace("recovery [phase1]: took [{}]", took);listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(),Collections.emptyList(), 0L, took));}, listener::onFailure);}} catch (Exception e) {throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizeValue(0L), e);}}

从上面代码可以看出,phase1的具体逻辑是,首先拿到待恢复shard的metadataSnapshot从而得到recoverySourceSyncId,根据request拿到recoveryTargetSyncId,比较两边的syncid,如果相同再比较源和目标的文档数,如果也相同,说明在当前提交点之前源和目标的shard对应的segments都相同,因此不用恢复segment文件(canSkipPhase1方法中比对的)。如果两边的syncid不同,说明segment文件有差异,则需要找出所有有差异的文件进行恢复。通过比较recoverySourceMetadata和recoveryTargetSnapshot的差异性,可以找出所有有差别的segment文件。这块逻辑如下:

/*** Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the* recovery target and this snapshot as the source. The returned diff will hold a list of files that are:* <ul>* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>* <li>different: they exist in both snapshots but their they are not identical</li>* <li>missing: files that exist in the source but not in the target</li>* </ul>* This method groups file into per-segment files and per-commit files. A file is treated as* identical if and on if all files in it's group are identical. On a per-segment level files for a segment are treated* as identical iff:* <ul>* <li>all files in this segment have the same checksum</li>* <li>all files in this segment have the same length</li>* <li>the segments {@code .si} files hashes are byte-identical Note: This is a using a perfect hash function,* The metadata transfers the {@code .si} file content as it's hash</li>* </ul>* <p>* The {@code .si} file contains a lot of diagnostics including a timestamp etc. in the future there might be* unique segment identifiers in there hardening this method further.* <p>* The per-commit files handles very similar. A commit is composed of the {@code segments_N} files as well as generational files* like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level files for a commit are treated* as identical iff:* <ul>* <li>all files belonging to this commit have the same checksum</li>* <li>all files belonging to this commit have the same length</li>* <li>the segments file {@code segments_N} files hashes are byte-identical Note: This is a using a perfect hash function,* The metadata transfers the {@code segments_N} file content as it's hash</li>* </ul>* <p>* NOTE: this diff will not contain the {@code segments.gen} file. This file is omitted on recovery.*/public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {final List<StoreFileMetadata> identical = new ArrayList<>();// 相同的file final List<StoreFileMetadata> different = new ArrayList<>();// 不同的filefinal List<StoreFileMetadata> missing = new ArrayList<>();// 缺失的filefinal Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();for (StoreFileMetadata meta : this) {if (IndexFileNames.OLD_SEGMENTS_GEN.equals(meta.name())) { // legacycontinue; // we don't need that file at all}final String segmentId = IndexFileNames.parseSegmentName(meta.name());final String extension = IndexFileNames.getExtension(meta.name());if (IndexFileNames.SEGMENTS.equals(segmentId) ||DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {// only treat del files as per-commit files fnm files are generational but only for upgradable DVperCommitStoreFiles.add(meta);} else {perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);}}final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {identicalFiles.clear();boolean consistent = true;for (StoreFileMetadata meta : segmentFiles) {StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());if (storeFileMetadata == null) {consistent = false;missing.add(meta);// 该segment在target node中不存在,则加入到missing} else if (storeFileMetadata.isSame(meta) == false) {consistent = false;different.add(meta);// 存在但不相同,则加入到different} else {identicalFiles.add(meta);// 存在且相同}}if (consistent) {identical.addAll(identicalFiles);} else {// make sure all files are added - this can happen if only the deletes are differentdifferent.addAll(identicalFiles);}}RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical),Collections.unmodifiableList(different), Collections.unmodifiableList(missing));assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0): "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" +this.metadata.size() + "] contains  segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";return recoveryDiff;}

这里将所有的segment file分为三类:identical(相同)、different(不同)、missing(target缺失)。然后将different和missing的segment files作为第一阶段需要恢复的文件发送到target node。发送完segment files后,源节点还会向目标节点发送消息以通知目标节点清理临时文件,然后也会发送消息通知目标节点打开引擎准备接收translog。

第二阶段的逻辑比较简单,只需将translog view到当前时间之间的所有translog发送给源节点即可。

第二阶段使用当前translog的快照,而不获取写锁(但是,translog快照是translog的时间点视图)。然后,它将每个translog操作发送到目标节点,以便将其重播到新的shard中。

    /*** Perform phase two of the recovery process.* <p>* Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new* shard.** @param startingSeqNo              the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all*                                   ops should be sent* @param endingSeqNo                the highest sequence number that should be sent* @param snapshot                   a snapshot of the translog* @param maxSeenAutoIdTimestamp     the max auto_id_timestamp of append-only requests on the primary* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.* @param listener                   a listener which will be notified with the local checkpoint on the target.*/void phase2(final long startingSeqNo,final long endingSeqNo,final Translog.Snapshot snapshot,final long maxSeenAutoIdTimestamp,final long maxSeqNoOfUpdatesOrDeletes,final RetentionLeases retentionLeases,final long mappingVersion,final ActionListener<SendSnapshotResult> listener) throws IOException {if (shard.state() == IndexShardState.CLOSED) {throw new IndexShardClosedException(request.shardId());}logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");final StopWatch stopWatch = new StopWatch().start();final StepListener<Void> sendListener = new StepListener<>();final OperationBatchSender sender = new OperationBatchSender(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp,maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, sendListener);sendListener.whenComplete(ignored -> {final long skippedOps = sender.skippedOps.get();final int totalSentOps = sender.sentOps.get();final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps, totalSentOps);stopWatch.stop();final TimeValue tookTime = stopWatch.totalTime();logger.trace("recovery [phase2]: took [{}]", tookTime);listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));}, listener::onFailure);sender.start();}

目标节点开始恢复

接收segment

对应上一小节源节点恢复的第一阶段,源节点将所有有差异的segment发送给目标节点,目标节点接收到后会将segment文件落盘。segment files的写入函数为RecoveryTarget.writeFileChunk:

真正执行的位置:MultiFileWriter.innerWriteFileChunk

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps) throws IOException {final Store store = store();final String name = fileMetaData.name();... ...if (position == 0) {indexOutput = openAndPutIndexOutput(name, fileMetaData, store);} else {indexOutput = getOpenIndexOutput(name); // 加一层前缀,组成临时文件}... ...while((scratch = iterator.next()) != null) { indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); // 写临时文件}... ...store.directory().sync(Collections.singleton(temporaryFileName));  // 这里会调用fsync落盘
}

打开引擎

经过上面的过程,目标节点完成了追数据的第一步。接收完segment后,目标节点打开shard对应的引擎准备接收translog,注意,这里打开引擎后,正在恢复的shard便可进行写入、删除(操作包括primary shard同步的请求和translog中的操作命令)。打开引擎的逻辑如下:

 /*** Opens the engine on top of the existing lucene engine and translog.* The translog is kept but its operations won't be replayed.*/public void openEngineAndSkipTranslogRecovery() throws IOException {assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);loadGlobalCheckpointToReplicationTracker();innerOpenEngineAndTranslog(replicationTracker);getEngine().skipTranslogRecovery();}private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {assert Thread.holdsLock(mutex) == false : "opening engine under mutex";if (state != IndexShardState.RECOVERING) {throw new IndexShardNotRecoveringException(shardId, state);}final EngineConfig config = newEngineConfig(globalCheckpointSupplier);// we disable deletes since we allow for operations to be executed against the shard while recovering// but we need to make sure we don't loose deletes until we are done recoveringconfig.setEnableGcDeletes(false);// 恢复过程中不删除translogupdateRetentionLeasesOnReplica(loadRetentionLeases());assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty(): "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()+ "] but got " + getRetentionLeases();synchronized (engineMutex) {assert currentEngineReference.get() == null : "engine is running";verifyNotClosed();// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).final Engine newEngine = engineFactory.newReadWriteEngine(config);// 创建engineonNewEngine(newEngine);currentEngineReference.set(newEngine);// We set active because we are now writing operations to the engine; this way,// we can flush if we go idle after some time and become inactive.active.set(true);}// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.onSettingsChanged();assert assertSequenceNumbersInCommit();recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);}

接收并重放translog

打开引擎后,便可以根据translog中的命令进行相应的回放动作,回放的逻辑和正常的写入、删除类似,这里需要根据translog还原出操作类型和操作数据,并根据操作数据构建相应的数据对象,然后再调用上一步打开的engine执行相应的操作,这块逻辑如下:

IndexShard#runTranslogRecovery
=>
IndexShard#applyTranslogOperation

// 重放translog快照中的translog操作到当前引擎。
// 在成功回放每个translog操作后,会通知回调onOperationRecovered。/*** Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.* The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.*/int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,Runnable onOperationRecovered) throws IOException {int opsRecovered = 0;Translog.Operation operation;while ((operation = snapshot.next()) != null) {try {logger.trace("[translog] recover op {}", operation);Engine.Result result = applyTranslogOperation(engine, operation, origin);switch (result.getResultType()) {case FAILURE:throw result.getFailure();case MAPPING_UPDATE_REQUIRED:throw new IllegalArgumentException("unexpected mapping update: " + result.getRequiredMappingUpdate());case SUCCESS:break;default:throw new AssertionError("Unknown result type [" + result.getResultType() + "]");}opsRecovered++;onOperationRecovered.run();} catch (Exception e) {// TODO: Don't enable this leniency unless users explicitly opt-inif (origin == Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY && ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {// mainly for MapperParsingException and Failure to detect xcontentlogger.info("ignoring recovery of a corrupt translog entry", e);} else {throw ExceptionsHelper.convertToRuntime(e);}}}return opsRecovered;}private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation,Engine.Operation.Origin origin) throws IOException {// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;final Engine.Result result;switch (operation.opType()) {// 还原出操作类型及操作数据并调用engine执行相应的动作case INDEX:final Translog.Index index = (Translog.Index) operation;// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all// autoGeneratedID docs that are coming from the primary are updated correctly.result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(),versionType, UNASSIGNED_SEQ_NO, 0, index.getAutoGeneratedIdTimestamp(), true, origin,new SourceToParse(shardId.getIndexName(), index.id(), index.source(),XContentHelper.xContentType(index.source()), index.routing()));break;case DELETE:final Translog.Delete delete = (Translog.Delete) operation;result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.id(),versionType, UNASSIGNED_SEQ_NO, 0, origin);break;case NO_OP:final Translog.NoOp noOp = (Translog.NoOp) operation;result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);break;default:throw new IllegalStateException("No operation defined for [" + operation + "]");}return result;}

通过上面的步骤,translog的重放完毕,此后需要做一些收尾的工作,包括,refresh让回放后的最新数据可见,打开translog gc:

 /*** perform the last stages of recovery once all translog operations are done.* note that you should still call {@link #postRecovery(String)}.*/public void finalizeRecovery() {recoveryState().setStage(RecoveryState.Stage.FINALIZE);Engine engine = getEngine();engine.refresh("recovery_finalization");engine.config().setEnableGcDeletes(true);}

到这里,replica shard恢复的两个阶段便完成了,由于此时shard还处于INITIALIZING状态,还需通知master节点启动已恢复的shard:

IndicesClusterStateService#RecoveryListener

        @Overridepublic void onRecoveryDone(final RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) {shardStateAction.shardStarted(shardRouting,primaryTerm,"after " + state.getRecoverySource(),timestampMillisFieldRange,SHARD_STATE_ACTION_LISTENER);}

至此,shard recovery的所有流程都已完成。

小结

疑问点1:网上说的对不对?新版本中有没有更新?

到这里完整跟着腾讯云的文档走了一遍,主体的流程在Elasticsearch 8.0.0-SNAPSHOT中基本一样,只是部分方法有些调整。

所以下面这个流程还是正确的:

ES副本分片恢复主要涉及恢复的目标节点和源节点,目标节点即故障恢复的节点,源节点为提供恢复的节点。目标节点向源节点发送分片恢复请求,源节点接收到请求后主要分两阶段来处理。第一阶段,对需要恢复的shard创建snapshot,然后根据请求中的metadata对比如果 syncid 相同且 doc 数量相同则跳过,否则对比shard的segment文件差异,将有差异的segment文件发送给target node。第二阶段,为了保证target node数据的完整性,需要将本地的translog发送给target node,且对接收到的translog进行回放。整体流程如下图所示:
在这里插入图片描述

疑问点2:在分片恢复的时候,如果收到Api _forcemerge请求,这时候,会如何处理?

这部分等看/_forcemerge api的时候,再解答一下。

疑问点3:片恢复的第二阶段是同步translog,这一步会不会加锁?不加锁的话,如何确保是同步完成了?

完整性

首先,phase1阶段,保证了存量的历史数据可以恢复到从分片。phase1阶段完成后,从分片引擎打开,可以正常处理index、delete请求,而translog覆盖完了整个phase1阶段,因此在phase1阶段中的index/delete操作都将被记录下来,在phase2阶段进行translog回放时,副本分片正常的index和delete操作和translog是并行执行的,这就保证了恢复开始之前的数据、恢复中的数据都会完整的写入到副本分片,保证了数据的完整性。如下图所示:

在这里插入图片描述

一致性

由于phase1阶段完成后,从分片便可正常处理写入操作,而此时从分片的写入和phase2阶段的translog回放时并行执行的,如果translog的回放慢于正常的写入操作,那么可能会导致老的数据后写入,造成数据不一致。ES为了保证数据的一致性在进行写入操作时,会比较当前写入的版本和lucene文档版本号,如果当前版本更小,说明是旧数据则不会将文档写入lucene。相关代码如下:

// https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L993
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {plan = IndexingStrategy.processAsStaleOp(index.version(), 0);
}

拓展:如何保证副分片和主分片一致

整理自:《Elasticsearch源码解析与优化实战》

索引恢复过程的一个难点在于如何维护主副分片的一致性。假设副分片恢复期间一直有写操作,如何实现一致呢?
我们先看看早期的做法:在2.0版本之前,副分片恢复要经历三个阶段。

  • phase1:将主分片的 Lucene 做快照,发送到 target。期间不阻塞索引操作,新增数据写到主分片的translog。
  • phase2:将主分片translog做快照,发送到target重放,期间不阻塞索引操作。
  • phase3:为主分片加写锁,将剩余的translog发送到target。此时数据量很小,写入过程的阻塞很短。

从理论上来说,只要流程上允许将写操作阻塞一段时间,实现主副一致是比较容易的。

但是后来(从2.0版本开始),也就是引入translog.view概念的同时,phase3被删除。

phase3被删除,这个阶段是重放操作(operations),同时防止新的写入 Engine。这是不必要的,因为自恢复开始,标准的 index 操作会发送所有的操作到正在恢复中的分片。重放恢复开始时获取的view中的所有操作足够保证不丢失任何操作。

阻塞写操作的phase3被删除,恢复期间没有任何写阻塞过程。接下来需要处理的就是解决phase1和phase2之间的写操作与phase2重放操作之间的时序和冲突问题。在副分片节点,phase1结束后,假如新增索引操作和 translog 重放操作并发执行,因为时序的关系会出现新老数据交替。如何实现主副分片一致呢?

假设在第一阶段执行期间,有客户端索引操作要求将docA的内容写为1,主分片执行了这个操作,而副分片由于尚未就绪所以没有执行。第二阶段期间客户端索引操作要求写 docA 的内容为2,此时副分片已经就绪,先执行将docA写为2的新增请求,然后又收到了从主分片所在节点发送过来的translog重复写docA为1的请求该如何处理?具体流程如下图所示。

在这里插入图片描述

答案是在写流程中做异常处理,通过版本号来过滤掉过期操作。写操作有三种类型:索引新文档、更新、删除。索引新文档不存在冲突问题,更新和删除操作采用相同的处理机制。每个操作都有一个版本号,这个版本号就是预期doc版本,它必须大于当前Lucene中的doc版本号,否则就放弃本次操作。对于更新操作来说,预期版本号是Lucene doc版本号+1。主分片节点写成功后新数据的版本号会放到写副本的请求中,这个请求中的版本号就是预期版本号。

这样,时序上存在错误的操作被忽略,对于特定doc,只有最新一次操作生效,保证了主副分片一致。

我们分别看一下写操作三种类型的处理机制。

1.索引新文档

不存在冲突问题,不需要处理。

2.更新

判断本次操作的版本号是否小于Lucene中doc的版本号,如果小于,则放弃本次操作。

Index、Delete都继承自Operation,每个Operation都有一个版本号,这个版本号就是doc版本号。对于副分片的写流程来说,正常情况下是主分片写成功后,相应doc写入的版本号被放到转发写副分片的请求中。对于更新来说,就是通过主分片将原doc版本号+1后转发到副分片实现的。在对比版本号的时候:

expectedVersion = 写副分片请求中的 version = 写主分片成功后的version

通过下面的方法判断当前操作的版本号是否低于Lucene中的版本号:

// VersionType#isVersionConflictForWritesEXTERNAL((byte) 1) {@Overridepublic boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {if (currentVersion == Versions.NOT_FOUND) {return false;}if (expectedVersion == Versions.MATCH_ANY) {return true;}if (currentVersion >= expectedVersion) {return true;}return false;}}

如果translog重放的操作在写一条"老"数据,则compareOpToLuceneDocBasedOnSeqNo会返回OpVsLuceneDocStatus.OP_STALE_OR_EQUAL。

private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";final OpVsLuceneDocStatus status;VersionValue versionValue = getVersionFromMap(op.uid().bytes());assert incrementVersionLookup();if (versionValue != null) {status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);} else {// load from indexassert incrementIndexVersionLookup();try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), op.uid());if (docAndSeqNo == null) {status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;} else if (op.seqNo() > docAndSeqNo.seqNo) {status = OpVsLuceneDocStatus.OP_NEWER;} else if (op.seqNo() == docAndSeqNo.seqNo) {assert localCheckpointTracker.hasProcessed(op.seqNo()) :"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;} else {status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;}}}return status;}

副分片在InternalEngine#index函数中通过plan判断是否写到Lucene:

// non-primary mode (i.e., replica or recovery)
final IndexingStrategy plan = indexingStrategyForOperation(index);

在 indexingStrategyForOperation函数中,plan的最终结果就是plan =IndexingStrategy.processButSkipLucene,后面会跳过写Lucene和translog的逻辑。

3. 删除

判断本次操作中的版本号是否小于Lucene中doc的版本号,如果小于,则放弃本次操作。

通过compareOpToLuceneDocBasedOnSeqNo方法判断本次操作是否小于Lucenne中doc的版本号,与Index操作时使用相同的比较函数。

类似的,在InternalEngine#delete函数中判断是否写到Lucene:

final DeletionStrategy plan = deletionStrategyForOperation(delete);

如果translog重放的是一个"老"的删除操作,则compareOpToLuceneDocBasedOnSeqNo会返回OpVsLuceneDocStatus.OP_STALE_OR_EQUAL。

plan的最终结果就是plan=DeletionStrategy.processButSkipLucene,后面会跳过Lucene删除的逻辑。

  /*** the status of the current doc version in lucene, compared to the version in an incoming* operation*/enum OpVsLuceneDocStatus {/** the op is more recent than the one that last modified the doc found in lucene*/OP_NEWER,/** the op is older or the same as the one that last modified the doc found in lucene*/OP_STALE_OR_EQUAL,/** no doc was found in lucene */LUCENE_DOC_NOT_FOUND}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/590637.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【基础】【Python网络爬虫】【12.App抓包】reqable 安装与配置(附大量案例代码)(建议收藏)

Python网络爬虫基础 App抓包1. App爬虫原理2. reqable 的安装与配置reqable 安装教程reqable 的配置 3. 模拟器的安装与配置夜神模拟器的安装夜神模拟器的配置配置代理配置证书 4. 内联调试及注意事项软件启动顺开启抓包功reqable面板功列表部件功能列表数据快捷操作栏 夜神模拟…

WPF+Halcon 培训项目实战 完结(13):HS 鼠标绘制图形

文章目录 前言相关链接项目专栏运行环境匹配图片矩形鼠标绘制Halcon添加右键事件Task封装运行结果个人引用问题原因推测 圆形鼠标绘制代码运行结果 课程完结&#xff1a; 前言 为了更好地去学习WPFHalcon&#xff0c;我决定去报个班学一下。原因无非是想换个工作。相关的教学视…

【java爬虫】股票数据获取工具前后端代码

前面我们有好多文章都是在介绍股票数据获取工具&#xff0c;这是一个前后端分离项目 后端技术栈&#xff1a;springboot&#xff0c;sqlite&#xff0c;jdbcTemplate&#xff0c;okhttp 前端技术栈&#xff1a;vue&#xff0c;element-plus&#xff0c;echarts&#xff0c;ax…

K8s实战入门

1.NameSpace Namespace是kubernetes系统中的一种非常重要资源&#xff0c;它的主要作用是用来实现多套环境的资源隔离或者多租户的资源隔离。 默认情况下&#xff0c;kubernetes集群中的所有的Pod都是可以相互访问的。但是在实际中&#xff0c;可能不想让两个Pod之间进行互相…

在线智能防雷监控检测系统应用方案

在线智能防雷监控检测系统是一种利用现代信息技术&#xff0c;对防雷设施的运行状态进行实时监测、管理和控制的系统&#xff0c;它可以有效提高防雷保护的安全性、可靠性和智能化程度&#xff0c;降低运维成本和风险&#xff0c;为用户提供全方位的防雷解决方案。 地凯科技在…

排序算法之计数排序

计数排序是一种非基于比较的排序算法&#xff0c;它通过统计数组中每个元素出现的次数&#xff0c;将其按次数从小到大排序。 以下是计数排序的基本步骤&#xff1a; 统计&#xff1a;统计数组中每个元素出现的次数。计数&#xff1a;将每个元素的出现次数存储在另一个数组中…

redisson作为分布式锁的底层实现

1. redisson如何实现尝试获取锁的逻辑 如何实现在一段的时间内不断的尝试获取锁 其实就是搞了个while循环&#xff0c;不断的去尝试获取锁资源。但是因为latch的存在会在给定的时间内处于休眠状态。这个事件&#xff0c;监听的是解锁动作&#xff0c;如果解锁动作发生。会调用…

Android textview展示富文本内容

今天实现的内容&#xff0c;就是上图的效果&#xff0c;通过Span方式展示图片&#xff0c;需要支持文字颜色改变、加粗。支持style\"color:green; font-weight:bold;\"展示。尤其style标签中的font-size、font-weight是在原生中不被支持的。 所以我们今天需要使用自…

病情聊天机器人,利用Neo4j图数据库和Elasticsearch全文搜索引擎相结合

项目设计目的&#xff1a; 本项目旨在开发一个病情聊天机器人&#xff0c;利用Neo4j图数据库和Elasticsearch全文搜索引擎相结合&#xff0c;实现对病情相关数据的存储、查询和自动回答。通过与用户的交互&#xff0c;机器人可以根据用户提供的症状描述&#xff0c;给出初步的可…

字母简化(UPC练习)

题目描述 给出一串全部为小写英文字母的字符串&#xff0c;要求把这串字母简化。简化规则是&#xff1a;统计连续出现的字母数&#xff0c;输出时先输出个数&#xff0c;再输出字母。比如&#xff1a;aaabbbaa&#xff0c;则简化为3a3b2a&#xff1b;而zzzzeeeeea&#xff0c;…

帆软报表中定时调度中的最后一步如何增加新的处理方式

在定时调度中,到调度执行完之后,我们可能想做一些别的事情,当自带的处理方式不满足时,可以自定义自己的处理方式。 产品的处理方式一共有如下这些类型: 我们想在除了上面的处理方式之外增加自己的处理方式应该怎么做呢? 先看下效果: 涉及到两方面的改造,前端与后端。…

前端算法之二叉树

二叉树 二叉树用于解决什么问题 数据的组织与搜索&#xff1a;排序&#xff1a;表达式和计算&#xff1a;图形处理&#xff1a; 举例&#xff1a;二叉树的最近公共祖先 思路&#xff1a; 排序/排布方式 和 &#xff08;排序中&#xff09;当前树和节点的关系 举例2&#xff1a;…

光照贴图的参数化

正如Jon在上一篇文章中所解释的那样&#xff0c;我们在《见证者》中使用了预先计算的全局照明&#xff0c;而我的首要任务之一就是开发该系统。 我开发了一些有趣的技术来计算自动参数化&#xff0c;以一种可以轻松映射到 GPU 的方式制定照明计算&#xff0c;并通过使用辐照度…

【如何选择Mysql服务器的CPU核数及内存大小】

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容&#x1f4e2;文章总结&#x1f4e5;博主目标 &#x1f50a;博主介绍 &#x1f31f;我是廖志伟&#xff0c;一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51CTO专家博主、阿里云专家博主、清华大学出版社签约作…

Codeium在IDEA里的3个坑

转载自Codeium在IDEA里的3个坑&#xff1a;无法log in&#xff0c;downloading language server和中文乱码_downloading codeium language server...-CSDN博客文章浏览阅读1.7w次&#xff0c;点赞26次&#xff0c;收藏47次。Codeium安装IDEA插件的3个常见坑_downloading codeiu…

4-2. AOP细节

1 AOP细节 1.1 切入点表达式 1.1.1 作用 通过表达式的方式定位一个或多个具体的连接点。 1.1.2 语法细节 1&#xff09;切入点表达式的语法格式 execution([权限修饰符] [返回值类型] [简单类名/全类名] 方法名) 2) 举例说明 3&#xff09;在AspectJ中&#xff0c;切入…

通过国家网络风险管理方法提供安全的网络环境

印度尼西亚通过讨论网络安全法草案启动了其战略举措。不过&#xff0c;政府和议会尚未就该法案的多项内容达成一致。另一方面&#xff0c;制定战略性、全面的网络安全方法的紧迫性从未像今天这样重要。 其政府官方网站遭受了多起网络攻击&#xff0c;引发了人们对国家网络安全…

小肥柴慢慢手写数据结构(C篇)(5-2 AVL树)

小肥柴慢慢学习数据结构笔记&#xff08;C篇&#xff09;&#xff08;5-2 AVL树 目录5-5 AVL出现的原因5-5-1 平衡树5-5-2 平衡二叉树的具体案例 5-6 AVL平衡策略的讨论5-7 不使用平衡因子的实现&#xff08;黑皮书&#xff0c;训练思维&#xff09;5-8 使用平衡因子的实现&…

普中STM32-PZ6806L开发板(USART2 串口 + HI-LINK-V20离线语音模块控制LED灯)

简介 买了HI-LINK-V20型号的离线语音识别模块, 为了后面可以做有意思的东西, 现在先来用用, 使用USART2 串口 接收来自我在HI-LINK-V20中预设的动作, 当识别到词条时发送对应的指令到串口, HI-LINK串口接的就是STM32F03ZET6的USART2, 且往下看。 电路原理图 连线图 连线引脚表…

Linux:/proc/sys/vm/目录各文件详解

目录 前言一、/proc/sys/vm/目录各文件二、相关功能的API函数 前言 /proc/sys/vm/ 目录是 Linux 系统中的一个特殊目录&#xff0c;它包含了与虚拟内存子系统相关的系统内核参数。这些参数可以用来配置系统的虚拟内存管理策略&#xff0c;包括内存分配、页面置换、内存压缩、NU…