Elasticsearch source code analysis 03: electing the cluster state

Electing the cluster state

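Elasticsearch stores the following kinds of data: state metadata, Lucene index files, and the translog (transaction log).
Metadata can be divided into:

  • Cluster-level metadata: corresponds to the metaData data structure and mainly holds clusterUUid, settings, templates, etc.
  • Index-level metadata: corresponds to the indexMetaData data structure and mainly stores the number of shards, the mappings (index field mappings), etc.
  • Shard-level metadata: corresponds to shardStateMetaData and mainly holds the version, indexUUid, whether the shard is the primary, etc.

Each node may hold a different cluster state, so the correct metadata has to be chosen as the authoritative source. State management lives in gatewayService, which implements the ClusterStateListener interface: once a master has been elected, a cluster-state task is published, and this triggers the clusterChanged callback.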
// recover the state, which then leads to shard allocation
performStateRecovery(enforceRecoverAfterTime, reason);

Cluster-level and index-level metadata are recovered in the gateway module:

public void clusterChanged(final ClusterChangedEvent event) {
    if (lifecycle.stoppedOrClosed()) {
        return;
    }

    final ClusterState state = event.state();

    // only the elected master may do this
    if (state.nodes().isLocalNodeElectedMaster() == false) {
        // not our job to recover
        return;
    }
    // cluster-level and index-level state have already been recovered
    if (state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        // already recovered
        return;
    }
    // (omitted) this part mainly checks whether the conditions for state recovery are met ......
    // recover the state
    performStateRecovery(enforceRecoverAfterTime, reason);
}

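It first makes sure that only the elected master runs the state election. It then checks whether the state has already been recovered (the STATE_NOT_RECOVERED_BLOCK global block is gone) and, if so, returns immediately. Otherwise it runs the state recovery task, which eventually calls recoveryRunnable.run():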
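The decision at the top of clusterChanged can be modeled in isolation. This is a simplified sketch under stated assumptions: RecoveryGuard, ClusterView and tryStartRecovery are hypothetical names, and the recoveryInProgress flag is an assumed stand-in for the bookkeeping that resetRecoveredFlags() undoes when recovery fails.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the guard at the top of GatewayService.clusterChanged.
// RecoveryGuard, ClusterView and tryStartRecovery are hypothetical names for
// illustration only; they are not Elasticsearch APIs.
class RecoveryGuard {
    // The two facts about the cluster state that the guard inspects.
    static final class ClusterView {
        final boolean localNodeIsElectedMaster;
        final boolean hasStateNotRecoveredBlock;

        ClusterView(boolean localNodeIsElectedMaster, boolean hasStateNotRecoveredBlock) {
            this.localNodeIsElectedMaster = localNodeIsElectedMaster;
            this.hasStateNotRecoveredBlock = hasStateNotRecoveredBlock;
        }
    }

    // Assumption: a flag that stops a second recovery from starting while one is running.
    private final AtomicBoolean recoveryInProgress = new AtomicBoolean(false);

    /** Returns true only for the call that should start state recovery. */
    boolean tryStartRecovery(ClusterView state) {
        if (!state.localNodeIsElectedMaster) {
            return false; // not our job to recover
        }
        if (!state.hasStateNotRecoveredBlock) {
            return false; // block already lifted: state was recovered earlier
        }
        // compareAndSet lets exactly one caller through until the flag is reset
        return recoveryInProgress.compareAndSet(false, true);
    }

    /** Called after a failed recovery so that a later cluster change can retry. */
    void resetRecoveredFlags() {
        recoveryInProgress.set(false);
    }

    public static void main(String[] args) {
        RecoveryGuard guard = new RecoveryGuard();
        ClusterView masterNotRecovered = new ClusterView(true, true);
        System.out.println(guard.tryStartRecovery(masterNotRecovered)); // true
        System.out.println(guard.tryStartRecovery(masterNotRecovered)); // false: already in progress
        guard.resetRecoveredFlags();
        System.out.println(guard.tryStartRecovery(masterNotRecovered)); // true: retry allowed
    }
}
```

Using compareAndSet rather than a plain check-then-set keeps the guard safe if two cluster-changed events race on different threads.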

final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () ->
    gateway.performStateRecovery(new GatewayRecoveryListener());

This executes Gateway.performStateRecovery, which first collects all master-eligible nodes:

// master-eligible nodes
final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);

It then fetches the metadata held by these master-eligible nodes:

// fetch the metadata from those nodes
final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState =
    listGatewayMetaState.list(nodesIds, null).actionGet();

Let's look at the constructor of TransportNodesListGatewayMetaState:

public TransportNodesListGatewayMetaState(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
                                          ActionFilters actionFilters, GatewayMetaState metaState) {
    super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
        Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
    this.metaState = metaState;
}

// register the action handler (done in a parent-class constructor)
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,
    new TransportHandler());

Back in the list method, execution ends up in doExecute:

public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
    PlainActionFuture<NodesGatewayMetaState> future = PlainActionFuture.newFuture();
    execute(new Request(nodesIds).timeout(timeout), future);
    return future;
}

protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
    // execute
    new AsyncAction(task, request, listener).start();
}
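list() turns the listener-based execute(...) into a future, and performStateRecovery then blocks on actionGet(). A minimal sketch of that callback-to-future bridge, using java.util.concurrent.CompletableFuture in place of PlainActionFuture; FutureBridge, Listener and execute are hypothetical names, and the "response" is simply the number of node IDs.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the callback-to-future bridge behind list(). CompletableFuture
// stands in for PlainActionFuture; FutureBridge, Listener and execute are
// hypothetical names, not Elasticsearch APIs.
class FutureBridge {
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Asynchronous action: completes the listener later, on another thread.
    static void execute(String[] nodeIds, Listener<Integer> listener) {
        CompletableFuture.runAsync(() -> {
            if (nodeIds == null) {
                listener.onFailure(new IllegalArgumentException("no node ids"));
            } else {
                listener.onResponse(nodeIds.length); // pretend result: number of nodes queried
            }
        });
    }

    // Equivalent of list(): hand execute() a listener that completes a future.
    static CompletableFuture<Integer> list(String[] nodeIds) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        execute(nodeIds, new Listener<Integer>() {
            @Override
            public void onResponse(Integer response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // join() plays the role of actionGet(): block until the response arrives
        int count = list(new String[] {"node-1", "node-2", "node-3"}).join();
        System.out.println(count); // 3
    }
}
```

The future itself acts as the listener, which is the same trick PlainActionFuture uses when it is passed to execute(...).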

start() sends a request to every node to fetch its metadata:

void start() {
    final DiscoveryNode[] nodes = request.concreteNodes();
    if (nodes.length == 0) {
        // no node to fetch data from
        // nothing to notify
        threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
        return;
    }
    TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
    if (request.timeout() != null) {
        builder.withTimeout(request.timeout());
    }
    // send a request to each node in turn
    for (int i = 0; i < nodes.length; i++) {
        final int idx = i;
        final DiscoveryNode node = nodes[i];
        final String nodeId = node.getId();
        try {
            TransportRequest nodeRequest = newNodeRequest(request);
            if (task != null) {
                nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
            }
            // send the request
            transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
                new TransportResponseHandler<NodeResponse>() {
                    @Override
                    public NodeResponse read(StreamInput in) throws IOException {
                        return newNodeResponse(in);
                    }

                    // handle the response
                    @Override
                    public void handleResponse(NodeResponse response) {
                        onOperation(idx, response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        onFailure(idx, node.getId(), exp);
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.SAME;
                    }
                });
        } catch (Exception e) {
            onFailure(idx, nodeId, e);
        }
    }
}

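On the receiving side the request is handled by NodeTransportHandler, which the parent class registers for the per-node action (transportNodeAction). It builds and returns the local node's metadata: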

// per-node request handler
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
    @Override
    public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
        channel.sendResponse(nodeOperation(request, task));
    }
}

protected NodeGatewayMetaState nodeOperation(NodeRequest request) {
    return new NodeGatewayMetaState(clusterService.localNode(), metaState.getMetadata());
}

Now back to how the response to each per-node request is handled on the sending side:

// handle the response
@Override
public void handleResponse(NodeResponse response) {
    onOperation(idx, response);
}

private void onOperation(int idx, NodeResponse nodeResponse) {
    // record this node's response
    responses.set(idx, nodeResponse);
    // once every node has answered, whether it succeeded or failed
    if (counter.incrementAndGet() == responses.length()) {
        finishHim();
    }
}

private void finishHim() {
    NodesResponse finalResponse;
    try {
        finalResponse = newResponse(request, responses);
    } catch (Exception e) {
        logger.debug("failed to combine responses from nodes", e);
        listener.onFailure(e);
        return;
    }
    // notify the listener
    listener.onResponse(finalResponse);
}
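The AsyncAction code is a classic fan-out/fan-in: one request per node, each reply stored in its own array slot, and an atomic counter so that whichever reply arrives last (success or failure) fires the final callback exactly once. A self-contained sketch of the same pattern, where FanOut is a hypothetical class, nodes are plain strings, and the per-node RPC is replaced by a Function run on a thread pool:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Function;

// Model of the AsyncAction fan-out/fan-in pattern. FanOut is a hypothetical
// class; nodes are plain strings and a "node call" is a Function run on a pool.
class FanOut {
    private final List<String> nodes;
    // slot i holds node i's reply: a String on success or the Exception on failure
    private final AtomicReferenceArray<Object> responses;
    private final AtomicInteger counter = new AtomicInteger();
    private final Consumer<List<Object>> listener;

    FanOut(List<String> nodes, Consumer<List<Object>> listener) {
        this.nodes = nodes;
        this.responses = new AtomicReferenceArray<>(nodes.size());
        this.listener = listener;
    }

    void start(Function<String, String> nodeCall, ExecutorService pool) {
        if (nodes.isEmpty()) {
            pool.execute(() -> listener.accept(new ArrayList<>())); // nothing to ask
            return;
        }
        for (int i = 0; i < nodes.size(); i++) {
            final int idx = i;
            final String node = nodes.get(i);
            pool.execute(() -> {
                try {
                    onOperation(idx, nodeCall.apply(node));
                } catch (RuntimeException e) {
                    onOperation(idx, e); // a failure still counts as an answer
                }
            });
        }
    }

    private void onOperation(int idx, Object reply) {
        responses.set(idx, reply);
        // exactly one thread sees the counter reach the total and finishes
        if (counter.incrementAndGet() == responses.length()) {
            finish();
        }
    }

    private void finish() {
        List<Object> all = new ArrayList<>();
        for (int i = 0; i < responses.length(); i++) {
            all.add(responses.get(i));
        }
        listener.accept(all);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletableFuture<List<Object>> done = new CompletableFuture<>();
        List<String> nodes = List.of("node-1", "node-2", "node-3");
        new FanOut(nodes, done::complete).start(node -> {
            if (node.equals("node-2")) {
                throw new IllegalStateException(node + " unreachable");
            }
            return node + ":metadata";
        }, pool);
        System.out.println(done.join());
        pool.shutdown();
    }
}
```

Because failures are stored in their slot instead of aborting the action, the final callback always receives one entry per node, which is what lets performStateRecovery later count how many nodes actually returned metadata.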

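At this point the metadata from the other nodes has been collected. Back in performStateRecovery, it computes how many master-eligible nodes must have returned metadata: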

// number of responses required
final int requiredAllocation = Math.max(1, minimumMasterNodes);

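Next, the cluster-level metadata is elected by comparing versions: the cluster state with the highest version wins. While iterating, it also counts how many nodes hold each index: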

// cluster-level metadata
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
    if (nodeState.metadata() == null) {
        continue;
    }
    found++;
    if (electedGlobalState == null) {
        electedGlobalState = nodeState.metadata();
    } else if (nodeState.metadata().version() > electedGlobalState.version()) {
        // the higher version wins
        electedGlobalState = nodeState.metadata();
    }
    for (final ObjectCursor<IndexMetadata> cursor : nodeState.metadata().indices().values()) {
        indices.addTo(cursor.value.getIndex(), 1);
    }
}

It then checks that enough nodes returned metadata:

// not enough nodes returned metadata
if (found < requiredAllocation) {
    listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
    return;
}
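This first election step can be reduced to a small pure function: skip nodes without metadata, count the rest, keep the highest version, and fail if fewer than requiredAllocation = max(1, minimumMasterNodes) nodes answered. A sketch with hypothetical names (GlobalElection, NodeState, electGlobal), where each node's metadata is reduced to its version and the failure is an exception rather than listener.onFailure:

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of the global-metadata election in performStateRecovery.
// GlobalElection, NodeState and electGlobal are hypothetical names.
class GlobalElection {
    static final class NodeState {
        final String nodeId;
        final Long metadataVersion; // null = node returned no metadata

        NodeState(String nodeId, Long metadataVersion) {
            this.nodeId = nodeId;
            this.metadataVersion = metadataVersion;
        }
    }

    /** Returns the winning node id, or throws if too few nodes had metadata. */
    static String electGlobal(List<NodeState> nodes, int minimumMasterNodes) {
        final int requiredAllocation = Math.max(1, minimumMasterNodes);
        NodeState elected = null;
        int found = 0;
        for (NodeState node : nodes) {
            if (node.metadataVersion == null) {
                continue; // skip nodes without metadata
            }
            found++;
            // strictly greater: on a tie the node seen first keeps the win
            if (elected == null || node.metadataVersion > elected.metadataVersion) {
                elected = node;
            }
        }
        if (found < requiredAllocation) {
            throw new IllegalStateException(
                "found [" + found + "] metadata states, required [" + requiredAllocation + "]");
        }
        return elected.nodeId;
    }

    public static void main(String[] args) {
        List<NodeState> nodes = Arrays.asList(
            new NodeState("node-1", 12L),
            new NodeState("node-2", 15L),
            new NodeState("node-3", null));
        System.out.println(electGlobal(nodes, 2)); // node-2
    }
}
```

The Math.max(1, ...) floor means that even with minimumMasterNodes unset (0), at least one node must report metadata before recovery can proceed.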

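Build the cluster metadata from the elected global state with all indices removed. The index-level metadata is elected in the next step: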

// update the global state and clear out the indices; we elect them in the next phase
final Metadata.Builder metadataBuilder = Metadata.builder(electedGlobalState).removeAllIndices();

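For each index, it iterates over all nodes, takes the index metadata with the highest version as the elected index-level metadata, and adds it to metadataBuilder. Note that, as the TODO in the code points out, an index reported by fewer than requiredAllocation nodes is still added; the check only logs a debug message: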

for (int i = 0; i < keys.length; i++) {
    if (keys[i] != null) {
        final Index index = (Index) keys[i];
        IndexMetadata electedIndexMetadata = null;
        int indexMetadataCount = 0;
        for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
            if (nodeState.metadata() == null) {
                continue;
            }
            final IndexMetadata indexMetadata = nodeState.metadata().index(index);
            if (indexMetadata == null) {
                continue;
            }
            if (electedIndexMetadata == null) {
                electedIndexMetadata = indexMetadata;
            } else if (indexMetadata.getVersion() > electedIndexMetadata.getVersion()) {
                // compare versions and keep the highest
                electedIndexMetadata = indexMetadata;
            }
            indexMetadataCount++;
        }
        if (electedIndexMetadata != null) {
            if (indexMetadataCount < requiredAllocation) {
                logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetadataCount, requiredAllocation);
            } // TODO if this logging statement is correct then we are missing an else here

            // add the index-level metadata
            metadataBuilder.put(electedIndexMetadata, false);
        }
    }
}
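The per-index step can be sketched the same way. This is a simplified model with hypothetical names (IndexElection, electIndices) in which each node's metadata is just a map from index name to version. Like the source, it keeps an under-reported index and only logs it.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the per-index election. IndexElection and electIndices
// are hypothetical; each node's metadata is reduced to index name -> version.
class IndexElection {
    static Map<String, Long> electIndices(List<Map<String, Long>> nodes, int requiredAllocation) {
        // collect every index name reported by any node
        Set<String> indexNames = new LinkedHashSet<>();
        for (Map<String, Long> node : nodes) {
            indexNames.addAll(node.keySet());
        }
        Map<String, Long> elected = new LinkedHashMap<>();
        for (String index : indexNames) {
            Long best = null;
            int count = 0;
            for (Map<String, Long> node : nodes) {
                Long version = node.get(index);
                if (version == null) {
                    continue; // this node has no copy of the index
                }
                if (best == null || version > best) {
                    best = version; // keep the highest version
                }
                count++;
            }
            if (count < requiredAllocation) {
                // mirrors the debug log in the source; the index is NOT skipped
                System.out.println("[" + index + "] found [" + count + "], required [" + requiredAllocation + "]");
            }
            elected.put(index, best);
        }
        return elected;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> nodes = List.of(
            Map.of("logs", 4L, "users", 9L),
            Map.of("logs", 6L),
            Map.of());
        System.out.println(electIndices(nodes, 2));
    }
}
```

Here "users" is reported by only one node, so a message is logged, yet it still appears in the result, which is exactly the behaviour the TODO in the source questions.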

Build the recovered cluster state from the elected cluster-level and index-level metadata:

// the recovered cluster state
ClusterState recoveredState = Function.<ClusterState>identity()
    .andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
    .apply(ClusterState.builder(clusterService.getClusterName()).metadata(metadataBuilder).build());
listener.onSuccess(recoveredState);
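Function.<ClusterState>identity().andThen(...) chains pure state-to-state transformations that run left to right, and RecoverStateUpdateTask.execute uses the same idiom with two steps. A minimal sketch of the idiom, where String stands in for ClusterState and the two steps are hypothetical stand-ins for ClusterStateUpdaters methods:

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Sketch of the Function.identity().andThen(...) idiom. The String "state"
// and both steps are hypothetical stand-ins for ClusterState and the
// ClusterStateUpdaters methods.
class UpdaterChain {
    static final UnaryOperator<String> upgradeSettings = s -> s + "+settings-upgraded";
    static final UnaryOperator<String> removeBlock = s -> s.replace("[BLOCKED]", "");

    static String recover(String initial) {
        // identity() is only a typed starting point; steps run in the order listed
        return Function.<String>identity()
            .andThen(upgradeSettings)
            .andThen(removeBlock)
            .apply(initial);
    }

    public static void main(String[] args) {
        System.out.println(recover("state[BLOCKED]")); // state+settings-upgraded
    }
}
```

Starting from identity() keeps every step on its own line and makes adding or reordering a step a one-line change, at no runtime cost beyond one extra function call.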

GatewayRecoveryListener.onSuccess is then called, which submits a state update task to the cluster:

class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {

    @Override
    public void onSuccess(final ClusterState recoveredState) {
        logger.trace("successful state recovery, importing cluster state...");
        clusterService.submitStateUpdateTask("local-gateway-elected-state",
            new RecoverStateUpdateTask() {
                @Override
                public ClusterState execute(final ClusterState currentState) {
                    final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);
                    return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));
                }
            });
    }

    @Override
    public void onFailure(final String msg) {
        logger.info("state recovery failed: {}", msg);
        resetRecoveredFlags();
    }
}

Through super.execute, this invokes RecoverStateUpdateTask.execute:

@Override
public ClusterState execute(final ClusterState currentState) {
    if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        logger.debug("cluster is already recovered");
        return currentState;
    }

    // state recovery is complete
    final ClusterState newState = Function.<ClusterState>identity()
        .andThen(ClusterStateUpdaters::updateRoutingTable)
        .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
        .apply(currentState);

    // start allocating shards
    return allocationService.reroute(newState, "state recovered");
}

Once the cluster-level and index-level metadata have been recovered, shard allocation begins.
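RecoverStateUpdateTask.execute is idempotent: if STATE_NOT_RECOVERED_BLOCK is already gone, it returns the current state unchanged. Otherwise it derives a new immutable state and hands it to reroute. A simplified model of that contract; RecoverTask, StateModel and the rerouted flag are hypothetical, and reroute here only sets a flag rather than allocating shards.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Simplified model of RecoverStateUpdateTask.execute. RecoverTask, StateModel
// and the "rerouted" flag are hypothetical; the real task works on
// ClusterState and AllocationService.
class RecoverTask {
    static final String STATE_NOT_RECOVERED_BLOCK = "state-not-recovered";

    // Immutable state: every change produces a new instance.
    static final class StateModel {
        final Set<String> globalBlocks;
        final boolean rerouted;

        StateModel(Set<String> globalBlocks, boolean rerouted) {
            this.globalBlocks = Collections.unmodifiableSet(new HashSet<>(globalBlocks));
            this.rerouted = rerouted;
        }
    }

    static StateModel executeRecovery(StateModel current) {
        if (!current.globalBlocks.contains(STATE_NOT_RECOVERED_BLOCK)) {
            return current; // already recovered: no-op, same instance
        }
        Set<String> blocks = new HashSet<>(current.globalBlocks);
        blocks.remove(STATE_NOT_RECOVERED_BLOCK);
        return reroute(new StateModel(blocks, current.rerouted));
    }

    // Stand-in for allocationService.reroute(newState, "state recovered").
    static StateModel reroute(StateModel state) {
        return new StateModel(state.globalBlocks, true);
    }

    public static void main(String[] args) {
        StateModel initial = new StateModel(Set.of(STATE_NOT_RECOVERED_BLOCK), false);
        StateModel recovered = executeRecovery(initial);
        System.out.println(recovered.globalBlocks.isEmpty() + " " + recovered.rerouted); // true true
        System.out.println(executeRecovery(recovered) == recovered); // true
    }
}
```

Returning the very same instance in the no-op case matters for a cluster-state task: an unchanged state means there is nothing new to publish.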

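  • Metadata persistence
    Master-eligible nodes and data nodes persist the cluster state: when they receive a cluster state change, they write it to disk. GatewayClusterApplier implements ClusterStateApplier, so its applyClusterState method is called whenever the cluster state changes: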
@Override
public void applyClusterState(ClusterChangedEvent event) {
    if (event.state().blocks().disableStatePersistence()) {
        incrementalClusterStateWriter.setIncrementalWrite(false);
        return;
    }

    try {
        // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
        // that's higher than the last accepted term.
        // TODO: can we get rid of this hack?
        if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
            incrementalClusterStateWriter.setCurrentTerm(event.state().term());
        }

        // update the metadata on disk
        incrementalClusterStateWriter.updateClusterState(event.state());
        incrementalClusterStateWriter.setIncrementalWrite(true);
    } catch (WriteStateException e) {
        logger.warn("Exception occurred when storing new meta data", e);
    }
}
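The control flow of applyClusterState can be summarized as follows. If persistence is disabled, incremental writes are turned off and nothing else happens. Otherwise the stored term is raised if the new state's term is higher, the state is written, and incremental writes are turned back on. A simplified model of that flow; PersistenceModel and its fields are hypothetical, the "write" only records the version, and what a disabled incremental flag implies for the next write is an assumption, not something the code above spells out.

```java
// Simplified model of the control flow in GatewayClusterApplier.applyClusterState.
// PersistenceModel and its fields are hypothetical; "writing" only records the
// version, and the meaning of a disabled incremental flag is an assumption.
class PersistenceModel {
    long currentTerm = 0;
    boolean incrementalWrite = false;
    long lastWrittenVersion = -1;

    void apply(boolean disableStatePersistence, long stateTerm, long stateVersion) {
        if (disableStatePersistence) {
            // assumed meaning: the next write may not rely on the previously written state
            incrementalWrite = false;
            return;
        }
        if (stateTerm > currentTerm) {
            currentTerm = stateTerm; // the stored term only ever moves forward
        }
        lastWrittenVersion = stateVersion; // stands in for updateClusterState(...)
        incrementalWrite = true;
    }

    public static void main(String[] args) {
        PersistenceModel p = new PersistenceModel();
        p.apply(false, 3, 10);
        p.apply(false, 2, 11); // lower term: stored term stays 3
        System.out.println(p.currentTerm + " " + p.lastWrittenVersion + " " + p.incrementalWrite); // 3 11 true
        p.apply(true, 5, 12);  // persistence disabled: nothing written
        System.out.println(p.currentTerm + " " + p.lastWrittenVersion + " " + p.incrementalWrite); // 3 11 false
    }
}
```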

Writing the cluster-level and index-level metadata to disk:

void updateClusterState(ClusterState newState) throws WriteStateException {
    // metadata
    Metadata newMetadata = newState.metadata();

    final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();

    final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
    // global metadata
    long globalStateGeneration = writeGlobalState(writer, newMetadata);
    // index-level metadata
    Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);
    Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
    writeManifest(writer, manifest);
    previousManifest = manifest;
    previousClusterState = newState;

    final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
    final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
    if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
        logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
                "wrote metadata for [{}] indices and skipped [{}] unchanged indices",
            durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
    } else {
        logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
            durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
    }
}
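The write order in updateClusterState is what makes the update atomic from a reader's point of view. New generations of the global and index metadata are written first, unchanged indices are skipped, and the manifest that points at the new generations is written last. A simplified in-memory sketch; ManifestWriter, the single generation counter, version-based change detection, and the maps standing in for files are all assumptions for illustration, not how AtomicClusterStateWriter is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified in-memory model of the write order in updateClusterState.
// ManifestWriter, the single generation counter and version-based change
// detection are assumptions; maps stand in for files on disk.
class ManifestWriter {
    static final class Manifest {
        final long globalGeneration;
        final Map<String, Long> indexGenerations; // index -> generation of its metadata

        Manifest(long globalGeneration, Map<String, Long> indexGenerations) {
            this.globalGeneration = globalGeneration;
            this.indexGenerations = indexGenerations;
        }
    }

    final Map<Long, String> globalFiles = new HashMap<>();        // generation -> global metadata
    final Map<String, Long> writtenVersions = new HashMap<>();    // index -> version last written
    final Map<String, Long> writtenGenerations = new HashMap<>(); // index -> generation last written
    long nextGeneration = 0;
    Manifest manifest; // readers trust only what the current manifest references
    int indicesWritten;
    int indicesSkipped;

    void updateClusterState(String globalMetadata, Map<String, Long> indexVersions) {
        // 1. write the global metadata as a fresh generation
        long globalGeneration = nextGeneration++;
        globalFiles.put(globalGeneration, globalMetadata);

        // 2. write index metadata, skipping indices whose version is unchanged
        indicesWritten = 0;
        indicesSkipped = 0;
        Map<String, Long> indexGenerations = new HashMap<>();
        for (Map.Entry<String, Long> e : indexVersions.entrySet()) {
            String index = e.getKey();
            if (e.getValue().equals(writtenVersions.get(index))) {
                indexGenerations.put(index, writtenGenerations.get(index)); // reuse the old file
                indicesSkipped++;
            } else {
                long generation = nextGeneration++;
                writtenVersions.put(index, e.getValue());
                writtenGenerations.put(index, generation);
                indexGenerations.put(index, generation);
                indicesWritten++;
            }
        }
        // forget indices that no longer exist
        writtenVersions.keySet().retainAll(indexVersions.keySet());
        writtenGenerations.keySet().retainAll(indexVersions.keySet());

        // 3. write the manifest last: until here the previous manifest stays authoritative
        manifest = new Manifest(globalGeneration, indexGenerations);
    }

    public static void main(String[] args) {
        ManifestWriter writer = new ManifestWriter();
        writer.updateClusterState("global-v1", Map.of("logs", 1L, "users", 1L));
        System.out.println(writer.indicesWritten + " written, " + writer.indicesSkipped + " skipped"); // 2 written, 0 skipped
        writer.updateClusterState("global-v2", Map.of("logs", 2L, "users", 1L));
        System.out.println(writer.indicesWritten + " written, " + writer.indicesSkipped + " skipped"); // 1 written, 1 skipped
    }
}
```

If the process died between steps 1 and 3, the old manifest would still reference only complete, older generations, so a reader never sees a half-written state.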
  • Loading metadata from disk
    The node's start method calls gatewayMetaState.start:
// cluster metadata
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
    injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),
    injector.getInstance(PersistedClusterStateService.class));

which in turn calls loadFullState:

// load the metadata
manifestClusterStateTuple = metaStateService.loadFullState();

public Tuple<Manifest, Metadata> loadFullState() throws IOException {
    // load the latest state file
    final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
    if (manifest == null) {
        return loadFullStateBWC();
    }

    // build the metadata
    final Metadata.Builder metadataBuilder;
    if (manifest.isGlobalGenerationMissing()) {
        metadataBuilder = Metadata.builder();
    } else {
        final Metadata globalMetadata = METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),
            nodeEnv.nodeDataPaths());
        if (globalMetadata != null) {
            metadataBuilder = Metadata.builder(globalMetadata);
        } else {
            throw new IOException("failed to find global metadata [generation: " + manifest.getGlobalGeneration() + "]");
        }
    }

    // index-level metadata
    for (Map.Entry<Index, Long> entry : manifest.getIndexGenerations().entrySet()) {
        final Index index = entry.getKey();
        final long generation = entry.getValue();
        final String indexFolderName = index.getUUID();
        final IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation,
            nodeEnv.resolveIndexFolder(indexFolderName));
        if (indexMetadata != null) {
            metadataBuilder.put(indexMetadata, false);
        } else {
            throw new IOException("failed to find metadata for existing index " + index.getName() + " [location: " + indexFolderName +
                ", generation: " + generation + "]");
        }
    }

    return new Tuple<>(manifest, metadataBuilder.build());
}

The index-level and cluster-level metadata read from disk are used to build the ClusterState object.
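loadFullState reads state in the opposite direction to updateClusterState. It reads the latest manifest, loads exactly the generations the manifest references, and treats any missing file as an IOException rather than silently returning a smaller state. A simplified in-memory sketch; StateLoader, its maps and the null-manifest fallback are hypothetical (the real method delegates to loadFullStateBWC() when no manifest exists).

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Simplified model of MetaStateService.loadFullState over an in-memory "disk".
// StateLoader, Manifest, LoadedState and the maps are hypothetical stand-ins
// for the manifest, global-state and per-index state files.
class StateLoader {
    static final class Manifest {
        final Long globalGeneration;              // null models isGlobalGenerationMissing()
        final Map<String, Long> indexGenerations; // index UUID -> generation

        Manifest(Long globalGeneration, Map<String, Long> indexGenerations) {
            this.globalGeneration = globalGeneration;
            this.indexGenerations = indexGenerations;
        }
    }

    static final class LoadedState {
        final String globalMetadata;                         // null if no global generation
        final Map<String, String> indices = new HashMap<>(); // index UUID -> index metadata

        LoadedState(String globalMetadata) {
            this.globalMetadata = globalMetadata;
        }
    }

    final Map<Long, String> globalFiles = new HashMap<>();             // generation -> global metadata
    final Map<String, Map<Long, String>> indexFiles = new HashMap<>(); // UUID -> generation -> metadata

    LoadedState loadFullState(Manifest manifest) throws IOException {
        if (manifest == null) {
            // the real method falls back to loadFullStateBWC() here; not modeled
            return new LoadedState(null);
        }
        String global = null;
        if (manifest.globalGeneration != null) {
            global = globalFiles.get(manifest.globalGeneration);
            if (global == null) {
                throw new IOException("failed to find global metadata [generation: " + manifest.globalGeneration + "]");
            }
        }
        LoadedState state = new LoadedState(global);
        for (Map.Entry<String, Long> entry : manifest.indexGenerations.entrySet()) {
            String uuid = entry.getKey();
            long generation = entry.getValue();
            String indexMetadata = indexFiles.getOrDefault(uuid, Map.of()).get(generation);
            if (indexMetadata == null) {
                // the manifest references a file that is not there: fail loudly
                throw new IOException("failed to find metadata for index [location: " + uuid
                    + ", generation: " + generation + "]");
            }
            state.indices.put(uuid, indexMetadata);
        }
        return state;
    }

    public static void main(String[] args) throws IOException {
        StateLoader loader = new StateLoader();
        loader.globalFiles.put(3L, "global@3");
        loader.indexFiles.put("uuid-logs", new HashMap<>(Map.of(5L, "logs@5")));
        LoadedState state = loader.loadFullState(new Manifest(3L, Map.of("uuid-logs", 5L)));
        System.out.println(state.globalMetadata + " " + state.indices); // global@3 {uuid-logs=logs@5}
    }
}
```

Keying index files by UUID, as the real code does with index.getUUID(), means a deleted and re-created index with the same name can never pick up the old index's files.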

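This article was contributed by an Internet user; the views expressed are the author's own and do not represent the position of this site. This site only provides information storage space, does not claim ownership, and assumes no related legal liability. When reposting, please cite the source: http://www.mzph.cn/diannao/37543.shtml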
