Spring Cloud Ribbon

客户端负载均衡Spring Cloud Ribbon

 Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,基于Netflix Ribbon实现。

目录

  1. 客户端负载均衡(本文重点)
  2. 源码分析(本文重点)
  3. 负载均衡器
  4. 负载均衡策略
  5. 配置详解
  6. 自动化配置

客户端负载均衡

 负载均衡是对系统的高可用、网络压力的缓解和处理内容扩容的重要手段之一。

 负载均衡可以分为客户端负载均衡和服务端负载均衡。

 负载均衡按设备来分为硬件负载均衡和软件负载均衡,都属于服务端负载均衡。

 硬件负载均衡主要通过在服务器节点之间安装专门用于负载均衡的设备,例如F5等。

 软件负载均衡通过在服务器上安装一些具有负载均衡功能或模块的软件来完成请求的转发工作,例如Nginx等。

 硬件负载均衡和软件负载均衡都会维护一个可用的服务清单,然后通过心跳检测来剔除故障节点以保证服务清单中的节点都正常可用。当客户端发出请求时,负载均衡器会按照某种算法(线性轮询、按权重负载、按流量负载等)从服务清单中取出一台服务器的地址,然后将请求转发到该服务器上。

 客户端负载均衡需要客户端自己维护自己要访问的服务实例清单, 这些服务清单来源于注册中心(在使用Eureka进行服务治理时)。

基于Spring Cloud Ribbon实现客户端负载均衡

 基于Spring Cloud Ribbon实现客户端负载均衡非常简单,主要由以下步骤:

  1. 服务提供者需要启动多个服务实例并注册到一个或多个相关联的服务注册中心上;
  2. 服务消费者直接通过带有@LoadBalanced注解的RestTemplate向服务提供者发送请求以实现客户端的负载均衡。

源码分析

 既然Ribbon客户端负载均衡需要为RestTemplate增加@LoadBalanced注解,那么下面我们就从这个注解开始分析。

@LoadBalanced

 通过该注解的头部注释可以得知,该注解的作用是使用LoadBalancerClient来对RestTemplate进行配置,下面接着看LoadBalancerClient

LoadBalancerClient

 该类是一个接口类,代码如下:

package org.springframework.cloud.client.loadbalancer;

import org.springframework.cloud.client.ServiceInstance;

import java.io.IOException;
import java.net.URI;

public interface LoadBalancerClient extends ServiceInstanceChooser {
    
    /**
     * 注意该方法是从父类ServiceInstanceChooser接口中继承过来的
     */
    ServiceInstance choose(String serviceId);

    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

    URI reconstructURI(ServiceInstance instance, URI original);
}

 从接口的方法和注释可以看出,一个客户端负载均衡器应该具有以下几种重要功能:

  1. ServiceInstance choose(String serviceId) : 根据传入的服务名称,从负载均衡器中挑选一个对应服务的实例。
  2. <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException : 使用从负载均衡器中挑选出的服务实例来执行请求内容。
  3. URI reconstructURI(ServiceInstance instance, URI original) : 构建一个符合host:port格式的URI。在分布式系统中,我们都使用逻辑上的服务名作为host来构建URI(代替服务实例的host:port形式)进行请求。在该操作的定义中,前者ServiceInstance对象是带有host和port的具体服务实例,后者URI对象则是使用逻辑服务名的URI,返回的URI是根据这两者转换后的host:port形式的URI。

 通过分析org.springframework.cloud.client.loadbalancer包中的类,可以找出org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration是实现客户端负载均衡的自动配置类

LoadBalancerAutoConfiguration

 从LoadBalancerAutoConfiguration类的头部注解上可以看出,Ribbon实现客户端负载均衡的自动配置需要满足下面两个条件:

  1. @ConditionalOnClass(RestTemplate.class) : RestTemplate类必须存在于当前工程的环境中
  2. @ConditionalOnBean(LoadBalancerClient.class) : 在Spring的Bean工程中必须要有LoadBalancerClient的实现Bean

 在该自动化配置的任务中,主要完成以下三个任务:

  1. 创建一个LoadBalancerInterceptor实例,用于客户端发起请求时进行拦截,进而实现客户端的负载均衡
  2. 创建一个RestTemplateCustomizer实例,用于给RestTemplate增加LoadBalancerInterceptor拦截器
  3. 维护一个被@LoadBalanced注解修饰的RestTemplate集合(List<RestTemplate>),并在这里进行初始化,通过调用RestTemplateCustomizer实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器

 通过上面可以看出,真正实现客户端负载均衡是因为有LoadBalancerInterceptor拦截器的存在,那么下面看一下LoadBalancerInterceptor类

