spring eureka集群相关问题

一、集群节点信息如何更新?

EurekaServer节点启动的时候,DefaultEurekaServerContext.init()方法调用PeerEurekaNodes.start()方法,start方法中resolvePeerUrls()会从配置文件读取serviceUrl属性值获得集群最新节点信息,通过updatePeerEurekaNodes()方法将最新节点信息更新到PeerEurekaNodes类的属性peerEurekaNodes和peerEurekaNodeUrls中。

PeerEurekaNodes.start()还提供了一个更新节点信息的定时任务,每隔PeerEurekaNodesUpdateIntervalMs (默认10min) 时间执行一次。

针对上述这个过程,需要注意的是(网上很多文章表述有误):peerEurekaNodesUpdateIntervalMs这个参数并不是集群节点注册信息的同步间隔时间。为什么这么说呢?我们可以看上图中的定时任务peersUpdateTask做了什么事情即可,其实只要看updatePeerEurekaNodes(resolvePeerUrls())做了什么即可。

通过源码跟踪:resolvePeerUrls()方法默认即执行了EndpointUtils.getServiceUrlsFromConfig(),即从配置文件读取服务节点urls信息。updatePeerEurekaNodes()方法则把获取得到的urls与之前urls信息进行对比,该新增的放到新增list,该删除的放到删除list,如果既没有新增的节点也没有待删除的节点,则返回不做任何处理。下面是代码:

针对要删除的节点,处理方式是从节点列表移除,同时调用该节点的shutDown方法;针对要新增的节点,则加入到节点列表中,并创建新的节点,调用方法createPeerEurekaNode(String peerEurekaNodeUrl);创建节点最终调用了构造方法:

new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);

上述构造方法会从服务其他节点获取注册服务信息同步到新创建的服务节点中。

那么接下来就要问第二个问题了。

二、注册服务信息如何在集群节点中同步?

注册服务信息在集群节点中的同步有两种场景:一种是新增服务节点如何从其他服务节点同步服务注册信息;另一种是稳定的集群中,各节点之间是如何同步服务注册信息。

我们先来看第一种场景,新增节点如何从其他服务节点同步服务注册信息。

(1)Eureka Server节点启动时的服务同步

Eureka Server启动时,EurekaServerBootStrap调用PeerAwareInstanceRegistryImpl.syncUp(),同步注册服务信息。

// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

