源码深度剖析Eureka与Ribbon服务发现原理

本文基于 spring cloud dalston,同时文章较长,请选择舒服姿势进行阅读。

Eureka 与 Ribbon 是什么?和服务发现什么关系?

Eureka 与 Ribbon 都是 Netflix 提供的微服务组件,分别用于服务注册与发现、负载均衡。同时,这两者均属于 spring cloud netflix 体系,和 spring cloud 无缝集成,也正由于此被大家所熟知。

Eureka 本身是服务注册发现组件,实现了完整的 Service Registry 和 Service Discovery。

Ribbon 则是一款负载均衡组件,那它和服务发现又有什么关系呢?负载均衡在整个微服务的调用模型中是紧挨着服务发现的,而 Ribbon 这个框架它其实是起到了开发者服务消费行为与底层服务发现组件 Eureka 之间桥梁的作用。从严格概念上说 Ribbon 并不是做服务发现的,但是由于 Netflix 组件的松耦合,Ribbon 需要对 Eureka 的缓存服务列表进行类似"服务发现"的行为,从而构建自己的负载均衡列表并及时更新,也就是说 Ribbon 中的"服务发现"的宾语变成了 Eureka(或其他服务发现组件)。

Eureka 的服务注册与发现

我们会先对 Eureka 的服务发现进行描述,重点是 Eureka-client 是如何进行服务的注册与发现的,同时不会过多停留于 Eureka 的架构、Eureka-server 的实现、Zone/Region 等范畴。

Eureka-client 的服务发现都是由 DiscoveryClient 类实现的,它主要包括的功能有:

  • 向 Eureka-server 注册服务实例
  • 更新在 Eureka-server 的租期
  • 取消在 Eureka-server 的租约(服务下线)
  • 发现服务实例并定期更新

服务注册

DiscoveryClient 所有的定时任务都是在 initScheduledTasks()方法里,我们可以看到以下关键代码:

