FlowSlot 则用于根据预设的限流规则,以及前面 slot 统计的状态,来进行限流。
官方文档:如何使用Sentinel
/**
 * Slot that validates a request against the flow rules registered for its resource.
 *
 * <p>{@link #entry} runs {@link #checkFlow} first and only calls {@code fireEntry} when no rule
 * rejected the request; {@link #exit} just calls {@code fireExit}.
 */
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Performs the flow check and then calls {@code fireEntry} with the same arguments.
     *
     * @throws Throwable whatever {@link #checkFlow} or {@code fireEntry} throws
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // The flow check runs before fireEntry, so a rejected request never reaches it.
        checkFlow(resourceWrapper, context, node, count, prioritized);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    /**
     * Checks the request against every flow rule registered under the resource's name.
     *
     * <p>A resource may have several rules; they are evaluated in list order and the first
     * one that does not pass aborts the check.
     *
     * @throws FlowException built from the failing rule's limit app and the rule itself
     */
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        Map<String, List<FlowRule>> ruleIndex = FlowRuleManager.getFlowRuleMap();
        List<FlowRule> applicable = ruleIndex.get(resource.getName());
        if (applicable == null) {
            // Nothing is configured for this resource.
            return;
        }
        for (FlowRule candidate : applicable) {
            boolean admitted = canPassCheck(candidate, context, node, count, prioritized);
            if (admitted) {
                continue;
            }
            throw new FlowException(candidate.getLimitApp(), candidate);
        }
    }

    /**
     * Evaluates a single rule by delegating to {@code FlowRuleChecker.passCheck}.
     *
     * @return the result of {@code FlowRuleChecker.passCheck} for the given arguments
     */
    boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int count, boolean prioritized) {
        return FlowRuleChecker.passCheck(rule, context, node, count, prioritized);
    }

    /** Calls {@code fireExit} with the same arguments; this slot does no work of its own on exit. */
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}
在流控检查的时候,判断是否对该Resource有配置规则,如果配置了流控规则,则逐一检查配置规则。在检查规则的时候,又分为集群流控或者单机流控,集群流控或者单机流控的具体文档参考://www.greatytc.com/p/a52bf4073873
在流控的时候,首先要根据请求的Resource和请求策略(详细见上图),选择流控节点,再根据不同的流控策略,选择不同的Controller去判断是否通过。
/**
 * Checks one flow rule against the statistics node selected for the current request.
 *
 * @param rule         the rule being evaluated
 * @param context      the current invocation context
 * @param node         the node of the resource being entered
 * @param acquireCount the amount the request asks for
 * @param prioritized  whether the request is prioritized; forwarded to the rule's controller
 * @return {@code true} when no node is selected for the rule, otherwise the result of the
 *         rule's controller
 */
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        // No node was selected for this rule, so the request passes.
        return true;
    }
    // Use the three-argument overload so the priority flag reaches the controller;
    // the two-argument form would drop it.
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
/**
 * Selects the statistics node a rule should be checked against for the current request.
 *
 * <p>The rule's limit app is compared with the request origin in three steps, in this order:
 * <ol>
 *   <li>limit app equals the origin and {@code filterOrigin(origin)} holds: origin node;</li>
 *   <li>limit app is {@code RuleConstant.LIMIT_APP_DEFAULT}: cluster node of {@code node};</li>
 *   <li>limit app is {@code RuleConstant.LIMIT_APP_OTHER} and
 *       {@code FlowRuleManager.isOtherOrigin} holds: origin node.</li>
 * </ol>
 * In each case the node named above is returned only for {@code RuleConstant.STRATEGY_DIRECT};
 * any other strategy returns {@code selectReferenceNode(rule, context, node)} instead.
 *
 * @return the selected node, or {@code null} when none of the three cases matches
 */
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // The limit app should not be empty.
    final String limitApp = rule.getLimitApp();
    final boolean direct = rule.getStrategy() == RuleConstant.STRATEGY_DIRECT;
    final String origin = context.getOrigin();

    // Case 1: the rule targets exactly this caller.
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (direct) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }

    // Case 2: the rule uses the default limit app.
    if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
        if (direct) {
            return node.getClusterNode();
        }
        return selectReferenceNode(rule, context, node);
    }

    // Case 3: the rule targets "other" origins and this origin qualifies as one.
    if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
        if (direct) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }

    return null;
}
根据 FlowRule 配置的 controlBehavior 选择不同的 Controller,对应关系如下:
- default(reject directly):DefaultController
- warm up:WarmUpController
- rate limiter:RateLimiterController
- warm up + rate limiter:WarmUpRateLimiter
在默认的Controller中,首先获取当前的线程数,或者QPS数,当当前的线程数或者QPS+申请的数量>配置的总数,则不通过,如果当前线程数或者QPS+申请的数量<=配置的总数,则直接通过。
/**
 * Default traffic-shaping controller: rejects a request as soon as the currently used amount
 * plus the requested amount exceeds the configured threshold.
 */
