Spring Cloud 整合

前言

玩SpringCloud之前最好懂SpringBoot,别搞撑死骆驼的事。Servlet整一下变成Spring;SSM封装、加入东西就变为SpringBoot;SpringBoot再封装、加入东西就变为SpringCloud

Spring Boot相关内容在前面我已经整理好的了(偏使用,夹带部分源码):https://www.cnblogs.com/zixq/p/16312315.html

架构的演进

单体应用架构

单体架构:表示层、业务逻辑层和数据访问层即所有功能都在一个工程里,打成一个jar包、war包进行部署,例如:GitHub 是基于 Ruby on Rails 的单体架构,直到 2021 年,为了让超过一半的开发人员在单体代码库之外富有成效地开展工作,GitHub 以赋能为出发点开始了向微服务架构的迁移

下图服务器用Tomcat举例

优点:

  1. 单体架构开发简单,容易上手,开发人员只要集中精力开发当前工程
  2. 容易修改,只需要修改对应功能模块的代码,且容易找到相关联的其他业务代码
  3. 部署简单,由于是完整的结构体,编译打包成jar包或者war包,直接部署在一个服务器上即可
  4. 容易扩展,可以将某些业务抽出一个新的单体架构,用于独立分担压力,也可以方便部署集群
  5. 性能最高,对于单台服务器而言,单体架构独享内存和cpu,不需要api远程调用,性能损耗最小

缺点:

  1. 灵活度不高,随着代码量增加,代码整体编译效率下降
  2. 规模化,无法满足团队规模化开发,因为共同修改一个项目
  3. 应用扩展性比较差,只能横向扩展,不能深度扩展,扩容只能只对这个应用进行扩容,不能做到对某个功能点进行扩容,关键性的代码改动一处多处会受影响
  4. 健壮性不高,任何一个模块的错误均可能造成整个系统的宕机
  5. 技术升级,如果想对技术更新换代,代价很大

演进:增加本地缓存和分布式缓存

缓存能够将经常访问的页面或信息存起来,从而不让其去直接访问数据库,从而减小数据库压力,但是:这就会把压力变成单机Tomcat来承受了,因此缺点就是:此时单机的tomcat又不足以支撑起高并发的请求

垂直应用架构:引入Nginx

搭配N个tomcat,从而对请求”均衡处理”,如:如果Nginx可以处理10000条请求,假设一个 tomcat可以处理100个请求,那么:就需要100个tomcat从而实现每个tomcat处理100个请求(假设每个tomcat的性能都一样 )

缺点就是数据库不足以支撑压力

后面就是将数据库做读写分离

后面还有数据库大表拆小表、大业务拆为小业务、复用功能抽离…………..

面向服务架构:SOA

SOA指的是Service-OrientedArchitecture,即面向服务架构

随着业务越来越多,代码越来越多,按照业务功能将本来一整块的系统拆分为各个不同的子系统分别提供不同的服务,服务之间会彼此调用,错综复杂

而SOA的思想就是基于前面拆成不同的服务之后,继续再抽离一层,搞一个和事佬,即下图的“统一接口”

这样不同服务之间调用就可以通过统一接口进行调用了,如:用户服务需要调用订单服务,那么用户服务去找统一接口,然后由统一接口去调用订单服务,从而将订单服务中需要的结果通过统一接口的http+json或其他两种格式返回给用户服务,这样订单服务就是服务提供者,用户服务就是服务消费者,而统一接口就相当于是服务的注册与发现

注意:上面这段话很重要,和后面要玩的微服务框架SpringCloud技术栈有关

学过设计模式的话,上面这种不就类似行为型设计模式的“中介者模式”吗

上面这种若是反应不过来,那拆回单体架构就懂了

微服务架构

微服务架构是分布式架构的具体实现方式,和Spring的IOC控制反转和DI依赖注入的关系一样,一种是理论,一种是具体实现方案

微服务架构和前面的SOA架构是孪生兄弟,即:微服务架构是在SOA架构的基础上,通过前人不断实践、不断踩坑、不断总结,添加了一些东西之后(如:链路追踪、配置管理、负载均衡…………),从而变出来的一种经过良好架构设计的分布式架构方案

而广泛应用的方案框架之一就是 SpringCloud

其中常见的组件包括:

另外,SpringCloud底层是依赖于SpringBoot的,并且有版本的兼容关系,如下:

因此。现在系统架构就变成了下面这样,当然不是一定是下面这样架构设计,还得看看架构师,看领导

因此,微服务技术知识如下

Eureka注册中心

SpringCloud中文官网:https://www.springcloud.cc/spring-cloud-greenwich.html#netflix-ribbon-starter

SpringCloud英文网:https://docs.spring.io/spring-cloud-netflix/docs/current/reference/html/#service-discovery-eureka-clients

Eureka是什么?

Eureka是Netflix开发的服务发现框架,本身是一个基于REST的服务,主要用于定位运行在AWS域中的中间层服务,以达到负载均衡和中间层服务故障转移的目的。

SpringCloud将它集成在其子项目spring-cloud-netflix中,以实现SpringCloud的服务发现功能

偷张图更直观地了解一下:

如上图所示,服务提供方会将自己注册到EurekaServer中,这样EurekaServer就会存储各种服务信息,而服务消费方想要调用服务提供方的服务时,直接找EurekaServer拉取服务列表,然后根据特定地算法(轮询、随机……),选择一个服务从而进行远程调用

  • 服务提供方:一次业务中,被其它微服务调用的服务。(提供接口给其它微服务)
  • 服务消费方:一次业务中,调用其它微服务的服务。(调用其它微服务提供的接口)

服务提供者与服务消费者的角色并不是绝对的,而是相对于业务而言

如果服务A调用了服务B,而服务B又调用了服务C,服务B的角色是什么?

  • 对于A调用B的业务而言:A是服务消费者,B是服务提供者
  • 对于B调用C的业务而言:B是服务消费者,C是服务提供者

因此,服务B既可以是服务提供者,也可以是服务消费者

Eureka的自我保护机制

这张图中EurekaServer和服务提供方有一个心跳检测机制,这是EurekaServer为了确定这些服务是否还在正常工作,所以进行的心跳检测

eureka-client启动时, 会开启一个心跳任务,向Eureka Server发送心跳,默认周期为30秒/次,如果Eureka Server在多个心跳周期内没有接收到某个节点的心跳,Eureka Server将会从服务注册表中把这个服务节点移除(默认90秒)

eureka-server维护了每个实例的最后一次心跳时间,客户端发送心跳包过来后,会更新这个心跳时间

eureka-server启动时,开启了一个定时任务,该任务每60s/次,检查每个实例的最后一次心跳时间是否超过90s,如果超过则认为过期,需要剔除

但是EurekaClient也会因为网络等原因导致没有及时向EurekaServer发送心跳,因此EurekaServer为了保证误删服务就会有一个“自我保护机制”,俗称“好死不如赖活着”

如果在短时间内EurekaServer丢失过多客户端时 (可能断网了,低于85%的客户端节点都没有正常的心跳 ),那么Eureka Server就认为客户端与注册中心出现了网络故障,Eureka Server自动进入自我保护状态 。Eureka的这样设计更加精准地控制是网络通信延迟,而不是服务挂掉了,一旦进入自我保护模式,那么 EurekaServer就会保留这个节点的属性,不会删除,直到这个节点恢复正常心跳

  • 85% 这个阈值,可以通过如下配置来设置:
eureka:
  server:
    renewal-percent-threshold: 0.85

这里存在一个问题,这个85%是超过谁呢?这里有一个预期的续约数量,计算公式如下:

自我保护阀值 = 服务总数 * 每分钟续约数(60S/客户端续约间隔) * 自我保护续约百分比阀值因子

在自我保护模式中,EurekaServer会保留注册表中的信息,不再注销任何服务信息,当它收到正常心跳时,才会退出自我保护模式,也就是:宁可保留错误的服务注册信息,也不会盲目注销任何可能健康的服务实例,即:好死不如赖活着

因此Eureka进入自我保护状态后,会出现以下几种情况:

  • Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其他节点上,保证当前节点依然可用。Eureka的自我保护机制可以通过如下的方式开启或关闭
eureka:
  server:
#   开启Eureka自我保护机制,默认为true
    enable-self-preservation: true
  • Eureka Server不再从注册列表中移除因为长时间没有收到心跳而应该剔除的过期服务,如果在保护期内这个服务提供者刚好非正常下线了,此时服务消费者就会拿到一个无效的服务实例,此时会调用失败,对于这个问题需要服务消费者端要有一些容错机制,如重试,断路器等!

Eureka常用配置

eureka:
  client: # eureka客户端配置
    register-with-eureka: true # 是否将自己注册到eureka服务端上去
    fetch-registry: true # 是否获取eureka服务端上注册的服务列表
    service-url:
      defaultZone: http://localhost:8001/eureka/ # 指定注册中心地址。若是集群可以写多个,中间用 逗号 隔开
    enabled: true # 启用eureka客户端
    registry-fetch-interval-seconds: 30 # 定义去eureka服务端获取服务列表的时间间隔
  instance: # eureka客户端实例配置
    lease-renewal-interval-in-seconds: 30 # 定义服务多久去注册中心续约
    lease-expiration-duration-in-seconds: 90 # 定义服务多久不去续约认为服务失效
    metadata-map:
      zone: hangzhou # 所在区域
    hostname: localhost # 服务主机名称
    prefer-ip-address: false # 是否优先使用ip来作为主机名
  server: # eureka服务端配置
    enable-self-preservation: false #关 闭eureka服务端的自我保护机制

使用Eureka

实现如下的逻辑:

搭建Eureka Server

自行单独创建一个Maven项目,导入依赖如下:

<!--Eureka Server-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

在YAML文件中一般可配置内容如下:

server:
  port: 10086
spring:
  application:
    name: EUREKA-SERVER
eureka:
  instance:
    # Eureka的主机名,是为了eureka集群服务器之间好区分
    hostname: 127.0.0.1
    # 最后一次心跳后,间隔多久认定微服务不可用,默认90
    lease-expiration-duration-in-seconds: 90
  client:
    # 不向注册中心注册自己。应用为单个注册中心设置为false,代表不向注册中心注册自己,默认true	注册中心不需要开启
    # registerWithEureka: false
    # 不从注册中心拉取自身注册信息。单个注册中心则不拉取自身信息,默认true	注册中心不需要开启
    # fetchRegistry: false
    service-url:
      # Eureka Server的地址
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka
#    server:
#      # 开启Eureka自我保护机制,默认为true
#      enable-self-preservation: true

注:在SpringCloud中配置文件YAML有两种方式,一种是 application.yml 另一种是 bootstrap.yml ,这个知识后续Nacos注册中心会用到,区别去这里:https://www.cnblogs.com/sharpest/p/13678443.html

启动类编写内容如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

/**
 * <p>@description  : 该类功能  eureka server启动类
 * </p>
 * <p>@author       : ZiXieqing</p>
 */

/*@EnableEurekaServer 开启Eureka Server功能*/
@EnableEurekaServer
@SpringBootApplication
public class EurekaApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaApplication.class, args);
    }
}

服务提供者

新建一个Maven模块项目,依赖如下:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
<!--eureka client-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

YAML配置内容如下:

server:
  port: 8081
spring:
  application:
    name: USER-SERVICE
eureka:
  client:
    service-url:
      # 将服务注册到哪个eureka server
      defaultZone: http://localhost:10086/eureka

启动类内容如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}

关于开启Eureka Client的问题

上一节中启动类里面有些人会看到是如下的方式:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient // 多了这么一个操作:开启eureka client功能
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}

在eureka client启动类中,有些人会加 @EnableEurekaClient 注解,而有些人不会加上,为什么?除了显示声明还有其他原因吗??

要弄这个问题,首先看yml中的配置,有些是在yml中做了一个操作:

eureka:
  client:
    service-url:
      # 向哪个eureka server进行服务注册
      defaultZone: http://localhost:10086/eureka
    # 开启eureka client功能,默认就是true,差不多等价于启动类中加 @EnableEurekaClient 注解
    enabled: true

既然上面配置默认值都是true,那还有必要在启动类中加入 @EnableEurekaClient 注解吗?

答案是根本不用加,加了也是多此一举(前提:yml配置中没有手动地把值改为false),具体原因看源码:答案就在Eureka client对应的自动配置类 EurekaClientAutoConfiguration

上图中这一行的意思是只有当application.yaml(或者环境变量,或者系统变量)里,eureka.client.enabled这个属性的值为true才会初始化这个类(如果手动赋值为false,就不会初始化这个类了)

另外再加上另一个原因,同样在 EurekaClientAutoConfiguration 类中还有一个 eurekaAutoServiceRegistration() 方法

在这里使用 EurekaAutoServiceRegistration类+@Bean注解 意思就是通过 @Bean 注解,装配一个 EurekaAutoServiceRegistration 对象作为Spring的bean,而我们从名字就可以看出来EurekaClient的注册就是 EurekaAutoServiceRegistration 对象所进行的操作

同时,在这个方法上,也有这么一行 @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)

综上所述:我们可以看出来,EurekaClient的注册和两个配置项有关的,一个是 eureka.client.enabled ,另一个是 spring.cloud.service-registry.auto-registration.enabled ,只不过这两个配置默认都是true。这两个配置无论哪个我们手动配置成false,我们的服务都无法进行注册,测试自行做

另外还有一个原因:上图中不是提到了 EurekaAutoServiceRegistration类+@Bean注解 吗,那去看一下

可以看到 EurekaAutoServiceRegistration 类实现了Spring的 SmartLifecycle 接口,这个接口的作用是帮助一个类在作为Spring的Bean的时候,由Spring帮助我们自动进行一些和生命周期有关的工作,比如在初始化或者停止的时候进行一些操作。而我们最关心的 注册(register) 这个动作,就是在SmartLifecycle接口的 start() 方法实现里完成的

而上一步讲到,EurekaAutoServiceRegistration 类在 EurekaClientAutoConfiguration 类里恰好被配置成Spring的Bean,所以这里的 start() 方法是会自动被Spring调用的,我们不需要进行任何操作

总结

当我们引用了EurekaClient的依赖后,并且 eureka.client.enabledspring.cloud.service-registry.auto-registration.enabled 两个开关不手动置为false,Spring就会自动帮助我们执行 EurekaAutoServiceRegistration 类里的 start() 方法,而注册的动作就是在该方法里完成的

所以,我们的EurekaClient工程,并不需要显式地在SpringBoot的启动类上标注 @EnableEurekaClient 注解

服务消费者

创建Maven模块,依赖如下:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

YAML配置如下:

server:
  port: 8080
spring:
  application:
    name: ORDER-SERVICE
eureka:
  client:
    service-url:
      # 向哪个eureka server进行服务拉取
      defaultZone: http://localhost:10086/eureka

启动类如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

    /**
     * RestTemplate 用来进行远程调用服务提供方的服务
     * LoadBalanced 注解 是SpringCloud中的
     *              此处作用:赋予RestTemplate负载均衡的能力 也就是在依赖注入时,只注入实例化时被@LoadBalanced修饰的实例
     *              底层是 Spring的Qualifier注解,即为spring的原生操作
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

@Qualifier 注解很重要:

@Autowired 默认是根据类型进行注入的,因此如果有多个类型一样的Bean候选者,则需要限定其中一个候选者,否则将抛出异常

@Qualifier 限定描述符除了能根据名字进行注入,更能进行更细粒度的控制如何选择候选者

@LoadBalanced很明显,”继承”了注解@QualifierRestTemplates通过@Autowired注入,同时被@LoadBalanced修饰,所以只会注入@LoadBalanced修饰的RestTemplate,也就是我们的目标RestTemplate

通过 RestTemplate +eureka 远程调用服务提供方中的服务

import com.zixieqing.order.mapper.OrderMapper;
import com.zixieqing.order.pojo.Order;
import com.zixieqing.order.pojo.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private RestTemplate restTemplate;

    public Order queryOrderById(Long orderId) {
        // 1.查询订单
        Order order = orderMapper.findById(orderId);
        // 2、远程调用服务的url 此处直接使用服务名,不用ip+port
        // 原因是底层有一个LoadBalancerInterceptor,里面有一个intercept(),后续玩负载均衡Ribbon会看到
        String url = "http://USER-SERVICE/user/" + order.getUserId();
        // 2.1、利用restTemplate调用远程服务,封装成user对象
        User user = restTemplate.getForObject(url, User.class);
        // 3、给oder设置user对象值
        order.setUser(user);
        // 4.返回
        return order;
    }
}

不会玩 RestTemplate 用法的 戳这里

测试

依次启动eureka-server、user-service、order-service,然后将user-service做一下模拟集群即可,将user-service弄为模拟集群操作方式如下:不同版本IDEA操作有点区别,出入不大


再将复刻的use-service2也启动即可,启动之后点一下eureka-server的端口就可以在浏览器看到服务情况


可以自行在服务提供方和服务消费方编写逻辑,去链接数据库,然后在服务消费方调用服务提供方的业务,最后访问自己controller中定义的路径和参数即可

Ribbon负载均衡

负载均衡的作用范围

负载均衡是作用在网络通信上,来实现请求的分发。
而在网络架构中,基于 OSI 模型,又分为 7 层网络模型

也就是意味着我们可以在网络的某些分层上做请求分发处理,因此根据这样一个特性,对于负载均衡的作用范围又可以分为:

  1. 二层负载
  2. 三层负载
  3. 四层负载
  4. 七层负载

二层负载:基于 Mac 地址来实现请求分发,一般采用虚拟 Mac 的方式实现,服务器收到请求后,通过动态分配后端服务的实际 Mac 地址进行响应,从而实现负载均衡
三层负载:基于 IP 层负载,一般通过虚拟 IP 的方式实现,外部请求访问虚拟 IP,服务器收到请求后根据后端实际 IP 地址进行转发。
四层负载:通过请求报文中的目标地址和端口进行负载,Nginx、F5、LVS 等都可以实现四层负载。

七层负载:七层负载是基于应用层负载,也就是服务器端可以根据 http 协议中请求的报文信息来决定把请求分发到哪个目标服务器上,比如 Cookie、消息体、RequestHeader 等。

Ribbon是什么?

Ribbon是Netflix发布的开源项目,Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡的框架

Ribbon属于哪种负载均衡?

LB负载均衡(Load Balance)是什么?

  • 简单地说就是将用户的请求平摊的分配到多个服务上,从而达到系统的HA(高可用)
  • 常见的负载均衡有软件Nginx,硬件 F5等

什么情况下需要负载均衡?

  • 现在Java非常流行微服务,也就是所谓的面向服务开发,将一个项目拆分成了多个项目,其优点有很多,其中一个优点就是:将服务拆分成一个一个微服务后,我们很容易地来针对性的进行集群部署。例如订单模块用的人比较多,那就可以将这个模块多部署几台机器,来分担单个服务器的压力

  • 这时候有个问题来了,前端页面请求的时候到底请求集群当中的哪一台?既然是降低单个服务器的压力,所以肯定全部机器都要利用起来,而不是说一台用着,其他空余着。这时候就需要用负载均衡了,像这种前端页面调用后端请求的,要做负载均衡的话,常用的就是Nginx

Ribbon和Nginx负载均衡的区别

  • 当后端服务是集群的情况下,前端页面调用后端请求,要做负载均衡的话,常用的就是Nginx
  • Ribbon主要是在“服务端内”做负载均衡,举例:订单后端服务 要调用 支付后端服务,这属于后端之间的服务调用,压根根本不经过页面,而后端支付服务是集群,这时候订单服务就需要做负载均衡来调用支付服务,记住是订单服务做负载均衡 “来调用” 支付服务

负载均衡分类

  • 集中式LB:即在服务的消费方和提供方之间使用独立的LB设施(可以是硬件,如F5, 也可以是软件,如nginx),由该设施负责把访问请求通过某种策略转发至服务的提供方
  • 进程内LB:将LB逻辑集成到“消费方”,消费方从服务注册中心获知有哪些地址可用,然后自己再从这些地址中选择出一个合适的服务器

Ribbon负载均衡

  • Ribbon就属于进程内LB,它只是一个类库,集成于服务消费方进程

Ribbon的流程

通过上图一定要明白一点:Ribbon一定是用在消费方,而不是服务的提供方!

Ribbon在工作时分成两步(这里以Eureka为例,consul和zk同样道理):

  • 第一步先选择 EurekaServer ,它优先选择在同一个区域内负载较少的server
  • 第二步再根据用户指定的策略(轮询、随机、响应时间加权…..),从server取到的服务注册列表中选择一个地址

请求怎么从服务名地址变为真实地址的?

只要引入了注册中心(Eureka、consul、zookeeper),那Ribbon的依赖就在注册中心里面了,证明如下:

回到正题:为什么下面这样使用服务名就可以调到服务提供方的服务,即:请求 http://userservice/user/101 怎么变成的 http://localhost:8081 ??因为它长得好看?

import com.zixieqing.order.mapper.OrderMapper;
import com.zixieqing.order.pojo.Order;
import com.zixieqing.order.pojo.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private RestTemplate restTemplate;

    public Order queryOrderById(Long orderId) {
        // 1.查询订单
        Order order = orderMapper.findById(orderId);
        // 2、远程调用服务的url 此处直接使用服务名,不用ip+port
        // 原因是底层有一个LoadBalancerInterceptor,里面有一个intercept(),后续玩负载均衡Ribbon会看到
        String url = "http://USER-SERVICE/user/" + order.getUserId();
        // 2.1、利用restTemplate调用远程服务,封装成user对象
        User user = restTemplate.getForObject(url, User.class);
        // 3、给oder设置user对象值
        order.setUser(user);
        // 4.返回
        return order;
    }
}


// RestTemplate做了下面操作,使用了 @Bean+@LoadBalanced


    /**
     * RestTemplate 用来进行远程调用服务提供方
     * LoadBalanced 注解 是SpringCloud中的
     *              此处作用:赋予RestTemplate负载均衡的能力 也就是在依赖注入时,只注入实例化时被@LoadBalanced修饰的实例
     *              底层是 Spring的Qualifier注解,即为spring的原生操作
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

想知道答案就得Debug了,而要Debug,就得找到 LoadBalancerInterceptor

LoadBalancerInterceptor类

然后对服务消费者进行Debug









问题的答案已经出来了:为什么使用服务名就可以调到服务提供方的服务,即:请求 http://userservice/user/101 怎么变成的 http://localhost:8081 ??

  • 原因就是使用了RibbonLoadBalancerClient+loadBalancer(默认是 ZoneAwareLoadBalance 从服务列表中选取服务)+IRule(默认是 RoundRobinRule 轮询策略选择某个服务)

总结

SpringCloudRibbon的底层采用了一个拦截器LoadBalancerInterceptor,拦截了RestTemplate发出的请求,对地址做了修改

负载均衡策略有哪些?

根据前面的铺垫,也知道了负载均衡策略就在 IRule 中,那就去看一下

转换一下:

ClientConfigEnabledRoundRobinRule:该策略较为特殊,我们一般不直接使用它。因为它本身并没有实现什么特殊的处理逻辑。一般都是可以通过继承他重写一些自己的策略,默认的choose()就实现了线性轮询机制

  • BestAvailableRule:继承自ClientConfigEnabledRoundRobinRule,会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务,该策略的特性是可选出最空闲的实例

PredicateBasedRule:继承自ClientConfigEnabledRoundRobinRule,抽象策略,需要重写方法,然后自定义过滤规则

  • AvailabilityFilteringRule:继承PredicateBasedRule,先过滤掉故障实例,再选择并发较小的实例。过滤掉的故障服务器是以下两种:
    1. 在默认情况下,这台服务器如果3次连接失败,这台服务器就会被设置为“短路”状态。短路状态将持续30秒,如果再次连接失败,短路的持续时间就会几何级地增加
    2. 并发数过高的服务器。如果一个服务器的并发连接数过高,配置了AvailabilityFilteringRule规则的客户端也会将其忽略。并发连接数的上限,可以由客户端的<clientName>.<clientConfigNameSpace>.ActiveConnectionsLimit 属性进行配置
  • ZoneAvoidanceRule:继承PredicateBasedRule,默认规则,复合判断server所在区域的性能和server的可用性选择服务器

com.netflix.loadbalancer.RoundRobinRule:轮询 Ribbon的默认规则

  • WeightedResponseTimeRule:对RoundRobinRule的扩展。为每一个服务器赋予一个权重值,服务器响应时间越长,其权重值越小,这个权重值会影响服务器的选择,即:响应速度越快的实例选择权重越大,越容易被选择
  • ResponseTimeWeightedRule:对RoundRobinRule的扩展。响应时间加权

com.netflix.loadbalancer.RandomRule:随机

com.netflix.loadbalancer.StickyRule:这个基本也没人用

com.netflix.loadbalancer.RetryRule:先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会进行重试,从而获取可用的服务

ZoneAvoidanceRule:先复合判断server所在区域的性能和server的可用性选择服务器,再使用Zone对服务器进行分类,最后对Zone内的服务器进行轮询

自定义负载均衡策略

在前面已经知道了策略是 IRule ,所以就是改变了这个玩意而已

1、代码方式 :服务消费者的启动类或重开config模块编写如下内容即可

@Bean
public IRule randomRule(){
    // new前面提到的那些rule对象即可,当然这里面也可以自行篡改策略逻辑返回
    return new RandomRule();
}

注: 此种方式是全局策略,即所有服务均采用这里定义的负载均衡策略

2、@RibbonClient注解:用法如下

/**
 * 在服务消费者的启动类中加入如下注解即可 如下注解指的是:调用 USER-SERVICE 服务时 使用MySelfRule负载均衡规则
 *
 * 这里的MySelfRule可以弄为自定义逻辑的策略,也可以是前面提到的那些rule策略
 */
