7.netty内存管理-ByteBuf

ByteBuf

ByteBuf是什么

为了平衡数据传输时CPU与各种IO设备速度的差异性,计算机设计者引入了缓冲区这一重要抽象。jdkNIO库提供了java.nio.Buffer接口,并且提供了7种默认实现,常见的实现类为ByteBuffer。不过netty并没有直接使用nio的ByteBuffer,这主要是由于jdk的Buffer有以下几个缺点:

  1. 当调用allocate方法分配内存时,Buffer的长度就固定了,不能动态扩展和收缩,当写入数据大于缓冲区的capacity时会发生数组越界错误
  2. Buffer只有一个位置标志位属性position,读写切换时,必须先调用flip或rewind方法。不仅如此,因为flip的切换
  3. Buffer只提供了存取、翻转、释放、标志、比较、批量移动等缓冲区的基本操作,想使用高级的功能(比如池化),就得自己手动进行封装及维护,使用非常不方便。
    也因此,netty实现了自己的缓冲区——ByteBuf,连名字都如此相似。那么ByteBuf是如何规避ByteBuffer的缺点的?
    第一点显然是很好解决的,由于ByteBuf底层也是数组,那么它就可以像ArrayList一样,在写入操作时进行容量检查,当容量不足时进行扩容。
    第二点,ByteBuf通过2个索引readerIndex,writerIndex将数组分为3部分,如下图所示
+-------------------+------------------+------------------+
| discardable bytes |  readable bytes  |  writable bytes  |
|                   |     (CONTENT)    |                  |
+-------------------+------------------+------------------+
|                   |                  |                  |
0      <=      readerIndex   <=   writerIndex    <=    capacity

初始化时,readerIndex和writerIndex都是0,随着数据的写入writerIndex会增加,此时readable byte部分增加,writable bytes减少。当读取时,discardable bytes增加,readable bytes减少。由于读操作只修改readerIndex,写操作只修改writerIndex,让ByteBuf的使用更加容易理解,避免了由于遗漏flip导致的功能异常。
此外,当调用discardReadBytes方法时,可以把discardable bytes这部分的内存释放。总体想法是通过将readerIndex移动到0,writerIndex移动到writerIndex-readerIndex下标,具体移动下标的方式依据ByteBuf实现类有所不同。这个方法可以显著提高缓冲区的空间复用率,避免无限度的扩容,但会发生字节数组的内存复制,属于以时间换空间的做法。

ByteBuf重要API

read、write、set、skipBytes

前3个系列的方法及最后一个skipBytes都属于改变指针的方法。举例来说,readByte会移动readerIndex1个下标位,而int是4个byte的大小,所以readInt会移动readerIndex4个下标位,相应的,writeByte会移动writerIndex1个下标位,writeInt会移动writerIndex4个下标位。set系列方法比较特殊,它的参数为index和value,意即将value写入指定的index位置,但这个操作不会改变readerIndex和writerIndex。skipBytes比较简单粗暴,直接将readerIndex移动指定长度。

mark和reset

markReaderIndex和markWriterIndex可以将对应的指针做一个标记,当需要重新操作这部分数据时,再使用resetReaderIndex或resetWriterIndex,将对应指针复位到mark的位置。

duplicate、slice、copy

这3种方法都可以复制一份字节数组,不同之处在于duplicate和slice两个方法返回的新ByteBuf和原有的老ByteBuf之间的内容会互相影响,而copy则不会。duplicate和slice的区别在于前者复制整个ByteBuf的字节数组,而后者默认仅复制可读部分,但可以通过slice(index, length)分割指定的区间。

retain、release

这是ByteBuf接口继承自ReferenceCounted接口的方法,用于引用计数,以便在不使用对象时及时释放。实现思路是当需要使用一个对象时,计数加1;不再使用时,计数减1。考虑到多线程场景,一般也多采用AtomicInteger实现。netty却另辟蹊径,选择了volatile + AtomicIntegerFieldUpdater这样一种更节省内存的方式。

ByteBuf扩容

在ByteBuf写入数据时会检查可写入的容量,若容量不足会进行扩容。

