Spring Retry框架——看这篇就够了 - MarvelCode - OSCHINA - 中文开源技术交流社区

标签: | 发表时间:2020-09-14 23:49 | 作者:
出处:https://my.oschina.net

简介

    软件架构从当初的单机,演变到后来的集群,再到后来的分布式应用。原本看似可以信任的服务调用,加上了网络因素就变得不再可靠。再考虑到一些调用链路的特殊性,又要保证性能,又要尽可能增加成功率,所以调用方必须肩负起重试的责任。

 

自己写,怎样实现?

    重试并不复杂,首先来分析下重试的调用场景,可以想到业务当中不止一处会需要重试能力,并且业务其实更关乎自己的代码块被重试就可以了,而不在乎如何实现的重试。

    所以变化的是一段可以重复执行的代码块,以及重试次数等。

① 代码块可以用Java8支持的函数式编程解决

② 次数可以用入参/配置实现

    首先定义一个执行的模版,结合上面我们的分析,这个模版需要一个待实行的 方法块,以及 配置。(次数等区分场景,用枚举定义,方便全局管控):

      @AllArgsConstructor
@Getter
public enum RetrySceneEnums {

    QUERY_USER_INFO("查询用户信息", 2),
    ;

    private String desc;

    private int retryTimes;
}
      abstract class MyRetryTemplate<Req, Resp> {
    /** 配置 */
    private IntegrationRetryEnums retryEnum;
    /** 方法块 */
    private Function<Req, Resp> fuction;

    MyRetryTemplate(IntegrationRetryEnums retryEnum, Function<Req, Resp> fuction) {
        this.retryEnum = retryEnum;
        this.fuction = fuction;
    }

    /**
     * 构建函数
     */
    private Supplier<Function<Req, Resp>> retry = () -> (param) -> {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        return execute(param, 1, stopWatch);
    };

    private Resp execute(Req param, int index, StopWatch stopWatch) {
        if (index > retryEnum.getRetryTimes()) {
            stopWatch.split();
            // 最大的重试次数后仍失败,可以打印摘要日志
            return null;
        }
        try {
            // 执行方法块
            Resp result = fuction.apply(param);
            stopWatch.split();
            // 认为成功
            return result;
        } catch (Throwable e) {
            // 递归重试
            return execute(param, ++index, stopWatch);
        }
    }

    Function<Req, Resp> getRetryService() {
        return retry.get();
    }
}

    逻辑很简单,利用 递归思想(当然也可以用循环),默认调用方法抛出异常才会重试,成功则返回,否则递归直到最大次数,使用 StopWatch 统计总耗时,业务可以打印摘要日志并配置监控等,代码简短,但基本实现了功能。

    对 抛出异常才会重试稍微解释下,为什么不支持对业务成功/失败进行重试。首先重试框架感知不到具体的业务响应结构,进而也无法判断业务的成功与失败,当然重试框架也可以自定义一个响应体并强制使用,但这便于业务代码形成耦合。再退一步讲,也没有必要对业务失败进行重试,因为框架毕竟是框架,只需要做通用的事情,业务的失败到底需不需要重试,需要看具体的场景。

    模板完工了,看下具体怎样改造一个已有的方法,使其支持重试功能,假设当前有一个查询用户信息的方法 UserService#queryById(String id)。需要在 Spring 启动后将其包装以支持重试,通过事件机制监听 ContextRefreshedEvent 事件。

      @Component
public class RetryServiceFactory implements ApplicationListener<ContextRefreshedEvent> {

    private static AtomicBoolean INIT = new AtomicBoolean(false);

    @Autowired
    private UserService userService;

    private Function<String, WorkOrderDTO> uerQueryFunction;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (INIT.compareAndSet(false, true)) {
            this.uerQueryFunction = new MyRetryTemplate<String, WorkOrderDTO>(QUERY_USER_INFO, userService::queryById) {
            }.getRetryService();
        }
    }

    public Function<String, WorkOrderDTO> getUserQueryService() {
        return uerQueryFunction;
    }
}

    需要使用此能力的地方,这么调用即可:

      @Autowired
private RetryServiceFactory retryServiceFactory;

public void businessLogic(String userId){

    // 调用工厂获取自带重试功能的服务,并执行
    UserInfo userInfo = retryServiceFactory.getUserQueryService().apply(userId);
}

    到此,自己手写的重试工具已经完成,如果执行时遇到网络抖动,超时等,都会进行最大2次的重试(次数由 RetrySceneEnums 设置)。

 

    这时候再回头审视下代码,可以发现诸多局限性:

  • 比如 Function 的使用,就限定了入参只能是一个,且必须有返回(即不支持 void);
  • 每次接入一个服务,就要在 RetryServiceFactory 扩充;
  • 考虑网络抖动可能有区间性,盲目连续请求可能全部失败,如要实现等待一段实现再重试,怎么办?
  • 假使底层服务真的挂了,但上层业务系统依旧重试,重试每次都以超时告终(占用线程资源),流量上来以后,必然会牵连业务系统也不可用。自然想到熔断,那重试如何加入断路器(Circuit Breaker)模式?

    遇到共性问题,首先要想到有没有现成的框架可以使用,而不是造轮子。目前我已知的重试框架,有 spring-retry以及  guava-retrying,接下来将对 spring-retry的使用和原理进行讲解。

 