@RibbonClient(name = "USER-SERVICE",configuration=MySelfRule.class)

这种方式可以达到只针对某服务做负载均衡策略,但是:官方给出了明确警告 configuration=MySelfRule.class 自定义配置类一定不能放到@ComponentScan 所扫描的当前包下以及子包下,否则我们自定义的这个配置类就会被所有的Ribbon客户端所共享,达不到特殊化定制的目的了(也就是一旦被扫描到,RestTemplate直接不管调用哪个服务都会用指定的算法)

springboot项目当中的启动类使用了@SpringBootApplication注解,这个注解内部就有@ComponentScan注解,默认是扫描启动类包下所有的包,所以我们要达到定制化一定不要放在它能扫描到的地方

cloud中文官网:https://www.springcloud.cc/spring-cloud-greenwich.html#netflix-ribbon-starter

3、使用YAML配置文件方式 在服务消费方的yml配置文件中加入如下格式的内容即可

# 给某个微服务配置负载均衡规则,这里是user-service服务
user-service: 
  ribbon:
    # 负载均衡规则
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

注意,一般用默认的负载均衡规则,不做修改

Ribbon饿汉加载

Ribbon默认是采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长。

而饿汉加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载:

ribbon:
  eager-load:
    # 开启负载均衡饿汉加载模式
    enabled: true
    # clients是一个String类型的List数组,多个时采用下面的 - xxxx服务 的形式,单个时直接使用 clients: 服务名 即可
    clients:
      - USER-SERVICE

Nacos注册中心

国内公司一般都推崇阿里巴巴的技术,比如注册中心,SpringCloudAlibaba也推出了一个名为Nacos的注册中心

Nacos 是阿里巴巴的产品,现在是 SpringCloud 中的一个组件。相比 Eureka 功能更加丰富,在国内受欢迎程度较高

安装Nacos

windows安装

GitHub中下载:https://github.com/alibaba/nacos/releases

下载好之后直接解压即可,但:别解压到有“中文路径”的地方

Nacos的默认端口是8848,若该端口被占用则关闭该进程 或 修改nacos中的默认端口(conf/application.properties)

启动Nacos:密码和账号均是 nacos

startup.cmd -m standalone


-m 				mode 模式
standalone		单机

Linux安装

Nacos是基于Java开发的,所以需要JDK支持,因此Linux中需要有JDK环境

上传Linux版的JDK

# 解压
tar -xvf jdk-8u144-linux-x64.tar.gz

# 配置环境变量
export JAVA_HOME=/usr/local/java			# =JDK解压后的路径
export PATH=$PATH:$JAVA_HOME/bin

# 刷新环境变量
source /etc/profile

上传Linux版的Nacos

# 解压
tar -xvf nacos-server-1.4.1.tar.gz

# 进入 nacos/bin 目录中,输入命令启动Nacos
sh startup.sh -m standalone

# 有8848端口冲突和windows中一样方式解决

注册服务到Nacos中

拉取Nacos的依赖管理,服务端加入如下依赖

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-alibaba-dependencies</artifactId>
  <version>2.2.5.RELEASE</version>
  <type>pom</type>
  <scope>import</scope>
</dependency>

客户端依赖如下:

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

注:不要有其他注册中心的依赖,如前面玩的Eureka,有的话注释掉

修改客户端的yml配置文件:

server:
  port: 8081
spring:
  application:
    name: USER-SERVICE
  cloud:
    nacos:
      # Nacos服务器地址
      server-addr: localhost:8848
#eureka:
#  client:
#    # 去哪里拉取服务列表
#    service-url:
#      defaultZone: http://localhost:10086/eureka

启动之后,在 ip+port/nacos 就在Nacos控制台看到信息了

Nacos集群配置与负载均衡策略调整

1、集群配置:Nacos的服务多级存储模型和其他的不一样

就多了一个集群,不像其他的是 服务—–> 实例

好处:微服务互相访问时,应该尽可能访问同集群实例,因为本地访问速度更快。当本集群内不可用时,才访问其它集群

配置服务集群:想要对哪个服务配置集群则在其yml配置文件中加入即可

server:
  port: 8081
  application:
    name: USER-SERVICE
  cloud:
    nacos:
      # Nacos服务器地址
      server-addr: localhost:8848
      # 配置集群名称,如:HZ,杭州
      cluster-name: HZ

测试则直接将“服务提供者”复刻多份,共用同一集群名启动,然后再复刻修改集群名启动即可,如下面的:

2、负载均衡策略调整:前面玩Ribbon时已经知道了默认是轮询策略,而想要达到Nacos的 尽可能访问同集群实例,因为本地访问速度更快。当本集群内不可用时,才访问其它集群 的功能,则就需要调整负载均衡策略,配置如下:

USER-SERVICE:
  ribbon:
    # 单独对某个服务设置负载均衡策略
#    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule
    # 改为Naocs的负载均衡策略
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule

注: 再次说明前面提到的 ——> 负载均衡策略调整放在“服务消费方”

经过上面的配置之后,服务消费方去调用服务提供方的服务时,会优先选择和服务消费方同集群下的服务提供方的服务,若无法访问才跨集群访问其他集群下的服务提供方得到服务

  • 小细节: 服务消费方访问同集群下服务提供方的服务时(提供方是集群,多实例),选择这些实例中的哪一个服务时并不是采用轮询了,而是随机

另外的负载均衡策略就是Ribbon中的:

3、加权策略 :服务器权重值越高,越容易被选择,所以能者多劳,性能好的服务器被访问的次数应该越多

权重值一般在 [0,10000] 之间。直接去Nacos的控制台中选择想要修改权重值的服务,点击“详情”即可修改

注: 当权重值为0时,代表此服务实例不会再被访问,类似于停机迭代

Nacos环境隔离

前面一节见到了Nacos的集群结构,但那只是较内的一层,Nacos不止是注册中心,也可以是数据中心

  • namespace :就是环境隔离,如 dev开发环境、test测试环境、prod生产环境。若没配置,则默认是public,在没有指定命名空间时都会默认从public这个命名空间拉取配置以及注册到该命名空间下的注册表中。什么是注册表在后续看源码时会说明
  • group :就是在namespace的基础上,再进行分组,就是平时理解的分组,如 将服务相关性强的分在一个组
  • service —-> clusters —–> instances :就是前面说的集群,服务 —-> 集群 ——> 实例

配置namespace: 注意事项如下

  1. 同名的命名空间只能创建一个
  2. 微服务间如果没有注册到一个命名空间下,无法使用OpenFeign指定服务名负载通信(服务拉取的配置文件不同命名空间不影响)。Feign是后面要玩的



在yml配置文件中进行环境隔离配置

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      cluster-name: HZ
      # 环境隔离:即当前这个服务要注册到哪个命名空间环境去
      # 值为在Nacos控制台创建命名空间时的id值,如下面的dev环境
      namespace: e7144264-0bf4-4caa-a17d-0af8e81eac3a

Nacos临时与非临时实例

1、Nacos和Eureka的不同:不同在下图字体加粗的部分,加粗是Nacos具备而Eureka不具备的

临时实例: 由服务提供者主动给Nacos发送心跳情况,在规定时间内要是没有发送,则Nacos认为此服务挂了,就会从服务列表中踢掉(非亲儿子)

非临时实例/永久实例:由Nacos主动来询问服务是否还健康、活着(此种实例会让服务器压力变大),若非临时实例挂了,Naocs并不会将其踢掉(亲儿子)

  1. 临时实例:Nacos官网 https://nacos.io/zh-cn/docs/open-api.html 中的“服务发现”的“发送实例心跳”中可以看到源码是在什么地方找

    • 适合:流量激增时使用(高并发故增加更多实例),后续流量下降了这些实例就可以不要了

    • 采用客户端心跳检测模式,心跳周期5秒

    • 心跳间隔超过15秒则标记为不健康

    • 心跳间隔超过30秒则从服务列表删除

  2. 永久实例:

    • 适合:常备实例

    • 采用服务端主动健康检测方式

    • 周期为2000 + 5000,即[2000, 7000]毫秒内的随机数

    • 检测异常只会标记为不健康,不会删除

push:若是Nacos检测到有服务提供者挂了,就会主动给消费者发送服务变更的消息,然后服务消费者更新自己的服务缓存列表。这一步就会让服务列表更新很及时

  • 此方式是Nacos具备而Eureka不具备的,Eureka只有pull操作,因此Eureka的缺点就是服务更新可能会不及时(在30s内,服务提供者变动了,个别挂了,而消费者中的服务缓存列表还是旧的,只能等到30s到了才去重新pull)

Nacos的服务发现分为两种模式:

  • 模式一:主动拉取模式(push模式),消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表
  • 模式二:订阅模式(pull模式),消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者

查看服务发现源码的地方:后续也会介绍

Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP方式

补充:CAP定理 这是分布式事务中的一个方法论

  1. C 即:Consistency 数据一致性。指的是:用户访问分布式系统中的任意节点,得到的数据必须一致
  2. A 即:Availability 可用性。指的是:用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝
  3. P 即:Partition Tolerance 分区容错性。指的是:由于某种原因导致系统中任意信息的丢失或失败都不能影响系统的继续独立运作

注: 分区容错性是必须满足的,数据一致性( C )和 可用性( A )只满足其一即可,一般的搭配是如下的(即:取舍策略):

  1. CP 保证数据的准确性
  2. AP 保证数据的及时性

既然CAP定理都整了,那就再加一个Base理论吧,这个理论是对CAP中C和A这两个矛盾点的调和和选择

  1. BA 即:Basically Available 基本可用性。指的是:在发生故障的时候,可以允许损失“非核心部分”的可用性,保证系统正常运行,即保证核心部分可用
  2. S 即:Soft State 软状态。指的是:允许系统的数据存在中间状态,只要不影响整个系统的运行就行
  3. E 即:Eventual Consistency 最终一致性。指的是:无论以何种方式写入数据库 / 显示出来,都要保证系统最终的数据是一致的

2、配置临时实例与非临时实例:在需要的一方的yml配置文件中配置如下开关即可

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      cluster-name: HZ
      # 默认为true,即临时实例
      ephemeral: false

改完之后可以在Nacos控制台看到服务是否为临时实例

Nacos统一配置管理

统一配置管理: 将容易发生改变的配置单独弄出来,然后在后续需要变更时,直接去统一配置管理处进行更改,这样凡是依赖于这些配置的服务就可以统一被更新,而不用挨个服务更改配置,同时更改配置之后不用重启服务,直接实现热更新

Nacos和SpringCloud原生的config不一样,Nacos是将 注册中心+config 结合在一起了,而SpringCloud原生的是Eureka+config

1、设置Nacos配置管理



以上便是在Nacos中设置了统一配置。但是:项目/服务想要得到这些配置,那就得获取到这些配置,怎么办?

在前面说过SpringCloud中有两种yml的配置方式,一种是 application.yml ,一种是 bootstrap.yml ,这里就需要借助后者了,它是引导文件,优先级比前者高,会优先被加载,这样就可以先使用它加载到Nacos中的配置文件,然后再读取 application.yml ,从而完成Spring的那一套注册实例的事情

2、在需要读取Nacos统一配置的服务中引入如下依赖:

<!--nacos配置管理依赖-->
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

3、resources下新建 bootstrap.yml,里面的配置内容如下

spring:
  application:
    # 服务名,对应在nacos中进行配置管理的data id的服务名
    name: userservice
  profiles:
    # 环境,对应在nacos中进行配置管理的data id的环境
    active: dev
  cloud:
    nacos:
      # nacos服务器地址,需要知道去哪里拉取配置信息
      server-addr: localhost:8848
    config:
      # 文件后缀,对应在nacos中进行配置管理的data id的后缀名
      file-extension: yaml

经过上面的操作之后,以前需要单独在 application.yml 改的事情就不需要了,bootstrap.yml 配置的东西会去拉取nacos中的配置

还有一种引入nacos统一配置的方式:

spring:
  cloud:
    nacos:
      config:
        server-addr: localhost:8848
  config:
    import:
      - nacos:userservice-dev.yaml

4、设置热更新: 假如业务代码中有需要用到nacos中的配置信息,那nacos中的配置改变之后,不需要重启服务,自动更新。一共有两种方式

  1. @RefreshScope+@Value注解: 在 @Value 注入的变量所在类上添加注解 @RefreshScope

  1. @ConfigurationProperties 注解

然后在需要的地方直接注入对象即可

Nacos多环境共享配置

有时会遇到这样的情况:生产环境、开发环境、测试环境有些配置是相同的,这种应该不需要在每个环境中都配置,因此需要让这些相同的配置单独弄出来,然后实行共享

在前面一节中已经说到了一种Nacos的配置文件格式 即 服务名-环境.后缀,除了这种还有一种格式 即 服务名.后缀

因此:想要让环境配置共享,那么直接在Nacos控制台的配置中再加一个以 服务名.后缀名 格式命名的配置即可,如下:

其他的都不用动,要只是针对于项目中的yml,如 appilication.yml,那前面已经说了,会先读取Nacos中配置,然后和 application.yml 进行合并

但是:若项目本地的yml中、服务名.后缀、服务名-环境.后缀 中有相同的属性/配置时,优先级不一样,如下:

Nacos集群部署

windows和Linux都是一样的思路,集群部署的逻辑如下:

1、解压压缩包

2、进入nacos的conf目录,修改配置文件cluster.conf.example,重命名为cluster.conf,并添加要部署的集群ip+port,如下:

ip1:port1
ip2:port2
ip3:port3

3、然后修改conf/application.properties文件,添加数据库配置

# 告诉nacos数据库集群是MySQL,根据需要自定义
spring.datasource.platform=mysql
# 数据库的数量
db.num=1
# 数据库url
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
# 数据库用户名
db.user.0=root
# 数据库密码
db.password.0=88888

4、复制解压包,部署到不同服务器,然后改变每个解压包的端口,路径:conf/application.properties文件,例如:

# 第一个nacos节点
server.port=8845

# 第二个nacos节点
server.port=8846

# 第三个nacos节点
server.port=8847

5、挨个启动nacos即可,进入到解压的nacos的bin目录中,执行如下命令即可

startup.cmd

此命令告知:nacos默认就是集群启动,前面玩时加了 -m standalone 就是单机启动

5、使用Nginx做反向代理 :修改conf/nginx.conf文件,配置如下:

upstream nacos-cluster {
  server ip1:port1;
  server ip2:port2;
  server ip3:port3;
}

server {
  listen       80;
  server_name  localhost;

  location /nacos {
    proxy_pass http://nacos-cluster;
  }
}

6、代码中application.yml文件配置如下:

spring:
  cloud:
    nacos:
      # Nacos地址,上一步Nginx中的 server_name+listen监听的端口
      server-addr: localhost:80

7、访问 http://localhost/nacos 即可

  • 注:浏览器默认就是80端口,而上面Nginx中监听的就是80,所以根据情况自行修改这里的访问路径

Nacos服务注册表结构是怎样的?

分析源码就在nacos官网下载的source.code:nacos-naming/controller/InstanceController#register(HttpServletRequest request)

Java代码中是使用了Map<String, Map<String, Service>>:每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中

  • key:是namespace_id,用于环境隔离
  • value:是Map<String, Service>
    • key:是group,但是是使用group+serviceName组成的key
    • value:表示Service服务,这个Service又套了一个Map<String, Cluster>
      • key:就是集群名
      • value:就是Cluster对象,这里面又套了一个Set ,这里面就是实例了

Nacos为何能抗住数十万服务注册压力?

源码在:nacos-naming/controller/InstanceController#register(HttpServletRequest request)中的serviceManager.registerInstance(namespaceId, serviceName, instance)里面

先看抗住压力的原因的结论:

  • 在Nacos集群下,对于临时实例,服务注册时是将其丢给了一个ArrayBlockingQueue阻塞队列,然后就返回客户端,最后通过一个死循环利用线程池去执行阻塞队列中的任务(注册服务),这就做到了异步操作
  • 将服务更新情况同步给集群中的其他节点也是同样的原理,底层还是用了阻塞队列+线程池

具体的逻辑在 DistroConsistencyServiceImpl.put()中

public class DistroConsistencyServiceImpl {
    
	@Override
    public void put(String key, Record value) throws NacosException {
		// 异步服务注册 key是服务唯一id,value就是instances
        onPut(key, value);
        // 服务更新情况异步更新给集群下的另外节点
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }


    /**
     * Put a new record.
     *
     * @param key   key of record
     * @param value record
     */
    public void onPut(String key, Record value) {

        // 判断是否是临时实例
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            // 封装 Instances 信息到 数据集:Datum
            Datum<Instances> datum = new Datum<>();
            // value就是instances
            datum.value = (Instances) value;
            // key是服务的唯一id
            datum.key = key;
            // 加入当前修改时间
            datum.timestamp.incrementAndGet();
            // 数据存储 放入dataStore中
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        // notifier这玩意儿 implements Runnable
        notifier.addTask(key, DataOperation.CHANGE);
    }




    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        /**
         * 维护了一个阻塞队列
         */
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {

            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            // 将服务唯一id + 事件类型(CHANGE)放入了阻塞队列
            tasks.offer(Pair.with(datumKey, action));
        }
        
        
        
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            for (; ; ) { // 死循环
                try {
                    // 去阻塞队列中获取任务
                    Pair<String, DataOperation> pair = tasks.take();
                    // 有任务就处理任务,更新服务列表;无任务就进入wait,所以此死循环不会导致CPU负载过高
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
        
        
        
		/**
		 * DistroConsistencyServiceImpl.Notifier类的 handle 方法:即 handle(pair) 中的逻辑
		 */
		private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();

                services.remove(datumKey);

                int count = 0;

                if (!listeners.containsKey(datumKey)) {
                    return;
                }

                // 遍历,找到变化的service,这里的 RecordListener 就是 Service
                for (RecordListener listener : listeners.get(datumKey)) {

                    count++;

                    try {
                        // 如果是 CHANGE 事件
                        if (action == DataOperation.CHANGE) {
                            // 就更新服务列表
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }

                        // 如果是 DELETE 事件
                        if (action == DataOperation.DELETE) {
                            // 就根据服务ID从服务列表中删除服务
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}

因此能抗住压力的原因:

  • 在Nacos集群下,对于临时实例,服务注册时是将其丢给了一个ArrayBlockingQueue阻塞队列,然后就返回客户端,最后通过一个死循环利用线程池去执行阻塞队列中的任务(注册服务),这就做到了异步操作
  • 将服务更新情况同步给集群中的其他节点也是同样的原理,底层还是用了阻塞队列+线程池

Nacos实例的并发读写问题

源码还是在:nacos-naming/controller/InstanceController#register(HttpServletRequest request)中的serviceManager.registerInstance(namespaceId, serviceName, instance)里面

具体思路:采用了同步锁+CopyOnWrite思想

  • 并发读的解决方式 – CopyOnWrite思想:将原来的实例列表Map拷贝给了一个新的Map,然后对新的实例列表Map进行增删,最后将新的实例列表Map的引用给旧的实例列表Map
  • 并发写的解决方式:
    • 在注册实例时,会使用synchronized同步锁对service进行加锁,不同service不影响,相同service通过锁排斥
    • 另外还有一个原因是:更新实例列表时,底层使用了线程池异步更新实例列表,但是线程池的线程数量为“1”
@Component
public class ServiceManager {

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        // 监听服务列表用到的key,服务唯一标识
        // 如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        // 获取服务
        Service service = getService(namespaceId, serviceName);

        // 同步锁:解决并发写的问题
        synchronized (service) {
            // 1、获取要更新的实例列表
            // addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中 即:COPY
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            // 2、将更新后的数据封装到Instances对象
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);

            // 3、完成 注册表更新 以及 Nacos集群的数据同步(保证集群一致性)
            // 在这里面 完成对实例状态更新后,会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取
            consistencyService.put(key, instances);
        }
    }




    private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }




    /**
     * 拷贝旧的实例列表,添加新实例到列表中
     */
    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
            throws NacosException {

        // 根据namespaceId、serviceName获取当前服务的实例列表,返回值是Datum
        // 第一次来,肯定是null
        Datum datum = consistencyService
                .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        // 得到服务中旧的实例列表
        List<Instance> currentIPs = service.allIPs(ephemeral);
        // 保存实例列表,key为ip地址,value是Instance对象
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        // 创建Set集合,保存实例的instanceId
        Set<String> currentInstanceIds = Sets.newHashSet();

        // 遍历旧实例列表
        for (Instance instance : currentIPs) {
            // 保存实例列表
            currentInstances.put(instance.toIpAddr(), instance);
            // 添加instanceId到set中
            currentInstanceIds.add(instance.getInstanceId());
        }

        // 用来保存更新后的实例列表
        Map<String, Instance> instanceMap;
        // 如果服务中已经有旧的数据
        if (datum != null && null != datum.value) {
            // 将旧实例列表与新实例列表进行比对、合并
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            // 若服务中没有数据,则直接创建新的map
            instanceMap = new HashMap<>(ips.length);
        }

        // 遍历新实例列表ips
        for (Instance instance : ips) {
            // 判断服务中是否包含要注册的实例的cluster信息
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                // 如果不包含,创建新的cluster
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                // 将集群放入service的注册表
                service.getClusterMap().put(instance.getClusterName(), cluster);
                // ......记录日志
            }

            // 删除实例 or 新增实例
            // 若是Remove删除事件类型
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                // 则通过实例ID删除实例
                instanceMap.remove(instance.getDatumKey());
            } else {
                // 通过实例ID从旧实例列表中获取实例
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    // 若旧实例列表中有这个实例 则将旧实例ID赋值给新实例ID
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    // 若旧实例列表中没有这个实例 则给新实例生成一个实例ID
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                // 实例ID为key、实例为value存入新实例列表
                instanceMap.put(instance.getDatumKey(), instance);
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException(
                    "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                            .toJson(instanceMap.values()));
        }

        // 将instanceMap中的所有实例转为List返回
        return new ArrayList<>(instanceMap.values());
    }
}

服务注册源码

Nacos的注册表结构是什么样的?

  • Nacos是多级存储模型,最外层通过namespace来实现环境隔离,然后是group分组,分组下就是service服务,一个服务又可以分为不同的cluster集群,集群中包含多个instance实例。因此其注册表结构为一个Map,类型是:Map<String, Map<String, Service>>

    外层key是namespace_id,内层key是group+serviceName.

    Service内部维护一个Map,结构是:Map<String,Cluster>,key是clusterName,值是集群信息

    Cluster内部维护一个Set集合,元素是Instance类型,代表集群中的多个实例。

Nacos如何保证并发写的安全性?

  • 在注册实例时,会对service加锁,不同service之间本身就不存在并发写问题,互不影响。相同service时通过锁来互斥。并且,在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1.

问题延伸:Nacos是如何应对数十万服务的并发写请求?

  • Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力

Nacos如何避免并发读写的冲突?

  • Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。

客户端

流程如下:

NacosServiceRegistryAutoConfiguration

Nacos的客户端是基于SpringBoot的自动装配实现的,我们可以在nacos-discovery依赖:

spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar

这个包中找到Nacos自动装配信息:

可以看到,在NacosServiceRegistryAutoConfiguration这个类中,包含一个跟自动注册有关的Bean:

NacosAutoServiceRegistration

可以看到在初始化时,其父类AbstractAutoServiceRegistration也被初始化了

AbstractAutoServiceRegistration如图:

可以看到它实现了ApplicationListener接口,监听Spring容器启动过程中的事件

在监听到WebServerInitializedEvent(web服务初始化完成)的事件后,执行了bind 方法。

其中的bind方法如下:

public void bind(WebServerInitializedEvent event) {
    // 获取 ApplicationContext
    ApplicationContext context = event.getApplicationContext();
    // 判断服务的 namespace,一般都是null
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                                .getServerNamespace())) {
            return;
        }
    }
    // 记录当前 web 服务的端口
    this.port.compareAndSet(0, event.getWebServer().getPort());
    // 启动当前服务注册流程
    this.start();
}

