1 概述
为了较少对象的数量,提高内存的使用率以及GC的效率,Netty对某些经常分配的对象采用了对象池技术避免了重复的实例化对象。
在Netty中,对象池的基础实现类是Recycler
,Recycler
是一个抽象类,要想使用它必须继承该类并重写其newObject
方法,该方法用于在第一次从对象池中获取对象或者对象池中没有空闲对象时告诉其如何创建新的对象。
使用Recycler
的实例可以见PooledUnsafeDirectByteBuf.newInstance
方法的实现:
//PooledUnsafeDirectByteBuf
//RECYCLER为Recycler子类,实现了netObject方法
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0);
}
};
//直接调用RECYCLER.get()从对象池中获取对象,如果是第一次获取对象
//或者池中没有空闲对象时会调用上面的newObject方法新建对象并返回
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
在Netty中,Recycler
对象池是和ThreadLocal
结合实现的,每个线程都有一个线程池,如果从某个线程的对象池中取出对象被传递给其他线程,其他线程在使用完对象之后还要将该对象归还到原线程的对象池中。
本文会介绍Netty对象池Recycler
的基本实现,包括如何创建对象、对象如何回收等相关实现。
2 对象池相关类介绍
2.1 Recycler
Recycler
是对象池的管理者,负责从对象池的容器Stack
(下一小节介绍)中取出和归还对象,上面列出的代码已经说到,使用Recycler
需要定义一个其子类,重写newObject
方法。 另外Recycler
使用FastThreadLocal
(关于FastThreadLocal
的介绍可以查看这篇文章)持有Stack
实例,如下:
//Recycler
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
//FTL的初始化函数返回一个Stack实例
//Stack构造函数下面会介绍
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
2.2 Stack
Stack
可以看做是对象池的容器,负责管理池中的对象,每个线程都会使用FTL持有一个Stack
实例,FTL在Recycler
中的定义上面已经列出过源码,下面看Stack
的构造函数:
//Recycler.Stack
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int ratioMask, int maxDelayedQueues) {
this.parent = parent;
//threadRef用于记录该Stack被哪个Thread持有,
//因为使用Netty基本上会采用多线程,从对象池中
//取出的对象,经常会传递给其他线程,所以
//这里需要记录Stack的持有者线程
threadRef = new WeakReference<Thread>(thread);
//该对象池最大可持有的对象数量
this.maxCapacity = maxCapacity;
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
//对象池持有的每个元素都使用一个Handle进行管理
//因为使用完元素之后,需要将元素归还给相应的Stack
//所以从Recycle取出的对象都会有一个Handle字段
//用于处理对象归还
//Stack的初始大小取INITIAL_CAPACITY和maxCapacity中
//的较小值,避免声明较大的数组取用不了那么大的空间
//必要时会进行扩容
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
this.ratioMask = ratioMask;
this.maxDelayedQueues = maxDelayedQueues;
}
这里介绍下Stack
初始化时数组的默认容量INITIAL_CAPACITY
计算公式,代码比较简单,不再说明,通过代码可以看到默认大小和Netty的配置参数io.netty.recycler.maxCapacityPerThread
以及io.netty.recycler.maxCapacity
有关。
DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
if (maxCapacityPerThread < 0) {
maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
}
DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
Stack
的构造函数介绍完之后,我们再看其重要的域:
//用于持有实际对象的容器,采用数组持有对象
private DefaultHandle<?>[] elements;
//持有对象的最大数量
private final int maxCapacity;
//该Stack的持有者线程
final WeakReference<Thread> threadRef;
//用于对象回收,其他线程使用完从该线程Stack中取出的对象后
//会将该对象放入专门为该线程准备的WeakOrderQueue中,该
//WeakOrderQueues使用单链表结构链接起来,head则指向第一个
//WeakOrderQueue
private volatile WeakOrderQueue head;
//其他线程用于归还不属于自己对象池管理对象的
//WeakOrderQueue容量大小
final int maxDelayedQueues;
2.3 DefaultHandle
从上面的介绍中知道DefaultHandle
用于包装每个实际的对象,并负责对象的归还,其最重要的域和方法如下:
//DefaultHandle
//Stack容器实际管理的是Handle,Handle也会持有该Stack,用于对象
//回收使用
private Stack<?> stack;
//该Handle管理的实际对象
private Object value;
//DefaultHandle的构造函数传入Stack,用于后期归还对象
DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
//回收对象,从Recycle取出的对象都会有一个Handle字段
//比如采用引用计数控制对象生命的ByteBuf就持有一个Handle域
//recyclerHandle,在引用计数为0时会调用Handle.recycle对对象
//进行回收
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}
2.4 WeakOrderQueue
上面已经介绍过,Netty多线程环境中,经常发生一个线程对象池取出的对象被传递给其他线程使用,比如Netty用于网络读取的线程使用的ByteBuf
会被传递给业务线程使用,所以Netty中每个线程会使用FastThreadLocal
为其他线程维护一个WeakOrderQueue
的Map,该Map会被链接到对应Stack
的WeakOrderQueue
链表中,线程在使用完从其他线程对象池取出的对象后,会放入到为其他线程准备的WeakOrderQueue
中。
3 从对象池中获取对象
从Recycler
中获取对象通过调用Stack.get
实现:
//Recycler
@SuppressWarnings("unchecked")
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
//首先尝试从该Recycler的Stack中弹出一个对象
//如果Stack中还没有对象或者没有空闲对象,则
//新建一个Handle和s实际对象
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
//新建一个Handle对象
handle = stack.newHandle();
//新建一个实际使用的对象,该方法需要在子类重写
handle.value = newObject(handle);
}
//返回Handle实际持有的对象
return (T) handle.value;
}
//Stack
//返回一个DefaultHandle对象
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}
下面我们重点看下Stack.pop
方法:
//Stack
@SuppressWarnings({ "unchecked", "rawtypes" })
DefaultHandle<T> pop() {
int size = this.size;
//如果Stack当前数组大小为0,表示没有现成可用的对象,
//尝试使用scavenge进行回收,scavenge方法我们后面介绍
if (size == 0) {
if (!scavenge()) {
//没有可回收对象,则返回null
return null;
}
size = this.size;
}
//如果上面成功回收了对象,则这里返回刚回收的对象,并
//将该对象所在的数组位置置空,表示对象被分配出去
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
4 对象回收
对象回收分两种情况,一种是对象从对象池取出来后,一直由该对象池所有者线程使用,那么也即由该线程回收;另一种情况是对象从某个线程对象池取出来后被传递给其他线程,其他线程需要保证该对象被返还给原先的持有者线程。下面我们分开来看。
4.1 对象由所有者线程自己归还
对象在使用完毕后,会调用Handle.recycle
进行回收,这里我们看Handle
实现类DefaultHandle.recycle
的实现:
//DefaultHandle
@Override
public void recycle(Object object) {
//Handle对象包装了实际对象,如果归还的对象和Handle包装的对象
//不相等,表示不是从该Handle取出的对象,抛错
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
//调用Stack.push方法进行归还
stack.push(this);
}
//Stack
void push(DefaultHandle<?> item) {
//获取当前操作所在线程
Thread currentThread = Thread.currentThread();
//当前线程和Handle所属Stack的持有者线程进行对比,
//如果相等,则表示该对象由持有者线程自己归还
//也即上面说的第一种情况
if (threadRef.get() == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
//归还对象的操作不是由原先的持有者线程进行的,是由其他线程归还
//也即上面介绍的第二章情况
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
这一小节我们看第一种情况,看下Stack.pushNow
方法的实现,其实实现比较简单,下面在注释里简单介绍下:
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
//如果当前Stack已经超过了其定义的最大容量,则不进行归还操作
//删除该Handle
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
//没有达到最大容量,则进行对象回收,回收前还要检查下是否
//需要进行扩容,扩容时容量会变为原先数组容量的2倍
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
//实际归还,也就是把Handle放入数组中,然后计数器加一
elements[size] = item;
this.size = size + 1;
}
4.2 对象由其他线程归还到线程池所有者线程
这种情况就是对象从某个线程的对象池中取出来,但是被传递给其他线程,其他线程(下文称其为归还线程)负责归还给原先线程(下文称其为所有者线程)的对象池中,也就是Stack.push
源码中的pushLater
方法,在看pushLater
方法之前我们需要知道,其他线程归还对象到所有者线程分两步,第一步是放入到所有者线程对应的WeakOrderQueue
队列中,这些对象相当于处于待回收状态;第二步则是在从Stack.pop
函数中判断没有现成可用对象时采用scavenge
将待回收状态的对象放入Stack
容器数组中,完成这两步才是回收完毕。
//Stack
private void pushLater(DefaultHandle<?> item, Thread thread) {
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
//每个线程都在自己的ThreadLocal中为每个其他所有者线程维护了一个
//WeakOrderQueue,归还从该所有者线程对象池取出的对象时
//其实就是将归还的对象方法到该队列中
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
//如果队列为空
if (queue == null) {
//如果目标所有者线程待回收对象已经大于等于maxDelayedQueues
//则不能再回收,使用DUMMY队列作标识,并直接返回
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
//如果目标所有者线程没有“满”,且上面判断队列为空,
//则新建一个队列,allocate源码列在下面
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
//如果取出的队列为上面初始化的DUMMY队列,则直接放回
// drop object
return;
}
//将待回收对象放入到该所有者线程对应的队列中
queue.add(item);
}
//WeakOrderQueue
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
//New一个WeakOrderQueue队列
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
// Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
// may be accessed while its still constructed.
//将该队列连接到所有者线程待回收队列的链表中
stack.setHead(queue);
// We need to reclaim all space that was reserved by this WeakOrderQueue so we not run out of space in
// the stack. This is needed as we not have a good life-time control over the queue as it is used in a
// WeakHashMap which will drop it at any time.
final Head head = queue.head;
//这里在介绍FastThreadLocal介绍过,是使用后台回收线程进行对象清理
//可看本专题中的相关文章
ObjectCleaner.register(queue, head);
return queue;
}
至此,归还线程已经将需要回收的对象放入队列中,且该队列也已经被连接到所有者线程的Stack
中,完成了对象回收的第一步,对象处于待回收状态,下面看Stack.pop
函数中调用的scavenge
方法实现,这里简单列出其源码,大致过程就是从链接的一个个WeakOrderQueue
队列中取出对象使用transfer
方法将对象放入到Stack
容器数组中,下面也不再详细展开介绍了。
//Stack
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
prev = null;
cursor = head;
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
do {
if (cursor.transfer(this)) {
success = true;
break;
}
WeakOrderQueue next = cursor.next;
if (cursor.owner.get() == null) {
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.setNext(next);
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}