Feign源码分析(1)-重试机制没那么简单

/ 默认分类 / 0 条评论 / 1797浏览

Feign内部实现原理-源码分析(一)

一.引入

下面是feign源码中默认的http客户端配置信息,这些信息在前面的介绍feign中已经说过了,并且官方文档中也有很详细的基础知识的介绍,这里不再说明

  public static class Builder {

    private final List<RequestInterceptor> requestInterceptors =
        new ArrayList<RequestInterceptor>();
    private Logger.Level logLevel = Logger.Level.NONE;
    private Contract contract = new Contract.Default();
    private Client client = new Client.Default(null, null);
    private Retryer retryer = new Retryer.Default();
    private Logger logger = new NoOpLogger();
    private Encoder encoder = new Encoder.Default();
    private Decoder decoder = new Decoder.Default();
    private ErrorDecoder errorDecoder = new ErrorDecoder.Default();
    private Options options = new Options();
    private InvocationHandlerFactory invocationHandlerFactory =
        new InvocationHandlerFactory.Default();
    private boolean decode404;

二.源码分析

1. 关于RetryableException

首先需要明白,在feign中,封装了一个主要的异常类

public class FeignException extends RuntimeException {

另外还有一个比较重要的异常子类就是

 public class RetryableException extends FeignException {

  private static final long serialVersionUID = 1L;
//这个参数非常重要
  private final Long retryAfter;

  public RetryableException(String message, Throwable cause, Date retryAfter) {
    super(message, cause);
    this.retryAfter = retryAfter != null ? retryAfter.getTime() : null;
  }

  public RetryableException(String message, Date retryAfter) {
    super(message);
    this.retryAfter = retryAfter != null ? retryAfter.getTime() : null;
  }

  public Date retryAfter() {
    return retryAfter != null ? new Date(retryAfter) : null;
  }
}

可以看到,这里有一个参数retryAfter,说到这个需要了解一下在http协议中有一个响应状态叫做429 ,这个状态码表示对于当前http服务,用户在规定时间段内调用的频次超出了限制,也就是收服务端接口可能做了限流操作,并且接口遵循http协议规范,返回了429,另外在http协议中针对超出请求次数限制的响应也做了说明,可以在响应头中添加Retry-After字段来告诉客户端需要等待多久再来请求我

ps:这里需要和大家说一下,http协议是一个规范,当然我说的是废话,协议肯定是一种规范,所以这里告诉客户端的需要等待的时间可以不叫做Retry-After,完全可以按照你系统中或者你开发的http服务框架来自定义,只不过,如果你想要你的系统符合规范或者你开发封装的http客户端可以被更多的用户使用,那么最好遵循规范,这就类似于你所有的amqp中间件,所有的jdbc驱动是一样的道理

所以现在介绍了429这个响应,前面说的RetryableException这个异常中的属性retryAfter其实就是http响应告诉客户端需要等待多久再来调用(如果响应有提供这个字段的话)

ps:你可能会好奇,不是说分析源码吗?为啥一上来直接介绍两个异常,后面你就知道了,这个异常类型在feign进行http调用的过程中扮演非常重要的角色

2.ErrorDecoder-异常http响应处理器

Feign客户端还可以配置一个错误异常处理器ErrorDecorder,该处理器可以处理所有非2xx的http响应(排除404)

public interface ErrorDecoder {
	//规范就是下面的decode方法,处理响应,并返回指定异常
  public Exception decode(String methodKey, Response response);

    
    //feign默认提供了下面这个错误异常处理器
  public static class Default implements ErrorDecoder {
	//RetryAfterDecoder就是用来解析http响应所带的Retry-After的
    private final RetryAfterDecoder retryAfterDecoder = new RetryAfterDecoder();

    @Override
    public Exception decode(String methodKey, Response response) {
        //按照响应的非2xx的http响应的响应码和信息构建一个FerignException
      FeignException exception = errorStatus(methodKey, response);
//调用apply方法解析Retry-After响应头字段,下面有源码,也很简单,返回可以调用的限制截至时间,所以这里返回的是Date,apply方法中做了封装处理,无论http响应的Retry-After是时间格式还是毫秒格式都统一处理为时间(firstOrNull方法返回的是当前http响应的Retry-After字段值,如果没有则返回null,代码我贴在后面)
      Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
        //如果响应中包含了Retry-After,因为Feign是一个http客户端封装,所以当然需要遵循http协议标准,这里判断如果响应中包含了Retry-After,那么就说明当前就是因为触发了当前调用的服务端接口的限流策略(当然如果对方没有遵循http协议,限流了但是没有返回429+Retry-After或者使用了429但是没有加Retry-After,只要没有加Retry-After就不行),那么这里接受到异常的http响应(如果使用默认的Feign的异常错误处理器),那么都不会抛出RetryableException,因为下面这个if一定为false,retryAfter==null,那么就指挥抛出FeignException
      if (retryAfter != null) {
        return new RetryableException(exception.getMessage(), exception, retryAfter);
      }
      return exception;
    }

    private <T> T firstOrNull(Map<String, Collection<T>> map, String key) {
      if (map.containsKey(key) && !map.get(key).isEmpty()) {
        return map.get(key).iterator().next();
      }
      return null;
    }
  }

  static class RetryAfterDecoder {

    static final DateFormat
        RFC822_FORMAT =
        new SimpleDateFormat("EEE, dd MMM yyyy HH🇲🇲ss 'GMT'", US);
    private final DateFormat rfc822Format;

    RetryAfterDecoder() {
      this(RFC822_FORMAT);
    }

    RetryAfterDecoder(DateFormat rfc822Format) {
      this.rfc822Format = checkNotNull(rfc822Format, "rfc822Format");
    }

    protected long currentTimeMillis() {
      return System.currentTimeMillis();
    }

    public Date apply(String retryAfter) {
      if (retryAfter == null) {
        return null;
      }
      if (retryAfter.matches("^[0-9]+$")) {
        long deltaMillis = SECONDS.toMillis(Long.parseLong(retryAfter));
        return new Date(currentTimeMillis() + deltaMillis);
      }
      synchronized (rfc822Format) {
        try {
          return rfc822Format.parse(retryAfter);
        } catch (ParseException ignored) {
          return null;
        }
      }
    }
  }
}

上面说了,只有带了Retry-After字段的非2xx的异常http响应才会被处理返回RetryableException,那么这有什么目的呢?下面来看下Feign底层的调用机制和重试是怎样实现的你就明白了

feign.SynchronousMethodHandler.invoke()

@Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
          //处理当前请求(RequestTemplate是feign中封装的http客户端发送请求的对象)
        return executeAndDecode(template);
      } catch (RetryableException e) {
          //可以看到,这里捕获了RetryableException,
          //然后执行默认的重试策略的continueOrPropagate方法,而整个在while死循环中,
//所以这里就是重试过程需要等待的逻辑策略,后面我解析了默认的Retryer
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
      response = client.execute(request, options);
      // ensure the request is set. TODO: remove in Feign 10
      response.toBuilder().request(request).build();
    } catch (IOException e) {
        //1.如果客户端发送请求过程中发生网络io异常
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
      }
        //这里方法封装的抛出的异常是RetryableException,但是使用的构造方法public RetryableException(String message, Throwable cause, Date retryAfter)中,retryAfter是null,但是这里如果发送请求的时候就出现网络io异常,那么是可以被捕获到的
      throw errorExecuting(request, e);
    }
    //2.接下来就是处理获取到的响应
    long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

    boolean shouldClose = true;
    try {
      if (logLevel != Logger.Level.NONE) {
        response =
            logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
        // ensure the request is set. TODO: remove in Feign 10
        response.toBuilder().request(request).build();
      }
      if (Response.class == metadata.returnType()) {
        if (response.body() == null) {
          return response;
        }
        if (response.body().length() == null ||
                response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
          shouldClose = false;
          return response;
        }
        // Ensure the response body is disconnected
        byte[] bodyData = Util.toByteArray(response.body().asInputStream());
        return response.toBuilder().body(bodyData).build();
      }
       //这里就是正常的2xxhttp响应处理,直接使用消息数据解码器
      if (response.status() >= 200 && response.status() < 300) {
        if (void.class == metadata.returnType()) {
          return null;
        } else {
            //decode方法我在后面贴出来了源码
          return decode(response);
        }
      } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
        return decode(response);
      } else {
//可以看到这里也印证了前面说的,ErrorDecoder异常错误解析器就是处理非2xx的http响应的,但是404除外,所以这里抛出的异常是错误异常处理器的deCode方法返回的,所以这就来到了前面介绍的默认的错误异常处理器,前面介绍了只有服务端响应头包含RetryAfter字段才会返回RetryableException
        throw errorDecoder.decode(metadata.configKey(), response);
      }
    } catch (IOException e) {
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
      }
		//如果是在解析响应Response的过程中捕获到了异常,下面的方法封装的是抛出FeignException
      throw errorReading(request, response, e);
    } finally {
      if (shouldClose) {
        ensureClosed(response.body());
      }
    }
  }
  Object decode(Response response) throws Throwable {
    try {
        //就是调用当前构建的Feign客户赋值的响应数据解码器(比如json解码)
      return decoder.decode(response, metadata.returnType());
    } catch (FeignException e) {
      throw e;
    } catch (RuntimeException e) {
      throw new DecodeException(e.getMessage(), e);
    }
  }
  static FeignException errorReading(Request request, Response ignored, IOException cause) {
    return new FeignException(
        format("%s reading %s %s", cause.getMessage(), request.method(), request.url()),
        cause);
  }
