RocketMQ Runtime ServiceThread的设计理念

最近正好在看Java Concurrency In Practice(以下简称JCIP), 里面的很多思想都在rocketmq runtime的源码中有所体现,因此就尝试着从ServiceThread这个类出发,看看我能从这本书里悟到多少东西。

我首先要做的是猜ServiceThread是用来解决什么问题的。在我看来,ServiceThread主要有如下几个特性

  • 当处于stopped 状态的时候,不执行任何任务
  • 当不处于stopped状态的时候,用户调用waitForRunning()可以启动一段定时器,并阻塞一段时间,或者
  • 使用wakeup()立即结束跑完当前的定时器,立即退出阻塞状态

一个典型的使用场景是某个Service 直接集成ServiceThread,run()方法重写如下

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(10000);
             this.rebalanceImpl.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

这段代码就是告诉线程,每隔十秒钟跑一次rebalanceImpl.doRebalance()(当然这个函数根据业务不同而异)
然后其他地方可以显式调用wakeup(),在还没到十秒的时候后就执行rebalanceImpl.doRebalance(),比如当有worker变化的时候,我们要立刻触发一次doRebalance()

    class WorkerStatusListenerImpl implements ClusterManagementService.WorkerStatusListener {

        /**
         * When alive workers change.
         */
        @Override
        public void onWorkerChange() {
            log.info("Wake up rebalance service");
            RebalanceService.this.wakeup();
        }
    }

ServiceThread 实现了Runnable接口,但是同时也持有一个Thread对象,至于为什么要这么设计我还不是很懂。

成员变量

ServiceThread的主要成员变量如下

    protected final Thread thread;
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    protected volatile boolean stopped = false;
    private static final long JOIN_TIME = 90 * 1000;
  • Thread thread: 用来跑ServiceThread的run()方法,不过我不知道这是不是一个标准的自定义的thread的实现(因为用到了Composition Over Inheritance 的理念,感觉应该是很标准的写法)

  • CountDownLatch2 waitPoint: 这个CountDownLatch2是RocketMQ Connect自己基于AQS实现的一个同步器,和CountDownLatch不同的是它支持reset(),也就是可以被反复利用。我们知道通常来说CyclicBarrier也能被反复利用,他们区别在哪里呢?一个是用来等待事件完成,一个是用来等待其他线程执行到某一步,等我们深入了解CountDownLatch的具体实现时再进行讲解。在ServiceThread里,这个waitPoint主要用来提供我们之前说的特性里的第2,3点:既能定时执行任务,又能通过其他显式调用方法“提前执行任务”,具体实现之后会讲到。

  • volatile AtomicBoolean hasNotified: 这个变量是用来保证当用户调用waitForRunning()时,如果发现已经有线程调用了wakeup(),但是waitPoint还没有被释放的时候(count还不为0),可以立即退出(因为马上waitPoint就要被释放了)。这是由于,可能出于效率的考虑,wakeup()并不是atomic的,它会首先将hasNotified设置为true,告诉其他人,我马上就要把count减为0了(也就是释放这个CountDownLatch),然后再紧接着把count减为0。这两步并不是原子的。之后在讲核心方法的时候会更详细的解说。

  • volatile boolean stopped: 这个比较简单,它不需要原子性的原因在于,stopped被设置成True之后是允许上层代码继续执行 waitForRuning(),也就是说具体停了之后还跑不跑waitForRunning()是由上层代码决定,stopped只是提供给上层一个“君子协议”一样的标志位:有人告诉我应该stop了,所以你最好check一下是不是stopped了,如果stopped了就不要再调用waitForRunning()了。stopped并不在waitForRunning()里起任何作用。

问题:如果有两个线程同时调用waitForRunning(),第三个线程调用wakeup(),那此时两个线程都被允许继续执行,那么这是expected behaviour嘛

核心方法

