Netty服务器源码分析

本文基于Netty 4

在讨论Netty服务器启动之前,先回顾一下服务端使用Java nio selector的启动过程:

//1. 获取服务端通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 绑定端口
ssChannel.bind(new InetSocketAddress(9898));
//3. 设置为非阻塞模式
ssChannel.configureBlocking(false);

Selector selector = Selector.open();
//4. 向监听器注册accept事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

while (selector.select() > 0) {
    //5. 获取监听器上所有的监听事件值
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        if (key.isAcceptable()) {            
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel socketChannel = server.accept();
            socketChannel.configureBlocking(false);
            //注册read事件
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            readMsg(channel);
        } else if(...) {
            ...
        }
        it.remove();
    }
}

Netty nio模式的启动过程:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup);
    b.channel(NioServerSocketChannel.class);
    b.option(ChannelOption.SO_BACKLOG, 128);
    b.childOption(ChannelOption.SO_REUSEADDR, true);
    b.childOption(ChannelOption.TCP_NODELAY, true);
    b.childOption(ChannelOption.SO_KEEPALIVE, true);
    b.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("http-decoder", new HttpRequestDecoder());
            pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
            pipeline.addLast("encoder", new HttpResponseEncoder());
            pipeline.addLast(new MyInboundHandler());
        }
    });

    ChannelFuture f = b.bind(8080).sync();
    f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

其实,无论是Java nio,还是Netty nio,总体都包含两个方面:1. 线程模型;2. IO模型;下面先分析Netty的线程模型。

Netty线程启动

NioEventLoopGroup是Netty线程的核心,下面看一下这个类的初始化
NioEventLoopGroup # 构造方法

public NioEventLoopGroup() {
      this(0);
}
//无参的构造转调到了这里
public NioEventLoopGroup(int nThreads) {
     //这里的Executor传入了null
     this(nThreads, (Executor) null);
}

上面的构造方法最终会调用父类的构造方法
MultithreadEventLoopGroup # 构造方法

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     //这里若threads==0,会初始化一个值;
    //该初始值的算法 Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2))
     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

最终走到这里 MultithreadEventExecutorGroup # 构造方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                       EventExecutorChooserFactory chooserFactory, Object... args) {
 if (nThreads <= 0) {
     throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
 }

 if (executor == null) {
     //之前executor设为了null,这里会赋一个值,这个executor的作用是创建新 Thread并运行任务;后面会发现它的作用
     executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
 }

 //这里会根据threads的值创建若干个EventExecutor
 children = new EventExecutor[nThreads];

 for (int i = 0; i < nThreads; i ++) {
     boolean success = false;
     try {
         //往里看一下children为何物
         children[i] = newChild(executor, args);
         success = true;
     } catch (Exception e) {
         // TODO: Think about if this is a good exception type
         throw new IllegalStateException("failed to create a child event loop", e);
     } finally {
         if (!success) {
              .........
         }
     }
 }
 //初始了chooser,后面会使用该chooser选择一个EventLoop
 chooser = chooserFactory.newChooser(children);

 final FutureListener<Object> terminationListener = new FutureListener<Object>() {
     @Override
     public void operationComplete(Future<Object> future) throws Exception {
         if (terminatedChildren.incrementAndGet() == children.length) {
             terminationFuture.setSuccess(null);
         }
     }
 };

 for (EventExecutor e: children) {
     e.terminationFuture().addListener(terminationListener);
 }

 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
 Collections.addAll(childrenSet, children);
 readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

NioEventLoopGroup # newChild()
上面的children[] 最终保存的是NioEventLoop

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
     return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop中最核心的方法就是execute()

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    //系统刚启动时,NioEventLoop中的Thread肯定为null,且state==ST_NOT_STARTED,那么肯定会走到else中
    if (inEventLoop) {
        addTask(task);
    } else {
        //这里会走到doStartThread()
        startThread();
        //startThread()方法如下
        //////////////////////////////////////////// 
           if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                //改变EventLoop的状态并启动真正干活的线程,CAS操作保证了每个EventLoop只启动一个线程
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
         ////////////////////////////////////////////
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
private void doStartThread() {
    assert thread == null;
    //这个executor就是上文在初始化时创建的 ThreadPerTaskExecutor
    //ThreadPerTaskExecutor 会创建新的线程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            //这个thread常用于inEventLoop()方法
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                //run()为abstract方法,子类会实现
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                ......
            }
        }
    });
}

NioEventLoop # run()
该方法会进入死循环,不停地按预先分配的时间比例处理IO任务和其他的例如scheduled任务

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //NIO多路复用选择器
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        ......
    }
}

通过上面的介绍,我们知道了初始化NioEventLoopGroup时会创建nThreads个NioEventLoop,当有新task到来时,会根据其所属Chanenl选择对应的EventLoop执行(execute())

NioEventLoopGroup

线程已经先行启动,等着数据的到来;接下来看一下IO相关的初始化,Netty的IO模型是多路复用;

