netty源码分析(6)-NioEventLoop创建过程

前面几章,在我们分析的时候注册Selector相关代码的时候,提到过一部分NioEventLoop的创建过程。接下来详细分析。new NioEventLoopGroup();

    public NioEventLoopGroup(int nThreads, Executor executor) {
        //从jdk地层中获取了一个SelectorProvider
        this(nThreads, executor, SelectorProvider.provider());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        //接着提供了一种默认的SelectStrategy工厂
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        //提供了一种拒绝执行的异常处理器
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //如果没有指定线程数,那么就使用默认的值,通过断点发现DEFAULT_EVENT_LOOP_THREADS的值是8
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        // 提供了默认的EventExecutor选择器工厂   
        //该值为:new DefaultEventExecutorChooserFactory();
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

最终走到了MultithreadEventExecutorGroup的构造方法。主要做了几件事

  1. new ThreadPerTaskExecutor(newDefaultThreadFactory())
  2. 根据线程数初始化EventExecutor数组,并提供足够的具体EventLoop,这里是NioEventLoop
  3. 初始化chooser,提供不同的事件执行的选择器,已选择EventExecutor数组种的EventLoop
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
       //省略代码

        if (executor == null) {
            //初始化executor : 每个任务的执行器
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            //实例化具体的NioEventLoop
            children[i] = newChild(executor, args);
            
        }
        //初始化 EventExecutorChooserFactory.EventExecutorChooser chooser
        chooser = chooserFactory.newChooser(children);

         //省略相关代码
    }
  • new ThreadPerTaskExecutor(newDefaultThreadFactory())
    任务执行器给了一个默认的工厂,定义了该具体每个线程的名字,类似nioEventLoopGroup-2-1这样的线程名字,第一个数字在初始化的时候便定义了为poolId,第二个数字这是在执行newThread()的时候定义的nextIdThreadPerTaskExecutor还定义了execute方法。
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        //调用工厂具体的执行方法执行 该task: command
        threadFactory.newThread(command).start();
    }
}

关于poolIdnextId我们发现前者是静态的,因此,每次new NioEventLoopGroup()的时候递增,因此代表线程池,而后者并非静态的,因此仅仅是调用递增而已。

 private static final AtomicInteger poolId = new AtomicInteger();
 private final AtomicInteger nextId = new AtomicInteger();

跟进最终调用的是DefaultThreadFactory#newThread(),并且发现,结果是返回了FastThreadLocalThread,该线程为netty自定义线程。执行的也是这类型的线程。因此们该对象每次执行任务(调用ThreadPerTaskExecutor#execute()),其实都创建了一个线程实体。

@Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
       //省略代码
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
  • 至于初始化chooser,线程选择器,前面章节《Selector注册》有提到过,其实就是分开两种策略去循环使用EventExecutor数组。

  • 调用newChild()初始化NioEventLoop的时候,这里创建并绑定了selector用于去轮询注册到它上面的一些连接。由此可见一个 selector 对应一个 eventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

值得注意的是,父类构造方法SingleThreadEventLoop初始化了tailTasks。该队列用于在外部线程执行一些netty任务的时候,判断是否在NioEventLoop中,如果不在的话,则放到该队列中去执行。讲这些任务放到一个线程去执行。

    private final Queue<Runnable> tailTasks;
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks);
    }
    
    //由于本例NioEventLoop重写了该方法,因此调用的子类
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    }

    @Override
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return PlatformDependent.newMpscQueue(maxPendingTasks);
    }

再看看super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);再初始化tailTasks之前还干了一件事:初始化taskQueue

    //成员变量
    private final Queue<Runnable> taskQueue;

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        //初始化taskQueue
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

taskQueue是在NioEventLoop启动后,直接存放任务的地方,存放eventLoop任务

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,640评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,254评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,011评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,755评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,774评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,610评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,352评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,257评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,717评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,894评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,021评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,735评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,354评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,936评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,054评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,224评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,974评论 2 355

推荐阅读更多精彩内容