【SpringCloud】Eureka源码解析 上

e246a1dda09849a5a89535a62441565d.png

Eureka是一个服务发现与注册组件,它包含服务端和客户端,服务端管理服务的注册信息,客户端简化服务实例与服务端的交互。我们结合源码来分析下eureka组件的实现原理,内容分为上下两章,第一章分析eureka的服务注册,第二章分析eureka的心跳机制,本章节是第一章。

参考源码:<spring-cloud.version>Hoxton.SR9</spring-cloud.version>

往期系列:

【SpringBoot】SpringBoot源码解析第一章 SpringBoot的构造方法-CSDN博客

【SpringBoot】SpringBoot源码解析第二章 SpringBoot的run方法-CSDN博客

【SpringBoot】SpringBoot源码解析第三章 SpringBoot的自动化配置-CSDN博客

【SpringBoot】SpringBoot源码解析第四章 SpringBoot的bean接口-CSDN博客

【SpringBoot】SpringBoot源码解析第五章 SpringBoot的beanDefinition收集过程-CSDN博客

【SpringBoot】SpringBoot源码解析第六章 SpringBoot的getBean方法-CSDN博客

【SpringBoot】SpringBoot源码解析第七章 SpringBoot的感悟-CSDN博客

1、注册服务

1.1 服务端接收注册信息

spring-cloud-netflix-eureka-server依赖包下有一个spring.factories文件,文件内容如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

根据springboot自动配置的原理可知,EurekaServerAutoConfiguration会被标记成了一个自动配置类。EurekaServerAutoConfiguration配置类中有一个jerseyApplication方法,这个方法会收集指定包下被Path或Provider注解标记的类的beanDefinition,这些类可以看作是Controller

// 扫描包路径
private static final String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};    // 收集包下指定类的beanDefinition,放入application对象
@Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);// 收集的对象要求被Path或Provider注解标记provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));String[] var5 = EUREKA_PACKAGES;int var6 = var5.length;for(int var7 = 0; var7 < var6; ++var7) {String basePackage = var5[var7];// 扫描包路径,收集beanDefinitionSet<BeanDefinition> beans = provider.findCandidateComponents(basePackage);Iterator var10 = beans.iterator();while(var10.hasNext()) {BeanDefinition bd = (BeanDefinition)var10.next();Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());classes.add(cls);}}...return rc;
}// 获取到application,将beanDefinition置入servlet容器
@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(Application eurekaJerseyApp) {FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();bean.setFilter(new ServletContainer(eurekaJerseyApp));bean.setOrder(Integer.MAX_VALUE);bean.setUrlPatterns(Collections.singletonList("/eureka/*"));return bean;
}

收集的beanDefinition会通过jerseyFilterRegistration方法放入servlet容器,这样接收请求时就能通过url映射给指定的bean来处理请求

com.netflix.eureka包下被扫描的类如下:

ApplicationResource类是Controller中的一员,它有一个addInstance方法,这个方法就是服务端响应服务注册的方法

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {...// 执行注册this.registry.register(info, "true".equals(isReplication));return Response.status(204).build();
}
调用链:
-> ApplicationResource.addInstance
-> InstanceRegistry.register
-> PeerAwareInstanceRegistryImpl.register
-> AbstractInstanceRegistry.register

服务端使用currentHashMap来存储服务的信息,服务端响应注册的过程较为简单

// 用currentHashMap存储服务信息 
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry= new ConcurrentHashMap();    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication)   
{...Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 将服务信息放入map中((Map)gMap).put(registrant.getId(), lease);...
}

1.2 客户端发送注册信息
1.2.1 client客户端

spring-cloud-netflix-eureka-client依赖包下也有一个spring.factories文件,文件内容如下

...
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
...

EurekaClientAutoConfiguration被标记成自动配置类,它里面有一个创建EurekaClient类对象的bean方法,看类的名称我们知道这是一个客户端

@Bean(destroyMethod = "shutdown"
)
@ConditionalOnMissingBean(value = {EurekaClient.class},search = SearchStrategy.CURRENT
)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {ApplicationInfoManager appManager;if (AopUtils.isAopProxy(manager)) {appManager = (ApplicationInfoManager)ProxyUtils.getTargetObject(manager);} else {appManager = manager;}// 创建客户端CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);cloudEurekaClient.registerHealthCheck(healthCheckHandler);return cloudEurekaClient;
}
调用链:
-> EurekaAutoServiceRegistration.eurekaClient
-> new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
-> CloudEurekaClient.super(applicationInfoManager, config, args);
-> DiscoveryClient.DiscoveryClient... 构造方法重载-> DiscoveryClient.DiscoveryClient
-> initScheduledTasks

