流量整形浅析

漏斗算法

它有点像我们生活中用到的漏斗,液体倒进去以后,总是从下端的小口中以固定速率流出,漏斗算法也类似,不管突然流量有多大,漏斗都保证了流量的常速率输出,也可以类比于调用量,比如,不管服务调用多么不稳定,我们只固定进行服务输出,比如每10毫秒接受一次服务调用。既然是一个桶,那就肯定有容量,由于调用的消费速率已经固定,那么当桶的容量堆满了,则只能丢弃了,漏斗算法如下图:

image.png

缺点

漏斗算法其实是悲观的,因为它严格限制了系统的吞吐量,从某种角度上来说,它的效果和并发量限流很类似。漏斗算法也可以用于大多数场景,但由于它对服务吞吐量有着严格固定的限制,如果在某个大的服务网络中只对某些服务进行漏斗算法限流,这些服务可能会成为瓶颈。其实对于可扩展的大型服务网络,上游的服务压力可以经过多重下游服务进行扩散,过多的漏斗限流似乎意义不大。

实现:

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <p>
 * 对请求做限速流控
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 17/3/3
 */
public class LimitRequestByTime {
    long limit = 1000;

    Map<Long, AtomicLong> map = Collections.synchronizedMap(new LinkedHashMap<>(8));

    public boolean limitReq() {
        // 统计当前秒数
        long currentTimeMillis = System.currentTimeMillis() / 1000;
        map.putIfAbsent(currentTimeMillis, new AtomicLong(0));
        // 获取秒速流控
        AtomicLong currentAtomicLong = map.get(currentTimeMillis);
        return !(currentAtomicLong.incrementAndGet() >= limit);
    }
    public static void main(String[] args) {
        LimitRequestByTime limitRequestByTime = new LimitRequestByTime();
        // 统计所有的请求次数
        Map<Long, AtomicLong> totalMap = new ConcurrentHashMap<>();

        Map<Long, AtomicLong> successTotalMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 100000000; i++) {
            // 统计当前这一秒的请求书
            long currentTimeMillis = System.currentTimeMillis() / 1000;
            totalMap.putIfAbsent(currentTimeMillis, new AtomicLong(0));
            // 自增加1
            totalMap.get(currentTimeMillis).incrementAndGet();

            successTotalMap.putIfAbsent(currentTimeMillis, new AtomicLong(0));
            if (limitRequestByTime.limitReq()) {
                successTotalMap.get(currentTimeMillis).incrementAndGet();
            }
        }

        for (Map.Entry<Long, AtomicLong> total : totalMap.entrySet()) {
            Long totalKey = total.getKey();
            System.out.println(String
                .format("在%d这一秒一共发送了%d次请求,通过的请求数量为%d", totalKey, totalMap.get(totalKey).get(),
                    successTotalMap.get(totalKey).get()));
        }
    }
}

令牌桶算法

令牌桶算法从某种程度上来说是漏桶算法的一种改进,漏桶算法能够强行限制请求调用的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许某种程度的突发调用。在令牌桶算法中,桶中会有一定数量的令牌,每次请求调用需要去桶中拿取一个令牌,拿到令牌后才有资格执行请求调用,否则只能等待能拿到足够的令牌数,大家看到这里,可能就认为是不是可以把令牌比喻成信号量,那和前面说的并发量限流不是没什么区别嘛?其实不然,令牌桶算法的精髓就在于“拿令牌”和“放令牌”的方式,这和单纯的并发量限流有明显区别,采用并发量限流时,当一个调用到来时,会先获取一个信号量,当调用结束时,会释放一个信号量,但令牌桶算法不同,因为每次请求获取的令牌数不是固定的,比如当桶中的令牌数还比较多时,每次调用只需要获取一个令牌,随着桶中的令牌数逐渐减少,当到令牌的使用率(即使用中的令牌数/令牌总数)达某个比例,可能一次请求需要获取两个令牌,当令牌使用率到了一个更高的比例,可能一次请求调用需要获取更多的令牌数。同时,当调用使用完令牌后,有两种令牌生成方法,第一种就是直接往桶中放回使用的令牌数,第二种就是不做任何操作,有另一个额外的令牌生成步骤来将令牌匀速放回桶中。如下图:

image1.png

