概述
Netty使用ByteBuf作为其底层数据传输的容器,其实现有两种方式:基于堆内存与基于直接内存。为了减少传输过程中在用户缓冲区与内核缓冲区数据拷贝带来的消耗,底层实现又分为unsafe与非unsafe两种方式。同时为了降低内存分配与释放消耗,采用池化方式对ByteBuf进行缓存。
ByteBuf数据结构
ByteBuf底层是一个字节数组,内部维护了两个索引:readerIndex与writerIndex。其中0 --> readerIndex部分为可丢弃字节,表示已被读取过,readerIndex --> writerIndex部分为可读字节,writerIndex --> capacity部分为可写字节。ByteBuf支持动态扩容,在实例化时会传入maxCapacity,当writerIndex达到capacity且capacity小于maxCapacity时会进行自动扩容。
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
ByteBuf类图
ByteBuf子类可以按照以下三个纬度进行分类:
池化与非池化:是否对ByteBuf进行缓存
unsafe与非unsafe:是否使用unsafe进行读写
堆内存与直接内存:在堆内存或者直接内存进行分配
内存管理
在进入内存分配核心逻辑前,我们先对Netty内存分配相关概念做下了解。Netty内存管理借鉴jemalloc思想,为了提高内存利用率,根据不同内存规格使用不同的分配策略,并且使用缓存提高内存分配效率。
内存分配相关概念
内存规格
Netty有四种内存规格,tiny表示16B ~ 512B之间的内存块,samll表示512B ~ 8K之间的内存块,normal表示8K ~ 16M的内存块,Huge表示大于16M的内存块。
Chunk是Netty向操作系统申请内存的单位,默认一次向操作系统申请16M内存,Netty内部将Chunk按照Page大小划分为2048块。我们申请内存时如果大于16M,则Netty会直接向操作系统申请对应大小内存,如果申请内存在8k到16M之间则会分配对应个数Page进行使用。如果申请内存远小于8K,那么直接使用一个Page会造成内存浪费,SubPage就是对Page进行再次分配,减少内存浪费。
如果申请内存小于8K,会对Page进行再次划分为SubPage,SubPage大小为Page大小/申请内存大小。SubPage又划分为tiny与small两种。
PoolArena
负责管理从操作系统中申请到的内存块,Netty为了减少多线程竞争arena,采用多arena设计,arena数量默认为2倍CPU核心数。线程与arena关系如下:
PoolThreadLocalCache
线程本地缓存,负责创建线程缓存PoolThreadCache。PoolThreadCache中会初始化三种类型MemoryRegionCache数组,用以缓存线程中不同规格的内存块,分别为:tiny、small、normal。tiny类型数组缓存的内存块大小为16B ~ 512B之间,samll类型数组缓存的内存块大小为512B ~ 8K之间的内存块,normal类型数组缓存的内存块大小受DEFAULT_MAX_CACHED_BUFFER_CAPACITY配置影响,默认只缓存8K、16K、32K三种类型内存块。
MemoryRegionCache
内存块缓存容器,负责缓存tiny、small、normal三种内存块。其内部维护一个队列,用于缓存同种内存大小的内存块。
PoolChunk
负责管理从操作系统申请的内存,内部采用伙伴算法以Page为单位进行内存的分配与管理。
PoolChunkList
负责管理Chunk列表,根据内存使用率,分为:qInit、q000、q025、q050、q075、q100六种。每个PoolChunkList中存储内存使用率相同的Chunk,Chunk以双向链表进行关联,同时不同使用率的PoolChunkList也以双向列表进行关联。这样做的目的是因为随着内存的分配,Chunk使用率会发生变化,以链表形式方便Chunk在不同使用率列表进行移动。
PoolSubpage
PoolSubpage负责tiny、small类型内存的管理与分配,实现基于SLAB内存分配算法。PoolArena中有两种PoolSubpage类型数组,分别为:tinySubpagePools、smallSubpagePools。tinySubpagePools负责管理tiny类型内存,数组大小为512/16=32种。smallSubpagePools负责管理small类型内存,数组大小为4。
PoolSubpage数组中存储不同内存大小的PoolSubpage节点,相同大小节点以链表进行关联。PoolSubpage内部使用位图数组记录内存分配情况。
内存分配器
Netty通过ByteBufAllocator进行内存分配,ByteBufAllocator有两个实现类:PooledByteBufAllocator与UnpooledByteBufAllocator,其中,是否在堆内存或者直接内存分配与是否使用unsafe进行读写操作都封装在其实现类中。
我们先看下ByteBufAllocator类图:
PooledByteBufAllocator与UnpooledByteBufAllocator内存分配类似,可以通过newHeapBuffer与newDirectBuffer进行分配内存,我们以PooledByteBufAllocator为例分析下内存分配流程:
实例化内存分配器
以PooledByteBufAllocator为例来分析下内存分配器实例化过程。首先调用PooledByteBufAllocator#DEFAULT方法实例化PooledByteBufAllocator
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
#PooledByteBufAllocator
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
PooledByteBufAllocator实例化时会初始化几个比较重要的属性:
- DEFAULT_NUM_HEAP_ARENA:heap类型arena数量,默认两倍CPU核心数
- DEFAULT_NUM_DIRECT_ARENA: Arena数组长度,默认为两倍CPU核心数
- DEFAULT_PAGE_SIZE:页大小,默认为8K
- DEFAULT_MAX_ORDER:树深度,默认为11
- DEFAULT_TINY_CACHE_SIZE:缓存的tiny类型的内存个数,默认为512
- DEFAULT_SMALL_CACHE_SIZE:缓存的small类型的内存个数,默认为256
- DEFAULT_NORMAL_CACHE_SIZE:缓存的normal类型的内存个数,默认为64
- DEFAULT_MAX_CACHED_BUFFER_CAPACITY:最大可以被缓存的内存大小,默认为32K
- DEFAULT_CACHE_TRIM_INTERVAL:缓存经过多少次回收后被清理,默认为8192
最终会调用PooledByteBufAllocator如下构造方法:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
// 调用父类构造方法初始化directByDefault属性(默认使用直接内存,默认值为true)、初始化一个空的ByteBuf对象
super(preferDirect);
// 初始化threadCache属性,用于创建本地线程缓存
threadCache = new PoolThreadLocalCache();
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
// ...
// 初始化堆内存类型PoolArena数组
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
// 初始化直接内存类型PoolArena数组
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
}
PooledByteBufAllocator构造方法主要做了两件事情,一是:初始化PoolThreadLocalCache属性,二是:初始化堆内存与直接内存类型PoolArena数组,我们进入PoolArena.DirectArena构造方法,来分析下PoolArena初始化时主要做了哪些事情:
#PoolArena.DirectArena
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}
#PoolArena
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1);
## 初始化tiny类型PoolSubpage数组
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
## 初始化small类型PoolSubpage数组
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
##初始化PoolChunkList
q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit);
metrics.add(q000);
metrics.add(q025);
metrics.add(q050);
metrics.add(q075);
metrics.add(q100);
chunkListMetrics = Collections.unmodifiableList(metrics);
}
DirectArena构造方法会调用其父类PoolArena构造方法,在PoolArena构造方法中会初始化tiny类型与small类型PoolSubpage数组,并初始化六种不同内存使用率的PoolChunkList,每个PoolChunkList以双向链表进行关联。
分配内存
以分配直接内存为例,分析内存分配的主要流程:
ByteBuf byteBuf = allocator.directBuffer(8);
PooledByteBufAllocator#directBuffer方法最终会调用如下构造方法,其中maxCapacity为Integer.MAX_VALUE:
#PooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
## 获取线程缓存
PoolThreadCache cache = (PoolThreadCache)this.threadCache.get();
## 从线程缓存中获取PoolArena
PoolArena<ByteBuffer> directArena = cache.directArena;
Object buf;
## 分配内存
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
## 将ByteBuf转为具有内存泄漏检测功能的ByteBuf
return toLeakAwareBuffer((ByteBuf)buf);
}
该方法主要分三步,第一步:获取线程缓存,第二步:分配内存,第三步:将ByteBuf转为具有内存泄漏检测功能的ByteBuf,我们来分析下每一步具体做了哪些事情:
1.获取线程缓存,从PoolThreadLocalCache中获取PoolThreadCache,首次调用会先进行进行初始化,并将结果缓存下来:
#FastThreadLocal
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 初始化PoolThreadCache
return initialize(threadLocalMap);
}
初始化方法在PoolThreadLocalCache中,首先会循环找到使用最少的PoolArena,然后调用PoolThreadCache构造方法创建PoolThreadCache:
#PoolThreadLocalCache
@Override
protected synchronized PoolThreadCache initialValue() {
// 获取使用最少的PoolArena
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
/**
*heapArena、directArena为PoolArena数组大小,默认为2倍CPU核数
*tinyCacheSize tiny类型缓存池大小,默认为512
*smallCacheSize small类型缓存池大小,默认为256
*normalCacheSize normal类型缓存池大小,默认为64
*DEFAULT_MAX_CACHED_BUFFER_CAPACITY normal类型缓存池缓存的最大内存大小,默认为32K
*DEFAULT_CACHE_TRIM_INTERVAL 分配次数阈值默认为8192,超过后释放内存池
*/
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
PoolThreadCache构造方法中会初始化tinySubPageDirectCaches、smallSubPageDirectCaches、normalDirectCaches这三种MemoryRegionCache数组:
#PoolThreadCache
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
// ...
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
// ...
}
createSubPageCaches方法中会创建并初始化MemoryRegionCache数组,其中tiny类型数组大小为32,small类型数组大小为4,normal类型数组大小为3:
#PoolThreadCache
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
最终会调用MemoryRegionCache构造方法进行创建,我们看下MemoryRegionCache结构:
#PoolThreadCache
private abstract static class MemoryRegionCache<T> {
// 队列大小,tiny类型size大小为512,small类型size大小为256,normal类型size大小为64
private final int size;
// 同种内存大小缓存队列
private final Queue<Entry<T>> queue;
// sizeClass分为三种:Tiny、Small、Normal
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
}
2.分配内存,首先会获取PooledByteBuf,然后进行内存分配:
#PoolArena
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
newByteBuf方法会尝试从对象池里面获取pooledByteBuf,如果没有则进行创建。allocate方法为内存分配核心逻辑,主要分为两种分配方式:page级别内存分配(8k16M)、subPage级别内存分配(08K)、huge级别内存分配(>16M)。page与subPage级别内存分配首先会尝试从缓存上进行内存分配,如果分配失败则重新申请内存。huge级别内存分配不会通过缓存进行分配。我们看下allocate方法主要流程:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 标准化容量,比如10B会标准化为16B,7K会标准化为8K
final int normCapacity = normalizeCapacity(reqCapacity);
// tiny或者small类型内存分配
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
allocateNormal(buf, reqCapacity, normCapacity);
return;
}
// normal类型内存分配
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
allocateNormal(buf, reqCapacity, normCapacity);
} else {
// huge类型内存分配
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
SubPage级别内存分配
首先尝试从缓存中进行分配:
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
cacheForTiney方法先根据分配内存大小定位到对应的tinySubPageDirectCaches数组中MemoryRegionCache,如果没有定位到则不能在缓存中进行分配。如果有则从MemoryRegionCache对应的队列中弹出一个PooledByteBuf对象进行初始化,同时为了复用PooledByteBuf对象,会将其缓存下来。
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
// 初始化PooledByteBuf
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
// 将PooledByteBuf对象放入栈中复用
entry.recycle();
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
如果从缓存中分配不成功,则会从对应的PoolSubpage数组上进行分配,如果PoolSubpage数组对应的内存大小下标中有可分配空间则进行分配,并对PooledByteBuf进行初始化。
如果在PoolSubpage数组上分配不成功,则表示没有可以用来分配的SubPage,则会尝试从Page上进行分配。先尝试从不同内存使用率的ChunkList进行分配,如果仍分配不成功,则表示没有可以用来分配的Chunk,此时会创建新的Chunk进行内存分配。
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 从不同内存使用率的ChunkList进行分配
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
++allocationsNormal;
return;
}
// 创建一个新Chunk
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
// 分配内存
long handle = c.allocate(normCapacity);
++allocationsNormal;
assert handle > 0;
// 初始化PooledByteBuf
c.initBuf(buf, handle, reqCapacity);
qInit.add(c);
}
进入PoolChunk#allocate方法看下分配流程:
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
} else {
return allocateSubpage(normCapacity);
}
}
allocateRun方法用来分配大于等于8K的内存,allocateSubpage用来分配小于8K的内存,进入allocateSubpage方法:
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
// 找到Page
int id = allocateNode(d);
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 创建并初始化PoolSubpage
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
// 分配内存
return subpage.allocate();
}
}
内存分配成功后会调用initBuf方法初始化PoolByteBuf:
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
Page级别内存分配
Page级别内存分配和SubPage级别类似,同样是先从缓存中进行分配,分配不成功则尝试从不同内存使用率的ChunkList进行分配,如果仍分配不成功,则表示没有可以用来分配的Chunk,此时会创建新的Chunk进行内存分配,不同点在allocate方法中:
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 找到Page
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}
Huge级别内存分配
因为大于16M的内存分配Netty不会进行缓存,所以Huge级别内存分配会直接申请内存并进行初始化:
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
// 创建Chunk
PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
activeBytesHuge.add(chunk.chunkSize());
// 初始化PoolByteBuf
buf.initUnpooled(chunk, reqCapacity);
allocationsHuge.increment();
}
内存回收
调用ByteBuf#release方法会进行内存释放,方法中会判断当前byteBuf 是否被引用,如果没有被引用, 则调用deallocate方法进行释放:
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
// 释放内存
deallocate();
return true;
}
return false;
}
}
}
进入deallocate方法看下内存释放流程:
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
// 释放内存
chunk.arena.free(chunk, handle, maxLength, cache);
// 将PoolByteBuf对象加入到对象池
recycle();
}
}
free方法会把释放的内存加入到缓存,如果加入缓存不成功则会标记这段内存为未使用:
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
// 加入到缓存
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
// 标记内存为未使用
freeChunk(chunk, handle, sizeClass);
}
}
recycle方法会将PoolByteBuf对象放入到对象池中:
#DefaultHandle
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}