spring eureka集群相关问题

一、集群节点信息如何更新?

EurekaServer节点启动的时候,DefaultEurekaServerContext.init()方法调用PeerEurekaNodes.start()方法,start方法中resolvePeerUrls()会从配置文件读取serviceUrl属性值获得集群最新节点信息,通过updatePeerEurekaNodes()方法将最新节点信息更新到PeerEurekaNodes类的属性peerEurekaNodes和peerEurekaNodeUrls中。

PeerEurekaNodes.start()还提供了一个更新节点信息的定时任务,每隔PeerEurekaNodesUpdateIntervalMs (默认10min) 时间执行一次。

针对上述这个过程,需要注意的是(网上很多文章表述有误):peerEurekaNodesUpdateIntervalMs这个参数并不是集群节点注册信息的同步间隔时间。为什么这么说呢?我们可以看上图中的定时任务peersUpdateTask做了什么事情即可,其实只要看updatePeerEurekaNodes(resolvePeerUrls())做了什么即可。

通过源码跟踪:resolvePeerUrls()方法默认即执行了EndpointUtils.getServiceUrlsFromConfig(),即从配置文件读取服务节点urls信息。updatePeerEurekaNodes()方法则把获取得到的urls与之前urls信息进行对比,该新增的放到新增list,该删除的放到删除list,如果既没有新增的节点也没有待删除的节点,则返回不做任何处理。下面是代码:

针对要删除的节点,处理方式是从节点列表移除,同时调用该节点的shutDown方法;针对要新增的节点,则加入到节点列表中,并创建新的节点,调用方法createPeerEurekaNode(String peerEurekaNodeUrl);创建节点最终调用了构造方法:

new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);

上述构造方法会从服务其他节点获取注册服务信息同步到新创建的服务节点中。

那么接下来就要问第二个问题了。

二、注册服务信息如何在集群节点中同步?

注册服务信息在集群节点中的同步有两种场景:一种是新增服务节点如何从其他服务节点同步服务注册信息;另一种是稳定的集群中,各节点之间是如何同步服务注册信息。

我们先来看第一种场景,新增节点如何从其他服务节点同步服务注册信息。

(1)Eureka Server节点启动时的服务同步

Eureka Server启动时,EurekaServerBootStrap调用PeerAwareInstanceRegistryImpl.syncUp(),同步注册服务信息。

// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

