zk源码阅读47:SyncRequestProcessor源码解析

摘要

在上一节ProposalRequestProcessor处理器中,后续会有两个处理器CommitProcessor和SyncRequestProcessor
如果是事务请求,会经过SyncRequestProcessor,本节对SyncRequestProcessor进行讲解
SyncRequestProcessor:事务日志记录处理器。用来将事务请求记录到事务日志文件中,同时会触发Zookeeper进行数据快照。

主要讲解

介绍
属性
函数
  构造函数
  processRequest:生产者,加入请求队列
  run:核心方法,消费请求队列,批处理进行快照以及刷到事务日志
  flush:批处理的思想,把事务日志刷到磁盘,让下一个处理器处理
  shutdown:队列添加requestOfDeath请求,线程结束后,调用flush函数,最后关闭nextProcessor
思考

介绍

可以参考类的注释,该处理器在leader,Follower,Observer中都存在

/**
 * This RequestProcessor logs requests to disk. It batches the requests to do
 * the io efficiently. The request is not passed to the next RequestProcessor
 * until its log has been synced to disk.
 *
 * SyncRequestProcessor is used in 3 different cases
 * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
 *             send ack back to itself.
 * 2. Follower - Sync request to disk and forward request to
 *             SendAckRequestProcessor which send the packets to leader.
 *             SendAckRequestProcessor is flushable which allow us to force
 *             push packets to leader.
 * 3. Observer - Sync committed request to disk (received as INFORM packet).
 *             It never send ack back to the leader, so the nextProcessor will
 *             be null. This change the semantic of txnlog on the observer
 *             since it only contains committed txns.
 */

在leader中处理链的位置如下

在leader处理链的位置

属性

    private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
    private final ZooKeeperServer zks;
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();// 请求队列
    private final RequestProcessor nextProcessor;//下一个处理器

    private Thread snapInProcess = null;//处理快照的线程
    volatile private boolean running;//是否在运行

    /**
     * Transactions that have been written and are waiting to be flushed to
     * disk. Basically this is the list of SyncItems whose callbacks will be
     * invoked after flush returns successfully.
     */
    private final LinkedList<Request> toFlush = new LinkedList<Request>();//等待被刷到磁盘的请求队列
    private final Random r = new Random(System.nanoTime());
    /**
     * The number of log entries to log before starting a snapshot
     */
    private static int snapCount = ZooKeeperServer.getSnapCount();//快照的个数
    
    /**
     * The number of log entries before rolling the log, number
     * is chosen randomly
     */
    private static int randRoll;// 一个随机数,用来帮助判断何时让事务日志从当前“滚”到下一个

    private final Request requestOfDeath = Request.requestOfDeath;// 结束请求标识

函数

构造函数

    public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;//下一个处理器
        running = true;
    }

processRequest

生产者,加入请求队列

    public void processRequest(Request request) {//生产者,加入请求队列
        // request.addRQRec(">sync");
        queuedRequests.add(request);
    }

run

