Spring Cloud Ribbon负载均衡器

客户端负载均衡Spring Cloud Ribbon

 Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,基于Netflix Ribbon实现。

目录

  1. 客户端负载均衡
  2. 源码分析
  3. 负载均衡器(本文重点)
  4. 负载均衡策略
  5. 配置详解
  6. 自动化配置

客户端负载均衡&源码分析

 请在上一篇文章的基础上进行下面的学习,点击这里阅读上一篇

负载均衡器

 下面我们看一下具体的的负载均衡器,也就是ILoadBalancer接口的实现类。

AbstractLoadBalancer

 该类是ILoadBalancer接口的抽象实现类。

 在该抽象实现类中含有一个关于服务实例的分组枚举类,该枚举类主要有以下三种类型:

  1. ALL:所有服务实例
  2. STATUS_UP:正常服务的实例
  3. STATUS_NOT_UP:停止服务的实例

 该抽象类下面的的函数有以下几个:

  1. chooseServer():该函数通过调用接口中的chooseServer(Object key)实现,其中参数key为null,表示在选择具体服务实例时忽略key的条件判断
  2. List<Server> getServerList(ServerGroup serverGroup):定义了根据分组类型来获取不同的服务实例的列表
  3. LoadBalancerStats getLoadBalancerStats():定义了获取LoadBalancerStats对象的方法,LoadBalancerStats对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息。这些信息可以用来观察负载均衡器的运行情况,同时也是用来制定负载均衡策略的重要依据。

BaseLoadBalancer

 该类是Ribbon负载均衡器的基础实现类,在该类中定义了很多关于负载均衡相关的基础内容。

 该类中定义并维护了两个存储服务实例Server对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。代码如下:

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());

 定义了用来存储负载均衡器各服务实例属性和统计信息的LoadBalancerStats对象。

 定义了检查服务实例是否正常的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的实现。

 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancerz中默认使用了该类中定义的静态内部类SerialPingStrategy。根据源码,可以看到该策略采用线性遍历ping服务实例的方式实现检查。但是该策略在当IPing的实现速度不理想或者Server列表过大时,可能会影响系统性能。这时就需要自己去实现自己的IPing策略。

 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancer中chooseServer(Object key)方法源码中也可以看出它是将服务实例选择的任务交给了IRule中的Server choose(Object key)方法。默认的IRule实现是RoundRobinRule。

 启动Ping任务,在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认执行的时间间隔为10s。

 实现了ILoadBalancer接口定义的负载均衡器应该具备以下操作:

  1. addServers(List<Server> newServers):向负载均衡器中增加新的服务实例列表。该实现将原本已经维护的所有服务实例清单allServerList和新传入的服务实例清单newServers都加入到newList中,然后再调用setServersList(List lsrv)方法对newList进行处理。在BaseLoadBalancer中的默认实现会用新的列表覆盖旧的列表。后面几个扩展实现类对于服务实例清单的更新的优化都是通过对setServersList(List lsrv)重写来实现的。
  2. Server chooseServer(Object key):挑选一个具体的服务实例,上面介绍IRule的时候已经说过,不再重说。
  3. markServerDown(Server server):用来标记某个服务实例暂停服务
  4. List<Server> getReachableServers():获取可用的服务实例列表
  5. List<Server> getAllServers():获取所有的服务实例列表

DynamicServerListLoadBalancer

 DynamicServerListLoadBalancer该类继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。

 在该负载均衡器,实现了服务实例清单在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,我们可以通过过滤器来选择性的获取一批服务实例清单。

 下面看一下负载均衡器增加了哪些内容。

ServerList

 通过查看源码,发现增加了一个关于服务列表的操作对象ServerList<T> serverListImpl,其中T是一个Server的子类,即代表了一个具体的服务实例的扩展类。其中ServerList的定义如下:

