Sentinel实现原理解析

基本概念

Sentinel是什么?

随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性,所以也称为流量防卫兵。

资源

资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。在接下来的文档中,我们都会用资源来描述代码块。 只要通过 Sentinel API 定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。

规则

围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以动态实时调整。

Sentinel 功能和设计理念

流量控制

什么是流量控制?

服务方接收的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状,如下图所示:

image

流量控制设计理念

方式:QPS、线程数; 策略:限流当前资源、关联资源、链路资源; 效果:直接限流(快速失败)、预热、排队等

关联资源

举例来说,read_dbwrite_db 这两个资源分别代表数据库读写,我们可以给 read_db 设置限流规则来达到写优先的目的:设置 strategyRuleConstant.STRATEGY_RELATE 同时设置 refResourcewrite_db。这样当写库操作过于频繁时,读数据的请求会被限流。

链路资源

一棵典型的调用树如下图所示:

                      root
                    /       \
                   /         \
             Entrance1     Entrance2
                  \           /             
                   \         /              
                DefaultNode(nodeA)

上图中来自入口 Entrance1Entrance2 的请求都调用到了资源 NodeA,Sentinel 允许只根据某个入口的统计信息对资源限流。比如我们可以设置 strategyRuleConstant.STRATEGY_CHAIN,同时设置 refResourceEntrance1 来表示只有从入口 Entrance1 的调用才会记录到 NodeA 的限流统计当中,而不关心经 Entrance2 到来的调用。

熔断降级

什么是熔断

除了流量控制以外,降低调用链路中的不稳定资源也是 Sentinel 的使命之一。由于调用关系的复杂性,如果调用链路中的某个资源出现了不稳定,最终会导致请求发生堆积。 当调用链路中某个资源出现不稳定,例如,表现为 timeout、异常比例升高、异常数过多的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩。

熔断设计理念

熔断策略:慢调用比例、异常数量、异常比例

熔断效果:熔断降级

熔断状态:CLOSED、OPEN、HALF_OPEN

降级、限流、熔断区别

降级:是在系统某些服务组件不可用的时候、流量暴增、资源耗尽等情况下,暂时屏蔽掉出问题的服务,继续提供降级服务,给用户尽可能的友好提示,返回兜底数据。

限流:是指在使用缓存和降级无效的场景。比如当达到阈值后限制接口调用频率等,在出现服务不可用之前,提前把服务降级。

熔断:当系统出现问题时,开启熔断开关,拒绝流量访问,避免大流量对后端的过载请求。

底层原理解析

熔断实现

1.规则初始化;

2.定义切面;

//1.规则初始化
@PostConstruct
private voivoid initDegradeRules() {
    DegradeRule orderDegradeRule = new DegradeRule();
    //设置降级规则所对应的资源名称
    orderDegradeRule.setResource("order");
    //根据发生异常的次数进行降级
    orderDegradeRule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT);
    //至少要有x次请求才会降级
    orderDegradeRule.setMinRequestAmount(2);
    //发生异常的次数达到x后开始降级
    orderDegradeRule.setCount(3);
    //熔断时长,降级的时间窗口,在x秒内,所有的请求都直接降级,单位s
    orderDegradeRule.setTimeWindow(5);
    //统计时长,表示熔断后x秒内的其他调用都走熔断方法,单位ms
    orderDegradeRule.setStatIntervalMs(10*1000);

    List<DegradeRule> degradeRuleList=new ArrayList<>();
    //定义降级规则,把降级规则添加到降级规则列表
    degradeRuleList.add(orderDegradeRule);
    //把降级列表添加到降级规则管理器
    DegradeRuleManager.loadRules(degradeRuleList);
}

//2.定义切面
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
    return new SentinelResourceAspect();
}

@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}

@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
    Method originMethod = resolveMethod(pjp);

    SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
    if (annotation == null) {
        // Should not go through here.
        throw new IllegalStateException("Wrong state for SentinelResource annotation");
    }
    String resourceName = getResourceName(annotation.value(), originMethod);
    EntryType entryType = annotation.entryType();
    int resourceType = annotation.resourceType();
    Entry entry = null;
    try {
        entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
        return pjp.proceed();
    } catch (BlockException ex) {
        return handleBlockException(pjp, annotation, ex);
    } catch (Throwable ex) {
        Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
        // The ignore list will be checked first.
        if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
            throw ex;
        }
        if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
            traceException(ex);
            return handleFallback(pjp, annotation, ex);
        }

        // No fallback function can handle the exception, so throw it out.
        throw ex;
    } finally {
        if (entry != null) {
            entry.exit(1, pjp.getArgs());
        }
    }
}