syncUp()进行注册服务信息的同步:

    /*** Populates the registry information from a peer eureka node. This* operation fails over to other nodes until the list is exhausted if the* communication fails.*/@Overridepublic int syncUp() {// Copy entire entry from neighboring DS nodeint count = 0;for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {if (i > 0) {try {Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());} catch (InterruptedException e) {logger.warn("Interrupted during registry transfer..");break;}}Applications apps = eurekaClient.getApplications();for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {try {if (isRegisterable(instance)) {register(instance, instance.getLeaseInfo().getDurationInSecs(), true);count++;}} catch (Throwable t) {logger.error("During DS init copy", t);}}}}return count;}

首先会从EurekaClient.getApplications获取所有的注册服务信息,然后调用register()进行服务注册。

如果同步失败,则会sleep serverConfig.getRegistrySyncRetryWaitMs()后,再次进行同步。

同步完成后,调用PeerAwareInstanceRegistryImpl.openForTraffic()方法,进行自我保护阀值的计算:

   public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// Renewals happen every 30 seconds and for a minute it should be a factor of 2.this.expectedNumberOfClientsSendingRenews = count;updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");applicationInfoManager.setInstanceStatus(InstanceStatus.UP);super.postInit();}

至此,Eureka Server启动时的节点复制就进行完了。

备注

最大同步重试次数=serverConfig.getRegistrySyncRetries(),默认5次。

同步失败后每次同步的间隔时间=serverConfig.getRegistrySyncRetryWaitMs(),默认30s。

如果启动时同步服务注册信息失败,一段时间不对外提供服务注册功能,waitTimeInMsWhenSyncEmpty,默认5min。

第二种场景:

(2)Eureka Server接收到Register、renew、cancel时的服务同步

处理Register、renew、cancel同步时比较复杂,包括好几个队列的转换处理以及异常处理。这里算是个简图吧,省略了队列的转换和异常处理。

Eureka Server接收到Register、renew或cancel事件后,执行向其他节点同步:

 /*** Replicates all eureka actions to peer eureka nodes except for replication* traffic to this node.**/private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, boolean isReplication) {Stopwatch tracer = action.getTimer().start();try {if (isReplication) {numberOfReplicationsLastMin.increment();}// If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {return;}for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {continue;}replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);}} finally {tracer.stop();}}/*** Replicates all instance changes to peer eureka nodes except for* replication traffic to this node.**/private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus,PeerEurekaNode node) {try {InstanceInfo infoFromRegistry = null;CurrentRequestVersion.set(Version.V2);switch (action) {case Cancel:node.cancel(appName, id);break;case Heartbeat:InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);break;case Register:node.register(info);break;case StatusUpdate:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;case DeleteStatusOverride:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.deleteStatusOverride(appName, id, infoFromRegistry);break;}} catch (Throwable t) {logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);}}

以Register为例(逻辑是一样的),Eureka Server循环遍历其他节点,然后调用node.register(info):

/*** Sends the registration information of {@link InstanceInfo} receiving by* this node to the peer node represented by this class.** @param info*            the instance information {@link InstanceInfo} of any instance*            that is send to this instance.* @throws Exception*/public void register(final InstanceInfo info) throws Exception {long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);batchingDispatcher.process(taskId("register", info),new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {public EurekaHttpResponse<Void> execute() {return replicationClient.register(info);}},expiryTime);}

这里执行TaskDispatcher的process,实际底层转到了AcceptorExecutor.process():

acceptorExecutor.process(id, task, expiryTime);

然后acceptorExecutor里会开启内部线程AcceptorRunner进行处理,处理逻辑有点复杂(包括队列的转换和异常处理),最终将处理好的结果放到BlockingQueue> batchWorkQueue中。

TaskExecutors中会在内部开启WokerRunnable线程组(ThreadGroup),循环poll batchWorkQueue队列中的Task,然后调用InstanceReplicationTask的execute方法,将register事件推送到其他Eureka Server节点。

public void run() {try {while (!isShutdown.get()) {List<TaskHolder<ID, T>> holders = getWork();metrics.registerExpiryTimes(holders);List<T> tasks = getTasksOf(holders);ProcessingResult result = processor.process(tasks);switch (result) {case Success:break;case Congestion:case TransientError:taskDispatcher.reprocess(holders, result);break;case PermanentError:logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);}metrics.registerTaskResult(result, tasks.size());}} catch (InterruptedException e) {// Ignore} catch (Throwable e) {// Safe-guard, so we never exit this loop in an uncontrolled way.logger.warn("Discovery WorkerThread error", e);}}

register、renew、cancel的处理逻辑一样,只不过调用的同步接口不同罢了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/645204.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电池回收产业东风中,吉利科技集团如何先行一步?

随着绿色低碳可持续发展理念深入人心&#xff0c;全球能源变革和转型升级持续推进&#xff0c;新能源行业不断涌现新的机遇。 动力电池回收和再利用&#xff0c;就是近在眼前的“红利型”产业。 我国新能源汽车市场近年来爆发式增长&#xff0c;动力电池生产紧随电动车普及步…

深度学习中RGB影像图的直方图均衡化python代码and对图片中指定部分做基于掩模的特定区域直方图均衡化

深度学习很重要的预处理步骤 就是需要对做直方图均衡化 其中主要分成灰度图以及RGB图的直方图均衡化 这俩的方法和代码不同 想要去看具体原理的朋友可以查看下面这篇博客的内容 写的很详细颜色直方图均衡化(https://www.cnblogs.com/wancy/p/17668345.html) 我们这个场景中会用…

【RT-DETR有效改进】FasterNet一种跑起来的主干网络( 提高FPS和检测效率)

前言 大家好&#xff0c;这里是RT-DETR有效涨点专栏。 本专栏的内容为根据ultralytics版本的RT-DETR进行改进&#xff0c;内容持续更新&#xff0c;每周更新文章数量3-10篇。 专栏以ResNet18、ResNet50为基础修改版本&#xff0c;同时修改内容也支持ResNet32、ResNet101和PP…

圈子论坛社交实名制系统---H5小程序APP,三端源码交付,允许二开!PHP系统uni书写!

圈子系统是一种社会化网络平台&#xff0c;它的核心是以用户为中心&#xff0c;围绕用户的兴趣、爱好、经历和职业等因素&#xff0c;将具有相同特质的个体聚集起来&#xff0c;形成具有共同话题和兴趣的社交圈子。这样的系统旨在帮助用户拓宽社交范围&#xff0c;提升社交效率…

封装 element el-date-picker时间选择区间

基于el-date-picker 处理满足项目需求。&#xff08;&#xff1a;最多选择7天&#xff09; 效果&#xff1a; 1 大于当前时间的以后日期禁选。2 选中时间的前后七天可选 &#xff08;最多可查询7天数据&#xff09;3 <template><section class"warning-contai…

FPGA硬件架构——具体型号是xc7k325tffg676-2为例

1.共如下图14个时钟域&#xff0c;XmYn(按坐标理解) 2.IOB(IOB为可编程输入输出单元,当然在普通Bank上的IOB附近还有很多时钟资源&#xff0c;例如PLL&#xff0c;MMCM资源。), 2.1 FPGA的Bank分为HP Bank和HR Bank&#xff0c;二者对电压的要求范围不同&#xff0c;HR支持更大…

2023龙信杯wp

打了好像70多分&#xff0c;没拿奖&#xff0c;因为一些众所周知的原因&#xff0c;复盘间隔时间太长了没什么印象了已经 案情简介 2023年9月&#xff0c;某公安机关指挥中心接受害人报案:通过即时通讯工具添加认识一位叫“周微”的女人&#xff0c;两人谈论甚欢&#xff0c;确…

大语言模型推理提速:TensorRT-LLM 高性能推理实践

作者&#xff1a;顾静 TensorRT-LLM 如何提升 LLM 模型推理效率 大型语言模型&#xff08;Large language models,LLM&#xff09;是基于大量数据进行预训练的超大型深度学习模型。底层转换器是一组神经网络&#xff0c;这些神经网络由具有 self-attention 的编码器和解码器组…

HTTP动态代理的原理及其对网络性能的影响

HTTP动态代理是一种通过代理服务器来转发HTTP请求和响应数据的网络技术&#xff0c;它可以优化网络性能、提高网络安全性&#xff0c;并解决跨域请求的问题。本文将详细介绍HTTP动态代理的原理及其对网络性能的影响。 一、HTTP动态代理的原理 HTTP动态代理的基本原理是在客户…

【数据结构四】栈与Stack详解

目录 栈与Stack 1.实现一个自己的栈 2.Stack的基本使用 3.栈的一些oj题训练 4.栈&#xff0c;虚拟机栈&#xff0c;栈帧的区别 栈与Stack 栈 &#xff1a;一种特殊的线性表&#xff0c;其 只允许在固定的一端进行插入和删除元素操作 。进行数据插入和删除操作的一端称为栈顶…

opencv#34 边缘检测(二)

Laplacian(拉普拉斯)算子 前面介绍的Sobel算子和Scharr算子存在的问题: 1.要分别计算两个方向&#xff08;x,y)的边缘&#xff0c;之后将两方向的边缘进行叠加。 2.边缘与方向相关性较大。当我们通过Sobel算子提取x方向检测时&#xff0c;它所能够检测到的边缘都是一个沿着y…

差分进化算法求解基于移动边缘计算 (MEC) 的无线区块链网络的联合挖矿决策和资源分配(提供MATLAB代码)

一、优化模型介绍 在所研究的区块链网络中&#xff0c;优化的变量为&#xff1a;挖矿决策&#xff08;即 m&#xff09;和资源分配&#xff08;即 p 和 f&#xff09;&#xff0c;目标函数是使所有矿工的总利润最大化。问题可以表述为&#xff1a; max ⁡ m , p , f F miner …

gin中使用限流中间件

限流又称为流量控制&#xff08;流控&#xff09;&#xff0c;通常是指限制到达系统的并发请求数&#xff0c;本文列举了常见的限流策略&#xff0c;并以gin框架为例演示了如何为项目添加限流组件。 限流 限流又称为流量控制&#xff08;流控&#xff09;&#xff0c;通常是指…

如何在美国硅谷高防服务器上运行自定义的脚本和应用程序

在美国硅谷高防服务器上运行自定义的脚本和应用程序需要一定的技术和知识。下面我们将介绍一些关键步骤&#xff0c;帮助您顺利地在这些服务器上运行自定义应用程序和脚本。 确保您有对服务器的访问权限&#xff0c;并且已经通过SSH等方式连接到服务器。接下来&#xff0c;您可…

不就业,纯兴趣,应该自学C#还是JAVA?

不就业&#xff0c;纯兴趣&#xff0c;应该自学C#还是JAVA? 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「JAVA的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff…

微信小程序(十四)分包和分包预加载

注释很详细&#xff0c;直接上代码 新增内容&#xff1a; 1.分包的配置 2.分包预加载的写法 先说说为什么需要分包&#xff1a; 小程序追求小而快&#xff0c;主包的大小控制是小程序上线的硬性要求&#xff0c;分包有利于小程序优化加载速度 分包的注意事项&#xff1a; 单个分…

网络原理-初识(1)

目录 网络发展史 独立模式 网络互连 局域网LAN 广域网WAN 网络通信基础 IP地址 概念 格式 端口 概念 格式 认识协议 概念 作用 五元组 网络发展史 独立模式 独立模式:计算机之间相互独立; 网络互连 随着时代的发展,越来越需要计算机之间相互通信,共享软件和数…

【AI的未来 - AI Agent系列】【MetaGPT】6. 用ActionNode重写技术文档助手

文章目录 0. 前置推荐阅读1. 重写WriteDirectory Action1.1 实现WriteDirectory的ActionNode&#xff1a;DIRECTORY_WRITE1.2 将 DIRECTORY_WRITE 包进 WriteDirectory中 2. 重写WriteContent Action2.1 思考重写方案2.2 实现WriteContent的ActionNode2.3 改写WriteContent Act…

UV紫外激光打标机的优缺点是什么

​ UV紫外激光打标机具有以下优点&#xff1a; 1. 精度高&#xff1a;紫外激光打标机的光束质量好&#xff0c;聚焦光斑小&#xff0c;可以实现在各种材料上进行超精细打标。 2. 速度快&#xff1a;由于紫外激光的独特特性&#xff0c;打标速度非常快&#xff0c;提高了生产效…

冷链温湿度监控解决方案,实时监测,助力运输安全

为了确保药品、生鲜等在冷链运输过程中的安全监管,需要对冷链、仓库等环节的温湿度信息进行实时自动检测和记录&#xff0c;有效防范储运过程中可能影响产品质量安全的各类风险&#xff0c;确保储存和运输过程的产品质量。 冷链温湿度监控系统解决方案&#xff0c;利用智能温湿…