其中的start方法流程:

public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}

		// 当前服务处于未运行状态时,才进行初始化
		if (!this.running.get()) {
            // 发布服务开始注册的事件
			this.context.publishEvent(
					new InstancePreRegisteredEvent(this, getRegistration()));
            // 开始注册
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
            // 发布注册完成事件
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
            // 服务状态设置为运行状态,基于AtomicBoolean
			this.running.compareAndSet(false, true);
		}

	}

其中最关键的register()方法就是完成服务注册的关键,代码如下:

protected void register() {
    this.serviceRegistry.register(getRegistration());
}

此处的this.serviceRegistry就是NacosServiceRegistry:

NacosServiceRegistry

NacosServiceRegistry是Spring的ServiceRegistry接口的实现类,而ServiceRegistry接口是服务注册、发现的规约接口,定义了register、deregister等方法的声明。

NacosServiceRegistryregister的实现如下:

@Override
public void register(Registration registration) {
	// 判断serviceId是否为空,也就是spring.application.name不能为空
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    // 获取Nacos的命名服务,其实就是注册中心服务
    NamingService namingService = namingService();
    // 获取 serviceId 和 Group
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();
	// 封装服务实例的基本信息,如 cluster-name、是否为临时实例、权重、IP、端口等
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        // 开始注册服务
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                 instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        if (nacosDiscoveryProperties.isFailFast()) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                      registration.toString(), e);
            rethrowRuntimeException(e);
        }
        else {
            log.warn("Failfast is false. {} register failed...{},", serviceId,
                     registration.toString(), e);
        }
    }
}

可以看到方法中最终是调用NamingService的registerInstance方法实现注册的

而NamingService接口的默认实现就是NacosNamingService

NacosNamingService

NacosNamingService提供了服务注册、订阅等功能

其中registerInstance就是注册服务实例,源码如下:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 检查超时参数是否异常。心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
    NamingUtils.checkInstanceIsLegal(instance);
    // 拼接得到新的服务名,格式为:groupName@@serviceId
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 判断是否为临时实例,默认为 true。
    if (instance.isEphemeral()) { // 这里面的两行代码很关键
        // 如果是临时实例,需要定时向 Nacos 服务发送心跳 ---------- 涉及临时实例的心跳检测
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // 添加心跳任务
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // 发送注册服务实例的请求
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

最终,由NacosProxy的registerService方法,完成服务注册

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

	// 组织请求参数
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
	// 通过POST请求将上述参数,发送到 /nacos/v1/ns/instance
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}

这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:

  • namespace_id:环境
  • service_name:服务名称
  • group_name:组名称
  • cluster_name:集群名称
  • ip: 当前实例的ip地址
  • port: 当前实例的端口

服务端

服务端流程图:

官网下载源码:进入 naming-nacos/com/alibaba/nacos/naming/controllers/InstanceController#register(HttpServletRequest request)

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
	// 尝试获取namespaceId
    final String namespaceId = WebUtils
        .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // 尝试获取serviceName,其格式为 group_name@@service_name
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
	// 解析出实例信息,封装为Instance对象
    final Instance instance = parseInstance(request);
	// 注册实例
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

进入serviceManager.registerInstance(namespaceId, serviceName, instance)

ServiceManager

这里面的东西在前面并发读写的解决方式中见过了

这里面的流程一句话来说就是:先获取旧的实例列表,然后把新的实例信息与旧的做对比、合并,新的实例就添加,老的实例同步ID。然后返回最新的实例列表

registerInstance方法就是注册服务实例的方法:

/**
 * 注册服务实例
 *
 * Register an instance to a service in AP mode.
 *
 * <p>This method creates service or cluster silently if they don't exist.
 *
 * @param namespaceId id of namespace
 * @param serviceName service name
 * @param instance    instance to register
 * @throws Exception any error occurred in the process
 */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
	// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)
    // 此时不包含实例信息
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // 拿到创建好的service
    Service service = getService(namespaceId, serviceName);
    // 拿不到则抛异常
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // 添加要注册的实例到service中
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

创建好了服务,接下来就要添加实例到服务中:

/**
 * 添加实例到服务中
 * 
 * Add instance to service.
 *
 * @param namespaceId namespace
 * @param serviceName service name
 * @param ephemeral   whether instance is ephemeral
 * @param ips         instances
 * @throws NacosException nacos exception
 */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException {
    
	// 监听服务列表用到的key
    // 服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // 获取服务
    Service service = getService(namespaceId, serviceName);
    // 同步锁,避免并发修改的安全问题
    synchronized (service) {
        // 1、获取要更新的实例列表
        // addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中 即:COPY
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        // 2、将更新后的数据封装到Instances对象
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        // 3、完成 注册表更新 以及 Nacos集群的数据同步(保证集群一致性)
        // 在这里面 完成对实例状态更新后,会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取
        consistencyService.put(key, instances);
    }
}

最后就要更新服务的实例 列表了

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}



/**
 * 拷贝旧的实例列表,添加新实例到列表中
 */
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
    throws NacosException {

    // 根据namespaceId、serviceName获取当前服务的实例列表,返回值是Datum
    // 第一次来,肯定是null
    Datum datum = consistencyService
        .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    // 得到服务中旧的实例列表
    List<Instance> currentIPs = service.allIPs(ephemeral);
    // 保存实例列表,key为ip地址,value是Instance对象
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    // 创建Set集合,保存实例的instanceId
    Set<String> currentInstanceIds = Sets.newHashSet();

    // 遍历旧的实例列表
    for (Instance instance : currentIPs) {
        // 保存实例列表
        currentInstances.put(instance.toIpAddr(), instance);
        // 添加instanceId到set中
        currentInstanceIds.add(instance.getInstanceId());
    }

    // 用来保存更新后的实例列表
    Map<String, Instance> instanceMap;
    // 如果服务中已经有旧的数据
    if (datum != null && null != datum.value) {
        // 将旧的实例列表与新的实例列表进行比对
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        // 若服务中没有数据,则直接创建新的map
        instanceMap = new HashMap<>(ips.length);
    }

    // 遍历新的实例列表ips
    for (Instance instance : ips) {
        // 判断服务中是否包含要注册的实例的cluster信息
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            // 如果不包含,创建新的cluster
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            // 将集群放入service的注册表
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                      instance.getClusterName(), instance.toJson());
        }

        // 删除实例 or 新增实例
        // 若是Remove删除事件类型
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            // 则通过实例ID删除实例
            instanceMap.remove(instance.getDatumKey());
        } else {
            // 通过实例ID从旧实例列表中获取实例
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                // 若旧实例列表中有这个实例 则将旧实例ID赋值给新实例ID
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                // 若旧实例列表中没有这个实例 则给新实例生成一个实例ID
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            // 实例ID为key、实例为value存入新实例列表
            instanceMap.put(instance.getDatumKey(), instance);
        }

    }

    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
            "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
            .toJson(instanceMap.values()));
    }

    // 将instanceMap中的所有实例转为List返回
    return new ArrayList<>(instanceMap.values());
}

Nacos集群一致性

在上一节中,在完成本地服务列表更新后,Nacos又实现了集群一致性更新,调用的是:

consistencyService.put(key, instances);

/**
 * 添加实例到服务中
 * 
 * Add instance to service.
 *
 * @param namespaceId namespace
 * @param serviceName service name
 * @param ephemeral   whether instance is ephemeral
 * @param ips         instances
 * @throws NacosException nacos exception
 */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException {
    
	// 监听服务列表用到的key
    // 服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // 获取服务
    Service service = getService(namespaceId, serviceName);
    // 同步锁,避免并发修改的安全问题
    synchronized (service) {
        // 1、获取要更新的实例列表
        // addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中 即:COPY
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        // 2、将更新后的数据封装到Instances对象
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        // 3、完成 注册表更新 以及 Nacos集群的数据同步(保证集群一致性)
        // 在这里面 完成对实例状态更新后,会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取
        consistencyService.put(key, instances);
    }
}

这里的ConsistencyService接口,代表集群一致性的接口,有很多种不同实现:

进入DelegateConsistencyServiceImpl来看:

@Override
public void put(String key, Record value) throws NacosException {
    // 根据实例是否是临时实例,判断委托对象
    mapConsistencyService(key).put(key, value);
}

其中的mapConsistencyService(key)方法就是选择委托方式:

private ConsistencyService mapConsistencyService(String key) {
    // 判断是否是临时实例:
    // 是,选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl
    // 否,选择 persistentConsistencyService,也就是 PersistentConsistencyServiceDelegateImpl
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

默认情况下,所有实例都是临时实例,因此关注DistroConsistencyServiceImpl即可

DistroConsistencyServiceImpl

这里面的逻辑在前面“Nacos如何抗住数十万服务注册压力”中见过了的,但是没弄全

@Override
public void put(String key, Record value) throws NacosException {
    // 异步服务注册 key是服务的唯一id,value就是instances
    onPut(key, value);
    // 服务更强情况异步更新给集群下的另外节点
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                        globalConfig.getTaskDispatchPeriod() / 2);
}
onPut 更新本地实例列表
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

	public void onPut(String key, Record value) {

        // 判断是否是临时实例
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            // 封装 Instances 信息到 数据集:Datum
            Datum<Instances> datum = new Datum<>();
            // value就是instances
            datum.value = (Instances) value;
            // key是服务的唯一id
            datum.key = key;
            // 加入当前修改时间
            datum.timestamp.incrementAndGet();
            // 数据存储 放入dataStore中
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {
            return;
        }

        // notifier这玩意儿 implements Runnable
        notifier.addTask(key, DataOperation.CHANGE);
    }
    
    
    
	public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        /**
         * 维护了一个阻塞队列
         */
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {

            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            // 将服务唯一id + 事件类型(CHANGE)放入了阻塞队列
            tasks.offer(Pair.with(datumKey, action));
        }
    }
}
Notifier异步更新

Notifier是一个Runnable,通过一个单线程的线程池来不断从阻塞队列中获取任务,执行服务列表的更新

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    
	public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        /**
         * 维护了一个阻塞队列
         */
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        
        
		@Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            for (; ; ) { // 死循环
                try {
                    // 去阻塞队列中获取任务
                    Pair<String, DataOperation> pair = tasks.take();
                    // 有任务就处理任务,更新服务列表;无任务就进入wait,所以此死循环不会导致CPU负载过高
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();

                services.remove(datumKey);

                int count = 0;

                if (!listeners.containsKey(datumKey)) {
                    return;
                }

                // 遍历,找到变化的service,这里的 RecordListener就是 Service
                for (RecordListener listener : listeners.get(datumKey)) {

                    count++;

                    try {
                        // 如果是 CHANGE 事件
                        if (action == DataOperation.CHANGE) {
                            // 就更新服务列表
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }

                        // 如果是 DELETE 事件
                        if (action == DataOperation.DELETE) {
                            // 就根据服务ID删除从服务列表中删除服务
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}
onChange 覆盖实例列表

上一节中 listener.onChange(datumKey, dataStore.get(datumKey).value); 进去,选择Service的onChange()

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    @Override
    public void onChange(String key, Instances value) throws Exception {
        
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        
        for (Instance instance : value.getInstanceList()) {
            
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        
        // 更新实例列表
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        
        recalculateChecksum();
    }
}

updateIPs 的逻辑如下:

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    
	/**
	 * 更新实例列表
	 *
     * Update instances.
     *
     * @param instances instances
     * @param ephemeral whether is ephemeral instance
     */
    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        // key是cluster,值是集群下的Instance集合
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        // 获取服务的所有cluster名称
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }

        // 遍历要更新的实例
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }

                // 判断实例是否包含clusterName,没有的话用默认cluster
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    // DEFAULT_CLUSTER_NAME = "DEFAULT"
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }

                // 判断cluster是否存在,不存在则创建新的cluster
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                    instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }

                // 获取当前cluster实例的集合,不存在则创建新的
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }

                // 添加新的实例到 Instance 集合
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }

        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // 将实例集合更新到 clusterMap(注册表)
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }

        setLastModifiedMillis(System.currentTimeMillis());
        // 发布服务变更的通知消息
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();

        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());

    }
}

上面的 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); 就是在更新实例列表,进入 updateIps(entryIPs, ephemeral) 即可看到逻辑

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    
	/**
	 * 更新实例列表
	 *
     * Update instance list.
     *
     * @param ips       instance list
     * @param ephemeral whether these instances are ephemeral
     */
    public void updateIps(List<Instance> ips, boolean ephemeral) {

        // 获取旧实例列表
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());

        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }

        // 更新实例列表
        List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
        if (updatedIPs.size() > 0) {
            for (Instance ip : updatedIPs) {
                Instance oldIP = oldIpMap.get(ip.getDatumKey());

                // do not update the ip validation status of updated ips
                // because the checker has the most precise result
                // Only when ip is not marked, don't we update the health status of IP:
                if (!ip.isMarked()) {
                    ip.setHealthy(oldIP.isHealthy());
                }

                if (ip.isHealthy() != oldIP.isHealthy()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                            (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
                }

                if (ip.getWeight() != oldIP.getWeight()) {
                    // ip validation status updated
                    Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                            ip.toString());
                }
            }
        }

        // 检查新加入实例的状态
        List<Instance> newIPs = subtract(ips, oldIpMap.values());
        if (newIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                            getName(), newIPs.size(), newIPs.toString());

            for (Instance ip : newIPs) {
                HealthCheckStatus.reset(ip);
            }
        }

        // 移除要删除的实例
        List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

        if (deadIPs.size() > 0) {
            Loggers.EVT_LOG
                    .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                            getName(), deadIPs.size(), deadIPs.toString());

            for (Instance ip : deadIPs) {
                HealthCheckStatus.remv(ip);
            }
        }

        toUpdateInstances = new HashSet<>(ips);

        // 直接覆盖旧实例列表
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }
}
Nacos集群一致性

@Component
public class DistroProtocol {
    
	/**
	 * 同步数据到其他远程服务器
	 *
     * Start to sync data to all remote server.
     *
     * @param distroKey distro key of sync data
     * @param action    the action of data operation
     */
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        // 遍历 Nacos 集群中除自己以外的其它节点
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            // Distro同步任务
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            // 交给线程池去执行
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }
}

distroTaskEngineHolder.getDelayTaskExecuteEngine() 的返回值是 NacosDelayTaskExecuteEngine,它维护了一个线程池,并且接收任务,执行任务。执行任务的方法为processTasks()方法

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {

    protected void processTasks() {
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                // 尝试执行同步任务,如果失败会将任务重新入队重试
                if (!processor.process(task)) {
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                retryFailedTask(taskKey, task);
            }
        }
    }
}

Distro模式的同步是异步进行的,并且失败时会将任务重新入队并重试,因此不保证同步结果的强一致性,属于AP模式的一致性策略

心跳检测源码

Nacos的健康检测有两种模式:

  • 临时实例:适合增加更多实例来应对高并发
    • 采用客户端心跳检测模式,心跳周期5秒
    • 心跳间隔超过15秒则标记为不健康
    • 心跳间隔超过30秒则从服务列表删除
  • 永久实例:适合常备实例
    • 采用服务端主动健康检测方式
    • 周期为2000 + 5000毫秒内的随机数
    • 检测异常只会标记为不健康,不会删除

客户端

在前面看nacos服务注册的客户端源码时,看到过一段代码:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    
    // 检查超时参数是否异常。心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
    NamingUtils.checkInstanceIsLegal(instance);
    // 拼接得到新的服务名,格式为:groupName@@serviceId
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 判断是否为临时实例,默认为 true。
    if (instance.isEphemeral()) { // 这里面的两行代码很关键
        // 如果是临时实例,需要定时向 Nacos 服务发送心跳 ---------- 涉及临时实例的心跳检测
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // 添加心跳任务
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // 发送注册服务实例的请求
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

这个IF中就涉及的是心跳检测

BeatInfo

就包含心跳需要的各种信息

BeatReactor

维护了一个线程池

public class BeatReactor implements Closeable {

    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.lightBeatEnabled = false;
        this.dom2Beat = new ConcurrentHashMap();
        this.serverProxy = serverProxy;
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
}

当调用BeatReactoraddBeatInfo(groupedServiceName, beatInfo)方法时,就会执行心跳

public class BeatReactor implements Closeable {

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }

        this.dom2Beat.put(key, beatInfo);
        // 利用线程池,定期执行心跳任务,周期为 beatInfo.getPeriod()
        this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
    }
}

心跳周期的默认值在com.alibaba.nacos.api.common.Constants类中:

默认5秒一次心跳

BeatTask

上一节中 this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS)

心跳的任务封装就在BeatTask这个类中,是一个Runnable

@Override
public void run() {
    if (beatInfo.isStopped()) {
        return;
    }
    // 获取心跳周期
    long nextTime = beatInfo.getPeriod();
    try {
        // 发送心跳
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            nextTime = interval;
        }
        // 判断心跳结果
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            // 如果失败,则需要 重新注册实例
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
            }
        }
    } catch (NacosException ex) {
        // ...... 记录日志

    } catch (Exception unknownEx) {
        // ...... 记录日志
    } finally {
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

发送心跳

JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled) ,最终心跳的发送还是通过NamingProxysendBeat方法来实现

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    // 组织请求参数
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    // 发送请求,这个地址就是:/v1/ns/instance/beat
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

服务端

对于临时实例,服务端代码分两部分:

  1. InstanceController提供了一个接口,处理客户端的心跳请求
  2. 定时检测实例心跳是否按期执行

InstanceController

在nacos-naming模块中的InstanceController类中,定义了一个方法用来处理心跳请求

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    /** 
     * 为实例创建心跳
     * 
     * Create a beat for instance.
     *
     * @param request http request
     * @return detail information of instance
     * @throws Exception any error during handle
     */
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {

        // 解析心跳的请求参数
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }
        String clusterName = WebUtils
                .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                // fix #2533
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
        // 尝试根据参数中的namespaceId、serviceName、clusterName、ip、port等信息从Nacos的注册表中 获取实例
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

        // 如果获取失败,说明心跳失败,实例尚未注册
        if (instance == null) {
            if (clientBeat == null) {
                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }

            // ...... 记录日志

            // 重新注册一个实例
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());

            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }

        // 尝试基于 namespaceId + serviceName 从 注册表 中获取Service服务
        Service service = serviceManager.getService(namespaceId, serviceName);

        // 如果不存在,说明服务不存在,返回SERVER_ERROR = 500
        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR,
                    "service not found: " + serviceName + "@" + namespaceId);
        }
        if (clientBeat == null) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        // 如果心跳没问题(在确认心跳请求对应的服务、实例都在的情况下),开始处理心跳结果
        service.processClientBeat(clientBeat);

        result.put(CommonParams.CODE, NamingResponseCode.OK);
        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
        }
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }
}

processClientBeat() 处理心跳请求

在上一节中有如下方法

// 如果心跳没问题(在确认心跳请求对应的服务、实例都在的情况下),开始处理心跳结果
service.processClientBeat(clientBeat);

这个方法的逻辑如下:

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service 
    implements Record, RecordListener<Instances> {

	/**
     * Process client beat.
     *
     * @param rsInfo metrics info of server
     */
    public void processClientBeat(final RsInfo rsInfo) {
        
        // 创建线程:ClientBeatProcessor implements Runnable
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        // HealthCheckReactor:线程池的封装
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }
}

所以关键业务逻辑就在ClientBeatProcessor的run()方法中

public class ClientBeatProcessor implements Runnable {
 
	@Override
    public void run() {
        // 获取service、ip、clusterName、port、Cluster对象
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }

        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        // 获取Cluster对象
        Cluster cluster = service.getClusterMap().get(clusterName);
        // 获取集群中的所有实例信息
        List<Instance> instances = cluster.allIPs(true);

        for (Instance instance : instances) {
            // 找到心跳的这个实例
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // 更新最新的实例心跳时间,LastBeat就是用来判断心跳是否过期的
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    // 若实例已被标记为:不健康
                    if (!instance.isHealthy()) {
                        // 则将实例状态改为健康状态
                        instance.setHealthy(true);
                        Loggers.EVT_LOG
                                .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                        cluster.getService().getName(), ip, port, cluster.getName(),
                                        UtilsAndCommons.LOCALHOST_SITE);
                        // 进行服务变更推送,即:push操作
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
}

Service#init() 开启心跳检测任务

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service 
    implements Record, RecordListener<Instances> {

	/**
     * Init service.
     */
    public void init() {
        
        // 开启心跳检测任务
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        // 遍历注册表中的集群
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
             // 完成集群初始化:非临时实例的主动健康检测的逻辑就可以在这里面找到
            entry.getValue().init();
        }
    }
}

心跳检测任务的逻辑如下:

public class HealthCheckReactor {