在SphU.entry("HelloWorld") 方法中,SphU的默认实现类是CtSphU,最终调用CtSphU的entryWithPriority()方法。

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        // Using default context.
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

通过ProcessorSlotChain()方法来构建处理链。

public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    //获取ProcessorSlot的实现类,通过注解@Spi order值进行排序
    List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }

        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }

    return chain;
}

NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级; ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据; StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息; FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制; AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制; DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级; SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

image

如何实现熔断限流?熔断的判断在DegradeSlot 中,熔断首先判断状态是否开启,如果开启,则判断是否达到重试时间,然后更新状态为探测恢复,如下代码所示:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    performChecking(context, resourceWrapper);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(Context context, ResourceWrapper r) throws BlockException {
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    for (CircuitBreaker cb : circuitBreakers) {
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}
@Override
public boolean tryPass(Context context) {
    // Template implementation.
    if (currentState.get() == State.CLOSED) {
        return true;
    }
    if (currentState.get() == State.OPEN) {
        // For half-open state we allow a request for probing.
        return retryTimeoutArrived() && fromOpenToHalfOpen(context);
    }
    return false;
}

//判断当前时间是否大于熔断结束时间
protected boolean retryTimeoutArrived() {
    return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}

//更新熔断结束时间 = 当前时间 + 熔断时长
protected void updateNextRetryTimestamp() {
    this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
}

// 如果熔断已经结束,则更新状态为HALF_OPEN(探测恢复)
protected boolean fromOpenToHalfOpen(Context context) {
    if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
        notifyObservers(State.OPEN, State.HALF_OPEN, null);
        Entry entry = context.getCurEntry();
        entry.whenTerminate(new BiConsumer<Context, Entry>() {
            @Override
            public void accept(Context context, Entry entry) {
                // Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
                // Without the hook, the circuit breaker won't recover from half-open state in some circumstances
                // when the request is actually blocked by upcoming rules (not only degrade rules).
                if (entry.getBlockError() != null) {
                    // Fallback to OPEN due to detecting request is blocked
                    currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                    notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                }
            }
        });
        return true;
    }
    return false;
}

exit方法:

@Override
public void onRequestComplete(Context context) {
    Entry entry = context.getCurEntry();
    if (entry == null) {
        return;
    }
    Throwable error = entry.getError();
    //获取当前窗口,统计错误数和总数
    SimpleErrorCounter counter = stat.currentWindow().value();
    if (error != null) {
        counter.getErrorCount().add(1);
    }
    counter.getTotalCount().add(1);

    handleStateChangeWhenThresholdExceeded(error);
}

private void handleStateChangeWhenThresholdExceeded(Throwable error) {
    if (currentState.get() == State.OPEN) {
        return;
    }

    // 如果HALF_OPEN,则就看当前请求是否有异常,如果没有异常,关闭sentinel状态,否则再打开
    if (currentState.get() == State.HALF_OPEN) {
        if (error == null) {
            fromHalfOpenToClose();
        } else {
            fromHalfOpenToOpen(1.0d);
        }
        return;
    }

    //遍历窗口,统计数据异常数和总请求数
    List<SimpleErrorCounter> counters = stat.values();
    long errCount = 0;
    long totalCount = 0;
    for (SimpleErrorCounter counter : counters) {
        errCount += counter.errorCount.sum();
        totalCount += counter.totalCount.sum();
    }
    //如果小于最小请求数,则返回
    if (totalCount < minRequestAmount) {
        return;
    }
    double curCount = errCount;

    if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
        ////计算异常比例
        curCount = errCount * 1.0d / totalCount;
    }
    //大于阈值,则打开sentinel,threshold即配置的count(异常多少次熔断)
    if (curCount > threshold) {
        transformToOpen(curCount);
    }
}

