spring-retry(3.源码查看policy包、stats包、listener包、support包)


Policy包

RetryPolicy.png
  1. RetryPolicy是这个包的基接口,也就是说这个包都是各种实际可能用到的重试策略。
  2. NeverRetryPolicy类:
    有个内部静态类-NeverRetryContext,扩展自RetryContextSupport类,额外加上finished属性。该属性默认为false.(open()方法中构造),在registerThrowable()方法中就变成true了,之后再调用canRetry()判断都是返回false。
    也就是说,这个策略只有一次机会,出现异常也再也不能重试了。
    P.S. 内部静态类:首先是内部类,其他类都不可见。其次是静态类,就不依附于外部类,可以独立实例化。但如果是内部类,还藏着一个this指针,可以方便的使用外部类的属性和方法。两种方式各有利弊,需要看情况使用。
  3. AlwaysRetryPolicy类:
    扩展自NeverRetryPolicy类,其canRetry()始终返回true,也就是说,可以无限次重试。
  4. CompositeRetryPolicy类:
    是一个组合策略,有个内部静态类CompositeRetryContext,包含RetryContext[]数组和RetryPolicy[]数组.内部维护一个RetryPolicy[]数组,可通过set()方法设置,然后在open()方法中传递给CompositeRetryContext。
    另外有个optimistic指示,用了做组合策略判断canRetry的标尺。
    a. 当optimistic为true,积极型时,canRetry遍历各个RetryPolicy,只要有一个策略返回可以重试,即认为组合为可以重试。
    b. 当optimistic为false,消极型时,canRetry遍历各个RetryPolicy,只要有一个策略返回不可以重试,即认为组合为不可以重试。
  5. SimpleRetryPolicy类:
    这里就是应用到Classifier的地方了,内部定义了一个BinaryExceptionClassifier,在构造器中初始化.traverseCauses是传递给BinaryExceptionClassifier用的,默认值为false。
public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions,
            boolean traverseCauses, boolean defaultValue) {
        super();
        this.maxAttempts = maxAttempts;
        //通过给定的映射Map和默认值构造BinaryExceptionClassifier
        this.retryableClassifier = new BinaryExceptionClassifier(retryableExceptions, defaultValue);
        this.retryableClassifier.setTraverseCauses(traverseCauses);
    }

它的canRetry()就比较简单了,从上下文中获取canRetry,然后调用retryableClassifier.classify()的结果和最大次数匹配一下即可。
6.ExpressionRetryPolicy类:
扩展自SimpleRetryPolicy,而且是1.2新增的一个策略。在父类canRetry的基础上,加上对lastThrowable的的表达式判断,符合特地表达式的异常才能重试。

  1. CircuitBreakerRetryPolicy类:
    有个内部类CircuitBreakerRetryContext,断路器重试上下文。提供过载保护的策略,如果在时间间隔openTimeout内,直接短路,不允许重试,只有超过间隔的才能重试,具体判断是否熔断的代码如下:
public boolean isOpen() {
            long time = System.currentTimeMillis() - this.start;
            boolean retryable = this.policy.canRetry(this.context);
            //当前不允许重试
            if (!retryable) {
                //如果已经超过重置时间,重新闭合,关闭熔断器
                if (time > this.timeout) {
                    logger.trace("Closing");
                    this.context = createDelegateContext(policy, getParent());
                    this.start = System.currentTimeMillis();
                    retryable = this.policy.canRetry(this.context);
                }
                // 如果小于熔断器打开时间,读取关闭状态,如果熔断器是关闭的,就打开熔断器,重置熔断计时器
                else if (time < this.openWindow) {
                    if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
                        logger.trace("Opening circuit");
                        setAttribute(CIRCUIT_OPEN, true);
                    }
                    this.start = System.currentTimeMillis();
                    return true;
                }
            }
            //允许重试
            else {
                //判断是否在openWindow熔断器电路打开的超时时间之外,超过打开时间,就重置上下文,并且返回false
                if (time > this.openWindow) {
                    logger.trace("Resetting context");
                    this.start = System.currentTimeMillis();
                    this.context = createDelegateContext(policy, getParent());
                }
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Open: " + !retryable);
            }
            setAttribute(CIRCUIT_OPEN, !retryable);
            return !retryable;
        }

