基本概念
Sentinel是什么?
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性,所以也称为流量防卫兵。
资源
资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。在接下来的文档中,我们都会用资源来描述代码块。 只要通过 Sentinel API 定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。
规则
围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以动态实时调整。
Sentinel 功能和设计理念
流量控制
什么是流量控制?
服务方接收的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状,如下图所示:
流量控制设计理念
方式:QPS、线程数; 策略:限流当前资源、关联资源、链路资源; 效果:直接限流(快速失败)、预热、排队等
关联资源:
举例来说,
read_db
和write_db
这两个资源分别代表数据库读写,我们可以给read_db
设置限流规则来达到写优先的目的:设置strategy
为RuleConstant.STRATEGY_RELATE
同时设置refResource
为write_db
。这样当写库操作过于频繁时,读数据的请求会被限流。
链路资源:
一棵典型的调用树如下图所示:
root / \ / \ Entrance1 Entrance2 \ / \ / DefaultNode(nodeA)
上图中来自入口
Entrance1
和Entrance2
的请求都调用到了资源NodeA
,Sentinel 允许只根据某个入口的统计信息对资源限流。比如我们可以设置strategy
为RuleConstant.STRATEGY_CHAIN
,同时设置refResource
为Entrance1
来表示只有从入口Entrance1
的调用才会记录到NodeA
的限流统计当中,而不关心经Entrance2
到来的调用。
熔断降级
什么是熔断
除了流量控制以外,降低调用链路中的不稳定资源也是 Sentinel 的使命之一。由于调用关系的复杂性,如果调用链路中的某个资源出现了不稳定,最终会导致请求发生堆积。 当调用链路中某个资源出现不稳定,例如,表现为 timeout、异常比例升高、异常数过多的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩。
熔断设计理念
熔断策略:慢调用比例、异常数量、异常比例
熔断效果:熔断降级
熔断状态:CLOSED、OPEN、HALF_OPEN
降级、限流、熔断区别
降级:是在系统某些服务组件不可用的时候、流量暴增、资源耗尽等情况下,暂时屏蔽掉出问题的服务,继续提供降级服务,给用户尽可能的友好提示,返回兜底数据。
限流:是指在使用缓存和降级无效的场景。比如当达到阈值后限制接口调用频率等,在出现服务不可用之前,提前把服务降级。
熔断:当系统出现问题时,开启熔断开关,拒绝流量访问,避免大流量对后端的过载请求。
底层原理解析
熔断实现
1.规则初始化;
2.定义切面;
//1.规则初始化
@PostConstruct
private voivoid initDegradeRules() {
DegradeRule orderDegradeRule = new DegradeRule();
//设置降级规则所对应的资源名称
orderDegradeRule.setResource("order");
//根据发生异常的次数进行降级
orderDegradeRule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT);
//至少要有x次请求才会降级
orderDegradeRule.setMinRequestAmount(2);
//发生异常的次数达到x后开始降级
orderDegradeRule.setCount(3);
//熔断时长,降级的时间窗口,在x秒内,所有的请求都直接降级,单位s
orderDegradeRule.setTimeWindow(5);
//统计时长,表示熔断后x秒内的其他调用都走熔断方法,单位ms
orderDegradeRule.setStatIntervalMs(10*1000);
List<DegradeRule> degradeRuleList=new ArrayList<>();
//定义降级规则,把降级规则添加到降级规则列表
degradeRuleList.add(orderDegradeRule);
//把降级列表添加到降级规则管理器
DegradeRuleManager.loadRules(degradeRuleList);
}
//2.定义切面
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
Method originMethod = resolveMethod(pjp);
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
// Should not go through here.
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}
String resourceName = getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
int resourceType = annotation.resourceType();
Entry entry = null;
try {
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
return pjp.proceed();
} catch (BlockException ex) {
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// The ignore list will be checked first.
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
traceException(ex);
return handleFallback(pjp, annotation, ex);
}
// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
在SphU.entry("HelloWorld") 方法中,SphU的默认实现类是CtSphU,最终调用CtSphU的entryWithPriority()方法。
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } if (context == null) { // Using default context. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
通过ProcessorSlotChain()方法来构建处理链。
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//获取ProcessorSlot的实现类,通过注解@Spi order值进行排序
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;ClusterBuilderSlot
则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;StatisticSlot
则用于记录、统计不同纬度的 runtime 指标监控信息;FlowSlot
则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;AuthoritySlot
则根据配置的黑白名单和调用来源信息,来做黑白名单控制;DegradeSlot
则通过统计信息以及预设的规则,来做熔断降级;SystemSlot
则通过系统的状态,例如 load1 等,来控制总的入口流量;
如何实现熔断限流?熔断的判断在DegradeSlot
中,熔断首先判断状态是否开启,如果开启,则判断是否达到重试时间,然后更新状态为探测恢复,如下代码所示:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
@Override
public boolean tryPass(Context context) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
//判断当前时间是否大于熔断结束时间
protected boolean retryTimeoutArrived() {
return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}
//更新熔断结束时间 = 当前时间 + 熔断时长
protected void updateNextRetryTimestamp() {
this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
}
// 如果熔断已经结束,则更新状态为HALF_OPEN(探测恢复)
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
// Without the hook, the circuit breaker won't recover from half-open state in some circumstances
// when the request is actually blocked by upcoming rules (not only degrade rules).
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}
exit方法:
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
//获取当前窗口,统计错误数和总数
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
}
counter.getTotalCount().add(1);
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}
// 如果HALF_OPEN,则就看当前请求是否有异常,如果没有异常,关闭sentinel状态,否则再打开
if (currentState.get() == State.HALF_OPEN) {
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
//遍历窗口,统计数据异常数和总请求数
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
//如果小于最小请求数,则返回
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
////计算异常比例
curCount = errCount * 1.0d / totalCount;
}
//大于阈值,则打开sentinel,threshold即配置的count(异常多少次熔断)
if (curCount > threshold) {
transformToOpen(curCount);
}
}
protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp();
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
return true;
}
return false;
}
//状态变更为打开,同时更新下次重试时间
protected boolean fromCloseToOpen(double snapshotValue) {
State prev = State.CLOSED;
if (currentState.compareAndSet(prev, State.OPEN)) {
updateNextRetryTimestamp();
notifyObservers(prev, State.OPEN, snapshotValue);
return true;
}
return false;
}
滑动窗口实现
Sentinel 统计数据一共有三个窗口,分别是秒级时间窗口、分钟级时间窗口、连接数时间窗口,通过ArrayMetric、LeapArray实现滑动窗口统计。
以秒级时间窗口为例:
//1.初始化秒级时间窗口,类型为ArrayMetric
//SAMPLE_COUNT=2表示窗口数量、INTERVAL=1000ms表示区间长度,所以每个窗口长度500ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
//2.ArrayMetric有个属性是data,data类型是LeapArray<MetricBucket>,即滑动窗口
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
//OccupiableBucketLeapArray是LeapArray的子类,
//new OccupiableBucketLeapArra()相当于执行 new LeapArray()
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
}
//3.滑动窗口类
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
private double intervalInSecond;
protected final AtomicReferenceArray<WindowWrap<T>> array;
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
//array是一个数组,类型为AtomicReferenceArray,
//array 元素为WindowWrap<T>,WindowWrap是一个窗口的包装类
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
public class WindowWrap<T> {
/**
* 窗口长度.
*/
private final long windowLengthInMs;
/**
* 窗口开始时间
*/
private long windowStart;
/**
* 统计数据,类型是MetricBucket或SimpleErrorCounter
*/
private T value;
}
//秒级、分钟级滑动窗口value
public class MetricBucket {
private final LongAdder[] counters;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
}
}
//连接数滑动窗口value
static class SimpleErrorCounter {
private LongAdder errorCount;
private LongAdder totalCount;
}
统计类型枚举,包含窗口通过量、阻塞量、异常量等
public enum MetricEvent {
//记录通过sentinel校验的数量
PASS,
BLOCK,
EXCEPTION,
//记录通过并且成功的数量
SUCCESS,
//记录耗时
RT
}
秒级时间窗口如下
连接数时间窗口
获取当前窗口索引
private int calculateTimeIdx(long timeMillis) {
// 当前时间 / 每个窗口的时间长度
long timeId = timeMillis / windowLengthInMs;
// timeId % 窗口长度
return (int)(timeId % array.length());
}
获取当前窗口开始时间
protected long calculateWindowStart(long timeMillis) {
// 当前时间 - 当前时间 % 每个窗口的时间长度
return timeMillis - timeMillis % windowLengthInMs;
}
获取当前窗口
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 获取当前窗口索引
int idx = calculateTimeIdx(timeMillis);
// 获取当前窗口开始时间
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 第一次进入,新建窗口,并使用cas的方式设置,如果出现争抢导致设置失败,暂时让出执行权待其它线程成功设置
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 当前时间对应的窗口开始时间等于获取到的窗口开始时间,那么当前获取到的窗口就是我们需要的
return old;
} else if (windowStart > old.windowStart()) {
// 当前时间对应的窗口开始时间大于获取到的窗口开始时间,那么当前获取到的窗口为已过期窗口,加锁重置
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
如果对应下标窗口为null,那么就是第一次进入,创建新窗口并使用cas设置。如果非空走下面的逻辑。
如果获取到的窗口开始时间等于当前时间计算出来的对应窗口开始时间,那么就拿到了当前时间需要的窗口,直接返回。
如果获取到的窗口开始时间小于当前时间计算出来的对应窗口开始时间,那么就说明这个窗口已经过期了,所以加锁重置,然后重复使用。
当前时间小于旧的窗口的开始时间,理论上来说是不应该出现这种情况的,如果存在这种情况,那么返回一个无效的空窗口。