LoadBalancerInterceptor

package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.Assert;

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}
package org.springframework.cloud.client.loadbalancer;

import java.util.List;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpResponse;

public class LoadBalancerRequestFactory {

    private LoadBalancerClient loadBalancer;
    private List<LoadBalancerRequestTransformer> transformers;

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
            List<LoadBalancerRequestTransformer> transformers) {
        this.loadBalancer = loadBalancer;
        this.transformers = transformers;
    }

    public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
            final byte[] body, final ClientHttpRequestExecution execution) {
        return instance -> {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
            if (transformers != null) {
                for (LoadBalancerRequestTransformer transformer : transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);
        };
    }
}
package org.springframework.cloud.client.loadbalancer;

import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.support.HttpRequestWrapper;

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
                                 LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(
                this.instance, getRequest().getURI());
        return uri;
    }
}

 拦截器的intercept方法还用到另外两个类LoadBalancerRequestFactory和ServiceRequestWrapper。

 当使用被@LoadBalanced注解的RestTemplate发送请求时,会被LoadBalancerInterceptor拦截器拦截,执行LoadBalancerInterceptor的intercept方法,最终在该方法中选择合适的服务实例通过LoadBalancerClient的execute方法进行调用。

 因为LoadBalancerClient是抽象的负载均衡器接口,下面我们可以通过一个具体的负载均衡器RibbonLoadBalancerClient来进行分析。

RibbonLoadBalancerClient

package org.springframework.cloud.netflix.ribbon;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;

import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import static org.springframework.cloud.netflix.ribbon.RibbonUtils.updateToSecureConnectionIfNeeded;

public class RibbonLoadBalancerClient implements LoadBalancerClient {

    private SpringClientFactory clientFactory;

    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if(serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer)serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

}

 分析execute(String serviceId, LoadBalancerRequest<T> request)方法的执行步骤:

 1.获取负载均衡器,默认的负载均衡器是ZoneAwareLoadBalancer,这个后面再讲

 2.获取具体的服务实例,这里并没有调用LoadBalancerClient的ServiceInstance choose(String serviceId)方法,而是调用的ILoadBalancer的Server chooseServer(Object key)方法来获取具体的服务实例,想知道ILoadBalancer干啥用,请看下面的接口介绍

 3.将获取到的具体服务实例包装成RibbonServer对象(该对象存储了服务实例信息、服务名serviceId、是否需要https等其他信息),最后调用LoadBalancerRequest的apply方法,向一个实际的的具体服务实例发起请求,请看下面LoadLoadBalancerRequest的分析

ILoadBalancer

package com.netflix.loadbalancer;

import java.util.List;

public interface ILoadBalancer {

    public void addServers(List<Server> newServers);
    
    public Server chooseServer(Object key);
    
    public void markServerDown(Server server);
    
    public List<Server> getReachableServers();

    public List<Server> getAllServers();
}

 该类也是负载均衡器的一个抽象接口,主要定义了一系列的抽象操作:

  1. void addServers(List<Server> newServers) : 向负载均衡器维护的服务实例列表中增加服务实例
  2. Server chooseServer(Object key) : 根据某种负载策略,从负载均衡器中挑选一个具体的服务实例
  3. void markServerDown(Server server) : 通知和标识负载均衡器中的某个具体的服务实例已经停止服务,防止负载均衡器在下一次获取服务实例清单前认为该服务实例是正常服务
  4. List<Server> getReachableServers() : 获取当前正常的服务实例列表
  5. List<Server> getAllServers() : 获取所有已知的服务实例列表

 该接口中使用到的Server对象定义是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括host、port以及一些部署信息等。

 整理该接口的实现类主要有:

  1. AbstractLoadBalancer(抽象类)
  2. BaseLoadBalancer:继承自AbstractLoadBalancer
  3. DynamicServerListLoadBalancer:继承自BaseLoadBalancer
  4. NoOpLoadBalancer:继承自AbstractLoadBalancer
  5. ZoneAwareLoadBalancer:继承自DynamicServerListLoadBalancer

 默认使用的负载均衡器是ZoneAwareLoadBalancer,要想知道这个结果很简单,查看一下RibbonClientConfiguration配置类,该类中有一个方法如下:

@Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

 此处的代码逻辑如果没有配置负载均衡器,那么默认的负载均衡器就是ZoneAwareLoadBalancer。

LoadLoadBalancerRequest

 该类是一个抽象的接口,只有一个apply(ServiceInstance instance)方法,先看一下ServiceInstance类。

ServiceInstance

 该接口暴露了服务实例的一些基本信息,如:serviceId、port、host等,源码如下:

package org.springframework.cloud.client;

import java.net.URI;
import java.util.Map;

public interface ServiceInstance {

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();
    
    default String getScheme() {
        return null;
    }
}

 上面提及的RibbonServer是ServiceInstance的一个具体实现,查看源码可以知道RibbonServer除了包含Server对象之外,还存储了服务名、是否使用HTTPS以及一个Map类型的元数据集合。

 看完上面两个类,我们来看一下具体的apply方法的实现,源码中使用Lamada表达式实现,下面我只抽出具体的功能实现的源码如下:

HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
            if (transformers != null) {
                for (LoadBalancerRequestTransformer transformer : transformers) {
                    serviceRequest = transformer.transformRequest(serviceRequest, instance);
                }
            }
            return execution.execute(serviceRequest, body);

 在apply方法中,首先将HttpRequest包装成ServiceRequestWrapper对象,下面看一下ServiceRequestWrapper类,源码如下:

package org.springframework.cloud.client.loadbalancer;

import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.support.HttpRequestWrapper;

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
                                 LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(
                this.instance, getRequest().getURI());
        return uri;
    }
}

 在ServiceRequestWrapper类中重写了getURI方法,重写后的该方法通过调用LoadBalancerClient中的reconstructURI方法来构建一个host:port形式的URI对外发起请求。

 在apply方法的最后在调用ClientHttpRequestExecution的execute方法时,实际会去执行InterceptingClientHttpRequest类下面的InterceptingRequestExecution的execute方法,该方法的具体代码如下:

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
                ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            }
            else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                        streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                    }
                    else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
                return delegate.execute();
            }
        }
    }

 分析上面的代码可以看出,在调用requestFactory.createRequest(request.getURI(), method)创建请求时会调用getURI方法,此时调用的getURI方法就是ServiceRequestWrapper类中的getURI方法,进而调用LoadBalancerClient中的reconstructURI方法,下面我们看一下RibbonLoadBalancerClient中的reconstructURI是如何实现的

public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) {
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
        } else {
            server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);
    }

 分析上面的代码,首先根据serviceId从SpringClientFactory对象中获取serviceId对应的负载均衡器的上下文RibbonLoadBalancerContext对象,然后再使用该上下文对象的reconstructURIWithServer方法和server对象来构建具体的URI。

 备注:

  1. SpringClientFactory : 一个用来创建客户端负载均衡器的工厂类,该工厂类会为每一个不同名的Ribbon客户端生成不同的上下文
  2. RibbonLoadBalancerContext : LoadBalancerContext的子类,该类用于存储一些被负载均衡器使用的上下文内容和API操作。

 关于LoadBalancerContext类中的reconstructURIWithServer方法是如何组装host:port形式的逻辑很容易理解,我们就不在这里解释了,有兴趣的可以自己去看一下。

小结

 使用被@LoadBalanced注解的RestTemplate发起请求时,会被LoadBalancerInterceptor拦截,然后借助负载均衡器LoadBalancerClient将逻辑服务名转换为host:port的具体的服务实例地址,在使用RibbonLoadBalancerClient(Ribbon实现的负载均衡器)时实际使用的是Ribbon中定义的ILoadBalancer,默认自动化配置的负载均衡器是ZoneAwareLoadBalancer。

代码地址

https://gitee.com/petterheng/spring-cloud-eureka

后续

后面会介绍负载均衡器的源码分析,请继续关注!!!

 前往微信公众号阅读文章,点击这里,或者直接扫码关注公众号

Spring Cloud Ribbon

相关推荐