public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();
    
    public List<T> getUpdatedListOfServers();   

}

 该抽象接口定义了两个抽象方法,如下:

  1. List<T> getInitialListOfServers():用于获取初始化的服务实例清单
  2. List<T> getUpdatedListOfServers():用于获取更新的服务实例清单

 该抽象接口的实现类有很多,因为该负载均衡器中需要实现服务实例的动态更新,那么就需要Ribbon具备访问Eureka服务注册中心获取服务实例的能力,在DynamicServerListLoadBalancer默认的ServerList是DomainExtractingServerList(默认的实现是在EurekaRibbonClientConfiguration),源码如下:

package org.springframework.cloud.netflix.ribbon.eureka;

@Configuration
public class EurekaRibbonClientConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
    }

}

 查看DomainExtractingServerList的源码可以看出,该类中有一个ServerList<DiscoveryEnabledServer> list,通过查看DomainExtractingServerList的构造函数,DomainExtractingServerList中的ServerList对象就是从上面的代码中传过来的DiscoveryEnabledNIWSServerList,源码如下:

package org.springframework.cloud.netflix.ribbon.eureka;

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    private ServerList<DiscoveryEnabledServer> list;
    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list
                .getInitialListOfServers());
        return servers;
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list
                .getUpdatedListOfServers());
        return servers;
    }

}

 同时,通过上面的源码还可以看出,getInitialListOfServers()和getUpdatedListOfServers()方法的实现其实交给DiscoveryEnabledNIWSServerList来实现的,下面看一下DiscoveryEnabledNIWSServerList中这两个方法的实现

package com.netflix.niws.loadbalancer;

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{

    private static final Logger logger = LoggerFactory.getLogger(DiscoveryEnabledNIWSServerList.class);

    String clientName;
    String vipAddresses;
    boolean isSecure = false;

    boolean prioritizeVipAddressBasedServers = true;

    String datacenter;
    String targetRegion;

    int overridePort = DefaultClientConfigImpl.DEFAULT_PORT;
    boolean shouldUseOverridePort = false;
    boolean shouldUseIpAddr = false;

    private final Provider<EurekaClient> eurekaClientProvider;

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }

}

 上述代码的主要逻辑是借助EurekaClient从服务注册中心获取到具体的服务实例(InstanceInfo)列表,首页获取到EurekaClient,然后更具逻辑服务名(vipAddress),获取服务实例,将服务实例状态为UP(正常服务)的实例转换为DiscoveryEnabledServer对象,最终放在一个列表里返回。

 在获取到ServerList之后,DomainExtractingServerList会调用自身的setZones方法,源码如下:

private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> result = new ArrayList<>();
        boolean isSecure = this.ribbon.isSecure(true);
        boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
        for (DiscoveryEnabledServer server : servers) {
            result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
                    this.approximateZoneFromHostname));
        }
        return result;
    }

 通过源码可以看出,该方法的主要作用是将DiscoveryEnabledNIWSServerList返回的List<DiscoveryEnabledServer>列表中的元素,转换成DiscoveryEnabledServer的子类对象DomainExtractingServer,在该类对象的构造函数中将为服务实例对象设置一些必要的属性,如id,zone,isAliveFlag,readToServer等。

ServerListUpdate

 在DynamicServerListLoadBalancer类中有如下一段代码,ServerListUpdater对象的实现就是对ServerList的更新

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

 下面看一下ServerListUpdater接口,该类内部还定义了一个UpdateAction接口,下面看一下源码:

package com.netflix.loadbalancer;

public interface ServerListUpdater {
    
    public interface UpdateAction {
        void doUpdate();
    }

    void start(UpdateAction updateAction);

    void stop();

    String getLastUpdate();

    long getDurationSinceLastUpdateMs();
    
    int getNumberMissedCycles();

