简介
令牌桶是限流算法之一,其原理是按固定速率向桶里加入一定数量的令牌(token),请求到来时先尝试扣减令牌:如果桶内剩余令牌数不少于本次需要扣减的数量,则扣减并放行,否则拒绝。这种限流方式的好处是更灵活,能够处理突发流量:出现流量波峰时,令牌桶内已经提前放入了足够多的token,突发流量一样可以通过。不过需要注意的是,被限流保护的系统需要事先验证能够扛住这么高的突发流量。
简单实现
使用变量availableTokens标识桶内令牌数,消费前先按时间加入对应的令牌数,然后对令牌数进行扣减。
/**
 * A simple token-bucket rate limiter.
 *
 * <p>The bucket starts full ({@code capacity} tokens). Tokens are added lazily, in proportion
 * to the time elapsed since the previous refill, and are never allowed to exceed
 * {@code capacity}. All access to the mutable state goes through {@code synchronized} methods.
 */
public class TokenBucket {
    private static final double NANOS_PER_MILLI = 1_000_000.0;

    private final long capacity;
    private final double refillTokensPerOneNano;
    private double availableTokens;
    private long lastRefillNanos;

    /**
     * Creates a bucket that starts full.
     *
     * @param capacity           maximum number of tokens the bucket can hold; must not be negative
     * @param refillTokens       number of tokens added per refill period; must not be negative
     * @param refillPeriodMillis length of the refill period in milliseconds; must be positive
     * @throws IllegalArgumentException if any argument is out of range
     */
    public TokenBucket(long capacity, long refillTokens, long refillPeriodMillis) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity must not be negative: " + capacity);
        }
        if (refillTokens < 0) {
            throw new IllegalArgumentException("refillTokens must not be negative: " + refillTokens);
        }
        // A zero period would make the rate Infinity (or NaN when refillTokens is 0, which
        // poisons every later comparison); a negative period would drain the bucket over time.
        if (refillPeriodMillis <= 0) {
            throw new IllegalArgumentException(
                    "refillPeriodMillis must be positive: " + refillPeriodMillis);
        }
        this.capacity = capacity;
        this.refillTokensPerOneNano =
                (double) refillTokens / ((double) refillPeriodMillis * NANOS_PER_MILLI);
        this.availableTokens = capacity;
        // nanoTime is monotonic, so wall-clock adjustments cannot skew the refill amount.
        this.lastRefillNanos = System.nanoTime();
    }

    /**
     * Tries to take {@code numberTokens} tokens from the bucket.
     *
     * @param numberTokens number of tokens to take; must not be negative
     * @return {@code true} if the tokens were taken, {@code false} if the bucket does not
     *         currently hold enough tokens (in which case nothing is taken)
     * @throws IllegalArgumentException if {@code numberTokens} is negative
     */
    public synchronized boolean tryConsume(int numberTokens) {
        // A negative amount would otherwise *add* tokens and could exceed the capacity.
        if (numberTokens < 0) {
            throw new IllegalArgumentException("numberTokens must not be negative: " + numberTokens);
        }
        refill();
        if (availableTokens < numberTokens) {
            return false;
        }
        availableTokens -= numberTokens;
        return true;
    }

    /**
     * Returns the number of tokens currently available, after applying any pending refill.
     *
     * @return the current token count, between 0 and the capacity
     */
    public synchronized double getAvailableTokens() {
        refill();
        return availableTokens;
    }

    /** Adds the tokens earned since the last refill, capped at the capacity. */
    private void refill() {
        long nowNanos = System.nanoTime();
        // Compare via the difference: nanoTime values are only meaningful relative to each other.
        long nanosSinceLastRefill = nowNanos - lastRefillNanos;
        if (nanosSinceLastRefill > 0) {
            double refill = nanosSinceLastRefill * refillTokensPerOneNano;
            this.availableTokens = Math.min(capacity, availableTokens + refill);
            this.lastRefillNanos = nowNanos;
        }
    }

    /** Demo: tries to take 2 tokens every 100 ms from a 10-token bucket refilled at 10 tokens/s. */
    public static void main(String[] args) throws InterruptedException {
        TokenBucket tokenBucket = new TokenBucket(10, 10, 1000);
        while (true) {
            if (tokenBucket.tryConsume(2)) {
                System.out.println("pass");
            } else {
                System.out.println("block");
            }
            Thread.sleep(100);
            System.out.println("当前可用token数目为:" + tokenBucket.getAvailableTokens());
        }
    }
}
sentinel中应用
利用token数进行预热限流,借鉴了令牌桶的思想,根据桶内的令牌数来判断预热是否完成:令牌数多说明当前流量较小,系统处于冷状态,需要进行预热,预热期间按warningQps(而不是count)作为允许通过的QPS阈值。
/**
 * Traffic-shaping controller that lowers the admitted QPS threshold while the stored token
 * count is at or above a warning level, and uses the configured {@code count} otherwise.
 *
 * <p>The token count lives in {@code storedTokens} and is re-synchronised at most once per
 * whole second (see {@code syncToken}). State is held in {@code AtomicLong}s, but the update in
 * {@code syncToken} is a sequence of separate atomic operations rather than one atomic step.
 */
