Netty源码-对象池Recycler

1 概述

为了较少对象的数量,提高内存的使用率以及GC的效率,Netty对某些经常分配的对象采用了对象池技术避免了重复的实例化对象。

在Netty中,对象池的基础实现类是RecyclerRecycler是一个抽象类,要想使用它必须继承该类并重写其newObject方法,该方法用于在第一次从对象池中获取对象或者对象池中没有空闲对象时告诉其如何创建新的对象。

使用Recycler的实例可以见PooledUnsafeDirectByteBuf.newInstance方法的实现:

//PooledUnsafeDirectByteBuf
//RECYCLER为Recycler子类,实现了netObject方法
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
    @Override
    protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
        return new PooledUnsafeDirectByteBuf(handle, 0);
    }
};

//直接调用RECYCLER.get()从对象池中获取对象,如果是第一次获取对象
//或者池中没有空闲对象时会调用上面的newObject方法新建对象并返回
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}

在Netty中,Recycler对象池是和ThreadLocal结合实现的,每个线程都有一个线程池,如果从某个线程的对象池中取出对象被传递给其他线程,其他线程在使用完对象之后还要将该对象归还到原线程的对象池中。

本文会介绍Netty对象池Recycler的基本实现,包括如何创建对象、对象如何回收等相关实现。

2 对象池相关类介绍

2.1 Recycler

Recycler是对象池的管理者,负责从对象池的容器Stack(下一小节介绍)中取出和归还对象,上面列出的代码已经说到,使用Recycler需要定义一个其子类,重写newObject方法。 另外Recycler使用FastThreadLocal(关于FastThreadLocal的介绍可以查看这篇文章)持有Stack实例,如下:

//Recycler
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
    @Override
    protected Stack<T> initialValue() {
        //FTL的初始化函数返回一个Stack实例
        //Stack构造函数下面会介绍
        return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                ratioMask, maxDelayedQueuesPerThread);
    }

    @Override
    protected void onRemoval(Stack<T> value) {
        // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
        if (value.threadRef.get() == Thread.currentThread()) {
            if (DELAYED_RECYCLED.isSet()) {
                DELAYED_RECYCLED.get().remove(value);
            }
        }
    }
};

2.2 Stack

Stack可以看做是对象池的容器,负责管理池中的对象,每个线程都会使用FTL持有一个Stack实例,FTL在Recycler中的定义上面已经列出过源码,下面看Stack的构造函数:

//Recycler.Stack
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
        int ratioMask, int maxDelayedQueues) {
    this.parent = parent;
    //threadRef用于记录该Stack被哪个Thread持有,
    //因为使用Netty基本上会采用多线程,从对象池中
    //取出的对象,经常会传递给其他线程,所以
    //这里需要记录Stack的持有者线程
    threadRef = new WeakReference<Thread>(thread);
    //该对象池最大可持有的对象数量
    this.maxCapacity = maxCapacity;
    availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
    //对象池持有的每个元素都使用一个Handle进行管理
    //因为使用完元素之后,需要将元素归还给相应的Stack
    //所以从Recycle取出的对象都会有一个Handle字段
    //用于处理对象归还
    //Stack的初始大小取INITIAL_CAPACITY和maxCapacity中
    //的较小值,避免声明较大的数组取用不了那么大的空间
    //必要时会进行扩容
    elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
    this.ratioMask = ratioMask;
    this.maxDelayedQueues = maxDelayedQueues;
}

这里介绍下Stack初始化时数组的默认容量INITIAL_CAPACITY计算公式,代码比较简单,不再说明,通过代码可以看到默认大小和Netty的配置参数io.netty.recycler.maxCapacityPerThread以及io.netty.recycler.maxCapacity有关。

DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
                SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
        if (maxCapacityPerThread < 0) {
            maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
        }

DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);

Stack的构造函数介绍完之后,我们再看其重要的域:

//用于持有实际对象的容器,采用数组持有对象
private DefaultHandle<?>[] elements;
//持有对象的最大数量
private final int maxCapacity;
//该Stack的持有者线程
final WeakReference<Thread> threadRef;
//用于对象回收,其他线程使用完从该线程Stack中取出的对象后
//会将该对象放入专门为该线程准备的WeakOrderQueue中,该
//WeakOrderQueues使用单链表结构链接起来,head则指向第一个
//WeakOrderQueue
private volatile WeakOrderQueue head;
//其他线程用于归还不属于自己对象池管理对象的
//WeakOrderQueue容量大小
final int maxDelayedQueues;

2.3 DefaultHandle

从上面的介绍中知道DefaultHandle用于包装每个实际的对象,并负责对象的归还,其最重要的域和方法如下:

//DefaultHandle
//Stack容器实际管理的是Handle,Handle也会持有该Stack,用于对象
//回收使用
private Stack<?> stack;
//该Handle管理的实际对象
private Object value;

//DefaultHandle的构造函数传入Stack,用于后期归还对象
DefaultHandle(Stack<?> stack) {
    this.stack = stack;
}