final void ensureWritable0(int minWritableBytes) {
    if (minWritableBytes <= writableBytes()) {
        return;
    }
    int minNewCapacity = writerIndex + minWritableBytes;
    int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
    int fastCapacity = writerIndex + maxFastWritableBytes();
    if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
        newCapacity = fastCapacity;
    }
    capacity(newCapacity);
}

忽略一些检验性质的代码后,可以看到扩容时先尝试将现有写索引加上需要写入的容量大小作为最小新容量,并调用ByteBufAllocate的calculateNewCapacity方法进行计算。跟入这个方法:

public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
    if (minNewCapacity == threshold) {
        return threshold;
    }
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }
    return Math.min(newCapacity, maxCapacity);
}

可以看到这个方法的目的则是计算比可写容量稍大的2的幂次方。minNewCapacity由上一个方法传入,而maxCapacity则为Integer.MAX_VALUE。具体步骤是首先判断新容量minNewCapacity是否超过了计算限制CALCULATE_THRESHOLD,默认为4M,如果没有超过4MB,那么从64B开始不断以2的幂次方形式扩容,直到newCapacity超过minNewCapacity。而若一开始新容量就超过了4M,则调整新容量到4M的倍数+1。比如newCapacity为6M,因为6/4 = 1,所以调整为(1+1)*4M=8M。

在计算完容量之后会调用capacity方法。这是一个抽象方法,这里以UnpooledHeapByteBuf为例。

public ByteBuf capacity(int newCapacity) {
    checkNewCapacity(newCapacity);
    byte[] oldArray = array;
    int oldCapacity = oldArray.length;
    if (newCapacity == oldCapacity) {
        return this;
    }
    int bytesToCopy;
    if (newCapacity > oldCapacity) {
        bytesToCopy = oldCapacity;
    } else {
        trimIndicesToCapacity(newCapacity);
        bytesToCopy = newCapacity;
    }
    byte[] newArray = allocateArray(newCapacity);
    System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);
    setArray(newArray);
    freeArray(oldArray);
    return this;
}

首先检查newCapacity是否大于0且小于最大容量。之后准备好老数组要复制的长度。trimIndicesToCapacity(newCapacity)是缩容时调用的,它将readerIndex和newCapacity的较小值设置为新的readerIndex,将newCapacity设置为新的writerIndex。
之后便分配一个新数组,并开始复制旧数组的元素。复制成功后,将新数组保存为成员变量,将老数组释放掉。

ByteBuf种类

出于性能和空间的多方考虑,netty从3个维度定义了各种不同的ByteBuf实现类,主要是池化、堆内堆外、可否使用Unsafe类这3个维度,从而演化出8种不同的ByteBuf,它们分别是PooledUnsafeHeapBytebuf、PooledHeapByteBuf、PooledUnsafeDirectByteBuf、PooledDirectBytebuf、UnpooledUnsafeHeapByteBuf、UnpooledHeapByteBuf、UnpooledUnsafeDirectByteBuf、UnpooledDirectByteBuf。
ByteBuf接口之下有一个抽象类AbstractByteBuf,实现了接口定义的read、write、set相关的方法,但在实现时只做了检查,而具体逻辑则定义一系列以_开头的proteced方法,留待子类实现。

ByteBufAllocate

不同于一般形式的创建对象,ByteBuf需要通过内存分配器ByteBufAllocate分配,对应于不同的ByteBuf也会有不同的BtteBufferAllocate。netty将之抽象为ByteBufAllocate接口。我们看一下有哪些方法:

  1. buffer()、buffer(initialCapacity)、buffer(initialCapacity、maxCapacity),分配ByteBuf的方法,具体分配的Buffer是堆内还是堆外则由实现类决定。2个重载方法分别以给定初始容量、最大容量的方式分配内存
  2. ioBuffer()、ioBuffer(initialCapacity)、ioBuffer(initialCapacity、maxCapacity)更倾向于分配堆外内存的方法,因为堆外内存更适合用于IO操作。重载方法同上
  3. heapBuffer()、heapBuffer(initialCapacity)、heapBuffer(initialCapacity、maxCapacity)分配堆内内存的方法。
  4. directBuffer()、directBuffer(initialCapacity)、directBuffer(initialCapacity、maxCapacity)分配堆外内存的方法。
  5. compositeBuffer()。可以将多个ByteBuf合并为一个ByteBuf,多个ByteBuf可以部分是堆内内存,部分是堆外内存。
    ByteBufAllocate接口定义了heap和direct这一个维度,其他维度则交由子类来定义。

