Netty源码愫读(五)NioEventLoop与NioEventLoopGroup相关源码学习

1、nettey的线程模型

1.1、Rector模式

Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。

单线程模型:

单线程模型下,所有的IO操作都由同一个Reactor线程来完成,其主要职责如下:

  • 作为服务端,接收客户端的TCP连接;
  • 作为客户端,向服务端发起TCP连接;
  • 读取通信对端的请求或者应答消息;
  • 向通信对端发送消息请求或者应答消息。

Reactor单线程模型原理图如下:

reactor单线程模型.png

如图所示,由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞。通常Reactor线程中聚合了多路复用器负责监听网络事件,当有新连接到来时,触发连接事件,Disdatcher负责使用Acceptor接受客户端连接,建立通信链路;当I/O事件就绪后,Disdatcher负责将事件分发到对应的event handler上负责处理。

该模型的缺点很明显,不适用于高负载、高并发的应用场景;由于只有一个Reactor线程,一旦挂彩,整个系统通信模块将不可用。

多线程模型:

该模型的特点:

  • 专门由一个Reactor线程-Acceptor线程用于监听服务端,接收客户端连接请求;
  • 网络I/O操作读、写等由Reactor线程池负责处理;
  • 一个Reactor线程可同时处理多条链路,但一条链路只能对应一个Reactor线程,这样可避免并发操作问题。
reactor多线程模型.png

绝大多数场景下,Reactor多线程模型都可以满足性能需求,但是,在极个别特殊场景中,一个Reactor线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。因此,诞生了第三种线程模型。

主从多线程模型:

主从多线程模型.png

该模型的特点:

  • 服务端使用一个独立的主Reactor线程池来处理客户端连接,当服务端收到连接请求时,从主线程池中随机选择一个Reactor线程作为Acceptor线程处理连接;
  • 链路建立成功后,将新创建的SocketChannel注册到sub reactor线程池的某个Reactor线程上,由它处理后续的I/O操作。

具体介绍请参照://www.greatytc.com/p/b4de9b85c79d

1.2、netty中的Reactor模式

Netty同时支持Reactor单线程模型 、Reactor多线程模型和Reactor主从多线程模型,用户可根据启动参数配置在这三种模型之间切换。Netty线程模型原理图如下:

netty中的reactor模型.png

2、NioEventLoopGroup相关源码学习

NioEventLoopGroup类继承图:

NioEventLoopGroup类继承图.png

2.1、NioEventLoopGroup功能介绍

NioEventLoopGroup主要包含两方面的功能:

  • 注册Channel,主要是将Channel与某个线程池绑定等;
  • 线程池管理及任务管理等;

2.1.1、注册Channel

NioEventLoopGroup主要通过EventLoopGroup的register()方法进行Channel注册,具体实现是通过EventLoop的register()将Channel注册到对应EventLoop的Selector上,由Selector来调度Channel的相关事件,如读、写、Accept等事件。

而EventLoopGroup的设计是,它包含多个EventLoop(每一个EventLoop通常内部包含一个线程),在执行上述注册过程中是需要选择其中的一个EventLoop来执行上述注册行为,这里就出现了一个选择策略的问题,该选择策略接口是EventExecutorChooser,你也可以自定义一个实现。

2.1.2、线程池管理及任务执行

EventLoopGroup继承了EventExecutorGroup,EventExecutorGroup也是EventExecutor的集合,EventExecutorGroup也是掌管着EventExecutor的初始化工作,EventExecutorGroup对于Runnable任务的执行也是选择内部中的一个EventExecutor来做具体的执行工作。

netty中很多任务都是异步执行的,一旦当前线程要对某个EventLoop执行相关操作,如注册Channel到某个EventLoop,如果当前线程和所要操作的EventLoop内部的线程不是同一个,则当前线程就仅仅向EventLoop提交一个注册任务,对外返回一个ChannelFuture。

2.2、NioEventLoopGroup源码学习

NioEventLoopGroup中主要有以下实现:

2.2.1、线程组创建及初始化

主要是调用父类MultithreadEventLoopGroup的构造方法进行初始,而MultithreadEventLoopGroup又调用MultithreadEventExecutorGroup对线程组进行初始化,其中包括EventExecutor数组初始化、EventExecutorChooser初始化及相关线程监听器初始化等;

2.2.2、IoRatio初始化

源码如下:

public void setIoRatio(int ioRatio) {
    for (EventExecutor e: this) {
        ((NioEventLoop) e).setIoRatio(ioRatio);
    }
}

其主要是调用NioEventLoop的方法对执行时间占比进行设置;

2.2.3、重建selector

源码如下:

public void rebuildSelectors() {
    for (EventExecutor e: this) {
        ((NioEventLoop) e).rebuildSelector();
    }
}

主要是调用NioEventLoop的方法进行重建;

2.2.3、创建子执行器

源码如下:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoopGroup创建NioEventLoop类型的子执行器;

2.3、MultithreadEventExecutorGroup源码学习

