SpringCloud微服务实战(一)

一 Spring Cloud简介

Spring Cloud是一个基千SpringBoot实现的微服务架构开发 工具。它为微服务架构中
涉及的 配置管理、服务治理、 断路器、 智能路由、微代理、 控制总线、 全局锁、 决策竞选、
分布式会话和集群状态管理等操作提供了一种简单的开发方式。

Spring Cloud包含了多个子项目,如下所示:

  1. Spring Cloud Config: 配置管理工具, 支持使用Git存储配置内容, 可以使用它实现
    应用配置的外部化存储,并支持客户端配置信息刷新、 加密/解密配置内容等。
  2. Spring Cloud Netflix: 核心 组件,对多个Netflix OSS开源套件进行整合。
    • Eureka: 服务治理组件, 包含服务注册中心、 服务注册与发现机制的实现。
    • Hystrix: 容错管理组件,实现断路器模式,帮助服务依赖中出现的延迟和为故障
      提供强大的容错能力。
    • Ribbon: 客户端负载均衡的服务调用组件。
    • Feign: 基于Ribbon 和 Hystrix 的声明式服务调用组件。
    • Zuul: 网关组件,提供智能路由、 访问过滤等功能。
    • Archaius: 外部化配置组件。
  3. Spring Cloud Bus: 事件、 消息总线, 用于传播集群中的状态变化或事件, 以触发后
    续的处理, 比如用来动态刷新配置等。
  4. Spring Cloud Cluster: 针对 ZooKeeper、 Redis、 Hazelcast、 Consul 的选举算法和通用
    状态模式的实现。
  5. Spring Cloud Cloudfoundry: 与 Pivotal Cloudfoundry 的整合支持。
  6. Spring Cloud Consul: 服务发现与配置管理工具。
  7. Spring Cloud Stream: 通过 Redis、 Rabbit 或者 Kafka 实现的消费微服务, 可以通过
    简单的声明式模型来发送和接收消息。
  8. Spring Cloud A WS: 用千简化整合 Amazon Web Service 的组件。
  9. Spring Cloud Security: 安全工具包, 提供在 Zuul 代理中对 0Auth2 客户端请求的中
    继器。
  10. Spring Cloud Sleuth: Spring Cloud 应用的分布式跟踪实现, 可以完美整合 Zipkin。
  11. Spring Cloud ZooKeeper: 基于 ZooKeeper 的服务发现与配置管理组件。
  12. Spring Cloud Starters: Spring Cloud 的基础组件, 它是基于Spring Boot 风格项目的
    基础依赖模块。
  13. Spring Cloud CLI: 用于在 Groovy 中快速创建 Spring Cloud 应用的 Spring Boot CLI
    插件。

二 服务治理: Spring Cloud Eureka

Spring Cloud Eureka 是 Spring Cloud Netflix 微服务套件中的一部分, 它基于 Netflix
Eureka 做了二次封装, 主要负责完成微服务架构中的服务治理功能。 Spring Cloud 通过为
Eureka 增加了 Spring Boot 风格的自动化配置,我们只需通过简单引入依赖和注解配置就能
让 Spring Boot 构建的微服务应用轻松地与 Eureka 服务治理体系进行整合。

2.1 服务治理

为了解决微服务架构中的服务实例维护问题, 产生了大量的服务治理框架和产品。 这
些框架和产品的实现都围绕着服务注册与服务发现机制来完成对微服务应用实例的自动化
管理。

  1. 服务注册
    在服务治理框架中, 通常都会构建一个注册中心, 每个服务单元向注册中心登记自己提供的服务, 将主机与端口号、 版本号、 通信协议等一些附加信息告知注册中心, 注册中心按服务名分类组织服务清单。

  2. 服务发现
    由于在服务治理框架下运作, 服务间的调用不再通过指定具体的实例地址来实现, 而是通过向服务名发起请求调用实现。 所以,服务调用方在调用服务提供方接口的时候, 并不知道具体的服务实例位置。 因此, 调用方需要向服务注册中心咨询服务, 并获取所有服务的实例清单, 以实现对具体服务实例的访问。

2.2 Netflix Eureka

Spring Cloud Eureka, 使用Netflix Eureka来实现服务注册与发现, 它既包含了服务端组件,也包含了客户端组件,并且服务端与客户端均采用Java编写,所以Eureka主要适用于通过Java实现的分布式系统,或是与JVM兼容语言构建的系统。

Eureka服务端,我们也称为服务注册中心。 它同其他服务注册中心一样,支持高可用配置。它依托于强一致性提供良好的服务实例可用性,可以应对多种不同的故障场景。 如果Eureka以集群模式部署,当集群中有分片出现故障时,那么Eureka就转入自我保护模式。它允许在分片故障期间继续提供服务的发现和注册,当故障分片恢复运行时, 集群中的其他分片会把它们的状态再次同步回来。

Eureka客户端,主要处理服务的注册与发现。客户端服务通过注解和参数配置的方式,嵌入在客户端应用程序的代码中,在应用程序运行时,Eureka客户端向注册中心注册自身提供的服务并周期性地发送心跳来更新它的服务租约。同时,它也能从服务端查询当前注册的服务信息并把它们缓存到本地并周期性地刷新服务状态。

2.2.1 服务提供者

服务注册

“服务提供者” 在启动的时候会通过发送REST请求的方式将自己注册到EurekaServer上, 同时带上了自身服务的一些元数据信息。Eureka Server接收到这个REST请求之后,将元数据信息存储在一个双层结构Map中, 其中第一层的key是服务名, 第二层的key是具体服务的实例名

在服务注册时, 需要确认一下 eureka.client.register-with-eureka=true参数是否正确, 该值默认为true。 若设置为false将不会启动注册操作。

服务同步

如架构图中所示, 这里的两个服务提供者分别注册到了两个不同的服务注册中心上,也就是说, 它们的信息分别被两个服务注册中心所维护。 此时, 由于服务注册中心之间因互相注册为服务, 当服务提供者发送注册请求到一个服务注册中心时, 它会将该请求转发给集群中相连的其他注册中心, 从而实现注册中心之间的服务同步 。 通过服务同步,两个服务提供者的服务信息就可以通过这两台服务注册中心中的任意一台获取到。

服务续约

在注册完服务之后,服务提供者会维护一个心跳用来持续告诉EurekaServer: "我还活着 ”, 以防止Eureka Server 的 “ 剔除任务 ” 将该服务实例 从服务列表中排除出去, 我们称该操作为服务续约(Renew)。

2.2.2 服务消费者

获取服务

到这里,在服务注册中心已经注册了一个服务,并且该服务有两个实例。当我们启动服务消费者的时候, 它会发送一个REST请求给服务注册中心,来获取上面注册的服务清单。为了性能考虑,Eureka Server会维护一份只读的服务清单来返回给客户端,同时该缓存清单会每隔30秒更新一次。

获取服务是服务消费者的基础,所以必须确保eureka.client.fetch-registry= true参数没有被修改成false, 该值默认为true。若希望修改缓存清单的更新时间,可以通过 eureka.client.registry-fetch-interval-seconds=30参数进行修改,该参数默认值为30, 单位为秒。

服务调用

服务消费者在获取服务清单后,通过服务名可以获得具体提供服务的实例名和该实例的元数据信息。 因为有这些服务实例的详细信息, 所以客户端可以根据自己的需要决定具 体调用哪个实例,在ribbon中会默认采用轮询的方式进行调用,从而实现客户端的负载均衡。

对于访问实例的选择,Eureka中有Region和Zone的概念,一个Region中可以包含多个 Zone, 每个服务客户端需要被注册到一个Zone中,所以每个客户端对应一个Region和一个Zone。 在进行服务调用的时候,优先访问同处一个Zone中的服务提供方,若访问不到,就访问其他的Zone。

服务下线

在系统运行过程中必然会面临关闭或重启服务的某个实例的情况, 在服务关闭期间, 我们自然不希望客户端会继续调用关闭了的实例。 所以在客户端程序中,当服务实例进行正常的关闭操作时, 它会触发一个服务下线的REST请求给Eueka Server, 告诉服务注册中心:“我要下线了”。 服务端在接收到请求之后, 将该服务状态置为下线(DOWN), 并把 该下线事件传播出去。

2.2.3 服务注册中心

失效剔除

有些时候, 我们的服务实例并不一定会正常下线, 可能由于内存溢出、 网络故障等原因使得服务不能正常工作, 而服务注册中心并未收到 “ 服务下线 ” 的请求。 为了从服务列表中将这些无法提供服务的实例剔除, Eureka Srevre 在启动的时候会创建一个定时任务, 默认每隔一段时间(默认为60秒) 将当前清单中超时(默认为90秒)没有续约的服务剔除出去。

自我保护

服务注 册到EurekaSrever 之后,会维护一个心跳连接,告诉EurekaServer自己还活着。EurekaServer 在运行期间,会统计心跳失败的比例在15分钟之内是否低于85%, 如果出现低于的情况,Eureka Server会将当前的实例注册信息保护起来,让这些实例不会过期,尽可能保护这些注册信息。但是 在这段保护期间内实例若出现问题,那么客户端很容易拿到实际已经不存在的服务实例,会出现调用失败的清况,所以客户端必须要有容错机制,比如可以使用请求重试、 断路器等机制。

由于本地调试很容易触发注册中心的保护机制, 这会使得注册中心维护的服务实例不那么准确。 所以, 我们在本地进行开发的时候, 可以使用eureka.server.enable­-self-preservervation=false参数来关闭保护机制, 以确保注册中心可以将不可用的实例正确剔除。

2.3 源码分析

