
5.2 AbstractNioChannel源码分析


     * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
    public interface NioUnsafe extends Unsafe {
         * Return underlying {@link SelectableChannel}
        SelectableChannel ch(); // 对应NIO中的JDK实现的Channel

         * Finish connect
        void finishConnect(); // 连接完成

         * Read from underlying {@link SelectableChannel}
        void read(); // 从JDK的Channel中读取数据

        void forceFlush();


     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
    private ChannelPromise connectPromise; // 连接异步结果
    private ScheduledFuture<?> connectTimeoutFuture; // 连接超时检测任务异步结果
    private SocketAddress requestedRemoteAddress; // 连接的远端地址


        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;   // Channel已被关闭

            try {
                if (connectPromise != null) {
                    // Already a connect in process.
                    throw new ConnectionPendingException(); // 已有连接操作正在进行

                boolean wasActive = isActive();
                // 模板方法,细节子类完成
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive); // 连接操作已完成
                } else {
                    // 连接操作尚未完成
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // 这部分代码为Netty的连接超时机制
                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);

                    promise.addListener(new ChannelFutureListener() {
                        public void operationComplete(ChannelFuture future) throws Exception {
                            // 连接操作取消则连接超时检测任务取消
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                connectPromise = null;
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));


private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return; // 操作已取消或Promise已被通知

            // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
            // We still need to ensure we call fireChannelActive() in this case.
            boolean active = isActive();

            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && active) {

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {


        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                fulfillConnectPromise(connectPromise, wasActive); // 首次Active触发Active事件
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);  // 连接完成,取消超时检测任务
                connectPromise = null;


        protected final void flush0() {
            // Flush immediately only when there's no pending flush.
            // If there's a pending flush operation, event loop will call forceFlush() later,
            // and thus there's no need to call it now.
            if (!isFlushPending()) {
                super.flush0(); // 调用父类方法,在父类判断是否已经又调用;

        public final void forceFlush() {
            // directly call super.flush0() to force a flush now

        private boolean isFlushPending() {
            SelectionKey selectionKey = selectionKey();
            return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;


protected final void removeReadOp() {
            SelectionKey key = selectionKey();
            // Check first if the key is still valid as it may be canceled as part of the deregistration
            // from the EventLoop
            // See
            if (!key.isValid()) {
                return; // selectionKey已被取消
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) != 0) {
                // only remove readInterestOp if needed
                key.interestOps(interestOps & ~readInterestOp); //设置为不再感兴趣

 Netty中将服务端的OP_ACCEPT和客户端的Read统一抽象为Read事件,在NIO底层I/O事件使用bitmap表示,一个二进制位对应一个I/O事件。当一个二进制位为1时表示关心该事件,readInterestOp的二进制表示只有1位为1,所以体会interestOps & ~readInterestOp的含义,可知removeReadOp()的功能是设置SelectionKey不再关心Read事件。类似的,还有setReadOp()、removeWriteOp()、setWriteOp()等等。

private final SelectableChannel ch; // 包装的JDK Channel
    protected final int readInterestOp; // Read事件,服务端OP_ACCEPT,其他OP_READ
    volatile SelectionKey selectionKey; // JDK Channel对应的选择键
    boolean readPending;  // 底层读事件进行标记


     * Create a new instance
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent); = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false); // 设置为非阻塞模式
        } catch (IOException e) {
            try {
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                            "Failed to close a partially initialized socket.", e2);

            throw new ChannelException("Failed to enter non-blocking mode.", e);


    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no operation was called yet.
                    // 选择键取消重新selectNow(),清除因取消操作而缓存的选择键
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;

    protected void doDeregister() throws Exception {

 对于Register事件,当Channel属于NIO时,已经可以确定注册操作的全部细节:将Channel注册到给定NioEventLoop的selector上即可。注意,其中第二个参数0表示注册时不关心任何事件,第三个参数为Netty的NioChannel对象本身。对于Deregister事件,选择键执行cancle()操作,选择键表示JDK Channel和selector的关系,调用cancle()终结这种关系,从而实现从NioEventLoop中Deregister。需要注意的是:cancle操作调用后,注册关系不会立即生效,而会将cancle的key移入selector的一个取消键集合,当下次调用select相关方法或一个正在进行的select调用结束时,会从取消键集合中移除该选择键,此时注销才真正完成。一个Cancel的选择键为无效键,调用它相关的方法会抛出CancelledKeyException。

    protected void doBeginRead() throws Exception {
        // or was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return; // 选择键被取消而不再有效

        readPending = true; // 设置底层读事件正在进行

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // 选择键关心Read事件
            selectionKey.interestOps(interestOps | readInterestOp);


    protected void doClose() throws Exception {
        ChannelPromise promise = connectPromise;
        if (promise != null) {
            // Use tryFailure() instead of setFailure() to avoid the race against cancel().
            // 连接操作还在进行,但用户调用close操作
            connectPromise = null;

        ScheduledFuture<?> future = connectTimeoutFuture;
        if (future != null) { // 如果有连接超时检测任务,则取消
            connectTimeoutFuture = null;


    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof NioEventLoop;


5.3 AbstractNioMessageChannel源码分析


        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf); // 模板方法,读取消息
                        if (localRead == 0) { // 没有数据可读
                        if (localRead < 0) { // 读取出错
                            closed = true;

                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false; // 已没有底层读事件
                    pipeline.fireChannelRead(readBuf.get(i)); //触发ChannelRead事件,用户处理
                // ChannelReadComplete事件中如果配置autoRead则会调用beginRead,从而不断进行读操作
                pipeline.fireChannelReadComplete(); // 触发ChannelReadComplete事件,用户处理

                if (exception != null) {
                    closed = closeOnReadError(exception);


                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise()); // 非serverChannel且打开则关闭
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called or in channelRead(...) method
                // * The user called or in channelReadComplete(...) method
                // See
                if (!readPending && !config.isAutoRead()) {
                    // 既没有配置autoRead也没有底层读事件进行
                    removeReadOp(); // 清除read事件,不再关心


5.4 NioServerSocketChannel源码分析


    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) { // JDK版本1.7以上
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());

    protected void doClose() throws Exception {

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            // 一个NioSocketChannel为一条消息
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);

        return 0;


     * Create a new instance
    public NioServerSocketChannel() {

     * Create a new instance using the given {@link SelectorProvider}.
    public NioServerSocketChannel(SelectorProvider provider) {

     * Create a new instance using the given {@link ServerSocketChannel}.
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());


5.5 AbstractNioByteChannel源码分析


     * Create a new instance
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);


        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

            ByteBuf byteBuf = null; // 创建一个ByteBuf
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator); // 创建一个ByteBuf
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));  // doReadBytes模板方法,子类实现细节
                    if (allocHandle.lastBytesRead() <= 0) { // 没有数据可读
                        // nothing was read. release the buffer.
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0; // 读取数据量为负数表示对端已经关闭
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;

                    readPending = false; // 没有底层读事件进行
                    pipeline.fireChannelRead(byteBuf); // 触发ChannelRead事件,用户处理
                    byteBuf = null;
                } while (allocHandle.continueReading());

                // ReadComplete结束时,如果开启autoRead则会调用beginRead,从而可以继续read

                if (close) {
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called or in channelRead(...) method
                // * The user called or in channelReadComplete(...) method
                // See
                if (!readPending && !config.isAutoRead()) {
                    // 既没有配置autoRead也没有底层读事件进行


private void closeOnRead(ChannelPipeline pipeline) {
            if (!isInputShutdown0()) {
                if (isAllowHalfClosure(config())) {
                } else {
                    close(voidPromise()); // 直接关闭
            } else {
                inputClosedSeenErrorOnRead = true;

 这段代码正是Channel参数ALLOW_HALF_CLOSURE的意义描述,该参数为True时,会触发用户事件ChannelInputShutdownEvent,否则,直接关闭该Channel。抛出异常时,会调用handleReadException(pipeline, byteBuf, t, close)方法:

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle) {
            if (byteBuf != null) { // 已读取到数据
                if (byteBuf.isReadable()) { // 数据可读
                    readPending = false;
                } else { // 数据不可读
            if (close || cause instanceof IOException) {


    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;

            return newDirectBuffer(buf); // 非DirectBuf转为DirectBuf

        if (msg instanceof FileRegion) {
            return msg;

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);


     * Write objects to the OS.
     * @param in the collection which contains objects to write.
     * @return The value that should be decremented from the write quantum which starts at
     * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
     * <ul>
     *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
     *     is encountered</li>
     *     <li>1 - if a single call to write data was made to the OS</li>
     *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
     *     data was accepted</li>
     * </ul>
     * @throws Exception if an I/O exception occurs during write.
    protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
        Object msg = in.current();
        if (msg == null) {
            // Directly return here so incompleteWrite(...) is not called.
            return 0;
        return doWriteInternal(in, in.current());

    private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (!buf.isReadable()) {
                return 0;

            final int localFlushedAmount = doWriteBytes(buf); // 模板方法,子类实现细节
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount); // 记录进度
                if (!buf.isReadable()) {
                    in.remove();  // 完成时,清理缓冲区
                return 1; // 跳出循环执行incompleteWrite()
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion) msg;
            if (region.transferred() >= region.count()) {
                return 0; // 跳出循环执行incompleteWrite()

            long localFlushedAmount = doWriteFileRegion(region);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount); // 记录进度
                if (region.transferred() >= region.count()) {
                return 1;
        } else {
            // Should not reach here.
            throw new Error(); // 其他类型不支持

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = config().getWriteSpinCount();
        do {
            Object msg = in.current();
            if (msg == null) { // 数据已全部写完
                // Wrote all messages.
                clearOpWrite();  // 清除OP_WRITE事件
                // Directly return here so incompleteWrite(...) is not called.
            writeSpinCount -= doWriteInternal(in, msg);
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);

    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;

            return newDirectBuffer(buf); // 非DirectBuf转为DirectBuf

        if (msg instanceof FileRegion) {
            return msg;

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);


protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {
            setOpWrite();  // 设置继续关心OP_WRITE事件
        } else {
            // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
            // use our write quantum. In this case we no longer want to set the write OP because the socket is still
            // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
            // and set the write OP if necessary.

            // Schedule flush again later so other tasks can be picked up in the meantime
            eventLoop().execute(flushTask); // 再次提交一个flush()任务


5.6 NioSocketChannel源码分析


    protected void doBind(SocketAddress localAddress) throws Exception {

    private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) { // JDK版本1.7以上
            SocketUtils.bind(javaChannel(), localAddress);
        } else {
            SocketUtils.bind(javaChannel().socket(), localAddress);


    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                // 设置关心OP_CONNECT事件,事件就绪时调用finishConnect()
            success = true;
            return connected;
        } finally {
            if (!success) {

    protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();


    protected void doDisconnect() throws Exception {

    protected void doClose() throws Exception {
        super.doClose(); // AbstractNioChannel中关于连接超时的处理


    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());

    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);

    protected long doWriteFileRegion(FileRegion region) throws Exception {
        final long position = region.transferred();
        return region.transferTo(javaChannel(), position);


    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite(); // 所有数据已写完,不再关心OP_WRITE事件
                // Directly return here so incompleteWrite(...) is not called.

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always us nioBuffers() to workaround data-corruption.
            // See
            switch (nioBufferCnt) {
                case 0: // 没有ByteBuffer,也就是只有FileRegion
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in); // 使用父类方法进行普通处理
                case 1: { // 只有一个ByteBuffer,此时的处理等效于父类方法的处理
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                default: { // 多个ByteBuffer,采用gathering方法处理
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    // gathering方法,此时一次写多个ByteBuffer
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                    in.removeBytes(localWrittenBytes); // 清理缓冲区
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0); 

 在明白了父类的doWrite方法后,这段代码便容易理解,本段代码做的优化是:当输出缓冲区中有多个buffer时,采用Gathering Writes将数据从这些buffer写入到同一个channel。

        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    // We need to cancel this key of the channel so we may not end up in a eventloop spin
                    // because we try to read or write until the actual close happens which may be later due
                    // SO_LINGER handling.
                    // See
                    doDeregister(); // 取消选择键selectionKey
                    return GlobalEventExecutor.INSTANCE;
            } catch (Throwable ignore) {
                // Ignore the error as the underlying channel may be closed in the meantime and so
                // getSoLinger() may produce an exception. In this case we just return null.
                // See
            return null;

 SO_LINGER表示Socket关闭的延时时间,在此时间内,内核将继续把TCP缓冲区的数据发送给对端且执行close操作的线程将阻塞直到数据发送完成。Netty的原则是I/O线程不能被阻塞,所以此时返回一个Executor用于执行阻塞的doClose()操作。doDeregister()取消选择键selectionKey是因为:延迟关闭期间, 如果selectionKey仍然关心OP_WRITE事件,而输出缓冲区又为null,这样write操作直接返回,不会再执行clearOpWrite()操作取消关心OP_WRITE事件,而Channel一般是可写的,这样OP_WRITE事件会不断就绪从而耗尽CPU,所以需要取消选择键删除注册的事件。