	/**
     * Schedule client beat check task with a delay.
     *
     * @param task client beat check task
     */
    public static void scheduleCheck(ClientBeatCheckTask task) {
        // ClientBeatCheckTask task 还是一个 Runnable
        // computeIfAbsent(key, mappingFunction) 与指定key关联的当前(现有的或function计算的)值,
        // 										若计算的(mappingFunction)为null则为null
        //      key:服务唯一ID,即 com.alibaba.nacos.naming.domains.meta. + NamespaceId + ## + serviceName
        //      value:mappingFunction 计算值的函数
        futureMap.computeIfAbsent(task.taskKey(),
                // scheduleNamingHealth() 第3个参数 delay 就是心跳检测任务执行时间,即:5s执行一次心跳检测任务
                k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
}

ClientBeatCheckTask的run()方法逻辑如下:

public class ClientBeatCheckTask implements Runnable {

    public void run() {
        try {

            // 找到所有临时实例的列表
            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            // 给临时实例设置健康状态
            for (Instance instance : instances) {
                // 判断 心跳间隔(当前时间 - 最后一次心跳时间) 是否大于 心跳超时时间,默认15s
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            // 如果超时,标记实例为不健康 healthy = false
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // 发布实例状态变更的事件
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }
                
                // 判断心跳间隔(当前时间 - 最后一次心跳时间)是否大于 实例被删除的最长超时时间,默认30s
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    // 若超过超时时间,则删除该实例
                    deleteIp(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
}

非临时实例:主动健康检测

对于非临时实例(ephemeral=false),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态

在前面看服务注册的代码:InstanceController/re/register(HttpServletRequest request)#serviceManager.registerInstance(namespaceId, serviceName, instance)中有如下的代码

创建空服务时:

@Component
public class ServiceManager implements RecordListener<Service> {

    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        // 若服务不存在,则创建新服务
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }
    
    
    /**
     * 若服务不存在,则创建新服务
     */
	public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // 通过 namespaceId + serviceName 获取服务
        Service service = getService(namespaceId, serviceName);
        if (service == null) {

            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            // 若服务不存在则创建新服务
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();

            // 写入注册表 并 初始化
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }
    
    
    
    /**
     * 将服务写入注册表 并 初始化服务
     */
	private void putServiceAndInit(Service service) throws NacosException {
        // 将服务添加到注册表
        putService(service);
        // 通过 NamespaceId + serviceName 尝试获取服务
        service = getService(service.getNamespaceId(), service.getName());
        // 初始化服务 这里就是进入Servicec,init()方法,即:开启心跳检测任务
        service.init();
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }
}

service.init();的逻辑如下:

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    /**
     * Init service.
     */
    public void init() {
        // 开启心跳检测任务
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        // 遍历注册表中的集群
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            // 完成集群初始化
            entry.getValue().init();
        }
    }
}

entry.getValue().init();的逻辑如下:

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {

    /**
     * Init cluster.
     */
    public void init() {
        if (inited) {
            return;
        }
        checkTask = new HealthCheckTask(this);
        // 这里会开启对 非临时实例的 定时健康检测
        HealthCheckReactor.scheduleCheck(checkTask);
        inited = true;
    }
}

HealthCheckTask还是一个Runnable,其run()方法逻辑如下:

public class HealthCheckTask implements Runnable {

    public void run() {

        try {
            if (distroMapper.responsible(cluster.getService().getName()) && 
                	switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
                // 进行健康检测
                healthCheckProcessor.process(this);
                // ......记录日志
            }
        } catch (Throwable e) {
            // ......记录日志
        } finally {
            if (!cancelled) {
                // 结束后,再次进行任务调度,一定延迟后执行
                HealthCheckReactor.scheduleCheck(this);
                // ..........
            }
        }
    }

健康检测逻辑定义在healthCheckProcessor.process(this);方法中,在HealthCheckProcessor接口中,这个接口也有很多实现,默认是TcpSuperSenseProcessor

@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
 
        public void process(HealthCheckTask task) {
        // 从注册表中获取所有的 非临时实例
        List<Instance> ips = task.getCluster().allIPs(false);

        if (CollectionUtils.isEmpty(ips)) {
            return;
        }

        // 遍历非临时实例
        for (Instance ip : ips) {
            // 若没被标记为 不健康 则找下一个非临时实例
            if (ip.isMarked()) {
                if (SRV_LOG.isDebugEnabled()) {
                    SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
                }
                continue;
            }

            // 若此非临时实例不是正在被标记
            if (!ip.markChecking()) {
                // ......记录日志

                // 重新计算响应时间 并 找下一个实例
                healthCheckCommon
                        // 默认CheckRtNormalized = -1
                        // 默认TcpHealthParams:max=5000、min=1000、factor=0.75F
                        .reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams());
                continue;
            }

            // 封装健康检测信息到 Beat
            Beat beat = new Beat(ip, task);
            // 异步执行:放入一个阻塞队列中
            taskQueue.add(beat);
            MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
        }
    }
}

可以看到,所有的健康检测任务都被放入一个阻塞队列,而不是立即执行了。这里又采用了异步执行的策略

TcpSuperSenseProcessor本身就是一个Runnable,在它的构造函数中会把自己放入线程池中去执行,其run方法如下

@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

    /** 
     * 构造
     */
	public TcpSuperSenseProcessor() {
        try {
            selector = Selector.open();

            // 将自己放入线程池
            GlobalExecutor.submitTcpCheck(this);

        } catch (Exception e) {
            throw new IllegalStateException("Error while initializing SuperSense(TM).");
        }
    }
    
	
    
    public void run() {
        while (true) {
            try {
                // 处理任务
                processTask();
                // ......
            } catch (Throwable e) {
                SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
            }
        }
    }
    
    
    
	private void processTask() throws Exception {
        // 将任务封装为一个 TaskProcessor,并放入集合
        Collection<Callable<Void>> tasks = new LinkedList<>();
        do {
            Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
            if (beat == null) {
                return;
            }

            // 将任务丢给 TaskProcessor 去执行,TaskProcessor implements Callable<Void>
            tasks.add(new TaskProcessor(beat));
        } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);

        // 批量处理集合中的任务
        for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
            f.get();
        }
    }
}

TaskProcessor的cail()方法逻辑如下:

@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

	private class TaskProcessor implements Callable<Void> {

        @Override
        public Void call() {
            // 获取检测任务已经等待的时长
            long waited = System.currentTimeMillis() - beat.getStartTime();
            if (waited > MAX_WAIT_TIME_MILLISECONDS) {
                Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
            }

            SocketChannel channel = null;
            try {
                // 获取实例信息
                Instance instance = beat.getIp();

                BeatKey beatKey = keyMap.get(beat.toString());
                if (beatKey != null && beatKey.key.isValid()) {
                    if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                        instance.setBeingChecked(false);
                        return null;
                    }

                    beatKey.key.cancel();
                    beatKey.key.channel().close();
                }

                // 通过NIO建立TCP连接
                channel = SocketChannel.open();
                channel.configureBlocking(false);
                // only by setting this can we make the socket close event asynchronous
                channel.socket().setSoLinger(false, -1);
                channel.socket().setReuseAddress(true);
                channel.socket().setKeepAlive(true);
                channel.socket().setTcpNoDelay(true);

                Cluster cluster = beat.getTask().getCluster();
                int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
                channel.connect(new InetSocketAddress(instance.getIp(), port));

                // 注册连接、读取事件
                SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                key.attach(beat);
                keyMap.put(beat.toString(), new BeatKey(key));

                beat.setStartTime(System.currentTimeMillis());

                GlobalExecutor
                        .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
                        "tcp:error:" + e.getMessage());

                if (channel != null) {
                    try {
                        channel.close();
                    } catch (Exception ignore) {
                    }
                }
            }

            return null;
        }
    }
}

服务发现源码

Nacos的服务发现分为两种模式:

  1. 主动拉取模式(push模式):消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表
  2. 订阅模式(pull模式):消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者

客户端

定时更新服务列表

在前面看服务注册的源码时有一个类NacosNamingService,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能

通过下面的思路去找也行

所有的getAllInstances重载方法都进入了下面的方法:

public class NacosNamingService implements NamingService {

	@Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
            boolean subscribe) throws NacosException {
        
        ServiceInfo serviceInfo;
        // 是否需要订阅服务信息 默认true
        if (subscribe) {
            // 订阅服务信息
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                    StringUtils.join(clusters, ","));
        } else {
           // 直接去Nacos中拉取服务信息
            serviceInfo = hostReactor
                    .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                            StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        // 从服务信息中获取实例列表并返回
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }
}
HostReactor#getServiceInfo() 订阅服务信息

进入上一节的hostReactor.getServiceInfo()

public class HostReactor implements Closeable {

	public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        // key = name + "@@" + clusters
        String key = ServiceInfo.getKey(serviceName, clusters);
        
        // 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo>
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        
		// 判断本地缓存是否存在
        if (null == serviceObj) {
            // 不存在,直接创建新的ServiceInfo 放入缓存
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            // 放入待更新的服务列表(updatingMap)中
            updatingMap.put(serviceName, new Object());
            // 立即更新服务列表:此方法中的逻辑就是立刻从Nacos中获取
            updateServiceNow(serviceName, clusters);
            // 从待更新服务列表中删除已更新的服务
            updatingMap.remove(serviceName);
            
        } else if (updatingMap.containsKey(serviceName)) { // 缓存中有,但是需要更新
            
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish 等待5秒,待更新完成
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        
        // 本地缓存中有,则开启定时更新服务列表的功能
        scheduleUpdateIfAbsent(serviceName, clusters);
        // 返回缓存中的服务信息
        return serviceInfoMap.get(serviceObj.getKey());
    }
}

基本逻辑就是先从本地缓存读,根据结果来选择:

  1. 如果本地缓存没有,立即去nacos读取,updateServiceNow(serviceName, clusters)

  1. 如果本地缓存有,则开启定时更新功能,并返回缓存结果:scheduleUpdateIfAbsent(serviceName, clusters)

在UpdateTask中,最终还是调用updateService方法:

不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor中的updateService()方法:

public class HostReactor implements Closeable {
    
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            // 基于ServerProxy发起远程调用,查询服务列表
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

            if (StringUtils.isNotEmpty(result)) {
                // 处理查询结果
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
NamingProxy#queryList() 发起查询服务下的实例列表的请求

进入上一节的serverProxy.queryList()

public class NamingProxy implements Closeable {
    
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
        // 准备请求参数
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        // 给服务端发起请求,接口地址就是:/nacos/v1/ns/instance/list
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }

处理服务变更通知

除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能

基本思路是:

  1. 通过PushReceiver监听服务端推送的变更数据
  2. 解析数据后,通过NotifyCenter发布服务变更的事件
  3. InstanceChangeNotifier监听变更事件,完成对服务列表的更新

在HostReactor类的构造函数中,有非常重要的几个步骤:

PushReceiver 服务端推送变更的接收器

这个类会以UDP方式接收Nacos服务端推送的服务变更数据

先看构造函数:

public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        // 创建 UDP客户端
        String udpPort = getPushReceiverUdpPort();
        if (StringUtils.isEmpty(udpPort)) {
            this.udpSocket = new DatagramSocket();
        } else {
            this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
        }
        // 准备线程池
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
		// 开启线程任务,准备接收变更数据
        this.executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:

@Override
public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
			// 接收推送数据
            udpSocket.receive(packet);
			// 解析为json字符串
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
			// 反序列化为对象
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // 交给 HostReactor去处理
                hostReactor.processServiceJson(pushPacket.data);

                // send ack to server 发送ACK回执,略。。
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
HostReactor#processServiceJson() 通知数据的处理

通知数据的处理交给了HostReactorprocessServiceJson方法:

public class HostReactor implements Closeable {
    
	public ServiceInfo processServiceJson(String json) {
        // 解析出ServiceInfo信息
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        // 查询缓存中的 ServiceInfo
        ServiceInfo oldService = serviceInfoMap.get(serviceKey);

        // 如果缓存存在,则需要校验哪些数据要更新
        boolean changed = false;
        if (oldService != null) {
            // 拉取的数据是否已经过期
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                                   + serviceInfo.getLastRefTime());
            }
            // 放入缓存
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

            // 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;
            // modHosts:需要修改的实例
            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                // 发布实例变更的事件
                NotifyCenter.publishEvent(new InstancesChangeEvent(
                    serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
                DiskCache.write(serviceInfo, cacheDir);
            }

        } else {
            // 本地缓存不存在
            changed = true;
            // 放入缓存
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            // 直接发布实例变更的事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }
        // 。。。
        return serviceInfo;
    }
}

服务端

拉取服务列表

进入前面说的 /nacos/v1/ns/instance/list 接口中,也就是naming-nacos/controller/InstanceController#list(HttpServletRequest request)

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    
    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {

        // 从request中获取 namespaceId、serviceName
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        // 获取客户端的UDP端口
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

        // 获取服务列表
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
    }
}

doSrvIpxt()的逻辑如下:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    /**
     * 获取服务列表
     */
    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,
                                String clusters, String clientIP,
                                int udpPort, String env, boolean isCheck,
                                String app, String tid, boolean healthyOnly) throws Exception {
        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        // 获取服务列表信息
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();

        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {
                // 添加当前客户端 IP、UDP端口到 PushService 中
                pushService
                    .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                               pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }

        if (service == null) {
            // 如果没找到,返回空
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }
        // 结果的检测,异常实例的剔除等逻辑省略
        // 最终封装结果并返回 。。。

        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

发布服务变更的UDP通知

在上一节中,InstanceController中的doSrvIpxt()方法中,有这样一行代码:

// 添加当前客户端 IP、UDP端口到 PushService 中
pushService.addClient(namespaceId, serviceName, clusters, agent,
                      new InetSocketAddress(clientIP, udpPort),
                           pushDataSource, tid, app);

就是把消费者的UDP端口、IP等信息封装为一个PushClient对象,存储PushService中。方便以后服务变更后推送消息

PushService类本身实现了ApplicationListener接口:这个是事件监听器接口,监听的是ServiceChangeEvent(服务变更事件)

当服务列表变化时,就会通知我们:

Feign远程调用

Feign与OpenFeign是什么?

Feign是Netflix开发的声明式、模板化的HTTP客户端, 在 RestTemplate 的基础上做了进一步的封装,Feign可以帮助我们更快捷、优雅地调用HTTP API。具有可插入注解支持,包括Feign注解和JAX-RS注解,通过 Feign,我们只需要声明一个接口并通过注解进行简单的配置(类似于 Dao 接口上面的 Mapper 注解一样)即可实现对 HTTP 接口的绑定;通过 Feign,我们可以像调用本地方法一样来调用远程服务,而完全感觉不到这是在进行远程调用

OpenFeign全称Spring Cloud OpenFeign,2019 年 Netflix 公司宣布 Feign 组件正式进入停更维护状态,于是 Spring 官方便推出了一个名为 OpenFeign 的组件作为 Feign 的替代方案。基于Netflix feign实现,是一个声明式的http客户端,整合了Spring Cloud Ribbon,除了支持netflix的feign注解之外,增加了对Spring MVC注释的支持,OpenFeign 的 @FeignClient 可以解析SpringMVC的 @RequestMapping 注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务

  • 声明式·: 即只需要将调用服务需要的东西声明出来,剩下就不用管了,交给feign即可

Spring Cloud Finchley 及以上版本一般使用 OpenFeign 作为其服务调用组件。由于 OpenFeign 是在 2019 年 Feign 停更进入维护后推出的,因此大多数 2019 年及以后的新项目使用的都是 OpenFeign,而 2018 年以前的项目一般使用 Feign

OpenFeign 常用注解

使用 OpenFegin 进行远程服务调用时,常用注解如下表:

注解说明
@FeignClient该注解用于通知 OpenFeign 组件对 @RequestMapping 注解下的接口进行解析,并通过动态代理的方式产生实现类,实现负载均衡和服务调用。
@EnableFeignClients该注解用于开启 OpenFeign 功能,当 Spring Cloud 应用启动时,OpenFeign 会扫描标有 @FeignClient 注解的接口,生成代理并注册到 Spring 容器中。
@RequestMappingSpring MVC 注解,在 Spring MVC 中使用该注解映射请求,通过它来指定控制器(Controller)可以处理哪些 URL 请求,相当于 Servlet 中 web.xml 的配置。
@GetMappingSpring MVC 注解,用来映射 GET 请求,它是一个组合注解,相当于 @RequestMapping(method = RequestMethod.GET) 。
@PostMappingSpring MVC 注解,用来映射 POST 请求,它是一个组合注解,相当于 @RequestMapping(method = RequestMethod.POST) 。

Feign VS OpenFeign

相同点

Feign 和 OpenFegin 具有以下相同点:

  1. Feign 和 OpenFeign 都是 Spring Cloud 下的远程调用和负载均衡组件
  2. Feign 和 OpenFeign 作用一样,都可以实现服务的远程调用和负载均衡
  3. Feign 和 OpenFeign 都对 Ribbon 进行了集成,都利用 Ribbon 维护了可用服务清单,并通过 Ribbon 实现了客户端的负载均衡
  4. Feign 和 OpenFeign 都是在服务消费者(客户端)定义服务绑定接口并通过注解的方式进行配置,以实现远程服务的调用

不同点

Feign 和 OpenFeign 具有以下不同:

  1. Feign 和 OpenFeign 的依赖项不同,Feign 的依赖为 spring-cloud-starter-feign,而 OpenFeign 的依赖为 spring-cloud-starter-openfeign
  2. Feign 和 OpenFeign 支持的注解不同,Feign 支持 Feign 注解和 JAX-RS 注解,但不支持 Spring MVC 注解;OpenFeign 除了支持 Feign 注解和 JAX-RS 注解外,还支持 Spring MVC 注解

入手OpenFeign

OpenFeign是Feign的增强版,使用时将依赖换一下,然后注意一下二者能支持的注解的区别即可

1、依赖:在“服务消费方”添加如下依赖

<!--openfeign的依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>



<!--Feign的依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-feign</artifactId>
</dependency>

2、启动类加入如下注解:在“服务消费方”启动类添加

@EnableFeignClients     /*开启feign客户端功能*/

3、创建接口,并使用 @org.springframework.cloud.openfeign.FeignClient 注解:这种方式相当于 DAO

/**
 * @FeignClient("USER-SERVICE")
 * 
 * Spring Cloud 应用在启动时,OpenFeign 会扫描标有 @FeignClient 注解的接口生成代理,并注人到 Spring 容器中
 *
 * 参数为要调用的服务名,这里的服务名区分大小写
 */

@FeignClient("USER-SERVICE")
public interface FeignClient {
    /**
     * 支持SpringMVC的所有注解
     */
    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") long id);
}

在编写服务绑定接口时,需要注意以下 2 点:

  1. 在 @FeignClient 注解中,value 属性的取值为:服务提供者的服务名,即服务提供者配置文件(application.yml)中 spring.application.name 的值
  2. 接口中定义的每个方法都与 服务提供者 中 Controller 定义的服务方法对应

4、在需要调用3中服务与方法的地方进行调用

import com.zixieqing.order.client.FeignClient;
import com.zixieqing.order.entity.Order;
import com.zixieqing.order.entity.User;
import com.zixieqing.order.mapper.OrderMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * <p>@description  : order服务
 * </p>
 * <p>@author       : ZiXieqing</p>
 */

@Service
public class OrderService {
   /* @Autowired
    private RestTemplate restTemplate;*/

    @Autowired
    private FeignClient feignClient;

    @Autowired
    private OrderMapper orderMapper;

    public Order queryOrderById(Long orderId) {
        // 1.查询订单
        Order order = orderMapper.findById(orderId);
        
       /* // 2、远程调用服务的url 此处直接使用服务名,不用ip+port
        // 原因是底层有一个LoadBalancerInterceptor,里面有一个intercept(),后续玩负载均衡Ribbon会看到
        String url = "http://USER-SERVICE/user/" + order.getUserId();
        // 2.1、利用restTemplate调用远程服务,封装成user对象
        User user = restTemplate.getForObject(url, User.class); */

        // 2、使用feign来进行远程调研
        User user = feignClient.findById(order.getUserId());
        // 3、给oder设置user对象值
        order.setUser(user);
        // 4.返回
        return order;
    }
}

OpenFeign自定义配置

Feign可以支持很多的自定义配置,如下表所示:

类型作用说明
feign.Logger.Level修改日志级别包含四种不同的级别:NONE、BASIC、HEADERS、FULL
1、NONE:默认的,不显示任何日志
2、BASIC:仅记录请求方法、URL、响应状态码及执行时间
3、HEADERS:除了BASIC中定义的信息之外,还有请求和响应的头信息
4、FULL:除了HEADERS中定义的信息之外,还有请求和响应的正文及元数据
feign.codec.Decoder响应结果的解析器http远程调用的结果做解析,例如解析json字符串为Java对象
feign.codec.Encoder请求参数编码将请求参数编码,便于通过http请求发送
feign. Contract支持的注解格式默认是SpringMVC的注解
feign. Retryer失败重试机制请求失败的重试机制,默认是没有,不过会使用Ribbon的重试

一般情况下,默认值就能满足我们使用,如果要自定义时,只需要创建自定义的 @Bean 覆盖默认Bean即可

配置日志增强

这个有4种配置方式,局部配置(2种=YAML+代码实现)、全局配置(2种=YAML+代码实现)

1、YAML实现

  1. 基于YAML文件修改Feign的日志级别可以针对单个服务:即局部配置
feign:  
  client:
    config: 
      userservice: # 针对某个微服务的配置
        loggerLevel: FULL #  日志级别
  1. 也可以针对所有服务:即全局配置
feign:  
  client:
    config: 
      default: # 这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置
        loggerLevel: FULL #  日志级别 

2、代码实现

也可以基于Java代码来修改日志级别,先声明一个类,然后声明一个Logger.Level的对象:

/** 
 * 注:这里可以不用加 @Configuration 注解
 * 因为要么在启动类 @EnableFeignClients 注解中进行声明这个配置类
 * 要么在远程服务调用的接口的 @FeignClient 注解中声明该配置
 */
public class DefaultFeignConfiguration  {
    @Bean
    public Logger.Level feignLogLevel(){
        return Logger.Level.BASIC; // 日志级别为BASIC
    }
}
  1. 如果要全局生效,将其放到启动类的 @EnableFeignClients 这个注解中:
@EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration .class) 
  1. 如果是局部生效,则把它放到对应的 @FeignClient 这个注解中:
@FeignClient(value = "userservice", configuration = DefaultFeignConfiguration .class) 

配置客户端

Feign底层发起http请求,依赖于其它的框架。其底层客户端实现包括:

  1. URLConnection:默认实现,不支持连接池
  2. Apache HttpClient :支持连接池
  3. OKHttp:支持连接池

替换为Apache HttpClient

1、在服务消费方添加依赖

<!--httpClient的依赖 -->
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-httpclient</artifactId>
</dependency>

2、在YAML中开启客户端和配置连接池

feign:
  httpclient:
    # 开启feign对HttpClient的支持  默认值就是true,即 导入对应客户端依赖之后就开启了,但为了提高代码可读性,还是显示声明比较好
    enabled: true
    # 最大的连接数
    max-connections: 200
    # 每个路径最大连接数
    max-connections-per-route: 50
    # 链接超时时间
    connection-timeout: 2000
    # 存活时间
    time-to-live: 900

验证:在FeignClientFactoryBean中的loadBalance方法中打断点:

Debug方式启动服务消费者,可以看到这里的client底层就是Apache HttpClient:

Feign的失败处理

业务失败后,不能直接报错,而应该返回用户一个友好提示或者默认结果,这个就是失败降级逻辑

给FeignClient编写失败后的降级逻辑

  1. 方式一:FallbackClass,无法对远程调用的异常做处理
  2. 方式二:FallbackFactory,可以对远程调用的异常做处理。一般选择这种

使用FallbackFactory进行失败降级

  1. 在定义Feign-Client的地方创建失败逻辑处理

    package com.zixieqing.feign.fallback;
    
    import com.zixieqing.feign.clients.UserClient;
    import com.zixieqing.feign.pojo.User;
    import feign.hystrix.FallbackFactory;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * userClient失败时的降级处理
     *
     * <p>@author       : ZiXieqing</p>
     */
    
    @Slf4j
    public class UserClientFallBackFactory implements FallbackFactory<UserClient> {
        @Override
        public UserClient create(Throwable throwable) {
            return new UserClient() {
                /**
                 * 重写userClient中的方法,编写失败时的降级逻辑
                 */
                @Override
                public User findById(Long id) {
                    log.info("userClient的findById()在进行 id = {} 时失败", id);
                    return new User();
                }
            };
        }
    }
    
  2. 将定义的失败逻辑类丢给Spring容器托管

    @Bean
    public UserClientFallBackFactory userClientFallBackFactory() {
        return new UserClientFallBackFactory();
    }
    
  3. 在对应的Feign-Client中使用fallbackFactory回调函数

    package com.zixieqing.feign.clients;
    
    
    import com.zixieqing.feign.fallback.UserClientFallBackFactory;
    import com.zixieqing.feign.pojo.User;
    import org.springframework.cloud.openfeign.FeignClient;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    
    @FeignClient(value = "userservice",fallbackFactory = UserClientFallBackFactory.class)
    public interface UserClient {
    
        @GetMapping("/user/{id}")
        User findById(@PathVariable("id") Long id);
    }
    
  4. 调用,失败时就会进入自定义的失败逻辑中

    package com.zixieqing.order.service;
    
    import com.zixieqing.feign.clients.UserClient;
    import com.zixieqing.feign.pojo.User;
    import com.zixieqing.order.mapper.OrderMapper;
    import com.zixieqing.order.pojo.Order;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service
    public class OrderService {
    
        @Autowired
        private OrderMapper orderMapper;
    
        @Autowired
        private UserClient userClient;
    
        public Order queryOrderById(Long orderId) {
            // 1.查询订单
            Order order = orderMapper.findById(orderId);
            // 2.用Feign远程调用
            User user = userClient.findById(14321432143L);	// 传入错误 id=14321432143L 模拟错误
            // 3.封装user到Order
            order.setUser(user);
            // 4.返回
            return order;
        }
    }
    

Gateway 网关