首先,对于服务注册中心、服务提供者、服务消费者这三个主要元素来说,后两者(也就是 Eureka 客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中 心主要是处理请求的接收者。所以,我们可以从Eureka的客户端作为入口看看它是如何完 成这些主动通信行为的。

我们在将一个普通的 Spring Boot 应用注册到 Eureka Server 或是从 Eureka Server 中获取服务列表时, 主要就做了两件事:

  • 在应用主类中配置了@EnableDiscoveryClient注解。
  • 在application.properties中用eureka.client.serviceUrl.defaultZone参数指定了服务注册中心的位置。

我们来看看@EnableDiscoveryClient 的源码, 具体如下:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import({EnableDiscoveryClientImportSelector.class})
public @interface EnableDiscoveryClient {
    boolean autoRegister() default true;
}

它主要用来开启DiscoveryClient 的实例。通过搜索DiscoveryClient, 我们可以发现有 个类和一个接口。 通过梳理可以得到如下图所示的关系:

其中, 左边的org.springframework.cloud.client.discovery.DiscoveryClient 是Spring Cloud的接口,它定义了用来发现服务的常用抽象方法, 通过该接口可以有效地 屏蔽服务治理的实现细节, 所以使用 Spring Cloud 构建的微服务应用可以方便地切换不同服务治理框架, 而不改动程序代码, 只需要另外添加一些针对服务治理框架的配置即可。

org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是对该接口的实现,从命名来判断, 它实现的是对 Eureka 发现服务的封装。 所以 EurekaDiscoveryClient 依赖了 Netflix Eureka 的 com.netflix.discovery. EurekaClient 接口, EurekaClient 继承了 LookupService 接口, 它们都是Netflix 开源包中的内容, 主要定义了针对Eureka的发现服务的抽象方法, 而真正实现发现服务的 则是Netflix 包中的 com.netflix.discovery.DiscoveryClient类。

EurekaClient负责下面的任务:

  • 向Eureka Server注册服务实例
  • 向Eureka Server服务租约
  • 当服务关闭期间, 向Eureka Server取消租约
  • 查询Eureka Server中的服务实例列表

在具体研究 Eureka Client负责完成的任务之前,我们先看看在哪里对 Eureka Server的URL列表进行配置。根据我们配置的属性名eureka.client.serviceUrl.defaultZone, 通过serviceUrl可以找到该属性相关的加载属性,但是在 SR5 版本中它们都被 @Deprecated 标注为不再建议使用,并@link到了替代类com.netflix.discovery.endpoint.EndpointUtils, 所以我们可以在该类中找到下面这个函数:

public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
    List<String> orderedUrls = new ArrayList();
    String region = getRegion(clientConfig);
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
    if (availZones == null || availZones.length == 0) {
        availZones = new String[]{"default"};
    }
    int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
    List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
    if (serviceUrls != null) {
        orderedUrls.addAll(serviceUrls);
    }
    int currentOffset = myZoneOffset == availZones.length - 1 ? 0 : myZoneOffset + 1;
    while(currentOffset != myZoneOffset) {
        serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
        if (serviceUrls != null) {
            orderedUrls.addAll(serviceUrls);
        }
        if (currentOffset == availZones.length - 1) {
            currentOffset = 0;
        } else {
            ++currentOffset;
        }
    }
    if (orderedUrls.size() < 1) {
        throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
    } else {
        return orderedUrls;
    }
}

2.3.1 Region、Zone

在上面的函数中, 可以发现, 客户端依次加载了两个内容, 第一个是Region, 第二个 是Zone, 从其加载逻辑上我们可以判断它们之间的关系:
通过getRegion函数,我们可以看到它从配置中读取了一个Region返回, 所以一个微服务应用只可以属于 一个Region, 如果不特别配置, 默认为default。若我们要自己设置, 可以通过eureka.client.region属性来定义。

public static String getRegion(EurekaClientConfig clientConfig) {
    String region = clientConfig.getRegion();
    if (region == null) {
        region = "default";
    }
    region = region.trim().toLowerCase();
    return region;
}

通过 getAvailabi让tyZones函数,可以知道当我们没有特别为Region配置Zone的时候,将默认采用defaultZone , 这也是我们之前配置参数 eureka.client.serviceUrl.defaultZone的由来。 若要为应用指定Zone, 可以通过 eureka.client.availability-zones属性来进行设置。从该函数的return内容, 我们可以知道Zone能够设置多个,并且通过逗号分隔来配置。 由此, 我们可以判断Region与Zone是一对多的关系

public String[] getAvailabilityZones(String region) {
    String value = (String)this.availabilityZones.get(region);
    if (value == null) {
        value = "defaultZone";
    }
    return value.split(",");
}

在获取了Region和Zone的信息之后,才开始真正加载 Eureka Server 的具体地址。它根据传入的参数按 一定算法确定加载位于哪一个Zone配置的serviceUris。

int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);

具体获取 serviceUrls 的实现, 我们可以详细查看 getEurekaServerServiceUrls 函数的具体实现类 EurekaClientConfigBean, 该类是 EurekaClientConfig 和 EurekaConstants 接口的实现,用来加载配置文件中的内容。

public List<String> getEurekaServerServiceUrls(String myZone) {
    String serviceUrls = (String)this.serviceUrl.get(myZone);
    if (serviceUrls == null || serviceUrls.isEmpty()) {
        serviceUrls = (String)this.serviceUrl.get("defaultZone");
    }
    if (!StringUtils.isEmpty(serviceUrls)) {
        String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
        List<String> eurekaServiceUrls = new ArrayList(serviceUrlsSplit.length);
        String[] var5 = serviceUrlsSplit;
        int var6 = serviceUrlsSplit.length;
        for(int var7 = 0; var7 < var6; ++var7) {
            String eurekaServiceUrl = var5[var7];
            if (!this.endsWithSlash(eurekaServiceUrl)) {
                eurekaServiceUrl = eurekaServiceUrl + "/";
            }
            eurekaServiceUrls.add(eurekaServiceUrl);
        }
        return eurekaServiceUrls;
    } else {
        return new ArrayList();
    }
}

当我们在微服务应用中使用 Ribbon 来实现服务调用时,Zone 的设置可以在负载均衡时实现区域亲和特性,,Ribbon 的默认策略会优先访问同客户端处于一个Zone中的服务端实例,只有当同一个Zone 中没有可用服务端实例的时候才会访问其他Zone中的实例。所以通过Zone属性的定义,配合实际部署的物理结构,我们就可以有效地设计出对区域性故障的容错集群。

2.3.2 服务注册

在理解了多个服务注册中心信息的加载后,我们再回头看看 DiscoveryClient类是 如何实现服务注册行为的, 通过查看它的构造类,可以找到它调用了下面这个函数:

private void initScheduledTasks() {
    int renewalIntervalInSecs;
    int expBackOffBound;
    if (this.clientConfig.shouldFetchRegistry()) {
        renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
        expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        //服务获取
        this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
    }
    if (this.clientConfig.shouldRegisterWithEureka()) {
        renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: renew interval is: " + renewalIntervalInSecs);
        //维持心跳,服务续约
        this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
        // 创建了一个InstanceinfoReplicator类的实例,它会执行一个定时任务进行服务注册
        this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
        this.statusChangeListener = new StatusChangeListener() {
            public String getId() {
                return "statusChangeListener";
            }
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
                    DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
                } else {
                    DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
                }
                DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
            }
        };
        if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
            this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
        }
        this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

其中创建了一个 InstanceinfoReplicator 类的实例, 它会执行一个定时任务, 而这个定时任务的具体工作可以查看该类的 run() 函数,具体如下所示:

public void run() {
    boolean var6 = false;

    ScheduledFuture next;
    label53: {
        try {
            var6 = true;
            this.discoveryClient.refreshInstanceInfo();
            Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                this.discoveryClient.register();
                this.instanceInfo.unsetIsDirty(dirtyTimestamp);
                var6 = false;
            } else {
                var6 = false;
            }
            break label53;
        } catch (Throwable var7) {
            logger.warn("There was a problem with the instance info replicator", var7);
            var6 = false;
        } finally {
            if (var6) {
                ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
                this.scheduledPeriodicRef.set(next);
            }
        }
        next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
        this.scheduledPeriodicRef.set(next);
        return;
    }
    next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
    this.scheduledPeriodicRef.set(next);
}

相信大家都发现了中scoveryClient.register () ; 这一行,真正触发调用注册的地方就在这里。 继续查看 register ()的实现内容,如下所示:

boolean register() throws Throwable {
    logger.info("DiscoveryClient_" + this.appPathIdentifier + ": registering service...");

    EurekaHttpResponse httpResponse;
    try {
        httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
    } catch (Exception var3) {
        logger.warn("{} - registration failed {}", new Object[]{"DiscoveryClient_" + this.appPathIdentifier, var3.getMessage(), var3});
        throw var3;
    }

    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", "DiscoveryClient_" + this.appPathIdentifier, httpResponse.getStatusCode());
    }

    return httpResponse.getStatusCode() == 204;
}

注册操作也是通过REST请求的方式进行的。同时, 我们能看到发起注册请求的时候, 传入了一个com.neflix.appinfo.Instanceinfo 对象,该对象就是注册时客户端给服务端的服务的元数据。

2.3.3 服务获取和服务续约

我们继续来看 DiscoveryClient 的initScheduledTasks 函 数,不难发现在其中还有两个定时任务, 分别是服务获取服务续约

服务获取任务相对于服务续约和服务注册任务更 为独立。服务续约与服务注册在同一个if逻辑中,这个不难理解,服务注册到 Eureka Server 后,自然需要一个心跳去续约, 防止被剔除, 所以它们肯定是成对出现的。 从源码中, 对于服务续约相关的时间控制参数有两个重要属性, 我们可以关注并根据需要来进行调整:

#用于定义服务续约任务的调用间隔时间,默认为30秒
eureka.instance.lease-renewal-interval-in-seconds=30 
#用于定义服务失效的时间,默认为90秒
eureka.instance.lease-expiration-duration-in-seconds=90

其中 “ 服务续约 ” 的实现较为简单, 直接以REST请求的方式进行续约:

boolean renew() {
   try {
       EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null);
       logger.debug("{} - Heartbeat status: {}", "DiscoveryClient_" + this.appPathIdentifier, httpResponse.getStatusCode());
       if (httpResponse.getStatusCode() == 404) {
           this.REREGISTER_COUNTER.increment();
           logger.info("{} - Re-registering apps/{}", "DiscoveryClient_" + this.appPathIdentifier, this.instanceInfo.getAppName());
           return this.register();
       } else {
           return httpResponse.getStatusCode() == 200;
       }
   } catch (Throwable var3) {
       logger.error("{} - was unable to send heartbeat!", "DiscoveryClient_" + this.appPathIdentifier, var3);
       return false;
   }
}

而 “ 服务获取 ” 则复杂一些, 会根据是否是第一次获取发起不同的 REST 请求和相应 的处理。 具体的实现逻辑跟之前类似。

2.3.4 服务注册中心处理

Eureka Server 对于各类 REST 请求的定义都位于 com.netflix.eureka.resources 包下。
以 “服务注册“ 请求为例:

 @POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

在对注册信息进行了一堆校验之后,会调用org.springframework.cloud. netflix.eureka.server.InstanceRegistry对象中的register(Instanceinfo info, int leaseDuration, boolean isReplication)函数来进行服务注册:

public void register(InstanceInfo info, boolean isReplication) {
    this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}

在注册函数中, 先调用handleRegistration中的publishEvent函数,将该新服务注册的事件传播出去, 然 后调用com.netflix.eureka.registry.AbstractlnstanceRegistry父类中的注册实现,将InstanceInfo中的元数据信息存储在 一个ConcurrentHashMap对象中。 正如我们之前所说的, 注册中心存储了两层Map结构, 第一层的key存储服务名:Instancelnfo中的appName属性, 第二层的key存储实例名: Instancelnfo中的 instanceid属性。

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        this.read.lock();
        Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
        EurekaMonitors.REGISTER.increment(isReplication);
        if (gMap == null) {
            ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
            gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
       }
    Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
    //...
    Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
    if (existingLease != null) {
        lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
    }

    ((Map)gMap).put(registrant.getId(), lease);

2.3.5 配置详解