IO的启动

ServerBootstrap是IO的核心类,下面就从这个类入手,看一下IO的启动过程

1. 参数的设置
ServerBootstrap b = new ServerBootstrap();
//上面启动的线程(池)设置在这里
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
//tcp 三次握手使用的参数
b.option(ChannelOption.SO_BACKLOG, 128);
2. 绑定端口

在继续深入源码之前,需要区分一下register的含义;Java NIO的register一般指把ServerSocketChannel注册到Selector上;在Netty中,注册的含义不仅仅包含这层含义,还包括把NioServerSocketChannel与EventLoop关联在一起

bind(port)会进入AbstractBootstrap的doBind()中
AbstractBootstrap # doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    // 注册已经在EventLoop中进行,因此下列方法可能和注册过程并发进行
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 注册是通过线程池完成,因此这里要判断是否注册完成
    // 注册过程可能很快,这里已经完成
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 绑定本地端口
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        // 没有完成注册的话,就注册一个监听器,注册完成后进行回调
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

AbstractBootstrap # initAndRegister()
这个方法开始使用前面已经初始化的NioEventLoopGroup

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //设置参数时,有 b.channel(NioServerSocketChannel.class),
        //因此这里的channel实际就是 NioServerSocketChannel
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        ......
        return ......
    }
    //这里的group()实际返回的就是bossGroup,即上面创建的第一个NioEventLoopGroup,因此真正的注册任务是由线程池来完成;
    //只有当EventLoop与Channel关联在一起,才能算注册成功
    ChannelFuture regFuture = config().group().register(channel);    
    .......  
    return regFuture;
}

在真正注册之前,会先实例化NioServerSocketChannel,接下来就看一下NioServerSocketChannel初始化过程;NioServerSocketChannel的继承关系如下:

NioServerSocketChannel 继承关系图

channelFactory.newChannel()会调用NioServerSocketChannel的构造方法:

public NioServerSocketChannel(ServerSocketChannel channel) {
        // NioServerSocketChannel 关心的事件是Accept,即新的客户端连接
        super(null, channel, SelectionKey.OP_ACCEPT);
        //这里也会进行内存分配相关类的初始化
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

AbstractChannel

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //每个channel都有一个id
        id = newId(); 
        //真正与下层交互的东东,比如读写socket等操作
        unsafe = newUnsafe();
        //初始化为DefaultChannelPipeline
        pipeline = newChannelPipeline();
}

NioServerSocketChannel已初始化完毕,但还没有与EventLoopGroup或EventLoop发生任何联系

ServerBootstrap # init()

//上面已完成Channel的初始化,这里的channel就是 NioServerSocketChannel
void init(Channel channel) throws Exception {
    //一些attrs、options、handler以及childAttrs、childOptions、childHandler的设置
    .......
    //这里就是上面创建的DefaultChannelPipeline
    ChannelPipeline p = channel.pipeline();
    //之前创建的第二个NioEventLoopGroup(workerGroup)
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    //pipeline的链表最后加入了一个ChannelHandler
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

在addLast时,有一点要注意:如果还没有完成注册,则把Handler保存在一个临时变量中,等注册完毕后再调用相应方法;如果已完成注册,则应调用handlerAdded()等回调方法;
DefaultChannelPipeline # addLast()

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);
        // 加入到Handler链中
        addLast0(newCtx);
       
        // 由前面的分析可知,系统刚启动时init()会走到这一步,此时并没有完成注册,因此会进入if;
        if (!registered) {
            newCtx.setAddPending();        
            //在注册完成后,会调用pendingHandlerCallbackHead的相关方法
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

callHandlerCallbackLater()

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        // 注册完成后,按照链的顺序依次调用
        pending.next = task;
    }
}

addLast()完成后,Pipeline中Handler链的顺序如下:


handler链

init()完成后,开始异步注册
AbstractBootStrap # initAndRegister()

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
           .......
        }
        // 异步注册
        // 使用chooser在EventLoopGroup中选择一个EventLoop进行注册
        ChannelFuture regFuture = config().group().register(channel);
        ......
        return regFuture;
 }

SingleThreadEventLoop # register()

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

AbstractChannel # AbstractUnsafe # register()

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
     // NioServerSocketChannel的eventLoop 赋了值
    AbstractChannel.this.eventLoop = eventLoop;
    // 系统刚启动时,EventLoopGroup中所有的EventLoop都是未启动状态,EventLoop的thread属性也为null
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 由上文可知,execute()方法用来提交任务,启动一个新的EventLoop
            // 在此,channel已经和EventLoop产生了联系
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ........
        }
    }
}

AbstractChannel # AbstractUnsafe # register0()

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // JDK的注册:将ServerSocketChannel注册到selector上;有一点要注意,这里注册的时候,并没有注册感兴趣的事件;
        // selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        doRegister();
        neverRegistered = false;
        registered = true;
        // 到这里注册完成
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