在微服务架构中,一个系统往往由多个微服务组成,而这些服务可能部署在不同机房、不同地区、不同域名下。这种情况下,客户端(例如浏览器、手机、软件工具等)想要直接请求这些服务,就需要知道它们具体的地址信息,如 IP 地址、端口号等

这种客户端直接请求服务的方式存在以下问题:

  1. 当服务数量众多时,客户端需要维护大量的服务地址,这对于客户端来说,是非常繁琐复杂的
  2. 在某些场景下可能会存在跨域请求的问题
  3. 身份认证的难度大,每个微服务需要独立认证

我们可以通过 API 网关来解决这些问题,下面就让我们来看看什么是 API 网关

API 网关

API 网关是一个搭建在客户端和微服务之间的服务,我们可以在 API 网关中处理一些非业务功能的逻辑,例如权限验证、监控、缓存、请求路由等

API 网关就像整个微服务系统的门面一样,是系统对外的唯一入口。有了它,客户端会先将请求发送到 API 网关,然后由 API 网关根据请求的标识信息将请求转发到微服务实例

对于服务数量众多、复杂度较高、规模比较大的系统来说,使用 API 网关具有以下好处:

  1. 客户端通过 API 网关与微服务交互时,客户端只需要知道 API 网关地址即可,而不需要维护大量的服务地址,简化了客户端的开发
  2. 客户端直接与 API 网关通信,能够减少客户端与各个服务的交互次数
  3. 客户端与后端的服务耦合度降低
  4. 节省流量,提高性能,提升用户体验
  5. API 网关还提供了安全、流控、过滤、缓存、计费以及监控等 API 管理功能

常见的 API 网关实现方案主要有以下 5 种:

  1. Spring Cloud Gateway
  2. Spring Cloud Netflix Zuul
  3. Kong
  4. Nginx+Lua
  5. Traefik

认识Spring Cloud Gateway

Spring Cloud Gateway 是 Spring Cloud 团队基于 Spring 5.0、Spring Boot 2.0 和 Project Reactor 等技术开发的高性能 API 网关组件

Spring Cloud Gateway 旨在提供一种简单而有效的途径来发送 API,并为它们提供横切关注点,例如:安全性,监控/指标和弹性

Spring Cloud Gateway 是基于 WebFlux 框架实现的,而 WebFlux 框架底层则使用了高性能的 Reactor 模式通信框架 Netty

Spring Cloud Gateway 核心概念

Spring Cloud Gateway 最主要的功能就是路由转发,而在定义转发规则时主要涉及了以下三个核心概念,如下表:

核心概念描述
Route 路由网关最基本的模块。它由一个 ID、一个目标 URI、一组断言(Predicate)和一组过滤器(Filter)组成
Predicate 断言路由转发的判断条件,我们可以通过 Predicate 对 HTTP 请求进行匹配,如请求方式、请求路径、请求头、参数等,如果请求与断言匹配成功,则将请求转发到相应的服务
Filter 过滤器过滤器,我们可以使用它对请求进行拦截和修改,还可以使用它对上文的响应进行再处理

注意:其中 Route 和 Predicate 必须同时声明

网关的核心功能特性

  1. 请求路由
  2. 权限控制
  3. 限流

架构图:

权限控制:网关作为微服务入口,需要校验用户是否有请求资格,如果没有则进行拦截

路由和负载均衡:一切请求都必须先经过gateway,但网关不处理业务,而是根据指定规则,把请求转发到某个微服务,这个过程叫做路由。当然路由的目标服务有多个时,还需要做负载均衡

限流:当请求流量过高时,在网关中按照下游的微服务能够接受的速度来放行请求,避免服务压力过大

Gateway 的工作流程

Spring Cloud Gateway 工作流程如下图:

Spring Cloud Gateway 工作流程说明如下:

  1. 客户端将请求发送到 Spring Cloud Gateway 上
  2. Spring Cloud Gateway 通过 Gateway Handler Mapping 找到与请求相匹配的路由,将其发送给 Gateway Web Handler
  3. Gateway Web Handler 通过指定的过滤器链(Filter Chain),将请求转发到实际的服务节点中,执行业务逻辑返回响应结果
  4. 过滤器之间用虚线分开是因为过滤器可能会在转发请求之前(pre)或之后(post)执行业务逻辑
  5. 过滤器(Filter)可以在请求被转发到服务端前,对请求进行拦截和修改,例如参数校验、权限校验、流量监控、日志输出以及协议转换等
  6. 过滤器可以在响应返回客户端之前,对响应进行拦截和再处理,例如修改响应内容或响应头、日志输出、流量监控等
  7. 响应原路返回给客户端

总而言之,客户端发送到 Spring Cloud Gateway 的请求需要通过一定的匹配条件,才能到达真正的服务节点。在将请求转发到服务进行处理的过程前后(pre 和 post),我们还可以对请求和响应进行一些精细化控制。

Predicate 就是路由的匹配条件,而 Filter 就是对请求和响应进行精细化控制的工具。有了这两个元素,再加上目标 URI,就可以实现一个具体的路由了

当然,要是再加上前面已经玩过的东西的流程就变成下面的样子了:

Predicate 断言

Spring Cloud Gateway 通过 Predicate 断言来实现 Route 路由的匹配规则。简单点说,Predicate 是路由转发的判断条件,请求只有满足了 Predicate 的条件,才会被转发到指定的服务上进行处理。

使用 Predicate 断言需要注意以下 3 点:

  1. Route 路由与 Predicate 断言的对应关系为“一对多”,一个路由可以包含多个不同断言条件
  2. 一个请求想要转发到指定的路由上,就必须同时匹配路由上的所有断言
  3. 当一个请求同时满足多个路由的断言条件时,请求只会被首个成功匹配的路由转发

常见的 Predicate 断言如下表:假设转发的 URI 为 http://localhost:8001

断言示例说明
Path– Path=/dept/list/**当请求路径与 /dept/list/ 匹配时,该请求才能被转发到 http://localhost:8001 上
Before– Before=2021-10-20T11:47:34.255+08:00[Asia/Shanghai]在 2021 年 10 月 20 日 11 时 47 分 34.255 秒之前的请求,才会被转发到 http://localhost:8001 上
After– After=2021-10-20T11:47:34.255+08:00[Asia/Shanghai]在 2021 年 10 月 20 日 11 时 47 分 34.255 秒之后的请求,才会被转发到 http://localhost:8001 上
Between– Between=2021-10-20T15:18:33.226+08:00[Asia/Shanghai],2021-10-20T15:23:33.226+08:00[Asia/Shanghai]在 2021 年 10 月 20 日 15 时 18 分 33.226 秒 到 2021 年 10 月 20 日 15 时 23 分 33.226 秒之间的请求,才会被转发到 http://localhost:8001 服务器上
Cookie– Cookie=name,www.cnblogs.com/xiegongzi携带 Cookie 且 Cookie 的内容为 name=www.cnblogs.com/xiegongzi 的请求,才会被转发到 http://localhost:8001 上
Header– Header=X-Request-Id,\d+请求头上携带属性 X-Request-Id 且属性值为整数的请求,才会被转发到 http://localhost:8001 上
Method– Method=GET只有 GET 请求才会被转发到 http://localhost:8001 上
Host– Host=.somehost.org,.anotherhost.org请求必须是访问.somehost.org和.anotherhost.org这两个host(域名)才会被转发到 http://localhost:8001 上
Query– Query=name请求参数必须包含指定参数(name),才会被转发到 http://localhost:8001 上
RemoteAddr– RemoteAddr=192.168.1.1/24请求者的ip必须是指定范围(192.168.1.1 到 192.168.1.24)
Weight权重处理weight,有两个参数:group和weight(一个整数)
如示例中表示:分80%的流量给weihthigh.org

上表中这些也叫“Predicate断言工厂”,我们在配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory读取并处理,转变为路由判断的条件

例如 Path=/user/** 是按照路径匹配,这个规则是由org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory类来处理的

入手Gateway

新建一个Maven项目,依赖如下:

<!--Nacos服务发现-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

<!--网关-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

YAML配置文件内容如下:

server:
  port: 10010 # 网关端口
spring:
  application:
    name: gateway # 服务名称
  cloud:
    nacos:
      server-addr: localhost:8848 # nacos地址
    gateway:
      routes: # 网关路由配置
        - id: userservice # 路由id,自定义,只要唯一即可
          # uri: http://127.0.0.1:8081 # 路由的目标地址,这是一种写法,常用的是下面这种
          uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
          predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
            - Path=/user/** # 按路径匹配,只要以 /user/ 开头就符合要求
        - id: orderservice
          uri: lb://orderservice
          predicates:
            - Path=/order/**

经过如上方式,就简单搭建了Gateway网关,启动、访问 localhost:10010/user/id 或 localhost:10010/order/id 即可

filter 过滤器

通常情况下,出于安全方面的考虑,服务端提供的服务往往都会有一定的校验逻辑,例如用户登陆状态校验、签名校验等

在微服务架构中,系统由多个微服务组成,所以这些服务都需要这些校验逻辑,此时我们就可以将这些校验逻辑写到 Spring Cloud Gateway 的 Filter 过滤器中

Filter是网关中提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理:

Spring Cloud Gateway 提供了以下两种类型的过滤器,可以对请求和响应进行精细化控制

过滤器类型说明
Pre 类型这种过滤器在请求被转发到微服务“之前”可以对请求进行拦截和修改,如参数校验、权限校验、流量监控、日志输出以及协议转换等操作
Post 类型这种过滤器在微服务对请求做出响应“之后”可以对响应进行拦截和再处理,如修改响应内容或响应头、日志输出、流量监控等

按照作用范围划分,Spring Cloud gateway 的 Filter 可以分为 2 类:

  1. GatewayFilter:应用在“单个路由”或者“一组路由”上的过滤器
  2. GlobalFilter:应用在“所有的路由”上的过滤器

GatewayFilter 网关过滤器

GatewayFilter 是 Spring Cloud Gateway 网关中提供的一种应用在“单个路由”或“一组路由”上的过滤器

它可以对单个路由或者一组路由上传入的请求和传出响应进行拦截,并实现一些与业务无关的功能,如登陆状态校验、签名校验、权限校验、日志输出、流量监控等

GatewayFilter 在配置文件(如 application.yml)中的写法与 Predicate 类似,格式如下:

server:
  port: 10010 # 网关端口
spring:
  application:
    name: gateway # 服务名称
  cloud:
    nacos:
      server-addr: localhost:8848 # nacos地址
    gateway:
      routes: # 网关路由配置
        - id: userservice # 路由id,自定义,只要唯一即可
          # uri: http://127.0.0.1:8081 # 路由的目标地址,常用写法是下面这种
          uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
          predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
            - Path=/user/** # 按路径匹配,只要以/user/开头就符合要求
          filters: # gateway过滤器
            - AddRequestHeader=name, zixieqing # 添加请求头name=zixieqing
        - id: orderservice
          uri: lb://orderservice
          predicates:
            - Path=/order/**

想要验证的话,可以在添加路由的服务中进行获取,如上面加在了userservice中,那么验证方式如下:

package com.zixieqing.user.web;

import com.zixieqing.user.entity.User;
import com.zixieqing.user.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * <p>@description  : 该类功能  user控制层
 * </p>
 * <p>@author       : ZiXieqing</p>
 */

@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    private UserService userService;

    /**
     * 路径: /user/110
     *
     * @param id 用户id
     * @return 用户
     */
    @GetMapping("/{id}")
    public User queryById(@PathVariable("id") Long id,
                          @RequestHeader(value = "name",required = false) String name) {
        System.out.println("name = " + name);
        return userService.queryById(id);
    }
}

此种过滤器一共有37种(常用就几种),它们的用法和上面的差不多,可以多个过滤器共同使用

详细去看链接:https://docs.spring.io/spring-cloud-gateway/docs/current/reference/html/#gatewayfilter-factories

下表中列举了几种比较常用的网关过滤器:

路由过滤器描述参数使用示例
AddRequestHeader拦截传入的请求,并在请求上添加一个指定的请求头参数name:需要添加的请求头参数的 key
value:需要添加的请求头参数的 value
– AddRequestHeader=my-request-header,1024
AddRequestParameter拦截传入的请求,并在请求上添加一个指定的请求参数name:需要添加的请求参数的 key
value:需要添加的请求参数的 value
– AddRequestParameter=my-request-param,c.biancheng.net
AddResponseHeader拦截响应,并在响应上添加一个指定的响应头参数name:需要添加的响应头的 key
value:需要添加的响应头的 value
– AddResponseHeader=my-response-header,c.biancheng.net
PrefixPath拦截传入的请求,并在请求路径增加一个指定的前缀prefix:需要增加的路径前缀– PrefixPath=/consumer
PreserveHostHeader转发请求时,保持客户端的 Host 信息不变,然后将它传递到提供具体服务的微服务中– PreserveHostHeader
RemoveRequestHeader移除请求头中指定的参数name:需要移除的请求头的 key– RemoveRequestHeader=my-request-header
RemoveResponseHeader移除响应头中指定的参数name:需要移除的响应头– RemoveResponseHeader=my-response-header
RemoveRequestParameter移除指定的请求参数name:需要移除的请求参数– RemoveRequestParameter=my-request-param
RequestSize配置请求体的大小,当请求体过大时,将会返回 413 Payload Too LargemaxSize:请求体的大小– name: RequestSize args: maxSize: 5000000

GlobalFilter 全局过滤器

全局过滤器的作用也是处理一切进入网关的请求和微服务响应

  1. 方式一:像上面一样直接在YAML文件中配置

缺点:要是需要编写复杂的业务逻辑时会非常不方便,但是:这种过滤器的优先级比下面一种要高

server:
  port: 10010 # 网关端口
spring:
  application:
    name: gateway # 服务名称
  cloud:
    nacos:
      server-addr: localhost:8848 # nacos地址
    gateway:
      routes: # 网关路由配置
        - id: userservice # 路由id,自定义,只要唯一即可
          # uri: http://127.0.0.1:8081 # 路由的目标地址
          uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
          predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
            - Path=/user/** # 按路径匹配,只要以 /user/ 开头就符合要求
#          filters:
#            - AddRequestHeader=name, zixieqing
        - id: orderservice
          uri: lb://orderservice
          predicates:
            - Path=/order/**
      default-filters:
        # 全局过滤器
        - AddRequestHeader=name, zixieqing
  1. 方式二:使用代码实现,定义方式是 implements GlobalFilter 接口:
public interface GlobalFilter {
    /**
     * 处理当前请求,有必要的话通过 GatewayFilterChain 将请求交给下一个过滤器处理
     *
     * @param exchange 请求上下文,里面可以获取Request、Response等信息
     * @param chain 用来把请求委托给下一个过滤器 
     * @return Mono<Void> 返回标示当前过滤器业务结束
     */
    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

在filter中编写自定义逻辑,可以实现下列功能:

  • 登录状态判断
  • 权限校验
  • 请求限流等

举例如下:获取和比较的就是刚刚前面在YAML中使用的 - AddRequestHeader=name, zixieqing

package com.zixieqing.gateway.filter;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.util.List;

/**
 * <p>@description  : 自定义gateway全局路由器:请求头中有 name=zixieqing 才放行
 * </p>
 * <p>@author       : ZiXieqing</p>
 */

@Order(-1)  // 这个注解和本类 implements Ordered 是一样的效果,都是返回一个整数
            // 这个整数表示当前过滤器的执行优先级,值越小优先级越高,取值范围就是 int的范围
@Component
public class MyGlobalFilter implements GlobalFilter /* , Ordered */ {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 获取请求头中的name
        List<String> name = exchange.getRequest().getHeaders().get("name");
        for (String value : name) {
            if ("zixieqing".equals(value))
                // 放行
                return chain.filter(exchange);

        }

        // 设置状态码
        exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);

        // 不再执行下去,到此结束 setComplete即设置完成的意思
        return exchange.getResponse().setComplete();
    }
}

过滤器执行顺序

请求进入网关会碰到三类过滤器:当前路由的过滤器、DefaultFilter、GlobalFilter

请求路由后,会将当前路由过滤器和DefaultFilter、GlobalFilter,合并到一个过滤器链(集合)中,排序后依次执行每个过滤器:

排序的规则是什么呢?

  1. 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前
  2. GlobalFilter通过实现Ordered接口,或者添加 @Order 注解由我们自己指定order值
  3. 路由过滤器和defaultFilter的order值由Spring指定,默认是按照声明顺序从1递增
  4. 当过滤器的order值一样时,会按照 defaultFilter > 路由过滤器 > GlobalFilter 的顺序执行

详细内容,可以查看源码:

  1. org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator#getFilters()方法是先加载defaultFilters,然后再加载某个route的filters,最后合并
  2. org.springframework.cloud.gateway.handler.FilteringWebHandler#handle()方法会加载全局过滤器,与前面的过滤器合并后根据order排序,组织过滤器链

网关跨域问题

跨域:域名不一致就是跨域,主要包括:

  • 域名不同: www.taobao.com 和 www.taobao.org 和 www.jd.com 和 miaosha.jd.com

  • 域名相同,端口不同:localhost:8080 和 localhost8081

跨域问题:浏览器禁止请求的发起者与服务端发生跨域ajax请求,请求被浏览器拦截的问题

解决方案:CORS,了解CORS可以去这里 https://www.ruanyifeng.com/blog/2016/04/cors.html

全局跨域

解决方式:在gateway服务的 application.yml 文件中,添加下面的配置:

spring:
  cloud:
    gateway:
      globalcors: # 全局的跨域处理
        # 解决options请求被拦截问题。CORS跨域浏览器会问服务器可不可以跨域,而这种请求是options,网关默认会拦截这种请求
        add-to-simple-url-handler-mapping: true
        corsConfigurations:
          '[/**]':	# 拦截哪些请求,此处为拦截所有请求
            allowedOrigins: # 允许哪些网站的跨域请求 
              - "http://localhost:8090"
            allowedMethods: # 允许的跨域ajax的请求方式
              - "GET"
              - "POST"
              - "DELETE"
              - "PUT"
              - "OPTIONS"
            allowedHeaders: "*" # 允许在请求中携带的头信息
            allowCredentials: true # 是否允许携带cookie
            maxAge: 360000 # 这次跨域检测的有效期是多少秒。每次跨域都要询问一次服务器,这会浪费一定性能,因此加入有效期

局部跨域

route配置允许将 CORS 作为元数据直接应用于路由,例如下面的配置:

