为什么要限流
众所周知,互联网电商的各类活动是越来越多,例如削减男同胞钱包厚度的双十一、618、双十二、各类秒杀活动等,几乎所有的互联网电商企业都会参与其中,冲击GMV,会电商平台带来巨大的流量与可观的利润。
作为互联网电商中的一员,我自己所属的公司虽然远比不上淘宝、京东等,但作为社交电商领域的领头羊,我们在上述对于电商企业及其特殊的日子,流量也是不容小觑的。
嗯,毕竟我们的注册用户数已经超过了6000W了,
供应商已经超过1.5W家了,去年双十一单天的GMV也突破了2亿RMB,
嘿嘿,强行给自己公司打Call
好了,让我们进入这期的主题。例如在双十一、或者周年庆等这种特殊的日子,当12点刚到那一刻,巨大的用户流量涌入你们的系统,访问量突然剧增时,我们是如何保证系统的可用性、稳定性。我们的解决方案主要是通过Sentinel的限流、降级、熔断(增加服务器数量就不说了)以及消息中间件的削峰(我会专门写一期关于消息中间件的文章,到时候大家可以看看)。没错,本期的主角出现了,他就是Sentinel,阿里开源的面向分布式服务框架的轻量级流量控制框架。官网如下:https://github.com/alibaba/Sentinel
为什么选择Sentinel
以下是另一个开源的流量控制框架hystrix与Sentinel的对比
由上图显而易见,Sentinel相比于hystrix有更加强大的功能,它支持hystrix不具有的系统负载保护与限流以及强大的监控API等。更加适配的Dubbo(我们公司用的RCP就是它)的也是Sentinel;更加重要的一点就是我们技术部的管理层都是阿里出身,技术选型也就都是阿里那一套,毕竟国际大厂,品质还是有保证的,哈哈。
Sentinel名词解释
资源
分布式系统中,限流的资源可以是一个http接口,也可使是某个分布式应用中的API;一般我们针对C端的http接口进行限流,针对API进行熔断降级。
限流
限制请求的数量,限制某段时间内的请求总量对于超出的总量的请求,可以直接拒绝,也可以在请求的时候对请求分组,允许特殊请求进来,剩下的拒绝,也可以放入消息队列,削峰填谷。
限流的实现方式:
- 计数器(滑动窗口):维护一个counter,每个时间段清零,对时间段内的请求进行计数,计数前判断counter是否达到阙值,如果没有就加一,达到则拒绝
- 漏斗算法:一个固定容量的桶,当桶为空的时候,不会漏出水滴,流入桶的水的速率是任意的,漏出水的速率是固定的,如果流入桶的水超出桶的容量,进行拒绝
一般的实现方法是队列,队列模拟漏斗,空的时候不再出队,满的时候拒绝 - 令牌桶算法:和漏斗算法很类似,不过除了一个队列以外,还加入了一个中间人,它会以一定的速率发放令牌(token)到桶内,队列中的的等待着只有拿到token才能通过漏斗限制了传输速率,而令牌桶在限制的同时,还允许突然的大流量,即:在大流量到来的时候,有足够空间的情况下(足够的队列和桶内有足够的令牌),就允许进入
降级
服务降级是从整个系统的负荷情况出发和考虑的,对某些负荷会比较高的情况,为了预防某些功能(业务场景)出现负荷过载或者响应慢的情况,在其内部暂时舍弃对一些非核心的接口和数据的请求,而直接返回一个提前准备好的fallback(退路)错误处理信息。这样,虽然提供的是一个有损的服务,但却保证了整个系统的稳定性和可用性。例如:当双11活动时,把无关交易的服务统统降级,如查看历史订单、工单等等。
熔断
在微服务架构中,微服务是完成一个单一的业务功能,这样做的好处是可以做到解耦,每个微服务可以独立演进。但是,一个应用可能会有多个微服务组成,微服务之间的数据交互通过远程过程调用完成。这就带来一个问题,假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其它的微服务。如果调用链路上某个微服务的调用响应时间过长或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,所谓的“雪崩效应”。
熔断机制是应对雪崩效应的一种微服务链路保护机制。服务熔断的作用类似于我们家用的保险丝,当某服务出现不可用或响应超时的情况时,为了防止整个系统出现雪崩,暂时停止对该服务的调用。熔段解决如下几个问题:
- 当所依赖的对象不稳定时,能够起到快速失败的目的;
- 快速失败后,能够根据一定的算法动态试探所依赖对象是否恢复
Sentinel源码解析
本源码解析以限流为例,降级具体实现可自行参考源码Sentinel采用滑动窗口算法来实现限流的。限流的直接表现是在执行 Entry nodeA = SphU.entry(资源名字) 的时候抛出 FlowException 异常。FlowException 是BlockException 的子类,您可以捕捉 BlockException 来自定义被限流之后的处理逻辑。
public static void main(String[] args) throws Exception {
//初始化一个规则
initFlowRule();
// 触发内部初始化
Entry entry = null;
try {
entry = SphU.entry(KEY);
} catch (BlockException e){
//如果被限流了,那么会抛出这个异常
e.printStackTrace();
} finally {
if (entry != null) {
entry.exit();
}
}
}
由上可知,会先初始化一个限流规则,initFlowRule方法中将创建一个限流规则FlowRule对象,主要限流参数如下
public class FlowRule extends AbstractRule {
/**
* 限流阈值类型,QPS 或线程数,默认QPS
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
/**
* 限流阈值
*/
private double count;
/**
* 根据调用关系选择策略
*/
private int strategy = RuleConstant.STRATEGY_DIRECT;
/**
* 资源名,即限流规则的作用对象
* 1-预热/冷启动
* 2-速率限制
* 3-预热/冷启动+速率限制
*/
private String refResource;
/**
* 限流控制行为,默认0-直接拒绝
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
}
并设置其相应的限流规则属性,最后通过FlowRuleManager.loadRules(rules)加载限流规则。
public class FlowRuleManager {
/**
* Load {@link FlowRule}s, former rules will be replaced.
* @param rules new rules to load.
*/
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
}
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
@Override
public boolean updateValue(T newValue) {
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
value = newValue;
for (PropertyListener<T> listener : listeners) {
//监听修改限流规则
listener.configUpdate(newValue);
}
return true;
}
}
限流规则初始化之后,通过entry= SphU.entry(resource)触发内部初始化。
从 SphU.entry() 方法往下执行会进入到 Sph.entry() ,Sph的默认实现类是 CtSph,而最终会进入CtSph 的entry 方法
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
//封装一个资源对象
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
通过我们给定的资源去封装了一个 StringResourceWrapper ,然后传入自己的重载方法,继而调用 entryWithPriority(resourceWrapper, count, false, args):
public class CtSph implements Sph {
//......
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// 使用默认上下文
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 全局开关关闭,没有规则检查,返回CtEntry对象
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 获取该资源对应的 chain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 获取的chain对象为空,返回CtEntry对象
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//执行chain的 entry方法
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
}
由上述方法可知,主要是为了获取该资源对应的资源处理链,让我们来看下slotChain是如何获取的
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//这里与dubbo(双重检查锁)中如出一辙,采用本地Map缓存机制
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// SlotChain最大阈值:6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 构造SlotChain对象
chain = SlotChainProvider.newSlotChain();
// 资源 --> 处理链
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
// 放入本地map缓存
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
当Map缓存中不存在ProcessorSlotChain实例,则具体通过 SlotChainProvider 去构造处理链
public final class SlotChainProvider {
// SlotChain的构造器
private static volatile SlotChainBuilder slotChainBuilder = null;
/**
* 构造SlotChain对象
*/
public static ProcessorSlotChain newSlotChain() {
// 若slotChainBuilder存在,直接调用构造方法
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// 通过SpiLoader加载SlotChainBuilder
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
}
继续让我们来看下slotChainBuilder的build方法中做了些什么
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// 获取ProcessorSlot实例集合
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
for (ProcessorSlot slot : sortedSlotList) {
// 过滤非AbstractLinkedProcessorSlot类型的Slot
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
// 加入链路中
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
我们可以看出上述底层源码是一个标准的责任链设计模式,通过查看ProcessorSlot的具体实现类,我们可以知道该责任链中的具体节点如图所示
执行对应的这些节点,具有有不同的职责,例如:
- NodeSelectorSlot :收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
- ClusterBuilderSlot :用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
- StatisticSlot :用于记录、统计不同纬度的 runtime 指标监控信息;
- SystemSlot :通过系统的状态,例如 load1 等,来控制总的入口流量;
- AuthoritySlot :根据配置的黑白名单和调用来源信息,来做黑白名单控制;
- FlowSlot :用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
- DegradeSlot :通过统计信息以及预设的规则,来做熔断降级;
slot执行链路可参考如下
上面获得的ProcessorSlotChain的实例是DefaultProcessorSlotChain,那么执行chain.entry方法,就会执行DefaultProcessorSlotChain.first的entry方法,而DefaultProcessorSlotChain.first的entry方法是这样的:
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
下图所示是各个slot对应的entry方法的具体实现
我们以StatisticSlot为例,来看看这些具体实现类内部的逻辑是怎样的。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 传播到下一个Slot.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 执行到这里表示通过检查,不被限流
node.increaseThreadNum();
// 请求通过了sentinel的流控等规则,记录当次请求
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// 增加线程统计
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setBlockError(e);
// 增加QPS统计
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
请求通过了sentinel的流控等规则,再通过node.addPassRequest() 将当次请求记录下来
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
addPassRequest方法如下
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
addPass方法如下
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
WindowWrap主要属性如下
public class WindowWrap<T> {
/**
* 时间窗口的长度
*/
private final long windowLengthInMs;
/**
* 时间窗口的开始时间,单位是毫秒
*/
private long windowStart;
/**
* S时间窗口的内容,在 WindowWrap 中是用泛型表示这个值的,但实际上就是 MetricBucket 类
*/
private T value;
}
我们再看看获取当前窗口的方法 data.currentWindow()
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
我们再回到
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
获取到窗口以后通过 wrap.value().addPass(count)增加统计的 QPS。而这里的 wrap.value() 得到的是之前提到的 MetricBucket ,在 Sentinel 中QPS相关数据的统计结果是维护在这个类的 LongAdder[] 中,最终由这个指标来与我们实现设置好的规则进行匹配,查看是否限流,也就是 StatisticSlot的entry 方法中的。在执行StatisticSlot的entry前都要先进入到FlowSlot的entry方法进行限流过滤:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 限流校验
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
让我们进入checkFlow的内部
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
再此处我们拿到了设置的 FlowRule ,循环匹配资源进行限流过滤。这就是Sentinel 能做到限流的原因。
Sentinel配置介绍
我们可以通过Sentinel的客户端查看接入了sentinel的各个系统。可针对系统中的各个资源设置相应的限流规则,如QPS或者线程数;或者设置相应的降级规则,如平均RT,异常比例以及异常数。
[{
"resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
"count": 1000,//RT threshold or exception ratio threshold count
"grade": 0, //Degrade strategy (0: average RT, 1: exception ratio)
"passCount": 0,
"timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
},
{
"resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
"count": 0.5,//RT threshold or exception ratio threshold count
"grade": 1,// Degrade strategy (0: average RT, 1: exception ratio)
"passCount": 0,
"timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
}]