store模块阅读18:CommitLog(2): FlushCommitLogService的三个刷盘策略

说明

这里讲解CommitLog内部类FlushCommitLogService以及其实现类CommitRealTimeService,FlushRealTimeService,GroupCommitService,用于处理刷盘行为
三者针对不同的场景,单独工作或者配合工作

相关上文为:
调用方在putMessage函数中
1.调用了result = mappedFile.appendMessage(msg, this.appendMessageCallback);更新了mappedFile对应的wrotePosition
2.后续再调用handleDiskFlush的,准备进行刷盘的

下面说明三者的区别
三者是与mappedFile的wrotePosition,commitPosition,FlushPosition相关的
参照refer更方便理解三种模式与mappedFile三个属性的关系

-- 同步 异步,关闭TransientStorePool 异步,开启TransientStorePool
类名 GroupCommitService FlushRealTimeService FlushRealTimeService+ CommitRealTimeService
底层mappedFile调用 flush flush CommitRealTimeService先调用mappedFile.commit, FlushRealTimeService再调用 mappedFile.flush

画个图更好理解


三种刷盘方式与mappedFile的关系

UML图如下

image.png

ServiceThread之后讲

处理刷盘
详见 思考 部分的理解

FlushCommitLogService

抽象父类

    abstract class FlushCommitLogService extends ServiceThread {
        protected static final int RETRY_TIMES_OVER = 10;
    }

同步刷盘

涉及GroupCommitRequest 以及 GroupCommitService

GroupCommitRequest

这个是同步刷盘请求 ,源码如下

    public static class GroupCommitRequest {
        private final long nextOffset;
        private final CountDownLatch countDownLatch = new CountDownLatch(1);//用于控制等待
        private volatile boolean flushOK = false;//是否刷盘成功

        public GroupCommitRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        public long getNextOffset() {
            return nextOffset;
        }

        /**
         * 唤醒customer,设置flushOK值
         */
        public void wakeupCustomer(final boolean flushOK) {
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }

        /**
         * 等待刷盘timeout的时间,由异步线程调用 wakeupCustomer 唤醒
         */
        public boolean waitForFlush(long timeout) {
            try {
                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                return this.flushOK;//flushOK 是volatile的
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
                return false;
            }
        }
    }

解释如下:

1.nextOffSet的定义是
        /**
         * 下一个请求的offset, 可以结合mappedFileQueue.getFlushedWhere判断,验证是否flushOk
         * 赋值往往是构造函数中,nextOffset = result.getWroteOffset() + result.getWroteBytes()
         * 代表当前偏移 + 当前size
         */

2.wakeupCustomer是用于唤醒waitForFlush的,两者结合使用
A线程调用waitForFlush,等待一定时间,到真正的countDown
B线程调用wakeupCustomer,进行唤醒

GroupCommitService

属性

        //分别为读写请求
        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

每次生成同步请求时,加入队列requestsWrite
每次准备消费时,把requestsWrite和requestsRead 交换(其实requestsRead 当时size为0)

方法

putRequest:同步刷盘且需要落盘应答是,放置同步请求

        public synchronized void putRequest(final GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            //ServiceThread方法,唤醒waitForRunning
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

run 线程方法,循环调用waitForRunning,doCommit

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10);//最多等待10ms,期间调用onWaitEnd将读写队列交换,会被putRequest唤醒
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }

            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

waitForRunning在父类方法中会调用onWaitEnd

        @Override
        protected void onWaitEnd() {//线程waitForRunning执行的函数
            this.swapRequests();//读写请求交换
        }

swapRequests如下

        /**
         * requestsWrite 和 requestsRead 两个队列互换
         * requestsRead 其实每次调用doCommit之后都会清空
         * 实际上就是Write赋值给Read
         */
        private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }

doCommit如下,完成刷盘以及刷盘的检查

        /**
         * 叫doFlush更好,更新mappedFile的flushPosition
         */
        private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {//需要读的队列不为空
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        //重试,有可能 之前的请求刷的时候,顺便把这个请求也刷了,那么flushOk直接为true
                        //如果之前没有刷到,那么自己再刷
                        //  如果是同一个mappedFile,那么执行一次flush就成功
                        //  如果跨越了两个mappedFile,那么执行两次flush才成功
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();//是否已经刷到了下一个需要刷的位置

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);//参数为0代表尝试flush,更新storeTimeStamp
                            }
                        }

                        req.wakeupCustomer(flushOK);//设置req的flushOk值,唤醒等待请求
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();//获取在commitLog的存储时间
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);//更新storeCheckPoint的时间
                    }

                    this.requestsRead.clear();//每次都清空 读队列
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    // 不需要落盘应答的
                    CommitLog.this.mappedFileQueue.flush(0);//参数为0代表尝试flush,更新storeTimeStamp
                }
            }
        }

异步刷盘

FlushRealTimeService : 未开启内存字节缓冲区

比较好理解

    class FlushRealTimeService extends FlushCommitLogService {
        private long lastFlushTimestamp = 0;
        private long printTimes = 0;//时间每过一个 FlushCommitLogThoroughInterval,则+1

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();//异步flush是否是固定周期,默认false

                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();//异步刷盘的间隔,默认500ms
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();//异步刷盘至少要几页,默认4

                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();//彻底flush时间周期,默认10s

                boolean printFlushProgress = false;//是否输出flush进度

                // Print flush progress
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {//如果距离上一次flush过了 彻底周期
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushPhysicQueueLeastPages = 0;//此时至少刷0页就行
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    if (flushCommitLogTimed) {//固定周期flush
                        Thread.sleep(interval);
                    } else {//ServiceThread方法,可以被唤醒
                        this.waitForRunning(interval);
                    }

                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // Normal shutdown, to ensure that all the flush before exit
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {//shutdown之后,再flush多次
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }

            this.printFlushProgress();//输出flush进度

            CommitLog.log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return FlushRealTimeService.class.getSimpleName();
        }

        private void printFlushProgress() {//输出flush进度,目前没干事情
            // CommitLog.log.info("how much disk fall behind memory, "
            // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
        }

        @Override
        public long getJointime() {
            return 1000 * 60 * 5;
        }
    }