Eureka客户端的配置主要分为以下两个方面。
• 服务注册相关的配置信息, 包括服务注册中心的地址、 服务获取的间隔时间、 可用 区域等。
• 服务实例相关的配置信息, 包括服务实例的名称、IP地址、 端口号、 健康检查路径
等。

服务注册类配置
参数名 说明 默认值
enabled 启用Eureka客户端 true
registryFetchIntervalSeconds 从Eureka服务端获取注册信息的间隔时间,单位为秒 30
instancelnfoReplicationlntervalSeconds 更新实例信息的变化到E田eka服务端的间隔时间, 单位为秒 30
inItiallnstancelnfoRepIicationintervalSeconds 初始化实例信息到Eureka服务端的间隔时间,单位为秒 40
eurekaServiceUrlPolllntervalSeconds 轮询Eureka服务端地址更改的间隔时间,单位为秒 300
eurekaServerReadTimeoutSeconds 读取Eureka Server信息的超时时间, 单位为秒 8
eurekaServerConnectTimeoutSeconds 连接 Eureka Server的超时时间, 单位为秒 5
eurekaServerTotalConnections 从Eureka客户端到所有Eureka服务端的连接总数 200
eurekaServerTotalConnectionsPerHost 从Eureka客户端到每个Eureka服务端主机的连接总数 50
eurekaConnectionldleTimeoutSeconds Eureka服务端连接的空闲关闭时间,单位为秒 30
heartbeatExecutorThreadPoolSize 心跳连接池的初始化线程数 2
heartbeatExecutorExponenttalBackOffBound 心跳超时重试延迟时间的最大乘数值 10
cacheRefreshExecutorThreadPoolSize 缓存刷新线程池的初始化线程数 2
cacheRefreshExecutorExponentialBackOffBound 缓存刷新重试延迟时间的最大乘数值 10
useDnsForFetchmgServerUrls 使用DNS来获取Eureka服务端的serviceUrl false
registerWithEureka 是否要将自身的实例信息注册到Eureka服务端 true
preferSameZoneEureka 是否偏好使用处于相同Zone的Eureka服务端 true
filterOnlyUplnstances 获取实例时是否过滤,仅保留UP状态的实例 true
fetchRegistry 是否从 Eureka服务端获取注册信息 true
服务实例类配置

三 客户端负载均衡: Ribbon

Spring Cloud Ribbon 是一个基于HTTP和TCP的客户端负载均衡工具,它基于 Netflix ribbon实现。 通过SpringCloud的封装,可以让我们轻松地将面向服务的REST模板请求 自动转换成客户端负载均衡的服务调用。

3.1 客户端负载均衡

我们通常所说的负 载均衡都指的是服务端负载均衡,其中分为硬件负载均衡和软件负载均衡。 硬件负载均衡 主要通过在服务器节点之间安装专门用于负载均衡的设备,比如 F5 等;而软件负载均衡则 是通过在服务器上安装一 些具有均衡负载功能或模块的软件来完成请求分发工作, 比如 Nginx 等。 不论采用硬件负载均衡还是软件负载均衡,只要是服务端负载均衡都能以类似 下图的架构方式构建起来:

硬件负载均衡的设备或是软件负载均衡的软件模块都会维护一个下挂可用的服务端清单,通过心跳检测来剔除故障的服务端节点以保证清单中都是可以正常访问的服务端节点。 当客户端发送请求到负载均衡设备的时候 ,该设备按某种算法(比如线性轮询、按权重负载、按流量负载等)从维护的可用服务端清单中取出一台服务端的地址, 然后进行转发。

而客户端负载均衡和服务端负载均衡最大的不同点在千上面所提到的服务清单所存储的位置。 在客户端负载均衡中,所有客户端节点都维护着自己要访问的服务端清单, 而这些 服务端的清单来自于服务注册中心,比如的Eureka服务端。同服务端负载均衡的架构类似,在客户端负载均衡中也需要心跳去维护服务端清单的健康性, 只是这个步骤 需要与服务注册中心配合完成。在SpringCloud实现的服务治理框架中,默认会创建针对各 个服务治理框架的ribbon自动化整合配置,比如Eureka中的org.springframework. cloud.netflix.ribbon.eureka. RibbonEurekaAutoConfiguration,Consul 中的org.springframework.cloud.consul.discovery. RibbonConsulAuto- Configuration。

通过Spring CloudRibbon的封装, 我们在微服务架构中使用客户端负载均衡调用非常简单, 只需要如下两步:

  1. 服务提供者只需要启动多个服务实例并注册到一个注册中心或是多个相关联的服务 注册中心。
  2. 服务消费者直接通过调用被@LoadBalanced注解修饰过的 RestTemplate 来实现面向服务的接口调用。

3.2 RestTemplate 详解

RestTemplate会使用 Ribbon 的自动化配置, 同时通过配置@LoadBalanced 还能够开启客户端负载均衡。RestTemplate针对几种不同请求类型和参数类型的服务调用实现如下。

3.2.1 GET请求

在RestTemplate中,对GET 请求可以通过如下两个方法进行调用实现。

第一种: getForEntity函数。该方法返回的是ResponseEntity, 该对象是 Spring 对 HTTP 请求响应的封装, 其中主要存储了 HTTP 的几个重要元素, 比如 HTTP 请求状态 码的枚举对象 HttpStatus (也就是我们常说的 404、 500 这些错误码)、 在它的父类 HttpEntity 中还存储着 HTTP 请求的头信息对象 HttpHeaders 以及泛型类型的请求体对象。

比如下面的例子,就是访问USER-SERVER服务的/user请求,同时最后一个参数 didi 会替换 url 中的{1} 占位符,而返回的 ResponseEntity 对象中的 body 内容类型 会根据第二个参数转换为String类型。getForEntity 函数实际上提供了以下三种不同的重载实现。

  1. getForEntity(String url, Class responseType, Object... urlVariables):该方法提供 了三个参数,其中 url 为请求的地址,responseType为请求响应体body的包装类型,urlVariables为url中的参数绑定。
  2. getForEntity(String url, Class responseType, Map urlVariables):该方法提供的参数中, 只有 urlVariables 的参数类型与上面的方法不同。这里使用了Map类型,所以使用该方法进行参数绑定时需要在占位符中指定Map中参数的 key 值。
  3. getForEntity(UR工 url, Class responseType): 该方法使用URI 对象来 替代之前的 url 和 urlVariables 参数来指定访问地址和参数绑定。 URI 是 JDK java.net 包下的一个类,它表示一个统一 资源标识符 (Uniform Resource Identifier)引用。

第二种: getForObject 函数。该方法可以理解为对 getForEntity的进一步封装, 它通过 HttpMessageConverterExtractor 对 HTTP 的请求响应体 body内容进行对象转换,实现请求直接返回包装好的对象内容。

3.2.2 POST请求

在 RestTemplate 中, 对 POST请求时可以通过如下三个方法进行调用实现。

第一种: postForEntity 函数。该方法同 GET 请求中的 getForEntity 类似, 会在调用后返回 ResponseEntity对象, 其中T为请求响应的 body类型。
第二种: postForObject 函数。

3.2.3 PUT请求

在RestTemplate中,对PUT请求可以通过put方法 进行调用实现,比如:

RestTemplate restTemplate = new RestTemplate ();
Long id = 100011;
User user = new User("didi", 40); restTemplate.put("http://USER-SERVICE/user/{l}", user, id);

• put(String url, Object request, Object... urlVariables)
• put(String url, Object request, Map urlVariables)
• put(URI url, Object request)

3.2.4 DELETE请求

在RestTemplate中,对DELETE请求可以通过delete方法进行调用实现,比如:

RestTemplate restTemplate = new RestTemplate();
Long id= 10001L; 
restTemplate.delete("http://USER-SERVICE/user/{1)", id);

• delete(String url, Object ... urlVariables)
• delete(String url, Map urlVariables)
• delete(URI url)

3.3 源码分析

RestTemplate 不是 Spring自己就提供的吗?跟Ribbon的客户端负载均衡又有什么关系呢?接下来看看Ribbon是如何通过 RestTemplate 实现客户端负载均衡的。

@LoadBalanced注解源码的注释中可以知道, 该注解用来给RestTemplate做标记, 以使用负载均衡的客户端(LoadBalancerClient)来配置它。

通过搜索LoadBalancerClient可以发现 , 这 是SpringCloud中定义的一个接口 :

public interface LoadBalancerClient extends ServiceInstanceChooser {
    <T> T execute(String var1, LoadBalancerRequest<T> var2) throws IOException;

    <T> T execute(String var1, ServiceInstance var2, LoadBalancerRequest<T> var3) throws IOException;

    URI reconstructURI(ServiceInstance var1, URI var2);
}

public interface ServiceInstanceChooser {
    ServiceInstance choose(String var1);
}

从该接口中,我们可以通过定义的抽象方法来了解客户端负载均衡器中应具备的几种能力。

  • ServiceInstance choose(String var1):根据传入的服务名 serviceld,从负载均衡器中挑选一个对应服务的实例。
  • T execute(String var1, LoadBalancerRequest var2):使用从负载均衡器中挑选出的服务实例来执行请求内容。
  • T execute(String var1, ServiceInstance var2, LoadBalancerRequest var3) :使用从负载均衡器中挑选出指定的服务实例来执行请求内容。
  • URI reconstructURI(ServiceInstance var1, URI var2):为系统构建一个合适的host:post形式的URI。

ServiceInstance对象是带有host和port的具体服务实例 , 而URI入参对象则是使用逻辑服务名定义为host的URI , 而返回的URI内容则是通过ServiceInstance的服务实例详情拼接出的具体host:post形式的请求地址。

顺着LoadBalancerClient接口的所属包org .springframework.cloud.client.loadbalancer, 我们对其内容进行整理, 可以得出如下图所示的关系。

其中,LoadBalancerAutoConfiguration 为实现客户端负载均衡器的自动化配置类。