private void initScheduledTasks() {...if (clientConfig.shouldRegisterWithEureka()) {...// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());}
}

我们可以看到在 if 判断分支里创建了一个 instanceInfoReplicator 实例,它会通过 start 执行一个定时任务:

public void run() {try {discoveryClient.refreshInstanceInfo();Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

我们可以在 InstanceInfoReplicator 类的 run()方法中找到这一段,同时可以一眼发现其注册关键点在于discoveryClient.register()这段,我们点进去看看:

boolean register() throws Throwable {logger.info(PREFIX + appPathIdentifier + ": registering service...");EurekaHttpResponse<Void> httpResponse;try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == 204;}

这边可以发现是通过 HTTP REST (jersey 客户端)请求的方式将 instanceInfo 实例信息注册到 Eureka-server 上。我们简单看一下 InstanceInfo 对象,属性基本上都能见名知义:

@JsonCreatorpublic InstanceInfo(@JsonProperty("instanceId") String instanceId,@JsonProperty("app") String appName,@JsonProperty("appGroupName") String appGroupName,@JsonProperty("ipAddr") String ipAddr,@JsonProperty("sid") String sid,@JsonProperty("port") PortWrapper port,@JsonProperty("securePort") PortWrapper securePort,@JsonProperty("homePageUrl") String homePageUrl,@JsonProperty("statusPageUrl") String statusPageUrl,@JsonProperty("healthCheckUrl") String healthCheckUrl,@JsonProperty("secureHealthCheckUrl") String secureHealthCheckUrl,@JsonProperty("vipAddress") String vipAddress,@JsonProperty("secureVipAddress") String secureVipAddress,@JsonProperty("countryId") int countryId,@JsonProperty("dataCenterInfo") DataCenterInfo dataCenterInfo,@JsonProperty("hostName") String hostName,@JsonProperty("status") InstanceStatus status,@JsonProperty("overriddenstatus") InstanceStatus overriddenstatus,@JsonProperty("leaseInfo") LeaseInfo leaseInfo,@JsonProperty("isCoordinatingDiscoveryServer") Boolean isCoordinatingDiscoveryServer,@JsonProperty("metadata") HashMap<String, String> metadata,@JsonProperty("lastUpdatedTimestamp") Long lastUpdatedTimestamp,@JsonProperty("lastDirtyTimestamp") Long lastDirtyTimestamp,@JsonProperty("actionType") ActionType actionType,@JsonProperty("asgName") String asgName) {this.instanceId = instanceId;this.sid = sid;this.appName = StringCache.intern(appName);this.appGroupName = StringCache.intern(appGroupName);this.ipAddr = ipAddr;this.port = port == null ? 0 : port.getPort();this.isUnsecurePortEnabled = port != null && port.isEnabled();this.securePort = securePort == null ? 0 : securePort.getPort();this.isSecurePortEnabled = securePort != null && securePort.isEnabled();this.homePageUrl = homePageUrl;this.statusPageUrl = statusPageUrl;this.healthCheckUrl = healthCheckUrl;this.secureHealthCheckUrl = secureHealthCheckUrl;this.vipAddress = StringCache.intern(vipAddress);this.secureVipAddress = StringCache.intern(secureVipAddress);this.countryId = countryId;this.dataCenterInfo = dataCenterInfo;this.hostName = hostName;this.status = status;this.overriddenstatus = overriddenstatus;this.leaseInfo = leaseInfo;this.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer;this.lastUpdatedTimestamp = lastUpdatedTimestamp;this.lastDirtyTimestamp = lastDirtyTimestamp;this.actionType = actionType;this.asgName = StringCache.intern(asgName);// ---------------------------------------------------------------// for compatibilityif (metadata == null) {this.metadata = Collections.emptyMap();} else if (metadata.size() == 1) {this.metadata = removeMetadataMapLegacyValues(metadata);} else {this.metadata = metadata;}if (sid == null) {this.sid = SID_DEFAULT;}}

总结一下整个过程如下:

服务续期

服务续期说起来可能比较晦涩,其实就是在 client 端定时发起调用,让 Eureka-server 知道自己还活着,在 eureka 代码中的注释解释为心跳(heart-beat)。

这里有两个比较重要的配置需要注意:

  • instance.leaseRenewalIntervalInSeconds
    表示客户端的更新频率,默认 30s,也就是每 30s 就会向 Eureka-server 发起 renew 更新操作。
  • instance.leaseExpirationDurationInSeconds
    这是服务端视角的失效时间,默认是 90s,也就是 Eureka-server 在 90s 内没有接收到来自 client 的 renew 操作就会将其剔除。

我们直接从代码角度看一下,同样呢相关定时任务在 initScheduledTasks()方法中:

private void initScheduledTasks() {...if (clientConfig.shouldRegisterWithEureka()) {...// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);...}
}

可以看到这里创建了一个 HeartbeatThread()线程执行操作:

private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}

我们直接看 renew()方法:

    boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());return register();}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);return false;}}

这里比较简单,可以发现和服务注册是类似的,同样使用 HTTP REST 发起一个 hearbeat 请求,底层使用 jersey 客户端。
总结一下整个过程如下:

服务注销

服务注销逻辑比较简单,本身并不在定时任务中触发,而是通过对方法标记@PreDestroy,从而调用 shutdown 方法触发,最终会调用 unRegister()方法进行注销,同样的这也是一个 HTTP REST 请求,可以简单看下代码:

    @PreDestroy@Overridepublic synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();logger.info("Completed shut down of DiscoveryClient");}}/*** unregister w/ the eureka service.*/void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + appPathIdentifier + " - deregister  status: " + httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);}}}

服务发现及更新

我们来看作为服务消费者的关键逻辑,即发现服务以及更新服务。

首先 consumer 会在启动时从 Eureka-server 获取所有的服务列表,并在本地缓存。同时呢,由于本地有一份缓存,所以需要定期更新,更新频率可以配置。

启动时候在 consumer 在 discoveryClient 中会调用 fetchRegistry() 方法:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {...if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{...getAndStoreFullRegistry();} else {getAndUpdateDelta(applications);}...
}

这里可以看到 fetchRegistry 里有 2 个判断分支,对应首次更新以及后续更新。首次更新会调用 getAndStoreFullRegistry()方法,我们看一下:

 private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();logger.info("Getting all instance registry info from the eureka server");Applications apps = null;EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();}logger.info("The response status is {}", httpResponse.getStatusCode());if (apps == null) {logger.error("The application is null for some reason. Not storing this information");} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {localRegionApps.set(this.filterAndShuffle(apps));logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());} else {logger.warn("Not updating applications as another thread is updating it already");}}

可以看到和之前类似,如果在没有特殊指定的情况下,我们会发起一个 HTTP REST 请求拉取所有应用的信息并进行缓存,缓存对象为 Applications,有兴趣的可以进一步查看。