需要配合看

public boolean canRetry(RetryContext context) {
        CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
        //如果熔断器处于打开状态,就直接短路,返回失败
        if (circuit.isOpen()) {
            circuit.incrementShortCircuitCount();
            return false;
        }
        else {
            //重置熔断器
            circuit.reset();
        }
        return this.delegate.canRetry(circuit.context);
    }
  • 当重试失败,且在熔断器打开时间窗口[0,openWindow) 内,立即熔断;
  • 当重试失败,且在指定超时时间后(>timeout),熔断器电路重新闭合;
  • 在熔断器半打开状态[openWindow, timeout]时,只要重试成功则重置上下文,断路器闭合。
熔断器.png
  1. TimeoutRetryPolicy类:
    有个TimeoutRetryContext,超时上下文,默认一秒内不允许重试,只有超时之后才能进行重试。
  2. RetryContextCache接口及其实现:
    专门存放RetryContext的缓存,有两种实现MapRetryContextCache,使用HashMap保存。SoftReferenceMapRetryContextCache,使用SoftReference进行保存。

stats包

stats包含了重试的统计信息


RetryStatistics.png
  1. MutableRetryStatistics接口:
    增加了incrementc××()方法,增加统计值
  2. DefaultRetryStatistics类:
    统计的默认实现,定义了一堆的AtomicInterger存储统计值,同时扩展自AttributeAccessorSupport,还能存放其他信息。
  3. ExponentialAverageRetryStatistics类:
    增加了指数平均指标的统计值,
  4. RetryStatisticsFactory接口和DefaultRetryStatisticsFactory实现:
    就一个create()方法,构造MutableRetryStatistics。默认是生产ExponentialAverageRetryStatistics统计类。
  5. StatisticsRepository接口:
    统计仓库,可以存放多个统计信息的接口
  6. DefaultStatisticsRepository类:
    统计仓库的默认实现,内部有一个DefaultRetryStatisticsFactory,如果找不到对应名字的统计,就由工厂生产一个。

listener包

就一个类RetryListenerSupport,其具体实现子类StatisticsListener位于stats包中。主要监听close()和onError()事件,并调用repository进行记录统计信息。

support包

  1. DefaultRetryState类:
    代表了一个新的重试尝试的状态。包含3个属性:
    //该状态的键值
    final private Object key;
    //是否需要强制刷新
    final private boolean forceRefresh;
    //回滚的转换器,当转换为true值是,这个异常将会引起回滚操作
    final private Classifier<? super Throwable, Boolean> rollbackClassifier;