public class WarmUpController implements TrafficShapingController {
// QPS threshold applied when the stored token count is below warningToken.
protected double count;
// Must be greater than 1 (enforced in construct); sets how far the threshold drops when full.
private int coldFactor;
// Token level at or above which the reduced (warm-up) threshold is applied.
protected int warningToken = 0;
// Upper bound on the stored token count.
private int maxToken;
// Increase of the per-request interval for each token above warningToken.
protected double slope;
// Current token count.
protected AtomicLong storedTokens = new AtomicLong(0);
// Second-aligned timestamp (ms) of the last successful token synchronisation.
protected AtomicLong lastFilledTime = new AtomicLong(0);
/**
 * Creates a controller with an explicit cold factor.
 *
 * @param count             QPS threshold used once the token count is below the warning level
 * @param warmUpPeriodInSec warm-up period in seconds, used to size the token thresholds
 * @param coldFactor        cold factor; must be greater than 1
 * @throws IllegalArgumentException if {@code coldFactor <= 1}
 */
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
/**
 * Creates a controller with a cold factor of 3.
 *
 * @param count             QPS threshold used once the token count is below the warning level
 * @param warmUpPeriodInSec warm-up period in seconds, used to size the token thresholds
 */
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
/**
 * Validates the cold factor and derives warningToken, maxToken and slope.
 *
 * @throws IllegalArgumentException if {@code coldFactor <= 1}
 */
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// e.g. warningToken = 100 for count = 20, warmUpPeriodInSec = 10, coldFactor = 3
// Note: the (int) cast truncates the product first; the division is then integer division.
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// e.g. maxToken = 200 for the same example values
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
// Chosen so that at maxToken the interval is coldFactor / count, i.e. QPS = count / coldFactor.
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
/** Delegates to {@code canPass(node, acquireCount, false)}. */
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
/**
 * Decides whether {@code acquireCount} more requests may pass.
 *
 * <p>First synchronises the token count using the node's previous-window pass QPS, then
 * compares {@code passQps + acquireCount} with the reduced threshold {@code warningQps} when
 * the token count is at or above {@code warningToken}, or with {@code count} otherwise.
 * The {@code prioritized} flag is not read by this method.
 *
 * @return {@code true} if the request is within the applicable threshold
 */
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Both QPS readings are truncated to whole numbers.
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// Start computing the slope.
// If the warning line has been crossed, start adjusting the QPS.
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// Consumption should be faster than "warning" but slower than ... (original comment is incomplete)
// current interval = restToken*slope+1/count
// warningQps is the reciprocal of that interval; nextUp nudges it to the next larger double.
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
/**
 * Refreshes {@code storedTokens} at most once per whole second.
 *
 * <p>Computes the refilled value via {@code coolDownTokens}, installs it with a
 * compare-and-set, then subtracts {@code passQps}, clamps the result at 0 and records the
 * fill time. If the compare-and-set fails, nothing is changed by this call.
 *
 * @param passQps QPS to deduct; the visible caller passes the previous window's pass QPS
 */
protected void syncToken(long passQps) {
// NOTE(review): TimeUtil is defined elsewhere; presumably returns the current time in ms.
long currentTime = TimeUtil.currentTimeMillis();
// Round down to the start of the current second.
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
// Already synchronised for this second (or a later one): nothing to do.
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// The deduction, clamp and timestamp update below are separate atomic operations.
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
/**
 * Computes the token count after refilling for the time elapsed since the last fill.
 *
 * <p>Tokens are added at {@code count} per second when the current count is below
 * {@code warningToken}, or when it is above {@code warningToken} and {@code passQps} is below
 * {@code (int) count / coldFactor}. When the count equals {@code warningToken} exactly, neither
 * branch applies and no tokens are added. This method does not modify {@code storedTokens}.
 *
 * @return the new token count, capped at {@code maxToken}
 */
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// Precondition for adding tokens:
// when token consumption is far below the warning line.
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
// (int) count is truncated first, then divided using integer division.
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
}