2.3.1、基本属性

private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

children:EventExecutor类型的子执行器数组
readonlyChildren:只读执行器
terminatedChildren:已终止的执行器个数;
terminationFuture:线程终止异步结果
chooser:子执行器选择器;

2.3.2、构造函数

实现源码:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

处理流程:

  • 初始化NioEventLoop的执行线程,本处执行线程类型为:ThreadPerTaskExecutor,顾名思义,既每个任务一个线程,以下为任务提交处理源码:
@Override

public void execute(Runnable command) {

threadFactory.newThread(command).start();

}

由以上源码可知,每个任务都会创建一个线程进行执行。

  • 执行器组初始化,其调用子类的newChild()方法获取执行器,具体为调用NioEventLoopGroup的newChild()方法创建EventLoop类型的执行器。
  • 初始化执行器选择器,默认选择器实现是通过DefaultEventExecutorChooserFactory的newChooser()获取的,其有两种具体的实现:PowerOfTwoEventExecutorChooser(基于原子自增通线程数做与运算获取),GenericEventExecutorChooser(做取模运算获取);
  • 给每个子执行器添加终止监听器,当所有子执行器都有终止时,设置NioEventLoopGroup的终止异步结果。

2.3.3、其他实现

  • 获取下个执行器:通过chooser获取
public EventExecutor next() {
  return chooser.next();
}
  • 获取执行器个数:
public final int executorCount() {
  return children.length;
}

2.4、其他实现类

MultithreadEventLoopGroup:

  • 默认执行器组个数初始化:

主要对默认执行器组的个数进行初始化,默认为cpu的processor个数的2倍。

  • Channel注册:

具体调用EventLoop的register()方法将Channel注册到EventLoop中。

3、NioEventLoop相关源码学习

NioEventLoop类继承图:

NioEventLoop类继承图.png

3.1、NioEventLoop功能介绍

NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O读写之外,还兼顾处理以下两类任务:

  • 系统Task:通过调用NioEventLoop的execute(Runnable task)方法实现,Netty有很多系统Task,创建它们的主要原因是:当I/O线程和用户线程同时操作网络资源时,为防止并发操作导致的锁的竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程复制执行,这样就实现了局部无锁化。
  • 定时任务:通过调用NioEventLoop的schedule(Runnable command,long delay,TimeUnit unit)方法实现。

3.2、NioEventLoop源码学习

3.2.1、基本属性

private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
private final IntSupplier selectNowSupplier;
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;

SELECTOR_AUTO_REBUILD_THRESHOLD:重建selector的select次数上限;
selectNowSupplier:selectNow()的包装实现;
selector:本EventLoop的Selector;
unwrappedSelector:未包装的jdk原生Selector;
selectedKeys:Selector获取到的SelectedSelectionKeySet;
provider:Selector的提供者;
wakenUp:Selector的wakeup标志;
selectStrategy:select的处理策略;
ioRatio:I/O任务与其他任务执行时间占比;
cancelledKeys:已取消的keys计数;
needsToSelectAgain:是否需要再次select;

3.2.2、构造函数

NioEventLoop构造函数源码:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

provider :默认实现为SelectorProvider.provider()(在NioEventLoopGroup中设定)即调用jdk的默认实现;

openSelector:构建本EventLoop对应的Selector,selectorTuple包含原生unwrappedSelector和netty包装实现的selector。

selectStrategy:select的处理策略,其具体实现类为:DefaultSelectStrategy(在NioEventLoopGroup中设定);

3.2.3、主要方法实现解析

3.2.3.1、openSelector()实现

openSelector()实现源码:

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        }
        return new SelectorTuple(unwrappedSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
        return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

主要处理流程:

  • 调用provider(SelectorProvider)的openSelector()方法获取Selector,即调用jdk的SelectorProvider的实现获取Selector;
  • DISABLE_KEYSET_OPTIMIZATION为true表示禁止SelectedKeys的优化处理,则直接返回原生的Selector实现。
  • 利用AccessController获取平台相关的Selector实现类:sun.nio.ch.SelectorImpl;
  • 如果未获取到sun.nio.ch.SelectorImpl类型的实现或其不是jdk原生Selector实现或其子类实现,则直接返回原生的Selector实现;
  • 利用AccessController实现sun.nio.ch.SelectorImpl的包装器类,将sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys属性替换为本地的SelectedSelectionKeySet,这样实现将Selector实现中的底层SelectedSelectionKeySet替换为NioEventLoop中的属性。
  • Selector的包装器类实现失败,则返回jdk原生的Selector实现,否则返回包装后的本地实现SelectedSelectionKeySetSelector。

3.2.3.2、run()实现

run()实现源码:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

主要处理流程:

  • 通过selectStrategy.calculateStrategy()方法决定select的处理方式,当EventLoop中有未处理的任务时时,直接调用selectorNowSupplier.get(),其实际调用selectNow(),而selectNow()实际调用selector.selectNow()这个非阻塞方法;
  • 当返回值为SelectStrategy.CONTINUE则继续进行下一次处理,当为SelectStrategy.SELECT时立即进行select()操作。
  • 其他情况下,则调用processSelectedKeys()处理I/O任务,调用runAllTask()处理EventLoop中的任务。

3.2.3.3、select()实现

select实现源码:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}

