Netty源码分析<一>ServerBootStrap启动流程

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

-------------------------调用bind(PORT)后发生的流程-----------------

跳入到AbstractBootstrap的bind方法

/**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(InetAddress inetHost, int inetPort) {
        return bind(new InetSocketAddress(inetHost, inetPort));
    }

    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        .....//省略一会分析
}

先看看initAndRegister()方法,这个方法做2件事

1通过反射实例化在ServerBootStrap.channel(NioServerSocketChannel.class)配置的具体Channel的Class,并初始化它channel实例
2.将新创建的Channel注册到分配到的事件执行器上对应的selector,然后发布channel的register事件

final ChannelFuture initAndRegister() {
        final Channel channel = channelFactory().newChannel();
        try {
     1.       init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

    2.     ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

先分析init(channel)方法。这是一个多态的方法,Bootstrap和ServerBootstrap有各自的实现。(将自定义行为延后到子类实现,达到了可扩展)

1.将配置在bootstrap上的ChannelOption 设置到Channel对应的ChannelConfig上

2.将配置在bootstrap上的AttributeKey设置到Channel上

3.将配置在bootstrap上的ChannleHandler加到channel的channelPipeline最后(channelPipeline是一个类似链表的结构)

4.增加一个ChannelInitializer到pipeline最后。通过ChannelInitializer增加一个继承自ChannelInboundHandlerAdapter的ServerBootstrapAcceptor实例到pipeline最后。ServerBootstrapAcceptor接收配置在bootstrap上childGroup,childHandler,ChildOptions,ChildAttrs几个参数,用于将accept到的client channel做初始化

void init(Channel channel) throws Exception {
        //1.
        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        //2.
        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        //3.
        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        //4.
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

再分析ChannelFuture regFuture = group().register(channel)方法,上面传入的eventLoopGroup是NioEventLoopGroup,发现register(channel)进入的是MultithreadEventLoopGroup类的方法,分析一下register(channel)方法。有2个步骤。

1.首先调用了父类MultithreadEventExecutorGroup类的next()方法,该方法是通过一个事件执行器选择器来选择一个事件执行器实例

 public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }  
    
   public EventExecutor next() {
        return chooser.next();
    }  

代码中可以看出netty对性能的极致追求,提供了2种选择器,选择器的算法都一样,都是基于轮训算法,比如有5台机器,第一次分请求到了第一台机器,第二次到了第二台机器,第三次请求到了第三台请求,以此类推一直到第五台机器,然后第六次又到了第一台机器,这样一个轮流的调用

区别在于如果线程数是2的次幂的时候,采用移位的方式算出下一个(原理是2次幂的数-1后其最高位为0,其余最低位为1,因此能表示通过& 能取0-该数-1的位置)。否则采用取模的方式。

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }

        children = new SingleThreadEventExecutor[nThreads];
        if (isPowerOfTwo(children.length)) {
            chooser = new PowerOfTwoEventExecutorChooser();
        } else {
            chooser = new GenericEventExecutorChooser();
        }
        ..........省略
        }
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            //位移方式轮训
            return children[childIndex.getAndIncrement() & children.length - 1];
        }
    }

    private final class GenericEventExecutorChooser implements EventExecutorChooser {
        @Override
        public EventExecutor next() {
            //取模方式轮训
            return children[Math.abs(childIndex.getAndIncrement() % children.length)];
        }
    }

2.,然后调用选择到的SingleThreadEventLoop的register(channel)方法来注册channel,创建了一个DefaultChannelPromise,因为netty里面IO操作都是异步的,这个DefaultChannelPromise代表了注册逻辑的异步结果,实际最终起作用的是通过channel对应的unsafe

 @Override
    public ChannelFuture register(Channel channel) {
        return register(channel, new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }

        channel.unsafe().register(this, promise);
        return promise;
    }

debug最终进入到AbstractChannel的内部类AbstractUnsafe的register(EventLoop, promise)方法
分析下register0(ChannelPromise promise)方法。
先尝试把注册异步结果设置为不可取消和检查channel还在打开,将eventloop赋值给channel的eventloop属性
调用doRegister()成功将serversocketchannel注册到selector上后,在channel的pipeline上传递ChannelRegistered事件。

protected abstract class AbstractUnsafe implements Unsafe {
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            
            //首先检查channel未注册过
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            
            //检查channel是和指定eventloop兼容
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            
            //分配指定eventLoop给channel
            AbstractChannel.this.eventLoop = eventLoop;

            //判断当前线程与eventLoop绑定的线程是否一致,这是为了避免竞态条件,不需要锁做同步
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                //如果分配给channel的eventLoop对应的线程与当前线程不是同一个线程,则封装成任务投递给eventloop执行
                try {
                    eventLoop.execute(new OneTimeTask() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                     ...
                }
            }
        }

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (firstRegistration && isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

跳转到AbstractNioChannel的doRegister();
javaChannel()获取java原生的ServerSocketChannel,然后调register(selector,interOps,Obj)方法将channel注册到selector上

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

顺便说一下,之前的代码分析中,在init(channel)方法中有将一个channelinitializer添加到pipeline的最后,那么这个channelinitializer在哪里生效呢,就在pipeline.fireChannelRegistered()中被调用。因为channelinitializer继承于ChannelInboundHandlerAdapter类,用于拦截inbound事件,当触发ChannelRegistered事件时,channelinitializer的channelRegistered方法拦截执行。
initChannel((C) ctx.channel())方法是Channelinitializer的一个抽象方法,用于扩展不同的初始化实现。所以继承自ChannelInboundHandlerAdapter的ServerBootstrapAcceptor实例到pipeline最后。开始执行accept功能
可以看到,首先执行了子类自定义实现的initChannel((C) ctx.channel())方法,然后在pipeline中删除Channelinitializer自己,所以之前再调用 ctx.fireChannelRegistered();把事件往下传递

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            initChannel((C) ctx.channel());
            pipeline.remove(this);
            ctx.fireChannelRegistered();
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }

接着分析doBind() initAndRegister()方法后的代码.

首先判断注册异步结果是否完成

1.如果已经完成,则直接调用doBind0(regFuture, channel, localAddress, promise);

2.未完成的话通过增加一个监听器的方式,等通知注册完成时调用doBind0(regFuture, channel, localAddress, promise);

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
}

分析下doBind0()的代码,如果注册异步结果是成功了的,就调用channel的bind()方法。

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

channel的bind()方法,实际上是调用pipeline的bind方法

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }

pipeline是把bind请求从尾部往前传递。(由此可见oubound请求都是从tail->head)

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }

TailContext继承于AbstractChannelHandlerContext,调用的是父类的bind方法

static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        private static final String TAIL_NAME = generateName0(TailContext.class);

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
        }
        
        ....
    }

分析AbstractChannelHandlerContext的bind()方法

1.findContextOutbound()方法是找出pipeline链中下一个的outbound的channelHandler对应的context(挂在pipeline链表上的是包装了channelHandler的channelHandlerContext)。分析代码可以看到是往前面迭代的,找到一个outbound标志位为true的context就返回。

2.找到的context从channel中获取到channel注册的那个eventloop,然后判断当前线程与该eventloop对应的线程是否同一个,是则调用context的invokeBind方法。不是的话则包装成一个任务投递到eventloop的任务队列中执行。(可以看到netty对多线程的处理是无锁编程,将对应的操作投递给属于自己的线程执行)

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (!validatePromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }

        return promise;
    }
    
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
    
    private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

3.最终是执行AbstractChannelHandlerContext的invokeBind()方法

实际是调用context对应的channleHandler的bind()方法。在创建pipeline时会加一个HeadContext的实例到pipeline作为链头,HeadContext实现了ChannelOutboundHandler,从而实现了bind()方法,其实outbound请求——bind请求最后的处理者是HeadContext

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

看HeadContext的bind方法,所以最后做实际事情bind操作的是unsafe的bind方法。这个unsafe来自于channel自身对应的unsafe

public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { 
unsafe.bind(localAddress, promise); 
}

进入到AbstractNioChannel的AbstractUnsafe的bind方法

先确保channel还是在open状态,doBind方法在NioServerSocketChannel实现,里面实现就是调用java原生serverSocketChannel的bind方法,然后投递OneTimeTask任务到channel对应的eventloop,任务主要做的事情是开始传递Inbound事件ChannelActive,

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }
        
        
        protected void doBind(SocketAddress localAddress) throws Exception {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }

pipeline的fireChannelActive方法可以看出inbound事件是从head->tail这样传递的,Tail是实现了ChannelInboundHandler接口的,实现的channelRead是释放消息引用。

 static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        private static final String TAIL_NAME = generateName0(TailContext.class);

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
 
 
 
 public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();

        //Channel状态为active后则触发channel.read()
        if (channel.config().isAutoRead()) {
            channel.read();
        }

        return this;
    }
    }

channel.read()实际调用是AbstractChannel.read()方法

 @Override
    public Channel read() {
        pipeline.read();
        return this;
    }  

最终调用的是DefaultChannelPipeline的read(),调用了tail的read方法

 public ChannelPipeline read() {
        tail.read();
        return this;
    }

oubound请求read从tail->head往前传递

 public ChannelHandlerContext read() {
        invokedPrevRead = true;
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }
        return this;
    }

read请求传递到HeadContext的read方法

public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

实际处理的是AbstractNioMessageChannel的内部类NioMessageUnsafe

 public final void beginRead() {
            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

调用的是AbstractNioChannel的doBeginRead(),做的事情其实修改selectionKey感兴趣的操作,增加对accept操作感兴趣

protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        if (inputShutdown) {
            return;
        }

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        //readInterestOp=16=1<<4

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

暂停下分析一下eventLoop.execute(Runnable task)方法。上面可以看到当eventLoop.inEventLoop()不为true时就封装为OneTimeTask投递到eventLoop中执行,这样是为了所有相关操作都在channel对应的事件循环线程上执行,省去了锁同步实现无锁,该方法实际是SingleThreadEventExecutor类的方法

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        //首先当前线程是不是与SingleThreadEventExecutor实例绑定的线程是一致的
        //如果一致则直接加入到taskQueue任务队列里面去
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        //判断是不是需要wakeup唤醒当前eventloop对应的selector  (判断条件:未唤醒&&当前任务需要唤醒)因为有任务了就需要去执行,不能让selector一直阻塞在select上
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    
    //判断当前线程与eventloop的线程一致而且没唤醒过则唤醒selector
      @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

如果一致则直接加入到taskQueue任务队列里面去

 protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (isShutdown()) {
            reject();
        }
        taskQueue.add(task);
    }

如果不一致则先启动本eventloop对应的线程,然后在调用添加任务addTask方法添加任务到任务队列

 private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                schedule(new ScheduledFutureTask<Void>(
                        this, Executors.<Void>callable(new PurgeTask(), null),
                        ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
                thread.start();
            }
        }
    }

那么启动NioEventLoop的线程的run方法在做什么呢?

protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }

        this.parent = parent;
        this.addTaskWakesUp = addTaskWakesUp;

        //该线程是事件执行器对应的线程,上面startThread方法调用后会进入到该线程的run方法
        //1.首先更新最后执行了多长时间
        //2.调用SingleThreadEventExecutor的多态的abstract修饰的run方法,这个方法根据每个子类实现不同
         thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error(
                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });

        taskQueue = newTaskQueue();
    }

实际最终启动eventloop时执行的是NioEventLoop的run方法,这个就是reactor线程执行事件轮训和执行任务的主要逻辑。

protected void run() {
        for (;;) {
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select(oldWakenUp);

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

然后Reactor线程的run方法那边一直在循环做select和处理提交的任务

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351