注意几点:

1.获取异步刷盘的间隔interval(默认500ms),看异步flush是否是固定周期,是的话每次sleep这么久,否则是waitForRunning这么久,中间可以被唤醒
2.获取flushPhysicQueueLeastPages代表默认刷盘要刷几页,正常interval周期需要有这么多页才去刷
3.获取flushPhysicQueueThoroughInterval代表强制刷盘周期,默认10s,距离上次刷盘间隔这么久时间之后,无论页数是否满足,都去刷盘

CommitRealTimeService:开启内存字节缓冲区

看懂了上面的,这个也比较简单

    class CommitRealTimeService extends FlushCommitLogService {

        private long lastCommitTimestamp = 0;//最近一次commit时间

        @Override
        public String getServiceName() {
            return CommitRealTimeService.class.getSimpleName();
        }

        @Override
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();//提交间隔,默认200

                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();//提交页,默认4

                int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();//默认200

                long begin = System.currentTimeMillis();
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {//距离上一次commit过了强制commit间隔,就不管页数是否达到要求
                    this.lastCommitTimestamp = begin;
                    commitDataLeastPages = 0;
                }

                try {
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    long end = System.currentTimeMillis();
                    if (!result) {//此时代表commit成功
                        this.lastCommitTimestamp = end; // result = false means some data committed.
                        //now wake up flush thread.
                        flushCommitLogService.wakeup();//唤醒flush
                    }

                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }
                    this.waitForRunning(interval);//等待wakeup 唤醒
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
    }

注意点如下:

1.interval,commitDataLeastPages,commitDataThoroughInterval意义和上面类似,就是commit间隔默认200ms,至少commit页数默认4,彻底commit时间为200ms(此时不管页数是否到满足)
2.如果commit成功,唤醒flushCommitLogService,完成将mappedFile中的writeBuffer刷到磁盘中去
3.注意一点,异步刷盘,开启了缓存池,也是CommitRealTimeService 以及 FlushRealTimeService配合完成的

思考

CommitLog中提交执行的顺序

putMessage -> handleDiskFlush -> 三种情况(同步,异步开启TransientStorePool,异步关闭TransientStorePool

GroupCommitService 代码执行是同步的吗

不是的,主线程执行waitForFlush,异步线程执行wakeupCustomer
但是官方文档也是叫"同步刷盘",也许是时间设置了,GroupCommitService#run最多等待10ms就执行一次,时间接近于"同步"

同步刷盘,是否需要落盘应答对应的不同逻辑

调用方:参考CommitLog#handleDiskFlush


区别

被调用方:详见GroupCommitService#doCommit中
需要落盘应答的,会处理requestsWrite和requestsRead,依次检查各个GroupCommitRequest是否flushOk,通知waitForFlush
不需要落盘应答的,requestsWrite和requestsRead size都为0,只会进入else条件

不需要落盘应答的进入else逻辑

CommitLog.this.mappedFileQueue.flush(0);是干啥

就是尝试flush,更新storeTimeStamp

同步刷盘需要落盘应答时,为什么会for循环两次

for循环两次

//重试,有可能 之前的请求刷的时候,顺便把这个请求也刷了,那么flushOk直接为true
//如果之前没有刷到,那么自己再刷
// 如果是同一个mappedFile,那么执行一次flush就成功
// 如果跨越了两个mappedFile,那么执行两次flush才成功

同步刷盘不需要落盘 和 异步刷盘没有开启缓存池 的区别

几乎就是时间上的区别

同步刷盘不需要落盘:GroupCommitService#run,每10ms(不可配置,固定)检查一次是否需要刷
异步刷盘没有开启缓存池:FlushRealTimeService#run, 每500ms(可配置)检查一次是否需要刷,每10s(可配置)强制刷

三种刷盘方式总结

见最上面 说明 中的表格以及图片

问题

三种刷盘方式的比较

暂时不清楚哪种效率最高,各自的优缺点

吐槽

同步刷盘且等待落盘应答的时间大小配置

GroupCommitService#run中调用的

  this.waitForRunning(10);

也就是10ms转换一次读写队列

CommitLog#handleDiskFlush中允许等待的时间是

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());//同步刷盘且等待落盘应答时,默认等待5s

默认5s
也就是以后配置的话,这个参数其实是有大小限制的,至少要有(10ms + doCommit的代码执行时间)

否则<10ms的话会出现数据不匹配的情况,同步wait返回flushOk为false但是异步GroupCommitService最终flush了

但是似乎并没有这个配置检查。

GroupCommitService的doCommit函数名

这个里面执行的都是flush,但是函数名叫doCommit,容易让人误解

CommitRealTimeService的interval与commitDataThoroughInterval

现在默认都是200ms,后者应该比前者大才对,还是需要自己配置

refer

//www.greatytc.com/p/2b9135fb6b5d 自己之前对flush以及commit的整理
https://github.com/YunaiV/Blog/blob/master/RocketMQ/1004-RocketMQ%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%EF%BC%9AMessage%E5%AD%98%E5%82%A8.md
http://blog.csdn.net/prestigeding/article/details/76652063
http://blog.csdn.net/meilong_whpu/article/details/76919267

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,290评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,107评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,872评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,415评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,453评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,784评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,927评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,691评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,137评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,472评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,622评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,289评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,887评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,741评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,316评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,490评论 2 348

推荐阅读更多精彩内容