客户端负载均衡Ribbon之源码解析
什么是负载均衡器?
假设有一个分布式系统,该系统由在不同计算机上运行的许多服务组成。但是,当用户数量很大时,通常会为服务创建多个副本。每个副本都在另一台计算机上运行。此时,出现 “Load Balancer(负载均衡器)”。它有助于在服务器之间平均分配传入流量。
服务器端负载均衡器
传统上,Load Balancers(例如Nginx、F5)是放置在服务器端的组件。当请求来自 客户端 时,它们将转到负载均衡器,负载均衡器将为请求指定 服务器 。负载均衡器使用的最简单的算法是随机指定。在这种情况下,大多数负载平衡器是用于控制负载平衡的硬件集成软件。
重点:
- 对客户端不透明,客户端不知道服务器端的服务列表,甚至不知道自己发送请求的目标地址存在负载均衡器。
- 服务器端维护负载均衡服务器,控制负载均衡策略和算法。
客户端负载均衡器
当负载均衡器位于 客户端 时, 客户端 得到可用的服务器列表然后按照特定的负载均衡策略,分发请求到不同的 服务器 。
重点:
- 对客户端透明,客户端需要知道服务器端的服务列表,需要自行决定请求要发送的目标地址。
- 客户端维护负载均衡服务器,控制负载均衡策略和算法。
- 目前单独提供的客户端实现比较少( 我用过的只有Ribbon),大部分都是在框架内部自行实现。
Ribbon
简介
Ribbon是Netflix公司开源的一个客户单负载均衡的项目,可以自动与 Eureka 进行交互。它提供下列特性:
- 负载均衡
- 容错
- 以异步和反应式模型执行多协议 (HTTP, TCP, UDP)
- 缓存和批量
Ribbon中的关键组件
- ServerList:可以响应客户端的特定服务的服务器列表。
- ServerListFilter:可以动态获得的具有所需特征的候选服务器列表的过滤器。
- ServerListUpdater:用于执行动态服务器列表更新。
- Rule:负载均衡策略,用于确定从服务器列表返回哪个服务器。
- Ping:客户端用于快速检查服务器当时是否处于活动状态。
- LoadBalancer:负载均衡器,负责负载均衡调度的管理。
源码分析
LoadBalancerClient
实际应用中,通常将 RestTemplate 和 Ribbon 结合使用,例如:
@Configuration public class RibbonConfig { @Bean @LoadBalanced RestTemplate restTemplate() { return new RestTemplate(); } }
消费者调用服务接口:
@Service public class RibbonService { @Autowired private RestTemplate restTemplate; public String hi(String name) { return restTemplate.getForObject("http://service-hi/hi?name="+name,String.class); } }
@LoadBalanced,通过源码可以发现这是一个标记注解:
/** * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient * @author Spencer Gibb */ @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Qualifier public @interface LoadBalanced { }
通过注释可以知道@LoadBalanced注解是用来给RestTemplate做标记,方便我们对RestTemplate添加一个LoadBalancerClient,以实现客户端负载均衡。
根据spring boot的自动配置原理,可以知道同包下的 LoadBalancerAutoConfiguration ,应该是实现客户端负载均衡器的自动化配置类。代码如下:
@Configuration @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { return () -> restTemplateCustomizers.ifAvailable(customizers -> { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } }); } @Autowired(required = false) private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, transformers); } @Configuration @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } @Configuration @ConditionalOnClass(RetryTemplate.class) public static class RetryAutoConfiguration { @Bean @ConditionalOnMissingBean public LoadBalancedRetryFactory loadBalancedRetryFactory() { return new LoadBalancedRetryFactory() {}; } } @Configuration @ConditionalOnClass(RetryTemplate.class) public static class RetryInterceptorAutoConfiguration { @Bean @ConditionalOnMissingBean public RetryLoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties, LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory) { return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory); } @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final RetryLoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } }
从代码可以看出,这个类作用主要是使用RestTemplateCustomizer对所有标注了@LoadBalanced的RestTemplate Bean添加了一个LoadBalancerInterceptor拦截器,而这个拦截器的作用就是对请求的URI进行转换获取到具体应该请求哪个服务实例。
那再看看添加的拦截器LoadBalancerInterceptor的代码,如下:
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; private LoadBalancerRequestFactory requestFactory; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this.loadBalancer = loadBalancer; this.requestFactory = requestFactory; } public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); } }
从代码可以看出 LoadBalancerInterceptor 拦截了请求后,通过LoadBalancerClient执行具体的请求发送。
打开LoadBalancerClient,发现它是一个接口:
public interface LoadBalancerClient { ServiceInstance choose(String serviceId); <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; URI reconstructURI(ServiceInstance instance, URI original); }
接口说明:
- ServiceInstance choose(String serviceId):根据传入的服务id,从负载均衡器中为指定的服务选择一个服务实例。
- T execute(String serviceId, LoadBalancerRequest request):根据传入的服务id,指定的负载均衡器中的服务实例执行请求。
- T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request):根据传入的服务实例,执行请求。
LoadBalancerClient 有一个唯一的实现类 RibbonLoadBalancerClient,关键代码如下:
public class RibbonLoadBalancerClient implements LoadBalancerClient { public ServiceInstance choose(String serviceId) { Server server = this.getServer(serviceId); return server == null ? null : new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server)); } public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId); Server server = this.getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } else { RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server)); return this.execute(serviceId, ribbonServer, request); } } protected Server getServer(String serviceId) { return this.getServer(this.getLoadBalancer(serviceId)); } protected Server getServer(ILoadBalancer loadBalancer) { return loadBalancer == null ? null : loadBalancer.chooseServer("default"); } protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } //省略... }
负载均衡器
从 RibbonLoadBalancerClient 代码可以看出,实际负载均衡的是通过 ILoadBalancer 来实现的。
ILoadBalancer 接口代码如下:
public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getReachableServers(); public List<Server> getAllServers(); }
接口说明:
- addServers:向负载均衡器中添加一个服务实例集合。
- chooseServer:跟据key,从负载均衡器获取服务实例。
- markServerDown:用来标记某个服务实例下线。
- getReachableServers:获取可用的服务实例集合。
- getAllServers():获取所有服务实例集合,包括下线的服务实例。
ILoadBalancer 的实现 依赖关系示意图如下:
- NoOpLoadBalancer:啥都不做
- BaseLoadBalancer:
- 一个负载均衡器的基本实现,其中有一个任意列表,可以将服务器设置为服务器池。
- 可以设置一个ping来确定服务器的活力。
- 在内部,该类维护一个“all”服务器列表,以及一个“up”服务器列表,并根据调用者的要求使用它们。
- DynamicServerListLoadBalancer:
- 通过动态的获取服务器的候选列表的负载平衡器。
- 可以通过筛选标准来传递服务器列表,以过滤不符合所需条件的服务器。
- ZoneAwareLoadBalancer:
- 用于测量区域条件的关键指标是平均活动请求,它根据每个rest客户机和每个区域聚合。这是区域内未完成的请求总数除以可用目标实例的数量(不包括断路器跳闸实例)。当在坏区上缓慢发生超时时,此度量非常有效。
- 该负载均衡器将计算并检查所有可用区域的区域状态。如果任何区域的平均活动请求已达到配置的阈值,则该区域将从活动服务器列表中删除。如果超过一个区域达到阈值,则将删除每个服务器上活动请求最多的区域。一旦去掉最坏的区域,将在其余区域中选择一个区域,其概率与其实例数成正比。服务器将使用给定的规则从所选区域返回。对于每个请求,将重复上述步骤。也就是说,每个与区域相关的负载平衡决策都是实时做出的,最新的统计数据可以帮助进行选择。
那么在整合Ribbon的时候Spring Cloud默认采用了哪个具体实现呢?我们通过RibbonClientConfiguration配置类,可以知道在整合时默认采用了ZoneAwareLoadBalancer来实现负载均衡器。
@Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { return (ILoadBalancer)(this.propertiesFactory .isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory .get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater)); }
从这段代码 ,也可以看出,负载均衡器所需的主要配置项是IClientConfig, ServerList, ServerListFilter, IRule, IPing, ServerListUpdater。下面逐一分析他们。
IClientConfig
IClientConfig 用于对客户端或者负载均衡的配置,它的默认实现类为 DefaultClientConfigImpl。
IRule
为LoadBalancer定义“负载均衡策略”的接口。
public interface IRule{ public Server choose(Object key); public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); }
IRule 的实现 依赖关系示意图如下:
- BestAvailableRule:选择具有最低并发请求的服务器。
- ClientConfigEnabledRoundRobinRule:轮询。
- RandomRule:随机选择一个服务器。
- RoundRobinRule:轮询选择服务器。
- RetryRule:具备重试机制的轮询。
- WeightedResponseTimeRule:根据使用平均响应时间去分配一个weight(权重) ,weight越低,被选择的可能性就越低。
- ZoneAvoidanceRule:根据区域和可用性筛选,再轮询选择服务器。
IPing
定义如何 “ping” 服务器以检查其是否存活。
public interface IPing { public boolean isAlive(Server server); }
IPing 的实现 依赖关系示意图如下:
- PingUrl:真实的去ping 某个url,判断其是否alive。
- PingConstant:固定返回某服务是否可用,默认返回true,即可用
- NoOpPing:不去ping,直接返回true,即可用。
- DummyPing:继承抽象类AbstractLoadBalancerPing,认为所以服务都是存活状态,返回true,即可用。
- NIWSDiscoveryPing:结合eureka使用时,如果Discovery Client在线,则认为心跳检测通过。
ServerList
定义获取所有的服务实例清单。
public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); public List<T> getUpdatedListOfServers(); }
ServerList 的实现 依赖关系示意图如下:
- DomainExtractingServerList:代理类,根据传入的ServerList的值,实现具体的逻辑。
- ConfigurationBasedServerList:从配置文件中加载服务器列表。
- DiscoveryEnabledNIWSServerList:从Eureka注册中心中获取服务器列表。
- StaticServerList:通过静态配置来维护服务器列表。
ServerListFilter
允许根据过滤配置动态获得的具有所需特性的候选服务器列表。
public interface ServerListFilter<T extends Server> { public List<T> getFilteredListOfServers(List<T> servers); }
ServerListFilter 的实现 依赖关系示意图如下:
- DefaultNIWSServerListFilter:完全继承自ZoneAffinityServerListFilter。
- ZonePreferenceServerListFilter:EnableZoneAffinity 或 EnableZoneExclusivity 开启状态使用,默认关闭。处理基于区域感知的过滤服务器,过滤掉不和客户端在相同zone的服务,若不存在相同zone,则不进行过滤。
- ServerListSubsetFilter:服务器列表筛选器,它将负载平衡器使用的服务器数量限制为所有服务器的子集。如果服务器机群很大(例如数百个),并且不需要使用每一个机群并将连接保存在http客户机的连接池中,那么这是非常有用的。它还可以通过比较总的网络故障和并发连接来驱逐相对不健康的服务器。
ServerListUpdater
用于执行动态服务器列表更新。
public interface ServerListUpdater { public interface UpdateAction { void doUpdate(); } void start(UpdateAction updateAction); void stop(); String getLastUpdate(); long getDurationSinceLastUpdateMs(); int getNumberMissedCycles(); int getCoreThreads(); }
ServerListUpdater 的实现 依赖关系示意图如下:
- PollingServerListUpdater:默认的实现策略,会启动一个定时线程池,定时执行更新策略。
- EurekaNotificationServerListUpdater:利用Eureka的事件监听器来驱动服务列表的更新操作。
欢迎工作一到五年的Java工程师朋友们加入Java程序员开发: 721575865
群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!