ByteBuf
ByteBuf是什么
为了平衡数据传输时CPU与各种IO设备速度的差异性,计算机设计者引入了缓冲区这一重要抽象。jdkNIO库提供了java.nio.Buffer接口,并且提供了7种默认实现,常见的实现类为ByteBuffer。不过netty并没有直接使用nio的ByteBuffer,这主要是由于jdk的Buffer有以下几个缺点:
- 当调用allocate方法分配内存时,Buffer的长度就固定了,不能动态扩展和收缩,当写入数据大于缓冲区的capacity时会发生数组越界错误
- Buffer只有一个位置标志位属性position,读写切换时,必须先调用flip或rewind方法。不仅如此,因为flip的切换
- Buffer只提供了存取、翻转、释放、标志、比较、批量移动等缓冲区的基本操作,想使用高级的功能(比如池化),就得自己手动进行封装及维护,使用非常不方便。
也因此,netty实现了自己的缓冲区——ByteBuf,连名字都如此相似。那么ByteBuf是如何规避ByteBuffer的缺点的?
第一点显然是很好解决的,由于ByteBuf底层也是数组,那么它就可以像ArrayList一样,在写入操作时进行容量检查,当容量不足时进行扩容。
第二点,ByteBuf通过2个索引readerIndex,writerIndex将数组分为3部分,如下图所示
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
初始化时,readerIndex和writerIndex都是0,随着数据的写入writerIndex会增加,此时readable byte部分增加,writable bytes减少。当读取时,discardable bytes增加,readable bytes减少。由于读操作只修改readerIndex,写操作只修改writerIndex,让ByteBuf的使用更加容易理解,避免了由于遗漏flip导致的功能异常。
此外,当调用discardReadBytes方法时,可以把discardable bytes这部分的内存释放。总体想法是通过将readerIndex移动到0,writerIndex移动到writerIndex-readerIndex下标,具体移动下标的方式依据ByteBuf实现类有所不同。这个方法可以显著提高缓冲区的空间复用率,避免无限度的扩容,但会发生字节数组的内存复制,属于以时间换空间的做法。
ByteBuf重要API
read、write、set、skipBytes
前3个系列的方法及最后一个skipBytes都属于改变指针的方法。举例来说,readByte会移动readerIndex1个下标位,而int是4个byte的大小,所以readInt会移动readerIndex4个下标位,相应的,writeByte会移动writerIndex1个下标位,writeInt会移动writerIndex4个下标位。set系列方法比较特殊,它的参数为index和value,意即将value写入指定的index位置,但这个操作不会改变readerIndex和writerIndex。skipBytes比较简单粗暴,直接将readerIndex移动指定长度。
mark和reset
markReaderIndex和markWriterIndex可以将对应的指针做一个标记,当需要重新操作这部分数据时,再使用resetReaderIndex或resetWriterIndex,将对应指针复位到mark的位置。
duplicate、slice、copy
这3种方法都可以复制一份字节数组,不同之处在于duplicate和slice两个方法返回的新ByteBuf和原有的老ByteBuf之间的内容会互相影响,而copy则不会。duplicate和slice的区别在于前者复制整个ByteBuf的字节数组,而后者默认仅复制可读部分,但可以通过slice(index, length)分割指定的区间。
retain、release
这是ByteBuf接口继承自ReferenceCounted接口的方法,用于引用计数,以便在不使用对象时及时释放。实现思路是当需要使用一个对象时,计数加1;不再使用时,计数减1。考虑到多线程场景,一般也多采用AtomicInteger实现。netty却另辟蹊径,选择了volatile + AtomicIntegerFieldUpdater这样一种更节省内存的方式。
ByteBuf扩容
在ByteBuf写入数据时会检查可写入的容量,若容量不足会进行扩容。
final void ensureWritable0(int minWritableBytes) {
if (minWritableBytes <= writableBytes()) {
return;
}
int minNewCapacity = writerIndex + minWritableBytes;
int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
int fastCapacity = writerIndex + maxFastWritableBytes();
if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
newCapacity = fastCapacity;
}
capacity(newCapacity);
}
忽略一些检验性质的代码后,可以看到扩容时先尝试将现有写索引加上需要写入的容量大小作为最小新容量,并调用ByteBufAllocate的calculateNewCapacity方法进行计算。跟入这个方法:
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
可以看到这个方法的目的则是计算比可写容量稍大的2的幂次方。minNewCapacity由上一个方法传入,而maxCapacity则为Integer.MAX_VALUE。具体步骤是首先判断新容量minNewCapacity是否超过了计算限制CALCULATE_THRESHOLD,默认为4M,如果没有超过4MB,那么从64B开始不断以2的幂次方形式扩容,直到newCapacity超过minNewCapacity。而若一开始新容量就超过了4M,则调整新容量到4M的倍数+1。比如newCapacity为6M,因为6/4 = 1,所以调整为(1+1)*4M=8M。
在计算完容量之后会调用capacity方法。这是一个抽象方法,这里以UnpooledHeapByteBuf为例。
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
byte[] oldArray = array;
int oldCapacity = oldArray.length;
if (newCapacity == oldCapacity) {
return this;
}
int bytesToCopy;
if (newCapacity > oldCapacity) {
bytesToCopy = oldCapacity;
} else {
trimIndicesToCapacity(newCapacity);
bytesToCopy = newCapacity;
}
byte[] newArray = allocateArray(newCapacity);
System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);
setArray(newArray);
freeArray(oldArray);
return this;
}
首先检查newCapacity是否大于0且小于最大容量。之后准备好老数组要复制的长度。trimIndicesToCapacity(newCapacity)是缩容时调用的,它将readerIndex和newCapacity的较小值设置为新的readerIndex,将newCapacity设置为新的writerIndex。
之后便分配一个新数组,并开始复制旧数组的元素。复制成功后,将新数组保存为成员变量,将老数组释放掉。
ByteBuf种类
出于性能和空间的多方考虑,netty从3个维度定义了各种不同的ByteBuf实现类,主要是池化、堆内堆外、可否使用Unsafe类这3个维度,从而演化出8种不同的ByteBuf,它们分别是PooledUnsafeHeapBytebuf、PooledHeapByteBuf、PooledUnsafeDirectByteBuf、PooledDirectBytebuf、UnpooledUnsafeHeapByteBuf、UnpooledHeapByteBuf、UnpooledUnsafeDirectByteBuf、UnpooledDirectByteBuf。
ByteBuf接口之下有一个抽象类AbstractByteBuf,实现了接口定义的read、write、set相关的方法,但在实现时只做了检查,而具体逻辑则定义一系列以_开头的proteced方法,留待子类实现。
ByteBufAllocate
不同于一般形式的创建对象,ByteBuf需要通过内存分配器ByteBufAllocate分配,对应于不同的ByteBuf也会有不同的BtteBufferAllocate。netty将之抽象为ByteBufAllocate接口。我们看一下有哪些方法:
- buffer()、buffer(initialCapacity)、buffer(initialCapacity、maxCapacity),分配ByteBuf的方法,具体分配的Buffer是堆内还是堆外则由实现类决定。2个重载方法分别以给定初始容量、最大容量的方式分配内存
- ioBuffer()、ioBuffer(initialCapacity)、ioBuffer(initialCapacity、maxCapacity)更倾向于分配堆外内存的方法,因为堆外内存更适合用于IO操作。重载方法同上
- heapBuffer()、heapBuffer(initialCapacity)、heapBuffer(initialCapacity、maxCapacity)分配堆内内存的方法。
- directBuffer()、directBuffer(initialCapacity)、directBuffer(initialCapacity、maxCapacity)分配堆外内存的方法。
- compositeBuffer()。可以将多个ByteBuf合并为一个ByteBuf,多个ByteBuf可以部分是堆内内存,部分是堆外内存。
ByteBufAllocate接口定义了heap和direct这一个维度,其他维度则交由子类来定义。
UnPooledByteBufAllocate
ByteBufAllocate有一个直接实现类AbstractByteBufAllocate,它实现了大部分方法,只留下2个抽象方法newHeapBuffer和newDirectBuffer交由子类实现。AbstractByteBufAllocate有2个子类PooledByteBufAllocate和UnpooledByteBufAllocate,在这里定义了pooled池化维度的分配方式。
看看UnpooledByteBufAllocate如何实现2个抽象方法:
newHeapBuffer
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
可以看到实现类根据PlatformDependent.hasUnsafe()方法自动判定是否使用unsafe维度,这个方法通过在静态代码块中尝试初始化sun.misc.Unsafe来判断Unsafe类是否在当前平台可用,在juc中,这个类使用颇多,作为与高并发打交道的netty,出现这个类不令人意外。UnpooledUnsafeHeapByteBuf与UnpooledHeapByteBuf并不是平级关系,事实上前者继承了后者,在构造方法上也直接调用UnpooledHeapByteBuf的构造方法。构造方法比较简单,初始化byte数组、初始容量、最大容量,将读写指针的设置为0,并将子类传入的this指针保存到alloc变量中。
两种Bytebuf的区别在于unsafe会尝试通过反射的方式创建byte数组,并将数组的地址保存起来,之后再获取数据时也会调用Unsafe的getByte方法,通过数组在内存中的地址+偏移量的形式直接获取,而普通的SafeByteBuf则是保存byte数组,通过数组索引即array[index]访问。
// UnsafeHeapByteBuf初始化数组
protected byte[] allocateArray(int initialCapacity) {
return PlatformDependent.allocateUninitializedArray(initialCapacity);
}
// HeapByteBuf初始化数组
protected byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
// UnsafeHeapByteBuf通过UnsafeByteBufUtil获取字节
static byte getByte(byte[] data, int index) {
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
// HeapByteBuf获取字节
static byte getByte(byte[] memory, int index) {
return memory[index];
}
newDirectBuffer
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
DirectByteBuf构造方法大致与heap的类似,只是保存数据的容器由字节数组变为了jdk的ByteBuffer。相应的,分配与释放内存的方法也变成调用jdk的ByteBuffer方法。而UnsafeByteBuf更是直接用long类型记录内存地址。
// DirectByteBuf获取字节
protected byte _getByte(int index) {
return buffer.get(index);
}
// UnsafeDirectByteBuf获取字节
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
// 获取内存地址
final long addr(int index) {
return memoryAddress + index;
}
// UnsafeByteBufUtil获取字节
static byte getByte(long address) {
return UNSAFE.getByte(address);
}
由于PooledByteBufAllocate内容较为庞大,放入下一节讲述。
未完待续···