1.8.5.1 环境准备
- 安装 redis:Soul 需要集群部署,所以需要将令牌集中管理,Redis 无疑是一个很好的选择。
-
在 soul admin 开启 ratelimiter 插件。
图中 url 即reidis 地址
- BootStrap 引入 reteLimiter 依赖
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-ratelimiter</artifactId>
<version>${project.version}</version>
</dependency>
1.8.5.2 插件初体验
-
配置限流策略,包括 selector 和 rule 如图,图中 capacity 和 rate 都为1,代表初始化令牌数量为 1, 每秒限流 1 个。
-
使用 jmeter 进行测试,设置 10 个线程同时访问该接口,然后观察。得到结果如下,因为我们设置 capacity 为 1,所以第一个请求通过,然后到下一秒一开始,的那个请求通过,接着的请求都被拒绝了。
1.8.5.3 源码解析
按照惯用套路我们先观察一下 RateLimiterPluginConfiguration 做了什么:这里初始化了 RateLimiterPlugin 和 RateLimiterPluginDataHandler ,前者是标准的 soulPlugin ,而后者是插件配置数据处理器,用于监听数据变化,并更新策略。我们来先看一下 RateLimiterPlugin ,我们以前讲过,AbstractSoulPlugin 有个 default 的 excute , 用于匹配 Selector 和 Rule ,比配成功就会调用 plugin 实现的 doExecute 方法。我们看一下 doExcute 方法
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final String handle = rule.getHandle();
final RateLimiterHandle limiterHandle = GsonUtils.getInstance().fromJson(handle, RateLimiterHandle.class);
return redisRateLimiter.isAllowed(rule.getId(), limiterHandle.getReplenishRate(), limiterHandle.getBurstCapacity())
.flatMap(response -> {
if (!response.isAllowed()) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
Object error = SoulResultWrap.error(SoulResultEnum.TOO_MANY_REQUESTS.getCode(), SoulResultEnum.TOO_MANY_REQUESTS.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
});
}
它主要通过 redisRateLimiter 的 isAllowed 方法根据 rule 的 id capacity 和 rate 检测是否可以通过检查。假如检查不通过,则直接返回错误。我们再看一下 isAllowed 方法:
public Mono<RateLimiterResponse> isAllowed(final String id, final double replenishRate, final double burstCapacity) {
// 检查是否初始化
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
// 生成 key
List<String> keys = getKeys(id);
// 生成 lua 脚本参数
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
// 通过 ReactiveRedisTemplate 调用 Redis lua 脚本,
Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(this.script, keys, scriptArgs);
return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> { // 将结果转成 ArrayList
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L; // 假如为第一个结果 1 则代表通过
Long tokensLeft = results.get(1); // token
RateLimiterResponse rateLimiterResponse = new RateLimiterResponse(allowed, tokensLeft); // 生成结果
log.info("RateLimiter response:{}", rateLimiterResponse.toString());
return rateLimiterResponse;
}).doOnError(throwable -> log.error("Error determining if user allowed from redis:{}", throwable.getMessage()));
}
我们前面说过 Soul 的限流是通过 redis 通过管理令牌,他是通过 一个 lua 脚本实现的,这里 isAllowed 方法主要是调用 redis lua 脚本,然后看返回结果,然后根据结果构造返回值。我们重点来看一下这个 lua 脚本。它位于 /META-INF/scripts/request_rate_limiter.lua 这个路径。
-- 传入 token 的 key
local tokens_key = KEYS[1]
-- 传入 timestamp 对应的 key
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
-- 传入的第一个参数即速率
local rate = tonumber(ARGV[1])
-- 传入的第二个参数容量
local capacity = tonumber(ARGV[2])
-- 传入的第三个参数秒,时间点,这里是每秒对应个一个key
local now = tonumber(ARGV[3])
-- 所需要的 token 数量,soul 写死为 1
local requested = tonumber(ARGV[4])
-- 计算桶装满需要多少秒
local fill_time = capacity/rate
-- 向下取整 fill_time
local ttl = math.floor(fill_time*2)
-- 获取 token 对应的 key
local last_tokens = tonumber(redis.call("get", tokens_key))
-- 如果为空 则设置为 capacity 的时间
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
-- 获取上一个 token 的时间戳,假如不存在则设置为 0
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
-- 查看当前时间和最新 token 生成时间间隔
local delta = math.max(0, now-last_refreshed)
-- 生成令牌令牌的总个数取当前容量或者时间间隔生成令牌数的总量的最小值
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 判断是否允许通过,假如大于需求的总数则可以通过
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
-- 如果通过则减去当前取出的token数量
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
-- 重新设置新的 tokens_key 和 timestamp 超时时间设置为 下一个周期 = capacity/rate
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed_num, new_tokens }
我们可以根据上面的注解了解具体 lua 脚本是如何判断的,这里最重要的是一个概念周期,一个周期等于总的容量除以速率。
在压测的时候我们可以查看一下 redis 中存储的值以方便理解,如下:
1.8.5.4 总结
soul 中通过 redis 执行 lua 脚本实现了令牌桶协议,这同时也可以应用到我们项目中 不仅是限流的场景,也可以使用在 分布式锁 的场景中。