Spring Retry 使用

    同spring事务框架一样,retry框架同样支持编程式和声明式两种。仅需要一个maven依赖。如果使用声明式,需要额外引入 aspectj 。

      <dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.3.0</version>
</dependency>

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.9.2</version>
    <scope>runtime</scope>
</dependency>

编程式

    先来介绍下 编程式使用到的几个主要组件:

  • 重试模板类:org.springframework.retry.support.RetryTemplate
  • 重试策略类:org.springframework.retry.RetryPolicy
  • 重试回退策略(两次重试间等待策略):org.springframework.retry.backoff.BackOffPolicy
  • 重试上下文:org.springframework.retry.RetryContext
  • 监听器:org.springframework.retry.RetryListener

   接下来来看下如何使用,然后穿插讲下不同策略的区别。

    首先是开启重试,并声明一个模板bean。因为只是模板,所以可以借助 Spring IOC 声明为单例。

      @Configuration
@EnableRetry(proxyTargetClass = true)
public class RetryConfig {

    @Bean
    public RetryTemplate simpleRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // 最大重试次数策略
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        // 每隔1s后再重试
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        return retryTemplate;
    }

}

    例子代码使用了 SimpleRetryPolicy 和 FixedBackOffPolicy,当然还有其他重试策略和回退策略。枚举如下:

重试策略
策略类 效果 关键参数
MaxAttemptsRetryPolicy 设置最大的重试次数,超过之后执行recover maxAttempts:最大重试次数
BinaryExceptionClassifierRetryPolicy 可以指定哪些异常需要重试,哪些异常不需要重试 exceptionClassifier:BinaryExceptionClassifier,异常识别类,其实就是存在一个Map映射,Map<Class<? extends Throwable>, Boolean>,value为true的话就说明需要重试。
SimpleRetryPolicy 上面两者的结合,支持次数和自定义异常重试 maxAttempts 和 exceptionClassifier
TimeoutRetryPolicy 在指定的一段时间内重试 timeout:单位ms
ExceptionClassifierRetryPolicy

支持异常和重试策略的映射,比如异常A对应重试策略A,异常B对应重试策略B

exceptionClassifier:本质就是异常和重试策略的映射
CompositeRetryPolicy 策略类的组合类,支持两种模式,乐观模式:只要一个重试策略满足即执行,悲观模式:另一种是所有策略模式满足再执行 policies:策略数组
optimistic:true-乐观;false-悲观
CircuitBreakerRetryPolicy 熔断器模式 delegate:策略代理类
openTimeout:[0,openTimeout]会一直执行代理类策略,(openTimeout, resetTimeout] 会半打开,如果代理类判断不可以重试,就会熔断,执行recover逻辑,如果代理类判断还可以重试且重试成功,开关会闭合。
resetTimeout:超过指定时间,开关重置闭合
ExpressionRetryPolicy 符合表达式就重试 expression:表达式,见 org.springframework.expression.Expression实现
AlwaysRetryPolicy 一直重试  
NeverRetryPolicy 从不重试  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

回退策略
策略类 效果 关键参数
FixedBackOffPolicy 间隔固定时间重试 sleeper:支持线程sleep和Object#wait,区别在于是否释放锁
backOffPeriod:等待间隔
NoBackOffPolicy 无等待直接重试  
UniformRandomBackOffPolicy 在一个设置的时间区间内。随机等待后重试 minBackOffPeriod:区间下限
maxBackOffPeriod:区间上限
sleeper:同上
ExponentialBackOffPolicy 在一个设置的时间区间内,等待时长为上一次时长的递增 initialInterval:默认起始等待时间
maxInterval:最大等待时间
multiplier:递增倍数
sleeper:同上
ExponentialRandomBackOffPolicy 在 ExponentialBackOffPolicy 基础上,乘数随机  

    至此,完成了重试模板的定义。接下来是需要在具体的业务场景下调用模板方法即可。同样的,为了保证复用性,仍然借助函数式编程定义:

      @Service
public class RetryTemplateService {

    @Autowired
    private RetryTemplate simpleRetryTemplate;

    public <R, T> R retry(Function<T, R> method, T param) throws Throwable {
        return simpleRetryTemplate.execute(new RetryCallback<R, Throwable>() {
            @Override
            public R doWithRetry(RetryContext context) throws Throwable {
                return method.apply(param);
            }
        }, new RecoveryCallback<R>() {
            @Override
            public R recover(RetryContext context) throws Exception {
                return null;
            }
        });
    }
}
      @Service