//回收对象,从Recycle取出的对象都会有一个Handle字段
//比如采用引用计数控制对象生命的ByteBuf就持有一个Handle域
//recyclerHandle,在引用计数为0时会调用Handle.recycle对对象
//进行回收
@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    stack.push(this);
}

2.4 WeakOrderQueue

上面已经介绍过,Netty多线程环境中,经常发生一个线程对象池取出的对象被传递给其他线程使用,比如Netty用于网络读取的线程使用的ByteBuf会被传递给业务线程使用,所以Netty中每个线程会使用FastThreadLocal为其他线程维护一个WeakOrderQueue的Map,该Map会被链接到对应StackWeakOrderQueue链表中,线程在使用完从其他线程对象池取出的对象后,会放入到为其他线程准备的WeakOrderQueue中。

3 从对象池中获取对象

Recycler中获取对象通过调用Stack.get实现:

//Recycler
@SuppressWarnings("unchecked")
public final T get() {
    if (maxCapacityPerThread == 0) {
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    Stack<T> stack = threadLocal.get();
    //首先尝试从该Recycler的Stack中弹出一个对象
    //如果Stack中还没有对象或者没有空闲对象,则
    //新建一个Handle和s实际对象
    DefaultHandle<T> handle = stack.pop();
    if (handle == null) {
        //新建一个Handle对象
        handle = stack.newHandle();
        //新建一个实际使用的对象,该方法需要在子类重写
        handle.value = newObject(handle);
    }
    //返回Handle实际持有的对象
    return (T) handle.value;
}

//Stack
//返回一个DefaultHandle对象
DefaultHandle<T> newHandle() {
    return new DefaultHandle<T>(this);
}

下面我们重点看下Stack.pop方法:

//Stack
@SuppressWarnings({ "unchecked", "rawtypes" })
DefaultHandle<T> pop() {
    int size = this.size;
    //如果Stack当前数组大小为0,表示没有现成可用的对象,
    //尝试使用scavenge进行回收,scavenge方法我们后面介绍
    if (size == 0) {
        if (!scavenge()) {
            //没有可回收对象,则返回null
            return null;
        }
        size = this.size;
    }
    //如果上面成功回收了对象,则这里返回刚回收的对象,并
    //将该对象所在的数组位置置空,表示对象被分配出去
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    this.size = size;
    return ret;
}

4 对象回收

对象回收分两种情况,一种是对象从对象池取出来后,一直由该对象池所有者线程使用,那么也即由该线程回收;另一种情况是对象从某个线程对象池取出来后被传递给其他线程,其他线程需要保证该对象被返还给原先的持有者线程。下面我们分开来看。

4.1 对象由所有者线程自己归还

对象在使用完毕后,会调用Handle.recycle进行回收,这里我们看Handle实现类DefaultHandle.recycle的实现:

//DefaultHandle
@Override
public void recycle(Object object) {
    //Handle对象包装了实际对象,如果归还的对象和Handle包装的对象
    //不相等,表示不是从该Handle取出的对象,抛错
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    //调用Stack.push方法进行归还
    stack.push(this);
}

//Stack
void push(DefaultHandle<?> item) {
    //获取当前操作所在线程
    Thread currentThread = Thread.currentThread();
    //当前线程和Handle所属Stack的持有者线程进行对比,
    //如果相等,则表示该对象由持有者线程自己归还
    //也即上面说的第一种情况
    if (threadRef.get() == currentThread) {
        // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
        pushNow(item);
    } else {
        //归还对象的操作不是由原先的持有者线程进行的,是由其他线程归还
        //也即上面介绍的第二章情况
        // The current Thread is not the one that belongs to the Stack
        // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
        // happens later.
        pushLater(item, currentThread);
    }
}

这一小节我们看第一种情况,看下Stack.pushNow方法的实现,其实实现比较简单,下面在注释里简单介绍下:

private void pushNow(DefaultHandle<?> item) {
    if ((item.recycleId | item.lastRecycledId) != 0) {
        throw new IllegalStateException("recycled already");
    }
    item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

    int size = this.size;
    //如果当前Stack已经超过了其定义的最大容量,则不进行归还操作
    //删除该Handle
    if (size >= maxCapacity || dropHandle(item)) {
        // Hit the maximum capacity or should drop - drop the possibly youngest object.
        return;
    }
    //没有达到最大容量,则进行对象回收,回收前还要检查下是否
    //需要进行扩容,扩容时容量会变为原先数组容量的2倍
    if (size == elements.length) {
        elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
    }
    //实际归还,也就是把Handle放入数组中,然后计数器加一
    elements[size] = item;
    this.size = size + 1;
}

4.2 对象由其他线程归还到线程池所有者线程

这种情况就是对象从某个线程的对象池中取出来,但是被传递给其他线程,其他线程(下文称其为归还线程)负责归还给原先线程(下文称其为所有者线程)的对象池中,也就是Stack.push源码中的pushLater方法,在看pushLater方法之前我们需要知道,其他线程归还对象到所有者线程分两步,第一步是放入到所有者线程对应的WeakOrderQueue队列中,这些对象相当于处于待回收状态;第二步则是在从Stack.pop函数中判断没有现成可用对象时采用scavenge将待回收状态的对象放入Stack容器数组中,完成这两步才是回收完毕。

//Stack
private void pushLater(DefaultHandle<?> item, Thread thread) {
    // we don't want to have a ref to the queue as the value in our weak map
    // so we null it out; to ensure there are no races with restoring it later
    // we impose a memory ordering here (no-op on x86)
    //每个线程都在自己的ThreadLocal中为每个其他所有者线程维护了一个
    //WeakOrderQueue,归还从该所有者线程对象池取出的对象时
    //其实就是将归还的对象方法到该队列中
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    WeakOrderQueue queue = delayedRecycled.get(this);
    //如果队列为空
    if (queue == null) {
        //如果目标所有者线程待回收对象已经大于等于maxDelayedQueues
        //则不能再回收,使用DUMMY队列作标识,并直接返回
        if (delayedRecycled.size() >= maxDelayedQueues) {
            // Add a dummy queue so we know we should drop the object
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
        //如果目标所有者线程没有“满”,且上面判断队列为空,
        //则新建一个队列,allocate源码列在下面
        if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
            // drop object
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        //如果取出的队列为上面初始化的DUMMY队列,则直接放回
        // drop object
        return;
    }

    //将待回收对象放入到该所有者线程对应的队列中
    queue.add(item);
}

//WeakOrderQueue
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
    //New一个WeakOrderQueue队列
    final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
    // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
    // may be accessed while its still constructed.
    //将该队列连接到所有者线程待回收队列的链表中
    stack.setHead(queue);

    // We need to reclaim all space that was reserved by this WeakOrderQueue so we not run out of space in
    // the stack. This is needed as we not have a good life-time control over the queue as it is used in a
    // WeakHashMap which will drop it at any time.
    final Head head = queue.head;
    //这里在介绍FastThreadLocal介绍过,是使用后台回收线程进行对象清理
    //可看本专题中的相关文章
    ObjectCleaner.register(queue, head);

    return queue;
}

至此,归还线程已经将需要回收的对象放入队列中,且该队列也已经被连接到所有者线程的Stack中,完成了对象回收的第一步,对象处于待回收状态,下面看Stack.pop函数中调用的scavenge方法实现,这里简单列出其源码,大致过程就是从链接的一个个WeakOrderQueue队列中取出对象使用transfer方法将对象放入到Stack容器数组中,下面也不再详细展开介绍了。

//Stack
boolean scavenge() {
    // continue an existing scavenge, if any
    if (scavengeSome()) {
        return true;
    }

    // reset our scavenge cursor
    prev = null;
    cursor = head;
    return false;
}

boolean scavengeSome() {
    WeakOrderQueue prev;
    WeakOrderQueue cursor = this.cursor;
    if (cursor == null) {
        prev = null;
        cursor = head;
        if (cursor == null) {
            return false;
        }
    } else {
        prev = this.prev;
    }

    boolean success = false;
    do {
        if (cursor.transfer(this)) {
            success = true;
            break;
        }
        WeakOrderQueue next = cursor.next;
        if (cursor.owner.get() == null) {
            // If the thread associated with the queue is gone, unlink it, after
            // performing a volatile read to confirm there is no data left to collect.
            // We never unlink the first queue, as we don't want to synchronize on updating the head.
            if (cursor.hasFinalData()) {
                for (;;) {
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }

            if (prev != null) {
                prev.setNext(next);
            }
        } else {
            prev = cursor;
        }

        cursor = next;

    } while (cursor != null && !success);

    this.prev = prev;
    this.cursor = cursor;
    return success;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,194评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,058评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,780评论 0 346
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,388评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,430评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,764评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,907评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,679评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,122评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,459评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,605评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,270评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,867评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,734评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,961评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,297评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,472评论 2 348

推荐阅读更多精彩内容

  • 内存管理的主要目的合理分配内存,减少内存碎片,及时回收资源,提高内存的使用效率。从操作系统层面来说,各个软件在运行...
    史圣杰阅读 1,172评论 0 1
  • 什么是对象池技术?对象池应用在哪些地方? 对象池其实就是缓存一些对象从而避免大量创建同一个类型的对象,类似线程池的...
    BlackManba_24阅读 4,316评论 0 8
  • 在netty的池化ByteBuf分配中,包含ByteBuf对象的池化和真实内存(array或者DirectByte...
    YDDMAX_Y阅读 3,730评论 2 2
  • 1、Netty基础入门 Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应...
    我是嘻哈大哥阅读 4,689评论 0 31
  • 到底也为你哭过痛过心动过 见不得你双眉紧蹙 也不想还你一生没有我的清净 到底也对你盼过等过失望过 又不愿今后杳无音...
    妃卿阅读 190评论 0 1