目录
一、计数器固定窗口算法
二、计数器滑动窗口算法
三、漏斗算法
四、令牌桶算法
五、代码
六、总结
概念
#1.为什么有限流算法
(1)系统面临高并发、请求量大的情况下,为了保证系统服务的安全稳定性,则需要对新的流量进行限制,也叫限流
(2)常用的限流算法有计数器固定窗口算法、滑动窗口算法、漏斗算法和令牌桶算法
#2.例子:比如web服务、对外API,这种类型的服务有以下几种可能导致机器被拖垮
(1)web服务
用户增长过快(这是好事)
因为某个热点事件(微博热搜)
竞争对象爬虫,恶意的刷单
无法预知的:不知道什么时候会有10倍甚至20倍的流量打进来,如果真碰上这种情况,扩容是根本来不及的(弹性扩容都是虚谈,一秒钟你给我扩一下试试)
(2)对外API
例子:BCDE --> A
一个服务A的接口可能被BCDE多个服务进行调用,在B服务发生突发流量时,直接把A服务给调用挂了,导致A服务对CDE也无法提供服务。
解决方案:
1、每个调用方采用线程池进行资源隔离
2、使用限流手段对每个调用方进行限流
一、计数器固定窗口算法
image-20211226102840809.png
#1.原理
(1)计数器固定窗口算法是最基础和简单的一种限流算法。
(2)算法核心:对一段固定时间窗口内的请求进行计数
--如果请求数超过了阈值,则舍弃该请求
--如果没有达到设定的阈值,则接受该请求,且计数加1。
--当时间窗口结束时,重置计数器为0。
#2.结果分析
优点:实现简单,容易理解。
缺点:流量曲线可能不够平滑,有“突刺现象”(如下图所示)
#3.该算法存在两个问题
(1)一段时间内(不超过时间窗口)系统服务不可用:比如窗口大小为1s,限流大小为100,然后恰好在某个窗口的第1ms来了100个请求,然后第2ms-999ms的请求就都会被拒绝,这段时间用户会感觉系统服务不可用。
(2)窗口切换时可能会产生两倍于阈值流量的请求(但是符合限流规则):比如窗口大小为1s,限流大小为100,然后恰好在某个窗口的第999ms来了100个请求,窗口前期没有请求,所以这100个请求都会通过。再恰好,下一个窗口的第1ms有来了100个请求,也全部通过了,那也就是在2ms之内通过了200个请求,而我们设定的阈值是100,通过的请求达到了阈值的两倍。
image-20211226103032199.png
image-20211226103238568.png
二、计数器滑动窗口算法
image-20211226103324853.png
#1.原理
(1)计数器滑动窗口算法是计数器固定窗口算法的改进,解决了固定窗口切换时可能会产生两倍于阈值流量请求的缺点。
(2)算法核心:
--小计数器:在固定窗口的基础上,将计时窗口分成了若干个小窗口,然后每个小窗口维护一个独立的计数器。
--平移小窗口(头丢弃,尾添加):当请求的时间大于当前窗口的最大时间时,则将计时窗口向前平移一个小窗口。平移时,将第一个小窗口的数据丢弃,然后将第二个小窗口设置为第一个小窗口,同时在最后面新增一个小窗口,将新的请求放在新增的小窗口中
--计数器总和小于阈值:同时要保证整个窗口中所有小窗口的请求数目之后不能超过设定的阈值。
#2.分析
(1)升级版本:滑动窗口算法就是固定窗口的升级版
(2)分割小窗口,细粒度越高,限流越精准:将计时窗口划分成一个小窗口,滑动窗口算法就退化成了固定窗口算法。而滑动窗口算法其实就是对请求数进行了更细粒度的限流,窗口划分的越多,则限流越精准。
#3.总结
(1)避免了计数器固定窗口算法固定窗口切换时可能会产生两倍于阈值流量请求的问题;
(2)和漏斗算法相比,新来的请求也能够被处理到,避免了漏斗算法的饥饿问题。
三、漏斗算法
image-20211226104029262.png
#1.原理
(1)算法核心:
--漏斗空:请求来了先进到漏斗,漏斗以恒定的速率将请求流出处理,从而起到平滑流量的作用。
--漏斗满:当请求的流量过大时,漏斗达到最大容量时会溢出,此时请求被丢弃。
(2)保护系统,兜底:加了一层漏斗算法限流之后,就能够保证请求以恒定的速率流出。在系统看来,请求永远是以平滑的传输速率过来,从而起到了保护系统的作用。因为我们不知道请求会以多大的速率来,这就给系统的安全性埋下了隐患
#2.分析
(1)可以看到1-5号请求被接受,而6-10号请求被拒绝,说明此时漏斗已经溢出了,符合我们的预期
(2)漏斗算法的特点:即虽然请求流量是瞬时产生的,但是请求以固定速率流出被处理。
#3.产生问题
(1)漏桶的漏出速率是固定的,可以起到整流的作用:即虽然请求的流量可能具有随机性,忽大忽小,但是经过漏斗算法之后,变成了有固定速率的稳定流量,从而对下游的系统起到保护作用。
(2)不能解决流量突发的问题:我们设定的漏斗速率是2个/秒,然后突然来了10个请求
--受限于漏斗的容量5,只有5个请求被接受,另外5个被拒绝。
--你可能会说,漏斗速率是2个/秒,然后瞬间接受了5个请求,这不就解决了流量突发的问题吗?不,这5个请求只是被接受但没马上处理,处理速度仍是设定的2个/秒,所以没有解决流量突发的问题(令牌桶算法能够在一定程度上解决流量突发的问题)
四、令牌桶算法
image-20211226104706688.png
#1.原理
(1)算法概念:令牌桶算法是对漏斗算法的一种改进,除了能够起到限流的作用外,还允许一定程度的流量突发。
(2)算法核心
--存在一个令牌桶,算法中存在一种机制以恒定的速率向令牌桶中放入令牌,令牌桶也有一定的容量,如果满了令牌就无法放进去了。
--当请求来时,会首先到令牌桶中去拿令牌,如果拿到了令牌,则该请求会被处理,并消耗掉拿到的令牌;---如果令牌桶为空,则该请求会被丢弃。
#2.分析
(1)例子:同样取令牌桶算法的容量是5,产生令牌的速度为2个/秒,然后模拟了连续的10个请求,编号从1-10
(2)对于这10个请求,令牌桶和漏斗算法一样接受了5个请求,拒绝了5个请求,但不同的是令牌桶是马上处理了这5个请求,可以认为处理速度是5个/秒,
对于10个请求,令牌桶算法和漏斗算法一样,都是接受了5个请求,拒绝了5个请求。与漏斗算法不同的是,令牌桶算法马上处理了这5个请求,处理速度可以认为是5个/秒,超过了我们设定的2个/秒的速率,即允许一定程度的流量突发。这一点也是和漏斗算法的主要区别
(3)限平均速率,允许一定流量突发:令牌桶算法是对漏桶算法的一种改进,除了能够在限制调用的平均速率的同时还允许一定程度的流量突发。
#3.other
(1)Guava的ratelimit实现思路:他的限流就是基于令牌桶算法,但是比较遗憾的是在单机下的限流
(2)合理设置间隔的时间,减小服务器压力
--间隔如果过短的话,一次性向桶里添加的令牌数量则是桶的最大容量!那么某个时间的瞬间请求过来,服务器的压力是非常大的.
--所以此处增加令牌数可以设置的稍微合理些,哪怕间隔时间再长!
五、代码
5.1 计数器固定窗口算法
package com.fong.CountLimiter;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author fong
* @date 2021/12/18 - 7:19:25
*/
public class CounterLimiter {
private int windowSize; // 窗口大小,毫秒为单位(窗口宽度)
private int limit;// 窗口内限流大小(窗口容量)
private AtomicInteger count;// 当前窗口的原子计数器(窗口计数器)
private CounterLimiter() {
}
public CounterLimiter(int windowSize, int limit) {
this.limit = limit;
this.windowSize = windowSize;
count = new AtomicInteger(0);
// 开启一个线程,达到窗口结束时清空count(每隔一段时间刷新一次窗口的计数器)
// 相当于Redis的过期时间
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
count.set(0);
try {
Thread.sleep(windowSize);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
// 请求到达后先调用本方法,若返回true,则请求通过,否则限流
public boolean tryAcquire() {
int newCount = count.addAndGet(1);
return newCount <= limit;
}
public static void main(String[] args) throws InterruptedException {
// 1.构造滑动窗口, 每秒允许20个请求
CounterLimiter counterLimiter = new CounterLimiter(1000, 20);
int count = 0;
// 2.模拟请求
// 2.1模拟50次请求,看多少能通过
for (int i = 0; i < 50; i++) {
if (counterLimiter.tryAcquire()) {
count++;
}
}
System.out.println("第一拨50次请求中通过:" + count + ",限流:" + (50 - count));
// 2.2过一秒再请求
Thread.sleep(1000);
// 2.3模拟50次请求,看多少能通过
count = 0;
for (int i = 0; i < 50; i++) {
if (counterLimiter.tryAcquire()) {
count++;
}
}
System.out.println("第二拨50次请求中通过:" + count + ",限流:" + (50 - count));
}
}
#分布式
// redis的ttl特性完美的满足了这一需求,将时间窗口设置为key的失效时间,然后将key的值每次请求+1即可.伪代码实现思路:
// 1.判断是否存在该key
if(EXIT(key)){
// 1.1自增后判断是否大于最大值,并返回结果
if(INCR(key) > maxPermit){
return false;
}
return true;
}
// 2.不存在key,则设置key初始值为1,失效时间为3秒
SET(KEY,1);
EXPIRE(KEY,3);
5.2 计数器滑动窗口算法
package com.fong.CountLimiter;
/**
* @author fong
* @date 2021/12/18 - 8:05:30
*/
public class CounterSlideWindowLimiter {
private int windowSize; // 窗口大小,毫秒为单位
private int limit;// 窗口内限流大小
private int splitNum;// 切分小窗口的数目大小
private int[] counters;// 每个小窗口的计数数组
private int index;// 当前小窗口计数器的索引
private long startTime;// 窗口开始时间
private CounterSlideWindowLimiter() {
}
//
public CounterSlideWindowLimiter(int windowSize, int limit, int splitNum) {
this.limit = limit;
this.windowSize = windowSize;
this.splitNum = splitNum;
counters = new int[splitNum];
index = 0;
startTime = System.currentTimeMillis();
}
// 请求到达后先调用本方法,若返回true,则请求通过,否则限流
public synchronized boolean tryAcquire() {
long curTime = System.currentTimeMillis();
// 计算滑动小窗口的数量 = 第二个大窗口往后多出来的时间 / 小窗口时间
long windowsNum = Math.max(curTime - windowSize - startTime, 0) / (windowSize / splitNum);
slideWindow(windowsNum);// 滑动窗口
int count = 0;
for (int i = 0; i < splitNum; i++) {
count += counters[i];
}
if (count >= limit) {
return false;
} else {
counters[index]++;
return true;
}
}
private synchronized void slideWindow(long windowsNum) {
if (windowsNum == 0)
return;
// 新的滑动窗口的计算器置为0
long slideNum = Math.min(windowsNum, splitNum);
for (int i = 0; i < slideNum; i++) {
index = (index + 1) % splitNum;
counters[index] = 0;
}
// 更新滑动窗口时间
startTime = startTime + windowsNum * (windowSize / splitNum);
}
public static void main(String[] args) throws InterruptedException {
// 每秒20个请求
int limit = 20;
CounterSlideWindowLimiter counterSlideWindowLimiter = new CounterSlideWindowLimiter(1000, limit, 10);
int count = 0;
Thread.sleep(3000);
// 计数器滑动窗口算法模拟100组间隔30ms的50次请求
System.out.println("计数器滑动窗口算法测试开始");
System.out.println("开始模拟100组间隔150ms的50次请求");
int failCount = 0;
for (int j = 0; j < 100; j++) {
count = 0;
for (int i = 0; i < 50; i++) {
if (counterSlideWindowLimiter.tryAcquire()) {
count++;
}
}
Thread.sleep(150);
// 模拟50次请求,看多少能通过
for (int i = 0; i < 50; i++) {
if (counterSlideWindowLimiter.tryAcquire()) {
count++;
}
}
if (count > limit) {
System.out.println("时间窗口内放过的请求超过阈值,放过的请求数" + count + ",限流:" + limit);
failCount++;
}
Thread.sleep((int) (Math.random() * 100));
}
System.out.println("计数器滑动窗口算法测试结束,100组间隔150ms的50次请求模拟完成,限流失败组数:" + failCount);
System.out.println("===========================================================================================");
// 计数器固定窗口算法模拟100组间隔30ms的50次请求
System.out.println("计数器固定窗口算法测试开始");
// 模拟100组间隔30ms的50次请求
CounterLimiter counterLimiter = new CounterLimiter(1000, limit);
System.out.println("开始模拟100组间隔150ms的50次请求");
failCount = 0;
for (int j = 0; j < 100; j++) {
count = 0;
for (int i = 0; i < 50; i++) {
if (counterLimiter.tryAcquire()) {
count++;
}
}
Thread.sleep(150);
// 模拟50次请求,看多少能通过
for (int i = 0; i < 50; i++) {
if (counterLimiter.tryAcquire()) {
count++;
}
}
if (count > limit) {
System.out.println("时间窗口内放过的请求超过阈值,放过的请求数" + count + ",限流:" + limit);
failCount++;
}
Thread.sleep((int) (Math.random() * 100));
}
System.out.println("计数器滑动窗口算法测试结束,100组间隔150ms的50次请求模拟完成,限流失败组数:" + failCount);
}
}
5.3 漏斗算法
package com.fong.CountLimiter;
import java.util.Date;
import java.util.LinkedList;
public class LeakyBucketLimiter {
private int capaticy;// 漏斗容量
private int rate;// 漏斗速率
private int left;// 剩余容量
private LinkedList<Request> requestList;
private LeakyBucketLimiter() {}
public LeakyBucketLimiter(int capaticy, int rate) {
this.capaticy = capaticy;
this.rate = rate;
this.left = capaticy;
requestList = new LinkedList<>();
// 开启一个定时线程,以固定的速率将漏斗中的请求流出,进行处理
new Thread(new Runnable() {
@Override
public void run() {
while(true){
if(!requestList.isEmpty()){
Request request = requestList.removeFirst();
handleRequest(request);
}
try {
Thread.sleep(1000 / rate); //睡眠
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
/**
* 处理请求
* @param request
*/
private void handleRequest(Request request){
request.setHandleTime(new Date());
System.out.println(request.getCode() + "号请求被处理,请求发起时间:"
+ request.getLaunchTime() + ",请求处理时间:" + request.getHandleTime() + ",处理耗时:"
+ (request.getHandleTime().getTime() - request.getLaunchTime().getTime()) + "ms");
}
public synchronized boolean tryAcquire(Request request){
if(left <= 0){
return false;
}else{
left--;
requestList.addLast(request);
return true;
}
}
public static void main(String[] args) {
LeakyBucketLimiter leakyBucketLimiter = new LeakyBucketLimiter(5,2);
for(int i = 1;i <= 10;i ++){
Request request = new Request(i,new Date());
if(leakyBucketLimiter.tryAcquire(request)){
System.out.println(i + "号请求被接受");
}else{
System.out.println(i + "号请求被拒绝");
}
}
}
/**
* 请求类,属性包含编号字符串、请求达到时间和请求处理时间
*/
static class Request{
private int code;
private Date launchTime;
private Date handleTime;
private Request() { }
public Request(int code,Date launchTime) {
this.launchTime = launchTime;
this.code = code;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public Date getLaunchTime() {
return launchTime;
}
public void setLaunchTime(Date launchTime) {
this.launchTime = launchTime;
}
public Date getHandleTime() {
return handleTime;
}
public void setHandleTime(Date handleTime) {
this.handleTime = handleTime;
}
}
}
5.4 令牌桶算法
package com.fong.CountLimiter;
import java.util.Date;
public class TokenBucketLimiter {
private int capaticy;// 令牌桶容量
private int rate;// 令牌产生速率
private int tokenAmount;// 令牌数量
public TokenBucketLimiter(int capaticy, int rate) {
this.capaticy = capaticy;
this.rate = rate;
tokenAmount = capaticy;
new Thread(new Runnable() {
@Override
public void run() {
// 以恒定速率放令牌
while (true) {
synchronized (this) {
tokenAmount++;
if (tokenAmount > capaticy) {
tokenAmount = capaticy;
}
}
try {
Thread.sleep(1000 / rate);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
public synchronized boolean tryAcquire(Request request) {
if (tokenAmount > 0) {
tokenAmount--;
handleRequest(request);
return true;
} else {
return false;
}
}
/**
* 处理请求
*
* @param request
*/
private void handleRequest(Request request) {
request.setHandleTime(new Date());
System.out.println(request.getCode() + "号请求被处理,请求发起时间:"
+ request.getLaunchTime() + ",请求处理时间:" + request.getHandleTime() + ",处理耗时:"
+ (request.getHandleTime().getTime() - request.getLaunchTime().getTime()) + "ms");
}
/**
* 请求类,属性只包含一个名字字符串
*/
static class Request {
private int code;
private Date launchTime;
private Date handleTime;
private Request() {
}
public Request(int code, Date launchTime) {
this.launchTime = launchTime;
this.code = code;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public Date getLaunchTime() {
return launchTime;
}
public void setLaunchTime(Date launchTime) {
this.launchTime = launchTime;
}
public Date getHandleTime() {
return handleTime;
}
public void setHandleTime(Date handleTime) {
this.handleTime = handleTime;
}
}
public static void main(String[] args) throws InterruptedException {
TokenBucketLimiter tokenBucketLimiter = new TokenBucketLimiter(5, 2);
for (int i = 1; i <= 10; i++) {
Request request = new Request(i, new Date());
if (tokenBucketLimiter.tryAcquire(request)) {
System.out.println(i + "号请求被接受");
} else {
System.out.println(i + "号请求被拒绝");
}
}
}
}
#pom.xml
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
#java
package com.fong.other;
import com.google.common.util.concurrent.RateLimiter;
public class RateLimiterMain {
public static void main(String[] args) {
// 创建了一个每秒生成10个令牌的限流器,即100ms生成一个,并最多保存10个令牌,多余的会被丢弃
RateLimiter rateLimiter = RateLimiter.create(10);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
rateLimiter.acquire();
System.out.println("pass");
}
}).start();
}
}
}
六、总结
#1算法
计数器固定窗口算法:实现简单,容易理解。和漏斗算法相比,新来的请求也能够被马上处理到。但是流量曲线可能不够平滑,有“突刺现象”,在窗口切换时可能会产生两倍于阈值流量的请求。
计数器滑动窗口算法:计数器固定窗口算法的一种改进,有效解决了窗口切换时可能会产生两倍于阈值流量请求的问题。
漏斗算法:能够对流量起到整流的作用,让随机不稳定的流量以固定的速率流出,但是不能解决流量突发的问题。
令牌桶算法:作为漏斗算法的一种改进,除了能够起到平滑流量的作用,还允许一定程度的流量突发。
#2.没有最好的算法,只有最合适的算法。
--令牌桶算法(自身的系统实际的处理能力强于配置的流量限制):一般用于保护自身的系统,对调用者进行限流,保护自身的系统不被突发的流量打垮。可以允许一定程度的流量突发,使得实际的处理速率高于配置的速率,充分利用系统资源。
--而漏斗算法(保护第三方的系统,支付系统...):比如自身的系统需要调用第三方的接口,为了保护第三方的系统不被自身的调用打垮,便可以通过漏斗算法进行限流,保证自身的流量平稳的打到第三方的接口上
image-20211226124621430.png
ref:
四种限流算法:
https://blog.csdn.net/weixin_41846320/article/details/95941361
https://blog.csdn.net/linhui258/article/details/81155622
https://segmentfault.com/a/1190000017078491
https://zhuanlan.zhihu.com/p/228412634