一、服务实例过滤器ServerListFilter
服务实例过滤器(ServerListFilter)为负载均衡器(Loadbalancer)提供从服务实例列表(ServerList)获取的服务实例过滤出符合要求的服务实例。
负载均衡器(Loadbalancer)通过服务实例列表(ServerList)从注册中心(register)或者配置文件(yaml或properties)上读取全部服务实例(server),然后以服务实例过滤器(ServerListFilter)的过滤方式进行筛选留下满足条件的服务实例,进而借助负载均衡策略(IRule)选择出一个合适的服务实例。
二、ServerListFilter实现类
ZoneAffinityServerListFilter 区区域相关性筛选服务实例列表过滤器
ZonePreferenceServerListFilter 首选本地区域服务实例列表过滤器
ServerListSubsetFilter 服务实例数限制为所有服务实例的子集 筛选过滤器
三、具体代码实现
(1)ZoneAffinityServerListFilter
public class ZoneAffinityServerListFilter<T extends Server> extendsAbstractServerListFilter<T> implements IClientConfigAware {private volatile boolean zoneAffinity = DefaultClientConfigImpl.DEFAULT_ENABLE_ZONE_AFFINITY;private volatile boolean zoneExclusive = DefaultClientConfigImpl.DEFAULT_ENABLE_ZONE_EXCLUSIVITY;private DynamicDoubleProperty activeReqeustsPerServerThreshold;private DynamicDoubleProperty blackOutServerPercentageThreshold;private DynamicIntProperty availableServersThreshold;private Counter overrideCounter;private ZoneAffinityPredicate zoneAffinityPredicate = new ZoneAffinityPredicate();private static Logger logger = LoggerFactory.getLogger(ZoneAffinityServerListFilter.class);String zone;public ZoneAffinityServerListFilter() { }public ZoneAffinityServerListFilter(IClientConfig niwsClientConfig) {initWithNiwsConfig(niwsClientConfig);}@Overridepublic void initWithNiwsConfig(IClientConfig niwsClientConfig) {String sZoneAffinity = "" + niwsClientConfig.getProperty(CommonClientConfigKey.EnableZoneAffinity, false);if (sZoneAffinity != null){zoneAffinity = Boolean.parseBoolean(sZoneAffinity);logger.debug("ZoneAffinity is set to {}", zoneAffinity);}String sZoneExclusive = "" + niwsClientConfig.getProperty(CommonClientConfigKey.EnableZoneExclusivity, false);if (sZoneExclusive != null){zoneExclusive = Boolean.parseBoolean(sZoneExclusive);}if (ConfigurationManager.getDeploymentContext() != null) {zone = ConfigurationManager.getDeploymentContext().getValue(ContextKey.zone);}activeReqeustsPerServerThreshold = DynamicPropertyFactory.getInstance().getDoubleProperty(niwsClientConfig.getClientName() + "." + niwsClientConfig.getNameSpace() + ".zoneAffinity.maxLoadPerServer", 0.6d);logger.debug("activeReqeustsPerServerThreshold: {}", activeReqeustsPerServerThreshold.get());blackOutServerPercentageThreshold = DynamicPropertyFactory.getInstance().getDoubleProperty(niwsClientConfig.getClientName() + "." + niwsClientConfig.getNameSpace() + ".zoneAffinity.maxBlackOutServesrPercentage", 0.8d);logger.debug("blackOutServerPercentageThreshold: {}", blackOutServerPercentageThreshold.get());availableServersThreshold = DynamicPropertyFactory.getInstance().getIntProperty(niwsClientConfig.getClientName() + "." + niwsClientConfig.getNameSpace() + ".zoneAffinity.minAvailableServers", 2);logger.debug("availableServersThreshold: {}", availableServersThreshold.get());overrideCounter = Monitors.newCounter("ZoneAffinity_OverrideCounter");Monitors.registerObject("NIWSServerListFilter_" + niwsClientConfig.getClientName());}private boolean shouldEnableZoneAffinity(List<T> filtered) { if (!zoneAffinity && !zoneExclusive) {return false;}if (zoneExclusive) {return true;}LoadBalancerStats stats = getLoadBalancerStats();if (stats == null) {return zoneAffinity;} else {logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);double loadPerServer = snapshot.getLoadPerServer();int instanceCount = snapshot.getInstanceCount(); int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() || loadPerServer >= activeReqeustsPerServerThreshold.get()|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});return false;} else {return true;}}}@Overridepublic List<T> getFilteredListOfServers(List<T> servers) {if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){List<T> filteredServers = Lists.newArrayList(Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));if (shouldEnableZoneAffinity(filteredServers)) {return filteredServers;} else if (zoneAffinity) {overrideCounter.increment();}}return servers;}@Overridepublic String toString(){StringBuilder sb = new StringBuilder("ZoneAffinityServerListFilter:");sb.append(", zone: ").append(zone).append(", zoneAffinity:").append(zoneAffinity);sb.append(", zoneExclusivity:").append(zoneExclusive);return sb.toString(); }
}
(2)ZonePreferenceServerListFilter
public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> {private String zone;@Overridepublic void initWithNiwsConfig(IClientConfig niwsClientConfig) {super.initWithNiwsConfig(niwsClientConfig);if (ConfigurationManager.getDeploymentContext() != null) {this.zone = ConfigurationManager.getDeploymentContext().getValue(ContextKey.zone);}}@Overridepublic List<Server> getFilteredListOfServers(List<Server> servers) {List<Server> output = super.getFilteredListOfServers(servers);if (this.zone != null && output.size() == servers.size()) {List<Server> local = new ArrayList<>();for (Server server : output) {if (this.zone.equalsIgnoreCase(server.getZone())) {local.add(server);}}if (!local.isEmpty()) {return local;}}return output;}public String getZone() {return zone;}public void setZone(String zone) {this.zone = zone;}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}ZonePreferenceServerListFilter that = (ZonePreferenceServerListFilter) o;return Objects.equals(zone, that.zone);}@Overridepublic int hashCode() {return Objects.hash(zone);}@Overridepublic String toString() {return new StringBuilder("ZonePreferenceServerListFilter{").append("zone='").append(zone).append("'").append("}").toString();}}
(3)ServerListSubsetFilter
public class ServerListSubsetFilter<T extends Server> extends ZoneAffinityServerListFilter<T> implements IClientConfigAware, Comparator<T>{private Random random = new Random();private volatile Set<T> currentSubset = Sets.newHashSet(); private DynamicIntProperty sizeProp = new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.size", 20);private DynamicFloatProperty eliminationPercent = new DynamicFloatProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);private DynamicIntProperty eliminationFailureCountThreshold = new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationFailureThresold", 0);private DynamicIntProperty eliminationConnectionCountThreshold = new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationConnectionThresold", 0);@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {super.initWithNiwsConfig(clientConfig);sizeProp = new DynamicIntProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() + ".ServerListSubsetFilter.size", 20);eliminationPercent = new DynamicFloatProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace() + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);eliminationFailureCountThreshold = new DynamicIntProperty( clientConfig.getClientName() + "." + clientConfig.getNameSpace()+ ".ServerListSubsetFilter.eliminationFailureThresold", 0);eliminationConnectionCountThreshold = new DynamicIntProperty(clientConfig.getClientName() + "." + clientConfig.getNameSpace()+ ".ServerListSubsetFilter.eliminationConnectionThresold", 0);}/*** Given all the servers, keep only a stable subset of servers to use. This method* keeps the current list of subset in use and keep returning the same list, with exceptions* to relatively unhealthy servers, which are defined as the following:* <p>* <ul>* <li>Servers with their concurrent connection count exceeding the client configuration for * {@code <clientName>.<nameSpace>.ServerListSubsetFilter.eliminationConnectionThresold} (default is 0)* <li>Servers with their failure count exceeding the client configuration for * {@code <clientName>.<nameSpace>.ServerListSubsetFilter.eliminationFailureThresold} (default is 0)* <li>If the servers evicted above is less than the forced eviction percentage as defined by client configuration* {@code <clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent} (default is 10%, or 0.1), the* remaining servers will be sorted by their health status and servers will worst health status will be* forced evicted.* </ul>* <p>* After the elimination, new servers will be randomly chosen from all servers pool to keep the* number of the subset unchanged. * */@Overridepublic List<T> getFilteredListOfServers(List<T> servers) {List<T> zoneAffinityFiltered = super.getFilteredListOfServers(servers);Set<T> candidates = Sets.newHashSet(zoneAffinityFiltered);Set<T> newSubSet = Sets.newHashSet(currentSubset);LoadBalancerStats lbStats = getLoadBalancerStats();for (T server: currentSubset) {// this server is either down or out of serviceif (!candidates.contains(server)) {newSubSet.remove(server);} else {ServerStats stats = lbStats.getSingleServerStat(server);// remove the servers that do not meet health criteriaif (stats.getActiveRequestsCount() > eliminationConnectionCountThreshold.get()|| stats.getFailureCount() > eliminationFailureCountThreshold.get()) {newSubSet.remove(server);// also remove from the general pool to avoid selecting them againcandidates.remove(server);}}}int targetedListSize = sizeProp.get();int numEliminated = currentSubset.size() - newSubSet.size();int minElimination = (int) (targetedListSize * eliminationPercent.get());int numToForceEliminate = 0;if (targetedListSize < newSubSet.size()) {// size is shrinkingnumToForceEliminate = newSubSet.size() - targetedListSize;} else if (minElimination > numEliminated) {numToForceEliminate = minElimination - numEliminated; }if (numToForceEliminate > newSubSet.size()) {numToForceEliminate = newSubSet.size();}if (numToForceEliminate > 0) {List<T> sortedSubSet = Lists.newArrayList(newSubSet); Collections.sort(sortedSubSet, this);List<T> forceEliminated = sortedSubSet.subList(0, numToForceEliminate);newSubSet.removeAll(forceEliminated);candidates.removeAll(forceEliminated);}// after forced elimination or elimination of unhealthy instances,// the size of the set may be less than the targeted size,// then we just randomly add servers from the big poolif (newSubSet.size() < targetedListSize) {int numToChoose = targetedListSize - newSubSet.size();candidates.removeAll(newSubSet);if (numToChoose > candidates.size()) {// Not enough healthy instances to choose, fallback to use the// total server poolcandidates = Sets.newHashSet(zoneAffinityFiltered);candidates.removeAll(newSubSet);}List<T> chosen = randomChoose(Lists.newArrayList(candidates), numToChoose);for (T server: chosen) {newSubSet.add(server);}}currentSubset = newSubSet; return Lists.newArrayList(newSubSet); }/*** Randomly shuffle the beginning portion of server list (according to the number passed into the method) * and return them.* * @param servers* @param toChoose* @return*/private List<T> randomChoose(List<T> servers, int toChoose) {int size = servers.size();if (toChoose >= size || toChoose < 0) {return servers;} for (int i = 0; i < toChoose; i++) {int index = random.nextInt(size);T tmp = servers.get(index);servers.set(index, servers.get(i));servers.set(i, tmp);}return servers.subList(0, toChoose); }/*** Function to sort the list by server health condition, with* unhealthy servers before healthy servers. The servers are first sorted by* failures count, and then concurrent connection count.*/@Overridepublic int compare(T server1, T server2) {LoadBalancerStats lbStats = getLoadBalancerStats();ServerStats stats1 = lbStats.getSingleServerStat(server1);ServerStats stats2 = lbStats.getSingleServerStat(server2);int failuresDiff = (int) (stats2.getFailureCount() - stats1.getFailureCount());if (failuresDiff != 0) {return failuresDiff;} else {return (stats2.getActiveRequestsCount() - stats1.getActiveRequestsCount());}}
}