核心方法,消费请求队列,批处理进行快照以及刷到事务日志

    public void run() {//核心方法,消费请求队列,批处理进行快照以及刷到事务日志
        try {
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            setRandRoll(r.nextInt(snapCount/2)); //randRoll是一个 snapCount/2以内的随机数, 避免所有机器同时进行snapshot
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) { //没有要刷到磁盘的请求
                    si = queuedRequests.take();//消费请求队列
                } else {//有需要刷到磁盘的请求
                    si = queuedRequests.poll();
                    if (si == null) {//如果请求队列的当前请求为空
                        flush(toFlush);//刷到磁盘
                        continue;
                    }
                }
                if (si == requestOfDeath) {//结束标识请求
                    break;
                }
                if (si != null) {//请求队列取出了请求
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) {//请求添加至日志文件,只有事务性请求才会返回true
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {//如果logCount到了一定的量
                            randRoll = r.nextInt(snapCount/2);//下一次的随机数重新选
                            // roll the log
                            zks.getZKDatabase().rollLog();//事务日志滚动到另外一个文件记录
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {//正在进行快照
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();//进行快照,将sessions和datatree保存至snapshot文件
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();//启动线程
                            }
                            logCount = 0;//重置
                        }
                    } else if (toFlush.isEmpty()) {//刷到磁盘的队列为空
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);//下个处理器处理
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();//下个处理器可以刷,就刷
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);//刷的队列添加记录
                    if (toFlush.size() > 1000) {//超过了1000条就一起刷到磁盘
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

几个注意的点在思考里面说

里面调用了flush函数

flush

批处理的思想,把事务日志刷到磁盘,让下一个处理器处理

    private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException// 刷新到磁盘
    {
        if (toFlush.isEmpty())//队列为空,没有需要刷的
            return;

        zks.getZKDatabase().commit();//事务日志刷到磁盘
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            if (nextProcessor != null) {
                nextProcessor.processRequest(i);//下一个处理器处理
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();//下个处理器也可以刷,就刷
        }
    }

shutdown

队列添加requestOfDeath请求,线程结束后,调用flush函数,最后关闭nextProcessor

    public void shutdown() {//队列添加requestOfDeath请求,线程结束后,调用flush函数,最后关闭nextProcessor
        LOG.info("Shutting down");
        queuedRequests.add(requestOfDeath);
        try {
            if(running){
                this.join();
            }
            if (!toFlush.isEmpty()) {
                flush(toFlush);
            }
        } catch(InterruptedException e) {
            LOG.warn("Interrupted while wating for " + this + " to finish");
        } catch (IOException e) {
            LOG.warn("Got IO exception during shutdown");
        } catch (RequestProcessorException e) {
            LOG.warn("Got request processor exception during shutdown");
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }

思考

SyncRequestProcessor和快照,事务日志的关系

就和介绍里面说的一样,他就是事务日志记录处理器。用来将事务请求记录到事务日志文件中,同时会触发Zookeeper进行数据快照。
里面调用了很多相关方法,如rollLog,append,commit,takeSnapshot等方法,底层都是FileTxnLog,FileSnap来执行的

run方法注意的点

1.randRoll的意义,可以看到语句 if (logCount > (snapCount / 2 + randRoll))
这是用来判断logCount是否足够,如果足够了,代表一个事务日志记录的量够了,
下面调用rollLog,就会生成下一个事务日志文件了。

2.“批”处理的思想
当 logCount > (snapCount / 2 + randRoll) 时,批处理的思想提箱
对于事务日志,此时才调用rollLog写入到下一个事务日志
对于快照,如果可行,就调用zks.takeSnapshot()进行快照

而不是每一个请求就一个事务日志,不是每一个请求就生成一次快照

然后对于事务日志,当toFlush.size() > 1000才会调用flush函数

refer

http://www.cnblogs.com/leesf456/p/6438411.html
http://www.cnblogs.com/leesf456/p/6279956.html (FileTxnLog)
zk源码阅读3:FileTxnLog
《paxos到zk》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,290评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,107评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,872评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,415评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,453评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,784评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,927评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,691评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,137评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,472评论 2 326
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,622评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,289评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,887评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,741评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,316评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,490评论 2 348

推荐阅读更多精彩内容

  • MySQL技术内幕:InnoDB存储引擎(第2版) 姜承尧 第1章 MySQL体系结构和存储引擎 >> 在上述例子...
    沉默剑士阅读 7,398评论 0 16
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,629评论 18 139
  • 很实用的编程英语词库,共收录一千五百余条词汇。 第一部分: application 应用程式 应用、应用程序app...
    春天的蜜蜂阅读 1,341评论 0 22
  • 花,可谓俗语物,也可谓圣洁之物!爱美之心人人皆有,有喜欢高贵般的牡丹;有喜欢清雅的白玉兰;有喜欢出淤泥而不染的荷花...
    啊阳瑟喔阅读 409评论 1 0
  • 金色圈阅读 57评论 0 0