最重要的方法就如下所示:

    protected void waitForRunning(long interval) {
        // if hasNotified == True, means either wakeup() or stop() 
        // has been executed by another thread
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
            // TODO why we swallowed the exceptions here
        } catch (InterruptedException e) {
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

那我们先看第一个CAS操作,首先如果hasNotified是false的话,意味着要么没有其他线程执行了waitForRunning(), 要么有其他线程执行了waitForRunnning()只是没有任何的wakeup()。那这时会继续执行到waitPoint.reset(),也就是即使之前有人已经调用过waitForRunning()了,我们现在也给它reset()掉,重新开始计时。如果是True的话,意味着已经有一个wakeup()或者stop()被执行了,于是此次调用把hasNotified设置为false,相当于清空标志位,下一次waitForRunning就可以正常进行了。这里有个值得注意的点在于,假设hasNotified是True,但是wakeup()里的waitPoint.countDown();仍然没有被执行,所以理论上正在执行waitPoint.await(...)的前一个线程没有被释放,而立刻调用waitForRunning()的线程因为看到hasNotified是true之后,将hasNotified设置为false后就退出了。

一开始我会想,这可能是个不公平的实现,因为最开始调用waitForRunning()的线程需要等到waitPoint.await()才能执行,而当前因为提前return而继续执行的线程之则完全不用经过waitPoint.await()。当然这样也不会有太大的差别,尤其是考虑到几乎不会有多个线程同时调用waitForRunning()的情况下()

这样做应该没有问题的,因为我们认为waitPoint.countDown()立即就会执行,所以最终waitPoint.await()也会很快被释放:要么因为timeout或者

问题:waitPoint.countDown() 以及 waitPoint.await()之间能保证不插入其他任何指令吗,也就是说,waitPoint.countDown()只要变成0之后,一定会让当时正在等待的waitPoint.await()得到释放吗?我觉得应该要这样才对,否则如果在waitPoint.countDown() 中有另外一个线程调用了waitPoint.reset(),那么await()就不会被立即释放了,之后可以去CountDownLatch2里仔细看看是不是有这样的保证。

    public void wakeup() {
        // TODO end the waiting before interval happens
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

在wakeup()里,首先使用CAS将hasNotified设置为true,然后调用waitPoint.countDown(),这两步如前所述不是原子性的,但我们证明了即使这样写也没有什么问题。(至少我现在还看不出来)

一些其他的思考

注意这一段

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
            // TODO why we swallowed the exceptions here
        } catch (InterruptedException e) {
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }

我们catch住了InterruptedException e但是却没有做任何处理(也就是swallow吞掉了这个Exception),按照JCIP里的说法,interrupt通常的语义是cancellation,只有最顶层的调用代码应该对InterruptedException做处理,而所有不拥有这个Thread的代码要么:继续抛出InterruptedException, 要么将interrupted标志位重新设置为true。

基于这个原则,我们就应该判断,ServiceThread是不是应该来swallow这个InterruptedException。通常情况下waitPoint.await() 会跑在调用者的线程内,而会调用waitForRunning()的线程就是ServiceThread所持有的那个Thread object

    public ServiceThread() {
        this.thread = new Thread(this, this.getServiceName());
    }

而一般来说waitForRunning只会在这个Runnable的run()方法中被调用,因run()方法才是最上层的调用者,看起来说waitForRunning()不应该去swallow这个InterruptedException。当然这只是理论,实际上,我们知道即使这个InterruptedException被吞掉了也无伤大雅:如果被interrupt,那就当成被wakeup(),直接提前结束。

不过现在又有个问题了,什么情况下,waitPoint.await()会被Interrupt呢?是Crtl + C嘛?我们又要去看JVM的Interrupt是怎么传播的,以及为什么我按了Ctrl+C之后java程序就结束了。

当然这里还有一些publish相关的问题,如果waitForRunning只应该在run()中使用的话,是不是waitForRunning就不应该是public的呢?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355

推荐阅读更多精彩内容