    int getCoreThreads();
}

 下面是该接口方法的介绍

  1. void doUpdate():该方法的实现内容就是对ServerList的具体更新操作
  2. void start(UpdateAction updateAction):启动更新服务器,传入的UpdateAction对象为更新操作的具体实现
  3. void stop():停止更新服务器
  4. String getLastUpdate():获取最近的更新时间戳
  5. long getDurationSinceLastUpdateMs():获取上一次更新到现在的时间间隔,单位ms
  6. int getNumberMissedCycles():获取错过的更新周期数
  7. int getCoreThreads():获取核心线程数

 下面看一下ServerListUpdater的具体实现类
Spring Cloud Ribbon负载均衡器

  1. PollingServerListUpdater:动态服务列表更新的默认策略,DynamicServerListLoadBalancer负载均衡器中的默认实现就是该类,它通过定时任务的方式进行服务列表的更新。
  2. EurekaNotificationServerListUpdater:该更新器可以用于DynamicServerListLoadBalancer负载均衡器,但是它的触发机制与PollingServerListUpdater不同,它需要利用Eureka的事件监听器来驱动服务列表的更新操作。

 下面看一下PollingServerListUpdater的实现,我们从start函数看起

public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

 通过上述代码可以看出大致逻辑,创建了一个Runnable线程任务,在线程中调用了UpdateAction的doUpdate()方法,最后再启动定时任务,initialDelayMs默认值1000ms,refreshIntervalMs默认值是30*1000ms,也就是说更新服务实例在初始化之后延迟1s后开始执行,并以30s为周期重复执行。

ServerListFilter

 下面我们回顾一下UpdateAction中doUpdate()方法的具体实现,源码如下:

public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

 在上述源码可以看出,首先是调用了ServerList的getUpdatedListOfServers方法,这是用来从Eureka Server获取正常的服务实例列表。在获取完服务实例列表以后,我们会调用filter.getFilteredListOfServers(servers),此处的filter就是我们所要找的ServerListFilter。

 ServerListFilter接口非常简单,仅仅有一个List<T> getFilteredListOfServers(List<T> servers)方法,用于实现对服务列表的过滤,下面看一下它的主要实现类:
Spring Cloud Ribbon负载均衡器
 在上面的图中,ZonePreferenceServerListFilter的实现是Spring Cloud Ribbon中对Netflix Ribbon的扩展实现,其他都是Netflix Ribbon中的原生实现类。下面我们这些类的特点。

AbstractServerListFilter

package com.netflix.loadbalancer;

public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {

    private volatile LoadBalancerStats stats;
    
    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }
    
    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }

}

 该类是一个抽象过滤器,在这里定义了过滤时需要的一个重要依据对象LoadBalancerStats,该对象存储了关于负载均衡器的一些属性和统计信息等。

ZoneAffinityServerListFilter

 该过滤器基于区域感知(Zone Affinity)的方式实现服务实例的过滤,它会根据提供服务的实例所处的区域(Zone)与消费者自身所处区域(Zone)进行比较,过滤掉那些不是同处一个区域的实例。

public List<T> getFilteredListOfServers(List<T> servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
    }

 从上面的源码可以看出,对于服务实例列表的过滤是通过Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())来实现的,其中判断依据由ZoneAffinityPredicate实现服务实例与消费者的Zone比较。

 在比较过后,并不是立即返回过滤之后的ServerList。而是通过shouldEnableZoneAffinity方法来判断是否要启用区域感知的功能。下面看一下shouldEnableZoneAffinity的实现:

private boolean shouldEnableZoneAffinity(List<T> filtered) {    
        if (!zoneAffinity && !zoneExclusive) {
            return false;
        }
        if (zoneExclusive) {
            return true;
        }
        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            return zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();            
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() 
                    || loadPerServer >= activeReqeustsPerServerThreshold.get()
                    || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", 
                        new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            } else {
                return true;
            }
            
        }
    }

 通过查看源码可以看出,它调用了LoadBalancerStats的getZoneSnapshot方法来获取这些过滤后的同区域实例的基础指标(包含实例数量、断路由器断开数、活动请求数、实例平均负载等),然后根据一系列的算法求出下面的几个评价值并与设置的阀值进行对比,如果有一个条件符合,就不启用区域感知过滤的服务实例清单。

 上述算法实现为集群出现区域故障时,依然可以依靠其他区域的实例进行正常服务提供了完善的高可用保障。

  1. blackOutServerPercentage:故障实例百分比(断路由器断开数/实例数量)>=0.8
  2. activeReqeustsPerServer:实例平均负载>=0.6
  3. availableServers:可用实例数量(实例数量-断路器断开数)<2