public class UserService {

    @Autowired
    private RetryTemplateService retryTemplateService;

    public User queryById(String id) {
        // TODO 业务逻辑
    }

    public User queryByIdWithRetry(String id) throws Throwable {
        return retryTemplateService.retry(this::queryById, id);
    }
}

    通过调用  org.springframework.retry.support.RetryTemplate#execute,指定需要支持重试的业务代码回调  org.springframework.retry.RetryCallback,以及全部重试失败的兜底回调  org.springframework.retry.RecoveryCallback

     RetryTemplate 支持四种重载的 execute方法,这里不全部展开分析,大体十分相似,变化的是策略(见上面表格),以及 RetryState(有无状态,下面分析)。

声明式

    通过上面的编码式,还需要针对不同场景编写一到多套模板方法,那有没有什么可以简化这一步呢,毕竟业务没必要关注这些具体的模板代码。就像事务一样,只需要方法上加  @Transactional就可以了。当然,spring-retry 也支持,直接上代码:

      @Service
public class UserService {

    @Retryable(include = RetryException.class, exclude = IllegalArgumentException.class, 
      listeners = {"defaultRetryListener"}, backoff = @Backoff(delay = 1000, maxDelay = 2000))
    public User queryById(String id) {
        // TODO 业务逻辑
    }

    @Recover
    public User recover(RetryException e, String id) {
        User user = new User();
        user.setName("兜底");
        return user;
    }
}
      @Service
public class DefaultRetryListener implements RetryListener {

    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        System.out.println("==前置回调==");
        return true;
    }

    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("==后置回调==");
    }

    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("==执行报错==");
    }
}

    这样,直接调用 UserService#queryById就支持重试了,比编程式节省了不少工作量。

    我这里一股脑设置了很多注解参数,其实都对应上面编程式的策略,毕竟编程式有的,我声明式也得要。使用时按需即可,接下来对照着 @Retryable中定义的所有属性,解释下含义:

  • recover:指定兜底/补偿的方法名。如果不指定,默认对照 @Recover标识的,第一入参为重试异常,其余入参和出参一致的方法;
  • interceptor:指定方法切面 bean, org.aopalliance.intercept.MethodInterceptor实现类
  • value / include:两者用途一致,指出哪些类型需要重试;
  • exclude:和 include 相反,指出哪些异常不需要重试;
  • label:可以指定唯一标签,用于统计;
  • stateful:默认false。重试是否是有状态的;
  • maxAttempts:最大的重试次数;
  • backoff:指定 @Backoff,回退策略;
  • listeners:指定  org.springframework.retry.RetryListener实现 bean。

    对照着上面编程式,可以看出回退策略用注解 @Backoff支持了。这里还补充了上面编程式没有用到的  RetryListener,它定义了3个方法:

<T,EextendsThrowable>booleanopen(RetryContextcontext,RetryCallback<T,E> callback);
<T,EextendsThrowable>voidclose(RetryContextcontext,RetryCallback<T,E> callback,Throwablethrowable);
<T,EextendsThrowable>voidonError(RetryContextcontext,RetryCallback<T,E> callback,Throwablethrowable);

    open 和 close 分别在重试整体的前置和后置被回调一次,onError 则是重试整体过程中,每次异常都会被回调。(具体细节见下面  源码分析

     @Backoff 的属性如下,基本都对应上面编程式的几种策略,仅简单解释下

  • value / delay:两者都标识延迟时间,为 0则对应 NoBackOffPolicy 策略。
  • maxDelay:最大延迟时间
  • multiplier:递增乘数
  • random:递增乘数是否随机

    重试模式中的断路器,由于其场景的特殊性以及属性的复杂性,则被单独定义成了一个注解 @CircuitBreaker,从注释的定义可以看出,它是在 @Retryable基础之上的扩展。

      @Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Retryable(stateful = true)
public @interface CircuitBreaker {
   ....
}

    其中属性的定义,也是在 @Retryable的属性基础上,额外扩充了断路器特有的几个属性,对应上面重试策略表格中—— CircuitBreakerRetryPolicy的关键参数:

  • resetTimeout:重置闭合超时时间
  • openTImeout:半打开状态超时时间

 

源码解析

    那具体 spring-retry 的原理是怎样的呢?是递归还是循环,注解又是怎样实现的?带着这些问题开始源码分析,顺带着可以看下上面没有讲到的 org.springframework.retry.RetryContext 组件在业务中如何使用。 

    首先从模板方法源码开始:

      public class RetryTemplate implements RetryOperations {

	@Override
	public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback) throws E {
		return doExecute(retryCallback, recoveryCallback, null);
	}

    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
                                                   RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {

        // 通过 set方法设置的重试策略,默认 SimpleRetryPolicy
        RetryPolicy retryPolicy = this.retryPolicy;
        // 通过 set方法设置的回退策略,默认 NoBackOffPolicy
        BackOffPolicy backOffPolicy = this.backOffPolicy;

        // 无状态的:重试策略自定义 org.springframework.retry.RetryPolicy.open
        // 有状态的:根据策略,每次新建/从缓存获取
        RetryContext context = open(retryPolicy, state);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("RetryContext retrieved: " + context);
        }
        // 绑定到 ThreadLocal 以便线程内全局获取
        RetrySynchronizationManager.register(context);

        Throwable lastException = null;

        boolean exhausted = false;
        try {
            // 回调所有的 org.springframework.retry.RetryListener.open
            boolean running = doOpenInterceptors(retryCallback, context);
            // 任意一个监听器的open返回 false则抛出异常
            if (!running) {
                throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
            }

            // Get or Start the backoff context...
            BackOffContext backOffContext = null;
            Object resource = context.getAttribute("backOffContext");

            if (resource instanceof BackOffContext) {
                backOffContext = (BackOffContext) resource;
            }

            if (backOffContext == null) {
                // 回调重试策略的 start 方法
                backOffContext = backOffPolicy.start(context);
                if (backOffContext != null) {
                    context.setAttribute("backOffContext", backOffContext);
                }
            }

            // 回调 org.springframework.retry.RetryPolicy.canRetry 判断是否可以重试
            // 同时可以调用 org.springframework.retry.RetryContext.setExhaustedOnly进行干预,不再进行重试
            while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Retry: count=" + context.getRetryCount());
                    }
                    lastException = null;
                    // 回调业务代码块
                    return retryCallback.doWithRetry(context);
                } catch (Throwable e) {
                    lastException = e;
                    try {
                        // 回调 org.springframework.retry.RetryPolicy.registerThrowable
                        registerThrowable(retryPolicy, state, context, e);
                    } catch (Exception ex) {
                        throw new TerminatedRetryException("Could not register throwable", ex);
                    } finally {
                        // 回调 org.springframework.retry.RetryListener.onError
                        doOnErrorInterceptors(retryCallback, context, e);
                    }
                    // 同上面的判断一样,允许重试的话,会回调回退策略
                    if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        try {
                            // 这一步会根据策略进行等待/立即执行
                            backOffPolicy.backOff(backOffContext);
                        } catch (BackOffInterruptedException ex) {
                            lastException = e;
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
                            }
                            throw ex;
                        }
                    }

                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
                    }
                    // 如果指定了 org.springframework.retry.RetryState,会判断是否针对该异常进行抛出,即进行重试阻断
                    if (shouldRethrow(retryPolicy, context, state)) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
                        }
                        throw RetryTemplate.<E>wrapIfNecessary(e);
                    }

                }

                if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                    break;
                }
            }

            if (state == null && this.logger.isDebugEnabled()) {
                this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
            }

            exhausted = true;
            // 回调兜底/补偿 org.springframework.retry.RecoveryCallback.recover
            return handleRetryExhausted(recoveryCallback, context, state);

        } catch (Throwable e) {
            throw org.springframework.retry.support.RetryTemplate.<E>wrapIfNecessary(e);
        } finally {
            // 回调 org.springframework.retry.RetryPolicy.close
            close(retryPolicy, context, state, lastException == null || exhausted);
            // 回调 org.springframework.retry.RetryListener.close
            doCloseInterceptors(retryCallback, context, lastException);
            // 清除 ThreadLoacal中存储的 org.springframework.retry.RetryContext
            RetrySynchronizationManager.clear();
        }

    }
}

    整体模板方法不是很复杂,上面注释也标明具体回调哪些方法。大体逻辑就是:

初始化上下文并绑定 -> 回调监听器open判断是否可以重试 -> 回调重试策略start方法 -> 通过重试策略判断是否可以重试 -> 可以的话执行业务代码块 -> 下面两个分支 成功/异常
成功:回调重试策略的close -> 回调监听器的close -> 清除上下文              链路①
异常:回调重试策略的registerThrowable -> 回调监听器的onError -> 回调回退策略的backoff -> 如果设置了 Retrystate,判断是否需要抛异常阻断重试 -> 兜底/补偿逻辑 -> 链路①

    大体流程了解后,先来看下上下文接口的定义:

      public interface RetryContext extends AttributeAccessor {

    String NAME = "context.name";  // 上下文的自定义名称

    String STATE_KEY = "context.state";  // RetryState定义的key

    String CLOSED = "context.closed"; // 标识重试是否close

    String RECOVERED = "context.recovered";  // 标识是否执行了兜底/补偿

    String EXHAUSTED = "context.exhausted";  // 标识重试最大次数仍失败

	void setExhaustedOnly();

	boolean isExhaustedOnly();

	RetryContext getParent();

	int getRetryCount();

	Throwable getLastThrowable();
}
      public interface AttributeAccessor {
    void setAttribute(String name, @Nullable Object value);

    @Nullable
    Object getAttribute(String name);

    @Nullable
    Object removeAttribute(String name);

    boolean hasAttribute(String name);

    String[] attributeNames();
}

    加上继承类 AttributeAccessor支持的key-value存储,作为上下文,贯穿重试框架的一次调用。框架定义的key常量接口内定义了部分,还有一些分散在实现类以及策略类中。如果业务所需,也可以放置数据到上下文,在一次重试策略调用周期内共享。

    那就来看下上下文的初始化逻辑:

      public class RetryTemplate implements RetryOperations {

    protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {
        // 如果是无状态的,每次都调用 org.springframework.retry.RetryPolicy.open创建
        if (state == null) {
            return doOpenInternal(retryPolicy);
        }

        Object key = state.getKey();
        // 如果有状态,但设置了强制刷新,同样 org.springframework.retry.RetryPolicy.open创建,并写入 state.key
        if (state.isForceRefresh()) {
            return doOpenInternal(retryPolicy, state);
        }
        // 尝试从缓存获取,不存在走新建
        if (!this.retryContextCache.containsKey(key)) {
            // The cache is only used if there is a failure.
            return doOpenInternal(retryPolicy, state);
        }
        // 缓存获取
        RetryContext context = this.retryContextCache.get(key);
        if (context == null) {
            // 异常场景
            if (this.retryContextCache.containsKey(key)) {
                throw new RetryException("Inconsistent state for failed item: no history found. "
                        + "Consider whether equals() or hashCode() for the item might be inconsistent, "
                        + "or if you need to supply a better ItemKeyGenerator");
            }
            // 因为整体没有锁,所以有这一步作为补偿(没仔细琢磨,会不会有并发逻辑)
            return doOpenInternal(retryPolicy, state);
        }

        // 因为是缓存,所以同一个 state.key 会共享这个上下文,清除会影响其他正在校验这几个key的地方
        context.removeAttribute(RetryContext.CLOSED);
        context.removeAttribute(RetryContext.EXHAUSTED);
        context.removeAttribute(RetryContext.RECOVERED);
        return context;

    }
}

    从这里可以看出,如果个别场景想要在多次调用间共享 RetryContext,就需要定义 state.key,且设置 forceRefresh=false。每次重试独占上下文的话,要么就使用无状态的重试,要么就设置 forceRefresh=true。

    到此为止,基本线索都集中到 RetryPolicy的实现上了,基本都是回调 RetryPolicy 定义的方法。话不多说,来看两个重试策略的实现。

      public class SimpleRetryPolicy implements RetryPolicy {

    private volatile int maxAttempts;

    private BinaryExceptionClassifier retryableClassifier = new BinaryExceptionClassifier(false);

    public SimpleRetryPolicy(int maxAttempts, BinaryExceptionClassifier classifier) {
        super();
        this.maxAttempts = maxAttempts;
        this.retryableClassifier = classifier;
    }

    @Override
    public boolean canRetry(RetryContext context) {
        // 获取最新一次重试的异常
        Throwable t = context.getLastThrowable();
        // 允许重试的异常 并且 次数<最大重试次数
        return (t == null || retryForException(t)) && context.getRetryCount() < this.maxAttempts;
    }

    @Override
    public void close(RetryContext status) {
    }

    @Override
    public void registerThrowable(RetryContext context, Throwable throwable) {
        SimpleRetryContext simpleContext = ((SimpleRetryContext) context);
        // 记录最近一次异常,并自增重试次数
        simpleContext.registerThrowable(throwable);
    }

    @Override
    public RetryContext open(RetryContext parent) {
        return new SimpleRetryContext(parent);
    }

    private static class SimpleRetryContext extends RetryContextSupport {

        public SimpleRetryContext(RetryContext parent) {
            super(parent);
        }

    }

    private boolean retryForException(Throwable ex) {
        return this.retryableClassifier.classify(ex);
    }

}
      public class CircuitBreakerRetryPolicy implements RetryPolicy {

    public static final String CIRCUIT_OPEN = "circuit.open";

    public static final String CIRCUIT_SHORT_COUNT = "circuit.shortCount";

    private static Log logger = LogFactory.getLog(CircuitBreakerRetryPolicy.class);

    private final RetryPolicy delegate;

    private long resetTimeout = 20000;

    private long openTimeout = 5000;

    public CircuitBreakerRetryPolicy(RetryPolicy delegate) {
        this.delegate = delegate;
    }

    public void setResetTimeout(long timeout) {
        this.resetTimeout = timeout;
    }

    public void setOpenTimeout(long timeout) {
        this.openTimeout = timeout;
    }

    @Override
    public boolean canRetry(RetryContext context) {
        CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
        // 判断断路器开关是否打开
        if (circuit.isOpen()) {
            // 打开则不允许重试
            circuit.incrementShortCircuitCount();
            return false;
        } else {
            circuit.reset();
        }
        // 断路器开关闭合 or 半打开,是否允许重试交给代理实现
        return this.delegate.canRetry(circuit.context);
    }

    @Override
    public RetryContext open(RetryContext parent) {
        return new CircuitBreakerRetryContext(parent, this.delegate, this.resetTimeout, this.openTimeout);
    }

    @Override
    public void close(RetryContext context) {
        CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
        // 代理
        this.delegate.close(circuit.context);
    }

    @Override
    public void registerThrowable(RetryContext context, Throwable throwable) {
        CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
        circuit.registerThrowable(throwable);
        // 代理
        this.delegate.registerThrowable(circuit.context, throwable);
    }

    static class CircuitBreakerRetryContext extends RetryContextSupport {

        private volatile RetryContext context;

        private final RetryPolicy policy;

        private volatile long start = System.currentTimeMillis();

        private final long timeout;

        private final long openWindow;

        private final AtomicInteger shortCircuitCount = new AtomicInteger();

        public CircuitBreakerRetryContext(RetryContext parent, RetryPolicy policy, long timeout, long openWindow) {
            super(parent);
            this.policy = policy;
            this.timeout = timeout;
            this.openWindow = openWindow;
            this.context = createDelegateContext(policy, parent);
            setAttribute("state.global", true);
        }

        public void reset() {
            shortCircuitCount.set(0);
            setAttribute(CIRCUIT_SHORT_COUNT, shortCircuitCount.get());
        }

        public void incrementShortCircuitCount() {
            shortCircuitCount.incrementAndGet();
            setAttribute(CIRCUIT_SHORT_COUNT, shortCircuitCount.get());
        }

        private RetryContext createDelegateContext(RetryPolicy policy, RetryContext parent) {
            RetryContext context = policy.open(parent);
            reset();
            return context;
        }

        /* 判断断路器开关是否打开  **/
        public boolean isOpen() {
            // 计算时间间隔
            long time = System.currentTimeMillis() - this.start;
            // 判断是否允许重试
            boolean retryable = this.policy.canRetry(this.context);
            if (!retryable) {
                // 闭合重置开关超时时间
                if (time > this.timeout) {
                    logger.trace("Closing");
                    // 重建上下文
                    this.context = createDelegateContext(policy, getParent());
                    this.start = System.currentTimeMillis();
                    // 判断是否允许重试
                    retryable = this.policy.canRetry(this.context);
                }
                // [0,openTimeout)
                else if (time < this.openWindow) {
                    // 指定开关打开
                    if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
                        logger.trace("Opening circuit");
                        setAttribute(CIRCUIT_OPEN, true);
                    }
                    this.start = System.currentTimeMillis();
                    return true;
                }
            }
            // 允许重试
            else {
                // (openWindow,resetTimeout]
                if (time > this.openWindow) {
                    logger.trace("Resetting context");
                    // 重建上下文
                    this.start = System.currentTimeMillis();
                    this.context = createDelegateContext(policy, getParent());
                }
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Open: " + !retryable);
            }
            // 设置开关打开的标志位,不能重试=开关打开
            setAttribute(CIRCUIT_OPEN, !retryable);
            return !retryable;
        }

        @Override
        public int getRetryCount() {
            return this.context.getRetryCount();
        }

        @Override
        public String toString() {
            return this.context.toString();
        }

    }

}

    除了断路器策略( CircuitBreakerRetryPolicy)稍微复杂点,其他基本都像 SimpleRetryPolicy一样,逻辑简单。

    首先 canRetry在模板方法内是判断是否要重试的实现。 SimpleRetryPolicy实现就是判断了次数和异常,是否满足要求。 CircuitBreakerRetryPolicy,主要逻辑集中在  org.springframework.retry.policy.CircuitBreakerRetryPolicy.CircuitBreakerRetryContext#isOpen,开关打开不允许重试,否则根据代理类判断是否允许重试。

   

    重试策略看完,再来看回退策略代码就更简单了,看一个固定时长的回退策略实现:

      public class FixedBackOffPolicy extends StatelessBackOffPolicy implements SleepingBackOffPolicy<FixedBackOffPolicy> {
    // 默认使用 Thread.sleep等待
	private Sleeper sleeper = new ThreadWaitSleeper();

    // 可以指定 org.springframework.retry.backoff.ObjectWaitSleeper
	public void setSleeper(Sleeper sleeper) {
		this.sleeper = sleeper;
	}

	protected void doBackOff() throws BackOffInterruptedException {
		try {
			sleeper.sleep(backOffPeriod);
		}
		catch (InterruptedException e) {
			throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
		}
	}
}

    默认的就是 Thread.sleep一段时间。  

 

    至此,编程式的源码简单的流程已经了解了。接下来分析下注解的实现原理,可预见的,注解也借助了编程式这套模板。

    大体逻辑主要分为应用启动时,对全局内 @Retryable方法的收集,以及运行期的动态代理。

      @Configuration