跟踪EurekaClient类的构造方法找到DiscoveryClient类,DiscoveryClient类的构造方法调用了initScheduledTasks方法,初始化了一个定时任务

private void initScheduledTasks() {...// 添加状态变更监听器this.statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {public String getId() {return "statusChangeListener";}public void notify(StatusChangeEvent statusChangeEvent) {if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {DiscoveryClient.logger.error("Saw local status change event {}", statusChangeEvent);} else {DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);}// 监听器被通知后调用onDemandUpdate方法DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();}};...}

定时任务内添加了一个状态修改监听器,监听器调用notify方法时会回调onDemandUpdate方法,追踪这个回调方法

调用链:
-> InstanceInfoReplicator.onDemandUpdate
-> InstanceInfoReplicator.this.run
-> this.discoveryClient.register
-> this.eurekaTransport.registrationClient.register(this.instanceInfo)
-> AbstractJerseyEurekaHttpClient.register

进入到AbstractJerseyEurekaHttpClient类的register方法

public EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();ClientResponse response = null;EurekaHttpResponse var5;try {// 向注册中心发送http请求WebResource.Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();this.addExtraHeaders(resourceBuilder);response = (ClientResponse)((WebResource.Builder)((WebResource.Builder)((WebResource.Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});}if (response != null) {response.close();}}return var5;
}

注册方法写得很直白了:客户端拿到注册中心地址,然后携带服务元数据,发送请求完成注册。不过还有一个问题,之前我们提到定时任务内初始化了一个监听器,这个监听器只有被通知了才会执行后续的注册方法,那么监听器是如何被通知的?它的触发时机又在何时?

1.2.2 监听器

EurekaClientAutoConfiguration配置类还有一个创建EurekaAutoServiceRegistration类的bean方法

// 创建服务注册客户端 
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
@ConditionalOnProperty(value = {"spring.cloud.service-registry.auto-registration.enabled"},matchIfMissing = true
)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {return new EurekaAutoServiceRegistration(context, registry, registration);
}

EurekaAutoServiceRegistration类实现了SmartLifecycle接口。当spring容器加载完所有bean后会调用SmartLifeCycle接口实现类的start方法,start方法调用EurekaServiceRegistry类的regiser方法

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener 
{public void start() {if (this.port.get() != 0) {if (this.registration.getNonSecurePort() == 0) {this.registration.setNonSecurePort(this.port.get());}if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {this.registration.setSecurePort(this.port.get());}}if (!this.running.get() && this.registration.getNonSecurePort() > 0) {// 调用EurekaServiceRegistry的regiserthis.serviceRegistry.register(this.registration);this.context.publishEvent(new InstanceRegisteredEvent(this, this.registration.getInstanceConfig()));this.running.set(true);}}
}

EurekaServiceRegistry类的regiser方法会设置实例的状态。进入ApplicationInfoManager类的setInstanceStatus方法

// 设置实例状态        
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

setInstanceStatus方法触发了一个状态修改事件,并且通知了监听器

public synchronized void setInstanceStatus(InstanceInfo.InstanceStatus status) {InstanceInfo.InstanceStatus next = this.instanceStatusMapper.map(status);if (next != null) {InstanceInfo.InstanceStatus prev = this.instanceInfo.setStatus(next);if (prev != null) {Iterator var4 = this.listeners.values().iterator();while(var4.hasNext()) {StatusChangeListener listener = (StatusChangeListener)var4.next();try {// 通知监听器listener.notify(new StatusChangeEvent(prev, next));} catch (Exception var7) {logger.warn("failed to notify listener: {}", listener.getId(), var7);}}}}
}

这里的监听器和上面提到的状态修改监听器其实是同一个监听器,在调用EurekaAutoServiceRegistration对象的start方法后,监听器会收到通知然后调用客户端的register方法,这就是发送注册服务请求的执行时机

2、拉取服务

2.1 初次拉取

客户端第一次拉取服务和DiscoveryClient类的构造方法有关,详情如下:

@Inject
DiscoveryClient(...){...// 调用fetchRegistry方法,拉取服务boolean primaryFetchRegistryResult = this.fetchRegistry(false);if (!primaryFetchRegistryResult) {logger.info("Initial registry fetch from primary servers failed");}...
}  private boolean fetchRegistry(boolean forceFullRegistryFetch) {...// 调用getAndStoreFullRegistry方法,拉取全部服务this.getAndStoreFullRegistry();...
}private void getAndStoreFullRegistry() throws Throwable {...long currentUpdateGeneration = this.fetchRegistryGeneration.get();// 启动时会打印这行日志logger.info("Getting all instance registry info from the eureka server");Applications apps = null;// 发送http请求EurekaHttpResponse<Applications> httpResponse = this.clientConfig.getRegistryRefreshSingleVipAddress() == null ? this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = (Applications)httpResponse.getEntity();}...
}
2.2 定时拉取

为了保证服务信息真实可信,客户端会定时拉取远程注册列表更新本地数据。提到到定时任务,自然的联想到DiscoveryClient类的initScheduledTasks方法(1.2.1的内容)

private void initScheduledTasks() {int renewalIntervalInSecs;int expBackOffBound;if (this.clientConfig.shouldFetchRegistry()) {renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();// 定时刷新本地服务列表任务,具体任务在CacheRefreshThread内this.cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new // 执行任务的线程CacheRefreshThread());this.scheduler.schedule(this.cacheRefreshTask, (long)renewalIntervalInSecs, TimeUnit.SECONDS);}
}class CacheRefreshThread implements Runnable {CacheRefreshThread() {}public void run() {// 刷新服务列表DiscoveryClient.this.refreshRegistry();}
}@VisibleForTesting
void refreshRegistry() {...// 获取服务列表boolean success = this.fetchRegistry(remoteRegionsModified);...
}

3、总结

eureka服务端启动后通过自动配置加载com.netflix.eureka包下的处理器,处理器会响应注册、拉取、剔除服务等http请求

eureka客户端启动后会发送注册请求,并定时更新服务列表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/38487.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu apt命令 出现红色弹框 Daemons using outdated libraries

1. 弹框没截图&#xff0c;是因为ubuntu22.04一个新特性导致的&#xff0c;由 needrestart 命令触发&#xff0c;默认情况是交互性质的&#xff0c;也就是会中断在这里需要手动要处理提示。 2. 修改/etc/needrestart/needrestart.conf 文件&#xff0c;将 #$nrconf{restart} …

【区分vue2和vue3下的element UI PageHeader 页头组件,分别详细介绍属性,事件,方法如何使用,并举例】

在 Vue 2 中&#xff0c;Element UI 并没有一个直接命名为 PageHeader 的组件。然而&#xff0c;你可能是在寻找一种方式来创建自定义的页头&#xff08;Page Header&#xff09;&#xff0c;这通常是通过组合 Element UI 的其他组件&#xff08;如 el-header、el-row、el-col、…

大数据开发中如何计算用户留存及SQL示例

在大数据开发领域&#xff0c;用户留存是一个关键指标&#xff0c;它反映了产品吸引并保留用户的能力。 留存率的计算不仅有助于评估产品的健康状况&#xff0c;还能为产品优化和市场策略提供重要依据。 本文将详细介绍如何在大数据开发中计算用户留存&#xff0c;并附带具体…

gpu是什么?

GPU&#xff08;Graphics Processing Unit&#xff0c;图形处理单元&#xff09;是一种专门在个人电脑、工作站、游戏机以及一些移动设备&#xff08;如平板电脑、智能手机等&#xff09;上进行图像和图形相关运算工作的微处理器。以下是关于GPU的详细解释&#xff1a; 1. **定…

精密空气加热器负载组

小型便携式 &#xff1a;精密空气加热器&#xff08;负载组&#xff09;能够对数据中心热通道/冷通道冷却系统进行全面测试。EAK 是一款 19 英寸机架式设备&#xff08;10U 高&#xff09;&#xff0c;可轻松安装到各种标准服务器机架中。通过集成可调节的热量水平&#xff08;…

决策树算法介绍:原理与案例实现以及Python、R、Java、 MATLAB中使用

决策树&#xff08;Decision Tree&#xff09;是一种常用的机器学习算法&#xff0c;适用于分类和回归任务。它通过一系列的二分决策将数据逐步划分成不同的子集&#xff0c;直到每个子集中的数据点具有较高的同质性。下面介绍决策树的基本原理&#xff0c;并通过Python实现一个…

C++ :lambda表达式

目录 lambda表达式书写格式&#xff1a; lambda表达式各部分说明&#xff1a; lambda的使用示范&#xff1a; 注意事项&#xff1a; 返回值类型可以省略&#xff0c;参数也可也省略&#xff1a; sort内部也可以直接写lambda表达式&#xff1a; 排序时利用lambda进行排序…

STM32MP135裸机编程:使用软件触发硬件复位

0 参考资料 STM32MP13xx参考手册.pdf 1 使用寄存器实现软件复位 1.1 复位电路概述 重点关注下面标红的路线&#xff1a; 通过这条路线可以清楚看到&#xff0c;我们可以通过设置RCC_MP_GRSTCSETR寄存器让RPCTL&#xff08;复位脉冲控制器&#xff09;给NRST&#xff08;硬件复…

苹果可能与谷歌合作推AI订阅服务;全国首个司法审判大模型在深圳上线

&#x1f989; AI新闻 &#x1f680; 苹果可能与谷歌合作推AI订阅服务 摘要&#xff1a;苹果宣布将与OpenAI合作推出Apple Intelligence&#xff0c;并有望在今年秋季与谷歌达成合作&#xff0c;接入Gemini。Meta的Llama因质量不佳被拒。苹果计划推出订阅模式的智能功能服务&…

Oracle PL / SQL 存储过程

PL / SQL存储过程不返回值。他们执行他们的指示并返回。您不能在赋值语句&#xff08;如函数&#xff09;的右侧使用存储过程。 创建存储过程 以下代码是一个非常简单的存储过程示例。 它基于SYS.DBMS_LOCK包的程序sleep&#xff08;数量&#xff09;。 此存储过程将停止执行…

在 C++的跨平台开发中,如何处理不同操作系统和编译器之间的细微差异,以确保程序能够稳定且高效地运行?

在 C 的跨平台开发中&#xff0c;处理不同操作系统和编译器之间的细微差异是非常重要的。以下是一些处理差异的技巧&#xff1a; 使用条件编译&#xff1a;使用预处理指令&#xff0c;根据不同的操作系统和编译器来编写不同的代码。 #if defined(_WIN32)// Windows 特定代码 …

kafka的工作原理与常见问题

定义 kafka是一个分布式的基于发布/订阅模式的消息队列&#xff08;message queue&#xff09;&#xff0c;主要应用于大数据的实时处理领域 消息队列工作原理 kafka的组成结构 kafka的基础架构主要有broker、生产者、消费者组构成&#xff0c;还包括zookeeper. 生产者负责发送…

算法09 日期相关模拟算法【C++实现】

这是《C算法宝典》算法篇的第09节文章啦~ 如果你之前没有太多C基础&#xff0c;请点击&#x1f449;专栏&#xff1a;C语法入门&#xff0c;如果你C语法基础已经炉火纯青&#xff0c;则可以进阶算法&#x1f449;专栏&#xff1a;算法知识和数据结构&#x1f449;专栏&#xff…

计算斜率,判断斜率

#include <stdio.h> #include <stdlib.h> #include <math.h> #include <stdbool.h>// 定义常量 #define LOW_COOK_WINDOW_SIZE 20 // 滑动窗口大小&#xff0c;10个样本点&#xff08;10秒&#xff09; #define LOW_COOK_SLOPE…

Java代码生成器(开源版本)

一、在线地址 Java在线代码生成器&#xff1a;在线访问 二、页面截图 三、核心功能 支持Mybatis、MybatisPlus、Jpa代码生成使用 antlr4 解析SQL语句&#xff0c;保证了SQL解析的成功率支持自定义包名、作者名信息支持自定义方法名、接口地址支持自定义选择是否生成某个方法…

16-Python Pandas聚合函数

Python Pandas聚合函数 窗口函数可以与聚合函数一起使用&#xff0c;聚合函数指的是对一组数据求总和、最大值、最小值以及平均值的操作。 应用聚合函数 首先让我们创建一个 DataFrame 对象&#xff0c;然后对聚合函数进行应用。 import pandas as pd import numpy as np d…

SQL中的子查询和CTE(with ....as..)

第一次看到with as 这种类似于python中读文件的写法还是挺疑惑的&#xff0c;其实它是CTE&#xff0c;功能和子查询很类似但又有不同点&#xff0c;在实际应用场景中具有着独特作用。 子查询 子查询是在主查询中的嵌套查询&#xff0c;可以出现在SELECT、FROM、WHERE等子句中…

ai除安卓手机版APP软件一键操作自动渲染去擦消稀缺资源下载

安卓手机版&#xff1a;点击下载 苹果手机版&#xff1a;点击下载 电脑版&#xff08;支持Mac和Windows&#xff09;&#xff1a;点击下载 一款全新的AI除安卓手机版APP&#xff0c;一键操作&#xff0c;轻松实现自动渲染和去擦消效果&#xff0c;稀缺资源下载 1、一键操作&…

数学建模(1):期末大乱炖

1 概述&#xff01;&#xff01; 1.1 原型和模型 原型&#xff1a;客观存在的研究对象称为原型&#xff0c;也称为“系统”、“过程”。 机械系统、电力系统、化学反应过程、生产销售过程等都是原型&#xff1b; 研究原型的结构和原理&#xff0c; 从而进行优化、预测、评价…

Perl编程艺术:深入探索Tie机制的魔力

&#x1f31f; Perl编程艺术&#xff1a;深入探索Tie机制的魔力 在Perl的世界里&#xff0c;tie功能是一种极其强大的特性&#xff0c;它允许程序员将变量绑定到一个对象上&#xff0c;从而改变这个变量的默认行为。这种机制为变量提供了一种代理访问方式&#xff0c;使得变量…