接下来,在我们熟悉的 initScheduledTasks()方法中,我们还会启动一个更新应用信息缓存的 task:

private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}...
}

在 CacheRefreshThread()这个 task 的 run 方法中,仍然会调用到我们之前的 fetchRegistry()方法,同时在判断时会走到另一个分支中,即调用到 getAndUpdateDelta()方法:

private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications delta = null;EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();}if (delta == null) {logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+ "Hence got the full registry.");getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());String reconcileHashCode = "";if (fetchRegistryUpdateLock.tryLock()) {try {updateDelta(delta);reconcileHashCode = getReconcileHashCode(applications);} finally {fetchRegistryUpdateLock.unlock();}} else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");}// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall}} else {logger.warn("Not updating application delta as another thread is updating it already");logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());}}

可以看到,这边是使用 HTTP REST 发起一个 getDelta 请求,同时在 updateDelta()方法中会更新本地的 Applications 缓存对象。

总结一下,整个服务发现与更新的过程如下:

Ribbon 的"服务发现"

接下来我们来看看 Ribbon 是怎么基于 Eureka 进行"服务发现"的,我们之前说过这里的"服务发现"并不是严格意义上的服务发现,而是 Ribbon 如何基于 Eureka 构建自己的负载均衡列表并及时更新,同时我们也不关注 Ribbon 其他负载均衡的具体逻辑(包括 IRule 路由,IPing 判断可用性)。

我们可以先做一些猜想,首先 Ribbon 肯定是基于 Eureka 的服务发现的。我们上边描述了 Eureka 会拉取所有服务信息到本地缓存 Applications 中,那么 Ribbon 肯定是基于这个 Applications 缓存来构建负载均衡列表的了,同时呢,负载均衡列表同样需要一个定时更新的机制来保证一致性。

服务调用

首先我们从开发者的最初使用上看,在开发者在 RestTemplate 上开启@LoadBalanced 注解就可开启 Ribbon 的逻辑了,显然这是用了类似拦截的方法。在 LoadBalancerAutoConfiguration 类中,我们可以看到相关代码:

...
@Beanpublic SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {return new SmartInitializingSingleton() {@Overridepublic void afterSingletonsInstantiated() {for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {for (RestTemplateCustomizer customizer : customizers) {customizer.customize(restTemplate);}}}};}@Configuration@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")static class LoadBalancerInterceptorConfig {@Beanpublic LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient,LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);}@Bean@ConditionalOnMissingBeanpublic RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {return new RestTemplateCustomizer() {@Overridepublic void customize(RestTemplate restTemplate) {List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);}};}}
...

可以看到,在初始化的过程中通过调用 customize()方法来给 RestTemplate 增加了拦截器 LoadBalancerInterceptor。而 LoadBalancerInterceptor 则是在拦截方法中使用了 loadBalancer(RibbonLoadBalancerClient 类) 完成请求调用:

@Overridepublic ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}

服务发现

到现在为止呢,我们的请求调用已经被 RibbonLoadBalancerClient 所封装,而其"服务发现"也是发生在 RibbonLoadBalancerClient 中的。

我们点到其 execute()方法中:

@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);Server server = getServer(loadBalancer);if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);}RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,serviceId), serverIntrospector(serviceId).getMetadata(server));return execute(serviceId, ribbonServer, request);}

这里根据 serviceId 构建了一个 ILoadBalancer,同时从 loadBalancer 中获取到了最终的实例 server 信息。ILoadBalancer 是定义了负载均衡的一个接口,它的关键方法 chooseServer()即是从负载均衡列表根据路由规则中选取一个 server。当然我们主要关心的点在于,负载均衡列表是怎么构建出来的。通过源码跟踪我们发现,在通过 getLoadBalancer()方法构建好 ILoadBalancer 对象后,对象中就已经包含了服务列表。所以我们来看看 ILoadBalancer 对象是怎么创建的:

	protected ILoadBalancer getLoadBalancer(String serviceId) {return this.clientFactory.getLoadBalancer(serviceId);}

那么这里其实是 springcloud 封装的 clientFactory,它会在 applicationContext 容器中寻找对应的 bean 。

通过源码追踪,我们可以在自动配置类 RibbonClientConfiguration 中找到对应代码:

	@Bean@ConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config,ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);}return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);}