3. Retryer请求重试机制
  public static class Default implements Retryer {

    private final int maxAttempts;//最大重试次数
    private final long period;//重试间隔时间单元
    private final long maxPeriod;//最大重试间隔时间
    int attempt;//当前重试次数
    long sleptForMillis;//重试导致的总共的间隔时间
	//可以看到,默认就是重试次数为5
    public Default() {
      this(100, SECONDS.toMillis(1), 5);
    }
    public Default(long period, long maxPeriod, int maxAttempts) {
        this.period = period;
        this.maxPeriod = maxPeriod;
        this.maxAttempts = maxAttempts;
        this.attempt = 1;
    }
    //这个方法就是控制重试的主要逻辑
    public void continueOrPropagate(RetryableException e) {
      //如果当前重试次数已经达到最大重试次数,那么直接抛出异常,这里注意,可以看到抛出的是RetryableException,后面会说明
      if (attempt++ >= maxAttempts) {
        throw e;
      }

      long interval;
        //前面说了,只有异常的http响应的header带有Retry-After,默认的异常响应处理器才会抛出
        //RetryableException,所以才会进入等待重试,下面就是判断这个Retry-After头是否有
        //值存在,如果有那么进行一系列的判断,逻辑比较简单,就是和最大等待周期比较等等,interval就是本次需要重试等待的时间,sleptForMillis不需要管,代表本次请求总共的重试等待时间
      if (e.retryAfter() != null) {
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
          interval = maxPeriod;
        }
        if (interval < 0) {
          return;
        }
      } else {
        interval = nextMaxInterval();
      }
      try {
        Thread.sleep(interval);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      sleptForMillis += interval;
    }

三.问题解释

  1. 上面从源码的层面分析了feign中的http请求过程和重试机制的实现,这里需要注意的是,这里说的Retry,从源码中可以看出来,这里的重试是指http请求重试,不要和spring-retry这个项目混淆了,spring-retry是一个规范的标准的代码执行重试框架,后面我会抽时间单独总结一下
  2. 介绍了feign的重试原理后,对于下面这个情况,你觉得是会发生http重试吗?会在什么时候发生?
@Slf4j
public class CardClient {

    private String appKey;

    private CardApi CardApi;

    public CardClient(String url, String appKey) {
        this.appKey = appKey;

        if (StringUtil.isEmpty(url)) {
            throw new RuntimeException("Card URL不能为空");
        }

        Decoder decoder = new JacksonDecoder();

        CardApi = Feign.builder()
                .encoder(new JacksonEncoder())
                .decoder(decoder)
                .errorDecoder(new CardErrorDecoder(decoder))
                .logger(new Slf4jLogger(PostPayLoanClient.class))
                .options(new Request.Options(10*1000,30*1000))
                .logLevel(Logger.Level.FULL)
                .target(CardApi.class, url);
    }

    public InitializeRsp initializeLock(InitializeReq req) {
        Map<String, Object> params= JSONUtil.parseObj(req,true);
        Resp<InitializeRsp> resp = CardApi.initializeLock(headerMap(params), req);
        return Resp.getData(resp);
    }
}


public interface CardApi {

 
    @RequestLine(value = "POST /api/partner/lock/v1/initializeLock")
    Resp<InitializeRsp> initializeLock(@HeaderMap Map<String,String> headerMap, InitializeReq req);
}


@Configuration
public class CardConfig {

    @Resource
    private CardProperties CardProperties;

    @Bean
    public CardClient CardClient(){
        if(StringUtils.isEmpty(CardProperties.getUrl()) || StringUtils.isEmpty(CardProperties.getApiKey())){
            log.error("Card配置异常");
            return null;
        }
        return new CardClient(CardProperties.getUrl(),CardProperties.getApiKey());
    }

}


public class CardErrorDecoder implements ErrorDecoder {

    final Decoder decoder;
    final ErrorDecoder defaultDecoder = new Default();

    CardErrorDecoder(Decoder decoder) {
        this.decoder = decoder;
    }

    @Override
    public Exception decode(String methodKey, Response response) {
        try {
            CardException error = new CardException(response.status(), response.reason(),null);
            Map<String, Collection<String>> headers = response.request().headers();

            log.error("请求异常,code: {},reason: {}", response.status(), response.reason());

            return error;
        } catch (Exception e) {
            return defaultDecoder.decode(methodKey, response);
        }
    }

}

假设就是上面的代码,那么你觉的重试会是什么样子的?

也就是在响应的时候需要指定http响应码和Retry-After头信息 ,就像下面这样

    @GetMapping("/user/get")
    public String getUser(String param, HttpServletResponse response){
        System.out.println("次数"+111111);
        response.setStatus(HttpStatus.BAD_GATEWAY.value());
        response.setHeader("Retry-After","50");
        return "user->" + param;
    }