@Configuration
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
    @LoadBalanced
    @Autowired( required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();
    @Autowired(required = false)
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {
        return new SmartInitializingSingleton() {
            public void afterSingletonsInstantiated() {
                Iterator var1 = LoadBalancerAutoConfiguration.this.restTemplates.iterator();

                while(var1.hasNext()) {
                    RestTemplate restTemplate = (RestTemplate)var1.next();
                    Iterator var3 = customizers.iterator();

                    while(var3.hasNext()) {
                        RestTemplateCustomizer customizer = (RestTemplateCustomizer)var3.next();
                        customizer.customize(restTemplate);
                    }
                }
            }
        };
    }

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }
    
    @Configuration
    @ConditionalOnClass({RetryTemplate.class})
    public static class RetryInterceptorAutoConfiguration {
        public RetryInterceptorAutoConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public RetryLoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties, LoadBalancedRetryPolicyFactory lbRetryPolicyFactory, LoadBalancerRequestFactory requestFactory, LoadBalancedBackOffPolicyFactory backOffPolicyFactory) {
            return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, lbRetryPolicyFactory, requestFactory, backOffPolicyFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
            return new RestTemplateCustomizer() {
                public void customize(RestTemplate restTemplate) {
                    List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    }

    @Configuration
    @ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
    static class LoadBalancerInterceptorConfig {
        LoadBalancerInterceptorConfig() {
        }

        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return new RestTemplateCustomizer() {
                public void customize(RestTemplate restTemplate) {
                    List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    }
}

从LoadBalancerAutoConfiguration类头上的注解可以知道, Ribbon实现的负载均衡自动化配置需要满足下面条件。

  • @ConditionalOnClass( RestTemplate.class): RestTemplate类必须存在当前工程的环境中。
  • @ConditionalOnBean(LoadBalancerClient.class): 在Spring的Bean工厂中必须有LoadBalancerClient的实现Bean。

在该自动化配置类中, 主要做了下面三件事:

  • 创建了一个LoadBalancerInterceptor的Bean, 用于实现对客户端发起请时进行拦截, 以实现客户端负载均衡。
  • 创建了一个RestTemplateCustomizer的Bean, 用于给RestTemplate增加 LoadBalancerInterceptor拦截器。
  • 维护了一个被@LoadBalanced 注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerinterceptor拦截器。

接下来, 我们看看LoadBalancerInterceptor 拦截器是如何将一个普通的RestTemplate变成客户端负载均衡的:

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }
}

我们可以看到在拦截器中注入了LoadBalancerClient的实现。 当一个被@LoadBalanced注解修饰的 RestTemplate 对象向外发起HTTP请求时, 会被LoadBalancerInterceptor 类的 intercept 函数所拦截。 由于我们在使用RestTemplate时采用了服务名作为host, 所以直接从 HttpRequest的URI对象中 通过 getHost ()就可以拿到服务名,然后调用 execute 函数去根据服务名来选择实例并发起实际的请求。

分析到这里,LoadBalancerClient还只是一个抽象的负载均衡器接口 所以我们还需要找到它的具体实现类来进一步进行分析。通过查看Ribbon的源码,可以很容易地在 org.springframework.cloud.netflix.ribbon 包下找到对应的实现类Ribbon­LoadBalancerClient。

org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient

public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
    ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
    Server server = this.getServer(loadBalancer);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    } else {
        RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
        return this.execute(serviceId, ribbonServer, request);
    }
}

public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonLoadBalancerClient.RibbonServer) {
        server = ((RibbonLoadBalancerClient.RibbonServer)serviceInstance).getServer();
    }

    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    } else {
        RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        } catch (IOException var8) {
            statsRecorder.recordStats(var8);
            throw var8;
        } catch (Exception var9) {
            statsRecorder.recordStats(var9);
            ReflectionUtils.rethrowRuntimeException(var9);
            return null;
        }
    }
}

可以看到,在execute函数的实现中,第一步做的就是通过getServer根据传入的服务名serviceId去获得具体的服务实例:

protected Server getServer(ILoadBalancer loadBalancer) {
    return loadBalancer == null ? null : loadBalancer.chooseServer("default");
}

通过getServer函数的实现源码, 我们可以看到这里获取具体服务实例的时候并没 有使用LoadBalancerClient接口中的choose函数,而是使用了Netflix Ribbon自身的ILoadBalancer接口中定义的chooseServer函数。

我们先来认识一下这个 ILoadBalancer 接口:

public interface ILoadBalancer {
    //向负载均衡器中维护的实例列表增加服务实例。
    void addServers(List<Server> var1);
    //通过某种策略, 从负载均衡器中挑选出一个具体的服务实例。
    Server chooseServer(Object var1);
    //用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。
    void markServerDown(Server var1);

    /** @deprecated */
    @Deprecated
    List<Server> getServerList(boolean var1);
    //获取当前正常服务的实例列表。
    List<Server> getReachableServers();
    //获取所有已知的服务实例列表, 包括正常服务和停止服务的实例。
    List<Server> getAllServers();
}

在该接口定义中涉及的Server对象定义是一个传统的服务端节点, 在该类中存储了服务端节点的一些元数据信息, 包括 host、 port 以及一 些部署信息等。