我们看到这里最终构建了 ILoadBalancer,其实现类是 ZoneAwareLoadBalancer,我们观察其超类的初始化:

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,ServerList<T> serverList, ServerListFilter<T> filter,ServerListUpdater serverListUpdater) {super(clientConfig, rule, ping);this.serverListImpl = serverList;this.filter = filter;this.serverListUpdater = serverListUpdater;if (filter instanceof AbstractServerListFilter) {((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());}restOfInit(clientConfig);}

这边最终执行了 restOfInit()方法,进一步跟踪:

    void restOfInit(IClientConfig clientConfig) {boolean primeConnection = this.isEnablePrimingConnections();// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()this.setEnablePrimingConnections(false);enableAndInitLearnNewServersFeature();updateListOfServers();if (primeConnection && this.getPrimeConnections() != null) {this.getPrimeConnections().primeConnections(getReachableServers());}this.setEnablePrimingConnections(primeConnection);LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());}

updateListOfServers()方法是获取所有的 ServerList 的,最终由 serverListImpl.getUpdatedListOfServers()获取所有的服务列表,在此 serverListImpl 即实现类为 DiscoveryEnabledNIWSServerList。其中 DiscoveryEnabledNIWSServerList 有 getInitialListOfServers()和 getUpdatedListOfServers()方法,具体代码如下

    @Overridepublic List<DiscoveryEnabledServer> getInitialListOfServers(){return obtainServersViaDiscovery();}@Overridepublic List<DiscoveryEnabledServer> getUpdatedListOfServers(){return obtainServersViaDiscovery();}

此时我们查看 obtainServersViaDiscovery()方法,已经基本接近于事物本质了,它创建了一个 EurekaClient 对象,在此就是 Eureka 的 DiscoveryClient 实现类,调用了其 getInstancesByVipAddress()方法,它最终从 DiscoveryClient 的 Applications 缓存中根据 serviceId 选取了对应的服务信息:

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {logger.warn("EurekaClient has not been initialized yet, returning an empty list");return new ArrayList<DiscoveryEnabledServer>();}EurekaClient eurekaClient = eurekaClientProvider.get();if (vipAddresses!=null){for (String vipAddress : vipAddresses.split(",")) {// if targetRegion is null, it will be interpreted as the same region of clientList<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);for (InstanceInfo ii : listOfInstanceInfo) {if (ii.getStatus().equals(InstanceStatus.UP)) {if(shouldUseOverridePort){if(logger.isDebugEnabled()){logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);}// copy is necessary since the InstanceInfo builder just uses the original reference,// and we don't want to corrupt the global eureka copy of the object which may be// used by other clients in our systemInstanceInfo copy = new InstanceInfo(ii);if(isSecure){ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();}else{ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();}}DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);des.setZone(DiscoveryClient.getZone(ii));serverList.add(des);}}if (serverList.size()>0 && prioritizeVipAddressBasedServers){break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers}}}return serverList;}

服务更新

我们已经知道初次启动时,Ribbon 是怎么结合 Eureka 完成负载均衡列表的构建了,那么与 Eureka 类似,我们还需要及时对服务列表进行更新以保证一致性。

在 RibbonClientConfiguration 自动配置类中构建 ILoadBalancer 时我们可以看到其构造器中有 ServerListUpdater 对象,而此对象也是在当前类中构建的:

	@Bean@ConditionalOnMissingBeanpublic ServerListUpdater ribbonServerListUpdater(IClientConfig config) {return new PollingServerListUpdater(config);}

我们观察此对象中的 start()方法看是如何完成更新的:

@Overridepublic synchronized void start(final UpdateAction updateAction) {if (isActive.compareAndSet(false, true)) {final Runnable wrapperRunnable = new Runnable() {@Overridepublic void run() {if (!isActive.get()) {if (scheduledFuture != null) {scheduledFuture.cancel(true);}return;}try {updateAction.doUpdate();lastUpdated = System.currentTimeMillis();} catch (Exception e) {logger.warn("Failed one update cycle", e);}}};scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable,initialDelayMs,refreshIntervalMs,TimeUnit.MILLISECONDS);} else {logger.info("Already active, no-op");}}

这里有 2 个配置,即 initialDelayMs 首次检测默认 1s,refreshIntervalMs 检测间隔默认 30s(和 Eureka 一致),创建了一个定时任务,执行 updateAction.doUpdate()方法。

我们回到之前的 restOfInit()方法,查看其中的 enableAndInitLearnNewServersFeature()方法,可以看到是在此处触发了 ServerListUpdater 的 start 方法,同时传入了 updateAction 对象:

    public void enableAndInitLearnNewServersFeature() {LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());serverListUpdater.start(updateAction);}

其实 updateAction 一开始就已经创建好了,它仍然是调用 之前的 updateListOfServers 方法来进行后续的更新:

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {@Overridepublic void doUpdate() {updateListOfServers();}};

总结一下 Ribbon 三部分服务发现的整体流程如下:

参考资料

深度剖析服务发现组件 Netflix Eureka
深入理解 Ribbon 之源码解析


---------------------
作者:YouluBank
来源:CSDN
原文:https://blog.csdn.net/m0_37787662/article/details/109286790
版权声明:本文为作者原创文章,转载请附上博文链接!
内容解析By:CSDN,CNBLOG博客文章一键转载插件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/283980.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

std的find和reverse_iterator联合使用

上代码&#xff1a; // test2013.cpp : 定义控制台应用程序的入口点。 //#include "stdafx.h" #include <stdlib.h> #include <stdio.h> #include<iostream> #include<vector> #include<map> #include<string> using namespace …

论如何提升学习的能力

为啥要学习如果有一件事情是能改变你自己的&#xff0c;我想这件事情必然就是学习&#xff0c;我的人生重要的转折点也是从学习这件事情始发的&#xff0c;那么&#xff0c;我们就从这里开始。学习不仅仅是为了找到答案&#xff0c;而是为了找到方法&#xff0c;找到一个可以找…

CSS布局解决方案(终结版)

前端布局非常重要的一环就是页面框架的搭建&#xff0c;也是最基础的一环。在页面框架的搭建之中&#xff0c;又有居中布局、多列布局以及全局布局&#xff0c;今天我们就来总结总结前端干货中的CSS布局。 居中布局 水平居中 1&#xff09;使用inline-blocktext-align&#xff…

基于ABP和Magicodes实现Excel导出操作

前端使用的vue-element-admin框架&#xff0c;后端使用ABP框架&#xff0c;Excel导出使用的Magicodes.IE.Excel.Abp库。Excel导入和导出操作几乎一样&#xff0c;不再介绍。文本主要介绍Excel导出操作和过程中遇到的坑&#xff0c;主要是Excel文件导出后无法打开的问题。一.Mag…

消息模式在实际开发应用中的优势

曾经.NET面试过程中经常问的一个问题是&#xff0c;如果程序集A&#xff0c;引用B &#xff0c;B 引用C&#xff0c;那么C怎么去访问A中的方法呢。 这个问题初学.net可能一时想不出该咋处理&#xff0c;这涉及到循环引用问题。但有点经验的可能就简单了&#xff0c;通过委托的方…

微服务:注册中心ZooKeeper、Eureka、Consul 、Nacos对比

前言 服务注册中心本质上是为了解耦服务提供者和服务消费者。对于任何一个微服务&#xff0c;原则上都应存在或者支持多个提供者&#xff0c;这是由微服务的分布式属性决定的。更进一步&#xff0c;为了支持弹性扩缩容特性&#xff0c;一个微服务的提供者的数量和分布往往是动…

为了高性能、超大规模的模型训练,这个组合“出道”了

点击上方蓝字关注我们&#xff08;本文阅读时间&#xff1a;3分钟)近年来&#xff0c;在大量数据上训练的基于 transformer 的大规模深度学习模型在多项认知任务中取得了很好的成果&#xff0c;并且被使用到一些新产品和功能背后&#xff0c;进一步增强了人类的能力。在过去五年…

SpringCloud必会知识点大全

为什么要学习Spring Cloud 在项目开发中随着业务越来越多&#xff0c;导致功能之间耦合性高、开发效率低、系统运行缓慢难以维护、不稳定。微服务 架构可以解决这些问题&#xff0c;而Spring Cloud是微服务架构最流行的实现. 1.微服务 微服务架构是使用一套小服务来开发单个应用…

30分钟掌握 C#7

1. out 变量&#xff08;out variables&#xff09; 以前我们使用out变量必须在使用前进行声明&#xff0c;C# 7.0 给我们提供了一种更简洁的语法 “使用时进行内联声明” 。如下所示&#xff1a; 1 var input ReadLine(); 2 if (int.TryParse(input, out var result)) 3 …

在 C# 中如何检查参数是否为 null

前言前不久&#xff0c;微软宣布从 C# 11 中移除参数空值检查功能&#xff0c;该功能允许在方法开始执行之前&#xff0c;在参数名称的末尾提供参数空值检查&#xff08;!!操作符&#xff09;。那么&#xff0c;在 C# 中如何检查参数是否为 null 呢&#xff1f;1. null这个可能…

带你剖析WebGis的世界奥秘----Geojson数据加载(高级)

前言&#xff1a;前两周我带你们分析了WebGis中关键步骤瓦片加载点击事件&#xff08;具体的看前两篇文章&#xff09;&#xff0c;下面呢&#xff0c;我带大家来看看Geojson的加载及其点击事件 Geojson数据解析 GeoJSON是一种对各种地理数据结构进行编码的格式。GeoJSON对象可…

「Docker入门指北」容器很难理解?带你从头到尾捋一遍

文章目录 1. 初始虚拟化 &#x1f351; 虚拟化概念&#x1f351; 硬件虚拟化2. Docker容器 &#x1f351; Docker技术的诞生&#x1f351; 容器与虚拟化&#x1f351; 性能差别&#x1f351; Docker优势 编排有序高效易迁移快速部署3. 容器生态系统 &#x1f351; 核心技术 容器…

微服务:事务管理

几乎所有的信息管理系统都会涉及到事务&#xff0c;事务的目的是为了保证数据的一致性&#xff0c;这里说的一致性是数据库状态的一致性。说到数据库状态的一致性&#xff0c;相信大家都会想到 ACID &#xff1a;原子性&#xff08;Atomic&#xff09;&#xff1a;在一个事件的…

Redis的那些事:一文入门Redis的基础操作

Redis是什么Redis&#xff0c;全称是Remote Dictionary Service,翻译过来就是&#xff0c;远程字典服务。redis属于nosql非关系型数据库。Nosql常见的数据关系&#xff0c;基本上是以key-value键值对形式存在的。Key-value: 就像翻阅中文字典或者单词字典&#xff0c;通过指定的…

10种提问型爆文标题句式 直接套用

如果你用1天的时间来写篇好文章&#xff0c;那你花掉半天时间想一个好标题都不过分&#xff01; 你是不是觉得我有点言过其实了&#xff1f;没关系&#xff0c;先来问你2个问题&#xff1a; 1、花了很长时间&#xff0c;写了一篇很牛的卖货推文&#xff0c;定稿后&#xff0c…

2016 China Joy抢先看,文末有彩蛋!

这里只有你想不到的&#xff0c;没有你看不到的。 2016 China Joy开幕在即&#xff0c;天气成了最折磨各种媒体、展商和观众的小妖精&#xff0c;一会艳阳天&#xff0c;一会大暴雨&#xff0c;轩轩现在是这样的&#xff01; 七月底的魔都&#xff0c;热的那叫一个销魂&#x…

JdbcTemplate+PageImpl实现多表分页查询

一、基础实体  MappedSuperclass public abstract class AbsIdEntity implements Serializable {private static final long serialVersionUID 7988377299341530426L;public final static int IS_DELETE_YES 1;// 标记删除public final static int IS_DELETE_NO 0;// 未删除…

消息队列选型手册

前言 消息队列中间件重要吗&#xff1f;面试必问问题之一&#xff0c;你说重不重要。我有时会问同事&#xff0c;为啥你用 RabbitMQ&#xff0c;不用 Kafka&#xff0c;或者 RocketMQ 呢&#xff1f; 他给我的回答&#xff1a;“因为公司用的就是这个&#xff0c;大家都这么用…

Jenkins 持续集成国产嵌入式操作系统 RT-Thread 的CI

我们直接在Jenkins的镜像基础上进行集成RT-Thread 的编译环境&#xff0c; 这样直接使用Shell 命令 最直接了当&#xff0c; 通过 第三方docker等插件&#xff0c; 尝试了计重方案&#xff0c; 没有找到理想中的感觉&#xff0c; 如果其他人有想法可以告知一二。 我们有现成的镜…

codevs原创抄袭题 5960 信使

题目描述 Description•战争时期&#xff0c;前线有n个哨所&#xff0c;每个哨所可能会与其他若干个哨所之间有通信联系。信使负责在哨所之间传递信息&#xff0c;当然&#xff0c;这是要花费一定时间的&#xff08;以天为单位&#xff09;。指挥部设在第一个哨所。当指挥部下达…