public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor, BeanFactoryAware {


	@PostConstruct
	public void init() {
		Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
		retryableAnnotationTypes.add(Retryable.class);
        // 切点定义,也就是所有被 @Retryable标识的方法
		this.pointcut = buildPointcut(retryableAnnotationTypes);
        // 切面构建
		this.advice = buildAdvice();
		if (this.advice instanceof BeanFactoryAware) {
			((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
		}
	}

	protected Advice buildAdvice() {
        // 看这个实现
		AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
		if (retryContextCache != null) {
			interceptor.setRetryContextCache(retryContextCache);
		}
		if (retryListeners != null) {
			interceptor.setListeners(retryListeners);
		}
		if (methodArgumentsKeyGenerator != null) {
			interceptor.setKeyGenerator(methodArgumentsKeyGenerator);
		}
		if (newMethodArgumentsIdentifier != null) {
			interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier);
		}
		if (sleeper != null) {
			interceptor.setSleeper(sleeper);
		}
		return interceptor;
	}
}

    上面配置类,init 生命中期内定义了切点和切面,切点很简单,就是所有 @Retryable标识的方法;切面的话需要移步  AnnotationAwareRetryOperationsInterceptor

      public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor, BeanFactoryAware {

	@Override
	public Object invoke(MethodInvocation invocation) throws Throwable {
        // 获取切面代理
		MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
		if (delegate != null) {
            // 执行代理方法
			return delegate.invoke(invocation);
		}
		else {
			return invocation.proceed();
		}
	}
}

    这里切面的构建并不是我们关心的,所以 getDelegate 感兴趣的可以自行研究,里面加了一道缓存,保证代理对象只会创建一次。

    继续跟源码,可以找到代理切面实现类根据有无状态有两种(根据 @Retryable#stateful()设置):

  •   org.springframework.retry.interceptor.RetryOperationsInterceptor
  • org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor  (@CircuitBreaker强制是这种
      public class RetryOperationsInterceptor implements MethodInterceptor {

    private RetryOperations retryOperations = new RetryTemplate();

    public Object invoke(final MethodInvocation invocation) throws Throwable {

        String name;
        if (StringUtils.hasText(label)) {
            name = label;
        } else {
            name = invocation.getMethod().toGenericString();
        }
        final String label = name;
        // 构建 org.springframework.retry.RetryCallback
        RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
                invocation, label) {

            @Override
            public Object doWithRetry(RetryContext context) throws Exception {

                context.setAttribute(RetryContext.NAME, label);

                if (invocation instanceof ProxyMethodInvocation) {
                    try {
                        // 回调业务代码块
                        return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
                    } catch (Exception e) {
                        throw e;
                    } catch (Error e) {
                        throw e;
                    } catch (Throwable e) {
                        throw new IllegalStateException(e);
                    }
                } else {
                    throw new IllegalStateException(
                            "MethodInvocation of the wrong type detected - this should not happen with Spring AOP, "
                                    + "so please raise an issue if you see this exception");
                }
            }

        };

        // 根据是否有 @Recover 兜底/补偿方法,调用模板的不同方法
        if (recoverer != null) {
            RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), recoverer);
            return this.retryOperations.execute(retryCallback, recoveryCallback);
        }

        return this.retryOperations.execute(retryCallback);

    }

}
      public class StatefulRetryOperationsInterceptor implements MethodInterceptor {

    private RetryOperations retryOperations;

    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Executing proxied method in stateful retry: " + invocation.getStaticPart() + "("
                    + ObjectUtils.getIdentityHexString(invocation) + ")");
        }

        Object[] args = invocation.getArguments();
        Object defaultKey = Arrays.asList(args);
        if (args.length == 1) {
            defaultKey = args[0];
        }

        Object key = createKey(invocation, defaultKey);
        RetryState retryState = new DefaultRetryState(key,
                this.newMethodArgumentsIdentifier != null && this.newMethodArgumentsIdentifier.isNew(args),
                this.rollbackClassifier);

        Object result = this.retryOperations.execute(new StatefulRetryOperationsInterceptor.StatefulMethodInvocationRetryCallback(invocation, label),
                this.recoverer != null ? new StatefulRetryOperationsInterceptor.ItemRecovererCallback(args, this.recoverer) : null, retryState);

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Exiting proxied method in stateful retry with result: (" + result + ")");
        }

        return result;
    }
}

    最终 invoke的实现里面,可以看到调用模板方法。利用切面,省得每个方法都写一遍 execute方法。    

 