public class Server {
    public static final String UNKNOWN_ZONE = "UNKNOWN";
    private String host;
    private int port;
    private String scheme;
    private volatile String id;
    private volatile boolean isAliveFlag;
    private String zone;
    private volatile boolean readyToServe;
    private Server.MetaInfo simpleMetaInfo;

而对于该接口的实现,有出如下图所示的结构。可以看到,BaseLoadBalancer类实现了基础的负载均衡,而 DynamicServerListLoaclBalancer和ZoneAwareLoaclBalancer在负载均衡的策略上做了一些功能的扩展。

那么在整合ribbon的时候Spring Cloud默认采用了哪个具体实现呢?我们通ribbonClientConfiguration配置类,可以知道在整合时默认采用了ZoneAware­LoadBalancer来实现负载均衡器。

`org.springframework.cloud.netflix.ribbon.ribbonClientConfiguration

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
}

下面,我们再回到RibbonLoadBalancerClient的execute函数逻辑,在通过ZoneAwareLoadBalancer 的chooseServer函数获取了负载均衡策略分配到的服务实例对象Server之后,将其内容包装成ribbonServer对象(该对象除了存储了服务 实例的信息之外, 还增加了服务名serviceId、 是否需要使用 HTTPS 等其他信息),然后使用该对象再回调LoadBalancerinterceptor请求拦截器中 LoadBalancerRequest的 apply(final ServiceinsIance instance)函数, 向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求到host:post 形式的实际访问地址的转换。

在apply(final Serviceinstance instance) 函数中传入的Serviceinstance接口对象是对服务实例的抽象定义。在该接口中暴露了服务治理系统中每个服务实例需要提供的一些基本信息,比如serviceld、 host、port等,具体定义如下:

public interface ServiceInstance {
    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();
}

而上面提到的具体包装Server服务实例的RibbonServer对象就是ServiceInstance接口的实现, 可以看到它除了包含Server对象之外, 还存储了服务名、是否使用HTTPS标识以及一个Map类型的元数据集合。

public static class RibbonServer implements ServiceInstance {
    private final String serviceId;
    private final Server server;
    private final boolean secure;
    private Map<String, String> metadata;

    public RibbonServer(String serviceId, Server server) {
        this(serviceId, server, false, Collections.emptyMap());
    }

    public RibbonServer(String serviceId, Server server, boolean secure, Map<String, String> metadata) {
        this.serviceId = serviceId;
        this.server = server;
        this.secure = secure;
        this.metadata = metadata;
    }

那么apply (final Serviceinstance instance)函数在接收到了具体ServiceInstance实例后,是如何通过 LoadBalancerClient 接口中的reconstructURI操作来组织具体请求地址的呢?

public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
    return new LoadBalancerRequest<ClientHttpResponse>() {
        public ClientHttpResponse apply(ServiceInstance instance) throws Exception {
            HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, LoadBalancerRequestFactory.this.loadBalancer);
            LoadBalancerRequestTransformer transformer;
            if (LoadBalancerRequestFactory.this.transformers != null) {
                for(Iterator var3 = LoadBalancerRequestFactory.this.transformers.iterator(); var3.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {
                    transformer = (LoadBalancerRequestTransformer)var3.next();
                }
            }

            return execution.execute((HttpRequest)serviceRequest, body);
        }
    };
}

可以看到它具体执行的时候,还传入了ServiceRequest­Wrapper对象,该对象继承了HttpRequestWrapper并重写了getURI函数,重写后的getURI通过调用LoadBalancerClient接口的 reconstructURI 函数来重新构建一个URI来进行访问。

在 LoadBalancerinterceptor 拦截器中, ClientHttpRequestExecution 的实例 具体执行 execution.execute(serviceRequest, body) 时, 会调用 Intercepting­ ClientHttpRequest 下 InterceptingRequestExecution 类的 execute 函数

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) {
        super(request);
        this.instance = instance;
        this.loadBalancer = loadBalancer;
    }

    public URI getURI() {
        URI uri = this.loadBalancer.reconstructURI(this.instance, this.getRequest().getURI());
        return uri;
    }
}

此时,它就会使用 RibbonLoadBalancerClient 中实现的 reconstructURI 来组织具体请求的服务实例地址。

public URI reconstructURI(ServiceInstance instance, URI original) {
    Assert.notNull(instance, "instance can not be null");
    String serviceId = instance.getServiceId();
    RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
    Server server = new Server(instance.getHost(), instance.getPort());
    IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
    ServerIntrospector serverIntrospector = this.serverIntrospector(serviceId);
    URI uri = RibbonUtils.updateToHttpsIfNeeded(original, clientConfig, serverIntrospector, server);
    return context.reconstructURIWithServer(server, uri);
}

从 reconstructURI 函数中我们可以看到,它通过 ServiceInstance实例对象的 serviceid, 从 SpringeClientFactory 类的clientFactory对象 中获取对应 serviceId 的负载均衡器的上下文ribbonLoadBalancerContext对象。然后根据 ServiceInstance 中的信息来构建具体服务实例信息的 Server 对象,并使用 RibbonLoadBalancerContext对象的reconstructURIWithServer函数来构建服 务实例的URI。

简单介绍一 下上面提到的 SpringClientFactory 和 RibbonLoad­BalancerContext:
• SpringClientFactory 类是一个用来创建客户端负载均衡器的工厂类, 该工厂类会为每一个不同名的 Ribbon客户端生成不同的 Spring 上下文。
• RibbonLoadBalancerContext 类是 LoadBalancerContext的子类, 该类用与存储一些被负载均衡器 使用的上下文内容和API操作(reconstructURIWithServer就是其中之一)。

从reconstructURIWithServer的实现中我们可以看到,它同reconstructURI的定义类似。 只是reconstructURI的第一个保存具体服务实例的参数使用了Spring Cloud定义的ServiceInstance, 而reconstructURIWithServer中使用了Netflix中定义的 Server, 所以在 RibbonLoadBalancerClient 实现 reconstructURI 的 时候, 做了一次转换,使用Serviceinstance的host和port信息构建了 一 个 Server 对象来给reconstructURIWithServer使用。

从reconstructURIWithServer的 实现逻辑中, 我们可以看到, 它从 Server 对象中获取 host 和 port 信息, 然后根据以服务名为 host 的 URI 对象original中获取其他请求信息, 将两者内容进行拼接整合,形成最终要访间的服务实例的具体地址。

public URI reconstructURIWithServer(Server server, URI original) {
    String host = server.getHost();
    int port = server.getPort();
    String scheme = server.getScheme();
    if (host.equals(original.getHost()) && port == original.getPort() && scheme == original.getScheme()) {
        return original;
    } else {
        if (scheme == null) {
            scheme = original.getScheme();
        }

        if (scheme == null) {
            scheme = (String)this.deriveSchemeAndPortFromPartialUri(original).first();
        }

        try {
            StringBuilder sb = new StringBuilder();
            sb.append(scheme).append("://");
            if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                sb.append(original.getRawUserInfo()).append("@");
            }

            sb.append(host);
            if (port >= 0) {
                sb.append(":").append(port);
            }

            sb.append(original.getRawPath());
            if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                sb.append("?").append(original.getRawQuery());
            }

            if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                sb.append("#").append(original.getRawFragment());
            }

            URI newURI = new URI(sb.toString());
            return newURI;
        } catch (URISyntaxException var8) {
            throw new RuntimeException(var8);
        }
    }
}

另外,从ribbonLoadBalancerClient的execute函数逻辑中,我们还能看到在回调拦截器中, 执行具体的请求之后,Ribbon还通过ribbonStatsRecorder对象对服务的请求进行了跟踪记录。

3.4 负载均衡器

虽然SpringCloud中定义了LoadBalancerClient作为负载均衡器的通用接口, 并且针对Ribbon实现了ribbonLoadBalancerClient,但是它在具体实现客户端负载均衡时,是通过ribbon的ILoadBalancer接口实现的。

下面我们根据ILoadBalancer接口的实现类逐个看看它是如何实现客户端负载均衡的。

3.4.1 AbstractloadBalancer

AbstractLoadBalancer是ILoadBalancer接口的抽象实现。在该抽象类中定义了一个关于服务实例的分组枚举类 ServerGroup, 它包含三种不同类型。还实现了一个chooseServer()函数, 该函数通过调用接口中的chooseServer (Objectkey)实现, 其中参数key为null, 表示在选择具体服务实例时忽略key的条件判断。

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    public AbstractLoadBalancer() {
    }

    public Server chooseServer() {
        return this.chooseServer((Object)null);
    }

    public abstract List<Server> getServerList(AbstractLoadBalancer.ServerGroup var1);

    public abstract LoadBalancerStats getLoadBalancerStats();

    public static enum ServerGroup {
        //所有服务实例
        ALL,
        //正常服务的实例
        STATUS_UP,
        //停止服务的实例
        STATUS_NOT_UP;

        private ServerGroup() {
        }
    }
}

最后, 还定义了两个抽象函数。
• getServerList(ServerGroup serverGroup): 定义了根据分组类型来获取不同的服务实例的列表。
• getLoadBalancerStats(): 定义了获取LoadBalancerStats 对象的方法,LoadBalancerStats对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息。这些信息非常有用,我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。

3.4.2 BaseloadBalancer

BaseLoadBalancer类是ribbon负载均衡器的基础实现类,在该类中定义了很多关 于负载均衡器相关的基础内容。

  • 定义并维护了两个存储服务实例Server对象的列表。 一个用与存储所有服务实例的清单, 一个用于存储正常服务的实例清单。
@Monitor(
    name = "LoadBalancer_AllServerList",
    type = DataSourceType.INFORMATIONAL
)
protected volatile List<Server> allServerList;
@Monitor(
    name = "LoadBalancer_UpServerList",
    type = DataSourceType.INFORMATIONAL
)
protected volatile List<Server> upServerList;
  • 定义了用来存储负载均衡器各服务实例属性和统计信息的LoadBalancerStats对象。

  • 定义了检查服务实例是否正常服务的IPing对象,在BaseLoadBalancer中默认为null, 需要在构造时注入它的具体实现。

  • 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。

  • 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancer中chooseServer(Object key) 的实现源码,我们可以知道,负载均衡器实际将服务实例选择任务委托给了IRule实例中的choose函数来实现。 而在这里, 默认初始化了RoundRobinRule为IRule 的实现对象。RoundRobinRule实现了最基本且常用的线性负载均衡规则。

  • 启动ping任务:在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查 Server是否健康的任务。 该任务默认的执行间隔为10秒。

  • 实现了ILoadBalancer接口定义的负载均衡器应具备以下一系列基本操作。
    1、addServers(List newServers): 向负载均衡器中增加新的服务实例列表。
    2、chooseServer(Object key): 挑选一个具体的服务实例。
    3、markServerDown(Server server): 标记某个服务实例暂停服务。
    4、getReachableServers(): 获取可用的服务实例列表。
    5、getA11Servers (): 获取所有的服务实例列表。

public void addServers(List<Server> newServers) {
    if (newServers != null && newServers.size() > 0) {
        try {
            ArrayList<Server> newList = new ArrayList();
            newList.addAll(this.allServerList);
            newList.addAll(newServers);
            this.setServersList(newList);
        } catch (Exception var3) {
            logger.error("LoadBalancer [{}]: Exception while adding Servers", this.name, var3);
        }
    }
}

public Server chooseServer(Object key) {
    if (this.counter == null) {
        this.counter = this.createCounter();
    }

    this.counter.increment();
    if (this.rule == null) {
        return null;
    } else {
        try {
            return this.rule.choose(key);
        } catch (Exception var3) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", new Object[]{this.name, key, var3});
            return null;
        }
    }
}

public void markServerDown(Server server) {
    if (server != null && server.isAlive()) {
        logger.error("LoadBalancer [{}]:  markServerDown called on [{}]", this.name, server.getId());
        server.setAlive(false);
        this.notifyServerStatusChangeListener(Collections.singleton(server));
    }
}

public List<Server> getReachableServers() {
    return Collections.unmodifiableList(this.upServerList);
}

public List<Server> getAllServers() {
    return Collections.unmodifiableList(this.allServerList);
}

3.4.3 DynamicServerListloadBalancer

DynamicServerListloadBalancer类继承于 BaseLoadBalancer 类,它是对基础负载均衡器的扩展。 在该负载均衡器中,实现了服务实例清单在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说,我们可以通过过滤器来选择性地获取一批服务实例清单。

ServerList
其中含有一个关于服务列表的操作对象ServerList serverListimpl,其中泛型T从类名中对于T的限定DynamicServerListLoadBalancer可以获知它是一个 Server 的子类,即代表了一个具体的服务实例的扩展类。而ServerList 接口定义如下所示:

public interface ServerList<T extends Server> {
    //用于获取初始化的服务实例清单
    List<T> getInitialListOfServers();
    //用于获取更新的服务实例清单 
    List<T> getUpdatedListOfServers();
}

从上图中我可们以看到有多个ServerList 的实现类,那么在DynamicServer­ListLoadBalancer中的ServerList默认配置到底使用了哪个具体实现呢?然在该负载均衡器中需要实现 服务实例的动态更新, 那么势必需要Ribbon具备访问Eureka来获取服务实例的能力,所以我们从Spring Cloud整合和ribbon与Eureka的包org.springframework.cloud.netflix.ribbon.eureka下进行探索,可以找到配置类 EurekaRibbonClientConfiguration, 在该类中可以找到如下创建ServerList实例的内容:

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config) {
    DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList( config);
    DomainExtractingServerList serverList = new DomainExtractingServerList( discoveryServerList, config, this.approximateZoneFromHostname);
    return serverList;
}

这里创建的是 一个DomainExtractingServerList 实例,从下面它的源码中我们可以看到, 在它内部还定义了一个ServerList list。同时,Domain­ExtractingServerList类中getinitialListOfServers和getUpdated­ListOfServers的具体实现, 其实委托给了内部定义的ServerList list对象,而该对象是通过创建 DomainExtractingServerList 时,由构造函数传入的 DiscoveryEnabledNIWSServerList实现的。

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
    private ServerList<DiscoveryEnabledServer> list;
    private IClientConfig clientConfig;
    private boolean approximateZoneFromHostname;

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list, IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.clientConfig = clientConfig;
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> servers = this.setZones(this.list.getInitialListOfServers());
        return servers;
    }

    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = this.setZones(this.list.getUpdatedListOfServers());
        return servers;
    }

    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> result = new ArrayList();
        boolean isSecure = this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.IsSecure, Boolean.TRUE);
        boolean shouldUseIpAddr = this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.UseIPAddrForServer, Boolean.FALSE);
        Iterator var5 = servers.iterator();

        while(var5.hasNext()) {
            DiscoveryEnabledServer server = (DiscoveryEnabledServer)var5.next();
            result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname));
        }

        return result;
    }
}

那么DiscoveryEnabledNIWSServerList是如何实现这两个服务实例获取的呢?我们从源码中可以看到这 两 个方法都是通过该类中的一个私有函数 obtainServersViaDiscovery 通过服务发现机制来实现服务实例的获取的。

com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList

public List<DiscoveryEnabledServer> getInitialListOfServers() {
    return this.obtainServersViaDiscovery();
}

public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
    return this.obtainServersViaDiscovery();
}

而obtainServersViaDiscovery的实现逻辑如下所示,主要依靠EurekaClient从服务注册中心中获取到具体的服务实例InstanceInfo列表(这里传入的 vipAddress可以理解为逻辑上的服务名, 比如USER-SERVICE)。接着,对这些服务实例进行遍历,将状态为UP (正常服务)的实例转换成 DiscoveryEnabledServer对象, 最后将这些实例组织成列表返回。

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList();
    if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
        EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
        if (this.vipAddresses != null) {
            String[] var3 = this.vipAddresses.split(",");
            int var4 = var3.length;

            for(int var5 = 0; var5 < var4; ++var5) {
                String vipAddress = var3[var5];
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
                Iterator var8 = listOfInstanceInfo.iterator();

                while(var8.hasNext()) {
                    InstanceInfo ii = (InstanceInfo)var8.next();
                    if (ii.getStatus().equals(InstanceStatus.UP)) {
                        if (this.shouldUseOverridePort) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
                            }

                            InstanceInfo copy = new InstanceInfo(ii);
                            if (this.isSecure) {
                                ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
                            } else {
                                ii = (new Builder(copy)).setPort(this.overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, this.isSecure, this.shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }

                if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
                    break;
                }
            }
        }

        return serverList;
    } else {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList();
    }
}

在DiscoveryEnabledNIWSServerLi江中通过EurekaClien七从服务注册中心 获取到最新的服务实例清单后, 返回的List到了DomainExtractingServerList类中,将继续通过setZones函数进行处理。而这里的处理具体内容如下所示, 主要完成将DiscoveryEnabledNIWSServerList返回的List列表中的元素, 转换成内部定义的DiscoveryEnabledServer 的子类对象 DomainExtractingServer, 在该对象的构造函数中将为服务实例对象设置一些必要的属性, 比如id、zone、isAliveFlag、readyToServe等信息。

private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
    List<DiscoveryEnabledServer> result = new ArrayList();
    boolean isSecure = this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.IsSecure, Boolean.TRUE);
    boolean shouldUseIpAddr = this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.UseIPAddrForServer, Boolean.FALSE);
    Iterator var5 = servers.iterator();

    while(var5.hasNext()) {
        DiscoveryEnabledServer server = (DiscoveryEnabledServer)var5.next();
        result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname));
    }

    return result;
}

ServerListUpdater
通过上面的分析我们已经知道了ribbon与Eureka整合后,如何实现从Eureka Server中获取服务实例清单。那么它又是如何触发向 Eureka Server 去获取服务实例清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?继续来看DynamicServer­ListLoadBalancer中的实现内容:

protected volatile ServerListUpdater serverListUpdater;
class NamelessClass_1 implements UpdateAction {
    NamelessClass_1() {
    }

    public void doUpdate() {
        DynamicServerListLoadBalancer.this.updateListOfServers();
    }
}

this.updateAction = new NamelessClass_1();

在ServerListUpdater内部还定义了一个UpdateAction接口,上面定义的updateAction对象就是以匿名内部 类的方式创建了一个它的具体实现,其中doUpdate实现的内容就是对Serverlist的具体更新操作。除此之外,ServerListUpdater中还定义了一系列控制它和获取它的信息的操作。

public interface ServerListUpdater {
    //启动服务更新器,传入的UpdateAction对象为更新操作的具体实现。
    void start(ServerListUpdater.UpdateAction var1);
    //停止服务更新器
    void stop();
    //荻取最近的更新时间戳
    String getLastUpdate();
    //获取上一次更新到现在的时间间隔,单位为毫秒
    long getDurationSinceLastUpdateMs();
    //荻取错过的更新周期数
    int getNumberMissedCycles();
    //荻取核心线程数
    int getCoreThreads();

    public interface UpdateAction {
        void doUpdate();
    }
}

而ServerListUpdater的实现类不多,根据两个类的注释,我们可以很容易地知道它们的作用。

  • PollingServerListUpdater: 动态服务列表更新的默认策略,DynamicServerListLoadBalancer负载均衡器中的默认实现就是它,它通过定时任务的方式进行服务列表的更新。
  • EurekaNotificationServerListUpdater: 该更新器也可服务于 Dynamic­ServerListLoadBalancer负载均衡器,但是它的触发机制与PollingServer­ListUpdater不同,它需要利用Eureka的事件监听器来驱动服务列表的更新操作。

下面我们来详细看看它默认实现的PollingServerListUpdater。 先从用于启动 “服务更新器 ” 的 start函数源码看起,具体如下。它先创建了一个Runnable的线程实现,在该实现中调用了上面提到的具体更新服务实例列表的方法updateAcyion.doUpdate(), 最后再为这个Runnable线程实现启动了一个定时任务来执行。

public synchronized void start(final UpdateAction updateAction) {
    if (this.isActive.compareAndSet(false, true)) {
        Runnable wrapperRunnable = new Runnable() {
            public void run() {
                if (!PollingServerListUpdater.this.isActive.get()) {
                    if (PollingServerListUpdater.this.scheduledFuture != null) {
                        PollingServerListUpdater.this.scheduledFuture.cancel(true);
                    }

                } else {
                    try {
                        updateAction.doUpdate();
                        PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
                    } catch (Exception var2) {
                        PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
                    }

                }
            }
        };
        this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
    } else {
        logger.info("Already active, no-op");
    }
}

我们可以找到用于启动定时任务的两个重要参数initialDelayMsrefreshIntervalMs的默认定义分别为 1000和30*1000, 单位为毫秒。 也就是说, 更新服务实例在初始化之后延迟1秒后开始执行,并以 30秒为周期重复执行。除了这些内容之外,还能看到它还会记录最后更新时间、是否存活等信息,同时也实现了ServerListUpdater中定义的一 些其他操作内容。

ServerListFilter
我们回到updateAction. doUpdate()调用的具体实现位置,在DynamicServerListLoadBalancer中, 它的实际实现委托给了updateListOfServers函数,具体实现如下:

@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList();
    if (this.serverListImpl != null) {
        servers = this.serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
        if (this.filter != null) {
            servers = this.filter.getFilteredListOfServers((List)servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
        }
    }

    this.updateAllServerList((List)servers);
}

可以看到这里终于用到了之前提到的ServerList的getUpdatedListOfServers() 通过之前的介绍已经知道这一步实现从Eureka Server中获取服务可用实例的列表。 在获得了服务实例列表之后,这里又将引入一个新的对象filter, 追溯该对象的定义,我们可以找到它是ServerListFilter定义的。

ServerListFilter接口非常简单,该接口中定义了一个方法List getFiltered ListOfServers(List servers), 主要用于实现对服务实例列表的过滤,通过传入的服务实例清单,根据一些规则返回过滤后的服务实例清单。 该接口的实现如下图所示。

其中, 除了ZonePreferenceServerListFilter的实现是Spring Cloud Ribbon中对Netflix Ribbon的扩展实现外,其他均是Netflix Ribbon中的原生实现类。下面,我们 可以分别看看这些过滤器实现都有什么特点。

  • AbstractServerListFilter: 这是一个抽象过滤器,在这里定义了过滤时需要的一个重要依据对象 LoadBalancerStats,,该对象存储了关于负载均衡器的一些属性和统计信息等。
public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
    private volatile LoadBalancerStats stats;

    public AbstractServerListFilter() {
    }

    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }

    public LoadBalancerStats getLoadBalancerStats() {
        return this.stats;
    }
}
  • ZoneAffinityServerListFilter: 该过滤器基于==区域感知 (Zone Affinity)==的方式实现服务实例的过滤,也就是说,它会根据提供服务的实例所处的区域 (Zone) 与消费者自身的所处区域 (Zone) 进行比较,过滤掉那些不是同处一个区域的实例。
public List<T> getFilteredListOfServers(List<T> servers) {
    if (this.zone != null && (this.zoneAffinity || this.zoneExclusive) && servers != null && servers.size() > 0) {
        List<T> filteredServers = Lists.newArrayList(Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
        if (this.shouldEnableZoneAffinity(filteredServers)) {
            return filteredServers;
        }

        if (this.zoneAffinity) {
            this.overrideCounter.increment();
        }
    }
    return servers;
}

对千服务实例列表的过滤是通过Iterables. filter(servers,this.zoneAffinityPredicate.getServerOnlyPredicate()) 来实现的,其中判断依据由 ZoneAffinityPredicate实现服务实例与消费者的Zone比较。而在过滤之后,这里并不会马上返回过滤的结果,而是通过 shouldEnableZone­Affinity函数来判断是否要启用区域感知的功能。

从下面shouldEnableZoneAffinity的实现中,它使用了LoadBalancerStats的getZoneSnapshot方法 来获取这些过滤后的同区域实例的基础指标(包含实例数量、断路器断开数、 活动请求数、 实例平均负载等),根据一系列的算法求出下 面的几个评价值并与设置的阙值进行对比(下面的为默认值),若有一个条件符合, 就不启用区域感知过滤的服务实例清单。这一算法实现为集群出现区域故障时,依然可以依靠其他区域的实例进行正常服务提供了完善的高可用保障。
1、blackOutServerPercentage: 故障实例百分比(断路器断开数/实例数量) >=0.8。
2、activeReqeustsPerServer: 实例平均负载 >=0.6 。
3、availableServers: 可用实例数(实例数量 - 断路器断开数)< 2。

private boolean shouldEnableZoneAffinity(List<T> filtered) {
    if (!this.zoneAffinity && !this.zoneExclusive) {
        return false;
    } else if (this.zoneExclusive) {
        return true;
    } else {
        LoadBalancerStats stats = this.getLoadBalancerStats();
        if (stats == null) {
            return this.zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            if (!((double)circuitBreakerTrippedCount / (double)instanceCount >= this.blackOutServerPercentageThreshold.get()) && !(loadPerServer >= this.activeReqeustsPerServerThreshold.get()) && instanceCount - circuitBreakerTrippedCount >= this.availableServersThreshold.get()) {
                return true;
            } else {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", new Object[]{(double)circuitBreakerTrippedCount / (double)instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            }
        }
    }
}
  • DefaultNIWSServerListFilter: 该过滤器完全继承自ZoneAffinity­ServerListFilter, 是默认的NIWS (Netflix Internal Web Service)过滤器。
  • ServerListSubsetFilter: 该过滤器也继承自 ZoneAffinityServer­ListFilter, 它非常适用于拥有大规模服务器集群(上百或更多)的系统。 因为它可以产生一个区域感知结果的子集列表,同时它还能够通过比较服务实例的通信失败数量和并发连接数来判定该服务是否健康来选择性地从服务实例列表中剔除那些相对不够健康的实例。
  • ZonePreferenceServerListFilter: Spring Cloud整合时新增的过滤器。若使用Spring Cloud整合Eureka和Ribbon时会默认使用该过滤器。它实现了通过配置或者Eureka实例元数据的所属区域 (Zone) 来过滤出同区域的服务实例。
public List<Server> getFilteredListOfServers(List<Server> servers) {
    List<Server> output = super.getFilteredListOfServers(servers);
    if (this.zone != null && output.size() == servers.size()) {
        List<Server> local = new ArrayList();
        Iterator var4 = output.iterator();

        while(var4.hasNext()) {
            Server server = (Server)var4.next();
            if (this.zone.equalsIgnoreCase(server.getZone())) {
                local.add(server);
            }
        }

        if (!local.isEmpty()) {
            return local;
        }
    }
    return output;
}

首先通过父类ZoneAffinityServerListFilter的过滤器来获得区域感的服务实例列表,然后遍历这个结果,取出根据消费 者配置预设的区域Zone来进行过滤,如果过滤的结果是空就直接返回父类获取的
结果 如果不为空就返回通过消费者配置的Zone过滤后的结果。

3.4.4 ZoneAwareloadBalancer

ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer的扩展。在 DynamicServerListLoadBalancer中,我们可以看到它并没有重写选择具体服务实例的chooseServer 函数,所以它依然会采用在BaseLoadBalancer中实现的算法。使用 RoundRobinRule 规则,以线性轮询的方式来选择调用的服务实例,该算法实现简单并没有区域 (Zone) 的概念,所以它会把所有实例视为一个 Zone下的节点来看待,这样就会周期性地产生跨区域 (Zone) 访问的情况,由于跨区域会产生更高的延迟, 这些实例主要以防止区域性故障实现高可用为目的而不能作为常规访问的实例,所以在多区域部署的清况下会有一定的性能问题,而该负载均衡器则可以避免这样的问题。 那么它是如何实现的呢?

首先在ZoneAwareLoadBalancer中,我们可以发现,它并没有重写setServersList, 说明实现服务实例清单的更新主逻辑没有修改。但是我们可以发现它重写了这个函数setServerListForZones(Map<String, List>zoneServersMap)。

看到这里可能会有一 些陌生,因为它并不是接口中定义的必备函数, 所以我们不妨去父类 DynamicServerListLoadBalancer中寻找一下该函数,我们可以找到下面的定义:

DynamicServerListLoadBalancer

public void setServersList(List lsrv) {
    super.setServersList(lsrv);
    Map<String, List<Server>> serversInZones = new HashMap();
    Iterator var4 = lsrv.iterator();

    while(var4.hasNext()) {
        Server server = (Server)var4.next();
        this.getLoadBalancerStats().getSingleServerStat(server);
        String zone = server.getZone();
        if (zone != null) {
            zone = zone.toLowerCase();
            List<Server> servers = (List)serversInZones.get(zone);
            if (servers == null) {
                servers = new ArrayList();
                serversInZones.put(zone, servers);
            }

            ((List)servers).add(server);
        }
    }

    this.setServerListForZones(serversInZones);
}

protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
    LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
    this.getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}

setServerListForZones函数的调用位于更新服务实例清单函数setServers­List的最后,同时从其实现的内容来看,它在父类 DynamicServerListLoadBalancer中的作用是根据按区域Zone分组的实例列表, 为负载均衡器中的LoadBalancerStats对象创建Zonestats并放入Map zonestatsMap 集合中,每一个区域Zone对应一个ZoneStats, 它用于存储每个Zone 的一些状态和统计信息。

在 ZoneAwareLoadBalancer 中对 setServerListForZones 的重写如下:

protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
    super.setServerListForZones(zoneServersMap);
    if (this.balancers == null) {
        this.balancers = new ConcurrentHashMap();
    }

    Iterator var2 = zoneServersMap.entrySet().iterator();

    Entry existingLBEntry;
    while(var2.hasNext()) {
        existingLBEntry = (Entry)var2.next();
        String zone = ((String)existingLBEntry.getKey()).toLowerCase();
        this.getLoadBalancer(zone).setServersList((List)existingLBEntry.getValue());
    }

    var2 = this.balancers.entrySet().iterator();
    while(var2.hasNext()) {
        existingLBEntry = (Entry)var2.next();
        if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
            ((BaseLoadBalancer)existingLBEntry.getValue()).setServersList(Collections.emptyList());
        }
    }
}

可以看到,在该实现中创建了一个ConcurrentHashMap() 类型的balancers对象,它将用来存储每个 Zone区域对应的负载均衡器。而具体的负载均衡器的创建则是通过在下面的第一个循环中调用getLoadBalancer函数来完成,同时在创建负载均衡器的时候会创建它的规则(如果当前实现中没有IRule的实例,就创建一个 AvailabilityFilteringRule规则;如果已经有具体实例,就克隆一个)。在创建完负载均衡器后又马上调用setServersList函数为其设置对应Zone区域的实例清单。而第二个循环则是对 Zone 区域中实例清单的检查,看看是否有Zone区域下已经没有实例了,是的话就将balancers中对应Zone区域的实例列表清空,该操作的作用是为了后续选择节点时,防止过时的Zone区域统计信息干扰具体实例的选择算法。

在了解了该负载均衡器是如何扩展服务实例清单的实现后, 我们来具体看看它是如何挑选服务实例,来实现对区域的识别的:

public Server chooseServer(Object key) {
    if (ENABLED.get() && this.getLoadBalancerStats().getAvailableZones().size() > 1) {
        Server server = null;

        try {
            LoadBalancerStats lbStats = this.getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (this.triggeringLoad == null) {
                this.triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2D);
            }

            if (this.triggeringBlackoutPercentage == null) {
                this.triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999D);
            }

            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get());
            if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = this.getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception var8) {
        }

        if (server != null) {
            return server;
        } else {
            return super.chooseServer(key);
        }
    } else {
        return super.chooseServer(key);
    }
}

只有当负载均衡器中维护的实例所属的Zone区域的个数大于 1 的时候才会执行这里的选择策略,否则还是将使用父类的实现。当Zone区域的个数大于1的时候,它的实现步骤如下所示。
• 调用ZoneAvoidanceRule中的静态方法createSnapshot(lbStats)为当前负载均衡器中所有的Zone区域分别创建快照,保存在Map zoneSnapshot中 这些快照中的数据将用于后续的算法。
• 调用ZoneAvoidanceRule中的静态方法getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get()),来获取可用的Zone区域集合,在该函数中会通过Zone区域快照中的统计数据来实现可用区的挑选。
首先它会剔除符合这些规则的Zone区域: 所属实例数为零的Zone区域; Zone区域内实例的平均负载小于零,或者实例故障率( 断路器断开次数/实例数)大于等于阙值(默认为0.99999)。
然后根据Zone区域的实例平均负载计算出最差的Zone区域,这里的最差指的是实例平均负载最高的Zone区域。
如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小于阈值 (默认为20%), 就直接返回所有Zone区域为可用区域。 否则,从最坏Zone区域集合中随机选择一个,将它从可用Zone区域集合中 剔除。
• 当获得的可用Zone区域集合不为空,并且个数小于Zone区域总数,就随机选择一个Zone区域。
• 在确定了某个Zone区域后,则获取了对应Zone区域的服务均衡器,并调用chooseServer来选择具体的服务实例,而在chooseServer中将使用IRule接口的choose函数来选择具体的服务实例。在这里IRule接口的实现会使用ZoneAvoidanceRule来挑选出具体的服务实例。

3.5 负载均衡策略

Ribbon中实现了非常多的选择策略,其中也包含了我们在前面内容中提到过的RoundRobinRule和ZoneAvoidanceRule。下面我们来详细解读一下IRule接口的各个实现。

3.5.1 AbstractloadBalancerRule

负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够在具体实现选择服务策略时,获取到一些负载均衡器中维护的信息来作为分配依据,并以此设计一些符法来实现针对特定场景的高效策略。

public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
    private ILoadBalancer lb;

    public AbstractLoadBalancerRule() {
    }

    public void setLoadBalancer(ILoadBalancer lb) {
        this.lb = lb;
    }

    public ILoadBalancer getLoadBalancer() {
        return this.lb;
    }
}

3.5.2 RandomRule

该策略实现了从服务实例清单中随机选择一个服务实例的功能。可以看到IRule接口的choose (Object key)函数实现,委托给了该类中的choose (ILoadBalancer lb, Object key), 该方法增加了一个负载均衡器对象的参数。从具体的实现上看,它会使用传入的负载均衡器来获得可用实例列表upList和所有实例列表 allList, 并通过rand.nextInt(serverCount)函数来获取一个随机数,并将该随机数作为upList的索引值来返回具体实例。同时,具体的选择逻辑在一个while(server == null)循环之内,而根据选择逻辑的实现,正常情况下每次选择都应该选出一个服务实例,如果出现死循环获取不到服务实例时,则很有可能存在并发的Bug。

@SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    } else {
        Server server = null;
        while(server == null) {
            if (Thread.interrupted()) {
                return null;
            }

            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }
            int index = this.rand.nextInt(serverCount);
            server = (Server)upList.get(index);
            if (server == null) {
                Thread.yield();
            } else {
                if (server.isAlive()) {
                    return server;
                }
                server = null;
                Thread.yield();
            }
        }
        return server;
    }
}

public Server choose(Object key) {
    return this.choose(this.getLoadBalancer(), key);
}

3.5.3 RoundRobinRule

该策略实现了按照线性轮询的方式依次选择每个服务实例的功能。它的具体实现如下,其详细结构与 RandomRule 非常类似。除了循环条件不同外,就是从可用列表中获取所谓的逻辑不同。 从循环条件中,我们可以看到增加了一个 count计数变量,该变量会在每次循环之后累加,也就是说,如果一直选择不到server超过10次,那么就会结束尝试,并打印一个警告信息。

而线性轮询的实现则是通过Atomicinteger nextServerCyclicCounter对象实现,每次进行实例选择时通过调用incrementAndGetModulo函数实现递增。

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    } else {
        Server server = null;
        int count = 0;

        while(true) {
            if (server == null && count++ < 10) {
                List<Server> reachableServers = lb.getReachableServers();
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();
                if (upCount != 0 && serverCount != 0) {
                    int nextServerIndex = this.incrementAndGetModulo(serverCount);
                    server = (Server)allServers.get(nextServerIndex);
                    if (server == null) {
                        Thread.yield();
                    } else {
                        if (server.isAlive() && server.isReadyToServe()) {
                            return server;
                        }

                        server = null;
                    }
                    continue;
                }

                return null;
            }

            if (count >= 10) {
            }

            return server;
        }
    }
}

3.5.4 RetryRule

该策略实现了一个具备重试机制的实例选择功能。在其内部还定义了一个IRule对象,默认使用了 RoundRobinRule实例。而在choose方法中则实现了对内部定义的策略进行反复尝试的策略, 若期间能够选择到具体的服务实例就返回,若选择不到就根据设置的尝试结束时间为阙值(maxRetryMillis 参数定义的值 + choose 方法开始执行的时间戳), 当超过该阑值后就返回 null。

public class RetryRule extends AbstractLoadBalancerRule {
    IRule subRule = new RoundRobinRule();
    long maxRetryMillis = 500L;
     public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + this.maxRetryMillis;
        Server answer = null;
        answer = this.subRule.choose(key);
        if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline) {
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

            while(!Thread.interrupted()) {
                answer = this.subRule.choose(key);
                if (answer != null && answer.isAlive() || System.currentTimeMillis() >= deadline) {
                    break;
                }

                Thread.yield();
            }

            task.cancel();
        }

        return answer != null && answer.isAlive() ? answer : null;
    }
}

3.5.5 WeightedResponseTimeRule

该策略是对RoundRobinRule的扩展,增加了根据实例的运行情况来计算权重, 并根据权重来挑选实例, 以达到更优的分配效果,它的实现主要有三个核心内容。

定时任务
WeightedResponseTimeRule策略在初始化的时候会通过==serverWeightTimer. schedule (new DynamicServerWeightTask(), 0, serverWeightTaskTimerinterval)==启动一个定时任务, 用来为每个服务实例计算权重,该任务默认30秒执行一次。

class DynamicServerWeightTask extends TimerTask {
    DynamicServerWeightTask() {
    }

    public void run() {
        WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight();

        try {
            serverWeight.maintainWeights();
        } catch (Exception var3) {
            WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3);
        }

    }
}

权重计算
用千存储权重的对象为List accumulatedWeights = new ArrayList() , 该List 中每个权重值所处的位置对应了负载均衡器维护的服务实例清单中所有实例在清单中的位置。维护实例权重的计算过程通过maintainWeights函数实现,具体如下面的代码所示:

class ServerWeight {
    ServerWeight() {
    }

    public void maintainWeights() {
        ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
        if (lb != null) {
            if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                try {
                    WeightedResponseTimeRule.logger.info("Weight adjusting job started");
                    AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
                    LoadBalancerStats stats = nlb.getLoadBalancerStats();
                    if (stats != null) {
                        //计算所有实例的平均响应时间的总和
                        double totalResponseTime = 0.0D;

                        ServerStats ss;
                        for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
                            Server server = (Server)var6.next();
                            //如果服务实例的状态快照不在缓存中, 那么这里会进行自动加载
                            ss = stats.getSingleServerStat(server);
                        }
                        //逐个计算每个实例的权重: weightSoFar + totalResponseTime -实例的平均响应时间
                        Double weightSoFar = 0.0D;
                        List<Double> finalWeights = new ArrayList();
                        Iterator var20 = nlb.getAllServers().iterator();

                        while(var20.hasNext()) {
                            Server serverx = (Server)var20.next();
                            ServerStats ssx = stats.getSingleServerStat(serverx);
                            double weight = totalResponseTime - ssx.getResponseTimeAvg();
                            weightSoFar = weightSoFar + weight;
                            finalWeights.add(weightSoFar);
                        }

                        WeightedResponseTimeRule.this.setWeights(finalWeights);
                        return;
                    }
                } catch (Exception var16) {
                    return;
                } finally {
                    WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
                }

            }
        }
    }
}

该函数的实现主要分为两个步骤:

  • 根据LoadBalancerStats中记录的每个实例的统计信息,累加所有实例的平均响应时间,得到总平均响应时间totalResponseTime, 该值会用于后续的计算。
  • 为负载均衡器中维护的实例清单逐个计算权重(从第 一个开始),计算规则为weigh七SoFar+totalResponseTime — 实例的平均响应时间,其中weightSoFar初始化为零,并且每计算好一个权重需要累加到weightSoFar上供下一次计算使用。

举个简单的例子来理解这个计算过程, 假设有4个实例A、 B、 C、 D, 它们的平均响 应时间为10、40、 80、 100, 所以总响应时间是230, 每个实例的权重为总响应时间与实例自身的平均响应时间的差的累积所得, 所以实例A、 B、 C、 D 的权重分别如下所示。
• 实例A: 230-10 =220
• 实例B: 220 + (230-40) =410
• 实例C:410 + (230- 80) = 560
• 实例D: 560 + (230-100) = 690

需要注意的是, 这里的权重值只是表示了各实例权重区间的上限,并非某个实例的优先级, 所以不是数值越大被选中的概率就越大。那么什么是权重区间呢?以上面例子的计算结果为例, 它实际上是为这4个实例构建了4个不同的区间,每个实例的区间下限是上一个实例的区间上限,而每个实例的区间上限则是 我们上面计算并存储于List accumulatedWeights的权重值,其中第一个实例的下限默认为零。 所以, 根据上面示例的权重计算结果, 我们可以得到每个实例的权重区间。

• 实例A: [0, 220]
• 实例B: (20,410]
• 实例C: (410, 560]
• 实例D: (560, 690)

实际上每个区间的宽度就是: 总的平均响应时间 - 实例的平均响应时间,所以实例的平均响应时间越短、 权重区间的宽度越大,而权重区间的宽度越大被选中的概率就越高。

实例选择
WeightedResponseTimeRule选择实例的实现与之前介绍的算法结构类似,下面是它主体的算法:

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    } else {
        Server server = null;

        while(server == null) {
            List<Double> currentWeights = this.accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }

            List<Server> allList = lb.getAllServers();
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;
            //获取最后一个实例的权重
            double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1);
            if (maxTotalWeight < 0.001D) {
                //如果最后一个实例的权重值小于0.001, 则采用父类实现的线性轮询的策略
                server = super.choose(this.getLoadBalancer(), key);
                if (server == null) {
                    return server;
                }
            } else {
                //如果最后一个实例的权重值大于等于0.001, 就产生一个(0, maxTotalWeight)的随机数
                double randomWeight = this.random.nextDouble() * maxTotalWeight;
                int n = 0;

                for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
                    Double d = (Double)var13.next();
                    //遍历维护的权重清单, 若权重大于等于随机得到的数值, 就选择这个实例
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    }
                }

                server = (Server)allList.get(serverIndex);
            }

            if (server == null) {
                Thread.yield();
            } else {
                if (server.isAlive()) {
                    return server;
                }

                server = null;
            }
        }

        return server;
    }
}
  • 生成一个[ 0, 最大权重值)区间内的随机数。
  • 遍历权重列表, 比较权重值与随机数的大小,如果权重值大于等千随机数, 就拿当前权重列表的索引值去服务实例列表中获取具体的实例。

3.5.6 ClientConfigEnabledRoundRobinRule

该策略较为特殊,我们一般不直接使用它。因为它本身并没有实现什么特殊的处理逻辑, 正如下面的源码所示, 在它的内部定义了一个RoundRobinRule策略,而choose函数的实现也正是使用了RoundRobinRule 的线性轮询机制,所以它实现的功能实际上与RoundRobinRule相同,那么定义它有什么特殊的用处呢?

虽然我们不会直接使用该策略,但是通过继承该策略,默认的choose就实现了线性轮询机制,在子类中做一 些高级策略时通常有可能会存在一些无法实施的情况,那么就可以用父类的实现作为备选。 在后文中我们将继续介绍的高级策略均是基 ClientConfigEnabledRoundRobinRule的扩展。

public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    public ClientConfigEnabledRoundRobinRule() {
    }

    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.roundRobinRule = new RoundRobinRule();
    }

    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.roundRobinRule.setLoadBalancer(lb);
    }

    public Server choose(Object key) {
        if (this.roundRobinRule != null) {
            return this.roundRobinRule.choose(key);
        } else {
            throw new IllegalArgumentException("This class has not been initialized with the RoundRobinRule class");
        }
    }
}

3.5.7 BestAvailableRule

该策略继承自ClientConfigEnabledRoundRobinRule, 在实现中它注入了负载均衡器的统计对象 LoadBalancerStats, 同时在具体的choose算法中利用LoadBalancerStats保存的实例统计信息来选择满足要求的实例。它通过遍历负载均衡器中维护的所有服务实例,会过滤掉故障的实例,并找出并发请求数最小的一个,所以该策略的特性是=可选出最空闲的实例==。

public Server choose(Object key) {
    if (this.loadBalancerStats == null) {
        return super.choose(key);
    } else {
        List<Server> serverList = this.getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = 2147483647;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        Iterator var7 = serverList.iterator();

        while(var7.hasNext()) {
            Server server = (Server)var7.next();
            ServerStats serverStats = this.loadBalancerStats.getSingleServerStat(server);
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }

        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }
}

同时,由于该算法的核心依据是统计对象 loadBalancerStats, 当其为空的时候, 该策略是无法执行的。所以从源码中我们可以看到,当loadBalancerStats为空的时候,它会采用父类的线性轮询策略。

3.5.8 PredicateBasedRule

这是一个抽象策略,它也继承了ClientConfigEnabledRoundRobinRule, 从其命名中可以猜出这是一个基于Predicate实现的策略,Predicate是Google Guava Collection工具对集合进行过滤的条件接口。

它定义了一个抽象函数getPredicate来获取AbstractServer­ redicate对象的实现, 而在choose函数中,通过AbstractServerPredicate的chooseRoundRobinAfterFiltering函数来选出具体的服务实例。从该函数的命名我们也大致能猜出它的基础逻辑: 先通过子类中实现的 Predicate 逻辑来过滤一部分服务实例, 然后再以线性轮询的方式从过滤后的实例清单中选出一个。

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
    public PredicateBasedRule() {
    }

    public abstract AbstractServerPredicate getPredicate();

    public Server choose(Object key) {
        ILoadBalancer lb = this.getLoadBalancer();
        Optional<Server> server = this.getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        return server.isPresent() ? (Server)server.get() : null;
    }
}

3.5.9 AvailabilityFilteringRule

该策略继承自上面介绍的抽象策略PredicateBasedRule 所以它也继承了先过滤清单,再轮询选择的基本处理逻辑,其中过滤条件使用了AbstractServerPredicate:

3.5.10 ZoneAvoidanceRule

从ZoneAvoidanceRule的源码片段中可以看到,它使用了CompositePredicate来进行服务实例清单的过滤。 这是一个组合过滤条件,在其构造函数中,它以ZoneAvoidancePredicate为主过滤条件,AvailabilityPredicate为次过滤条件初始化了组合过滤条件的实例。

public class ZoneAvoidanceRule extends PredicateBasedRule {
    private static final Random random = new Random();
    private CompositePredicate compositePredicate;

    public ZoneAvoidanceRule() {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        this.compositePredicate = this.createCompositePredicate(zonePredicate, availabilityPredicate);
    }

在实现的时候并没有像AvailabilityFilteringRule那样重写choose函数来优化,所以它完全遵循了父类的过滤主逻辑先过滤清单,再轮询选择。其中过滤清单的条件就是我们上面提到的以ZoneAvoidancePredicate为主过滤条件、AvailabilityPredicate为次过滤条件的组合过滤条件CompositePredicate。从 CompositePredicate 的源码片段中,我们可以看到它定义了一个主过滤条件 AbstractServerPredicate delegate 以及一组次过滤条件列表List fallbacks, 所以它的次过滤列表是可以拥有多个的,并且由于它采用了List存储所以次过滤条件是按顺序执行的。

public class CompositePredicate extends AbstractServerPredicate {
    private AbstractServerPredicate delegate;
    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
    private int minimalFilteredServers = 1;
    private float minimalFilteredPercentage = 0.0F;

    public boolean apply(@Nullable PredicateKey input) {
        return this.delegate.apply(input);
    }

    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);

        AbstractServerPredicate predicate;
        for(Iterator i = this.fallbacks.iterator(); (result.size() < this.minimalFilteredServers || result.size() <= (int)((float)servers.size() * this.minimalFilteredPercentage)) && i.hasNext(); result = predicate.getEligibleServers(servers, loadBalancerKey)) {
            predicate = (AbstractServerPredicate)i.next();
        }

        return result;
    }
}

在获取过滤结果的实现函数getEligibleServers中, 它的处理逻辑如下所示。

  • 使用主过滤条件对所有实例过滤并返回过滤后的实例清单。
  • 依次使用次过滤条件列表中的过滤条件对主过滤条件的结果进行过滤。
  • 每次过滤之后(包括主过滤条件和次过滤条件),都需要判断下面两个条件, 只要有 一个符合就不再进行过滤, 将当前结果返回供线性轮询算法选择:
    过滤后的实例总数>=最小过滤实例数(minimalFilteredServers, 默认为1) 。
    过滤后的实例比例>最小过滤百分比(minimalFilteredPercentage, 默认为0) 。

3.6 配置详解&自动化配置

3.6.1 自动化配置

由于Ribbon中定义的每一个接口都有多种不同的策略实现,同时这些接口之间又有一定的依赖关系,在引入Spring CloudRibbon的依赖之后, 就能够自动化构建下面这些接口的实现。

  1. IClientConfig: Ribbon的客户端配置,默认采用com.netflix.client.config.DefaultClientConfigimpl实现。
  2. IRule: Ribbon的负载均衡策略, 默认采用 com.netflix.loadbalancer.ZoneAvoidanceRule实现,该策略能够在多区域环境下选出最佳区域的实例进行访问。
  3. IPing:ribbon的实例检查策略,默认采用com.netflix.loadbalancer.NoOpPing实现,该检查策略是一个特殊的实现,实际上它并不会检查实例是否可用,而是始终返回true, 默认认为所有服务实例都是可用的。
  4. ServerList: 服务实例清单的维护机制, 默认采用 com.netflix.loadbalancer.ConfigurationBasedServerList实现。
  5. ServerListFilter: 服务实例清单过滤机制,默认采用org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter实现,该策略能够优先过滤出与请求调用方处于同区域的服务实例。
  6. ILoadBalancer: 负载均衡器, 默认采用 com.netflix.loadbalancer.ZoneAwareLoadBalancer实现,它具备了区域感知的能力。

一条小咸鱼