Elasticsearch Source Code Analysis 04: Cluster State Publishing

Cluster State Publishing

The cluster module encapsulates tasks that run at the cluster level, such as cluster health, cluster-level metadata management, allocating shards to nodes, and node management. Executing a cluster task may produce a new cluster state, and if it does, the master node broadcasts the new state to the other nodes.

The cluster state is encapsulated in ClusterState, and incremental (diff-based) synchronization is supported.

The main occasions on which cluster tasks are submitted include:

  • Index creation, deletion, opening, and closing
  • Changes to index templates, mappings, and aliases
  • The gateway module publishing the elected cluster state
  • Snapshots
  • Shard allocation
  • Cluster node changes, etc.

The entry point for submitting cluster tasks is ClusterService's submitStateUpdateTask method. The first parameter is the event source, and the second is the concrete task to execute:

```java
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
void submitStateUpdateTask(String source, T updateTask) {
    submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
}

public <T> void submitStateUpdateTask(String source, T task,
                                      ClusterStateTaskConfig config,
                                      ClusterStateTaskExecutor<T> executor,
                                      ClusterStateTaskListener listener) {
    submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
}

public <T> void submitStateUpdateTasks(final String source,
                                       final Map<T, ClusterStateTaskListener> tasks,
                                       final ClusterStateTaskConfig config,
                                       final ClusterStateTaskExecutor<T> executor) {
    masterService.submitStateUpdateTasks(source, tasks, config, executor);
}
```

The most representative task is ClusterStateUpdateTask, which implements ClusterStateTaskConfig, ClusterStateTaskExecutor, and ClusterStateTaskListener:

```java
public abstract class ClusterStateUpdateTask
        implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {
```

ClusterStateTaskConfig carries the task's configuration, such as its timeout and priority:

```java
TimeValue timeout();
Priority priority();
```

ClusterStateTaskExecutor defines the task to be executed; its most important method is execute:

```java
ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
```

When the tasks run, the current cluster state is passed in. If running them produces a new cluster state, the new state is returned; otherwise the original state is returned unchanged.
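This "same instance means unchanged" contract can be illustrated with a toy model (ToyClusterState and ToyExecutor are hypothetical stand-ins, not Elasticsearch classes):

```java
import java.util.List;

// Hypothetical stand-in for ClusterState, holding only a version number.
class ToyClusterState {
    final long version;
    ToyClusterState(long version) { this.version = version; }
}

// Toy model of the execute() contract: return the input state unchanged when
// the tasks produce no change, or a brand-new state object otherwise.
class ToyExecutor {
    ToyClusterState execute(ToyClusterState current, List<String> tasks) {
        if (tasks.isEmpty()) {
            return current;                              // no change: same instance
        }
        return new ToyClusterState(current.version + 1); // change: new state object
    }
}
```

MasterService later compares the result with the input by reference (previousClusterState != resultingState), which is why returning the very same instance is the signal for "no change".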

ClusterStateTaskListener provides the callbacks that run after a task has been submitted:

```java
/**
 * A callback called when execute fails.
 */
void onFailure(String source, Exception e);

/**
 * Called when the task was rejected because the local node is no longer master.
 * Used only for tasks submitted to {@link MasterService}.
 */
default void onNoLongerMaster(String source) {
    onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
}

/**
 * Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
 * properly by all listeners.
 */
default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
```

MasterService is responsible for managing and running cluster tasks. Only the master node submits cluster tasks to the internal queue and runs them:

```java
public <T> void submitStateUpdateTasks(final String source,
                                       final Map<T, ClusterStateTaskListener> tasks,
                                       final ClusterStateTaskConfig config,
                                       final ClusterStateTaskExecutor<T> executor) {
    if (!lifecycle.started()) {
        return;
    }
    final ThreadContext threadContext = threadPool.getThreadContext();
    final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
    try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
        threadContext.markAsSystemContext();
        // wrap the tasks
        List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
            .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(),
                safe(e.getValue(), supplier), executor))
            .collect(Collectors.toList());
        // submit the tasks
        taskBatcher.submitTasks(safeTasks, config.timeout());
    } catch (EsRejectedExecutionException e) {
        // ignore cases where we are shutting down..., there is really nothing interesting
        // to be done here...
        if (!lifecycle.stoppedOrClosed()) {
            throw e;
        }
    }
}

public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
    if (tasks.isEmpty()) {
        return;
    }
    final BatchedTask firstTask = tasks.get(0);
    assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
        "tasks submitted in a batch should share the same batching key: " + tasks;
    // convert to an identity map to check for dups based on task identity
    final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
        BatchedTask::getTask,
        Function.identity(),
        (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
        IdentityHashMap::new));
    synchronized (tasksPerBatchingKey) {
        // add the tasks under their batchingKey, returning any tasks already queued for it
        LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
            k -> new LinkedHashSet<>(tasks.size()));
        // check whether a task with the same batchingKey already exists
        for (BatchedTask existing : existingTasks) {
            // check that there won't be two tasks with the same identity for the same batching key
            BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
            if (duplicateTask != null) {
                throw new IllegalStateException("task [" + duplicateTask.describeTasks(Collections.singletonList(existing))
                    + "] with source [" + duplicateTask.source + "] is already queued");
            }
        }
        existingTasks.addAll(tasks);
    }
    // run the tasks
    if (timeout != null) {
        threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
    } else {
        threadExecutor.execute(firstTask);
    }
}
```

Note the de-duplication logic here: among tasks that share the same ClusterStateTaskExecutor instance, a given task is executed only once, and the remaining identical instances are simply assigned the same execution result. Duplicates are distinguished by the identity of the task object itself, and de-duplication does not remove the duplicated entries; instead, after the task has run, the duplicates receive the same result.

A shared ClusterStateTaskExecutor can arise in two ways: either the submitted batch itself contains duplicates, or a previously submitted task with the same executor is still queued and not yet executed; in the latter case the newly submitted task is added to the same list, and the batch is executed only once.
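The identity-based duplicate detection can be sketched in isolation; TaskQueue below is a hypothetical stand-in for TaskBatcher, keeping only the duplicate checks:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of TaskBatcher's duplicate detection: tasks are compared
// by object identity (==), never by equals().
class TaskQueue {
    private final Map<Object, Set<Object>> tasksPerBatchingKey = new HashMap<>();

    void submit(Object batchingKey, List<?> tasks) {
        // reject duplicate instances within the submitted batch itself
        Map<Object, Object> identity = new IdentityHashMap<>();
        for (Object t : tasks) {
            if (identity.put(t, t) != null) {
                throw new IllegalStateException("cannot add duplicate task: " + t);
            }
        }
        Set<Object> existing = tasksPerBatchingKey.computeIfAbsent(
            batchingKey, k -> Collections.newSetFromMap(new IdentityHashMap<>()));
        // reject a task instance that is already queued under the same batching key
        for (Object t : tasks) {
            if (existing.contains(t)) {
                throw new IllegalStateException("task is already queued: " + t);
            }
        }
        existing.addAll(tasks);
    }

    int pending(Object batchingKey) {
        return tasksPerBatchingKey.getOrDefault(batchingKey, Collections.emptySet()).size();
    }
}
```

Two tasks that are equals() but distinct instances are both queued, while re-submitting the very same instance fails, mirroring the IdentityHashMap check in submitTasks.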

Tasks are wrapped in UpdateTask:

```java
class UpdateTask extends BatchedTask {
    final ClusterStateTaskListener listener;

    UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
               ClusterStateTaskExecutor<?> executor) {
        super(priority, source, executor, task);
        this.listener = listener;
    }

    @Override
    public String describeTasks(List<? extends BatchedTask> tasks) {
        return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
            tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
    }
}
```

When the task is submitted to the thread pool, its run method is invoked:

```java
@Override
public void run() {
    // run any tasks that have not yet been processed
    runIfNotProcessed(this);
}

void runIfNotProcessed(BatchedTask updateTask) {
    // tasks with the same batching key are executed only once
    if (updateTask.processed.get() == false) {
        final List<BatchedTask> toExecute = new ArrayList<>();
        final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
        synchronized (tasksPerBatchingKey) {
            // fetch the pending task list
            LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
            if (pending != null) {
                for (BatchedTask task : pending) {
                    if (task.processed.getAndSet(true) == false) {
                        logger.trace("will process {}", task);
                        // build the list of tasks to execute
                        toExecute.add(task);
                        processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
                    } else {
                        logger.trace("skipping {}, already processed", task);
                    }
                }
            }
        }
        if (toExecute.isEmpty() == false) {
            final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
                String tasks = updateTask.describeTasks(entry.getValue());
                return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
            }).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
            // execute the tasks
            run(updateTask.batchingKey, toExecute, tasksSummary);
        }
    }
}
```

The logic that runs the tasks and publishes the resulting cluster state lives in MasterService:

```java
@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
    ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
    List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
    // run the tasks and publish the resulting cluster state
    runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
}

private void runTasks(TaskInputs taskInputs) {
    final String summary = taskInputs.summary;
    if (!lifecycle.started()) {
        logger.debug("processing [{}]: ignoring, master service not started", summary);
        return;
    }

    logger.debug("executing cluster state update for [{}]", summary);
    // the previous cluster state
    final ClusterState previousClusterState = state();
    // only run on the elected master
    if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {
        logger.debug("failing [{}]: local node is no longer master", summary);
        taskInputs.onNoLongerMaster();
        return;
    }

    final long computationStartTime = threadPool.relativeTimeInMillis();
    // run the tasks, producing a new cluster state
    final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
    taskOutputs.notifyFailedTasks();
    final TimeValue computationTime = getTimeSince(computationStartTime);
    logExecutionTime(computationTime, "compute cluster state update", summary);

    if (taskOutputs.clusterStateUnchanged()) {
        final long notificationStartTime = threadPool.relativeTimeInMillis();
        taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
        final TimeValue executionTime = getTimeSince(notificationStartTime);
        logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
    } else {
        // the cluster state has changed
        final ClusterState newClusterState = taskOutputs.newClusterState;
        if (logger.isTraceEnabled()) {
            logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
        } else {
            logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
        }

        final long publicationStartTime = threadPool.relativeTimeInMillis();
        try {
            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
            // new cluster state, notify all listeners
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                String nodesDeltaSummary = nodesDelta.shortSummary();
                if (nodesDeltaSummary.length() > 0) {
                    logger.info("{}, term: {}, version: {}, delta: {}",
                        summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);
                }
            }

            logger.debug("publishing cluster state version [{}]", newClusterState.version());
            // publish the cluster state
            publish(clusterChangedEvent, taskOutputs, publicationStartTime);
        } catch (Exception e) {
            handleException(summary, publicationStartTime, newClusterState, e);
        }
    }
}
```

Before executing, the method checks whether the local node is the elected master, since only the master may run cluster tasks; the tasks are then executed against the pre-task cluster state to produce a new one.

calculateTaskOutputs runs the tasks, collects their results, and generates the new cluster state:

```java
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
    // run the submitted tasks and obtain the resulting cluster state
    ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
    // build the new cluster state from the execution result
    ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
    return new TaskOutputs(taskInputs, previousClusterState, newClusterState,
        getNonFailedTasks(taskInputs, clusterTasksResult), clusterTasksResult.executionResults);
}
```

executeTasks gathers the task list and invokes the executor's execute method:

```java
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
    ClusterTasksResult<Object> clusterTasksResult;
    try {
        List<Object> inputs = taskInputs.updateTasks.stream()
            .map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
        // run the tasks and return the resulting cluster state
        clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
        if (previousClusterState != clusterTasksResult.resultingState &&
            previousClusterState.nodes().isLocalNodeElectedMaster() &&
            (clusterTasksResult.resultingState.nodes().isLocalNodeElectedMaster() == false)) {
            throw new AssertionError("update task submitted to MasterService cannot remove master");
        }
    } catch (Exception e) {
        // ......
        clusterTasksResult = ClusterTasksResult.builder()
            .failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
            .build(previousClusterState);
    }
    // ......
    return clusterTasksResult;
}
```

As an example, consider how the gateway module recovers the cluster state:

```java
@Override
public final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks)
        throws Exception {
    // run the cluster state update task and return the resulting cluster state
    ClusterState result = execute(currentState);
    return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
}

@Override
public void onSuccess(final ClusterState recoveredState) {
    logger.trace("successful state recovery, importing cluster state...");
    clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask() {
        @Override
        public ClusterState execute(final ClusterState currentState) {
            final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);
            return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));
        }
    });
}

@Override
public ClusterState execute(final ClusterState currentState) {
    if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        logger.debug("cluster is already recovered");
        return currentState;
    }
    // state recovery is complete
    final ClusterState newState = Function.<ClusterState>identity()
        .andThen(ClusterStateUpdaters::updateRoutingTable)
        .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
        .apply(currentState);
    // start allocating shards
    return allocationService.reroute(newState, "state recovered");
}
```

The new cluster state is generated and shard allocation begins. patchVersions then builds the final cluster state from the previous state and the freshly generated result:

```java
private ClusterState patchVersions(ClusterState previousClusterState, ClusterTasksResult<?> executionResult) {
    // the resulting cluster state
    ClusterState newClusterState = executionResult.resultingState;
    if (previousClusterState != newClusterState) {
        // only the master controls the version numbers
        // generate the new, monotonically increasing cluster state version
        Builder builder = incrementVersion(newClusterState);
        // the routing table changed, i.e. the shard-to-node mapping changed
        if (previousClusterState.routingTable() != newClusterState.routingTable()) {
            builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
                .version(newClusterState.routingTable().version() + 1).build());
        }
        // the cluster metadata changed
        if (previousClusterState.metadata() != newClusterState.metadata()) {
            builder.metadata(Metadata.builder(newClusterState.metadata())
                .version(newClusterState.metadata().version() + 1));
        }
        newClusterState = builder.build();
    }
    return newClusterState;
}
```
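The version-bumping rule can be modeled with a small sketch (ToyState and VersionPatcher are hypothetical stand-ins; like the real code, components are compared by reference):

```java
// Hypothetical sketch of patchVersions: the overall version is always
// incremented when the state changed, while a component version (here, the
// routing table) is bumped only when that component object itself changed.
class ToyState {
    final long version;
    final Object routingTable;
    final long routingVersion;

    ToyState(long version, Object routingTable, long routingVersion) {
        this.version = version;
        this.routingTable = routingTable;
        this.routingVersion = routingVersion;
    }
}

class VersionPatcher {
    ToyState patch(ToyState previous, ToyState result) {
        if (previous == result) {
            return result; // unchanged state: nothing to bump
        }
        long routingVersion = result.routingVersion;
        if (previous.routingTable != result.routingTable) {
            routingVersion++; // routing table changed: bump its version too
        }
        return new ToyState(result.version + 1, result.routingTable, routingVersion);
    }
}
```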

Back in MasterService's runTasks method, the new cluster state has been generated and returned. It is compared with the previous cluster state; if it changed, the publication phase begins and the latest state is broadcast to all nodes:

```java
// publish the cluster state
publish(clusterChangedEvent, taskOutputs, publicationStartTime);

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
    final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
        @Override
        protected boolean blockingAllowed() {
            return isMasterUpdateThread() || super.blockingAllowed();
        }
    };
    // publish the cluster state
    clusterStatePublisher.publish(clusterChangedEvent, fut,
        taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));

    // indefinitely wait for publication to complete
    try {
        FutureUtils.get(fut);
        onPublicationSuccess(clusterChangedEvent, taskOutputs);
    } catch (Exception e) {
        onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
    }
}

@Override
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
    // the new cluster state
    ClusterState newState = clusterChangedEvent.state();
    assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();
    try {
        // state got changed locally (maybe because another master published to us)
        if (clusterChangedEvent.previousState() != this.committedState.get()) {
            throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
        }
        pendingStatesQueue.addPending(newState);
        // publish the cluster state
        publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
    } catch (FailedToCommitClusterStateException t) {
        // cluster service logs a WARN message
        logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
            newState.version(), electMaster.minimumMasterNodes());
        synchronized (stateMutex) {
            pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("failed to publish cluster state"));
            rejoin("zen-disco-failed-to-publish");
        }
        publishListener.onFailure(t);
        return;
    }

    final DiscoveryNode localNode = newState.getNodes().getLocalNode();
    final AtomicBoolean processedOrFailed = new AtomicBoolean();
    pendingStatesQueue.markAsCommitted(newState.stateUUID(),
        new PendingClusterStatesQueue.StateProcessedListener() {
            @Override
            public void onNewClusterStateProcessed() {
                processedOrFailed.set(true);
                publishListener.onResponse(null);
                ackListener.onNodeAck(localNode, null);
            }

            @Override
            public void onNewClusterStateFailed(Exception e) {
                processedOrFailed.set(true);
                publishListener.onFailure(e);
                ackListener.onNodeAck(localNode, e);
                logger.warn(() -> new ParameterizedMessage(
                    "failed while applying cluster state locally [{}]", clusterChangedEvent.source()), e);
            }
        });

    synchronized (stateMutex) {
        if (clusterChangedEvent.previousState() != this.committedState.get()) {
            publishListener.onFailure(new FailedToCommitClusterStateException(
                "local state was mutated while CS update was published to other nodes"));
            return;
        }
        // after the two-phase commit the state has been published to the cluster, but there is no
        // guarantee that every node succeeded; handle the committed state below
        boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
            " committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");
        if (sentToApplier == false && processedOrFailed.get() == false) {
            assert false : "cluster state published locally neither processed nor failed: " + newState;
            logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
                newState.version());
            publishListener.onFailure(new FailedToCommitClusterStateException("cluster state that is published locally has neither " +
                "been processed nor failed"));
        }
    }
}
```

First the list of target nodes is prepared, excluding the local node. Full or incremental cluster state payloads are then built, serialized, and compressed so the state can be published:

```java
public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes,
                    final Discovery.AckListener ackListener) throws FailedToCommitClusterStateException {
    final DiscoveryNodes nodes;
    final SendingController sendingController;
    final Set<DiscoveryNode> nodesToPublishTo;
    final Map<Version, BytesReference> serializedStates;
    final Map<Version, BytesReference> serializedDiffs;
    final boolean sendFullVersion;
    try {
        // the target nodes to send to
        nodes = clusterChangedEvent.state().nodes();
        nodesToPublishTo = new HashSet<>(nodes.getSize());
        DiscoveryNode localNode = nodes.getLocalNode();
        final int totalMasterNodes = nodes.getMasterNodes().size();
        for (final DiscoveryNode node : nodes) {
            if (node.equals(localNode) == false) {
                nodesToPublishTo.add(node);
            }
        }
        sendFullVersion = !discoverySettings.getPublishDiff() || clusterChangedEvent.previousState() == null;
        // full states
        serializedStates = new HashMap<>();
        // incremental (diff) states
        serializedDiffs = new HashMap<>();

        // we build these early as a best effort not to commit in the case of error.
        // sadly this is not water tight as it may that a failed diff based publishing to a node
        // will cause a full serialization based on an older version, which may fail after the
        // change has been committed.
        // build the serialized payloads
        buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
            nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);

        // handler for the responses to the publish requests
        final BlockingClusterStatePublishResponseHandler publishResponseHandler =
            new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
        sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes,
            totalMasterNodes, publishResponseHandler);
    } catch (Exception e) {
        throw new FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
    }

    try {
        // publish
        innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion,
            serializedStates, serializedDiffs);
    } catch (FailedToCommitClusterStateException t) {
        throw t;
    } catch (Exception e) {
        // try to fail committing, in cause it's still on going
        if (sendingController.markAsFailed("unexpected error", e)) {
            // signal the change should be rejected
            throw new FailedToCommitClusterStateException("unexpected error", e);
        } else {
            throw e;
        }
    }
}
```

Full states are stored in serializedStates and incremental states in serializedDiffs. Each cluster state carries its own unique version number, and when publishing, only the incremental content is sent between adjacent versions.

Next the payload to send is constructed: if a node was absent from the previously published cluster state, or full publishing is configured, a full state is built; otherwise an incremental state is built. The result is then serialized and compressed:

```java
private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState,
                                         Set<DiscoveryNode> nodesToPublishTo, boolean sendFullVersion,
                                         Map<Version, BytesReference> serializedStates,
                                         Map<Version, BytesReference> serializedDiffs) {
    Diff<ClusterState> diff = null;
    for (final DiscoveryNode node : nodesToPublishTo) {
        try {
            // send the full state
            if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                // will send a full reference
                if (serializedStates.containsKey(node.getVersion()) == false) {
                    serializedStates.put(node.getVersion(), serializeFullClusterState(clusterState, node.getVersion()));
                }
            } else {
                // will send a diff
                if (diff == null) {
                    diff = clusterState.diff(previousState);
                }
                if (serializedDiffs.containsKey(node.getVersion()) == false) {
                    serializedDiffs.put(node.getVersion(), serializeDiffClusterState(diff, node.getVersion()));
                }
            }
        } catch (IOException e) {
            throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node);
        }
    }
}
```

ES publishes the state with a two-phase commit. The first phase is the push: the state data is sent to the nodes but not applied. If acknowledgements come back from more than half of the nodes, the second phase, the commit, sends a commit request. Two-phase commit cannot guarantee that a node applies the state correctly after receiving the commit request; it only guarantees that the commit request was sent, not whether applying the state on any individual node succeeded or failed.
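The ack-counting of the push phase can be sketched as follows (SendingControllerSketch is a hypothetical, simplified model; like the real code, it assumes the local elected master already counts as one ack, so minMasterNodes - 1 further master acks are needed):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two-phase commit controller: the commit phase
// starts once enough master-eligible nodes have acked the push phase.
class SendingControllerSketch {
    private int neededMastersToCommit;
    private boolean committed = false;
    private final List<String> ackedBeforeCommit = new ArrayList<>();
    final List<String> commitsSent = new ArrayList<>();

    SendingControllerSketch(int minMasterNodes) {
        // the local elected master counts as one ack already
        this.neededMastersToCommit = Math.max(0, minMasterNodes - 1);
    }

    void onNodeSendAck(String node, boolean isMasterNode) {
        if (committed) {
            commitsSent.add(node); // already committed: send the commit right away
            return;
        }
        ackedBeforeCommit.add(node);
        if (isMasterNode && --neededMastersToCommit == 0) {
            committed = true;
            // second phase: send commit requests to every node that acked so far
            commitsSent.addAll(ackedBeforeCommit);
            ackedBeforeCommit.clear();
        }
    }

    boolean isCommitted() { return committed; }
}
```

Acks from data nodes are buffered but only master-node acks move the quorum counter, mirroring the checkForCommitOrFailIfNoPending logic shown later.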

  • Push phase: send the cluster state data
```java
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
                          final SendingController sendingController, final Discovery.AckListener ackListener,
                          final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
                          final Map<Version, BytesReference> serializedDiffs) {
    final ClusterState clusterState = clusterChangedEvent.state();
    final ClusterState previousState = clusterChangedEvent.previousState();
    // publish timeout
    final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
    // publish start time
    final long publishingStartInNanos = System.nanoTime();
    // iterate over the nodes, asynchronously sending full or incremental state data
    for (final DiscoveryNode node : nodesToPublishTo) {
        // try and serialize the cluster state once (or per version), so we don't serialize it
        // per node when we send it over the wire, compress it while we are at it...
        // we don't send full version if node didn't exist in the previous version of cluster state
        // send the full state
        if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
            sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
        } else {
            // send the incremental state
            sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
        }
    }
    // wait for the commit: either enough responses arrive for the first phase, or the timeout is reached
    sendingController.waitForCommit(discoverySettings.getCommitTimeout());

    final long commitTime = System.nanoTime() - publishingStartInNanos;
    ackListener.onCommit(TimeValue.timeValueNanos(commitTime));

    try {
        long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
        final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
        sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
        if (sendingController.getPublishingTimedOut()) {
            DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
            // everyone may have just responded
            if (pendingNodes.length > 0) {
                logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
                    clusterState.version(), publishTimeout, pendingNodes);
            }
        }
        // The failure is logged under debug when a sending failed. we now log a summary.
        Set<DiscoveryNode> failedNodes = publishResponseHandler.getFailedNodes();
        if (failedNodes.isEmpty() == false) {
            logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",
                clusterChangedEvent.state().version(), failedNodes);
        }
    } catch (InterruptedException e) {
        // ignore & restore interrupt
        Thread.currentThread().interrupt();
    }
}
```

Whether full or incremental data is sent, both paths eventually call sendClusterStateToNode:

```java
private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
                                    final DiscoveryNode node,
                                    final TimeValue publishTimeout,
                                    final SendingController sendingController,
                                    final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
    try {
        // send through the underlying transport layer
        transportService.sendRequest(node, SEND_ACTION_NAME,
            new BytesTransportRequest(bytes, node.getVersion()),
            stateRequestOptions,
            new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    // publishing timed out
                    if (sendingController.getPublishingTimedOut()) {
                        logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
                            clusterState.version(), publishTimeout);
                    }
                    // check whether more than half of the nodes have responded, then commit
                    sendingController.onNodeSendAck(node);
                }

                @Override
                public void handleException(TransportException exp) {
                    if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                        logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
                        sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
                    } else {
                        logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
                        sendingController.onNodeSendFailed(node, exp);
                    }
                }
            });
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
        sendingController.onNodeSendFailed(node, e);
    }
}
```

transportService's sendRequest method sends the data asynchronously. The RPC action is internal:discovery/zen/publish/send, and the handler registered on the target node is SendClusterStateRequestHandler:

```java
// handler for the send phase
transportService.registerRequestHandler(SEND_ACTION_NAME, ThreadPool.Names.SAME, false, false,
    BytesTransportRequest::new, new SendClusterStateRequestHandler());

private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
    @Override
    public void messageReceived(BytesTransportRequest request, final TransportChannel channel, Task task) throws Exception {
        // handle the incoming state change request
        handleIncomingClusterStateRequest(request, channel);
    }
}

protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
    Compressor compressor = CompressorFactory.compressor(request.bytes());
    StreamInput in = request.bytes().streamInput();
    final ClusterState incomingState;
    synchronized (lastSeenClusterStateMutex) {
        try {
            if (compressor != null) {
                in = compressor.streamInput(in);
            }
            in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
            in.setVersion(request.version());
            // If true we received full cluster state - otherwise diffs
            // true: full state, false: incremental
            if (in.readBoolean()) {
                incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
                fullClusterStateReceivedCount.incrementAndGet();
                logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
                    request.bytes().length());
            } else if (lastSeenClusterState != null) {
                Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
                incomingState = diff.apply(lastSeenClusterState);
                compatibleClusterStateDiffReceivedCount.incrementAndGet();
                logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                    incomingState.version(), incomingState.stateUUID(), request.bytes().length());
            } else {
                logger.debug("received diff for but don't have any local cluster state - requesting full state");
                throw new IncompatibleClusterStateVersionException("have no local cluster state");
            }
        } catch (IncompatibleClusterStateVersionException e) {
            incompatibleClusterStateDiffReceivedCount.incrementAndGet();
            throw e;
        } catch (Exception e) {
            logger.warn("unexpected error while deserializing an incoming cluster state", e);
            throw e;
        } finally {
            IOUtils.close(in);
        }
        // notify the listener
        incomingClusterStateListener.onIncomingClusterState(incomingState);
        lastSeenClusterState = incomingState;
    }
    // reply to the sender with an empty response
    channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
```

The receiving node saves the cluster state and then returns an empty response.
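The receiving side's diff handling can be modeled with a toy sketch (StateReceiver, StateDiff, and IncompatibleStateException are hypothetical; a "state" here is just a map of keys to values):

```java
import java.util.HashMap;
import java.util.Map;

// Thrown when a diff arrives but there is no matching local base state,
// modeling IncompatibleClusterStateVersionException: the master reacts by
// re-sending the full state.
class IncompatibleStateException extends RuntimeException {
    IncompatibleStateException(String message) { super(message); }
}

// A diff carries the version of the base state it was computed against.
class StateDiff {
    final long baseVersion;
    final Map<String, String> changes;

    StateDiff(long baseVersion, Map<String, String> changes) {
        this.baseVersion = baseVersion;
        this.changes = changes;
    }
}

// Hypothetical sketch of the receiving node: apply a diff on top of the last
// seen state, or fail so a full state gets sent instead.
class StateReceiver {
    Map<String, String> lastSeen;   // null until a full state has arrived
    long lastSeenVersion = -1;

    void receiveFull(long version, Map<String, String> full) {
        lastSeen = new HashMap<>(full);
        lastSeenVersion = version;
    }

    void receiveDiff(StateDiff diff) {
        if (lastSeen == null || diff.baseVersion != lastSeenVersion) {
            throw new IncompatibleStateException("have no local cluster state");
        }
        lastSeen.putAll(diff.changes);
        lastSeenVersion = diff.baseVersion + 1;
    }
}
```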

Back in the master's send callback, it checks whether enough responses have arrived:

```java
public synchronized void onNodeSendAck(DiscoveryNode node) {
    if (committed) {
        // already committed: send the commit directly
        assert sendAckedBeforeCommit.isEmpty();
        sendCommitToNode(node, clusterState, this);
    } else if (committedOrFailed()) {
        logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
    } else {
        // we're still waiting
        sendAckedBeforeCommit.add(node);
        if (node.isMasterNode()) {
            checkForCommitOrFailIfNoPending(node);
        }
    }
}

// count the nodes that returned an ack; once more than half have acked, commit
private synchronized void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) {
    logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
        masterNode, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
    neededMastersToCommit--;
    if (neededMastersToCommit == 0) {
        if (markAsCommitted()) {
            for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
                sendCommitToNode(nodeToCommit, clusterState, this);
            }
            sendAckedBeforeCommit.clear();
        }
    }
    decrementPendingMasterAcksAndChangeForFailure();
}
```
  • Commit phase

After receiving enough responses, the commit logic runs:

```java
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState,
                              final SendingController sendingController) {
    try {
        logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",
            clusterState.stateUUID(), clusterState.version(), node);
        transportService.sendRequest(node, COMMIT_ACTION_NAME,
            new CommitClusterStateRequest(clusterState.stateUUID()),
            stateRequestOptions,
            new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    if (sendingController.getPublishingTimedOut()) {
                        logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version());
                    }
                    sendingController.getPublishResponseHandler().onResponse(node);
                }

                @Override
                public void handleException(TransportException exp) {
                    logger.debug(() -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}",
                        clusterState.stateUUID(), clusterState.version(), node), exp);
                    sendingController.getPublishResponseHandler().onFailure(node, exp);
                }
            });
    } catch (Exception t) {
        logger.warn(() -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}",
            clusterState.stateUUID(), clusterState.version(), node), t);
        sendingController.getPublishResponseHandler().onFailure(node, t);
    }
}
```

The commit is likewise sent as an RPC request through transportService; the internal action is internal:discovery/zen/publish/commit.

The handler registered on the receiving node is CommitClusterStateRequestHandler:

```java
// handler for the commit phase
transportService.registerRequestHandler(COMMIT_ACTION_NAME, ThreadPool.Names.SAME, false, false,
    CommitClusterStateRequest::new, new CommitClusterStateRequestHandler());

// handles cluster state commit requests
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
    @Override
    public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel, Task task) throws Exception {
        handleCommitRequest(request, channel);
    }
}
```

The node then applies the committed cluster state:

```java
@Override
public void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener) {
    // mark the new state as committed
    final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID,
        new PendingClusterStatesQueue.StateProcessedListener() {
            @Override
            public void onNewClusterStateProcessed() {
                processedListener.onResponse(null);
            }

            @Override
            public void onNewClusterStateFailed(Exception e) {
                processedListener.onFailure(e);
            }
        });
    if (state != null) {
        synchronized (stateMutex) {
            // apply the new cluster state
            processNextCommittedClusterState("master " + state.nodes().getMasterNode() +
                " committed version [" + state.version() + "]");
        }
    }
}

// the cluster applies the new cluster state
clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
    this::clusterState,
    new ClusterApplyListener() {
        @Override
        public void onSuccess(String source) {
            try {
                pendingStatesQueue.markAsProcessed(newClusterState);
            } catch (Exception e) {
                onFailure(source, e);
            }
        }

        @Override
        public void onFailure(String source, Exception e) {
            logger.error(() -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);
            try {
                // TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
                // for too long.
                pendingStatesQueue.markAsFailed(newClusterState, e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
            }
        }
    });
```

This eventually reaches ClusterApplierService's runTask method:

```java
private void runTask(UpdateTask task) {
    if (!lifecycle.started()) {
        logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
        return;
    }
    logger.debug("processing [{}]: execute", task.source);
    // the previous cluster state
    final ClusterState previousClusterState = state.get();
    // start time of the task
    long startTimeMS = currentTimeInMillis();
    // a simple stop watch, allows timing a number of tasks
    final StopWatch stopWatch = new StopWatch();
    final ClusterState newClusterState;
    try {
        try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {
            newClusterState = task.apply(previousClusterState);
        }
    } catch (Exception e) {
        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
        logger.trace(() -> new ParameterizedMessage(
                "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
                executionTime, previousClusterState.version(), task.source, previousClusterState), e);
        warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
        task.listener.onFailure(task.source, e);
        return;
    }
    if (previousClusterState == newClusterState) {
        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
        logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
        warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
        task.listener.onSuccess(task.source);
    } else {
        if (logger.isTraceEnabled()) {
            logger.debug("cluster state updated, version [{}], source [{}]\n{}",
                    newClusterState.version(), task.source, newClusterState);
        } else {
            logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
        }
        try {
            // apply the state update
            applyChanges(task, previousClusterState, newClusterState, stopWatch);
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
            logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})",
                    task.source, executionTime, newClusterState.version(), newClusterState.stateUUID());
            warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
            task.listener.onSuccess(task.source);
        } catch (Exception e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
            assert applicationMayFail();
            task.listener.onFailure(task.source, e);
        }
    }
}
```
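Note that runTask detects a no-op task via reference equality (previousClusterState == newClusterState): an executor that produces no change must return the very same ClusterState instance it was given, not an equal copy. A minimal sketch of this fast path, using a hypothetical SimpleState record in place of the real ClusterState:

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for ClusterState: just a version plus some payload.
record SimpleState(long version, String data) {}

public class RunTaskSketch {
    // Mirrors runTask's decision: apply changes only when the task returned a new instance.
    static String runTask(SimpleState previous, UnaryOperator<SimpleState> task) {
        SimpleState next = task.apply(previous);
        if (previous == next) {                 // reference equality, as in runTask
            return "no change in cluster state";
        }
        return "cluster state updated, version " + next.version();
    }

    public static void main(String[] args) {
        SimpleState state = new SimpleState(7, "initial");
        // A no-op executor returns the same instance it was given.
        System.out.println(runTask(state, s -> s));
        // A real change returns a new instance with a bumped version.
        System.out.println(runTask(state, s -> new SimpleState(s.version() + 1, "updated")));
    }
}
```

This is why executors in Elasticsearch return the incoming state unchanged rather than rebuilding an identical one: an equal-but-distinct copy would be treated as a new state and republished.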

applyChanges iterates over all registered state appliers and invokes each applier's applyClusterState method:

```java
// invoke the cluster state appliers
callClusterStateAppliers(clusterChangedEvent, stopWatch);

private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
    clusterStateAppliers.forEach(applier -> {
        logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
        try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
            applier.applyClusterState(clusterChangedEvent);
        }
    });
}
```

It then iterates over all cluster state listeners and invokes their change callbacks:

```java
// invoke the cluster state listeners
callClusterStateListeners(clusterChangedEvent, stopWatch);

// run the callbacks registered for cluster state changes
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
    Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
        try {
            logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
            try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
                listener.clusterChanged(clusterChangedEvent);
            }
        } catch (Exception ex) {
            logger.warn("failed to notify ClusterStateListener", ex);
        }
    });
}
```
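Note the asymmetry in error handling between the two loops: callClusterStateAppliers lets an applier's exception propagate, aborting the apply, while callClusterStateListeners catches and logs each listener's exception so one failing listener does not prevent the others from being notified. A minimal sketch of the two dispatch styles, using plain Consumer handlers rather than the real applier/listener interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DispatchSketch {
    // Appliers: any exception propagates to the caller, as in callClusterStateAppliers.
    static void callAppliers(List<Consumer<String>> appliers, String event) {
        for (Consumer<String> applier : appliers) {
            applier.accept(event);              // no try/catch: a failure aborts the loop
        }
    }

    // Listeners: exceptions are caught per listener, as in callClusterStateListeners.
    static void callListeners(List<Consumer<String>> listeners, String event) {
        for (Consumer<String> listener : listeners) {
            try {
                listener.accept(event);
            } catch (Exception ex) {
                System.out.println("failed to notify listener: " + ex.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        List<String> notified = new ArrayList<>();
        List<Consumer<String>> handlers = List.of(
                e -> { throw new RuntimeException("boom"); },
                e -> notified.add(e));
        callListeners(handlers, "state-v2");    // the second listener still runs
        System.out.println("notified = " + notified);
    }
}
```

The design choice follows from the roles: appliers mutate node-internal structures and must not be skipped silently, whereas listeners are best-effort notifications.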

Back on the master node, the handleResponse and handleException callbacks run the same logic: each simply decrements the latch by one. Even if some nodes fail to apply the new state, no repair logic is triggered at this point.
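This latch pattern can be sketched as follows: the master waits until every node has answered, counting down on success and failure alike, which is exactly why a failed node does not trigger any repair here. A CountDownLatch-based sketch, not the actual publication code:

```java
import java.util.concurrent.CountDownLatch;

public class PublishAckSketch {
    public static void main(String[] args) throws InterruptedException {
        int nodeCount = 3;
        CountDownLatch latch = new CountDownLatch(nodeCount);

        // handleResponse and handleException share the same logic: count down once.
        Runnable handleResponse  = latch::countDown;
        Runnable handleException = latch::countDown;   // failure is counted, not repaired

        // Simulate two successful acks and one failure from the cluster nodes.
        new Thread(handleResponse).start();
        new Thread(handleResponse).start();
        new Thread(handleException).start();

        latch.await();                                  // returns once all nodes have answered
        System.out.println("publish round complete, remaining = " + latch.getCount());
    }
}
```

Counting failures the same as successes guarantees the publish round always terminates; nodes that failed to apply the state will catch up later through normal state publication rather than an immediate repair.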