public class DefaultController implements TrafficShapingController {

    /** Usage reported when no node is available. */
    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    /**
     * Threshold that used + requested is compared against.
     * NOTE(review): no constructor or setter is visible in this excerpt; presumably assigned
     * elsewhere in the full class - confirm.
     */
    private double count;

    /** Selects the metric: thread count when equal to {@code RuleConstant.FLOW_GRADE_THREAD}, pass QPS otherwise. */
    private int grade;

    /** Same as {@link #canPass(Node, int, boolean)} with {@code prioritized} set to {@code false}. */
    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    /**
     * Decides whether {@code acquireCount} more can be admitted on {@code node}.
     *
     * @param node         node whose current usage is read; may be {@code null}
     * @param acquireCount amount being requested
     * @param prioritized  not consulted by this implementation
     * @return {@code false} when current usage plus {@code acquireCount} is greater than the
     *         threshold, {@code true} otherwise
     */
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        // Kept as a "greater than" test so the result for a NaN threshold stays the same.
        boolean overLimit = curCount + acquireCount > count;
        return !overLimit;
    }

    /**
     * Reads the node's current usage for the configured grade.
     *
     * @return {@link #DEFAULT_AVG_USED_TOKENS} for a {@code null} node; otherwise the node's
     *         current thread count, or its pass QPS cast to {@code int}
     */
    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int) node.passQps();
    }

    /**
     * Sleeps for the given number of milliseconds. If the thread is interrupted the sleep ends
     * early and the interrupt flag is restored so callers further up can still observe it.
     */
    private void sleep(int timeMillis) {
        try {
            Thread.sleep(timeMillis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again instead of losing the interrupt.
            Thread.currentThread().interrupt();
        }
    }
}
和其他Slot一样,FlowRule的配置通过FlowRuleManager#loadRules(List<FlowRule> rules)加载,最终更新由FlowPropertyListener来完成。
/**
 * Loads a new list of flow rules by passing it to {@code currentProperty.updateValue}.
 *
 * <p>This method does not touch the rule map itself.
 * NOTE(review): presumably the property notifies FlowPropertyListener, which then replaces
 * the stored rules - confirm against the property's registration code.
 *
 * @param rules the flow rules to load
 */
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
/**
 * Property listener that rebuilds the flow-rule map whenever a rule list is updated or loaded.
 *
 * <p>Both callbacks do the same replacement and differ only in the message they log.
 */
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

    /** Replaces the stored rules with those built from {@code value} and logs the result. */
    @Override
    public void configUpdate(List<FlowRule> value) {
        replaceRules(value);
        RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
    }

    /** Replaces the stored rules with those built from {@code conf} and logs the result. */
    @Override
    public void configLoad(List<FlowRule> conf) {
        replaceRules(conf);
        RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules);
    }

    /**
     * Builds the per-resource rule map from {@code source} and, unless the builder returned
     * {@code null}, swaps it into {@code flowRules}.
     */
    private void replaceRules(List<FlowRule> source) {
        Map<String, List<FlowRule>> rebuilt = FlowRuleUtil.buildFlowRuleMap(source);
        if (rebuilt == null) {
            // Keep the currently stored rules when nothing could be built.
            return;
        }
        // NOTE(review): clear() and putAll() are two separate calls; whether a concurrent reader
        // can observe the map while it is empty depends on how flowRules is declared - confirm.
        flowRules.clear();
        flowRules.putAll(rebuilt);
    }
}