概述
Sentinel 是一个用于微服务架构的流量控制和熔断保护框架,旨在帮助开发者构建更加稳定和可靠的系统。核心功能包括限流、熔断、降级和系统监控。它不仅支持基于线程数、QPS等多种限流策略,还能实时监控系统状态,帮助开发者快速识别和解决问题。
基础概念
Sentinel 需要对每个资源的访问进行管理和追踪,这就涉及到两个核心概念:Entry 和 Context。
-
Entry:用于保护资源访问的核心抽象。每次对某个受保护资源的访问,都会通过创建一个
Entry
来标识这次请求。Entry
记录了资源的名称、类型、调用方信息等,用于后续的统计和控制。 -
Context:代表了调用链的上下文。通过
Context
,Sentinel 能够跟踪整个调用链中多个资源的访问情况,并为后续的流量控制、限流、熔断等功能提供支持。
在谈及流量控制时,首先需要解决的问题无疑是数据统计。只有在对请求进行充分的数据统计后,才能实施有效的流量控制。为此,Sentinel 设计了一系列类来进行精确的数据统计,为后续的流量管理提供基础支持。
-
StatisticNode
:最为基础的统计节点,用于存储统计数据。 -
DefaultNode
:上下文资源维度数据存储。 -
ClusterNode
:集群维度数据存储。 -
EntranceNode
:上下文维度数据存储,通过遍历上下文下面的所有子节点统计数据。
有了数据统计的类,接下来就要考虑如何采集数据并实施流量控制。这时,Sentinel 中的另一个关键组件——ProcessorSlotChain 就派上用场了。ProcessorSlotChain 通过将不同功能的 Slot 串联在一起,形成一条责任链,负责数据采集和流量控制。每个 Slot 在责任链中承担着不同的任务,例如流量统计、熔断判断和降级处理等。
核心原理
在分析 Sentinel 核心源码之前,先通过一个简单的 demo
展示 Sentinel 的基本使用方法。通过这个例子,我们可以看到如何设置限流规则并保护资源。
实例演示:流量控制
public class SentinelDemo {
public static void main(String[] args) {
// 定义流量控制规则
FlowRule rule = new FlowRule();
rule.setResource("testResource"); // 设置资源名称
rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // 按照 QPS 限流
rule.setCount(1); // 设置 QPS 限制为 1
// 加载规则
FlowRuleManager.loadRules(Collections.singletonList(rule));
while (true) {
// 使用 Entry 保护资源
try (Entry entry = SphU.entry("testResource")) {
// 被保护的业务逻辑
System.out.println("访问资源成功!");
} catch (BlockException e) {
// 限流后的处理逻辑
System.out.println("资源被限流!");
}
}
}
}
在上述代码中,资源 testResource
设置了每秒最多允许一次访问。程序会在资源被限流时打印 "资源被限流!",否则打印 "访问资源成功!"。
代码分析:核心流程解读
这个简单的 demo
展现了 Sentinel 的基本工作流程,接下来我们深入分析其中的核心实现逻辑。
1. SphU.entry():创建资源访问入口
SphU.entry()
是 Sentinel 中保护资源的入口方法。每次调用 entry()
方法时,Sentinel 会为该资源创建一个 Entry
,并依赖其背后的责任链机制执行流量控制和熔断判断。
2. ProcessorSlotChain:责任链的核心
ProcessorSlotChain
是 Sentinel 内部用来处理流量控制、统计和熔断的核心组件。每次调用 SphU.entry()
,都将触发一系列 Slot
的执行,如流量统计、限流判断等。
public class CtSph implements Sph {
// 同一资源使用同一个调用链
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
Entry e = new CtEntry(resourceWrapper, chain, context, count, args);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
}
为更好地理解 ProcessorSlotChain
的执行流程,通过一张链路图来直观展示各个 Slot 在责任链中的工作顺序:
ProcessorSlotChain
中的 fireEntry()
方法依次触发不同的 Slot
,每个 Slot
完成自己的逻辑后,再调用下一个 Slot
。通常这些 Slot
实现限流、统计、熔断、权限控制等功能。每个 Slot
的实现继承自 AbstractLinkedProcessorSlot
,负责接收请求并执行特定的逻辑。
NodeSelectorSlot
是责任链中的第一个 Slot
,负责根据资源名构建资源调用树,并为每个资源创建 Node
,将其与当前 Context
绑定,通过资源树结构,帮助统计和监控整个调用链路的数据。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
// 同一资源在不同上下文被访问时,会创建多个DefaultNode
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
ClusterBuilderSlot
负责为每个资源构建簇点 (ClusterNode
),用于记录资源的全局统计数据(如 QPS、异常次数等),以及按调用来源(IP 或应用)划分的统计数据。
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// 同一资源使用同一个ClusterNode
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
StatisticSlot
是统计请求数据的核心 Slot
,它会为当前资源记录 QPS、响应时间、异常次数等关键信息,为限流和熔断的触发提供数据基础。
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
}
}
SystemSlot
主要负责根据系统的整体状态(如系统负载、CPU 使用率等)进行自适应的限流。它主要防止系统过载,确保在高负载时系统能够平稳运行。
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
AuthoritySlot
负责基于访问权限进行控制。通过 IP 或应用来源的黑白名单进行限流,确保只有授权的请求才能访问某些资源。
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
FlowSlot
是 Sentinel 中实现限流的核心 Slot
,它会根据当前资源的实时数据(如 QPS、并发线程数等)判断是否需要触发限流,并执行相应的限流逻辑。
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
DegradeSlot
负责实现熔断和降级功能。当某个资源的异常比率或响应时间超过预设的阈值时,触发熔断,暂停该资源的访问。
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
总结
Sentinel 的核心架构基于责任链模式,将各个功能模块(如流量控制、统计、熔断、权限管理等)以 Slot
的形式串联起来。每个 Slot
负责执行特定的任务,整个调用链条从资源访问的入口一直贯穿到流量保护、熔断、降级等逻辑。