总结

    总之,重试框架并不复杂,已经有现成的工具,就不要重复造轮子。本文也是通过阅读源码后写出来的,如果有理解不正确的地方,欢迎各位指正。

相关 [spring retry 框架] 推荐:

Spring Retry框架——看这篇就够了 - MarvelCode - OSCHINA - 中文开源技术交流社区

- -
    软件架构从当初的单机,演变到后来的集群,再到后来的分布式应用. 原本看似可以信任的服务调用,加上了网络因素就变得不再可靠. 再考虑到一些调用链路的特殊性,又要保证性能,又要尽可能增加成功率,所以调用方必须肩负起重试的责任.     重试并不复杂,首先来分析下重试的调用场景,可以想到业务当中不止一处会需要重试能力,并且业务其实更关乎自己的代码块被重试就可以了,而不在乎如何实现的重试.

Spring-Retry重试实现原理 | Alben's home

- -
Spring实现了一套重试机制,功能简单实用. 本文将讲述如何使用Spring Retry的及其重试机制的实现原理. Spring实现了一套重试机制,功能简单实用. Spring Retry是从Spring Batch独立出来的一个功能,已经广泛应用于Spring Batch,Spring Integration, Spring for Apache Hadoop等Spring项目.

微服务框架Spring Cloud介绍 Part2: Spring Cloud与微服务

