4.Netty执行IO事件和非IO任务

回顾NioEventLoop的run方法流程

上文说到NioEventLoop的run方法可以分为3个步骤:

  1. 轮询channel中就绪的IO事件
  2. 处理轮询出的IO事件
  3. 处理所有任务,也包括定时任务

其中步骤1已在上一节讲述,这里接着讲述下面2个步骤

IO事件与非IO任务

首先看一下在步骤2和步骤3的主干代码

final int ioRatio = this.ioRatio;
// 将所有任务执行完
if (ioRatio == 100) {
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        runAllTasks();
    }
} else {
    // 记录IO事件消耗的时间,然后按比例处理分配时间处理非IO任务
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
        // ioRatio默认50,(100-ioRatio)/ioRatio刚好等于1,做到平均分配
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

ioRadio是NioEventLoop的一个成员变量,用来控制分配花费在IO事件与非IO任务时间的比例。默认情况下,ioRadio是50,表示IO事件与非IO任务
将分配相同时间。而当ioRatio为100时,该值失效,不再平衡两种动作的时间分配比值。
了解了这一点,上述两种分支代码就不难理解了,我们直接进入processSelectedKeys,看看netty如何执行IO事件

处理IO事件

先进入processSelectedKeys方法内部。

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

可以看到这里又根据selectedKeys是否为空这个条件来确定是处理优化过的keys还是普通keys。关于selectedKeys,在NioEventLoop介绍这一节中,
我们介绍了NioEventLoop的创建,在创建过程中,默认会将SelectedKeys由Hashset替换为数组实现,此处的selectedKeys正是替换过后的实现。
我们继续跟进到processSelectedKeysOptimized方法

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

方法内部用一个for循环处理selectedKeys。key的attchment默认是在注册时附加上去的NioServerSocketChannel和NioSocketChannel。
继续跟进processSelectedKey(k, (AbstractNioChannel) a)方法。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop = ch.eventLoop();  
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise());
        return;
    }

    int readyOps = k.readyOps();
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}

netty首先对selectionKey的有效性做了一个判断。当key无效时,关闭key所在的channel。当key有效时,委托NioUnsafe对象对key进行IO操作。
注意这里先进行OP_CONNECT,再执行OP_WRITE,最后执行OP_READ和OP_ACCEPT。关于Unsafe的这些IO操作留待以后分析。

processSelectedKeysPlain方法流程类似,略过

处理非IO任务

由于IoRatio默认为50,我们先进入runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法。

protected boolean runAllTasks(long timeoutNanos) {
    // 步骤1
    fetchFromScheduledTaskQueue();
    // 步骤2
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // 步骤3
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 步骤4
        safeExecute(task);
        runTasks ++;
        // 步骤5
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 步骤6
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

非IO任务的执行可以分为6个步骤

  1. 从定时任务队列聚合任务到普通任务队列
  2. 从普通队列中获取任务
  3. 计算任务执行的超时时间
  4. 安全执行任务
  5. 任务执行到一定次数,计算是否超时
  6. 执行完taskQueue普通队列里的任务后,再去执行tailTaskQueue里的任务。但目前暂时没有看到tailTaskQueue使用的地方,也许是一个扩展点吧,这里先略过。

我们一个一个步骤讲解

聚合定时任务到普通任务队列

首先看一下整体流程

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}

首先先判断定时任务队列是否有任务,然后调用了一个AbstractScheduledEventExecutor.nanoTime(),该方法返回ScheduledFutureTask类从初始化
到当前时刻的差值。也即将ScheduledFutureTask初始化的时刻当成零时刻。
获取到零时刻到当前时刻的差值后,用一个for循环不断去定时任务队列里获取终止时刻在当前时刻之后的任务(scheduledTask.deadlineNanos() - nanoTime<=0)
当获取到定时任务后,将它添加到普通任务队列taskQueue里。同时添加失败后,还会再重新添加回定时任务队列,防止任务直接丢失。

说到定时任务队列,也少不了一探其实现。scheduledTaskQueue初始化代码如下:

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<>(
                SCHEDULED_FUTURE_TASK_COMPARATOR,
                11);
    }
    return scheduledTaskQueue;
}

采用的是一个懒加载的方式,在调用scheduledTaskQueue()创建定时任务时才进行初始化。从名字可以看出,它是一个优先级队列,初始化容量为11,
采用的Comparator是调用2个ScheduledFutureTask的compareTo方法,首先比较任务的终止时间,然后比较两个任务的id。代码较简单,就不列了。

然后我们看下调度方法schedule

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task.setId(nextTaskId++));
    } else {
        executeScheduledRunnable(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task.setId(nextTaskId++));
            }
        }, true, task.deadlineNanos());
    }
    return task;
}

可以发现,netty将"添加定时任务"也当做一个任务,放入任务队列里。

从普通队列中获取任务

// NioEventLoop中定义的pollTask方法
protected Runnable pollTask() {
    Runnable task = super.pollTask();
    if (needsToSelectAgain) {
        selectAgain();
    }
    return task;
}
// super.pollTask调用了此方法,定义在SingleThreadEventExecutor中
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) {
                return task;
            }
        }
    }

这里依然是通过轮询从任务队列里取出任务,并且忽略WAKEUP_TASK这个标记性任务。

计算任务执行的超时时间

在当前时间上,加上IO事件执行的时间,作为非IO任务执行的超时时间

安全执行

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

捕获所有异常,使得定时任务报错时不退出

计算是否超时

由于nanoTime()是一个相对耗时的操作,netty默认执行了64次非IO任务后,才计算是否超时。若执行了超过64个任务没或者任务队列已经没有任务,
就打断循环,并将当前时间更新为lastExecutionTime。

总结

到了这里,我们已经介绍完了大部分NioEventLoop的内容,限于笔者水平和文章篇幅,nioEventLoop所使用的任务队列MpscQueue和ScheduleFutureTask
内部执行原理不再进一步深究。但这也已经足够对NioEventLoop塑造一个比较整体性的认识了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容