spring:
  cloud:
    gateway:
      routes:
      - id: cors_route
        uri: https://example.org
        predicates:
        - Path=/service/**
        metadata:
          cors
            allowedOrigins: '*'
            allowedMethods:
              - GET
              - POST
            allowedHeaders: '*'
            maxAge: 30

注意:若是 predicates 中的 Path 没有的话,那么默认使用 /**

Docker

安装docker

1、安装yum工具

yum install -y yum-utils device-mapper-persistent-data lvm2 --skip-broken

2、更新本地镜像源为阿里镜像源:docker官方镜像仓库网速较差,我们需要设置国内镜像服务

yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
    
sed -i 's/download.docker.com/mirrors.aliyun.com\/docker-ce/g' /etc/yum.repos.d/docker-ce.repo

yum makecache fast

参考阿里云的镜像加速文档:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors

3、安装docker

yum install -y docker-ce

4、关闭防火墙

Docker应用需要用到各种端口,逐一去修改防火墙设置。非常麻烦,因此可以选择直接关闭防火墙,也可以开放需要的端口号,这里采用直接关闭防火墙

# 关闭
systemctl stop firewalld
# 禁止开机启动防火墙
systemctl disable firewalld

5、启动docker服务

systemctl start docker

6、开启开机自启

systemctl enable docker

7、测试是否成功

docker ps

出现这个页面,则:说明安装成功

或者是:

docker -v		# 出现docker版本号也表示成功

镜像名称

首先来看下镜像的名称组成:

  • 镜名称一般分两部分组成:[repository]:[tag]。
  • 在没有指定tag时,默认是latest,代表最新版本的镜像

如图:

这里的mysql就是repository,5.7就是tag,合一起就是镜像名称,代表5.7版本的MySQL镜像。

Docker命令

Docker仓库地址(即dockerHub):https://hub.docker.com

常见的镜像操作命令如图:

# 拉取镜像
docker pull 镜像名称

# 查看全部镜像
docker images

# 删除镜像
docker rmi 镜像ID

# 将本地的镜像导出 
docker save -o 导出的路径 镜像id

# 加载本地的镜像文件 
docker load -i 镜像文件

# 修改镜像名称 
docker tag 镜像id 新镜像名称:版本




# 简单运行操作 
docker run 镜像ID | 镜像名称
# docker run	指的是创建一个容器并运行

# 跟参数的运行
docker run -d -p 宿主机端口:容器端口 --name 容器名称 镜像ID | 镜像名称
# 如:docker run -d -p 8081:8080 --name tomcat b8
# -d:代表后台运行容器 
# -p 宿主机端口:容器端口		为了映射当前Linux的端口和容器的端口 
# --name 容器名称:指定容器的名称

# 查看运行的容器
docker ps [-qa]
# -a:查看全部的容器,包括没有运行 
# -q:只查看容器的标识

# 查看日志
docker logs -f 容器id 
# -f:可以滚动查看日志的最后几行

# 进入容器内部
docker exec -it 容器id bash 
# docker exec 进入容器内部,执行一个命令
# -it	给当前进入的容器创建一个标准输入、输出终端,允许我们与容器交互
# bash	进入容器后执行的命令,bash是一个Linux终端交互命令
# 退出容器:exit

# 将宿主机的文件复制到容器内部的指定目录
docker cp 文件名称 容器id:容器内部路径 
docker cp index.html 982:/usr/local/tomcat/webapps/ROOT

# 重新启动容器
docker restart 容器id

# 启动停止运行的容器
docker start 容器id

# 停止指定的容器(删除容器前,需要先停止容器)
docker stop 容器id

# 停止全部容器
docker stop $(docker ps -qa)

# 删除指定容器
docker rm 容器id

# 删除全部容器
docker rm $(docker ps -qa)




# ==================数据卷volume========================

# 创建数据卷
docker volume create 数据卷名称
# 创建数据卷之后,默认会存放在一个目录下 /var/lib/docker/volumes/数据卷名称/_data

# 查看数据卷详情
docker volume inspect 数据卷名称

# 查看全部数据卷
docker volume ls

# 删除指定数据卷
docker volume rm 数据卷名称



# Docker容器映射数据卷==========>有两种方式:
# 1、通过数据卷名称映射,如果数据卷不存在。Docker会帮你自动创建,会将容器内部自带的文件,存储在默认的存放路径中

# 通过数据卷名称映射
docker run -v 数据卷名称:容器内部的路径 镜像id

# 2、通过路径映射数据卷,直接指定一个路径作为数据卷的存放位置。但是这个路径不能是空的 - 重点掌握的一种
# 通过路径映射数据卷 
docker run -v 宿主机中自己创建的路径:容器内部的路径 镜像id

# 如:docker run -d -p 8081:8080 --name tomcat -v[volume] /opt/tocmat/usr/local/tocmat/webapps b8

数据卷挂载和目录直接挂载的区别:

  1. 数据卷挂载耦合度低,由docker来管理目录且目录较深,所以不好找
  2. 目录挂载耦合度高,需要我们自己管理目录,不过很容易查看

更多命令通过 docker -helpdocker 某指令 --help 来学习

虚悬镜像

虚悬镜像:指的是仓库名、标签都是 <none> ,即俗称dangling image

出现的原因:在构建镜像或删除镜像时出现了某些错误,从而导致仓库名和标签都是 <none>

事故重现:

# 1、创建Dockerfile文件,注:必须是大写的D
vim Dockerfile

# 2、编写如下内容,下面这两条指令看不懂没关系,下一节会解释
FROM ubuntu
CMD echo "执行完成"

# 3、构建镜像
docker build .

# 4、查看镜像
docker images

这种东西就是“虚悬镜像”,就是个残次品,不是一定会出事,也不是一定不会出事,但一旦有,就很可能会导致项目出问题,因此绝不可以出现这种镜像,一旦有就最好删掉

# 查看虚悬镜像有哪些
docker image ls -f dangling=true

# 删除所有的虚悬镜像
docker image prune

Dockerfile 自定义镜像

玩这个玩的就是三步骤,重现虚悬镜像时已经见了一下:

  1. 编辑Dockerfile文件 注:必须是大写D
  2. docker build构建成Docker镜像
  3. 启动构建的Docker镜像

Dockerfile文件中的关键字

官网: https://docs.docker.com/engine/reference/builder/

指令含义解读示例
#注释字面意思# 注释内容
FROM指定当前新镜像是基于哪个基础镜像,即:基于哪个镜像继续升级
“必须放在第一行”
类似于对“某系统”进行升级,添加新功能
这里的“某系统”就是基础镜像
FROM centos:7
MAINTAINER镜像的作者和邮箱和IDEA中写一个类或方法时留下自己姓名和邮箱类似MAINTAINER zixqzixq8@qq.com
RUN容器“运行时”需要执行的命令
RUN是在进行docker build时执行
在进行docker build时会安装一些命令或插件,亦或输出一句话用来提示进行到哪一步了/当前这一步是否成功了有两种格式:
1、shell格式:RUN <命令行命令> 如:RUN echo “Successfully built xxxx” 或者是 RUN yum -y imstall vim
这种等价于在终端中执行shell命令

2、exec格式:RUN {“可执行文件”,”参数1”,”参数2”}
如:RUN {“./startup.cmd”,”-m”,”standalone”}
等价于 startup.cmd -m standalone

EXPOSE当前容器对外暴露出的端口字面意思。容器自己想设定的端口,docker要做宿主机和容器内端口映射咯EXPOSE 80
WORKDIR指定在容器创建后,终端默认登录进来时的工作目录虚拟机进入时默认不就是 ~ 或者 Redis中使用Redis -cli登录进去之后不是也有默认路径吗WORKDIR /usr/local


WORKDIR /

USER指定该镜像以什么样的用户去执行,若不进行指定,则默认用 root 用户

这玩意儿一般都不会特意去设置

时空见惯了,略过USER root
ENV是environment的缩写,即:用来在镜像构建过程中设置环境变量可以粗略理解为定义了一个 key=value 形式的常量,这个常量方便后续某些地方直接进行引用ENV MY_NAME=”John Doe”

或形象点
ENV JAVA_HOME=/usr/local/java

VOLUME数据卷,进行数据保存和持久化和前面docker中使用 -v 数据卷是一样的VOLUME /myvol
COPY复制,拷贝目录和文件到镜像中COPY test.txt relativeDir/

注:这里的目标路径或目标文件relativeDir 不用事先创建,会自动创建

ADD将宿主机目录下的文件拷贝进镜像 且 会自动处理URL和解压tar压缩包和COPY类似,就是COPY+tar文件解压这两个功能组合ADD test.txt /mydir/

或形象点
ADD target/tomcat-stuffed-1.0.jar /deployments/app.jar

CMD指定容器“启动后”要干的事情

Dockerfile中可以有多个CMD指令,“但是:只有最后一个有效”

“但可是:若Dockerfile文件中有CMD,而在执行docker run时后面跟了参数,那么就会替换掉Dockerfile中CMD的指令”,如:
docker run -d -p 80:80 —name tomcat 容器ID /bin/bash
这里用了/bin/bash参数,那就会替换掉自定义的Dockerfile中的CMD指令

和RUN一样也是支持两种格式

1、shell格式:CMD <命令> 如 CMD echo “wc,This is a test”

2、exec格式:CMD {“可执行文件”,”参数1”,”参数2”}

和RUN的区别:
CMD是docker run时运行
RUN是docker build时运行

ENTRYPOINT也是用来指定一个容器“启动时”要运行的命令类似于CMD指令,但:ENRTYPOINT不会被docker run后面的命令覆盖,且这些命令行会被当做参数送给ENTRYPOINT指令指定的程序
和CMD一样,支持两种格式

1、shell格式:ENTRYPOINT<命令>

2、exec格式:ENTRYPOINT

注意: 上表中指令必须是大写

再理解Dockerfile语法,直接参考Tomcat:https://github.com/apache/tomcat/blob/main/modules/stuffed/Dockerfile

将微服务构建为镜像部署

这个玩意儿属于云原生技术里面的,因为前面都玩了Dockerfile,所以就顺便弄一下这个

思路:

  1. 创建一个微服务项目,编写自己的逻辑,通过Maven的package工具打成jar包

  2. 将打成的jar包上传到自己的虚拟机中,目录自己随意

  3. 创建Dockerfile文件,并编写内容,参考如下:

    # 基础镜像
    FROM java:8
    # 作者
    MAINTAINER zixq
    # 数据卷 在宿主机/var/lib/docker目录下创建了一个临时文件并映射到容器的/tmp
    VOLUME /tmp
    # 将jar包添加到容器中 并 更名为 zixq_dokcer.jar
    ADD docker_boot-0.0.1.jar zixq_docker.jar
    # 运行jar包
    RUN bash -c "touch /zixq_docker.jar"
    ENTRYPOINT {"java","-jar","/zixq_docker.jar"}
    # 暴露端口
    EXPOSE 8888
    

    :Dockerfile文件和jar包最好在同一目录

  4. 构建成docker镜像

    # docker build -t 仓库名字(REPOSITORY):标签(TAG)
    docker build -t zixq_docker:0.1 .
    # 最后有一个	点.	表示:当前目录,jar包和Dockerfile不都在当前目录吗
    
  5. 运行镜像

    docker run -d -p 8888:8888 镜像ID
    
    # 注意防火墙的问题,端口是否开放或防火墙是否关闭,否则关闭/开放,然后重启docker,重现运行镜像.........
    
  6. 浏览器访问

    自己虚拟机ip + 5中暴露的port + 自己微服务中的controller路径
    

Docker-Compose

Docker Compose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器!

安装Docker-Compose

1、下载Docker-Compose

# 1、安装
# 1.1、选择在线,直接官网拉取
curl -L https://github.com/docker/compose/releases/download/1.23.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

# 要是嫌慢的话,也可以去这个网址
curl -L https://get.daocloud.io/docker/compose/releases/download/1.26.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

# 1.2、也可以选择离线安装,直接下载到本地后,上传到虚拟机 /usr/local/bin/ 路径中即可



# 2、修改文件权限,因为 /usr/local/bin/docker-compose 文件还没有执行权
chmod +x /usr/local/bin/docker-compose

# 3、检测是否成功,出现命令文档说明就表示成功了
docker-compose

可以再加上一个东西:Base自动补全命令

# 补全命令
curl -L https://raw.githubusercontent.com/docker/compose/1.29.1/contrib/completion/bash/docker-compose > /etc/bash_completion.d/docker-compose

# 若是出现错误,这是因为上面这个网址域名的问题,这需要修改hosts文件
# 可以先修改hosts,然后再拉取Base自动补全命令
echo "199.232.68.133 raw.githubusercontent.com" >> /etc/hosts

Docker-Compose语法

DockerCompose的详细语法参考官网:https://docs.docker.com/compose/compose-file/

其实DockerCompose文件可以看做是将多个docker run命令写到一个文件,只是语法稍有差异

Compose文件是一个文本文件(YAML格式),通过指令定义集群中的每个容器如何运行。格式如下:

注: 这YAML里面的格式要求很严格

  1. 每行末尾别有空格
  2. 别用tab缩进(在IDEA中编辑好除外,这种会自动进行转换,但偶尔会例外),容易导致启动不起来
  3. 注释最好像下面这样写在上面,不要像在IDEA中写在行尾,这样容易解析出错成为空格(偶尔会莫名其妙启动不起来,把注释位置改为上面又可以了)
# docker-compose的版本,目前的版本有1.x、2.x、3.x
version: "3.2"

services:
# 就是docker run中 --name 后面的名字
  nacos:
    image: nacos/nacos-server
    environment:
# 前面玩nacos的单例模式启动
      MODE: standalone
    ports:
      - "8848:8848"
  mysql:
    image: mysql:5.7.25
    environment:
      MYSQL_ROOT_PASSWORD: 123
    volumes:
      - "$PWD/mysql/data:/var/lib/mysql"
      - "$PWD/mysql/conf:/etc/mysql/conf.d/"
# 对某微服务的配置,一般不要暴露端口,网关会协调,微服务之间是内部访问,对于用户只需暴露一个入口就行,即:网关
  xxxservice:
    build: ./xxx-service
  yyyservice:
    build: ./yyy-service
# 网关微服务配置
  gateway:
    build: ./gateway
    ports:
      - "10010:10010"

上面的Compose文件就描述一个项目,其中包含两个容器(对照使用 docker run -d -p 映射出来的宿主机端口:容器内暴露的端口 –name 某名字……… 命令跑某个镜像,这文件内容就是多个容器配置都在一起,最后一起跑起来而已):

  • mysql:一个基于mysql:5.7.25镜像构建的容器,并且挂载了两个目录
  • web:一个基于docker build临时构建的镜像容器,映射端口是8090

Docker-Compose的基本命令

在使用docker-compose的命令时,默认会在当前目录下找 docker-compose.yml 文件(这个文件里面的内容就是上一节中YAML格式的内容写法),所以:需要让自己在创建的 docker-compose.yml 文件的当前目录中,从而来执行docker-compose相关的命令

# 1. 基于docker-compose.yml启动管理的容器
docker-compose up -d

# 2. 关闭并删除容器
docker-compose down

# 3. 开启|关闭|重启已经存在的由docker-compose维护的容器
docker-compose start|stop|restart

# 4. 查看由docker-compose管理的容器
docker-compose ps

# 5. 查看日志
docker-compose logs -f [服务名1] [服务名2]

更多命令使用 docker-compose -helpdocker-compose 某指令 --help 查看即可

Docker私有仓库搭建

公共仓库:像什么前面的DockerHub、DaoCloud、阿里云镜像仓库…………..

简化版仓库

Docker官方的Docker Registry是一个基础版本的Docker镜像仓库,具备仓库管理的完整功能,但是没有图形化界面。

搭建方式如下:

# 直接在虚拟机中执行命令即可
docker run -d \
    --restart=always \
    --name registry	\
    -p 5000:5000 \
    -v registry-data:/var/lib/registry \
    registry

命令中挂载了一个数据卷registry-data到容器内的 /var/lib/registry 目录,这是私有镜像库存放数据的目录

访问http://YourIp:5000/v2/_catalog 可以查看当前私有镜像服务中包含的镜像

图形化仓库

1、在自己的目录中创建 docker-compose.yml 文件

vim docker-compose.yml

2、配置Docker信任地址:Docker私服采用的是http协议,默认不被Docker信任,所以需要做一个配置

# 打开要修改的文件
vim /etc/docker/daemon.json
# 添加内容:registry-mirrors 是前面已经配置过的阿里云加速,放在这里是为了注意整个json怎么配置的,以及注意多个是用 逗号 隔开的
# 真正要加的内容是 "insecure-registries":["http://192.168.150.101:8080"]
{
  "registry-mirrors": ["https://838ztoaf.mirror.aliyuncs.com"],
  "insecure-registries":["http://192.168.150.101:8080"]
}
# 重加载
systemctl daemon-reload
# 重启docker
systemctl restart docker

3、在docekr-compose.yml文件中编写如下内容

version: '3.0'

services:
  registry:
    image: registry
    volumes:
      - ./registry-data:/var/lib/registry
# ui界面搭建,用的是别人的
  ui:
    image: joxit/docker-registry-ui:static
    ports:
      - 8080:80
    environment:
      - REGISTRY_TITLE=悠忽有限公司私有仓库
      - REGISTRY_URL=http://registry:5000
    depends_on:
      - registry

4、使用docker-compose启动容器

docekr-compsoe up -d

5、浏览器访问

虚拟机IP:上面ui中配置的ports

推送和拉取镜像

推送镜像到私有镜像服务必须先tag,步骤如下:

  1. 重新tag本地镜像,名称前缀为私有仓库的地址:192.168.xxx.yyy:8080/
# docker tag 仓库名(REPOSITORY):标签(TAG) YourIp:ui中配置的port/新仓库名:标签
docker tag nginx:latest 192.168.xxx.yyy:8080/nginx:1.0
  1. 推送镜像
docker push 192.168.xxx.yyy:8080/nginx:1.0 
  1. 拉取镜像
docker pull 192.168.xxx.yyy:8080/nginx:1.0 

RabbitMQ 消息队列

直接去这个旮旯地方:https://www.cnblogs.com/zixq/p/16242291.html

ElasticSearch 分布式搜索引擎

  1. 基础理论、DSL语法、Java操作ES:https://www.cnblogs.com/zixq/p/15684307.html
  2. 续篇:https://www.cnblogs.com/zixq/p/15770665.html

Sentinel 微服务保护

Sentinel是阿里巴巴开源的一款微服务流量控制组件。官网地址:https://sentinelguard.io/zh-cn/index.html

雪崩问题与解决方式

所谓的雪崩指的是:微服务之间相互调用,调用链中某个微服务出现问题了,导致整个服务链的所有服务也跟着出问题,从而造成所有服务都不可用

解决方式:

  1. 超时处理:是一种临时方针,即设置定时器,请求超过规定的时间就返回错误信息,不会无休止等待

    缺点:在超时时间内,还未返回错误信息内,服务未处理完,请求激增,一样会导致后面的请求阻塞

  2. 线程隔离:也叫舱壁模式,即限定每个业务能使用的线程数,避免耗尽整个tomcat的资源

    缺点:会造成一定资源的浪费。明明服务已经不可用了,还占用固定数量的线程

  3. 熔断降级

    1. 熔断: 由“断路器”统计业务执行的异常比例,如果超出“阈值”则会熔断/暂停该业务,拦截访问该业务的一切请求,后续搞好了再开启。从而做到在流量过大时(或下游服务出现问题时),可以自动断开与下游服务的交互,并可以通过自我诊断下游系统的错误是否已经修正,或上游流量是否减少至正常水平来恢复自我恢复。熔断更像是自动化补救手段,可能发生在服务无法支撑大量请求或服务发生其他故障时,对请求进行限制处理,同时还可尝试性的进行恢复
    2. 降级: 丢车保帅。针对非核心业务功能,核心业务超出预估峰值需要进行限流;所谓降级指的就是在预计流量峰值前提下,整体资源快不够了,忍痛将某些非核心服务先关掉,待渡过难关,再开启回来
  4. 限流: 也叫流量控制。指的是限制业务访问的QPS,避免服务因流量的突增而故障。是防御保护手段,从流量源头开始控制流量规避问题

限流是对服务的保护,避免因瞬间高并发流量而导致服务故障,进而避免雪崩。是一种预防措施

超时处理、线程隔离、降级熔断是在部分服务故障时,将故障控制在一定范围,避免雪崩。是一种补救措施

服务保护技术对比

在SpringCloud当中支持多种服务保护技术:

  • Netfix Hystrix
  • Sentinel
  • Resilience4J

早期比较流行的是Hystrix框架(后面这吊毛不维护、不更新了),所以目前国内实用最广泛的是阿里巴巴的Sentinel框架

SentinelHystrix
隔离策略信号量隔离线程池隔离/信号量隔离
熔断降级策略基于慢调用比例或异常比例基于失败比率
实时指标实现滑动窗口滑动窗口(基于 RxJava)
规则配置支持多种数据源支持多种数据源
扩展性多个扩展点插件的形式
基于注解的支持支持支持
限流基于 QPS,支持基于调用关系的限流有限的支持
流量整形支持慢启动、匀速排队模式不支持
系统自适应保护支持不支持
控制台开箱即用,可配置规则、查看秒级监控、机器发现等不完善
常见框架的适配Servlet、Spring Cloud、Dubbo、gRPC 等Servlet、Spring Cloud Netflix

安装sentinel

  1. 下载:https://github.com/alibaba/Sentinel/releases 是一个jar包,下载了放到“非中文”目录中

  2. 运行

    java -jar sentinel-dashboard-1.8.1.jar
    

如果要修改Sentinel的默认端口、账户、密码,可以通过下列配置:

配置项默认值说明
server.port8080服务端口
sentinel.dashboard.auth.usernamesentinel默认用户名
sentinel.dashboard.auth.passwordsentinel默认密码

例如,修改端口:

java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar
  1. 访问。如http://localhost:8080,用户名和密码都是sentinel

入手sentinel

  1. 依赖

    <!--sentinel-->
    <dependency>
        <groupId>com.alibaba.cloud</groupId> 
        <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    </dependency>
    
  2. YAML配置

    server:
      port: 8088
    spring:
      cloud: 
        sentinel:
          transport:
    # 		sentinel的地址
            dashboard: localhost:8080
    
  3. 然后将服务提供者、服务消费者、网关、Feign……启动,发送请求即可在前面sentinel的ui控制台看到信息了

限流 / 流量控制

雪崩问题虽然有四种方案,但是限流是避免服务因突发的流量而发生故障,是对微服务雪崩问题的预防,因此先来了解这种模式,但在了解这个之前先了解一下限流算法

限流算法

固定窗口计数器算法

给定时间窗口,维护一个计数器用于统计访问次数,并实现以下规则:

  1. 如果访问次数小于阈值,则允许访问,访问次数+1;

  2. 如果访问次数超出阈值,则限制访问,访问次数不增;

  3. 如果超过了时间窗口,计数器清零,并重置清零后的首次成功访问时间为当前时间

使用场景:

  • 保护后端服务免受大流量冲击,避免服务崩溃;
  • 对 API 调用进行限制,保证公平使用;
  • 防止恶意用户对服务进行洪水攻击;

文图示例:

  1. 将时间划分为多个窗口,窗口时间跨度称为Interval
  2. 每个窗口维护一个计数器,每有一次请求就将计数器 +1,限流就是设置计数器阈值
  3. 如果计数器超过了限流阈值,则超出阈值的请求都被丢弃,计数器不增

但是有个缺点:存在明显的临界问题。时间是不固定的,如0 – 1000ms是QPS(1秒内的请求数),这样来看没有超过阈值,可是:4500 – 5500ms也是1s啊,这是不是也是QPS啊,像下面这样就超出阈值了,服务不得干爬了

代码示例:

public class FixedWindowRateLimiter {
    
    // 统计请求数
    private static int counter = 0;
    private static long lastAcquireTime = 0L;
    // 假设固定时间窗口是1000ms
    private static final long windowUnit = 1000L;
    // 窗口阀值是10
    private static final int threshold = 10;

    public synchronized boolean tryAcquire() {
        // 获取系统当前时间
        long currentTime = System.currentTimeMillis();
        // 检查是否在时间窗口内
        if (currentTime - lastAcquireTime > windowUnit) {
            // 计数器清零
            counter = 0;
            // 开启新的时间窗口
            lastAcquireTime = currentTime;
        }
        
        // 小于阀值
        if (counter < threshold) {
            // 计数器加1
            counter++;
            // 获取请求成功
            return true;
        }
        
        // 超过阀值,无法获取请求
        return false;
    }
}

滑动窗口计数器算法

为了解决固定窗口临界问题。在固定窗口计数器算法的基础上,滑动窗口计数器算法会将一个窗口划分为n个更小的子窗口,每个子窗口独立统计,按子窗口时间滑动,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。它可以解决固定窗口临界值的问题。

适用场景:同固定窗口的场景,且对流量限制要求较高的场景,需要更好地应对突发流量

示例:

  1. 窗口时间跨度Interval为1秒;区间数量 n = 5 ,则每个小区间时间跨度为0.2s
  2. 每过0.2s,时间窗口就会往右滑动一格
  3. 每个小周期,都有自己独立的计数器,如果请求是0.83s到达的,0.8~1.0s对应的计数器就会加1

解决临界问题说明:

  • 假设我们1s内的限流阀值还是5个请求,0.8~1.0s内(比如0.9s的时候)来了5个请求,落在黄色格子里

  • 时间过了1.0s这个点之后,又来5个请求,落在紫色格子里。如果是固定窗口算法,是不会被限流的,但是滑动窗口的话,每过一个小周期,它会右移一个小格。过了1.0s这个点后,会右移一小格,当前的单位时间段是0.2~1.2s,这个区域的请求已经超过限定的5了,已触发限流啦,实际上,紫色格子的请求都被拒绝

缺点:突发流量无法处理(无法应对短时间内的大量请求,但是一旦到达限流后,请求都会直接暴力被拒绝。这样我们会损失一部分请求,这其实对于产品来说,并不太友好),需要合理调整时间窗口大小

代码示例:

import java.util.LinkedList;
import java.util.Queue;

public class SlidingWindowRateLimiter {
    
    // 存储请求的时间戳队列
    private Queue<Long> timestamps;
    // 窗口大小,即时间窗口内允许的请求数量
    private int windowSize;
    // 窗口持续时间,单位:毫秒
    private long windowDuration;

    public SlidingWindowRateLimiter(int windowSize, long windowDuration) {
        this.windowSize = windowSize;
        this.windowDuration = windowDuration;
        this.timestamps = new LinkedList<>();
    }

    public synchronized boolean tryAcquire() {
        // 获取当前时间戳
        long currentTime = System.currentTimeMillis();

        // 删除超过窗口持续时间的时间戳
        while (!timestamps.isEmpty() && currentTime - timestamps.peek() > windowDuration) {
            timestamps.poll();
        }

        // 判断当前窗口内请求数是否小于窗口大小
        if (timestamps.size() < windowSize) {
            // 将当前时间戳加入队列
            timestamps.offer(currentTime);
            // 获取请求成功
            return true;
        }

        // 超过窗口大小,无法获取请求
        return false;
    }
}

令牌桶算法

基于(入口)流速来做流控

适用场景:一般用于保护自身的系统,对调用者进行限流,保护自身的系统不被突发的流量打垮。如果自身的系统实际的处理能力强于配置的流量限制时,可以允许一定程度的流量突发,使得实际的处理速率高于配置的速率,充分利用系统资源

  1. 以固定的速率生成令牌,存入令牌桶中,如果令牌桶满了以后,不再生成令牌
  2. 请求进入后,必须先尝试从桶中获取令牌,获取到令牌后才可以被处理
  3. 如果令牌桶中没有令牌,则请求等待或丢弃

缺点

  • 对短时请求难以处理:在短时间内有大量请求到来时,可能会导致令牌桶中的令牌被快速消耗完,从而限流。这种情况下,可以考虑使用漏桶算法

还有个缺点:

  1. 假如限流阈值是1000个请求
  2. 假设捅中只能放1000个令牌,前一秒内没有请求,但是令牌已经生成了,放入了捅中
  3. 之后下一秒来了2000个请求,可捅中前一秒生成了1000令牌,所以可以有1000个请求拿到令牌,从而放行,捅中没令牌了
  4. 然后当前这一秒就要生成令牌,这样另外1000个请求也可以拿到令牌
  5. 最后2000个请求都放行了,服务又干爬了

代码示例:

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TokenBucketRateLimiter {
    
    // 令牌桶容量,即最大允许的请求数量
    private long capacity;
    // 令牌产生速率,即每秒产生的令牌数量
    private long rate;
    // 当前令牌数量
    private long tokens;
    // 调度器
    private ScheduledExecutorService scheduler;

    public TokenBucketRateLimiter(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.tokens = capacity;
        this.scheduler = new ScheduledThreadPoolExecutor(1);
        // 启动令牌补充任务
        scheduleRefill();
    }

    private void scheduleRefill() {
        scheduler.scheduleAtFixedRate(() -> {
            synchronized (this) {
                // 补充令牌,但不超过容量
                tokens = Math.min(capacity, tokens + rate);
            }
        }, 1, 1, TimeUnit.SECONDS); // 每秒产生一次令牌
    }

    public synchronized boolean tryAcquire() {
        
        // 判断令牌数量是否大于0
        if (tokens > 0) {
            // 消耗一个令牌
            tokens--;
            
            // 获取请求成功
            return true;
        }
        
        // 令牌不足,无法获取请求
        return false;
    }
}

工具

  1. 单机:RateLimiter。快速入手

基于令牌桶算法实现的一个多线程限流器,它可以将请求均匀的进行处理,当然他并不是一个分布式限流器,只是对单机进行限流。它可以应用在定时拉取接口数。通过aop、filter、Interceptor 等都可以达到限流效果

示例:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version> <!-- 使用最新的稳定版本 -->
</dependency>
import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {
    public static void main(String[] args) {
        // 创建一个每秒允许2个请求的RateLimiter
        RateLimiter rateLimiter = RateLimiter.create(2.0);

        while (true) {
            // 请求RateLimiter一个令牌
            rateLimiter.acquire();
            // 执行操作
            doSomeLimitedOperation();
        }
    }

    private static void doSomeLimitedOperation() {
        // 模拟一些操作
        System.out.println("Operation executed at: " + System.currentTimeMillis());
    }
}
  1. 单机或分布式:Sentinel。即本章节内容

漏桶算法

基于(出口)流速来做流控。是对令牌桶算法做了改进:可以理解成请求在桶内排队等待。在网络通信中常用于流量整形,可以很好地解决平滑度问题

适用场景:一般用于保护第三方的系统,比如自身的系统需要调用第三方的接口,为了保护第三方的系统不被自身的调用打垮,便可以通过漏桶算法进行限流,保证自身的流量平稳的打到第三方的接口上

  1. 将每个请求视作”水滴”放入”漏桶”进行存储
  2. “漏桶”以固定速率向外”漏”出请求来执行,如果”漏桶”空了则停止”漏水”
  3. 如果”漏桶”满了则多余的”水滴”会被直接丢弃

缺点:

  • 需要对请求进行缓存,会增加服务器的内存消耗。
  • 对于流量波动比较大的场景,需要较为灵活的参数配置才能达到较好的效果。
  • 但是面对突发流量的时候,漏桶算法还是循规蹈矩地处理请求,这不是我们想看到的啦。流量变突发时,我们肯定希望系统尽量快点处理请求,提升用户体验嘛

代码示例:

public class LeakyBucketRateLimiter {
    
    // 漏桶容量,即最大允许的请求数量
    private long capacity;
    // 漏水速率,即每秒允许通过的请求数量
    private long rate;
    // 漏桶当前水量
    private long water;
    // 上一次请求通过的时间戳
    private long lastTime;

    public LeakyBucketRateLimiter(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.water = 0;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        long elapsedTime = now - lastTime;

        // 计算漏桶中的水量
        water = Math.max(0, water - elapsedTime * rate / 1000);

        // 判断漏桶中的水量是否小于容量
        if (water < capacity) {
            // 漏桶中的水量加1
            water++;
            // 更新上一次请求通过的时间戳
            lastTime = now;
            // 获取请求成功
            return true;
        }

        // 漏桶已满,无法获取请求
        return false;
    }
}

限流算法对比

因为计数器算法一般都会采用滑动窗口计数器,所以这里只对比三种算法

对比项滑动时间窗口令牌桶漏桶
能否保证流量曲线平滑不能,但窗口内区间越小,流量控制越平滑基本能,在请求量持续高于令牌生成速度时,流量平滑。但请求量在令牌生成速率上下波动时,无法保证曲线平滑能,所有请求进入桶内,以恒定速率放行,绝对平滑
能否应对突增流量不能,徒增流量,只要高出限流阈值都会被拒绝。能,桶内积累的令牌可以应对突增流量能,请求可以暂存在桶内
流量控制精确度低,窗口区间越小,精度越高

簇点链路

簇点链路: 就是项目内的调用链路,链路中被监控的每个接口就是一个“资源”

当请求进入微服务时,首先会访问DispatcherServlet,然后进入Controller、Service、Mapper,这样的一个调用链就叫做簇点链路。簇点链路中被监控的每一个接口就是一个资源

默认情况下sentinel会监控SpringMVC的每一个端点(Endpoint,也就是controller中的方法),因此SpringMVC的每一个端点就是调用链路中的一个资源

例如下图中的端点:/order/{orderId}

流控、熔断等都是针对簇点链路中的资源来设置的,因此我们可以点击对应资源后面的按钮来设置规则:

  1. 流控:流量控制
  2. 降级:降级熔断
  3. 热点:热点参数限流
  4. 授权:请求的权限控制

入门流控

  1. 点击下图按钮

  2. 设置基本流控信息

    上图的含义:限制 /order/{orderId} 这个资源的单机QPS为1,即:每秒只允许1次请求,超出的请求会被拦截并报错

流控模式的分类

在添加限流规则时,点击高级选项,可以选择三种流控模式

  1. 直接模式:一句话来说就是“对当前资源限流”。统计当前资源的请求,当其触发阈值时,对当前资源直接限流。上面这张图就是此种模式。这也是默认的模式。采用的算法就是滑动窗口算法

  2. 关联模式:一句话来说就是“高优先级触发阈值,对低优先级限流”。统计与当前资源A“相关”的另一个资源B,A资源触发阈值时,对B资源限流

    如:在一个Controller中,一个高流量的方法和一个低流量的方法都调用了这个Controller中的另一个方法,为了预防雪崩问题,就对低流量的方法进行限流设置

    适用场景:两个有竞争关系的资源,一个优先级高,一个优先级低,优先级高的触发阈值时,就对优先级低的进行限流

  3. 链路模式:一句话来说就是“对请求来源做限流”。统计从“指定链路”访问到本资源的请求,触发阈值时,对指定链路限流

    如:两个不同链路的请求,如需要读库和写库,这两个请求都调用了同一个服务/资源/接口,所以为了需求考虑,可以设置读库达到了阈值就进行限流

示例:

  1. 关联模式: 对谁进行限流,就点击谁的流控按钮进行设置

    上图含义:当 /order/update 请求单机达到 每秒1000 请求量的阈值时,就会对 /order/query 进行限流,从而避免影响 /order/update 资源

  2. 链路模式: 请求链路访问的是哪个资源,就点击哪个资源的流控按钮进行配置

    上图含义:只有来自 /user/queryGoods 链路的请求来访问 /order/queryGoods 资源时,每秒请求量达到1000,就会对 /user/queryGoods 进行限流

链路模式的注意事项:

  1. 默认情况下,Service中的方法是不被Sentinel监控的,想要Service中的方法也被Sentinel监控的话,则需要我们自己通过 @SentinelResource(“起个名字 或 像controllerz中请求路径写法”) 注解来标记要监控的方法

  2. 链路模式中,是对不同来源的两个链路做监控。但是sentinel默认会给进入SpringMVC的所有请求设置同一个root资源,进行了context整合,所以会导致链路模式失效。因此需要关闭一个context整合设置:

    spring:
      cloud:
        sentinel:
          web-context-unify: false # 关闭context整合
    

    同一个root资源指的是:

流控效果及其分类

流控效果:指请求达到流控阈值时应该采取的措施

分类

  1. 快速失败:达到阈值后,新的请求会被立即拒绝并抛出 FlowException异常。是默认的处理方式
  2. warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
  3. 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长

warn up 预热模式

warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值

阈值一般是一个微服务能承担的最大QPS,但是一个服务刚刚启动时,一切资源尚未初始化(冷启动),如果直接将QPS跑到最大值,可能导致服务瞬间宕机

warm up也叫预热模式,是应对服务冷启动的一种方案

请求阈值初始值 = maxThreshold / coldFactor
  • maxThreshold 就是设置的QPS数量。持续指定时长后,逐渐提高到maxThreshold值。
  • coldFactor 预热因子,默认值是3

排队等待

排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长

采用的算法:基于漏桶算法

当请求超过QPS阈值时,快速失败和warm up 会拒绝新的请求并抛出异常

而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝

QPS = 5,那么 1/5(个/ms) = 200(个/ms),意味着每200ms处理1个队列中的请求;timeout = 2000,意味着预期等待时长超过2000ms的请求会被拒绝并抛出异常

那什么叫做预期等待时长呢?

如果使用队列模式做流控,所有进入的请求都要排队,以固定的200ms的间隔执行,QPS会变的很平滑

平滑的QPS曲线,对于服务器来说是更友好的

热点参数限流

之前的限流是统计访问某个资源的所有请求,判断是否超过QPS阈值

热点参数限流是分别统计参数值相同的请求,判断是否超过QPS阈值

采用的算法: 令牌桶算法

注意事项:热点参数限流对默认的SpringMVC资源无效,需要利用@SentinelResource注解标记资源,例如:

但是配置时不要通过上面按钮点击配置,会有BUG,而是通过下图中的方式:

所谓的参数值指的是

id参数值会有变化,热点参数限流会根据参数值分别统计QPS

当id=1的请求触发阈值被限流时,id值不为1的请求不受影响

全局参数限流

就是基础设置,没有加入高级设置的情况

上图含义:对于来访问hot资源的请求,每1秒相同参数值的请求数不能超过10000

热点参数限流

刚才的配置中,对查询商品这个接口的所有商品一视同仁,QPS都限定为10000

而在实际开发中,可能部分商品是热点商品,例如秒杀商品,我们希望这部分商品的QPS限制与其它商品不一样,高一些。那就需要配置热点参数限流的高级选项了

上图含义:对于来访问hot资源的请求,id=110时的QPS阈值为30000,id=4132443时的QPS阈值为50000,id为其他的则QPS阈值为10000

Seatinel限流与Gateway限流的差异

Gateway则采用了基于Redis实现的令牌桶算法。而Sentinel内部所有算法都有::

  1. 默认限流模式是基于滑动时间窗口算法
  2. 排队等待的限流模式则基于漏桶算法
  3. 而热点参数限流则是基于令牌桶算法

Sentinel整合Feign

Sentinel是做服务保护的,而在微服务中调来调去是常有的事,要远程调用就离不开Feign

  1. 修改配置,开启sentinel功能: 在服务“消费方”的feign配置添加如下配置内容
feign:
  sentinel:
    enabled: true # 开启feign对sentinel的支持
  1. feign-client中编写失败降级逻辑: 后面的流程就是前面玩Fengn时失败降级的流程
package com.zixieqing.feign.fallback;

import com.zixieqing.feign.clients.UserClient;
import com.zixieqing.feign.pojo.User;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;

/**
 * userClient失败时的降级处理
 *
 * <p>@author       : ZiXieqing</p>
 */

