说明
这里讲解CommitLog内部类FlushCommitLogService以及其实现类CommitRealTimeService,FlushRealTimeService,GroupCommitService,用于处理刷盘行为
三者针对不同的场景,单独工作或者配合工作
相关上文为:
调用方在putMessage函数中
1.调用了result = mappedFile.appendMessage(msg, this.appendMessageCallback);更新了mappedFile对应的wrotePosition
2.后续再调用handleDiskFlush的,准备进行刷盘的
下面说明三者的区别
三者是与mappedFile的wrotePosition,commitPosition,FlushPosition相关的
参照refer更方便理解三种模式与mappedFile三个属性的关系
-- | 同步 | 异步,关闭TransientStorePool | 异步,开启TransientStorePool |
---|---|---|---|
类名 | GroupCommitService | FlushRealTimeService | FlushRealTimeService+ CommitRealTimeService |
底层mappedFile调用 | flush | flush | CommitRealTimeService先调用mappedFile.commit, FlushRealTimeService再调用 mappedFile.flush |
画个图更好理解
UML图如下
ServiceThread之后讲
处理刷盘
详见 思考 部分的理解
FlushCommitLogService
抽象父类
abstract class FlushCommitLogService extends ServiceThread {
protected static final int RETRY_TIMES_OVER = 10;
}
同步刷盘
涉及GroupCommitRequest 以及 GroupCommitService
GroupCommitRequest
这个是同步刷盘请求 ,源码如下
public static class GroupCommitRequest {
private final long nextOffset;
private final CountDownLatch countDownLatch = new CountDownLatch(1);//用于控制等待
private volatile boolean flushOK = false;//是否刷盘成功
public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset;
}
public long getNextOffset() {
return nextOffset;
}
/**
* 唤醒customer,设置flushOK值
*/
public void wakeupCustomer(final boolean flushOK) {
this.flushOK = flushOK;
this.countDownLatch.countDown();
}
/**
* 等待刷盘timeout的时间,由异步线程调用 wakeupCustomer 唤醒
*/
public boolean waitForFlush(long timeout) {
try {
this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
return this.flushOK;//flushOK 是volatile的
} catch (InterruptedException e) {
log.error("Interrupted", e);
return false;
}
}
}
解释如下:
1.nextOffSet的定义是
/**
* 下一个请求的offset, 可以结合mappedFileQueue.getFlushedWhere判断,验证是否flushOk
* 赋值往往是构造函数中,nextOffset = result.getWroteOffset() + result.getWroteBytes()
* 代表当前偏移 + 当前size
*/
2.wakeupCustomer是用于唤醒waitForFlush的,两者结合使用
A线程调用waitForFlush,等待一定时间,到真正的countDown
B线程调用wakeupCustomer,进行唤醒
GroupCommitService
属性
//分别为读写请求
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
每次生成同步请求时,加入队列requestsWrite
每次准备消费时,把requestsWrite和requestsRead 交换(其实requestsRead 当时size为0)
方法
putRequest:同步刷盘且需要落盘应答是,放置同步请求
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
//ServiceThread方法,唤醒waitForRunning
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
run 线程方法,循环调用waitForRunning,doCommit
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);//最多等待10ms,期间调用onWaitEnd将读写队列交换,会被putRequest唤醒
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
waitForRunning在父类方法中会调用onWaitEnd
@Override
protected void onWaitEnd() {//线程waitForRunning执行的函数
this.swapRequests();//读写请求交换
}
swapRequests如下
/**
* requestsWrite 和 requestsRead 两个队列互换
* requestsRead 其实每次调用doCommit之后都会清空
* 实际上就是Write赋值给Read
*/
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
doCommit如下,完成刷盘以及刷盘的检查
/**
* 叫doFlush更好,更新mappedFile的flushPosition
*/
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {//需要读的队列不为空
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
//重试,有可能 之前的请求刷的时候,顺便把这个请求也刷了,那么flushOk直接为true
//如果之前没有刷到,那么自己再刷
// 如果是同一个mappedFile,那么执行一次flush就成功
// 如果跨越了两个mappedFile,那么执行两次flush才成功
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();//是否已经刷到了下一个需要刷的位置
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);//参数为0代表尝试flush,更新storeTimeStamp
}
}
req.wakeupCustomer(flushOK);//设置req的flushOk值,唤醒等待请求
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();//获取在commitLog的存储时间
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);//更新storeCheckPoint的时间
}
this.requestsRead.clear();//每次都清空 读队列
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
// 不需要落盘应答的
CommitLog.this.mappedFileQueue.flush(0);//参数为0代表尝试flush,更新storeTimeStamp
}
}
}
异步刷盘
FlushRealTimeService : 未开启内存字节缓冲区
比较好理解
class FlushRealTimeService extends FlushCommitLogService {
private long lastFlushTimestamp = 0;
private long printTimes = 0;//时间每过一个 FlushCommitLogThoroughInterval,则+1
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();//异步flush是否是固定周期,默认false
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();//异步刷盘的间隔,默认500ms
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();//异步刷盘至少要几页,默认4
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();//彻底flush时间周期,默认10s
boolean printFlushProgress = false;//是否输出flush进度
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {//如果距离上一次flush过了 彻底周期
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;//此时至少刷0页就行
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
if (flushCommitLogTimed) {//固定周期flush
Thread.sleep(interval);
} else {//ServiceThread方法,可以被唤醒
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {//shutdown之后,再flush多次
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();//输出flush进度
CommitLog.log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return FlushRealTimeService.class.getSimpleName();
}
private void printFlushProgress() {//输出flush进度,目前没干事情
// CommitLog.log.info("how much disk fall behind memory, "
// + CommitLog.this.mappedFileQueue.howMuchFallBehind());
}
@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}
注意几点:
1.获取异步刷盘的间隔interval(默认500ms),看异步flush是否是固定周期,是的话每次sleep这么久,否则是waitForRunning这么久,中间可以被唤醒
2.获取flushPhysicQueueLeastPages代表默认刷盘要刷几页,正常interval周期需要有这么多页才去刷
3.获取flushPhysicQueueThoroughInterval代表强制刷盘周期,默认10s,距离上次刷盘间隔这么久时间之后,无论页数是否满足,都去刷盘
CommitRealTimeService:开启内存字节缓冲区
看懂了上面的,这个也比较简单
class CommitRealTimeService extends FlushCommitLogService {
private long lastCommitTimestamp = 0;//最近一次commit时间
@Override
public String getServiceName() {
return CommitRealTimeService.class.getSimpleName();
}
@Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();//提交间隔,默认200
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();//提交页,默认4
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();//默认200
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {//距离上一次commit过了强制commit间隔,就不管页数是否达到要求
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {//此时代表commit成功
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
flushCommitLogService.wakeup();//唤醒flush
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);//等待wakeup 唤醒
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
}
注意点如下:
1.interval,commitDataLeastPages,commitDataThoroughInterval意义和上面类似,就是commit间隔默认200ms,至少commit页数默认4,彻底commit时间为200ms(此时不管页数是否到满足)
2.如果commit成功,唤醒flushCommitLogService,完成将mappedFile中的writeBuffer刷到磁盘中去
3.注意一点,异步刷盘,开启了缓存池,也是CommitRealTimeService 以及 FlushRealTimeService配合完成的
思考
CommitLog中提交执行的顺序
putMessage -> handleDiskFlush -> 三种情况(同步,异步开启TransientStorePool,异步关闭TransientStorePool
GroupCommitService 代码执行是同步的吗
不是的,主线程执行waitForFlush,异步线程执行wakeupCustomer
但是官方文档也是叫"同步刷盘",也许是时间设置了,GroupCommitService#run最多等待10ms就执行一次,时间接近于"同步"
同步刷盘,是否需要落盘应答对应的不同逻辑
调用方:参考CommitLog#handleDiskFlush
被调用方:详见GroupCommitService#doCommit中
需要落盘应答的,会处理requestsWrite和requestsRead,依次检查各个GroupCommitRequest是否flushOk,通知waitForFlush
不需要落盘应答的,requestsWrite和requestsRead size都为0,只会进入else条件
CommitLog.this.mappedFileQueue.flush(0);是干啥
就是尝试flush,更新storeTimeStamp
同步刷盘需要落盘应答时,为什么会for循环两次
//重试,有可能 之前的请求刷的时候,顺便把这个请求也刷了,那么flushOk直接为true
//如果之前没有刷到,那么自己再刷
// 如果是同一个mappedFile,那么执行一次flush就成功
// 如果跨越了两个mappedFile,那么执行两次flush才成功
同步刷盘不需要落盘 和 异步刷盘没有开启缓存池 的区别
几乎就是时间上的区别
同步刷盘不需要落盘:GroupCommitService#run,每10ms(不可配置,固定)检查一次是否需要刷
异步刷盘没有开启缓存池:FlushRealTimeService#run, 每500ms(可配置)检查一次是否需要刷,每10s(可配置)强制刷
三种刷盘方式总结
见最上面 说明 中的表格以及图片
问题
三种刷盘方式的比较
暂时不清楚哪种效率最高,各自的优缺点
吐槽
同步刷盘且等待落盘应答的时间大小配置
GroupCommitService#run中调用的
this.waitForRunning(10);
也就是10ms转换一次读写队列
CommitLog#handleDiskFlush中允许等待的时间是
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());//同步刷盘且等待落盘应答时,默认等待5s
默认5s
也就是以后配置的话,这个参数其实是有大小限制的,至少要有(10ms + doCommit的代码执行时间)
否则<10ms的话会出现数据不匹配的情况,同步wait返回flushOk为false但是异步GroupCommitService最终flush了
但是似乎并没有这个配置检查。
GroupCommitService的doCommit函数名
这个里面执行的都是flush,但是函数名叫doCommit,容易让人误解
CommitRealTimeService的interval与commitDataThoroughInterval
现在默认都是200ms,后者应该比前者大才对,还是需要自己配置
refer
//www.greatytc.com/p/2b9135fb6b5d 自己之前对flush以及commit的整理
https://github.com/YunaiV/Blog/blob/master/RocketMQ/1004-RocketMQ%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%EF%BC%9AMessage%E5%AD%98%E5%82%A8.md
http://blog.csdn.net/prestigeding/article/details/76652063
http://blog.csdn.net/meilong_whpu/article/details/76919267