学了feign源码之后感觉,这部分还是按运行流程分块学合适。核心组件什么的,当专业术语学妥了。
序章:认识真正のRibbon
但只用认识一点点
之前我们学习Ribbon的简单使用时,都是集成了Eureka-client或者Feign等组件,甚至在使用时感受不到ribbon的存在,顶多提一嘴他的负载均衡接口。而本文要专注于讲解微服务体系下,Ribbon部分的基本原理。
ribbon项目目前已经进入On Maintenance维护状态,大家可以学的轻松些。
摘自Github:
Ribbon is a client side IPC library that is battle-tested in cloud. It provides the following features
- Load balancing 负载均衡
- Fault tolerance 容错
- Multiple protocol (HTTP, TCP, UDP) support in an asynchronous and reactive model
- 多协议、异步、reactive模型
- Caching and batching 缓存和批处理
Ribbon是一个久经沙场的IPC库。IPC(Inter-Process Communication)进程间通信,是指两个进程的数据之间产生交互。
Ribbon的核心类
ribbon-loadbalancer模块
Server
对服务实例的封装,里面封装了服务实例的ip、端口、是否可用等信息。
ServerList
ServerList是个接口,提供了两个方法,都是获取Server实例列表的。
eureka、nacos等注册中心都实现了这个接口,将注册中心的服务实例数据提供给Ribbon,供Ribbon来进行负载均衡。
public interface ServerList<T extends Server> {public List<T> getInitialListOfServers(); /*** Return updated list of servers. This is called say every 30 secs * (configurable) by the Loadbalancer's Ping cycle **/ public List<T> getUpdatedListOfServers();
}
- 这个get是说从注册中心拿,还是本地调用的时候从这里内存拿?
3、ServerListUpdater
用来更新服务注册列表的数据,他有唯一的实现类PollingServerListUpdater,该实现类有一个核心的方法start()。
start()方法首先通过CAS原子操作:isActive.compareAndSet(false, true)
来获取锁(并发编程操作),然后将传入的参数updateAction的doUpdate()方法封装进了一个Runnable,然后包装好的Runnable扔到了定时调度的线程池,经过initialDelayMs(默认1s)时间后,会调用第一次,之后每隔refreshIntervalMs(默认30s)调用一次。
- 参数updateAction哪里来,干什么?
@Override
public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); }return;}try {updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); }}};scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, initialDelayMs,refreshIntervalMs, TimeUnit.MILLISECONDS); } else { logger.info("Already active, no-op"); }
}
4、IRule
public interface IRule{/* * choose one alive server from lb.allServers or * lb.upServers according to key ** @return choosen Server object. NULL is returned if none * server is available */public Server choose(Object key); public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); }
IRule是负载均衡的算法的统一接口,其实现类是各种负载均衡算法的具体实现。
比如说RandomRule类,随机轮询。
5、IClientConfig
配置接口,有个默认的实现DefaultClientConfigImpl,通过这个可以获取到一些配置Ribbon的一些配置。
6、ILoadBalancer
public interface ILoadBalancer {public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); @Deprecated public List<Server> getServerList(boolean availableOnly);public List<Server> getReachableServers();public List<Server> getAllServers();
}
ILoadBalancer主要提供了获取服务实例列表和选择服务实例的功能。
虽然对外主要提供获取服务的功能,但是在实现的时候,主要是用来协调上面提到的各个核心组件的,使得他们能够协调工作
这个接口的实现有好几个实现类,但是我讲两个比较重要的。
BaseLoadBalancer
public class BaseLoadBalancer extends AbstractLoadBalancer implementsPrimeConnections.PrimeConnectionListener, IClientConfigAware {private final static IRule DEFAULT_RULE = new RoundRobinRule(); protected IRule rule = DEFAULT_RULE;private IClientConfig config; protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,IPing ping, IPingStrategy pingStrategy) {logger.debug("LoadBalancer [{}]: initialized", name);this.name = name;this.ping = ping;this.pingStrategy = pingStrategy;setRule(rule);setupPingTask();lbStats = stats;init();}public BaseLoadBalancer(IClientConfig config) {initWithNiwsConfig(config);}public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));}void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {this.config = clientConfig;String clientName = clientConfig.getClientName();this.name = clientName;int pingIntervalTime = Integer.parseInt(""+ clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingInterval,Integer.parseInt("30")));int maxTotalPingTime = Integer.parseInt(""+ clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,Integer.parseInt("2")));setPingInterval(pingIntervalTime);setMaxTotalPingTime(maxTotalPingTime);// cross associate with each other// i.e. Rule,Ping meet your container LB// LB, these are your Ping and Rule guys ...setRule(rule);setPing(ping);setLoadBalancerStats(stats);rule.setLoadBalancer(this);if (ping instanceof AbstractLoadBalancerPing) {((AbstractLoadBalancerPing) ping).setLoadBalancer(this);}logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);boolean enablePrimeConnections = clientConfig.get(CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);if (enablePrimeConnections) {this.setEnablePrimingConnections(true);PrimeConnections primeConnections = new PrimeConnections(this.getName(), clientConfig);this.setPrimeConnections(primeConnections);}init();}public void setRule(IRule rule) {if (rule != null) {this.rule = rule;} else {/* default rule */this.rule = new RoundRobinRule();}if (this.rule.getLoadBalancer() != this) {this.rule.setLoadBalancer(this);}}public Server chooseServer(Object key) {if (counter == null) {counter = createCounter();}counter.increment();if (rule == null) {return null;} else {try {return rule.choose(key);} catch (Exception e) {logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);return null;}}}}
核心属性
allServerList:缓存了所有的服务实例数据
upServerList:缓存了能够使用的服务实例数据。
rule:负载均衡算法组件,默认是RoundRobinRule
核心方法
setRule:这个方法是设置负载均衡算法的,并将当前这个ILoadBalancer对象设置给IRule,从这可以得出一个结论,IRule进行负载均衡的服务实例列表是通过ILoadBalancer获取的,也就是 IRule 和 ILoadBalancer相互引用。setRule(rule)一般是在构造对象的时候会调用。
chooseServer:就是选择一个服务实例,是委派给IRule的choose方法来实现服务实例的选择。
DynamicServerListLoadBalancer
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);volatile ServerList<T> serverListImpl; volatile ServerListFilter<T> filter; protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } }; protected volatile ServerListUpdater serverListUpdater; public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig); } @Override public void setServersList(List lsrv) { super.setServersList(lsrv); List<T> serverList = (List<T>) lsrv; Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>(); for (Server server : serverList) { // make sure ServerStats is created to avoid creating them on hot // path getLoadBalancerStats().getSingleServerStat(server); String zone = server.getZone(); if (zone != null) { zone = zone.toLowerCase(); List<Server> servers = serversInZones.get(zone); if (servers == null) { servers = new ArrayList<Server>(); serversInZones.put(zone, servers); } servers.add(server); } } setServerListForZones(serversInZones); }protected void setServerListForZones( Map<String, List<Server>> zoneServersMap) { LOGGER.debug("Setting server list for zones: {}", zoneServersMap); getLoadBalancerStats().updateZoneServerMapping(zoneServersMap); }@VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); }/** * Update the AllServer list in the LoadBalancer if necessary and enabled * * @param ls */ protected void updateAllServerList(List<T> ls) { // other threads might be doing this - in which case, we pass if (serverListUpdateInProgress.compareAndSet(false, true)) { try { for (T s : ls) { s.setAlive(true); // set so that clients can start using these // servers right away instead // of having to wait out the ping cycle. } setServersList(ls); super.forceQuickPing(); } finally { serverListUpdateInProgress.set(false); } } }}
DynamicServerListLoadBalancer继承自BaseLoadBalancer, 进行了一些扩展。
成员变量
serverListImpl:上面说过,通过这个接口获取服务列表
filter:起到过滤的作用
updateAction:是个匿名内部类,实现了doUpdate方法,会调用updateListOfServers方法
serverListUpdater:上面说到过,默认就是唯一的实现类PollingServerListUpdater,也就是每个30s就会调用传入的updateAction的doUpdate方法。
这不是巧了么,serverListUpdater的start方法需要一个updateAction,刚刚好成员变量有个updateAction的匿名内部类的实现,所以serverListUpdater的start方法传入的updateAction的实现其实就是这个匿名内部类。
那么哪里调用了serverListUpdater的start方法传入了updateAction呢?是在构造的时候调用的,具体的调用链路是调用 restOfInit -> enableAndInitLearnNewServersFeature(),这里就不贴源码了
所以,其实DynamicServerListLoadBalancer在构造完成之后,默认每隔30s中,就会调用updateAction的匿名内部类的doUpdate方法,从而会调用updateListOfServers。所以我们来看一看 updateListOfServers 方法干了什么。
public void updateListOfServers() {List<T> servers = new ArrayList<T>();if (serverListImpl != null) {servers = serverListImpl.getUpdatedListOfServers();LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);if (filter != null) {servers = filter.getFilteredListOfServers(servers);LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);}}updateAllServerList(servers);}
这个方法实现很简单,就是通过调用 ServerList 的getUpdatedListOfServers获取到一批服务实例数据,然后过滤一下,最后调用updateAllServerList方法。
查看updateAllServerList()源码:
protected void updateAllServerList(List<T> ls) {// other threads might be doing this - in which case, we passif (serverListUpdateInProgress.compareAndSet(false, true)) {try {for (T s : ls) {s.setAlive(true); // set so that clients can start using these// servers right away instead// of having to wait out the ping cycle.}setServersList(ls);super.forceQuickPing();} finally {serverListUpdateInProgress.set(false);}}}
该方法调用每个服务实例的setAlive()方法,将isAliveFlag设置成true,然后调用setServersList()将服务实例更新到内部的缓存中,也就是上面提到的allServerList和upServerList中,这里就不贴源码了。
其实分析完updateListOfServers方法之后,再结合上面源码的分析,我们可以清楚的得出一个结论,那就是默认每隔30s都会重新通过ServerList组件获取到服务实例数据,然后更新到BaseLoadBalancer缓存中,IRule的负载均衡所需的服务实例数据,就是这个内部缓存。
从DynamicServerListLoadBalancer的命名也可以看出,他相对于父类BaseLoadBalancer而言,提供了动态更新内部服务实例列表的功能。
说完一些核心的组件,以及他们跟ILoadBalancer的关系之后,接下来就来分析一下,ILoadBalancer是在ribbon中是如何使用的。
8、AbstractLoadBalancerAwareClient
ILoadBalancer是一个可以获取到服务实例数据的组件,AbstractLoadBalancerAwareClient,这个是用来执行请求的,我们来看一下这个类的构造。
public AbstractLoadBalancerAwareClient(ILoadBalancer lb) { super(lb); }
/**
* Delegate to {@link #initWithNiwsConfig(IClientConfig)}
* @param clientConfig
*/
public AbstractLoadBalancerAwareClient(ILoadBalancer lb, IClientConfig clientConfig) { super(lb, clientConfig);
}
通过上面可以看出,在构造的时候需要传入一个ILoadBalancer。
AbstractLoadBalancerAwareClient中有一个方法executeWithLoadBalancer,这个是用来执行传入的请求,以负载均衡的方式。
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);try { return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }
这个方法构建了一个LoadBalancerCommand,随后调用了submit方法,传入了一个匿名内部类,这个匿名内部类中有这么一行代码很重要。
URI finalUri = reconstructURIWithServer(server, request.getUri());
这行代码是根据给定的一个Server重构了URI,reconstructURIWithServer干的一件事就是将ServerA服务名替换成真正的服务所在的机器的ip和端口,这样就能发送http请求到ServerA服务所对应的一台服务器了。
之后根据新的地址,调用这个类中的execute方法来执行请求,execute方法是个抽象方法,也就是交给子类实现,子类就可以通过实现这个方法,来发送http请求,实现rpc调用。
那么这台Server是从获取的呢?肯定是通过ILoadBalancer获取的。直接贴出submit方法中核心的一部分代码
Observable<T> o = (server == null ? selectServer() : Observable.just(server))
就是通过selectServer来选择一个Server的,selectServer我就不翻源码了,其实最终还是调用ILoadBalancer的方法chooseServer方法来获取一个服务,之后就会调用上面的说的匿名内部类的方法,重构URI,然后再交由子类的execut方法来实现发送http请求。
所以,通过对AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法,我们可以知道,这个抽象类的主要作用就是通过负载均衡算法,找到一个合适的Server,然后将你传入的请求路径http://ServerA/api/sayHello重新构建成类似http://192.168.1.101:8088/api/sayHello这样,之后调用子类实现的execut方法,来发送http请求,就是这么简单。
到这里其实Ribbon核心组件和执行原理我就已经说的差不多了,再来画一张图总结一下
SpringCloud中使用的核心组件的实现都有哪些
说完了Ribbon的一些核心组件和执行原理之后,我们再来看一下在SpringCloud环境下,这些组件到底是用的哪些实现
Ribbon, 启动!(原理)
启动阶段的任务
- 你们微服务源码流程够有共性的,一起总结下?
Ribbon的自动装配类为RibbonAutoConfiguration,此处粘贴核心源码:
@Configuration
@RibbonClients
public class RibbonAutoConfiguration {@Autowired(required = false)private List<RibbonClientSpecification> configurations = new ArrayList<>();@Beanpublic SpringClientFactory springClientFactory() {SpringClientFactory factory = new SpringClientFactory();factory.setConfigurations(this.configurations);return factory;}
}
RibbonAutoConfiguration配置类上有个@RibbonClients注解,接下来讲解一下这个注解的作用
@Import(RibbonClientConfigurationRegistrar.class)
public @interface RibbonClients {RibbonClient[] value() default {};Class<?>[] defaultConfiguration() default {};
}
OpenFeign的小伙伴肯定知道,要使用Feign,得需要使用@EnableFeignClients,@EnableFeignClients的作用可以扫描指定包路径下的@FeignClient注解,也可以声明配置类;
RibbonClients的作用也是可以声明配置类,同样也使用了@Import注解来实现,RibbonClientConfigurationRegistrar这个配置类的作用就是往spring容器中注入每个服务的Ribbon组件(@RibbonClient里面可以声明每个服务对应的配置)的配置类和默认配置类,将配置类封装为RibbonClientSpecification注入到spring容器中。
RibbonAutoConfiguration的主要作用就是注入了一堆RibbonClientSpecification,就是每个服务对应的配置类,然后声明了SpringClientFactory这个bean,将配置类放入到里面。
SpringClientFactory跟OpenFeign中的FeignContext很像,SpringClientFactory也继承了NamedContextFactory,实现了配置隔离,同时也在构造方法中传入了每个容器默认的配置类RibbonClientConfiguration。至于什么是配置隔离,我在OpenFeign那篇文章说过,不清楚的小伙伴可以后台回复feign01即可获得文章链接。
配置优先级问题
优先级最高的是springboot启动的时候的容器,因为这个容器是每个服务的容器的父容器,而在配置类声明bean的时候,都有@ConditionalOnMissingBean注解,一旦父容器有这个bean,那么子容器就不会初始化。
优先级第二高的是每个客户端声明的配置类,也就是通过@FeignClient和@RibbonClient的configuration属性声明的配置类
优先级第三高的是@EnableFeignClients和@RibbonClients注解中configuration属性声明的配置类
优先级最低的就是FeignContext和SpringClientFactory构造时传入的配置类
至于优先级怎么来的,其实是在NamedContextFactory中createContext方法中构建AnnotationConfigApplicationContext时按照配置的优先级一个一个传进去的。
RibbonClientConfiguration提供的默认的bean
接下来我们看一下RibbonClientConfiguration都提供了哪些默认的bean
@Bean@ConditionalOnMissingBeanpublic IClientConfig ribbonClientConfig() {DefaultClientConfigImpl config = new DefaultClientConfigImpl();config.loadProperties(this.name);config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);return config;}
配置类对应的bean,这里设置了ConnectTimeout和ReadTimeout都是1s中。
@Bean @ConditionalOnMissingBean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, name)) { return this.propertiesFactory.get(IRule.class, config, name); } ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); rule.initWithNiwsConfig(config); return rule; }
IRule,默认是ZoneAvoidanceRule,这个Rule带有过滤的功能,过滤哪些不可用的分区的服务,过滤成功之后,继续采用线性轮询的方式从过滤结果中选择一个出来。至于这个propertiesFactory,可以不用管,这个是默认读配置文件的中的配置,一般不设置,后面看到都不用care。
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {if (this.propertiesFactory.isSet(ServerList.class, name)) {return this.propertiesFactory.get(ServerList.class, config, name);}ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();serverList.initWithNiwsConfig(config);return serverList;
}
默认是ConfigurationBasedServerList,也就是基于配置来提供服务实例列表。但是在SpringCloud环境中,服务信息是注册在注册中心的。因此注册中心实现了优先级更高的ServerList。比如Nacos的实现NacosServerList,这里我贴出NacosServerList的bean的声明,在配置类NacosRibbonClientConfiguration中
@Bean @ConditionalOnMissingBeanpublic ServerList<?> ribbonServerList(IClientConfig config, NacosDiscoveryProperties nacosDiscoveryProperties) {NacosServerList serverList = new NacosServerList(nacosDiscoveryProperties); serverList.initWithNiwsConfig(config); return serverList;}
至于为什么容器选择NacosServerList而不是ConfigurationBasedServerList,主要是因为NacosRibbonClientConfiguration这个配置类是通过@RibbonClients导入的,比SpringClientFactory导入的RibbonClientConfiguration配置类优先级高。
- @ConditionalOnMissingBean?这类注解还是不要在Spring里总结了,大而全,还是需要的时候临时现场介绍下
- 是说单独使用ribbon的时候用ConfigurationBasedServerList从配置里读取服务列表,使用注册中心的时候,利用优先级覆盖关系,使用注册中心的列表?多么天才的发明!
- 服务中心怎么通信自己的?怎么 拉取的?nacos也默认集成ribbon吗?
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) { return new PollingServerListUpdater(config);
}
ServerListUpdater,就是我们剖析的PollingServerListUpdater,默认30s更新一次BaseLoadBalancer内部服务的缓存。
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);}return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);}
ILoadBalancer,默认是ZoneAwareLoadBalancer,构造的时候也传入了上面声明的的bean,ZoneAwareLoadBalancer这个类继承了DynamicServerListLoadBalancer。
到这里,Ribbon在SpringCloud的配置我们就讲完了,主要就是声明了很多核心组件的bean,最后都设置到ZoneAwareLoadBalancer中。但是,AbstractLoadBalancerAwareClient这个对象的声明我们并没有在配置类中找到,主要是因为这个对象是OpenFeign整合Ribbon的一个入口,至于是如何整合的,这个坑就留给下篇文章吧。
那么在springcloud中,上图就可以加上注册中心。
附录
可能一辈子也没必要看,但如果你真的想看的Ribbon源码地址
在不结合其他微服务组件的情况下,我们也可以单独使用ribbon
<dependency><groupId>com.netflix.ribbon</groupId><artifactId>ribbon</artifactId><version>2.2.2</version>
</dependency>