代码实现


import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.base.Preconditions;
/**
 * <p>
 *     令牌桶算法
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 17/3/23
 */
public class TokenBucket  {

    // 默认桶大小个数 即最大瞬间流量是64M
    private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

    // 一个桶的单位是1字节
    private int everyTokenSize = 1;

    // 瞬间最大流量
    private int maxFlowRate;

    // 平均流量
    private int avgFlowRate;

    // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
    private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);

    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    private volatile boolean isStart = false;

    private ReentrantLock lock = new ReentrantLock(true);

    private static final byte A_CHAR = 'a';

    public TokenBucket() {
    }

    public TokenBucket(int maxFlowRate, int avgFlowRate) {
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
        this.everyTokenSize = everyTokenSize;
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public void addTokens(Integer tokenNum) {

        // 若是桶已经满了,就不再家如新的令牌
        for (int i = 0; i < tokenNum; i++) {
            tokenQueue.offer(A_CHAR);
        }
    }

    public TokenBucket build() {

        start();
        return this;
    }

    /**
     * 获取足够的令牌个数
     *
     * @return
     */
    public boolean getTokens(byte[] dataSize) {

        Preconditions.checkNotNull(dataSize);
        Preconditions.checkArgument(isStart, "please invoke start method first !");

        int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
            if (!result) {
                return false;
            }

            int tokenCount = 0;
            for (int i = 0; i < needTokenNum; i++) {
                Byte poll = tokenQueue.poll();
                if (poll != null) {
                    tokenCount++;
                }
            }

            return tokenCount == needTokenNum;
        } finally {
            lock.unlock();
        }
    }


    public void start() {

        // 初始化桶队列大小
        if (maxFlowRate != 0) {
            tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
        }

        // 初始化令牌生产者
        TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
        scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
        isStart = true;

    }


    public void stop() {
        isStart = false;
        scheduledExecutorService.shutdown();
    }


    public boolean isStarted() {
        return isStart;
    }

    class TokenProducer implements Runnable {

        private int avgFlowRate;
        private TokenBucket tokenBucket;

        public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
            this.avgFlowRate = avgFlowRate;
            this.tokenBucket = tokenBucket;
        }

        @Override
        public void run() {
            tokenBucket.addTokens(avgFlowRate);
        }
    }

    public static TokenBucket newBuilder() {
        return new TokenBucket();
    }

    public TokenBucket everyTokenSize(int everyTokenSize) {
        this.everyTokenSize = everyTokenSize;
        return this;
    }

    public TokenBucket maxFlowRate(int maxFlowRate) {
        this.maxFlowRate = maxFlowRate;
        return this;
    }

    public TokenBucket avgFlowRate(int avgFlowRate) {
        this.avgFlowRate = avgFlowRate;
        return this;
    }

    private String stringCopy(String data, int copyNum) {

        StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

        for (int i = 0; i < copyNum; i++) {
            sbuilder.append(data);
        }

        return sbuilder.toString();

    }

    public static void main(String[] args) throws IOException, InterruptedException {

        tokenTest();
    }

    private static void arrayTest() {
        ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        System.out.println(tokenQueue.size());
        System.out.println(tokenQueue.remainingCapacity());
    }

    private static void tokenTest() throws InterruptedException, IOException {
        TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();

        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
        String data = "xxxx";// 四个字节
        for (int i = 1; i <= 1000; i++) {

            Random random = new Random();
            int i1 = random.nextInt(100);
            boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
            TimeUnit.MILLISECONDS.sleep(100);
            if (tokens) {
                bufferedWriter.write("token pass --- index:" + i1);
                System.out.println("token pass --- index:" + i1);
            } else {
                bufferedWriter.write("token rejuect --- index" + i1);
                System.out.println("token rejuect --- index" + i1);
            }

            bufferedWriter.newLine();
            bufferedWriter.flush();
        }

        bufferedWriter.close();
    }

}

漏斗算法和令牌桶的异同点

漏斗算法会限制平均的qps,对每个时间段的流控都是一样的,如果突然一瞬间的大流量进来,那么有可能会有大量请求被拦截住,
令牌桶算法的话除了能够限制数据的平均传输速率外,还允许一定时间内的大流量涌入,相当于漏斗算法的升级版本。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容