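Summary
As covered in the previous section, ProposalRequestProcessor is followed by two processors: CommitProcessor and SyncRequestProcessor.
Transactional requests pass through SyncRequestProcessor, which is the subject of this section.
SyncRequestProcessor: the transaction-log processor. It records transactional requests in the transaction log file and also triggers ZooKeeper data snapshots.
Main topics
Introduction
Fields
Methods
Constructor
processRequest: the producer; adds a request to the request queue
run: the core method; consumes the request queue, flushes the transaction log in batches, and triggers snapshots
flush: the batching idea; flushes the transaction log to disk, then hands the requests to the next processor
shutdown: adds the requestOfDeath request to the queue, waits for the thread to finish, calls flush, and finally shuts down nextProcessor
Thoughts
Introduction
See the class comment below; this processor exists in the leader, the follower, and the observer.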
/**
* This RequestProcessor logs requests to disk. It batches the requests to do
* the io efficiently. The request is not passed to the next RequestProcessor
* until its log has been synced to disk.
*
* SyncRequestProcessor is used in 3 different cases
* 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
* send ack back to itself.
* 2. Follower - Sync request to disk and forward request to
* SendAckRequestProcessor which send the packets to leader.
* SendAckRequestProcessor is flushable which allow us to force
* push packets to leader.
* 3. Observer - Sync committed request to disk (received as INFORM packet).
* It never send ack back to the leader, so the nextProcessor will
* be null. This change the semantic of txnlog on the observer
* since it only contains committed txns.
*/
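In the leader, it sits on a side branch of the processing chain: ProposalRequestProcessor hands transactional requests to SyncRequestProcessor, which forwards them to AckRequestProcessor once they have been synced to disk.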
Fields
private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
private final ZooKeeperServer zks;
private final LinkedBlockingQueue<Request> queuedRequests =
    new LinkedBlockingQueue<Request>(); // request queue
private final RequestProcessor nextProcessor; // next processor in the chain
private Thread snapInProcess = null; // thread that is taking a snapshot, if any
volatile private boolean running; // whether this processor is running
/**
 * Transactions that have been written and are waiting to be flushed to
 * disk. Basically this is the list of SyncItems whose callbacks will be
 * invoked after flush returns successfully.
 */
private final LinkedList<Request> toFlush = new LinkedList<Request>(); // requests waiting to be flushed to disk
private final Random r = new Random(System.nanoTime());
/**
 * The number of log entries to log before starting a snapshot
 */
private static int snapCount = ZooKeeperServer.getSnapCount(); // number of log entries between snapshots (not a count of snapshots)
/**
 * The number of log entries before rolling the log, number
 * is chosen randomly
 */
private static int randRoll; // random offset that helps decide when to roll the transaction log over to the next file
private final Request requestOfDeath = Request.requestOfDeath; // marker request that tells the thread to stop
Methods
Constructor
public SyncRequestProcessor(ZooKeeperServer zks,
        RequestProcessor nextProcessor) {
    super("SyncThread:" + zks.getServerId(), zks
            .getZooKeeperServerListener());
    this.zks = zks;
    this.nextProcessor = nextProcessor; // next processor in the chain
    running = true;
}
processRequest
The producer: adds the request to the request queue.
public void processRequest(Request request) { // producer: enqueue the request
    // request.addRQRec(">sync");
    queuedRequests.add(request);
}
run
The core method: it consumes the request queue, appends transactional requests to the transaction log, flushes them to disk in batches, and triggers snapshots.
public void run() { // core method: consume the request queue, batch log flushes, trigger snapshots
    try {
        int logCount = 0;
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        setRandRoll(r.nextInt(snapCount/2)); // randRoll is a random number in [0, snapCount/2), so that servers do not all snapshot at once
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) { // nothing is waiting to be flushed
                si = queuedRequests.take(); // block until a request arrives
            } else { // some requests are waiting to be flushed
                si = queuedRequests.poll(); // non-blocking
                if (si == null) { // the request queue is momentarily empty
                    flush(toFlush); // flush to disk
                    continue;
                }
            }
            if (si == requestOfDeath) { // shutdown marker
                break;
            }
            if (si != null) { // a request was dequeued
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) { // append to the transaction log; returns true only for transactional requests
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) { // enough entries have been logged
                        randRoll = r.nextInt(snapCount/2); // pick a new random offset for the next round
                        // roll the log
                        zks.getZKDatabase().rollLog(); // continue logging in a new transaction log file
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) { // a snapshot is still in progress
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot(); // write the sessions and the DataTree to a snapshot file
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                            snapInProcess.start(); // start the snapshot thread
                        }
                        logCount = 0; // reset
                    }
                } else if (toFlush.isEmpty()) { // non-transactional request and nothing is pending a flush
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si); // hand over to the next processor
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush(); // flush the next processor if it supports flushing
                        }
                    }
                    continue;
                }
                toFlush.add(si); // queue the request for flushing
                if (toFlush.size() > 1000) { // more than 1000 pending: flush them to disk together
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
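The dequeue strategy at the top of the loop (block with take when nothing is pending, switch to poll once something is pending, and flush as soon as the queue runs dry or the batch grows too large) can be looked at in isolation. The following is only a minimal sketch of that pattern, not ZooKeeper code: the class BatchLoopSketch, its String requests, the DEATH sentinel, and the batchLimit parameter are all hypothetical names chosen for illustration, and "flushing" merely records the batch in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the batching loop; not part of ZooKeeper.
final class BatchLoopSketch {
    // Sentinel compared by identity, playing the role of requestOfDeath.
    static final String DEATH = new String("death");

    final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> pending = new ArrayList<>();        // plays the role of toFlush
    final List<List<String>> flushed = new ArrayList<>();  // one entry per flush call
    final int batchLimit;                                  // 1000 in the code above

    BatchLoopSketch(int batchLimit) {
        this.batchLimit = batchLimit;
    }

    // Stand-in for flush(toFlush): record the batch and clear it.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    void runLoop() {
        try {
            while (true) {
                String item;
                if (pending.isEmpty()) {
                    item = queue.take();      // nothing to flush: safe to block
                } else {
                    item = queue.poll();      // something pending: do not block
                    if (item == null) {       // queue is idle, so flush now
                        flush();
                        continue;
                    }
                }
                if (item == DEATH) {
                    break;                    // leftovers are flushed by the caller
                }
                pending.add(item);
                if (pending.size() > batchLimit) {
                    flush();                  // batch is large enough
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BatchLoopSketch s = new BatchLoopSketch(2);
        for (String x : new String[] {"a", "b", "c", "d"}) {
            s.queue.add(x);
        }
        s.queue.add(DEATH);
        s.runLoop();
        s.flush(); // what shutdown does with leftovers
        System.out.println(s.flushed); // prints [[a, b, c], [d]]
    }
}
```

With a limit of 2, the first flush happens only when the third item arrives, because the check is size() > limit rather than >=; the same off-by-one applies to the 1000 in run.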
A few points worth noting are discussed in the Thoughts section.
run calls the flush method, described next.
flush
The batching idea: flush the transaction log to disk once for the whole batch, then hand every request in the batch to the next processor.
private void flush(LinkedList<Request> toFlush)
    throws IOException, RequestProcessorException // flush to disk
{
    if (toFlush.isEmpty()) // nothing to flush
        return;
    zks.getZKDatabase().commit(); // flush the transaction log to disk
    while (!toFlush.isEmpty()) {
        Request i = toFlush.remove();
        if (nextProcessor != null) {
            nextProcessor.processRequest(i); // hand over to the next processor
        }
    }
    if (nextProcessor != null && nextProcessor instanceof Flushable) {
        ((Flushable)nextProcessor).flush(); // flush the next processor if it supports flushing
    }
}
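The ordering inside flush matters: one commit for the whole batch, then every request is forwarded, then a single downstream flush. The sketch below records that order in an event list. It is an illustration under assumptions, not ZooKeeper code: FlushOrderSketch, Processor, and BufferingProcessor are hypothetical names, requests are plain strings, and the "commit" is just a log entry. The only real API used is java.io.Flushable.

```java
import java.io.Flushable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration of commit-then-forward ordering; not part of ZooKeeper.
final class FlushOrderSketch {
    interface Processor {
        void process(String request);
    }

    // Hypothetical downstream stage that buffers output until flushed.
    static final class BufferingProcessor implements Processor, Flushable {
        private final List<String> events;

        BufferingProcessor(List<String> events) {
            this.events = events;
        }

        @Override
        public void process(String request) {
            events.add("forward:" + request);
        }

        @Override
        public void flush() {
            events.add("downstream-flush");
        }
    }

    // Mirrors the shape of flush(toFlush): commit once, forward all, flush downstream once.
    static void flush(LinkedList<String> toFlush, Processor next, List<String> events)
            throws IOException {
        if (toFlush.isEmpty()) {
            return;
        }
        events.add("commit"); // stand-in for the single disk sync of the batch
        while (!toFlush.isEmpty()) {
            String request = toFlush.remove();
            if (next != null) {
                next.process(request);
            }
        }
        if (next instanceof Flushable) { // false when next is null
            ((Flushable) next).flush();
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        LinkedList<String> toFlush = new LinkedList<>();
        toFlush.add("a");
        toFlush.add("b");
        try {
            flush(toFlush, new BufferingProcessor(events), events);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [commit, forward:a, forward:b, downstream-flush]
    }
}
```

Because the commit comes first, no request reaches the next stage before its log entry has been synced, which is the guarantee stated in the class comment.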
shutdown
Adds the requestOfDeath request to the queue, waits for the thread to finish, calls flush for anything still pending, and finally shuts down nextProcessor.
public void shutdown() { // enqueue requestOfDeath, wait for the thread to exit, flush leftovers, then shut down nextProcessor
    LOG.info("Shutting down");
    queuedRequests.add(requestOfDeath);
    try {
        if(running){
            this.join();
        }
        if (!toFlush.isEmpty()) {
            flush(toFlush);
        }
    } catch(InterruptedException e) {
        LOG.warn("Interrupted while wating for " + this + " to finish");
    } catch (IOException e) {
        LOG.warn("Got IO exception during shutdown");
    } catch (RequestProcessorException e) {
        LOG.warn("Got request processor exception during shutdown");
    }
    if (nextProcessor != null) {
        nextProcessor.shutdown();
    }
}
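The requestOfDeath mechanism is an instance of the "poison pill" pattern: the stop signal travels through the same FIFO queue as ordinary requests, so everything enqueued before it is still processed. Here is a minimal sketch of that pattern on its own; PoisonPillSketch, submit, and handled are hypothetical names for illustration and the requests are plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of the poison-pill shutdown; not part of ZooKeeper.
final class PoisonPillSketch extends Thread {
    // Sentinel compared by identity, playing the role of requestOfDeath.
    private static final String DEATH = new String("requestOfDeath");

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    void submit(String request) {
        queue.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = queue.take();
                if (request == DEATH) {
                    break; // everything queued earlier has already been handled
                }
                handled.add(request);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() {
        queue.add(DEATH); // the stop signal goes through the same queue
        try {
            join();       // wait until the worker has drained the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PoisonPillSketch worker = new PoisonPillSketch();
        worker.start();
        worker.submit("a");
        worker.submit("b");
        worker.shutdown();
        System.out.println(worker.handled + " alive=" + worker.isAlive()); // prints [a, b] alive=false
    }
}
```

In SyncRequestProcessor the loop breaks without flushing, which is why shutdown itself calls flush(toFlush) after join returns.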
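Thoughts
How SyncRequestProcessor relates to snapshots and the transaction log
As the introduction says, it is the transaction-log processor: it records transactional requests in the transaction log file and also triggers ZooKeeper data snapshots.
It calls several related methods, such as rollLog, append, commit, and takeSnapshot; underneath, the work is done by FileTxnLog and FileSnap.
Points to note in the run method
1. The meaning of randRoll. Look at the statement if (logCount > (snapCount / 2 + randRoll)).
It checks whether logCount is large enough. If it is, the current transaction log file holds enough entries,
and the rollLog call that follows makes subsequent entries go to the next transaction log file. Because randRoll is drawn at random from [0, snapCount/2), the threshold differs from server to server, so the servers in the ensemble do not all roll their logs and snapshot at the same time.
2. The "batching" idea
Batching shows up when logCount > (snapCount / 2 + randRoll):
for the transaction log, only then is rollLog called to switch to the next log file;
for snapshots, zks.takeSnapshot() is called, provided no earlier snapshot is still running.
In other words, there is neither one log file per request nor one snapshot per request.
Flushing the transaction log to disk is batched as well: flush is called once toFlush.size() > 1000, or earlier when the request queue runs empty while toFlush still holds entries.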
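To make the randRoll threshold from point 1 concrete: the log is rolled once logCount exceeds snapCount / 2 + randRoll, and since randRoll lies in [0, snapCount/2), that threshold always falls in [snapCount/2, snapCount). The small sketch below only reproduces this arithmetic; SnapThresholdSketch and nextThreshold are hypothetical names, and the snapCount of 100000 is an assumed value for illustration.

```java
import java.util.Random;

// Hypothetical illustration of the roll threshold arithmetic; not part of ZooKeeper.
final class SnapThresholdSketch {
    // Number of appended entries that must be exceeded before a roll and snapshot.
    // snapCount must be at least 2, otherwise nextInt(0) throws IllegalArgumentException.
    static int nextThreshold(int snapCount, Random r) {
        int randRoll = r.nextInt(snapCount / 2); // uniform in [0, snapCount/2)
        return snapCount / 2 + randRoll;         // in [snapCount/2, snapCount)
    }

    public static void main(String[] args) {
        int snapCount = 100_000; // assumed value for illustration
        for (int server = 1; server <= 3; server++) {
            // Different seeds stand in for different servers.
            Random r = new Random(System.nanoTime() + server);
            System.out.println("server " + server + " rolls after more than "
                    + nextThreshold(snapCount, r) + " entries");
        }
    }
}
```

Each server draws its own offset, and a new one after every roll, so their snapshots tend to drift apart instead of lining up.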
References
http://www.cnblogs.com/leesf456/p/6438411.html
http://www.cnblogs.com/leesf456/p/6279956.html (FileTxnLog)
zk source reading 3: FileTxnLog
From Paxos to ZooKeeper (《paxos到zk》)