@Slf4j
public class UserClientFallBackFactory implements FallbackFactory<UserClient> {
    @Override
    public UserClient create(Throwable throwable) {
        return new UserClient() {
            /**
             * 重写userClient中的方法,编写失败时的降级逻辑
             */
            @Override
            public User findById(Long id) {
                log.info("userClient的findById()在进行 id = {} 时失败", id);
                return new User();
            }
        };
    }
}
  1. 将失败降级逻辑的类丢给Spring容器
@Bean
public UserClientFallBackFactory userClientFallBackFactory() {
    return new UserClientFallBackFactory();
}
  1. 在相关feign-client定义处使用fallbackFactory回调函数即可
package com.zixieqing.feign.clients;


import com.zixieqing.feign.fallback.UserClientFallBackFactory;
import com.zixieqing.feign.pojo.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(value = "userservice",fallbackFactory = UserClientFallBackFactory.class)
public interface UserClient {

    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}
  1. 调用,失败时就会进入自定义的失败逻辑中
package com.zixieqing.order.service;

import com.zixieqing.feign.clients.UserClient;
import com.zixieqing.feign.pojo.User;
import com.zixieqing.order.mapper.OrderMapper;
import com.zixieqing.order.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private UserClient userClient;

    public Order queryOrderById(Long orderId) {
        // 1.查询订单
        Order order = orderMapper.findById(orderId);
        // 2.用Feign远程调用
        User user = userClient.findById(order.getId());
        // 3.封装user到Order
        order.setUser(user);
        // 4.返回
        return order;
    }
}

隔离与降级

线程隔离

线程隔离有两种方式实现:

  1. 线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果

    优点:

    • 支持主动超时:也就是调用进行逻辑处理时超过了规定时间,直接噶了,不再让其继续处理
    • 支持异步调用:线程池隔离了嘛,彼此不干扰,因此可以异步了

    缺点:造成资源浪费。明明被调用的服务都出问题了,还占用固定的线程池数量

    适用场景:低扇出。MQ中扇出交换机的那个扇出,也就是较少的请求量,扇出/广播到很多服务上

  2. 信号量隔离(Sentinel默认采用):不创建线程池,而是计数器模式,记录业务使用的线程数量,达到信号量上限时,禁止新的请求

    优点:轻量级、无额外开销

    缺点:不支持主动超时、不支持异步调用

    适用场景:高频调用、高扇出

配置Sentinel的线程隔离-信号量隔离

在添加限流规则时,可以选择两种阈值类型:

熔断降级

熔断降级是解决雪崩问题的重要手段。其思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求

断路器控制熔断和放行是通过状态机来完成的:

断路器熔断策略有三种:慢调用、异常比例、异常数

状态机包括三个状态:

  • Closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例。超过阈值则切换到open状态
  • Open:打开状态,服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑。Open状态默认5秒后会进入half-open状态
  • Half-Open:半开状态,放行一次请求,根据执行结果来判断接下来的操作。
    • 请求成功:则切换到closed状态
    • 请求失败:则切换到open状态

断路器熔断策略:慢调用

慢调用:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求

在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断

上图含义:

  1. 响应时间为500ms的即为慢调用
  2. 如果1000ms内有100次请求,且慢调用比例不低于0.05(即:100*0.05=5个慢调用),则触发熔断(暂停该服务)
  3. 熔断时间达到1s进入half-open状态,然后放行一次请求测试
    1. 成功则进入Closed状态关闭断路器
    2. 失败则进入Open状态打开断路器,继续像前面一样开始统计RT=500ms,1s内有100次请求……………..

断路器熔断策略:异常比例 与 异常数

  1. 异常比例

上图含义:在1s内,若是请求数量不低于100个,且异常比例不低于0.08(即:100*0.08=8个有异常),则触发熔断,熔断时长达到1s就进入half-open状态

  1. 异常数:直接敲定有多少个异常数量就触发熔断

授权规则

授权规则可以对请求方来源做判断和控制

授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式:

  1. 白名单:来源(origin)在白名单内的调用者允许访问
  2. 黑名单:来源(origin)在黑名单内的调用者不允许访问

  • 资源名:就是受保护的资源,例如 /order/

  • 流控应用:是来源者的名单

    • 如果是勾选白名单,则名单中的来源被许可访问
    • 如果是勾选黑名单,则名单中的来源被禁止访问

我们允许请求从gateway到order-service,不允许浏览器访问order-service,那么白名单中就要填写网关的来源名称(origin)

但是上图中怎么区分请求是从网关来的还是浏览器来的?在微服务中的想法是所有请求只能走网关,然后由网关路由到具体的服务,直接访问服务应该阻止才对,像下面直接跳过网关去访问服务,应该不行才对

要做到就需要使用授权规则了:

  1. 网关授权拦截:针对于别人不知道内部服务接口的情况可以拦截成功
  2. 服务授权控制/流控应用控制:针对“内鬼“ 或者 别人知道了内部服务接口,我们限定只能从哪里来的请求才能访问该服务,否则直接拒绝

流控应用怎么控制的?

下图中的名字怎么定义?

需要实现 RequestOriginParser 这个接口的 parseOrigin() 来获取请求的来源从而做到

public interface RequestOriginParser {
    /**
     * 从请求request对象中获取origin,获取方式自定义
     */
    String parseOrigin(HttpServletRequest request);
}

示例:

  1. 在需要进行保护的服务中编写请求来源解析逻辑
package com.zixieqing.order.intercepter;

import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.RequestOriginParser;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.servlet.http.HttpServletRequest;

/**
 * 拦截请求,允许从什么地方来的请求才能访问此微服务
 *
 * <p>@author       : ZiXieqing</p>
 */

@Component
public class RequestInterceptor implements RequestOriginParser {
    @Override
    public String parseOrigin(HttpServletRequest request) {
        // 获取请求中的请求头 可自定义
        String origin = request.getHeader("origin");
        if (StringUtils.isEmpty(origin))
            origin = "black";

        return origin;
    }
}
  1. 在网关中根据2中 parseOrigin() 的逻辑添加相应的东西

  1. 添加流控规则:不要在簇点链路中选择相应服务来配置授权,会有BUG

经过上面的操作之后,要进入服务就只能通过网关路由过来了,不是从网关过来的就无法访问服务

自定义异常

默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方。异常结果都是flow limmiting(限流)。这样不够友好,无法得知是限流还是降级还是授权拦截

而如果要自定义异常时的返回结果,需要实现 BlockExceptionHandler 接口:

public interface BlockExceptionHandler {
    /**
     * 处理请求被限流、降级、授权拦截时抛出的异常:BlockException
     *
     * @param e 被sentinel拦截时抛出的异常
     */
    void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception;
}

这里的BlockException包含多个不同的子类:

异常说明
FlowException限流异常
ParamFlowException热点参数限流的异常
DegradeException降级异常
AuthorityException授权规则异常
SystemBlockException系统规则异常

示例:

  1. 在需要的服务中实现 BlockExceptionHandler 接口
package com.zixieqing.order.exception;

import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * 自定义sentinel的各种异常处理
 *
 * <p>@author       : ZiXieqing</p>
 */

@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
        String msg = "未知异常";
        int status = 429;

        if (e instanceof FlowException) {
            msg = "请求被限流了";
        } else if (e instanceof ParamFlowException) {
            msg = "请求被热点参数限流";
        } else if (e instanceof DegradeException) {
            msg = "请求被降级了";
        } else if (e instanceof AuthorityException) {
            msg = "没有权限访问";
            status = 401;
        }

        response.setContentType("application/json;charset=utf-8");
        response.setStatus(status);
        response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
    }
}
  1. 重启服务,不同异常就会出现不同结果了

规则持久化

在默认情况下,sentinel的所有规则都是内存存储,重启后所有规则都会丢失。在生产环境下,我们必须确保这些规则的持久化,避免丢失

规则是否能持久化,取决于规则管理模式,sentinel支持三种规则管理模式:

  1. 原始模式:Sentinel的默认模式,将规则保存在内存,重启服务会丢失
  2. pull模式
  3. push模式

pull模式

pull模式:控制台将配置的规则推送到Sentinel客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则

缺点:服务之间的规则更新不及时。因为是定时去读取,在时间还未到时,可能规则发生了变化

push模式

push模式:控制台将配置规则推送到远程配置中心(如Nacos)。Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新

使用push模式实现规则持久化

在想要进行规则持久化的服务中引入如下依赖:

<!--sentinel规则持久化到Nacos的依赖-->
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

配置此服务的YAML文件,内容如下:

spring:
  cloud:
    sentinel:
      datasource:
        flow: # 流控规则持久化
          nacos:
            server-addr: localhost:8848 # nacos地址
            dataId: orderservice-flow-rules
            groupId: SENTINEL_GROUP
            rule-type: flow # 还可以是:degrade 降级、authority 授权、param-flow 热点参数限流
#        degrade:  # 降级规则持久化
#          nacos:
#            server-addr: localhost:8848 # nacos地址
#            dataId: orderservice-degrade-rules
#            groupId: SENTINEL_GROUP
#            rule-type: degrade
#        authority:  # 授权规则持久化
#          nacos:
#            server-addr: localhost:8848 # nacos地址
#            dataId: orderservice-authority-rules
#            groupId: SENTINEL_GROUP
#            rule-type: authority
#        param-flow: # 热电参数限流持久化
#          nacos:
#            server-addr: localhost:8848 # nacos地址
#            dataId: orderservice-param-flow-rules
#            groupId: SENTINEL_GROUP
#            rule-type: param-flow

修改sentinel的源代码

因为阿里的sentinel默认采用的是将规则内容存到内存中的,因此需要改源码

  1. 使用git克隆sentinel的源码,之后IDEA等工具打开
git clone https://github.com/alibaba/Sentinel.git
  1. 修改nacos依赖。在sentinel-dashboard模块的pom文件中,nacos的依赖默认的scope是test,那它只能在测试时使用,所以要去除 scope 标签
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
  1. 添加nacos支持。在sentinel-dashboard的test包下,已经编写了对nacos的支持,我们需要将其拷贝到src/main/java/com/alibaba/csp/sentinel/dashboard/rule 下

  1. 修改nacos地址,让其读取application.properties中的配置

  1. 在sentinel-dashboard的application.properties中添加nacos地址配置
nacos.addr=127.0.0.1:8848	# ip和port改为自己想要的即可
  1. 配置nacos数据源

  1. 修改前端

  1. 重现编译打包Sentinel-Dashboard模块

  1. 重现启动sentinel即可
java -jar -Dnacos.addr=127.0.0.1:8848 sentinel-dashboard.jar

补充:Sentinel基础知识

Sentinel实现限流、隔离、降级、熔断等功能,本质要做的就是两件事情:

  • 统计数据:统计某个资源的访问数据(QPS、RT等信息)
  • 规则判断:判断限流规则、隔离规则、降级规则、熔断规则是否满足

这里的资源就是希望被Sentinel保护的业务,例如项目中定义的controller方法就是默认被Sentinel保护的资源

ProcessorSlotChain

实现上述功能的核心骨架是一个叫做ProcessorSlotChain的类。这个类基于责任链模式来设计,将不同的功能(限流、降级、系统保护)封装为一个个的Slot,请求进入后逐个执行即可

下图在官网有:https://github.com/alibaba/Sentinel/wiki

责任链中的Slot也分为两大类:

  • 统计数据构建部分(statistic)
    • NodeSelectorSlot:负责构建簇点链路中的节点(DefaultNode),将这些节点形成链路树
    • ClusterBuilderSlot:负责构建某个资源的ClusterNode,ClusterNode可以保存资源的运行信息(响应时间、QPS、block 数目、线程数、异常数等)以及来源信息(origin名称)
    • StatisticSlot:负责统计实时调用数据,包括运行信息、来源信息等
  • 规则判断部分(rule checking)
    • AuthoritySlot:负责授权规则(来源控制)
    • SystemSlot:负责系统保护规则
    • ParamFlowSlot:负责热点参数限流规则
    • FlowSlot:负责限流规则
    • DegradeSlot:负责降级规则

Node

Sentinel中的簇点链路是由一个个的Node组成的,Node是一个接口,包括下面的实现:

所有的节点都可以记录对资源的访问统计数据,所以都是StatisticNode的子类

按照作用分为两类Node:

  • DefaultNode:代表链路树中的每一个资源,一个资源出现在不同链路中时,会创建不同的DefaultNode节点。而树的入口节点叫EntranceNode,是一种特殊的DefaultNode
  • ClusterNode:代表资源,一个资源不管出现在多少链路中,只会有一个ClusterNode。记录的是当前资源被访问的所有统计数据之和。

DefaultNode记录的是资源在当前链路中的访问数据,用来实现基于链路模式的限流规则。ClusterNode记录的是资源在所有链路中的访问数据,实现默认模式、关联模式的限流规则。

例如:我们在一个SpringMVC项目中,有两个业务:

  • 业务1:controller中的资源/order/query访问了service中的资源/goods
  • 业务2:controller中的资源/order/save访问了service中的资源/goods

创建的链路图如下:

Entry

默认情况下,Sentinel会将controller中的方法作为被保护资源,那么问题来了,我们该如何将自己的一段代码标记为一个Sentinel的资源呢?前面是用了 @SentinelResoutce 注解来实现的,那么这个注解的原理是什么?要搞清这玩意儿,那就得先来了解Entry这个吊毛玩意儿了

Sentinel中的资源用Entry来表示。声明Entry的API示例:

// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
try (Entry entry = SphU.entry("resourceName")) {
  // 被保护的业务逻辑
  // do something here...
} catch (BlockException ex) {
  // 资源访问阻止,被限流或被降级
  // 在此处进行相应的处理操作
}

原生方式自定义资源

  1. 在需要自定义资源的服务中引入依赖
<!--sentinel-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
  1. 配置Sentinel
spring:
  cloud:
    sentinel:
      transport:
        dashboard: localhost:8089
  1. 对自定义资源的地方进行逻辑编写
public Order queryOrderById(Long orderId) {
    // 创建Entry,标记资源,资源名为resource1
    try (Entry entry = SphU.entry("resource1")) {
        // 1.查询订单,这里是假数据
        Order order = Order.build(101L, 4999L, "小米 MIX4", 1, 1L, null);
        // 2.查询用户,基于Feign的远程调用
        User user = userClient.findById(order.getUserId());
        // 3.设置
        order.setUser(user);
        // 4.返回
        return order;
    }catch (BlockException e){
        log.error("被限流或降级", e);
        return null;
    }
}

