Netty source code: accept

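  • The boss thread runs the Selector that the NioServerSocketChannel is registered with and listens for OP_ACCEPT events
  • How ChannelPipeline is used
  • register and bind on the JDK NIO channel
  • The channel handler callbacks handlerAdded, channelRegistered and channelActive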

Sample code

The following snippet from RocketMQ serves as the example

public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)  {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                new NettyServerHandler()
                            );
                    }
                });
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

1. ServerBootstrap


1.1 AbstractBootstrap.bind()

public ChannelFuture bind() {
        // validate that group != null
        // and that ChannelFactory != null
        logger.info("validating group != null and ChannelFactory != null");
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return doBind(localAddress);
    }
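  • Initialize NioServerSocketChannel
  • Set the NIO socket options
  • Add the ServerBootstrapAcceptor handler to the ChannelPipeline
  • The boss thread executes the JDK channel register method
  • If the boss thread has already finished registering, the main thread calls doBind0 right away, which queues the JDK channel bind on the boss event loop;
    otherwise a ChannelFuture listener is added, and the bind step runs once register has finished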
   private ChannelFuture doBind(final SocketAddress localAddress) {
        // 1. initialize NioServerSocketChannel
        // 2. set the NIO socket options
        // 3. add a ChannelHandler to DefaultChannelPipeline {
        //      public void run() {
        //          pipeline.addLast(new ServerBootstrapAcceptor(
        //                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        //      }
        // }
        final ChannelFuture regFuture = initAndRegister();
        // NioServerSocketChannel
        final Channel channel = regFuture.channel();
        // on failure, return immediately
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // registration already finished (the fast case): the main thread calls doBind0
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            // DefaultChannelPromise
            ChannelPromise promise = channel.newPromise();
            System.err.println("run thread: " + Thread.currentThread().getName() + " boss thread finished channel register, running doBind0");
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        }
        // the boss thread has not finished yet: add a completion listener
        else {
            System.err.println("run thread: " + Thread.currentThread().getName() + " boss thread has not finished channel register yet, adding a completion listener");
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {

                private String creator = Thread.currentThread().getName();

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        System.out.println("created by thread: " +  creator + ", running on thread: " + Thread.currentThread().getName() + ", running doBind0");
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
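
The two branches of doBind can be tried out in isolation. The following is a minimal sketch, not Netty code: it assumes the JDK's CompletableFuture as a stand-in for ChannelFuture and ChannelPromise, and the class name, method names and result strings are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: CompletableFuture stands in for Netty's ChannelFuture/ChannelPromise.
final class BindAfterRegisterSketch {

    // Mirrors the shape of doBind: if registration is already done, the caller
    // continues with the bind step; otherwise the bind step runs from a callback
    // once registration completes.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            if (regFuture.isCompletedExceptionally()) {
                promise.completeExceptionally(new IllegalStateException("registration failed"));
            } else {
                promise.complete("bound directly by caller");
            }
        } else {
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound from listener");
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        // Registration already finished: fast path.
        System.out.println(doBind(CompletableFuture.completedFuture(null)).join());

        // Registration still pending: the bind step waits for completion.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = doBind(pending);
        pending.complete(null);
        System.out.println(promise.join());
    }
}
```

In the real method the fast path does not bind on the calling thread either: doBind0 hands the bind to the channel's event loop, which this sketch leaves out.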
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // ReflectiveChannelFactory
            // create NioServerSocketChannel reflectively through its default constructor
            channel = channelFactory.newChannel();

            // 1. set the NIO socket options
            // 2. add a ChannelHandler to DefaultChannelPipeline {
            //      public void run() {
            //          pipeline.addLast(new ServerBootstrapAcceptor(
            //                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            //      }
            // }
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                // javaChannel().close();
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // the EventLoopGroup (boss event loop group) picks an EventLoop to run the registration
        // SingleThreadEventLoop.register(channel)
        // regFuture = DefaultChannelPromise
        // NioMessageUnsafe.register(EventLoop, ChannelPromise);
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            // if javaChannel has already been registered:
            // close javaChannel
            // and clear the pending writes held in ChannelOutboundBuffer
            if (channel.isRegistered()) {
                channel.close();
            }
            // otherwise just close javaChannel
            else {
                // javaChannel().close();
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

1.1.1 NioServerSocketChannel

  • Extends AbstractNioMessageChannel
  • config = NioServerSocketChannelConfig
  • unsafe = NioMessageUnsafe
  • pipeline = DefaultChannelPipeline
  • Other fields: the JDK NIO server channel and the NIO event SelectionKey.OP_ACCEPT
  private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    // SelectorProvider 
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    private final ServerSocketChannelConfig config;

    /**
     * Create a new instance
     */
   public NioServerSocketChannel() {
        // private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }

    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

1.1.2 EventLoopGroup.register(Channel)

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // NioMessageUnsafe
        Channel.Unsafe unsafe = promise.channel().unsafe();
        System.out.println("event loop register exe  "
                + unsafe.getClass().getName() + ".register(this, " + promise.getClass().getName() + ")");
        unsafe.register(this, promise);
        return promise;
    }

1.1.2.1 Unsafe.register(EventLoop eventLoop, final ChannelPromise promise)

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            // check whether this channel has already been registered
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            // return loop instanceof NioEventLoop;
            // i.e. whether it is a NioEventLoop
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            // are we already running on the event loop thread?
            if (eventLoop.inEventLoop()) {
                // yes: run it directly
                register0(promise);
            }
            // no: hand the task to the event loop, which starts its thread if needed
            else {
                try {
                    eventLoop.execute(new NamedRunnable() {
                        @Override
                        public String name() {
                            return "AbstractChannel[489] > register0(promise) ";
                        }

                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
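
The "run directly if already on the loop thread, otherwise enqueue" pattern can be reproduced with plain JDK classes. This is a minimal sketch under assumptions: a single-thread ExecutorService plays the role of the event loop, and the class name, thread name and return string are invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: a single-thread executor stands in for a Netty EventLoop.
final class InEventLoopSketch {
    private volatile Thread loopThread;

    // The thread is created lazily on the first submitted task, much like an
    // event loop that starts its thread when the first task arrives.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "boss-loop-sketch");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs register0 directly when called on the loop thread, otherwise
    // submits it to the loop and waits for the result.
    String register() throws Exception {
        if (inEventLoop()) {
            return register0();
        }
        Callable<String> task = this::register0;
        return executor.submit(task).get();
    }

    private String register0() {
        return "register0 ran on " + Thread.currentThread().getName();
    }

    // Calls register() from the caller's thread, which is not the loop thread.
    static String registerFromCallerThread() {
        InEventLoopSketch loop = new InEventLoopSketch();
        try {
            return loop.register();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(registerFromCallerThread());
    }
}
```

Whichever thread calls register(), register0 always ends up on the loop thread, which is why the code that follows can touch channel state without locking.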
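  • javaChannel().register
  • Invoke each channel handler's handlerAdded method.
    A ChannelInitializer handler removes itself from the handler chain once its handlerAdded method has run;
    other handler types do not
  • The ChannelPromise is marked as successful, after which the javaChannel().bind step runs
  • Invoke each channel handler's channelRegistered method
  • The isActive() check is effectively redundant in this flow: bind is also submitted as a task to the boss event loop's queue, so it has not run yet at this point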
    private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                // has the promise been cancelled?
                // is the java channel still open?
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // flag recording whether this is the first registration
                boolean firstRegistration = neverRegistered;
                // javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // for the handlers in DefaultChannelPipeline, invoke the handlerAdded method
                // of each ChannelHandler wrapped in a PendingHandlerCallback
                // if the ChannelHandler extends ChannelInitializer, it is removed from
                // DefaultChannelPipeline after handlerAdded has run
                pipeline.invokeHandlerAddedIfNeeded();
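                // mark the promise as successful and notify the registered listeners
                safeSetSuccess(promise);

                // run the ChannelHandler chain of DefaultChannelPipeline, starting from head
                // in this example, once handlerAdded has run, only the default HeadContext -> TailContext remain;
                // the task that adds ServerBootstrapAcceptor is still in the task queue and has not run,
                // and these two ChannelHandlers do nothing for this event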
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                // javaChannel.socket().isBound()
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
  • Registers with the boss event loop's JDK Selector, but without setting any interest ops yet
  @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // register with the event loop's Selector (the JDK Selector)
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
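
The JDK calls underneath doRegister can be exercised on their own. The sketch below uses only java.nio: it registers a ServerSocketChannel with interest ops 0, binds it, and only then enables OP_ACCEPT. The last step is an assumption about what happens later in the flow (in Netty the interest op is set when reading begins after the channel becomes active); the class name, the attachment string and the loopback address are illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Sketch only: plain JDK NIO, no Netty classes involved.
final class JdkRegisterSketch {

    // Returns the interest ops seen right after register and after enabling accept.
    static int[] registerThenBind() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel server = provider.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered.
            server.configureBlocking(false);

            // Register with interest ops 0: the key exists but selects nothing yet.
            SelectionKey key = server.register(selector, 0, "attachment");
            int afterRegister = key.interestOps();

            // Bind to an ephemeral port on loopback.
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            // Only now start listening for incoming connections.
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            return new int[] {afterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenBind();
        System.out.println("after register: " + ops[0] + ", after enabling accept: " + ops[1]);
    }
}
```

Registering with 0 first lets the key and its attachment be set up before any event can be delivered.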

1.1.2.2 DefaultChannelPipeline.invokeHandlerAddedIfNeeded()

  • ServerBootstrap.init added a new ChannelInitializer channel handler
  • At this point the pipeline already contains 3 handlers (HeadContext, the ChannelInitializer and TailContext)


final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        // runs only once per channel
        if (firstRegistration) {
            firstRegistration = false;
            // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
            // that were added before the registration was done.
            callHandlerAddedForAllHandlers();
        }
    }
private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
            // the pipeline records for itself that its channel is now registered
            registered = true;

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
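        // this is also a linked list; walk it and run each entry
        // every entry is a handler that was added before the JDK channel was registered
        // PendingHandlerAddedTask calls the handler's handlerAdded
        // PendingHandlerRemovedTask calls the handler's handlerRemoved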
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }
  • This example only adds handlers; none are removed
 private final class PendingHandlerAddedTask extends PendingHandlerCallback {

        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                executor, ctx.name(), e);
                    }
                    remove0(ctx);
                    ctx.setRemoved();
                }
            }
        }
    }
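
The deferral mechanism is easier to see without the surrounding pipeline code. Below is a minimal sketch, assuming handlers are represented by plain strings and that "calling handlerAdded" just means appending to a list; the class and method names are made up and only the singly linked list of pending callbacks mirrors the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: models the pending-callback list, not the real pipeline.
final class PendingCallbackSketch {

    // Singly linked node, mirroring the shape of PendingHandlerCallback.
    private static final class Pending {
        final String handlerName;
        Pending next;

        Pending(String handlerName) {
            this.handlerName = handlerName;
        }
    }

    private Pending pendingHead;
    private boolean registered;
    final List<String> handlerAddedCalls = new ArrayList<>();

    // Before registration the callback is queued; afterwards it runs at once.
    void addLast(String handlerName) {
        if (!registered) {
            Pending task = new Pending(handlerName);
            if (pendingHead == null) {
                pendingHead = task;
            } else {
                Pending p = pendingHead;
                while (p.next != null) {
                    p = p.next;
                }
                p.next = task;
            }
            return;
        }
        handlerAddedCalls.add(handlerName);
    }

    // Called once when the channel is registered: drain the pending list in order.
    void onRegistered() {
        registered = true;
        Pending task = pendingHead;
        pendingHead = null;
        while (task != null) {
            handlerAddedCalls.add(task.handlerName);
            task = task.next;
        }
    }

    static List<String> demo() {
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        pipeline.addLast("initializer");       // queued, channel not registered yet
        if (!pipeline.handlerAddedCalls.isEmpty()) {
            throw new IllegalStateException("callback ran before registration");
        }
        pipeline.onRegistered();               // drains the queue
        pipeline.addLast("late-handler");      // runs immediately
        return pipeline.handlerAddedCalls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```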
  • Here the handler is the ChannelInitializer added by ServerBootstrap.init
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
            // any pipeline events ctx.handler() will miss them because the state will not allow it.
            ctx.setAddComplete();
            System.out.println("calling handlerAdded on " + ctx.handler() + " after it was added");
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            boolean removed = false;
            try {
                // remove this handler from the handler chain
                remove0(ctx);
                try {
                    // invoke the handler's handlerRemoved method
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    // mark the context as removed
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            // invoke exceptionCaught on the handler chain (if removal succeeded, the failing handler is no longer in the list)
            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

1.1.2.3 ChannelInitializer

  • handlerAdded
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            initChannel(ctx);
        }
    }
  • The finally block removes this handler (the ChannelInitializer itself) from the pipeline
   private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            }
            // removes this handler (the ChannelInitializer itself) from the pipeline
            finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

initChannel((C) ctx.channel()); invokes the initChannel method of the ChannelInitializer added in ServerBootstrap.init, which eventually runs:

 pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
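
The "initialize, then remove yourself" behaviour can be shown with a toy pipeline. This is a minimal sketch, assuming a plain List as the pipeline and a hypothetical Handler interface with a single handlerAdded callback; none of these types are Netty's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: a List stands in for the pipeline, Handler is a made-up interface.
final class SelfRemovingInitializerSketch {

    interface Handler {
        default void handlerAdded(List<Handler> pipeline) { }
    }

    // An ordinary handler that stays in the pipeline.
    static final class Named implements Handler {
        private final String name;

        Named(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Runs the user-supplied init logic once, then removes itself in finally,
    // so it disappears from the pipeline even when init throws.
    static final class Initializer implements Handler {
        private final Consumer<List<Handler>> init;

        Initializer(Consumer<List<Handler>> init) {
            this.init = init;
        }

        @Override
        public void handlerAdded(List<Handler> pipeline) {
            try {
                init.accept(pipeline);
            } finally {
                pipeline.remove(this);
            }
        }

        @Override
        public String toString() {
            return "initializer";
        }
    }

    static List<String> demo() {
        List<Handler> pipeline = new ArrayList<>();
        Initializer initializer = new Initializer(p -> p.add(new Named("acceptor")));
        pipeline.add(initializer);
        initializer.handlerAdded(pipeline);

        List<String> names = new ArrayList<>();
        for (Handler h : pipeline) {
            names.add(h.toString());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

After handlerAdded returns, only the handler installed by the initializer remains, which matches the pipeline state described for the server channel.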

1.2 init

  • Adds a channel handler to the ChannelPipeline
  • currentChildGroup is the worker group
  • The added channel handler mainly exists to go on and add ServerBootstrapAcceptor
void init(Channel channel) throws Exception {
        // set the NIO socket options
        // .option(ChannelOption.SO_BACKLOG, 1024)
        // .option(ChannelOption.SO_REUSEADDR, true)
        // .option(ChannelOption.SO_KEEPALIVE, false)
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        // {} empty in this example
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        // DefaultChannelPipeline
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        // new ChannelInitializer() {}
        // the user-defined handlers for the worker processing flow
        // .childHandler(new ChannelInitializer<SocketChannel>() {
        //    @Override
        //    public void initChannel(SocketChannel ch) throws Exception {
        //        ch.pipeline()
        //                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
        //                        new HandshakeHandler(TlsSystemConfig.tlsMode))
        //                .addLast(defaultEventExecutorGroup,
        //                        new NettyEncoder(),
        //                        new NettyDecoder(),
        //                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
        //                        new NettyConnectManageHandler(),
        //                        new NettyServerHandler()
        //                );
        //    }
        // });
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        logger.info("DefaultChannelPipeline.addLast adds the ChannelInitializer");
        // add the handler to DefaultChannelPipeline
        // after javaChannel().register(...)
        //      ChannelInitializer.handlerAdded is called
        //      if (ctx.channel().isRegistered()) {
        //             initChannel(ctx);
        //      }
        // if javaChannel is not registered yet, a PendingHandlerCallback defers this until after the channel is registered
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new NamedRunnable() {
                    @Override
                    public String name() {
                        return "ServerBootstrap[211] > pipeline.addLast(ServerBootstrapAcceptor)";
                    }

                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
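
Because both the "add ServerBootstrapAcceptor" task and the bind task go through the same single-threaded event loop, their relative order is fixed by the order in which they are queued. The sketch below illustrates that FIFO ordering with a JDK single-thread executor; it assumes the listener path of doBind (bind queued from inside register0), and the class name and step labels are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: a single-thread executor stands in for the boss event loop.
final class TaskOrderSketch {

    static List<String> demo() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            loop.submit(() -> {
                order.add("register0");
                // Queued while handlerAdded runs the ChannelInitializer.
                loop.execute(() -> order.add("addLast(ServerBootstrapAcceptor)"));
                // Queued afterwards, once the registration promise has succeeded.
                loop.execute(() -> order.add("bind"));
            }).get();
            // Barrier: an empty task queued last finishes only after the two above.
            loop.submit(() -> { }).get();
            return new ArrayList<>(order);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The acceptor is therefore in the pipeline before the channel is bound and can receive connections.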

1.2.1 DefaultChannelPipeline.addLast(ChannelHandler... handlers)

   @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // validation for handlers of type ChannelHandlerAdapter
            // if (!h.isSharable() && h.added) {  throw new ChannelPipelineException() }
            checkMultiplicity(handler);
            // generate a handler name when none was specified;
            // throw an exception if a handler with the same name already exists
            String handlerName = filterName(name, handler);
            // wrap the handler in a DefaultChannelHandlerContext
            newCtx = newContext(group, handlerName, handler);
            // maintain the linked list
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // if the channel is not registered yet
            if (!registered) {
                // set the state to ADD_PENDING
                newCtx.setAddPending();
                // append to pendingHandlerCallbackHead (a singly linked list), to run after the channel is registered
                // for an add, the node is a PendingHandlerAddedTask;
                // for a remove, the node is a PendingHandlerRemovedTask
                // here the task will call the handler's handlerAdded method
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            // when no EventExecutorGroup was given, use the channel's EventLoop (AbstractChannelHandlerContext -> DefaultChannelPipeline -> channel)
            EventExecutor executor = newCtx.executor();
            // the current thread is not the event loop thread
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new NamedRunnable() {
                    @Override
                    public String name() {
                        return "DefaultChannelPipeline[245] > " + newCtx.handler() + ".callHandlerAdded0";
                    }

                    @Override
                    public void run() {
                        // invoke the handler's handlerAdded method
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        // invoke the handler's handlerAdded method
        callHandlerAdded0(newCtx);
        return this;
    }
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
            // any pipeline events ctx.handler() will miss them because the state will not allow it.
            ctx.setAddComplete();
            System.out.println("calling handlerAdded on " + ctx.handler() + " after it was added");
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            boolean removed = false;
            try {
                // remove this handler from the handler chain
                remove0(ctx);
                try {
                    // invoke the handler's handlerRemoved method
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    // mark the context as removed
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            // Fire exceptionCaught on the remaining handlers; when the removal above succeeded,
            // the failing handler is no longer in the chain
            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
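
The executor.inEventLoop() branch in addLast can be modelled with plain JDK classes. The following is a minimal sketch, not Netty code: MiniEventLoop, runOrEnqueue and demo are hypothetical names, and a single-threaded ExecutorService stands in for the channel's EventLoop. It shows that a callback submitted from an outside thread is queued, while a callback submitted from the loop thread itself runs inline.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Netty EventLoop: one thread owns all callbacks.
class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        loopThread = t; // remember the loop thread so inEventLoop() can compare against it
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same shape as the addLast branch: run inline on the loop thread, otherwise enqueue.
    void runOrEnqueue(Runnable callback) {
        if (inEventLoop()) {
            callback.run();
        } else {
            executor.execute(callback);
        }
    }

    void shutdownAndWait() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns the name of the thread that ran each callback, in execution order.
    static List<String> demo() {
        List<String> threads = Collections.synchronizedList(new ArrayList<>());
        MiniEventLoop loop = new MiniEventLoop();
        // Submitted from the caller's thread: the callback is queued.
        loop.runOrEnqueue(() -> {
            threads.add(Thread.currentThread().getName());
            // Submitted from the loop thread: the nested callback runs inline.
            loop.runOrEnqueue(() -> threads.add(Thread.currentThread().getName()));
        });
        loop.shutdownAndWait();
        return threads;
    }
}
```

Either way the callback ends up on the loop thread, which is why handlerAdded never needs its own synchronization in this model.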

1.3 doBind0

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        // Submit the bind as a task to the boss event loop
        channel.eventLoop().execute(new NamedRunnable() {
            @Override
            public String name() {
                return "AbstractBootstrap[412] > channel.bind(SocketAddress, ChannelPromise)";
            }

            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
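
Why doBind0 wraps the bind in an event loop task can be illustrated with a small model. This is a sketch under assumptions, not Netty's implementation: BindOrderSketch and its run method are hypothetical, a single-threaded executor plays the boss event loop, and a CompletableFuture plays regFuture. Because the loop runs tasks in submission order, the registration task always finishes before the bind task inspects its result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "register first, then bind" on a single-threaded loop.
class BindOrderSketch {
    static List<String> run(boolean registrationSucceeds) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService bossLoop = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> regFuture = new CompletableFuture<>();

        // Task 1: stands in for the registration step.
        bossLoop.execute(() -> {
            if (registrationSucceeds) {
                events.add("registered");
                regFuture.complete(null);
            } else {
                regFuture.completeExceptionally(new IllegalStateException("registration failed"));
            }
        });

        // Task 2: stands in for doBind0; it only binds when registration succeeded.
        bossLoop.execute(() -> {
            if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
                events.add("bind");
            } else {
                events.add("bind failed");
            }
        });

        bossLoop.shutdown();
        try {
            bossLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }
}
```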

1.4 Channel.bind(SocketAddress, ChannelPromise)

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

1.4.1 DefaultChannelPipeline.bind(SocketAddress, ChannelPromise)

    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

1.4.2 TailContext.bind(SocketAddress, ChannelPromise)

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        // Validate the promise
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        // Find the first outbound channel handler, walking back from the tail of the handler chain
        // next = AbstractChannelHandlerContext(HeadContext)
        final AbstractChannelHandlerContext next = findContextOutbound();
        // Get the handler's event loop.
        // HeadContext has no executor of its own, so this returns
        //      DefaultChannelPipeline -> channel (NioServerSocketChannel) -> EventExecutor (boss event loop)
        EventExecutor executor = next.executor();
        // The current thread is the executor's loop thread: invoke directly
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        }
        // Otherwise add the call to the executor's task queue
        else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
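
The lookup done by findContextOutbound() can be sketched as a backward walk over a doubly linked list. The class below is a simplified model, assuming the server pipeline at this point holds HeadContext, ServerBootstrapAcceptor and TailContext, and that only HeadContext handles outbound operations. OutboundLookupSketch and Ctx are hypothetical names, not Netty types.

```java
// Hypothetical model of the pipeline's doubly linked list of handler contexts.
class OutboundLookupSketch {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        // Walk towards the head until a context that handles outbound operations is found.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    private static void link(Ctx left, Ctx right) {
        left.next = right;
        right.prev = left;
    }

    // Builds head <-> acceptor <-> tail and asks the tail for its first outbound context.
    static String firstOutboundFromTail() {
        Ctx head = new Ctx("HeadContext", true);
        Ctx acceptor = new Ctx("ServerBootstrapAcceptor", false);
        Ctx tail = new Ctx("TailContext", false);
        link(head, acceptor);
        link(acceptor, tail);
        return tail.findContextOutbound().name;
    }
}
```

In this model the inbound-only acceptor is skipped, so a bind that starts at the tail lands directly on the head, matching the `next = AbstractChannelHandlerContext(HeadContext)` note in the listing.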

1.4.3 HeadContext.bind(SocketAddress, ChannelPromise)

        @Override
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

1.4.4 Unsafe.bind(SocketAddress, ChannelPromise)

  • Invokes the handlers' channelActive, after which the server starts waiting to accept connections
  • Handler chain: HeadContext, ServerBootstrapAcceptor, TailContext
   public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            // Return if the ChannelPromise was cancelled or the channel is already closed
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            // UDP broadcast check; not analyzed here
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            // isActive() -> javaChannel().socket().isBound()
            boolean wasActive = isActive();
            try {
                // doBind() -> javaChannel().bind(localAddress, config.getBacklog())
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            // If the bind succeeded (javaChannel().socket().isBound()),
            // invoke channelActive along the handler chain
            if (!wasActive && isActive()) {
                invokeLater(new NamedRunnable() {
                    @Override
                    public String name() {
                        return "[AbstractChannel.599]: pipeline.fireChannelActive()";
                    }

                    @Override
                    public void run() {
                         // TODO DEBUG LOG
                        AbstractChannelHandlerContext head =  pipeline.head;
                        while(head != null) {
                            System.err.println(">>>>>> handler: " + head);
                            head = head.next;
                        }
                        pipeline.fireChannelActive();
                    }
                });
            }
            // Mark the promise as succeeded and notify all listeners
            safeSetSuccess(promise);
        }
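
The wasActive / isActive() transition around doBind can be reproduced with the JDK channel alone. This is a minimal sketch, assuming (as the comments in the listing state) that isActive() reduces to javaChannel().socket().isBound(). JdkBindSketch is a hypothetical class; the loopback address, port 0 (an ephemeral port) and the backlog of 128 are arbitrary illustration values.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo of the bound state before and after ServerSocketChannel.bind.
class JdkBindSketch {
    // Returns {wasActive, isActive}.
    static boolean[] bindAndReport() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            // Before bind: the socket is not bound yet.
            boolean wasActive = ch.socket().isBound();
            // The JDK-level bind with a local address and a backlog.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            // After bind: the socket reports bound, which is what triggers channelActive above.
            boolean isActive = ch.socket().isBound();
            return new boolean[] {wasActive, isActive};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```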

1.4.4.1 HeadContext.channelActive

  • The channelActive methods of ServerBootstrapAcceptor and TailContext do nothing of note
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
            readIfIsAutoRead();
        }
      // auto read is enabled by default
      private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }

1.4.4.2 Channel.read

   public Channel read() {
        pipeline.read();
        return this;
    }
   public final ChannelPipeline read() {
        tail.read();
        return this;
    }
  • Once again the call ends up being executed by HeadContext  ̄へ ̄
public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new NamedRunnable() {
                    @Override
                    public String name() {
                        return "AbstractChannelHandlerContext[726] > " + next.handler() + ".read(...)";
                    }

                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }

        return this;
    }
  • HeadContext.read
       public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
  • Unsafe.beginRead()
public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }
  • Adds the readInterestOp that was set when the NioServerSocketChannel was created
  • readInterestOp = SelectionKey.OP_ACCEPT
protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
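
The interest-op update in doBeginRead maps directly onto the JDK SelectionKey API. The sketch below assumes the channel was first registered with an interest set of 0 and only later switched to OP_ACCEPT, mirroring the bit test and bitwise OR in the listing. AcceptInterestSketch is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo: register with no interest, then add OP_ACCEPT as doBeginRead does.
class AcceptInterestSketch {
    // Returns {interestOps before, interestOps after}.
    static int[] interestOpsBeforeAndAfter() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // required before registering with a selector
            SelectionKey key = ch.register(selector, 0);
            int before = key.interestOps();

            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            // Only touch the key when the bit is not set yet.
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

From this point on, a select on the selector can report the key as acceptable, which corresponds to the boss thread listening for OP_ACCEPT.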

Some logs

[image.png]