到这里可以看出Netty中完成注册的标识必须满足下面三个条件:

  1. NioServerSocketChannel 的eventLoop 字段赋上了值
  2. 这个EventLoop要启动(新建的EventLoop是未启动状态)
  3. JDK级别的ServerSocketChannel 注册到selector上(不一定有事件)

在前面我们准备了一些动作必须等注册完成后才能触发,这里再回顾一下是哪些动作:
a. AbstractBootstrap # doBind() 里注册了一个监听器,注册完成后调用;
b. DefaultChannelPipeline # addLast() 里注册了一个回调链,也是等注册完成后调用,实际就是调用handlerAdded();

注册后继续做了如下动作:

pipeline.invokeHandlerAddedIfNeeded();
// 设置成功标志,这里会调用listener;前面注册的ChannelFutureListener就会在此调用,进行本地端口的绑定;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
//
if (isActive()) {
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}

pipeline.invokeHandlerAddedIfNeeded()会走到下面代码

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
    //  这里就是 上文添加的Handler链;ChannelInitializer就是其中之一
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        // 这里会调用到ChannelInitializer的initChannel()
        task.execute();
        task = task.next;
    }
}

ChannelInitializer # initChannel()

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            .......
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

上述操作会导致Pipeline中Handler链发生下面的变化:

handler链

safeSetSuccess(promise) 会走到如下代码
DefaultPromise # trySuccess()

public boolean trySuccess(V result) {
     if (setSuccess0(result)) {
          //调用listener;上文注册的ChannelFutureListener就会在此被调用
          notifyListeners(); 
          return true;
     }
     return false;
}

到此Netty服务器已经启动,等待客户端连接,启动大致流程如下:


image.png

触发channelActive()

服务器注册完成后,Pipeline中Handler链的结构如下,此时调用pipeline.fireChannelActive()


Handler链
设置ACCEPT事件

DefaultChannelPipeline # fireChannelActive()

public final ChannelPipeline fireChannelActive() {
    // 这里最终会走到HeadContext中
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

HeadContext # channelActive()

public void channelActive(ChannelHandlerContext ctx) throws Exception {
     //这个会沿着Handler链的方向调用下一个handler的方法
     //在目前的Handler链中,其实什么都没做
     ctx.fireChannelActive();

     readIfIsAutoRead();
     /////////////////////////////////////////// readIfIsAutoRead的方法体
     private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                 //这个是可以配置关闭的,默认都是打开;这个开关与限流有关
                // 这个方法最终走到AbstractNioChannel的doBeginRead()
                channel.read();
            }
     }
     ///////////////////////////////////////////
}

AbstractNioChannel # doBeginRead()

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // readInterestOp在channel初始化时设为了accept
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

由上文可知,注册时没有注册感兴趣的事件 (selectionKey = javaChannel().register(eventLoop().selector, 0, this),当时传入的是0)。这里才完成感兴趣事件的设置(至于为什么滞后设置,我觉得是要把所有相关的类都初始化完成后才注册,否则在这个过程中有客户端访问,可能会出错)。此时EventLoop的循环才真正可以检查IO事件。

接收客户端连接

当客户端连接到来时,EventLoop的循环方法(run())会检测到这一事件,进入processSelectedKey()方法
NioEventLoop # processSelectedKey()

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    ......
    try {
        ......
        //处理 read or accept
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 调用NioMessageUnsafe的read()
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

NioMessageUnsafe # read()

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                ///////////////////////////////////////////// NioServerSocketChannel.doReadMessages()
                SocketChannel ch = javaChannel().accept();
                try {
                      if (ch != null) {
                            buf.add(new NioSocketChannel(this, ch));
                            return 1;
                      }
                } catch (Throwable t) {
                    ......
                }
                return 0;
                  
                /////////////////////////////////////////////
                ......
                
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        .......
    } finally {
        .......
    }
}

这里会调用accept()接受客户端连接并保存在list中;接着触发了pipeline.fireChannelRead(),channelRead事件就会从HeadContext传播到ServerBootstrapAcceptor
ServerBootstrapAcceptor # channelRead()

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //msg 是前文accept()接收的NioSocketChannel
    final Channel child = (Channel) msg;
    // 添加ChannelInitializer
    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        //childGroup为workerGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

上面的方法会将accept的SocketChannel进行注册,与ServerSocketChannel的注册唯一不同的是此处会把Handler与另一个NioEventLoopGroup关联;下面的代码是我们对这个socket的初始设置。

ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("http-decoder", new HttpRequestDecoder());
        pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast(new MyInboundHandler());
    }
});

Tips: Netty中将与accept()返回的socket有关的参数称之为childXXX,比如childOption()就是给该socket设置参数,childHandler()是给该socket设置处理器

channelRead()后,系统中就形成了一条新的Handler链,其结构如下;之后与客户端的读写交互都由这个Handler链处理

Child Handler

Netty工作流程图

netty工作流程图
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容