DefaultNIWSServerListFilter

 该过滤器完全继承自ZoneAffinityServerListFilter,是默认的NIWS(Netflix Internal Web Service)过滤器。

ServerListSubsetFilter

 该过滤器继承自ZoneAffinityServerListFilter,适合拥有大规模服务集群(上百或更多)的系统。该过滤器可以产生一个区域感知结果的子集列表,同时还能够通过比较服务实例的通信失败数量和并发连接数来判定该服务是否健康来选择性地从服务实例列表中剔除那些相对不够健康的实例。该过滤器的实现主要有以下三步:

1.获取区域感知的过滤结果,作为候选的服务实例清单。

2.从当前消费者维护的服务实例子集中剔除那些相对不够健康的实例(同时将这些实例从候选清单中剔除,防止第三步的时候又被选入),不健康的标准如下:

 a. 服务实例的并发连接数超过客户端配置的值,默认为0,配置参数为<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationConnectionThresold

 b. 服务实例的失败数超过客户端配置的值,默认为0,配置参数为<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationFailureThresold。

 c. 如果按符合上面任一规则的服务实例剔除后,剔除比例小于客户端默认配置的百分比,默认为10%,配置参数为<clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent,那么就先对剩下的实例列表进行健康排序,再从最不健康的实例进行剔除,直到达到配置的剔除百分比。

3.在完成剔除后,清单已经少了至少10%的服务实例,最后通过随机的方式从候选清单中选出一批实例加入到清单中,以保持服务实例子集与原来的数量一致,默认的实例自己数量为20,配置参数为<clientName>.<nameSpace>.ServerListSubsetFilter.size。

ZonePreferenceServerListFilter

 Spring Cloud整合时新增的过滤器。若使用Spring Cloud整合Eureka和Ribbon时会默认使用该过滤器。它实现了通过配置或者Eureka实例元数据的所属区域(Zone)来过滤出同区域的服务实例。下面看一下源码:

@Override
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
    }

 通过源码分析可以得出以下几个步骤:

  1. 首先通过父类的ZoneAffinityServerListFilter过滤器来获得区域感知的服务实例列表
  2. 遍历获取的服务实例列表,取出根据消费者配置预设的区域Zone来进行过滤
  3. 过滤的结果如果是空直接返回区域感知的服务实例列表,如果不为空则返回过滤后的结果

ZoneAwareLoadBalancer

 ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer的扩展。

 在DynamicServerListLoadBalancer中,并没有对chooseServer函数进行重写,因此会采用BaseLoadBalancer中chooseServer,使用RoundRobinRule规则,以线性轮询的方式来选择调用的服务实例,该算法实现简单并没有区域(Zone)的概念,所以会把所有实例视为一个Zone下的节点看待,这样就会周期性的产生跨区域(Zone)访问的情况,由于跨区域会产生更高的延迟,这些跨区域的实例主要以用来防止区域性故障实现高可用为目的,不能作为常规的访问实例。

 ZoneAwareLoadBalancer可以有效的避免DynamicServerListLoadBalancer的问题。下面我们来看一下是如何避免这个问题的。

首先,在ZoneAwareLoadBalancer中并没有重写setServerList,说明实现服务实例清单的更新主逻辑没有修改。但是ZoneAwareLoadBalancer中重写了setServerListForZones(Map<String, List<Server>> zoneServersMap)函数。

 下面我们先看一下DynamicServerListLoadBalancer中setServerListForZones中的实现:

@Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }

 通过分析源码可以看出,setServerListForZones的调用位于更新服务实例清单setServersList函数的最后,在setServerListForZones的实现中,首先获取了LoadBalancerStats对象,然后调用其updateZoneServerMapping方法,下面我们看一下该方法的具体实现:

private ZoneStats getZoneStats(String zone) {
        zone = zone.toLowerCase();
        ZoneStats zs = zoneStatsMap.get(zone);
        if (zs == null){
            zoneStatsMap.put(zone, new ZoneStats(this.getName(), zone, this));
            zs = zoneStatsMap.get(zone);
        }
        return zs;
    }

    public void updateZoneServerMapping(Map<String, List<Server>> map) {
        upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>(map);
        // make sure ZoneStats object exist for available zones for monitoring purpose
        for (String zone: map.keySet()) {
            getZoneStats(zone);
        }
    }

 通过上述源码可以看出,setServerListForZones方法的主要作用是根据按区域(Zone)分组的实例列表,为负载均衡器中的LoadBalancerStats对象创建ZoneStats并放入Map zoneStatsMap集合中,每一个区域对应一个ZoneStats,它用于存储每个Zone的一些状态和统计信息。

 下面我们看一下ZoneAwareLoadBalancer负载均衡器中setServerListForZones方法的实现:

@Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }

 首先创建了一个ConcurrentHashMap<String, BaseLoadBalancer>类型的balancers对象,它将用来存储每个Zone区域对应的负载均衡器。具体的负载均衡器的创建则是在下面的第一个循环中调用getLoadBalancer方法来完成,在创建负载均衡器的时候同时会创建它的规则(如果当前实现中没有IRule,就创建一个AvailabilityFilteringRule规则,如果已经有实例,则克隆一个)。

 在创建完负载均衡器之后马上调用setServersList方法为其设置对应Zone区域的实例清单。

 第二个循环是对Zone区域中实例清单的检查,看看是否有Zone区域下已经没有实例了,是的话就将balancers中对应Zone区域的实例列表清空,该操作的作用是为了后续选择节点时,防止过时的Zone区域统计信息干扰具体实例的选择算法。

 下面我们再看一下负载均衡器是如何挑选服务实例,来实现对区域的识别的:

@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

 通过源码可以看出,只有当负载均衡器中维护的实例所属的Zone区域的个数大于1的时候才会执行这里的选择策略,否则还是将使用父类的实现。当Zone区域的个数大于1的时候,它的实现步骤如下:

1.调用ZoneAvoidanceRule中的静态方法createSnapshot(lbStats),为当前负载均衡器中所有的Zone区域分别创建快照,保存在在Map zoneSnapshot中,这些快照中的数据将用于后续的算法。

2.调用ZoneAvoidanceRule中的静态方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),来获取可用的Zone区域集合,在该函数中会通过Zone区域快照中的统计数据来实现可用区的挑选

 a.首先会剔除符合这些规则的Zone区域:所属实例数为0的Zone区域;Zone区域内实例的平均负载小于0,或者实例故障率(断路由器断开次数/实例数)大于等于阀值(默认值为0.99999)

 b.然后根据Zone区域的实例平均负载计算出最差的Zone区域,这里的最差指的是实例平均负载最高的Zone区域

 c.如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小于阀值(默认20%),就直接返回所有Zone区域为可用区域。否则,从最坏Zone区域集合中随机选择一个,将它从可用Zone区域集合中剔除。

3.当获得的可用Zone区域集合不为空,并且个数小于Zone区域总数,就随机选择一个Zone区域

4.在确定了某个Zone区域后,则获取了对应Zone区域的负载均衡器,并调用chooseServer来选择具体的服务实例,而在chooseServer中将使用IRule接口的choose方法来选择具体的服务实例。在这里,IRule接口的实现会采用ZoneAvoidanceRule来挑选具体的服务实例。

后续

后面会介绍负载均衡策略的源码分析,请继续关注!!!

相关推荐