- - skaka的博客
之前介绍过 微服务的概念与Finagle框架, 这个系列介绍Spring Cloud.. Spring Cloud还是一个相对较新的框架, 今年(2016)才推出1.0的release版本. 虽然Spring Cloud时间最短, 但是相比我之前用过的Dubbo和Finagle, Spring Cloud提供的功能最齐全..

Spring框架学习【基础知识】

- - CSDN博客推荐文章
1.在java开发领域,Spring相对于EJB来说是一种轻量级的,非侵入性的Java开发框架,曾经有两本很畅销的书《Expert one-on-one J2EE Design and Development》和《Expert one-on-one J2EEdevelopment without EJB》是java高手进阶必看的宝典,Spring就是从这两本书的理论发展起来的.

springboot 整合retry(重试机制) - 简书

- -
当我们调用一个接口可能由于网络等原因造成第一次失败,再去尝试就成功了,这就是重试机制,spring支持重试机制,并且在Spring Cloud中可以与Hystaix结合使用,可以避免访问到已经不正常的实例. 写一个简单的demo,加入依赖:. @EnableRetry注解,表示启用重试机制. 定义一个简单的controller层:.

Spring 框架笔记—4.16.2 标准与自定义事件

- - 企业架构 - ITeye博客
            Spring 框架笔记—4.16.2 标准与自定义事件. If a bean that implements the ApplicationListener interface is deployed into the context, every time an ApplicationEvent gets published to the ApplicationContext, that bean is notified.

盘点 Spring Security 框架中的八大经典设计模式

- - SegmentFault 最新的文章
上次有小伙伴建议,源码分析太枯燥了,要是能够结合设计模式一起来,这样更有助于大家理解 Spring Security 源码,同时还能复习一波设计模式. 因此松哥今天就试着整一篇,和大家来聊一聊 Spring Security 中涉及到的设计模式,不过 Spring Security 中涉及到的设计模式还是非常多的,松哥这里讲几个,剩下的欢迎小伙伴们留言补充.

【SSH三大框架】Spring基础第二篇:Spring依赖注入的三种方式

- - CSDN博客架构设计推荐文章
控制反转(Inversion of Control)和依赖注入(Dependency Injection):. 应用控制反转,对象在被创建的时候,由一个调控系统内所有对象的外界实体将其所依赖的对象的引用传递给它. 所以,控制反转是,关于一个对象如何获取他所依赖的对象的引用,这个责任的反转. 对于依赖注入,有三种方式:.

微服务架构的基础框架选择:Spring Cloud还是Dubbo?

- - 程序猿DD
最近一段时间不论互联网还是传统行业,凡是涉及信息技术范畴的圈子几乎都在讨论 微服务架构. 近期也看到各大技术社区开始组织一些沙龙和论坛来分享Spring Cloud的相关实施经验,这对于最近正在整理Spring Cloud相关套件内容与实例应用的我而言,还是有不少激励的. 目前,Spring Cloud在国内的知名度并不高,在前阵子的求职过程中,与一些互联网公司的架构师、技术VP或者CTO在交流时,有些甚至还不知道该项目的存在.

java spring框架中方法级redis的连接自动获取和释放实现

- - Java - 编程语言 - ITeye博客
java中使用redis总是需要处理redis连接的获取,释放等操作,每次使用都会使代码变的特别丑陋,模仿spring中aop的实现,用动态代理写一个 连接自动获取和释放的工具. JedisManageSupport 抽象类 类似于 aop的切入点,所有继承了该类(一般都是service层)的类,可以使用提供的获取redis的方法获取redis,并且不需要释放.