protected boolean fromHalfOpenToOpen(double snapshotValue) {
    if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
        updateNextRetryTimestamp();
        notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
        return true;
    }
    return false;
}

//状态变更为打开,同时更新下次重试时间
protected boolean fromCloseToOpen(double snapshotValue) {
    State prev = State.CLOSED;
    if (currentState.compareAndSet(prev, State.OPEN)) {
        updateNextRetryTimestamp();

        notifyObservers(prev, State.OPEN, snapshotValue);
        return true;
    }
    return false;
}

滑动窗口实现

image

Sentinel 统计数据一共有三个窗口,分别是秒级时间窗口、分钟级时间窗口、连接数时间窗口,通过ArrayMetric、LeapArray实现滑动窗口统计。

以秒级时间窗口为例:

//1.初始化秒级时间窗口,类型为ArrayMetric
//SAMPLE_COUNT=2表示窗口数量、INTERVAL=1000ms表示区间长度,所以每个窗口长度500ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

//2.ArrayMetric有个属性是data,data类型是LeapArray<MetricBucket>,即滑动窗口
public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        //OccupiableBucketLeapArray是LeapArray的子类,
        //new OccupiableBucketLeapArra()相当于执行 new LeapArray()
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
}

//3.滑动窗口类
public abstract class LeapArray<T> {

    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    private double intervalInSecond;

    protected final AtomicReferenceArray<WindowWrap<T>> array;

    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;
        //array是一个数组,类型为AtomicReferenceArray,
        //array 元素为WindowWrap<T>,WindowWrap是一个窗口的包装类
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}

public class WindowWrap<T> {

    /**
     * 窗口长度.
     */
    private final long windowLengthInMs;

    /**
     * 窗口开始时间
     */
    private long windowStart;

    /**
     * 统计数据,类型是MetricBucket或SimpleErrorCounter
     */
    private T value; 
}

//秒级、分钟级滑动窗口value
public class MetricBucket {

    private final LongAdder[] counters;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
    }
}

//连接数滑动窗口value
static class SimpleErrorCounter {
    private LongAdder errorCount;
    private LongAdder totalCount;
}

统计类型枚举,包含窗口通过量、阻塞量、异常量等

public enum MetricEvent {

    //记录通过sentinel校验的数量
    PASS,
    BLOCK,
    EXCEPTION,
    //记录通过并且成功的数量
    SUCCESS,
    //记录耗时
    RT
}

秒级时间窗口如下

image

连接数时间窗口

image

获取当前窗口索引

private int calculateTimeIdx(long timeMillis) {
    // 当前时间 / 每个窗口的时间长度
    long timeId = timeMillis / windowLengthInMs;
    // timeId % 窗口长度
    return (int)(timeId % array.length());
}

获取当前窗口开始时间

protected long calculateWindowStart(long timeMillis) {
    // 当前时间 - 当前时间 % 每个窗口的时间长度
    return timeMillis - timeMillis % windowLengthInMs;
}

获取当前窗口

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
        // 获取当前窗口索引
    int idx = calculateTimeIdx(timeMillis);
        // 获取当前窗口开始时间
    long windowStart = calculateWindowStart(timeMillis);

    /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            // 第一次进入,新建窗口,并使用cas的方式设置,如果出现争抢导致设置失败,暂时让出执行权待其它线程成功设置
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            // 当前时间对应的窗口开始时间等于获取到的窗口开始时间,那么当前获取到的窗口就是我们需要的
            return old;
        } else if (windowStart > old.windowStart()) {
            // 当前时间对应的窗口开始时间大于获取到的窗口开始时间,那么当前获取到的窗口为已过期窗口,加锁重置
            if (updateLock.tryLock()) {
                try {
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

  • 如果对应下标窗口为null,那么就是第一次进入,创建新窗口并使用cas设置。如果非空走下面的逻辑。

  • 如果获取到的窗口开始时间等于当前时间计算出来的对应窗口开始时间,那么就拿到了当前时间需要的窗口,直接返回。

  • 如果获取到的窗口开始时间小于当前时间计算出来的对应窗口开始时间,那么就说明这个窗口已经过期了,所以加锁重置,然后重复使用。

  • 当前时间小于旧的窗口的开始时间,理论上来说是不应该出现这种情况的,如果存在这种情况,那么返回一个无效的空窗口。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容