ByteBuf基础
Java Nio 的Buffer
在进行数据传输的过程中,我们经常会用到缓冲区。
在Java NIO 为我们提供了原生的七种缓冲区实现,对应着Java 的七种基本类型。一般使用ByteBuffer较多。原生的Buffer虽然能满足我们的日常使用,但是要进行复杂的应用的时候,确有点力不从心了,原生Buffer存在着以下缺点。因此Netty对其进行了封装,提供了更为友好的接口供我们使用。
- 当我们调用对应Buffer类的allocate方法来创建缓冲区实例的时候,会分配指定的空间,同时缓冲区的长度就会被固定,不能进行动态的增长或者收缩。如果我们写入的数据大于缓冲区的capacity的时候,就会发生数组越界错误。
- Buffer只有一个位置标志位属性Position,我们只能flip或者rewind方法来对position进行修改来处理数据的存取位置,一不小心就可能会导致错误。
- Buffer只提供了存取、翻转、释放、标志、比较、批量移动等缓冲区的基本操作,我们想使用高级的功能,就得自己手动进行封装及维护,使用非常不方便。
ByteBuf工作原理
ByteBuf也是通过字节数组作为缓冲区来存取数据,通过外观模式聚合了JDK NIO元素的ByteBuffer,进行封装。
ByteBuf是通过readerIndex跟writerIndex两个位置指针来协助缓冲区的读写操作的。
在对象初始化的时候,readerIndex和writerIndex的值为0,随着读操作和写操作的进行,writerIndex和readerIndex都会增加,不过readerIndex不能超过writerIndex,在进行读取操作之后,0到readerIndex之间的空间会被视为discard,调用ByteBuf的discardReadBytes方法,可以对这部分空间进行释放重用,类似于ByteBuffer的compact操作,对缓冲区进行压缩。readerIndex到writerIndex的空间,相当于ByteBuffer的position到limit的空间,可以对其进行读取,WriterIndex到capacity的空间,则相当于ByteBuffer的limit到capacity的空间,是可以继续写入的。
readerIndex跟writerIndex让读写操作的位置指针分离,不需要对同一个位置指针进行调整,简化了缓冲区的读写操作。
同样,ByteBuf对读写操作进行了封装,提供了动态扩展的能力,当我们对缓冲区进行写操作的时候,需要对剩余的可用空间进行校验,如果可用空间不足,同时要写入的字节数小于可写的最大字节数,会对缓冲区进行动态扩展,它会重新创建一个缓冲区,然后将以前的数据复制到新创建的缓冲区中,
ByteBuf基本功能
- 顺序读
在进行读操作之前,首先对缓冲区可用的空间进行校验。如果要读取的字节长度小于0,就会抛出IllegalArgumentException异常,如果要读取的字节长度大于已写入的字节长度,会抛出IndexOutOfBoundsException异常。通过校验之后,调用getBytes方法,从当前的readerIndex开始,读取length长度的字节数据到目标dst中,由于不同的子类实现不一样,getBytes是个抽象方法,由对应的子类去实现。如果读取数据成功,readerIndex将会增加相应的length。
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
ensureAccessible();
if (minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
}
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
- 顺序写
读操作是将源字节数组从srcIndex开始,length长度的数据写入到当前的ByteBuf中的。
一开始需要对写入数组的字节数进行校验,如果写入长度小于0,将会抛出IllegalArgumentException异常,如果写入字节数小于当前ByteBuf的可写入字节数,则通过检验。如果写入字节数大于缓冲区最大可动态扩展的容量maxCapacity,就会抛出
IndexOutOfBoundsException异常,否则的话,就会通过动态扩展来满足写入需要的字节数。首先通过calculateNewCapacity计算出重新扩展后的容量,然后调用capacity方法进行扩展,不同的子类有不同实现,所以也是一个抽象方法。- 计算扩展容量,首先设置门阀值为4m,如果要扩展的容量等于阀值就使用阀值作为缓冲区新的容量,如果大于阀值就以4M作为步长,每次增加4M,如果扩展期间,要扩展的容量比最大可扩展容量还大的话,就以最大可扩展容量maxCapacity为新的容量。否则的话,就从64开始倍增,直到倍增之后的结果大于要扩展的容量,再把结果作为缓冲区的新容量。
- 通过先倍增再步长来扩展容量,如果我们只是writerIndex+length的值作为缓冲区的新容量,那么再以后进行写操作的时候,每次都需要进行容量扩展,容量扩展的过程需要进行内存复制,过多内存复制会导致系统的性能下降,之所以是倍增再部长,在最初空间比较小的时候,倍增操作并不会带来太多的内存浪费,但是内存增长到一定的时候,再进行倍增的时候,就会对内存造成浪费,因此,需要设定一个阀值,到达阀值之后就通过步长的方法进行平滑的增长。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
return this;
}
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity实现
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
- Clear操作
clear操作只是把readerIndex和writerIndex设置为0,不会对存储的数据进行修改。
public ByteBuf clear() {
readerIndex = writerIndex = 0;
return this;
}
-
索引操作
- 读写位置索引设置:主要是对边界条件进行校验,设置readerIndex的时候,newReaderIndex不能小于0跟大于writerIndex;设置writerIndex的时候,newWriterIndex必须大于readerIndex和小于当前的capacity。如果不能通过校验的话,就会抛出IndexOutOfBoundsException异常。
- mark和reset操作:由于有readerIndex和writerIndex,因此进行mark或者reset需要指定相应的操作位置索引,mark操作会把当前的readerIndex或者writerIndex设置为markedReaderIndex或者markedWriterIndex;reset操作的话,它是参入对应的mark值调用对应readerIndex()或者writerIndex();
-
缓冲区重用
可以通过discardReadByte方法去重用已经读取过的缓冲区。
首先对readerIndex进行判断:- 如果readerIndex等于0,就说明没有读取数据,没有可以用来重用的空间,直接返回;
- 如果readerIndex大于0且不等于writerIndex的话,说明有进行数据读取被丢弃的缓冲区,也有还没有被读取的缓冲区。调用setBytes方法进行字节数组的复制,将没被读取的数据移动到缓冲区的起始位置,重新去设置readerIndex和writerIndex,readerIndex为0,writerIndex为原writerIndex-readerIndex;同时,也需要对mark进行重新设置。
- 首先对markedReaderIndex进行备份然后跟decrement进行比较,如果markedReaderIndex比decrement小的话,markedReaderIndex设置为0,再用markedWriterIndex跟decrement比较,如果小于的话,markedWriterIndex也设置为0,否则的话markedWriterIndex较少decrement;
- 如果markedReaderIndex比decrement大的话,markedReaderIndex和markedReaderIndex都减去decrement就可以了。
- 如果readerIndex等于writerIndex的话,说明没有可以进行重用的缓冲区,直接对mark重新设置就可以了,不需要内存复制。
public ByteBuf discardReadBytes() {
ensureAccessible();
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
} else {
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
protected final void adjustMarkers(int decrement) {
int markedReaderIndex = this.markedReaderIndex;
if (markedReaderIndex <= decrement) {
this.markedReaderIndex = 0;
int markedWriterIndex = this.markedWriterIndex;
if (markedWriterIndex <= decrement) {
this.markedWriterIndex = 0;
} else {
this.markedWriterIndex = markedWriterIndex - decrement;
}
} else {
this.markedReaderIndex = markedReaderIndex - decrement;
markedWriterIndex -= decrement;
}
}
- skipBytes
当我们需要跳过某些不需要的字节的时候,可以调用skipBytes方法来跳过指定长度的字节来读取后面的数据。
首先对跳跃长度进行判断,如果跳跃长度小于0的话,会抛出IllegalArgumentException异常,或者跳跃长度大于当前缓冲区可读长度的话,会抛出IndexOutOfBoundsException异常。如果校验通过,新的readerindex为原readerIndex+length,如果新的readerIndex大于writerIndex的话,会抛出IndexOutOfBoundsException异常,否则就更新readerIndex。
public ByteBuf skipBytes(int length) {
checkReadableBytes(length);
int newReaderIndex = readerIndex + length;
if (newReaderIndex > writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
length, readerIndex, writerIndex));
}
readerIndex = newReaderIndex;
return this;
}
ByteBuf源码分析
[图片上传失败...(image-54964b-1539838427542)]
AbstractReferenceCountedByteBuf
AbstractReferenceCountedByteBuf是ByteBuf实现对引用进行计数的基类,用来跟踪对象的分配和销毁,实现自动内存回收。
- 成员变量
- refCntUpdater refCntUpdater是一个AtomicIntegerFieldUpdater类型的成员变量,它可以对成员变量进行原子性更新操作,达到线程安全。
- REFCNT_FIELD_OFFSET REFCNT_FIELD_OFFSET是标识refCnt字段在AbstractReferenceCountedByteBuf的内存地址,在UnpooledDirectByteBuf和PooledDirectByteBuf两个子类中都会使用到这个偏移量。
- refCnt volatile修饰保证变量的线程可见性,用来跟踪对象的引用次数
- 对象引用计数器
每调用retain方法一次,引用计数器就会加一。retain方法通过自旋对引用计数器进行加一操作,引用计数器的初始值为1,只要程序是正确执行的话,它的最小值应该为1,当申请和释放次数相等的时候,对应的ByteBuf就会被回收。当次数为0时,表明对象被错误的引用,就会抛出IllegalReferenceCountException异常,如果次数等于Integer类型的最大值,就会抛出
IllegalReferenceCountException异常。retain通过refCntUpdater的compareAndSet方法进行原子操作更新,compareAndSet会使用获取的值与期望值进行比较,如果在比较器件,有其他线程对变量进行修改,那么比较失败,会再次自旋,获取引用计数器的值再次进行比较,否则的话,就会进行加一操作,退出自旋。
release方法的话与retain方法类似,也是通过自旋循环进行判断和更新,不过当refCnt的值等于1的时候,表明引用计数器的申请跟释放次数一样,对象引用已经不可达了,对象应该要被垃圾收集回收掉了,调用deallocate方法释放ByteBuf对象
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
UnpooledHeapByteBuf
UnpooledHeapByteBuf是一个非线程池实现的在堆内存进行内存分配的字节缓冲区,在每次IO操作的都会去创建一个UnpooledHeapByteBuf对象,如果频繁地对内存进行分配或者释放会对性能造成影响。
- 成员变量
- ByteBufAllocator 用于内存分配
- array 字节数组作为缓冲区,用于存储字节数据
- ByteBuffer 用来实现Netty ByteBuf 到Nio ByteBuffer的变换
- 动态扩展缓冲区
调用capacity方法动态扩展缓冲区,首先要对扩展容量进行校验,如果新容量的大小小于0或者大于最大可扩展容量maxCapacity的话,抛出IllegalArgumentException异常。
通过校验之后,如果新扩展容量比原来大的话,则创建一个新的容量为新扩展容量的字节数组缓冲区,然后调用System.arraycopy进行内存复制,将旧的数据复制到新数组中去,然后用setArray进行数组替换。动态扩展之后需要原来的视图tmpNioBuffer设置为控。
如果新的容量小于当前缓冲区容量的话,不需要进行动态扩展,但是需要截取部分数据作为子缓冲区。- 首先对当前的readerIndex是否小于newCapacity,如果小于的话继续对writerIndex跟newCapacity进行比较,如果writerIndex大于newCapacity的话,就将writerIndex设置为newCapacity,更新完索引之后就通过System.arrayCopy内存复制将当前可读的数据复制到新的缓冲区字节数组中。
- 如果newCapacity小于readerIndex的话,说明没有新的可读数据要复制到新的字节数组缓冲区中,只需要把writerIndex跟readerIndex都更新为newCapacity既可,最后调用setArray更换字节数组。
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
- setBytes
字节数组复制,首先对数据进行合法性检验,如果srcIndex或者index的值小于0,就会抛出IllegalArgumentException,如果index+length的值大于capacity的值或者srcIndex+length的值大于src.length的话,就会抛出IndexOutOfBoundsException异常。通过校验之后,就调用System.arraycopy进行字节数组复制。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.length);
System.arraycopy(src, srcIndex, array, index, length);
return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
checkIndex(index, length);
if (srcIndex < 0 || srcIndex > srcCapacity - length) {
throw new IndexOutOfBoundsException(String.format(
"srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
}
}
- Netty ByteBuf与Nio ByteBuffer转换
要将Netty的ByteBuf转化为Nio ByteBuffer,在ByteBuffer中有wrap静态方法,只需要传入对应的字节数组即可创建转化为ByteBuffer,在nioBuffer方法还调用了slice方法,它可以创建一个从原ByteBuffer的position开始缓冲区,与原缓冲区共享同一段数据元素。nioBuffer方法不会重用缓冲区,只能保证writerIndex跟readerIndex的独立性。
public ByteBuffer nioBuffer(int index, int length) {
ensureAccessible();
return ByteBuffer.wrap(array, index, length).slice();
}
PooledByteBuf
在Netty4之后加入内存池管理,通过内存池管理比之前ByteBuf的创建性能得到了极大提高。
- PoolChunk
- Page 可以用来分配的最小内存块单位
- Chunk page的集合
PoolChunk主要负责内存块的分配及释放,chunk中的page会构建成一颗二叉树,默认情况下page的大小是8K,chunk的大小是2^11 page,即16M,构成了11层的二叉树,最下面一层的叶子节点有8192个,与page的数目一样,每一次内存的分配必须保证连续性,方便内存操作。每个节点会记录自己在Memory Area的偏移地址,当一个节点表示的内存区域被分配之后,那么该节点会被标志为已分配,该节点的所有子节点的内存请求都会忽略。每次内存分配的都是8k(2n)大小的内存块,当需要分配大小为chunkSize/(2k)的内存端时,为了找到可用的内存段,会从第K层左边开始寻找可用节点。
- PoolArena
在内存分配中,为了能够集中管理内存的分配及释放,同时提供分配和释放内存的性能,一般都是会先预先分配一大块连续的内存,不需要重复频繁地进行内存操作,那一大块连续的内存就叫做memory Arena,而PoolArena是Netty的内存池实现类。
在Netty中,PoolArena是由多个Chunk组成的,而每个Chunk则由多个Page组成。PoolArena是由Chunk和Page共同组织和管理的。
- PoolSubpage
当对于小于一个Page的内存分配的时候,每个Page会被划分为大小相等的内存块,它的大小是根据第一次申请内存分配的内存块大小来决定的。一个Page只能分配与第一次内存内存的内存块的大小相等的内存块,如果想要想要申请大小不想等的内存块,只能在新的Page上申请内存分配了。
Page中的存储区域的使用情况是通过一个long数组bitmap来维护的,每一位表示一个区域的占用情况。
PooledDirectByteBuf
- 创建字节缓冲区
由于内存池实现,每次创建字节缓冲区的时候,不是直接new,而是从内存池中去获取,然后设置引用计数器跟读写Index,跟缓冲区最大容量返回。
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
setRefCnt(1);
setIndex0(0, 0);
discardMarks();
}
- 复制字节缓冲区实例
copy方法可以复制一个字节缓冲区实例,与原缓冲区独立。
首先要对index和length进行合法性判断,然后调用PooledByteBufAllocator的directBuffer方法分配一个新的缓冲区。newDirectBuffer方法是一个抽象方法,对于不同的子类有不同的实现。如果是unpooled的话,会直接创建一个新的缓冲区,如果是pooled的话,它会从内存池中获取一个可用的缓冲区。
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(length, maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
ByteBuf辅助类分析
ByteBufHolder
ByteBufHolder是ByteBuf的一个容器,它可以更方便地访问ByteBuf中的数据,在使用不同的协议进行数据传输的时候,不同的协议消息体包含的数据格式和字段不一样,所以抽象一个ByteBufHolder对ByteBuf进行包装,不同的子类有不同的实现,使用者可以根据自己的需要进行实现。Netty提供了一个默认实现DefaultByteBufHolder。
ByteBufAllocator
ByteBufAllocator是字节缓冲区分配器,根据Netty字节缓冲区的实现不同,分为两种不同的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他们提供了不同ByteBuf的分配方法。
CompositeByteBuf
CompositeByteBuf是一个虚拟的Buffer,它可以将多个ByteBuf组装为一个ByteBuf视图。
在Java NIO中,我们有两种实现的方法
- 将其他ByteBuffer的数据复制到一个ByteBuffer中,或者重新创建一个新的ByteBuffer,将其他的ByteBuffer复制到新建的ByteBuffer中。
- 通过容器将多个ByteBuffer存储在一起,进行统一的管理和维护。
在Netty中,CompositeByByteBuf中维护了一个Component类型的集合。Component是ByteBuf的包装类,它聚合了ByteBuf.维护在集合中的位置偏移量等信息。一般情况下,我们应该使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法来创建CompositeByteBuf,而不是直接通过构造函数去实例化一个CompositeByteBuf对象。
private int addComponent0(int cIndex, ByteBuf buffer) {
checkComponentIndex(cIndex);
if (buffer == null) {
throw new NullPointerException("buffer");
}
int readableBytes = buffer.readableBytes();
// No need to consolidate - just add a component to the list.
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
if (cIndex == components.size()) {
components.add(c);
if (cIndex == 0) {
c.endOffset = readableBytes;
} else {
Component prev = components.get(cIndex - 1);
c.offset = prev.endOffset;
c.endOffset = c.offset + readableBytes;
}
} else {
components.add(cIndex, c);
if (readableBytes != 0) {
updateComponentOffsets(cIndex);
}
}
return cIndex;
}
private void consolidateIfNeeded() {
final int numComponents = components.size();
if (numComponents > maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset;
ByteBuf consolidated = allocBuffer(capacity);
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
ByteBuf b = c.buf;
consolidated.writeBytes(b);
c.freeIfNecessary();
}
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
}
}
public CompositeByteBuf removeComponent(int cIndex) {
checkComponentIndex(cIndex);
Component comp = components.remove(cIndex);
comp.freeIfNecessary();
if (comp.length > 0) {
updateComponentOffsets(cIndex);
}
return this;
}
private static final class Component {
final ByteBuf buf;
final int length;
int offset;
int endOffset;
Component(ByteBuf buf) {
this.buf = buf;
length = buf.readableBytes();
}
void freeIfNecessary() {
buf.release(); // We should not get a NPE here. If so, it must be a bug.
}
}
ByteBufUtil
ByteBufUtil是ByteBuf的工具类,它提供了一系列的静态方法来操作ByteBuf。