UnPooledByteBufAllocate

ByteBufAllocate有一个直接实现类AbstractByteBufAllocate,它实现了大部分方法,只留下2个抽象方法newHeapBuffer和newDirectBuffer交由子类实现。AbstractByteBufAllocate有2个子类PooledByteBufAllocate和UnpooledByteBufAllocate,在这里定义了pooled池化维度的分配方式。
看看UnpooledByteBufAllocate如何实现2个抽象方法:

newHeapBuffer

protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
            new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

可以看到实现类根据PlatformDependent.hasUnsafe()方法自动判定是否使用unsafe维度,这个方法通过在静态代码块中尝试初始化sun.misc.Unsafe来判断Unsafe类是否在当前平台可用,在juc中,这个类使用颇多,作为与高并发打交道的netty,出现这个类不令人意外。UnpooledUnsafeHeapByteBuf与UnpooledHeapByteBuf并不是平级关系,事实上前者继承了后者,在构造方法上也直接调用UnpooledHeapByteBuf的构造方法。构造方法比较简单,初始化byte数组、初始容量、最大容量,将读写指针的设置为0,并将子类传入的this指针保存到alloc变量中。
两种Bytebuf的区别在于unsafe会尝试通过反射的方式创建byte数组,并将数组的地址保存起来,之后再获取数据时也会调用Unsafe的getByte方法,通过数组在内存中的地址+偏移量的形式直接获取,而普通的SafeByteBuf则是保存byte数组,通过数组索引即array[index]访问。

// UnsafeHeapByteBuf初始化数组
protected byte[] allocateArray(int initialCapacity) {
    return PlatformDependent.allocateUninitializedArray(initialCapacity);
}
// HeapByteBuf初始化数组
protected byte[] allocateArray(int initialCapacity) {
    return new byte[initialCapacity];
}
// UnsafeHeapByteBuf通过UnsafeByteBufUtil获取字节
static byte getByte(byte[] data, int index) {
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
// HeapByteBuf获取字节
static byte getByte(byte[] memory, int index) {
    return memory[index];
}

newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe() ?
            new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}

DirectByteBuf构造方法大致与heap的类似,只是保存数据的容器由字节数组变为了jdk的ByteBuffer。相应的,分配与释放内存的方法也变成调用jdk的ByteBuffer方法。而UnsafeByteBuf更是直接用long类型记录内存地址。

// DirectByteBuf获取字节
protected byte _getByte(int index) {
    return buffer.get(index);
}
// UnsafeDirectByteBuf获取字节
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(addr(index));
}
// 获取内存地址
final long addr(int index) {
    return memoryAddress + index;
}
// UnsafeByteBufUtil获取字节
static byte getByte(long address) {
    return UNSAFE.getByte(address);
}

由于PooledByteBufAllocate内容较为庞大,放入下一节讲述。
未完待续···

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容

  • 焦点高级一期 洛阳 杜红平 坚持分享第534天 今天中午,我感觉小女发烧了,一量体温,38.6度,我就让她吃了退烧...
    随喜Prajana阅读 114评论 0 0
  • 内容来源于网络,本人只是在此稍作整理,如有涉及版权问题,归小甲鱼官方所有。 练习题(来自小甲鱼官方论坛) 0. 列...
    无罪的坏人阅读 13,903评论 5 17
  • 《非暴力沟通》之倾听的力量 每个人都渴望被倾听,因为倾听会产生很神奇的力量。 试想如果有人倾听你,不对你评头论足,...
    叶颖群阅读 109评论 0 0
  • 实验室折腾一天,回寝室的路上被四个小姐姐喊住,希望我们可以接受一个采访:回答零零后的问题以及想问零零后什么。有人说...
    粒子呀阅读 355评论 0 0
  • 今日日期:2019年1月5日 累计天数:23/30 ✊亲子宣言:成为你的妈妈,是上帝对我的恩典! ✨孩子第四个30...
    何川LX阅读 318评论 0 0