系列
- Sentinel流程介绍
- Sentinel资源节点树构成
- Sentinel滑动窗口介绍
- Sentinel流量控制
- Sentinel的职责链slot介绍
- Sentinel熔断降级
- Sentinel Dashboard和应用通信
- Sentinel 控制台
开篇
- 流量控制(flow control),其原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
- FlowSlot 会根据预设的规则,结合前面 NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot 统计出来的实时信息进行流量控制。
- 流量控制的详细介绍可以参考Sentinel流量控制的介绍。
流控规则介绍
- 流控规则在管理后台的配置参数如上图所示。
- 流控规则核心属性如上图所示。
流控规则配置
private static void initFlowRule(int interfaceFlowLimit, boolean method) {
FlowRule flowRule = new FlowRule(INTERFACE_RES_KEY)
.setCount(interfaceFlowLimit)
.setGrade(RuleConstant.FLOW_GRADE_QPS);
List<FlowRule> list = new ArrayList<>();
if (method) {
FlowRule flowRule1 = new FlowRule(RES_KEY)
.setCount(5)
.setGrade(RuleConstant.FLOW_GRADE_QPS);
list.add(flowRule1);
}
list.add(flowRule);
FlowRuleManager.loadRules(list);
}
- 通过配置流控规则并且通过FlowRuleManager生效限流规则。
流控规则定义
public interface Rule {
String getResource();
}
public abstract class AbstractRule implements Rule {
// 资源名
private String resource;
// 流控对应的调用来源
private String limitApp;
}
public class FlowRule extends AbstractRule {
// 流控类型:0=线程,1=QPS FLOW_GRADE_THREAD = 0 FLOW_GRADE_QPS = 1;
private int grade = RuleConstant.FLOW_GRADE_QPS;
// 流控阈值,配置的是qps类型则代表qps的值;配置的是线程数类型则代表线程数
private double count;
// 流控限流策略
private int strategy = RuleConstant.STRATEGY_DIRECT;
// 关联流控的资源
private String refResource;
// 流控效果控制 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
// 对应流控效果为Warm Up情况下,出现的预热时长
private int warmUpPeriodSec = 10;
// 对应流控效果为排队等待情况下,出现的超时时间
private int maxQueueingTimeMs = 500;
// 对应新增流控规则页面的是否集群
private boolean clusterMode;
// 集群流控的相关配置
private ClusterFlowConfig clusterConfig;
// 流量整形的实现,不同流控效果有不同算法
private TrafficShapingController controller;
}
一条限流规则主要由下面几个因素组成,我们可以组合这些元素来实现不同的限流效果:
- resource:资源名,即限流规则的作用对象
- count: 限流阈值
- grade: 限流阈值类型(QPS 或并发线程数)
- limitApp: 流控针对的调用来源,若为 default 则不区分调用来源
- strategy: 调用关系限流策略
- controlBehavior: 流量控制效果(直接拒绝、Warm Up、匀速排队)
流控流程
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// 校验是否限流
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
}
public class FlowRuleChecker {
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
// 获取匹配的规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 检查规则能否通过
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node,
int acquireCount, boolean prioritized) {
String limitApp = rule.getLimitApp();
// 集群模式下的规则检测
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 单机模式下的规则检测
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 选择流量统计的节点进行限流计算
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
// rule.getRater返回TrafficShapingController对象,
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
- 流控的核心逻辑在FlowSlot进行处理,通过FlowRuleChecker进行限流规则生效判断
- FlowRuleChecker的checkFlow会遍历FlowRule进行canPassCheck判断
- canPassCheck单机模式执行passLocalCheck,集群模式执行passClusterCheck
- passLocalCheck根据流控效果策略和获取的统计节点判断是否限流
- selectNodeByRequesterAndStrategy返回ClusterBuilderSlot阶段生成的ClusterNode
- TrafficShapingController在默认模式下返回流控效果策略DefaultController。
- DefaultController负责实现流量控制。
流控效果策略
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 获取当前已使用的token
int curCount = avgUsedTokens(node);
// 当前已使用token + 获取的token 大于token数量的场景
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
}
- 默认流控策略比较已使用token和此次消耗token是否大于token数量判断限流
流控策略
com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController
public final class FlowRuleUtil {
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
// 预热/冷启动
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
// 匀速排队
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
// 冷启动+匀速排队
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
// 直接拒绝
return new DefaultController(rule.getCount(), rule.getGrade());
}
}
- 流控效果策略根据不同的规则返回不同的流控策略。
- 直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默认的流量控制方式,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出FlowException。
- Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。
- 匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。
流控节点选择
public class FlowRuleChecker {
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
// 实际访问的分支
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
}
- 根据请求和策略来来返回数据统计节点用以流控限制。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 负责创建DefaultNode,职责链以资源维度,slot以职责链维度
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// 以资源作为key保存的全局的集群节点
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// 将DefaultNode设置进集群节点
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
- NodeSelectorSlot生成的DefaultNode在ClusterBuilderSlot被设置ClusterNode。
- ClusterNode记录访问的统计数据会被用在流控当中。