主要处理流程:

  • 通过delayNanos()获取处理的超时时间,当有任务时,获取的是任务的超时时间;当无任务时,获取的默认超时时间1秒;
  • 当超时时间小于或等于0,当未进行select操作,立即进行selectNow()操作,然后则退出循环;
  • 若EventLoop有任务,且wakeUp为false,则立即进行selectNow操作并退出循环;
  • 进行selector.select()操作,直到有I/O事件或超时,select计数加1;
  • 当有I/O事件或被用户唤醒或有任务需要处理,则终止循环;
  • 当处理未超时,则初始化select计数为1,并继续循环处理;否则若select计数大于重建selector的限值,表示遇到了select的bug,则调用rebuildSelector()进行selector的重建,并立即调用selectcNow()进行处理。

3.2.3.4、rebuildSelector()实现

rebuildSelector0()源码实现:

private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }

            int interestOps = key.interestOps();
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }

    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

主要处理流程:

  • 调用openSelector()获取新的SelectorTuple对象;
  • 遍历就的Selector中的SelectionKey,获取感兴趣的事件,对旧的SelectionKey做取消操作;
  • 将原Channel注册到新的Selector中,若SelectionKey中的attachment为AbstractNioChannel,则设置其对于的selectionKey为新的;
  • 若注册失败则关闭对于的Channel或发起ChannelUnregistered事件。
  • 将EventLoop中的selector及unwrappedSelector替换为新的,并关闭就的Selector。

3.2.3.5、I/O事件处理实现

processSelectedKeys()源码实现:

private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

当DISABLE_KEYSET_OPTIMIZATION为false,即启用了Keyset优化处理并优化处理成功,则调用processSelectedKeysOptimized()对I/O事件进行处理,否则调用processSelectedKeysPlain(selector.selectedKeys())对I/O事件进行处理;

processSelectedKeysOptimized()实现源码:

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

processSelectedKeysPlain()实现源码:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

processSelectedKeysOptimized()与processSelectedKeysPlain()处理大同小异,主要是遍历事件的SelectedKeys对事件进行处理;

3.2.3.6、任务处理实现

实现源码:

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

主要处理流程:

  • 调用fetchFromScheduledTaskQueue()获取任务,并将任务填充到taskQueue中,直到taskQueue填满;
  • 调用runAllTasksFrom()批量处理任务;

4、NioEventLoopGroup启动流程分析

启动调用流程如下:

启动调用流程.png

up中的newChild()新建事件循环器并组成group;

(4)、当用户调用通过Bootstrap进行bind或connect操作时,实际是向NioEventLoopGroup中添加任务;

(5)、最终向NioEventLoop中添加任务,即调用SingleThreadEventExecutor的execute()添加任务;

(6)、execute()中会调用startThread()启动线程;

(7)、startThread()中调用NioEventLoop的run()方法;

(8)、NioEventLoop的run()中循环进行I/O事件处理及任务处理;

相关阅读:
Netty源码愫读(一)ByteBuf相关源码学习 【//www.greatytc.com/p/016daa404957
Netty源码愫读(二)Channel相关源码学习【//www.greatytc.com/p/02eac974258e
Netty源码愫读(三)ChannelPipeline、ChannelHandlerContext相关源码学习【//www.greatytc.com/p/be82d0fcdbcc
Netty源码愫读(四)ChannelHandler相关源码学习【//www.greatytc.com/p/6ee0a3b9d73a
Netty源码愫读(六)ServerBootstrap相关源码学习【//www.greatytc.com/p/a71a9a0291f3

参考书籍:
《Netty实战》
《Netty权威指南》

参考博客:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容

  • 在本文中主要是深入了解EventLoop,以便对netty的线程模型有更好的了解。Netty是Reactor模型的...
    xiehongm_信陵阅读 1,014评论 0 2
  • 我们知道, 一个 Netty 程序启动时, 至少要指定一个 EventLoopGroup(如果使用到的是 NIO,...
    tracy_668阅读 4,448评论 1 7
  • 从前面的文章中我们已经知道了,一个 Netty 程序启动时,至少要指定一个 EventLoopGroup(如果使用...
    WEIJAVA阅读 319评论 0 0
  • 清明抽空看了下netty部分的知识点,总结一下发布在简书上,netty本身涵盖的知识点比较多,一篇文章也讲不完那么...
    晴天哥_王志阅读 1,290评论 2 3
  • 这是一次说走就走的旅行,事先没有任何安排。走之前我就只定了车票,之后的行程全部是临时决定的。去华山是因为酒店订错了...
    zozo买手日志阅读 403评论 0 1