有个rollbackFor()用来判断某个异常是否需要引起回滚,如果没有rollbackClassifier,默认返回true,否则按照rollbackClassifier转换值进行判断。

  1. RetrySimulation类:
    代表一个发生器,有个SleepSequence内部类,代表一组sleep值,有最长的,有总和的。而发生器根据序列维护内部的sleepHistogram直方图,在获得百分比是能返回对应值。
  2. RetrySimulator类:
    用来执行补偿+重试的操作的工具类。在构造函数中传入SleepingBackOffPolicy和RetryPolicy作为内部属性。在executeSingleSimulation()方法中,设置好补偿机制和重置策略,然后直接通过template执行失败的FailingRetryException,模拟失败的动作,进行补偿和重试的组合操作。
  3. RetrySynchronizationManager类:
    在ThreadLocal中存放RetryContext,用来保证一个线程只能维护一个重试上下文,进行一个重试操作。毕竟Sleep是用Tread.sleep,如果多个重试,这个补偿机制就无法生效了。
  4. RetryTemplate类:
    这个是这个包最重要的一个类了,之前看到重试策略,回退策略,缓存、监听器都是应用在这里。
    a.它实现了RetryOperations接口。
    b.很有风格的是,execute是给外部调用的,真正内部起作用的是doExecute()
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
            RecoveryCallback<T> recoveryCallback, RetryState state)
            throws E, ExhaustedRetryException {
        //重试策略
        RetryPolicy retryPolicy = this.retryPolicy;
        //补偿策略
        BackOffPolicy backOffPolicy = this.backOffPolicy;

        // Allow the retry policy to initialise itself...
        // 根据当前策略,状态,重新生成重试上下文
        RetryContext context = open(retryPolicy, state);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("RetryContext retrieved: " + context);
        }

        // Make sure the context is available globally for clients who need
        // it...
        // 在当前现场注册这个重试上下文,方便需要的客户端取出。
        RetrySynchronizationManager.register(context);

        Throwable lastException = null;

        boolean exhausted = false;
        try {

            // 调用拦截器,执行RetryListener中open
            boolean running = doOpenInterceptors(retryCallback, context);
            // 判断拦截器是否允许重试
            if (!running) {
                throw new TerminatedRetryException(
                        "Retry terminated abnormally by interceptor before first attempt");
            }

            // 从conext中获取补偿上下文
            BackOffContext backOffContext = null;
            Object resource = context.getAttribute("backOffContext");

            if (resource instanceof BackOffContext) {
                backOffContext = (BackOffContext) resource;
            }
            // 如果当前没有补充上下文,就构建一个新的,并放入重试上下文中
            if (backOffContext == null) {
                backOffContext = backOffPolicy.start(context);
                if (backOffContext != null) {
                    context.setAttribute("backOffContext", backOffContext);
                }
            }

            // 在允许情况下,使用while循环控制重试流程
            while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

                try {
                    //根据日志等级,打印重试次数日志
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Retry: count=" + context.getRetryCount());
                    }
                    lastException = null;
                    // 回调传入的具体执行动作
                    return retryCallback.doWithRetry(context);
                }
                catch (Throwable e) {
                    // 调用失败处理
                    lastException = e;

                    try {
                        //先在上下文中注册异常
                        registerThrowable(retryPolicy, state, context, e);
                    }
                    catch (Exception ex) {
                        throw new TerminatedRetryException("Could not register throwable",
                                ex);
                    }
                    finally {
                        // 调用拦截器,执行RetryListener中onError
                        doOnErrorInterceptors(retryCallback, context, e);
                    }

                    if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        try {
                            // 支持补偿机制,比如延迟2秒
                            backOffPolicy.backOff(backOffContext);
                        }
                        catch (BackOffInterruptedException ex) {
                            lastException = e;
                            // back off was prevented by another thread - fail the retry
                            if (this.logger.isDebugEnabled()) {
                                this.logger
                                        .debug("Abort retry because interrupted: count="
                                                + context.getRetryCount());
                            }
                            throw ex;
                        }
                    }

                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug(
                                "Checking for rethrow: count=" + context.getRetryCount());
                    }

                    if (shouldRethrow(retryPolicy, context, state)) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Rethrow in retry for policy: count="
                                    + context.getRetryCount());
                        }
                        throw RetryTemplate.<E>wrapIfNecessary(e);
                    }

                }
                
                if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                    break;
                }
            }
            // 打印日志
            if (state == null && this.logger.isDebugEnabled()) {
                this.logger.debug(
                        "Retry failed last attempt: count=" + context.getRetryCount());
            }

            exhausted = true;
            //重试失败后,如果有RecoveryCallback,则执行此回调
            return handleRetryExhausted(recoveryCallback, context, state);

        }
        catch (Throwable e) {
            //保底的catch,根据情况再包装一下抛出的异常
            throw RetryTemplate.<E>wrapIfNecessary(e);
        }
        finally {
           
            close(retryPolicy, context, state, lastException == null || exhausted);
            // 调用拦截器,执行RetryListener中close
            doCloseInterceptors(retryCallback, context, lastException);
            // 清理线程中重试上下文
            RetrySynchronizationManager.clear();
        }

    }

参考:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容