打开sentinel控制台,查看簇点链路:

@SentinelResoutce 注解标记资源

通过给方法添加@SentinelResource注解的形式来标记资源:

这是怎么实现的?

Sentinel依赖中有自动装配相关的东西,spring.factories声明需要就是自动装配的配置类,内容如下:

我们来看下SentinelAutoConfiguration这个类:

可以看到,在这里声明了一个Bean,SentinelResourceAspect

/**
 * Aspect for methods with {@link SentinelResource} annotation.
 *
 * @author Eric Zhao
 */
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
	// 切点是添加了 @SentinelResource 注解的类
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }
	
    // 环绕增强
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        // 获取受保护的方法
        Method originMethod = resolveMethod(pjp);
		// 获取 @SentinelResource 注解
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        // 获取注解上的资源名称
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            // 创建资源 Entry
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            // 执行受保护的方法
            Object result = pjp.proceed();
            return result;
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }

            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

简单来说,@SentinelResource注解就是一个标记,而Sentinel基于AOP思想,对被标记的方法做环绕增强,完成资源(Entry)的创建。

Context

上一节,我们发现簇点链路中除了controller方法、service方法两个资源外,还多了一个默认的入口节点:

sentinel_spring_web_context,是一个EntranceNode类型的节点

这个节点是在初始化Context的时候由Sentinel帮我们创建的

什么是Context?

  1. Context 代表调用链路上下文,贯穿一次调用链路中的所有资源( Entry),基于ThreadLocal
  2. Context 维持着入口节点(entranceNode)、本次调用链路的 curNode(当前资源节点)、调用来源(origin)等信息
  3. 后续的Slot都可以通过Context拿到DefaultNode或者ClusterNode,从而获取统计数据,完成规则判断
  4. Context初始化的过程中,会创建EntranceNode,contextName就是EntranceNode的名称

对应的API如下:

// 创建context,包含两个参数:context名称、 来源名称
ContextUtil.enter("contextName", "originName");

Context的初始化

Context又是在何时完成初始化的?

进入SentinelWebAutoConfiguration这个类:可以直接搜,也可以去Sentinel依赖的Spring.factories中找

WebMvcConfigurer是SpringMVC自定义配置用到的类,可以配置HandlerInterceptor

SentinelWebInterceptor的声明如下:

发现继承了AbstractSentinelInterceptor这个类。

AbstractSentinelInterceptor

HandlerInterceptor拦截器会拦截一切进入controller的方法,执行preHandle前置拦截方法,而Context的初始化就是在这里完成的。

我们来看看这个类的preHandle实现:

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
    throws Exception {
    try {
        // 获取资源名称,一般是controller方法的 @RequestMapping 路径,例如/order/{orderId}
        String resourceName = getResourceName(request);
        if (StringUtil.isEmpty(resourceName)) {
            return true;
        }
        // 从request中获取请求来源,将来做 授权规则 判断时会用
        String origin = parseOrigin(request);
        
        // 获取 contextName,默认是sentinel_spring_web_context
        String contextName = getContextName(request);
        // 创建 Context
        ContextUtil.enter(contextName, origin);
        // 创建资源,名称就是当前请求的controller方法的映射路径
        Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
        request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
        return true;
    } catch (BlockException e) {
        try {
            handleBlockException(request, response, e);
        } finally {
            ContextUtil.exit();
        }
        return false;
    }
}

ContextUtil

创建Context的方法就是 ContextUtil.enter(contextName, origin);

进入该方法:

public static Context enter(String name, String origin) {
    if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
        throw new ContextNameDefineException(
            "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
    }
    return trueEnter(name, origin);
}

进入trueEnter方法:

protected static Context trueEnter(String name, String origin) {
    // 尝试获取context
    Context context = contextHolder.get();
    // 判空
    if (context == null) {
        // 如果为空,开始初始化
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        // 尝试获取入口节点
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            LOCK.lock();
            try {
                node = contextNameNodeMap.get(name);
                if (node == null) {
                    // 入口节点为空,初始化入口节点 EntranceNode
                    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                    // 添加入口节点到 ROOT
                    Constants.ROOT.addChild(node);
                    // 将入口节点放入缓存
                    Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                    newMap.putAll(contextNameNodeMap);
                    newMap.put(name, node);
                    contextNameNodeMap = newMap;
                }
            } finally {
                LOCK.unlock();
            }
        }
        // 创建Context,参数为:入口节点 和 contextName
        context = new Context(node, name);
        // 设置请求来源 origin
        context.setOrigin(origin);
        // 放入ThreadLocal
        contextHolder.set(context);
    }
    // 返回
    return context;
}

综合流程

Seata 分布式事务

Seata是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。

官网地址:http://seata.io/

CAP定理和Base理论

这两个在前面弄Nacos的时候已经说过了

CAP定理 这是分布式事务中的一个方法论

  1. C 即:Consistency 数据一致性。指的是:用户访问分布式系统中的任意节点,得到的数据必须一致
  2. A 即:Availability 可用性。指的是:用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝
  3. P 即:Partition Tolerance 分区容错性。指的是:由于某种原因导致系统中任意信息的丢失或失败都不能影响系统的继续独立运作

注: 分区容错性是必须满足的,数据一致性( C )和 可用性( A )只满足其一即可,一般的搭配是如下的(即:取舍策略):

  1. CP 保证数据的准确性
  2. AP 保证数据的及时性

既然CAP定理都整了,那就再加一个Base理论吧,这个理论是对CAP中C和A这两个矛盾点的调和和选择

  1. BA 即:Basically Available 基本可用性。指的是:在发生故障的时候,可以允许损失“非核心部分”的可用性,保证系统正常运行,即保证核心部分可用
  2. S 即:Soft State 软状态。指的是:允许系统的数据存在中间状态,只要不影响整个系统的运行就行
  3. E 即:Eventual Consistency 最终一致性。指的是:无论以何种方式写入数据库 / 显示出来,都要保证系统最终的数据是一致的

分布式事务最大问题就是各个子事务的数据一致性问题,由CAP定理和Base理论进行综合之后,得出的分布式事务中的两个模式:

  1. AP模式 ——–> 最终一致性:各个分支事务各自执行和提交,允许出现短暂的结果不一致,采用弥补措施将数据进行同步,从而恢复数据,达到最终数据一致
  2. CP模式 ——–> 强一致性:各个分支事务执行后互相等待,同时提交或回滚,达成数据的强一致性

Seata 的架构

Seata事务管理中有三个重要的角色:

  1. TC (Transaction Coordinator) – 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚
  2. TM (Transaction Manager) – 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务
  3. RM (Resource Manager) – 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

Seata基于上述架构提供了四种不同的分布式事务解决方案:

  1. XA模式:强一致性分阶段事务模式,牺牲了一定的可用性。无业务侵入
  2. AT模式:最终一致的分阶段事务模式,也是Seata的默认模式。无业务侵入
  3. TCC模式:最终一致的分阶段事务模式。有业务侵入
  4. SAGA模式:长事务模式。有业务侵入

无论哪种方案,都离不开TC,也就是事务的协调者

部署TC服务

  1. 下载Seata-Server 并 解压。链接:https://github.com/seata/seata/releases 或 http://seata.io/zh-cn/blog/download.html
  2. 修改 conf/registry.conf 文件
registry {
  # TC服务的注册中心	file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  # 配置Nacos注册中心信息
  nacos {
    application = "seata-tc-server"
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    cluster = "HZ"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # 配置中心:读取TC服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
  #  file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "DEFAULT_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}
  1. 在Nacos的控制台配置管理中配置2中的 seataServer.properties,内容如下:
# 数据存储方式,db代表数据库
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=zixieqing072413
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
# 事务、日志等配置
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
# 客户端与服务端传输方式
transport.serialization=seata
transport.compressor=none
# 关闭metrics功能,提高性能
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
  1. 创建数据库表:tc服务在管理分布式事务时,需要记录事务相关数据到数据库中(3中配置了的)
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- 分支事务表
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table`  (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `status` tinyint(4) NULL DEFAULT NULL,
  `client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime(6) NULL DEFAULT NULL,
  `gmt_modified` datetime(6) NULL DEFAULT NULL,
  PRIMARY KEY (`branch_id`) USING BTREE,
  INDEX `idx_xid`(`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- 全局事务表
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table`  (
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `timeout` int(11) NULL DEFAULT NULL,
  `begin_time` bigint(20) NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime NULL DEFAULT NULL,
  `gmt_modified` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`xid`) USING BTREE,
  INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE,
  INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;
  1. 启动seat-server

  1. 验证是否成功

Spring Cloud集成Seata

  1. 依赖
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <!--版本较低,1.3.0,因此排除-->
        <exclusion>
            <artifactId>seata-spring-boot-starter</artifactId>
            <groupId>io.seata</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!--seata starter 采用1.4.2版本-->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>${seata.version}</version>
</dependency>
  1. 给需要注册到TC的微服务的YAML文件配置如下内容:
seata:
  registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址	参考tc服务自己的registry.conf中的配置
    type: nacos
    nacos: # tc
      server-addr: 127.0.0.1:8848
      namespace: ""
      group: DEFAULT_GROUP
      application: seata-tc-server # tc服务在nacos中的服务名称
  tx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称
  service:
    vgroup-mapping: # 事务组与TC服务cluster的映射关系
      seata-demo: HZ

经过如上操作就集成成功了

分布式事务之XA模式

XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范 描述了全局的TM与局部的RM之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持。实现的原理都是基于两阶段提交

  1. 正常情况:

  1. 异常情况:

一阶段:

  1. 事务协调者通知每个事务参与者执行本地事务
  2. 本地事务执行完成后报告事务执行状态给事务协调者,此时事务不提交,继续持有数据库锁

二阶段:事务协调者基于一阶段的报告来判断下一步操作

  1. 如果一阶段都成功,则通知所有事务参与者,提交事务
  2. 如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务

Seata之XA模式 – 强一致性

应用场景: 并发量不大,但数据很重要的项目

Seata对原始的XA模式做了简单的封装和改造,以适应自己的事务模型

RM一阶段的工作:

  1. 注册分支事务到TC
  2. 执行分支业务sql但不提交
  3. 报告执行状态到TC

TC二阶段的工作:TC检测各分支事务执行状态

  1. 如果都成功,通知所有RM提交事务
  2. 如果有失败,通知所有RM回滚事务

RM二阶段的工作:

  • 接收TC指令,提交或回滚事务

XA模式的优点:

  1. 事务的强一致性,满足ACID原则
  2. 常用数据库都支持,实现简单,并且没有代码侵入

XA模式的缺点:

  1. 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差
  2. 依赖关系型数据库实现事务

Java实现Seata的XA模式

  1. 修改注册到TC的微服务的YAML配置
seata:
  data-source-proxy-mode: XA	# 开启XA模式
  1. 给发起全局事务的入口方法添加 @GlobalTransactional 注解。就是要开启事务的方法,如下:

  1. 重启服务即可成功实现XA模式了

Seata之AT模式 – 最终一致性

AT模式同样是分阶段提交的事务模型,不过却弥补了XA模型中资源锁定周期过长的缺陷

应用场景: 高并发互联网应用,允许数据出现短时不一致

基本架构图:

RM阶段一的工作:

  1. 注册分支事务
  2. 记录undo-log(数据快照)
  3. 执行业务sql并提交
  4. 报告事务状态

阶段二提交时RM的工作:删除undo-log即可

阶段二回滚时RM的工作:根据undo-log恢复数据到更新前。恢复数据之后也会把undo-log中的数据删掉

流程图如下:

AT模式与XA模式的区别是什么?

  • XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源。
  • XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚。
  • XA模式强一致;AT模式最终一致

AT模式的脏写问题

解决思路就是引入了全局锁的概念。在释放DB锁之前,先拿到全局锁。避免同一时刻有另外一个事务来操作当前数据,从而来做到写隔离

  • 全局锁: 由TC记录当前正在执行数据的事务,该事务持有全局锁,具备执行权

但就算引入了全局锁,也还会有BUG,因为上面两个事务都是Seata管理,若事务1是Seata管理,而事务2是非Seata管理,同时这两个事务都在修改同一条数据,那么就还会造成脏写问题

为了防止这个问题,Seata在保存快照时实际上会记录2份快照,一份是修改之前的快照,一份是修改之后的快照

  1. 在恢复快照数据时,会将更新后的快照值和当前数据库的实际值进行比对(类似CAS过程)

    如果数值不匹配则说明在此期间有另外的事务修改了数据,此时直接释放全局锁,事务1记录异常,发送告警信息让人工介入

    如果一致则恢复数据,释放全局锁即可

AT模式的优点:

  1. 一阶段完成直接提交事务,释放数据库资源,性能比较好
  2. 利用全局锁实现读写隔离
  3. 没有代码侵入,框架自动完成回滚和提交

AT模式的缺点:

  1. 两阶段之间属于软状态,属于最终一致
  2. 框架的快照功能会影响性能,但比XA模式要好很多

Java实现AT模式

AT模式中的快照生成、回滚等动作都是由框架自动完成,没有任何代码侵入

只不过,AT模式需要一个表来记录全局锁、另一张表来记录数据快照undo_log。其中:

  • lock_table表:需要放在“TC服务关联”的数据库中。例如表结构如下:
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table`  (
  `row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `branch_id` bigint(20) NOT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime NULL DEFAULT NULL,
  `gmt_modified` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`row_key`) USING BTREE,
  INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
  • undo_log表 :需要放在“微服务关联”的数据库中。例如表结构如下:
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;

然后修改注册到TC中的微服务的YAML配置,最后重启服务,模式就变为AT模式了

seata:
  data-source-proxy-mode: AT # 默认就是AT

Seata之TCC模式 – 最终一致性

应用场景: 高并发互联网应用,允许数据出现短时不一致,可通过对账程序或补录来保证最终一致性

TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:

  1. Try:资源的检测和预留
  2. Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功
  3. Cancel:预留资源释放,可以理解为try的反向操作。

举例说明三个方法:一个扣减用户余额的业务。假设账户A原来余额是100,需要余额扣减30元

TCC模式的架构

TCC模式的每个阶段是做什么的?

  1. Try:资源检查和预留
  2. Confirm:业务执行和提交
  3. Cancel:预留资源的释放

TCC的优点是什么?

  1. 一阶段完成直接提交事务,释放数据库资源,性能好
  2. 相比AT模型,无需生成快照,无需使用全局锁,性能最强
  3. 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库(如:Redis)

TCC的缺点是什么?

  1. 有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦
  2. 软状态,事务是最终一致
  3. 需要考虑Confirm和Cancel的失败情况,做好幂等处理

空回滚和业务悬挂

空补偿 / 空回滚: 未执行try(原服务)就执行了cancel(补偿服务)。即当某分支事务的try阶段阻塞时,可能导致全局事务超时而触发二阶段的cancel操作。在未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是“空回滚”

因此:执行cancel操作时,应当判断try是否已经执行,如果尚未执行,则应该空回滚

业务悬挂: 已经空回滚的业务,之前阻塞的try恢复了,然后继续执行try,之后就永不可能执行confirm或cancel,从而变成“业务悬挂”

因此:执行try操作时,应当判断cancel是否已经执行过了,如果已经执行,应当阻止空回滚后的try操作,避免悬挂

Java实现TCC模式示例

Try业务:

  • 根据xid查询account_freeze ,如果已经存在则证明Cancel已经执行,拒绝执行try业务
  • 记录冻结金额和事务状态到account_freeze表
  • 扣减account表可用金额

Confirm业务

  • 需判断此方法的幂等性问题
  • 根据xid删除account_freeze表的冻结记录

Cancel业务

  • 需判断此方法的幂等性问题
  • 根据xid查询account_freeze,如果为null则说明try还没做,需要空回滚
  • 修改account_freeze表,冻结金额为0,state为2
  • 修改account表,恢复可用金额
  1. 在业务管理的库中建表:是为了实现空回滚、防止业务悬挂,以及幂等性要求。所以在数据库记录冻结金额的同时,记录当前事务id和执行状态
CREATE TABLE `account_freeze_tbl` (
  `xid` varchar(128) NOT NULL COMMENT '全局事务id',
  `user_id` varchar(255) DEFAULT NULL COMMENT '用户id',
  `freeze_money` int(11) unsigned DEFAULT '0' COMMENT '冻结金额',
  `state` int(1) DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
  PRIMARY KEY (`xid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT
  1. 业务接口定义try+confirm+cancel三个方法
package com.zixieqing.account.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import org.springframework.stereotype.Service;

/**
 * Seata之TCC模式实现业务的account接口
 *
 * "@LocalTCC"    SpringCloud + Feign,Feign的调用基于http
 *                此注解所在的接口需要实现TCC的两阶段提交对应方法才行
 *
 * <p>@author       : ZiXieqing</p>
 */

@Service
@LocalTCC
public interface AccountTccService {
    /**
     * 扣款
     *
     * Try逻辑	资源检查和预留,同时需要判断Cancel是否已经执行,是则拒绝执行本次业务
     *
     * "@TwoPhaseBusinessAction" 中
     * 								name属性				 要与当前方法名一致,用于指定Try逻辑对应的方法
     * 								commitMethod属性值		就是confirm逻辑的方法
     * 								rollbackMethod属性值	就是cancel逻辑的方法
     *
     * "@BusinessActionContextParameter" 将指定的参数传递给confirm和cancel
     *
     * @param userId 用户id
     * @param money 要扣的钱
     */
    @TwoPhaseBusinessAction(
            name = "deduct",
            commitMethod = "confirm",
            rollbackMethod = "cancel"
    )
    void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
                @BusinessActionContextParameter(paramName = "money") int money);

    /**
     * 二阶段confirm确认方法	业务执行和提交		另外需考虑幂等性问题
     * 						 方法名可以另命名,但需保证与commitMethod一致
     *
     * @param context 上下文,可以传递try方法的参数
     * @return boolean 执行是否成功
     */
    boolean confirm(BusinessActionContext context);

    /**
     * 二阶段回滚方法	预留资源释放	另外需考虑幂等性问题	需要判断try是否已经执行,否就需要空回滚
     * 				 方法名须保证与rollbackMethod一致
     *
     * @param context 上下文,可以传递try方法的参数
     * @return boolean 执行是否成功
     */
    boolean cancel(BusinessActionContext context);
}
  1. 实现类逻辑编写
package com.zixieqing.account.service.impl;

import com.zixieqing.account.entity.AccountFreeze;
import com.zixieqing.account.mapper.AccountFreezeMapper;
import com.zixieqing.account.mapper.AccountMapper;
import com.zixieqing.account.service.AccountTccService;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * 扣款业务
 *
 * <p>@author       : ZiXieqing</p>
 */


public class AccountTccServiceImpl implements AccountTccService {
    @Autowired
    private AccountMapper accountMapper;
    @Autowired
    private AccountFreezeMapper accountFreezeMapper;

    /**
     * 扣款
     *
     * Try逻辑	资源检查和预留,同时需要判断Cancel是否已经执行,是则拒绝执行本次业务
     *
     * "@TwoPhaseBusinessAction" 中
     * 		name属性要				与当前方法名一致,用于指定Try逻辑对应的方法
     * 		commitMethod属性值		就是confirm逻辑的方法
     * 		rollbackMethod属性值	就是cancel逻辑的方法
     *
     * "@BusinessActionContextParameter" 将指定的参数传递给confirm和cancel
     *
     * @param userId 用户id
     * @param money  要扣的钱
     */
    @Override
    public void deduct(String userId, int money) {
        // 获取事务ID,RootContext 是seata中的
        String xid = RootContext.getXID();
        AccountFreeze accountFreeze = accountFreezeMapper.selectById(xid);
        // 业务悬挂处理:判断cancel是否已经执行,若执行过则free表中肯定有数据
        if (accountFreeze != null) {
            // CANCEL执行过,拒绝业务
            return:
        }
        // 进行扣款
        accountMapper.deduct(userId, money);
        // 记录本次状态
        AccountFreeze freeze = new AccountFreeze();
        freeze.setXid(xid)
            .setUserId(userId)
            .setFreezeMoney(money)
            .setState(AccountFreeze.State.TRY);
            
        accountFreezeMapper.insert(freeze);
    }

    /**
     * 二阶段confirm确认方法	业务执行和提交		另外需考虑幂等性问题
     * 方法名可以另命名,但需保证与commitMethod一致
     *
     * @param context 上下文,可以传递try方法的参数
     * @return boolean 执行是否成功
     */
    @Override
    public boolean confirm(BusinessActionContext context) {
        // 删掉freeze表中的记录即可  delete方法本身就具有幂等性
        return accountFreezeMapper.deleteById(context.getXid()) == 1;
    }

    /**
     * 二阶段回滚方法	预留资源释放	另外需考虑幂等性问题	需要判断try是否已经执行,否 就需要空回滚
     * 方法名须保证与rollbackMethod一致
     *
     * @param context 上下文,可以传递try方法的参数
     * @return boolean 执行是否成功
     */
    @Override
    public boolean cancel(BusinessActionContext context) {
        // 空回滚处理:判断try是否已经执行
        AccountFreeze freeze = accountFreezeMapper.selectById(context.getXid());
        // 若为null,则try肯定没执行
        if (freeze == null) {
            // 需要进行空回滚
            freeze = new AccountFreeze();
            freeze.setXid(context.getXid())
                    // getActionContext("userId") 的key就是@BusinessActionContextParameter(paramName = "userId")的pramName值
                    .setUserId(context.getActionContext("userId").toString())
                    .setFreezeMoney(0)
                    .setState(AccountFreeze.State.CANCEL);
            
            return accountFreezeMapper.insert(freeze) == 1;
        }

        // 幂等性处理
        if (freeze.getState() == AccountFreeze.State.CANCEL) {
            // 说明已经执行过一次cancel了,直接拒绝执行本次业务
            return true;
        }

        // 不为null,则回滚数据
        accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney());
        // 将冻结金额归0,并修改本次状态
        freeze.setFreezeMoney(0)
                .setState(AccountFreeze.State.CANCEL);
        
        return accountFreezeMapper.updateById(freeze) == 1;
    }
}

最后正常使用service调用使用3中的实现类即可

Seata之Saga模式 – 最终一致性

Saga 模式是 Seata 的长事务解决方案,由蚂蚁金服主要贡献

其理论基础是Hector & Kenneth 在1987年发表的论文Sagas

Seata官网对于Saga的指南:https://seata.io/zh-cn/docs/user/saga.html

适用场景:

  1. 业务流程长、业务流程多且需要保证事务最终一致性的业务系统
  2. 银行业金融机构
  3. 需要与第三方交互,如:调用支付宝支付接口->出库失败->调用支付宝退款接口

优点:

  1. 事务参与者可以基于事件驱动实现异步调用,吞吐高
  2. 一阶段直接提交事务,无锁,性能好
  3. 不用编写TCC中的三个阶段,实现简单

缺点:

  1. 软状态持续时间不确定,时效性差
  2. 由于一阶段已经提交本地数据库事务,且没有进行“预留”动作,所以不能保证隔离性,同时也没有锁,所以会有脏写

Saga模式是SEATA提供的长事务解决方案。也分为两个阶段:

  1. 一阶段:直接提交本地事务
  2. 二阶段:成功则什么都不做;失败则通过编写补偿业务来回滚

Saga 是一种补偿协议,Saga 正向服务与补偿服务也需要业务开发者实现。在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交;如果任何一个正向操作执行失败,那么分布式事务会退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态

Seata四种模式对比

XAATTCCSAGA
一致性强一致弱一致弱一致最终一致
隔离性完全隔离基于全局锁隔离基于资源预留隔离无隔离
代码侵入有,要编写三个接口有,要编写状态机和补偿业务
性能非常好非常好
场景对一致性、隔离性有高要求的业务基于关系型数据库的大多数分布式事务场景都可以对性能要求较高的事务。有非关系型数据库要参与的事务业务流程长、业务流程多参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口

对应md文档

  • 链接:Spring Cloud 整合
温馨提示:本文发布时间2023-11-26 22:24:00 更新于2023-11-26 22:24:00,某些文章具有时效性,若有错误或已失效,请在下方留言或联系站长
© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发