syncUp()进行注册服务信息的同步:

    /*** Populates the registry information from a peer eureka node. This* operation fails over to other nodes until the list is exhausted if the* communication fails.*/@Overridepublic int syncUp() {// Copy entire entry from neighboring DS nodeint count = 0;for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {if (i > 0) {try {Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());} catch (InterruptedException e) {logger.warn("Interrupted during registry transfer..");break;}}Applications apps = eurekaClient.getApplications();for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {try {if (isRegisterable(instance)) {register(instance, instance.getLeaseInfo().getDurationInSecs(), true);count++;}} catch (Throwable t) {logger.error("During DS init copy", t);}}}}return count;}

首先会从EurekaClient.getApplications获取所有的注册服务信息,然后调用register()进行服务注册。

如果同步失败,则会sleep serverConfig.getRegistrySyncRetryWaitMs()后,再次进行同步。

同步完成后,调用PeerAwareInstanceRegistryImpl.openForTraffic()方法,进行自我保护阀值的计算:

   public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// Renewals happen every 30 seconds and for a minute it should be a factor of 2.this.expectedNumberOfClientsSendingRenews = count;updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");applicationInfoManager.setInstanceStatus(InstanceStatus.UP);super.postInit();}

至此,Eureka Server启动时的节点复制就进行完了。

备注

最大同步重试次数=serverConfig.getRegistrySyncRetries(),默认5次。

同步失败后每次同步的间隔时间=serverConfig.getRegistrySyncRetryWaitMs(),默认30s。

如果启动时同步服务注册信息失败,一段时间不对外提供服务注册功能,waitTimeInMsWhenSyncEmpty,默认5min。

第二种场景:

(2)Eureka Server接收到Register、renew、cancel时的服务同步

处理Register、renew、cancel同步时比较复杂,包括好几个队列的转换处理以及异常处理。这里算是个简图吧,省略了队列的转换和异常处理。

Eureka Server接收到Register、renew或cancel事件后,执行向其他节点同步:

 /*** Replicates all eureka actions to peer eureka nodes except for replication* traffic to this node.**/private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, boolean isReplication) {Stopwatch tracer = action.getTimer().start();try {if (isReplication) {numberOfReplicationsLastMin.increment();}// If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {return;}for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {continue;}replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);}} finally {tracer.stop();}}/*** Replicates all instance changes to peer eureka nodes except for* replication traffic to this node.**/private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus,PeerEurekaNode node) {try {InstanceInfo infoFromRegistry = null;CurrentRequestVersion.set(Version.V2);switch (action) {case Cancel:node.cancel(appName, id);break;case Heartbeat:InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);break;case Register:node.register(info);break;case StatusUpdate:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;case DeleteStatusOverride:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.deleteStatusOverride(appName, id, infoFromRegistry);break;}} catch (Throwable t) {logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);}}

以Register为例(逻辑是一样的),Eureka Server循环遍历其他节点,然后调用node.register(info):

/*** Sends the registration information of {@link InstanceInfo} receiving by* this node to the peer node represented by this class.** @param info*            the instance information {@link InstanceInfo} of any instance*            that is send to this instance.* @throws Exception*/public void register(final InstanceInfo info) throws Exception {long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);batchingDispatcher.process(taskId("register", info),new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {public EurekaHttpResponse<Void> execute() {return replicationClient.register(info);}},expiryTime);}

这里执行TaskDispatcher的process,实际底层转到了AcceptorExecutor.process():

acceptorExecutor.process(id, task, expiryTime);

然后acceptorExecutor里会开启内部线程AcceptorRunner进行处理,处理逻辑有点复杂(包括队列的转换和异常处理),最终将处理好的结果放到BlockingQueue> batchWorkQueue中。

TaskExecutors中会在内部开启WokerRunnable线程组(ThreadGroup),循环poll batchWorkQueue队列中的Task,然后调用InstanceReplicationTask的execute方法,将register事件推送到其他Eureka Server节点。

public void run() {try {while (!isShutdown.get()) {List<TaskHolder<ID, T>> holders = getWork();metrics.registerExpiryTimes(holders);List<T> tasks = getTasksOf(holders);ProcessingResult result = processor.process(tasks);switch (result) {case Success:break;case Congestion:case TransientError:taskDispatcher.reprocess(holders, result);break;case PermanentError:logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);}metrics.registerTaskResult(result, tasks.size());}} catch (InterruptedException e) {// Ignore} catch (Throwable e) {// Safe-guard, so we never exit this loop in an uncontrolled way.logger.warn("Discovery WorkerThread error", e);}}

register、renew、cancel的处理逻辑一样,只不过调用的同步接口不同罢了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/645204.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电池回收产业东风中,吉利科技集团如何先行一步?

随着绿色低碳可持续发展理念深入人心&#xff0c;全球能源变革和转型升级持续推进&#xff0c;新能源行业不断涌现新的机遇。 动力电池回收和再利用&#xff0c;就是近在眼前的“红利型”产业。 我国新能源汽车市场近年来爆发式增长&#xff0c;动力电池生产紧随电动车普及步…

【代码管理】TortoiseGit 图标没有显示

当TortoiseGit在Windows系统中没有正确显示文件和目录的图标状态时&#xff0c;可能的原因和解决方法如下&#xff1a; 原因与解决方案&#xff1a; TortoiseGit未集成到资源管理器&#xff1a; 请确保TortoiseGit已正确安装&#xff0c;并在安装过程中选择了“将TortoiseGit集…

C++区间覆盖(贪心算法)

假设有n个区间&#xff0c;分别是&#xff1a;[l1,r1], [l2,r2], [l3,r3].....[ln,rn] 从这n个区间中选出某些区间&#xff0c;要求这些区间满足两两不相交&#xff0c;最多能选出多少个区间呢&#xff1f; 基本思路&#xff1a; 按照右端点从小到大排序&#xff0c;再比较左端…

深度学习中RGB影像图的直方图均衡化python代码and对图片中指定部分做基于掩模的特定区域直方图均衡化

深度学习很重要的预处理步骤 就是需要对做直方图均衡化 其中主要分成灰度图以及RGB图的直方图均衡化 这俩的方法和代码不同 想要去看具体原理的朋友可以查看下面这篇博客的内容 写的很详细颜色直方图均衡化(https://www.cnblogs.com/wancy/p/17668345.html) 我们这个场景中会用…

【RT-DETR有效改进】FasterNet一种跑起来的主干网络( 提高FPS和检测效率)

前言 大家好&#xff0c;这里是RT-DETR有效涨点专栏。 本专栏的内容为根据ultralytics版本的RT-DETR进行改进&#xff0c;内容持续更新&#xff0c;每周更新文章数量3-10篇。 专栏以ResNet18、ResNet50为基础修改版本&#xff0c;同时修改内容也支持ResNet32、ResNet101和PP…

Google murmur3 hashString用法

如下为将String获取hashcode&#xff0c;转为Long的方法&#xff0c;主要是在海量数据的flink程序里&#xff0c;为了节省状态的存储空间&#xff0c;所以尝试用long来存储。 同样的还可以hash其他格式的数据。 评估了下&#xff0c;murmur3最高用的是128位的hash值&#xff…

圈子论坛社交实名制系统---H5小程序APP,三端源码交付,允许二开!PHP系统uni书写!

圈子系统是一种社会化网络平台&#xff0c;它的核心是以用户为中心&#xff0c;围绕用户的兴趣、爱好、经历和职业等因素&#xff0c;将具有相同特质的个体聚集起来&#xff0c;形成具有共同话题和兴趣的社交圈子。这样的系统旨在帮助用户拓宽社交范围&#xff0c;提升社交效率…

uniapp+vue开发微信小程序,image标签图片IOS可以正常回显,安卓回显不出

仅代表个人遇到的问题&#xff0c;仅代表个人遇到的问题&#xff0c;仅代表个人遇到的问题&#xff0c; 1.先说最快的解决方案&#xff0c;直接在src下面额外添加一段url&#xff0c;https://images.weserv.nl/?url&#xff0c; <imagestyle"width: 180rpx; height: 2…

封装 element el-date-picker时间选择区间

基于el-date-picker 处理满足项目需求。&#xff08;&#xff1a;最多选择7天&#xff09; 效果&#xff1a; 1 大于当前时间的以后日期禁选。2 选中时间的前后七天可选 &#xff08;最多可查询7天数据&#xff09;3 <template><section class"warning-contai…

FPGA硬件架构——具体型号是xc7k325tffg676-2为例

1.共如下图14个时钟域&#xff0c;XmYn(按坐标理解) 2.IOB(IOB为可编程输入输出单元,当然在普通Bank上的IOB附近还有很多时钟资源&#xff0c;例如PLL&#xff0c;MMCM资源。), 2.1 FPGA的Bank分为HP Bank和HR Bank&#xff0c;二者对电压的要求范围不同&#xff0c;HR支持更大…

Spark 的宽依赖和窄依赖

Apache Spark 中的依赖关系指的是转换操作&#xff08;transformations&#xff09;之间的依赖类型。这些依赖关系决定了任务是如何在集群上分布执行的。依赖关系分为两类&#xff1a;宽依赖&#xff08;Wide Dependency&#xff09;和窄依赖&#xff08;Narrow Dependency&…

orchestrator介绍3.2 命令行之orchestrator-client

orchestrator-client 是一个包装 API 调用的脚本&#xff0c;使用起来更方便。 它可以自动确定orchestrator的Leader角色&#xff0c;并在这种情况下将所有请求转发给Leader。 有了orchestrator-client&#xff1a; 不需要到处安装orchestrator的二进制文件&#xff1b;仅在…

2023龙信杯wp

打了好像70多分&#xff0c;没拿奖&#xff0c;因为一些众所周知的原因&#xff0c;复盘间隔时间太长了没什么印象了已经 案情简介 2023年9月&#xff0c;某公安机关指挥中心接受害人报案:通过即时通讯工具添加认识一位叫“周微”的女人&#xff0c;两人谈论甚欢&#xff0c;确…

大语言模型推理提速:TensorRT-LLM 高性能推理实践

作者&#xff1a;顾静 TensorRT-LLM 如何提升 LLM 模型推理效率 大型语言模型&#xff08;Large language models,LLM&#xff09;是基于大量数据进行预训练的超大型深度学习模型。底层转换器是一组神经网络&#xff0c;这些神经网络由具有 self-attention 的编码器和解码器组…

自然语言处理(NLP)的发展

自然语言处理的发展 随着深度学习和大数据技术的进步&#xff0c;自然语言处理取得了显著的进步。人们正在研究如何使计算机更好地理解和生成人类语言&#xff0c;以及如何应用NLP技术改善搜索引擎、语音助手、机器翻译等领域。 方向一&#xff1a;技术进步 自然语言处理&…

【算法专题】动态规划之简单多状态 dp 问题

动态规划3.0 动态规划 - - - 简单多状态 dp 问题1. 按摩师(打家劫舍Ⅰ的变形)2. 打家劫舍Ⅱ3. 删除并获得点数4. 粉刷房子5. 买卖股票的最佳时机含冷冻期6. 买卖股票的最佳时机含手续费7. 买卖股票的最佳时机Ⅲ8. 买卖股票的最佳时机Ⅳ 动态规划 - - - 简单多状态 dp 问题 1. …

【Java 设计模式】行为型之备忘录模式

文章目录 1. 定义2. 应用场景3. 代码实现结语 备忘录模式&#xff08;Memento Pattern&#xff09;是一种行为型设计模式&#xff0c;用于捕获一个对象的内部状态&#xff0c;以便稍后可以将该对象恢复到此状态。备忘录模式允许在不破坏封装性的前提下捕获和外部化对象的内部状…

Could not autowire. No beans of ‘RedisConnectionFactory‘ type found.已解决

springboot2.7.8 redis3.2.100 在springboot中 使用RedisConnectionFactory 出现这样的错误Could not autowire. No beans of ‘RedisConnectionFactory‘ type found. 只需要在pom.xml中加入 <!-- 整合redis --> <dependency> <groupId>org.springf…

客户端请求+返回 服务端之间的请求和返回 实现rpc通信

背景&#xff1a; 1.无论什么类型的游戏&#xff0c;我们都会有rpc通信的需求。 2.由于客户端直连的是游戏服&#xff0c;如果工会&#xff0c;匹配之类的服务是单独的服务的话&#xff0c;必然要进行游戏服到业务服之间的转发&#xff0c;我们是否需要再转发时单独定义Req和Re…

Halcon基于透视形变的模板匹配

Halcon基于透视形变的模板匹配 透视形变也是一种形变&#xff0c;属于形状模板匹配的延伸。形状模板匹配对于形变非常敏感&#xff0c;而透视形变匹配则能适应出现透视形变的情况。透视形变的匹配又分为无标定和有标定两种情况。基于透视形变的匹配步骤如下。 &#xff08;1&a…