Flink内存管理

前言

参考 : <Flink内核原理与实践>

1. 一个java对象内存大小

java所有数据类型对应的字节大小

类型 大小(byte) 说明
Long 8
Double 8
Int 4
Float 4
Char 2
Short 2
Byte 1
Boolean 1
Class Point 8 保存对象的引用指针,指针压缩后为4个字节
Mark Word 8 保存运行时的数据 : hash,锁状态等.
Array length 4 如果对象是数组需要额外的4个字节记录数组长度

上面是按照64位的,如果是32位有所不同

java对象的组成 : 对象头,实例数据,对齐部分

类型 说明
对象头 由Markword和类指针组成,运行时存储对象运行时数据{hash code,gc年龄,锁标志等..},32系统头大小8byte,64系统为16byte开启指针压缩为12byte
实例数据 当前对象中的实例字段,有上面的数据类型组成
对齐 JVM要求对象大小比须是8的倍数,为了使对象达到8的倍数而补充的数据

JVM关于压缩指针的参数

-XX:-+UseCompressedOops : 普通对象指针压缩(OOP即ordinary object pointer)

-XX:-+UseCompressedClassPointers : 类型指针压缩,即针对klass pointer的指针压缩

class A {
  int i;
  String s;
}   //                                                是否开启指针压缩
// 那么一个new A()所占用的大小 =  16/12(对象头) + 4(int) + 8/4(ref) + 4/4(对齐) = 32/24byte;

测试对象大小的代码,具体展示结果可自行测试,下面是具体测试代码

/**
 * -XX:-+UseCompressedClassPointers 压缩class指针的. 对象头中使用 开启这个参数需要开启下个参数才可生效
 * -XX:-+UseCompressedOops 压缩对象值指针的,
 *  maven opjdk提供的
            <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.10</version>
        </dependency>
 * @author xuzhiwen
 */
public class Test {
  public static void main(String[] args) throws NoSuchFieldException {
    extracted(new Object(), "Object ");
    extracted(new int[1], " int[1]");
    extracted(new int[10], " int[10]");
    extracted(new int[0], "int[0]");
    extracted(new Integer[1], "Integer[1] ");
    extracted(new Integer[0], "Integer[0]");
    extracted(new T(new T()), "T");
    extracted(new T2(), "T2");
  }

  private static void extracted(Object o, String s) {
    System.out.println(s + " 占用大小");
    System.out.println(ClassLayout.parseInstance(o).toPrintable());
    System.out.println();
  }

  static class T2 {}
  static class T {
    public static T t2 = new T();
    int i;
    T t;
    public T(T t) {this.t = t;}
    public T() {}
  }
}

jvm 序列化缺点

  • 无法跨语言 : java序列化的对象,其他语言无法反序列化,除非实现java序列化协议
  • 易被攻击 : java无法保证序列化和反序列化的安全性,需要手动添加代码黑白名单,判断等方式防止被攻击
  • 流太大,性能差 : 序列化后的字节比目前主流的protobuf,kryo等序列化框架要大很多,并且耗时要久一点

可以看到java的序列化有很多不足的地方,所以在很多框架中都是选择自己实现序列化或者使用一些主流的开源框架

2. heap & off-heap

  • heap (堆内存)

    • jvm的内存区域中,占用内存空间最大的一部分叫做堆'heap',也就是我们所说的堆内存. jvm中的heap主要是存放所有对象的实例,这一块区域在jvm启动的时候被创建,并被所有的线程所共享,同时也是垃圾收集器的主要工作区域
  • off-heap (非堆内存/堆外内存)

    • 堆外内存意味着把一些对象的实例分配在Java虚拟机堆内存以外的内存区域,这些内存直接受操作系统而不是jvm管理,这样做的结果就是能保持一个较小的堆,以减少垃圾收集对程序的影响
    • 为了解决heap过大导致gc停顿过长的问题,java可以通过off-heap来缓解这个问题, off-heap可以通过零拷贝, 以减少数据从jvm内存到系统之间的拷贝次数

3. Flink为什么自己管理内存

  • java序列化问题
    • 序列化效率低,序列化时间过久
    • 序列化密度低,序列化后的字节数组过大
  • GC & oom问题
    • 由于大数据情况下,jvm启动会开辟较大空间,导致full gc的时候时间较久,极大的影响任务性能
    • 数据量大的情况下,当jvm对象分配的大小超过jvm内存(heap)时,会抛出oom,导致jvm停止,影响框架健壮性和整体性能

4. Flink自主管理内存的好处

  • 将内存抽象成MemorySegment,每个MemorySegment(底层就是一个字节数组)默认大小32K
  • segment会存储在老年代,segment会被持续引用,不会被ygc的时候回收掉,那么老年代的内存有很大一部分是不变的,如果full gc 那么一定是新生代对象移动到老年代中造成,这样对于管理老年代和控制full gc就会变得轻松一些.在新生代中创建的对象很大一部分会被flink通过序列化存入segment中,只要控制好非序列化的对象即可,这样内存管理会变得简单很多. memorySegment在后面会介绍
  • 自定义序列化.弥补java序列化的不足,在flink中处理的对象都是经过序列化的,然后存入segment中,所以user创建对象大部分都被在老年代中,由于flink的序列化性能较高,会使内存占用降低,这样老年代的对象也很稳定,发生full gc的次数就会被减少,对象存储在segment的字节数组中,当对象被释放的时候我们只需要通过调整数组的指针即可
  • 直接操作二进制数据.flink可以直接对序列化后的字节进行计算,所以内存使用和计算效率非常高.二进制数据的操作结果依然是字节,同样保存在segment中,只有在需要的时候才会反序列化成对象.序列化是消耗cpu的主要操作,因此flink的计算避免了大量的序列化操作

一.Flink内存模型

内存模型.png

上面图为TaskManager内存模型,左边为细分的内存模型,右边为整体内存模型,该图摘自Flink官网

内存可以分为三部分

heap内存

  • TaskManager使用的heap内存,作为flink(Runtime)框架使用内存的一部分
  • Task使用的heap内存,用于用户代码使用

non-heap内存

  • 托管内存 : 实时用于rocksDB状态后端,离线则用于排序,hash,中间结果缓存,不预分配参数 managed memory
    taskmanager.memory.preallocate: false
  • 直接内存 :
    • TaskManager使用的直接内存,作为flink(Runtime)框架使用内存的一部分
    • Task使用的直接内存,用于用户代码使用
    • 用于网络传输使用的直接内存

jvm使用内存

  • JVM 元空间,即jvm的方法区,可以通过jvm参数进行配置
  • JVM开销保留的内存,如线程堆栈,jit缓存,gc

关于内存的判断

  • 若是 Flink 有硬限制的分区,Flink 会报该分区内存不足。否则进入下一步。
  • 若该分区属于 JVM 管理的分区,在其实际值增长导致 JVM 分区也内存耗尽时,JVM 会报其所属的 JVM 分区的 OOM (比如 java.lang.OutOfMemoryError: Jave heap space)。否则进入下一步。
  • 该分区内存持续溢出,最终导致进程总体内存超出容器内存限制。在开启严格资源控制的环境下,资源管理器(YARN/k8s 等)会 kill 掉该进程,在该情况下通常由开启了rocksDB导致的

heap内存在jvm启动的时候申请的一块不变的内存区域,该内存实际上是Flink和task公用的一块区域,在flink层面通过控制来区分框架使用和task内存,heap内存管理起来是比较容易的,实际上non-heap的内存是难管理的一块,如果管理不当或者使用不当可能造成内存泄漏或者内存无限增长等问题

内存参数配置

# ------------------- 堆内 ------------------------------

#flink框架使用内存,默认128,不计入slot
taskmanager.memory.framework.heap.size = xxxMB 
#task使用的内存,即用户代码
taskmanager.memory.task.heap.size = xxxMB 

# ------------------- 堆外 -------------------------------

#flink框架使用 默认128
taskmanager.memory.framework.off-heap.size = xxxMB 
#task使用 , 默认0
taskmanager.memory.task.heap.size = xxxMB 
# network使用的内存大小 默认 min=64mb max=1gb fraction=0.1
# 通过fraction计算出来值若比最小值小或比最大值大,就会限制到最小值或者最大值,比例是按照总内存计算
taskmanager.memory.network.off-heap.[min/max/fraction] = xxxMB/xxxMB/0.x 
# 托管内存 ManagedMemory 计算同上,默认0.4
taskmanager.memory.managed.off-heap.[size/fraction]= 0.x

#------------------- jvm使用 -----------------------------

# jvm元空间使用
taskmanager.memory.jvm-metaspace = xxxMB 
# jvm执行的开销,堆栈,io编译换成等使用的内存
taskmanager.memory.jvm-overhead = xxxMB 

# ------------------ 总内存  ------------------------------

# 综上框架使用的内存堆和堆外内存,通过该参数控制,
taskmanager.memory.flink.size = xxxMB 
# flink任务进程的使用的内存
taskmanager.memory.porcess.size = xxxMB 

# ------------------ jvm线束 ------------------------------
# 对应于上面的jvm相关参数,实际上都是根据上面内存配置计算出来的jvm参数,在启动tm的时候会指定对应的jvm参数
-Xmx / -Xms 配置jvm堆内存大小
-XX:MaxDirectMemorySize jvm直接内存
-XX:MeataspaceSize 元空间使用内存

二.Flink内存管理

1.存在的问题

1). jvm问题

  • 数据密度低
    • 上面我们已经介绍了java对象在内存的分配,由于有大量的额外对齐数据,会导致jvm中有效数据信息密度低,如果在大量对象的情况下会占用过多额外的内存空间
  • gc影响
    • jvm的gc虽然内存泄漏的可能已经开发人员的工作量,但是gc回收是不可控的,在tb,pb级的数据计算需要大量内存,在内存中生成大量的对象导致在full gc的时候会用时较久直接影响性能,甚至还会导致flink心跳超时问题
  • oom影响
    • 如果出现oom,则jvm直接崩溃,影响flink健壮性和性能问题
  • 缓存未命中
    • 由于java对象在堆上存储不是连续的,所以从内存读取java对象时,邻近的数据通常不是cpu下一步需要计算的数据,这就是缓存未命中,此时cpu等待从内存重新读取数据,此时cpu空转,cpu的速度和内存速度差距比较大,那么执行效率也会随之下降

2). 自主管理

  • 自定义序列化工具 : 将数据序列化成二进制数据存入在MemorySegment中,并且提供一些高效的读写方法,一些计算可以直接操作二进制,减少序列化,如果需要序列化直接序列化需要计算的数据即可,无需全部序列化
  • 合理使用堆外内存 : 堆外在写磁盘和网络io中是通过零拷贝的,而堆内需要在用户态复制一次,提高io效率
  • 缓存友好的数据结构和算法 : 通过友好的缓存,以较少缓存未命中,提高cpu的执行效率

3). 堆外不足

  • 堆外出现问题是难以排查,操作不当容易内存泄漏

  • 对于声明周期短的MemorySegment,如果分配在堆外,开销跟更高,分配时间要比堆内分配响应慢很多

    通过下面代码测试堆内和堆外内存的分配效率

    public void test(){
        for (int i = 1; i < 60; i++) {
          System.out.println("第" + i + "循环");
          alloc(1024 * 1024, 1024);
        }
    private static void alloc(int size, int s) {
        printTakesTime( s, "分配堆内存用时 = ",val1 -> {
              byte[] bytes = new byte[size];});
        printingTakesTime( s, "分配直接内存用时 = ", val1 -> {
              ByteBuffer buf = ByteBuffer.allocateDirect(size);});
        System.gc();
      }
     
    private static void printTakesTime( int s, String str, Consumer<Integer> consumer) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < s; i++) {
          consumer.accept(null);
        }
        System.out.println(str + (System.currentTimeMillis() - start) + "ms ");
      }
    }
    

2. 内存数据结构

1).MemorySegment (内存段)

在flink中对内存进行了抽象成了MemorySegment,�默认情况下,一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆上内存( byte数组) ,也可以是堆外内存(nio的ByteBufferr ) .

同时MemorySegment也提供了对二进制数据的操作方法,以及读取字节数组序列化以及序列化字节数组的方法等

下面是类继承图,该类有两MemorySegment实现类有两个分别为使用heap的以及混合的即有heap和non-heap,对于内存的访问有子类具体的实现


内存段继承图

在MemorySegment类的注释中有两段话,我们解释一下

Note on efficiency: For best efficiency, the code that uses this class should make sure that only one subclass is loaded, or that the methods that are abstract in this class are used only from one of the subclasses (either the HeapMemorySegment, or the HybridMemorySegment).

That way, all the abstract methods in the MemorySegment base class have only one loaded actual implementation. This is easy for the JIT to recognize through class hierarchy analysis, or by identifying that the invocations are monomorphic (all go to the same concrete method implementation). Under these conditions, the JIT can perfectly inline methods.

解释一下上面的两句话

为了提升效率应该只加载一个MemorySegment的子类或者,调用抽象方法的时候只调用其中一个子类的,避免交叉使用

这样MemorySegment的抽象方法只有一个已加载的实际发现,通过类的层次分析或确定是单态(所有调用都指向一个具

体实现)的,这样JIT很容易识别并(去虚化)进行方法内联

JIT(Just In Time)优化

jvm是编译和解释

  • 热点代码优化 : 对执行的热点代码进行编译成本地代码,执行编译后的代码效率会更快,编译后的代码会比膨胀,所以只有热点代码才会被编译
  • 完全去虚化 : 通过类型推导或者类层次分析,识别虚方法调用的唯一目标方法,将其转换为直接调用
   static class A{
   void a(){System.out.println("a");}
   }
   static class A1 extends A{
   @Override
   void a() { System.out.println("a1");}
   }
   static class A2 extends A{
   @Override
   void a() { System.out.println("a2");}
   }
   public static void main(String[] args) throws Exception {
   // 子类实现了父类的方法,构建对象的时候通过子类引用指向父类
   // 在调用方法的时候只能调用父类拥有的方法,但是子类有重写或者实现了对应的方法
   // 在编译时调用父类方法,运行时调用具体实现类方法
   // 那么这个方法就是虚方法
   
   // 如果只有A1.class被加载,通过层次分析,很容易识别到目标方法,
   // 如果A2.class也被加载,那么就会通过具体的类--------------
   final A a = new A1();
   a.a();
   }
  }
  • 方法内联 : 将目标方法代码转移至当前代码中,避免入栈和出栈不必要的开销
   public void t1(){
   int r = sum(1,2)+ sum(2,3);
   // 优化后 , 代码直接嵌入
   // int r = 1+2+2+3;
   }
   public int sum(int a,int b){
   return a + b;
   }
  }
  • ......等
public abstract class MemorySegment {
 /** unsafe对象,用于操作heap an non-heap内存 */
 @SuppressWarnings("restriction")
 protected static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
 /* 数组对象的偏移量,基本为16,因为是相对于数组对象而言,前面是对象头,后面是数据,当然不完全是,通常情况下而已 */
 @SuppressWarnings("restriction")
 protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
 /*  判断是大端法还是小端法,不同的cpu会使用不同的字节顺序 大端 : 低地址存放最高有效字节, 小端与大端相反   */
 private static final boolean LITTLE_ENDIAN =
 (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
 // ------------------------------------------------------------------------
 /* 如果使用heap的情况下为字节数组对象,non-heap情况下为null */
 protected final byte[] heapMemory;
 /* 内存地址,heapMemory =null 则为绝对地址,否则为heapMemory的相对地址 */
 protected long address;
 /*  标识地址结束位置, address+size   */
 protected final long addressLimit;
 /** memorySegment的大小 heap情况为字节数组长度,non-heap为byteBuffer的capacity */
 protected final int size;
 /**  memory segment owner */
 private final Object owner;

 // ----------------------------------------- 构造方法 ---------------------------------------------
 // 是用heap情况下的构造方法
 MemorySegment(byte[] buffer, Object owner) {
 this.heapMemory = buffer;
 this.address = BYTE_ARRAY_BASE_OFFSET; // address为字节数组的相对偏移量
 this.size = buffer.length; // 字节长度
 this.addressLimit = this.address + this.size; // address结束地址
 this.owner = owner;
 }
 // non-heap的构造方法
 MemorySegment(long offHeapAddress, int size, Object owner) {
 this.heapMemory = null; // heapMemory为null
 this.address = offHeapAddress; // 堆外对绝对地址,
 this.addressLimit = this.address + size;
 this.size = size; // 堆外申请的内存大小
 this.owner = owner;
 }
 // ------------------------------------------------------------------------------------------------

}

2).DataInputView/DataOutputView

MemorySemgent是flink内存分配的最小单元了,对于数据夸MemorySemgent保存,那么对于上层的使用者来说,需要考虑考虑所有的细节,由于过于繁琐,所以在MemorySemgent上又抽象了一层内存也,内存也是在MemorySemgent数据访问上的视图,对数据输入和输出分别抽象为DataInputView/DataOutputView,有了这一层,上层使用者无需关心跨MemorySemgent的细节问题,内存也对自动处理跨MemorySemgent的内存操作

DataInputView

DataInputView继承DataInput,DataInputView是对MemorySemgent读取的抽象视图,提供一系列读取二进制数据不同类型的方法,AbstractPageInputView是DataInputView的一个抽象实现类,并且基本所有InputView都实现了该类,即所有实现该类的InputView都支持Page

InputView持有了多个MemorySemgent的引用(可以基于数组,list,deque等),这些MemorySemgent被视为一个内存页,可以顺序,随机等方式读取数据,要基于不同的实现类,实现类不同读取方式不同

方法图

dataInput方法图.png

类继承图
DataInput继承关系.png

DataOutputView

与DataInputView相对应,继承Output,并有一个拥有Page功能的抽象类(AbstractPagedOutputView),其大部outputView的实现都是继承自该抽象类,对一组MemorySemgent提供一个基于页的写入功能

方法图

view方法

类继承图

DataOutputView继承关系.png

3).Buffer

用于网络io数据的包装,每个buffer持有一个MemorySegment的引用,resultPartition写数据的时候,会向LocalBufferPool申请Buffer,会返回BufferBuilder,通过BufferBuilder想Buffer<实际写入的是MemorySegment>写数据

BufferBuilder是在上游Task中,负责想Buffer写入数据,BufferConsumer位于下游,与BufferBuilder相对应,用于消费Buffer的数据,每个bufferBuilder对应一个bufferConsumer

常用参数介绍

public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer {
    /* buffer中持有的内存段 */
    private final MemorySegment memorySegment;
    /* 用于回收MemorySegment的回收器 */
    private final BufferRecycler recycler;
    /* buffer分配器,netty会使用到*/
    private ByteBufAllocator allocator;
    /* 当前buffer的容量 */
    private int currentSize;
      
  // 释放buffer,引用计数-1, 引用计数=0则调用deallocate方法回收buffer
      @Override
    public void recycleBuffer() {
        release();
    }
    // 保留buffer,原理就是引用计数+1
    @Override
    public NetworkBuffer retainBuffer() {
        return (NetworkBuffer) super.retain();
    }
  // 回收MemorySegment,放入pool
    @Override
    protected void deallocate() {
        recycler.recycle(memorySegment);
    }
  // ....... 忽略
}

buffer申请

// ------------------ resultPartition ------------------------------------------
// BufferWritingResultPartition.java
// 请求Buffer
private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
            throws IOException {
  // bufferPool = LocalBufferPool,请求本地buffer,如果LocalBufferPool没有memorySegment则会向全局的资源池申请memorySegment
  // 实际上请求buffer就是请求memorySegment的过程,memorySegment被bufferBuilder包装了一下,方便后面使用
        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
        if (bufferBuilder != null) {
            return bufferBuilder;
        }

        final long start = System.currentTimeMillis();
        try {
            // 当前面请求不到的时候,则会通过block的形式请求,阻塞直到有可用buffer返回
            bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
            idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
            return bufferBuilder;
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for buffer");
        }
    }

// ------------------- 

buffer回收

当buffer用完之后需要进行回收比如在netty的clientHandler收到响应之后进行处理就会把buffer回收掉,buffer回收之后并不会释放memorySegment,而是放回池中,变为可用内存,反复使用

  // NetworkBuffer.java
   @Override
    public void recycleBuffer() {
      // 由于继承了netty的AbstractReferenceCountedByteBuf类,所以也具有引用计数功能
      // 调用了release之后会使引用计数-1,当count=0的时候就会回收buffer了,buffer回收了并不会释放memorySegment
        release();
    }

// -----------------------------------------------------------------------------

// LocalBufferPool.java
// 废品回收站
private void recycle(MemorySegment segment, int channel) {
        BufferListener listener;
        CompletableFuture<?> toNotify = null;
        NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
        while (!notificationResult.isBufferUsed()) {
            synchronized (availableMemorySegments) {
                if (channel != UNKNOWN_CHANNEL) {
                    if (subpartitionBuffersCount[channel]-- == maxBuffersPerChannel) {
                        unavailableSubpartitionsCount--;
                    }
                }

                if (isDestroyed || hasExcessBuffers()) {
                  // 返回给全局bufferPool
                    returnMemorySegment(segment);
                    return;
                } else {
                    listener = registeredListeners.poll();
                    if (listener == null) {
                      // 返回给localBufferPool
                        availableMemorySegments.add(segment);                              
                        break;
                    }
                }

                checkConsistentAvailability();
            }
            notificationResult = fireBufferAvailableNotification(listener, segment);
        }
        mayNotifyAvailable(toNotify);
    }

4).BufferBuilder & BufferConsumer

省略 ...................

就是写入和消费MemorySegment的

3.MemroyManager

flink托管的内存,托管内存使用堆外内存,用于批处理缓存排序等以及提供rocksDB内存

内存申请
class MemoryManager {
      public List<MemorySegment> allocatePages(Object owner, int numPages)
            throws MemoryAllocationException {
        List<MemorySegment> segments = new ArrayList<>(numPages);
        allocatePages(owner, segments, numPages);
        return segments;
    }
    // 申请内存
     public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)
            throws MemoryAllocationException {

        long memoryToReserve = numberOfPages * pageSize;
        try {
            memoryBudget.reserveMemory(memoryToReserve);
        } catch (MemoryReservationException e) {
            throw new MemoryAllocationException( String.format("Could not allocate %d pages", numberOfPages), e);
        }
                
       // 注册一个释放内存的清理函数
        Runnable gcCleanup = memoryBudget.getReleaseMemoryAction(getPageSize());
       // 申请内存直接内存,并将释放内存的runnable传入,用于回收内存
        allocatedSegments.compute(
                owner,
        (o, currentSegmentsForOwner) -> {
                    Set<MemorySegment> segmentsForOwner =
                            currentSegmentsForOwner == null
                                    ? new HashSet<>(numberOfPages)
                                    : currentSegmentsForOwner;
                    for (long i = numberOfPages; i > 0; i--) {
                        MemorySegment segment =
                          // 申请一个HybridMemorySegment内存
                                allocateOffHeapUnsafeMemory(getPageSize(), owner, gcCleanup);
                        target.add(segment);
                        segmentsForOwner.add(segment);
                    }
                    return segmentsForOwner;
                });
    }
}

    // MemorySegmentFactory
    public static MemorySegment allocateOffHeapUnsafeMemory(
            int size, Object owner, Runnable gcCleanupAction) {
        long address = MemoryUtils.allocateUnsafe(size);
      // 通过unsafe分配一个directBytebuffer
        ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
      // 用于回收内存的清理器
        Runnable cleaner =
                MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, gcCleanupAction);
        return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);
    }
    
        // MemoryUtils
    static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) {
        try {
            ByteBuffer buffer = (ByteBuffer) UNSAFE.allocateInstance(DIRECT_BYTE_BUFFER_CLASS);
          // 为byteBuffer的addree 和 capacity赋值
            UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
            UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
            buffer.clear();
            return buffer;
        } catch (Throwable t) {
            throw new Error("Failed to wrap unsafe off-heap memory with ByteBuffer", t);
        }
    }
内存释放
public final class HybridMemorySegment extends MemorySegment {
      @Override
    public void free() {
      // 调用父类的free方法
        super.free();
      // 执行清理器,cleaner会执行到下面的两个方法
        if (cleaner != null) {
            cleaner.run();
        }
      // 切断引用,gc直接回收
        offHeapBuffer = null;
    }
}

    // UnsafeMemoryBudget.java    --- MemoryManager的一个参数,上面申请内存的时候用到了
    // 通过cas的方式循环释放,实际的上释放并不是真正意义上的释放,只是修改了指针的位置
    // 这里释放的是manager的内存,将要释放的内存大小加至到可用内存大小中,完整释放
  // 注意实际上Memory是不存储内存的,通过参数来控制内存的申请,最底层的内存申请还是通过unsafe来做的
    void releaseMemory(@Nonnegative long size) {
        if (size == 0) {return; }
        boolean released = false;
        long currentAvailableMemorySize = 0L;
      // cas方式释放
        while (!released && totalMemorySize >= (currentAvailableMemorySize = availableMemorySize.get()) + size){
            released = availableMemorySize
              .compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize + size);
        }
        if (!released) {
            throw new IllegalStateException(String.format("Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",size, currentAvailableMemorySize, totalMemorySize));
        } 
      
     // MemoryUtils.java 
     // 这里是实际的释放内存,我们在MemoryManager申请内存的时候,首先通过unsafe来申请一块内存即size,long address =    unsafe.allocateMemory(size)返回的是堆外的实际地址,通过申请的内存来实例化一个ByteBuffer对象
     // 最终释放的时候仍需要使用unsafe来做,直接通过unsafe.freeMemory(address)即可
     private static void releaseUnsafe(long address) {
        UNSAFE.freeMemory(address);
    }
rocksDB申请资源

MemoryManager更多的是用户管理,来控制rocksDB的内存使用,通过rocksDB block cache和writerBufferManager参数来限制,具体值通过TM内存配置计算得来,最终还是有rocksDB自己来负责运行过程中内存的申请和释放,所以对于rocks真实的内存使用,flink并不能完全的掌握,也就导致了flink 任务被yarn/k8s给kill掉,这里rocksDB的内存申请是通过jni来申请,对于其申请的原理目前我也不是特别清楚,后面我查阅相关资料.研究一下

下面的代码看看就行,后面是调入了rocksDB的代码,因为我个人也不是很了解rocksDB,这里就不多做介绍,这里我们只需要了解MemoryManager是用于提供rocksDB内存以及批处理相关的工作即可,这里不必太深究<我也不太懂,就不多说了>

   public <T extends AutoCloseable> OpaqueMemoryResource<T> getExternalSharedMemoryResource(
            String type, LongFunctionWithException<T, Exception> initializer, long numBytes)
            throws Exception {

        final Object leaseHolder = new Object();

        final SharedResources.ResourceAndSize<T> resource =
                sharedResources.getOrAllocateSharedResource(
                        type, leaseHolder, initializer, numBytes);

        final ThrowingRunnable<Exception> disposer =
                () -> sharedResources.release(type, leaseHolder);

        return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer);
    }

  public <T extends AutoCloseable> OpaqueMemoryResource<T> getSharedMemoryResourceForManagedMemory(
                    String type,
                    LongFunctionWithException<T, Exception> initializer,
                    double fractionToInitializeWith)
                    throws Exception {

        final long numBytes = computeMemorySize(fractionToInitializeWith);
        final LongFunctionWithException<T, Exception> reserveAndInitialize =
                (size) -> {
                    try {
                        reserveMemory(type, size);
                    } catch (MemoryReservationException e) {
                        throw new MemoryAllocationException("Could not created the shared memory resource of size "+ size + ". Not enough memory left to reserve from the slot's managed memory.",e);
                    }

                    try {
                        return initializer.apply(size);
                    } catch (Throwable t) {
                        releaseMemory(type, size);
                        throw t;
                    }
                };

        final Consumer<Long> releaser = (size) -> releaseMemory(type, size);
        final Object leaseHolder = new Object();

        final SharedResources.ResourceAndSize<T> resource =
                sharedResources.getOrAllocateSharedResource(type, leaseHolder, reserveAndInitialize, numBytes);

        final long size = resource.size();
        final ThrowingRunnable<Exception> disposer =() -> sharedResources.release(type, leaseHolder, releaser);
        return new OpaqueMemoryResource<>(resource.resourceHandle(), size, disposer);
    }


三.源码分析

NetworkBufferPool

NetworkBufferPool是一个固定大小的MemorySegment实例吃,用于网络栈中,NettyBufferPool会为每个ResultPartition创建属于自己的LocalBufferPool,NettyBufferPool会作为全局的pool来提供内存,LocalBufferPool会通过限制来控制自己内存的申请,防止过多申请

// 代码没啥 就是提前申请了用于network的代码,这块代码在启动的时候进行申请
LocalBufferPool

LocalBufferPool继承关系,实现了bufferRecycler的接口,用于回收自己持有的buffer

LocalBufferPool继承图

每个Task拥有一个自己的LocalBufferPool,在数据接收和数据发送的过程中,会向LocalBufferPool请求buffer,将数据存储在buffer中的,如果LocalBufferPool持有的buffer用尽,则会想全局的nettyBufferPool请求buffer,为了防止单个Task导致整个TM的反压,会限制每个LocalBufferPool请求全局BufferPool的数量

在数据接收的时候会将数据封装成NettyBuffer,在数据发送的时候会通过BufferBilder向MemorySegment写入数据,然后通过BufferConsumer读取MemorySegment的数据

class LocalBufferPool implements BufferPool {
    // 全局bufferPool,当localBufferPool的buffer用完之后会向全局bufferPool申请buffer
    private final NetworkBufferPool networkBufferPool;



    // 可用的memorySegment,memorySegment不会单独提供使用,会被NetworkBuffer进行封装后使用
    private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

    // 监听器,实际上就是bufferManager,在bufferManager内存不够用的时候会注册监听器,然后当内存可用的时候监听器会通知buffer可用
    // 然后将memorySegment封装成nettyBuffer添加到bufferManager的浮动队列中
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

    // 所需要的最小memorySegemt的数量
    private final int numberOfRequiredMemorySegments;
    // 最大分配数量
    private final int maxNumberOfMemorySegments;
    // 当前pool的大小
    private int currentPoolSize;
    // 从全局bufferPool请求的数量
    private int numberOfRequestedMemorySegments;
        // 每个channel最大使用的buffer数量
    private final int maxBuffersPerChannel;
  
        // 每个结果子分区使用的buffer数量
    @GuardedBy("availableMemorySegments")
    private final int[] subpartitionBuffersCount;
        // 每个结果子分区的buffer回收器
    private final BufferRecycler[] subpartitionBufferRecyclers;

    LocalBufferPool(
            NetworkBufferPool networkBufferPool, 
            int numberOfRequiredMemorySegments,
            int maxNumberOfMemorySegments,
            int numberOfSubpartitions,
            int maxBuffersPerChannel) {
        this.networkBufferPool = networkBufferPool;
        this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
        this.currentPoolSize = numberOfRequiredMemorySegments;
        this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;

        this.subpartitionBuffersCount = new int[numberOfSubpartitions];
        subpartitionBufferRecyclers = new BufferRecycler[numberOfSubpartitions];
        for (int i = 0; i < subpartitionBufferRecyclers.length; i++) {
            subpartitionBufferRecyclers[i] = new SubpartitionBufferRecycler(i, this);
        }
        this.maxBuffersPerChannel = maxBuffersPerChannel;
            
        synchronized (this.availableMemorySegments) {
          // 检查buffer的可用性,如果可用的buffer,如果buffer没有超过当前pool的大小,则会想全局bufferPool申请一个buffer
            if (checkAvailability()) {
               // 可用性设置成可用,废话文学
                availabilityHelper.resetAvailable();
            }
                    // 检查可用性是否保持一致
            checkConsistentAvailability();
        }
    }

        // 请求buffer,在bufferManager中没有可用buffer的时候会向LocalBufferPool请求浮动buffer
    public Buffer requestBuffer() {
      // 请求一个memorySegment并将其封装到NettyBuffer
        return toBuffer(requestMemorySegment());
    }


  // 请求bufferBuilder,用于task处理完数据发送下游时候使用,主要用于将处理完的数据放入buffer中发送到下游
  // BufferWritingResultPartition中会请求bufferBuilder存放record
  // bufferBuilder前面介绍过,主要是用于将数据写入到memorySegment中
    @Override
    public BufferBuilder requestBufferBuilder(int targetChannel) {
        return toBufferBuilder(requestMemorySegment(targetChannel), targetChannel);
    }

    // 阻塞的形式请求bufferBuilder,在上面请求bufferBuilder的时候没有获取到,则会通过这个方法进行阻塞的请求
    public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
            throws InterruptedException {
        return toBufferBuilder(requestMemorySegmentBlocking(targetChannel), targetChannel);
    }

    // 将memory封装成buffer
    private Buffer toBuffer(MemorySegment memorySegment) {
        return new NetworkBuffer(memorySegment, this);
    }
    // 将memory封装成bufferBuilder
    private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int targetChannel) {
        if (targetChannel == UNKNOWN_CHANNEL) {
            return new BufferBuilder(memorySegment, this);
        } else {
            return new BufferBuilder(memorySegment, subpartitionBufferRecyclers[targetChannel]);
        }
    }
        
    // 阻塞的请求memorySegment
    private MemorySegment requestMemorySegmentBlocking(int targetChannel)
            throws InterruptedException {
        MemorySegment segment;
        while ((segment = requestMemorySegment(targetChannel)) == null) {
            try {
                getAvailableFuture().get();
            } catch (ExecutionException e) {
                ExceptionUtils.rethrow(e);
            }
        }
        return segment;
    }

    // 请求memorySegment
    private MemorySegment requestMemorySegment(int targetChannel) {
        MemorySegment segment;
        synchronized (availableMemorySegments) {
            if (targetChannel != UNKNOWN_CHANNEL
                    && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
                return null;
            }

            segment = availableMemorySegments.poll();
            if (segment == null) {
                return null;
            }

            if (targetChannel != UNKNOWN_CHANNEL) {
                if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                }
            }

            if (!checkAvailability()) {
                availabilityHelper.resetUnavailable();
            }
            checkConsistentAvailability();
        }
        return segment;
    }

    
        // 从全局bufferPool请求memorySegment
    private boolean requestMemorySegmentFromGlobal() {
        if (isRequestedSizeReached()) {
            return false;
        }

        // 如果请求成功则加入到可用的memorySegment队列
        MemorySegment segment = networkBufferPool.requestMemorySegment();
        if (segment != null) {
            availableMemorySegments.add(segment);
          // 将请求memorySegment数+!
            numberOfRequestedMemorySegments++;
            return true;
        }
        return false;
    }
 
        // 检查可用,顺便请求一个memorySegment
    private boolean checkAvailability() {
        if (!availableMemorySegments.isEmpty()) {
            return unavailableSubpartitionsCount == 0;
        }
        if (!isRequestedSizeReached()) {
            // 全局bufferPool请求一个内存段
            if (requestMemorySegmentFromGlobal()) {
                return unavailableSubpartitionsCount == 0;
            } else {
                requestMemorySegmentFromGlobalWhenAvailable();
                return shouldBeAvailable();
            }
        }
        return false;
    }

        // 释放内存段
    // 这里可以对应上BufferManager
    @Override
    public void recycle(MemorySegment segment) {
        recycle(segment, UNKNOWN_CHANNEL);
    }
    // listener实际上就是memoryManager
    private void recycle(MemorySegment segment, int channel) {
        BufferListener listener;
        CompletableFuture<?> toNotify = null;
        NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
        while (!notificationResult.isBufferUsed()) {
            synchronized (availableMemorySegments) {
                if (channel != UNKNOWN_CHANNEL) {
                    if (subpartitionBuffersCount[channel]-- == maxBuffersPerChannel) {
                        unavailableSubpartitionsCount--;
                    }
                }
                                
                // 如果请求的buffer已经大于了当前pool的大小,则将这个memorySegment返回给全局bufferPool
                if (isDestroyed || hasExcessBuffers()) {
                    returnMemorySegment(segment);
                    return;
                } else {
                    // 从lintener队列中取一个listener,如果listener==null说明bufferManager不需要浮动内存了
                    // 会直接将memorySegment加入当前pool的可用队列
                    listener = registeredListeners.poll();
                    if (listener == null) {
                        availableMemorySegments.add(segment);
                        // only need to check unavailableSubpartitionsCount here because
                        // availableMemorySegments is not empty
                        if (!availabilityHelper.isApproximatelyAvailable()
                                && unavailableSubpartitionsCount == 0) {
                            toNotify = availabilityHelper.getUnavailableToResetAvailable();
                        }
                        break;
                    }
                }

                checkConsistentAvailability();
            }
            // 如果listener不等于null,通知通知给listener buffer可用,
            notificationResult = fireBufferAvailableNotification(listener, segment);
        }

        mayNotifyAvailable(toNotify);
    }

    private NotificationResult fireBufferAvailableNotification(
            BufferListener listener, MemorySegment segment) {
            
        // 调用到bufferManager的notifyBufferAvailable()方法,将buffer加入到bufferManager的浮动buffer中
        NotificationResult notificationResult =
                listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
        // 如果bufferManager还需要更多的buffer,则会将listener再次加入到listener队列中,等待下次buffer被回收
        if (notificationResult.needsMoreBuffers()) {
            synchronized (availableMemorySegments) {
                if (isDestroyed) {
                    listener.notifyBufferDestroyed();
                } else {
                    registeredListeners.add(listener);
                }
            }
        }
        return notificationResult;
    }

        
  // bufferManager会调用,当bufferManager的buffer不够用的时候会通过监听器等待localBufferPool回收buufer
    public boolean addBufferListener(BufferListener listener) {
        synchronized (availableMemorySegments) {
            if (!availableMemorySegments.isEmpty() || isDestroyed) {
                return false;
            }

            registeredListeners.add(listener);
            return true;
        }
    }

        // 将内存段返回给全局bufferPool
    private void returnMemorySegment(MemorySegment segment) {
        assert Thread.holdsLock(availableMemorySegments);
                // 返回给pool后会将请求的内存段数量-1
        numberOfRequestedMemorySegments--;
        networkBufferPool.recycle(segment);
    }
        
    // localBufferPool为每个结果子分区分配的内存回收器,回收器会持有当前LocalBufferPool的引用,调用到当前pool的内存回收方法
    // 用于结果子分区回收buffer,最终还是释放给localBufferPool
    private static class SubpartitionBufferRecycler implements BufferRecycler {
        private int channel;
        private LocalBufferPool bufferPool;

        SubpartitionBufferRecycler(int channel, LocalBufferPool bufferPool) {
            this.channel = channel;
            this.bufferPool = bufferPool;
        }
        @Override
        public void recycle(MemorySegment memorySegment) {
            bufferPool.recycle(memorySegment, channel);
        }
    }
}
BufferManager

BufferManager主要用于为RemoteInputChannel提供buffer的,bufferManager在启动的时候会向全局bufferPool请求自己的独有buffer,当bufferManager的buffer不够的时候,则会向localBufferPool请求buffer,此时请求的buffer为浮动buffer

实际上提供的buffer是给到netty的handler了,在netty接收到server响应的消息后,会请求buffer解析message,将消息封装到buffer中,在请求buffer的过程中,实际上就是向bufferManager请求buffer的过程

在后面我们会介绍到LocalBufferPool

public class BufferManager implements BufferListener, BufferRecycler {

    // 持有浮动buffer和独占buffer的一个队列
    private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
    // 用于向全局bufferPool申请buffer,以及释放buffer
    private final MemorySegmentProvider globalPool;
    //标记,是在等待浮动buffer
    @GuardedBy("bufferQueue") // 注解表示该变量被bufferQueue保护
    private boolean isWaitingForFloatingBuffers;
    //input channel所需的buffer总数
    @GuardedBy("bufferQueue")
    private int numRequiredBuffers;
  
    // 请求buffer
    // 该方法会在clinet端收到server端的数据后,会请求buffer封装server端的response message
    // 将解析好的msg发送到下游,然后释放掉netty的byteBuf
     Buffer requestBuffer() {
        synchronized (bufferQueue) {
            return bufferQueue.takeBuffer();
        }
    }
    
  // 这里是属于bufferManager独有buffer,单独进行管理,是有全局bufferPool申请的,当remoteInputChannel启动的时候在setup()方法中,一次性请求bufferManager所独有的buffer
    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
      // 从全局bufferPool中请求buffer
        Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
        synchronized (bufferQueue) {
            // 将请求到的buffer加入到bufferManager的独有buffer队列汇总
            for (MemorySegment segment : segments) {
                bufferQueue.addExclusiveBuffer(
                        new NetworkBuffer(segment, this), numRequiredBuffers);
            }
        }
    }
  
  // 这个方法有onSenderBacklog()方法进行调用,当收到server的消息后,会根据server的背压数量来请求buffer
  // 如果请求到buffer,则将请求到的buffer数量通过credit的形式发送给server,这里涉及到了flink的背压通信机制
  
  // 请求浮动buffer,buffer是从localBufferPool请求的,该buffer请求在被回收的时候会选择性的返回,返回给               bufferManager/localBufferPool/全局bufferPool
   int requestFloatingBuffers(int numRequired) {
        int numRequestedBuffers = 0;
        synchronized (bufferQueue) {
            if (inputChannel.isReleased()) {
                return numRequestedBuffers;
            }
            numRequiredBuffers = numRequired;

            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers
                    && !isWaitingForFloatingBuffers) {
              // 从localBufferPool请求buffer
                BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
                Buffer buffer = bufferPool.requestBuffer();
                if (buffer != null) {
                    bufferQueue.addFloatingBuffer(buffer);
                    numRequestedBuffers++;
                } 
              // 如果localBufferPool没有buffer则会将自己加入的localBufferPool的listener队列中
              // 这里思考一下为什么要这么做,因为没有buffer了,所以通过一个监听器来监听buffer什么时候被释放
              // localBufferPool的buffer释放之后,会判断是否有listener,如果有listener则说明bufferManager需要buffer
              // 就会将回收之后的buffer加入到bufferManager的浮动buffer队列
              // ↓↓↓  --- 下面的notifyBufferAvailable()中有介绍
              else if (bufferPool.addBufferListener(this)) {
                    isWaitingForFloatingBuffers = true;
                    break;
                }
            }
        }
     // 返回请求到的buffer数量
        return numRequestedBuffers;
    }
   
  
  // BufferManager的recycle方法,与LocalBufferPool实现不同
  // 该方法被调用都是bufferManager独有的buffer使用完成了,所以会将独有的buffer返回到自己管理的buffer队列中,
  // 不会像LocalBufferPool通过一系列条件判断来决定buffer返回给谁
    public void recycle(MemorySegment segment) {
        @Nullable Buffer releasedFloatingBuffer = null;
        synchronized (bufferQueue) {
            try {
                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
                // after channel released all buffers via releaseAllResources().
                if (inputChannel.isReleased()) {
                    globalPool.recycleMemorySegments(Collections.singletonList(segment));
                    return;
                } else {
                  // 释放多余的浮动buffer
                    releasedFloatingBuffer = bufferQueue.addExclusiveBuffer(
                        new NetworkBuffer(segment, this), numRequiredBuffers);
                }
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            } finally {
                bufferQueue.notifyAll();
            }
        }

        if (releasedFloatingBuffer != null) {
          // 如果有多余的浮动buffer,则释放掉
            releasedFloatingBuffer.recycleBuffer();
        } else {
            try {
              // 有buffer释放,就可以通知channel有buffer可用,就会向server反馈信用
                inputChannel.notifyBufferAvailable(1);
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        }
    }

  // ------------------------------------------------------------------------------------
  
  // 这个方法比较有意思,也比较复杂,让我们看一下方法名,通过翻译方法得到 中文方法名 : 通知 buffer 可用
  // 提前说明 :  bufferPool和bufferManager都实现了BufferRecycler接口,实现了recycle方法,用于回收buffer,重新使用的
  /*
  实际上这个方法是有LocalBufferPool调用,在调用了recycle方法后会对memorySegment释放,因为浮动buffer就是从localBuffer请求的,在创建(NettyBuffer=buffer)的时候需要一个BufferRecycler,在bufferManager创建buffer的时候传入的是自己,而LocalBufferPool创建的时候传入的是自己,所以,这里是由localBufferPool通过recycle方法调用,并且bufferManager的recycle的方法与localBufferPool实现不同,所以这里就会放入的浮动buffer的队列中
  在这个方法被调用的时候会有判断,判断是否需要更多的buffer,如果需要更多的buffer,如果需要更多的buffer会将listener(实际上就是BufferManager,因为BufferManager实现了BufferListener)加入的listener队列中(lintener会被循环利用),然后当buffer使用完成了,对buffer进行回收的时候,会选择是根据一些条件来判断是否返回到全局的bufferPool中或者返回到localBufferPool中
  返回全局bufferPool的条件是判断LocalBufferPool请求的buffer数量已经超过了LocalBufferPool的核心buffer数量,如果超过了则返回给全局bfferPool中
  返回LocalBufferPool的条件是,当不返回全局bufferPool则会判断是否存在listener,如果没有listener则将buffer返回给localBufferPool,这样的原因是因为,在请求buffer的时候如果没有可用buffer,就会添加listener到listener队列中,当buffer用完之后就会根据listener是否存在决定是否还需要更多的buffer
  
 localBufferPool 后面在介绍
   */
  public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
       BufferListener.NotificationResult notificationResult = BufferListener.NotificationResult.BUFFER_NOT_USED;
        if (inputChannel.isReleased()) {
            return notificationResult;
        }

        try {
            synchronized (bufferQueue) {
                if (inputChannel.isReleased() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                    isWaitingForFloatingBuffers = false;
                    return notificationResult;
                }
                                // 将buffer方法放入浮动buffer队列
                bufferQueue.addFloatingBuffer(buffer);
               // 唤醒在队列等待的线程
                bufferQueue.notifyAll();
                                
                if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
                    isWaitingForFloatingBuffers = false;             
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_NO_NEED_MORE;
                } else {
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_NEED_MORE;
                }
            }
            
            // 实际上这个判断永远都会进入,因为同步代码块的内容已经保证了result一定不等于BUFFER_NOT_USED
            // 要注意调用这个方法说明有buffer调用了recycle,说明buffer释放了,那么就可以被重新使用了
            if (notificationResult != NotificationResult.BUFFER_NOT_USED) {
               // 通过netty向server发送addCreditMessage,通知自己的信用
               // 在netty server端收到credit(信用)后会记录对应channel的信用
               // 当server向下游发送数据的时候,会根据下游的信用值来确定发送多少数据甚至不发送
               // 这样就不会因为某一个task的反压导致整个taskManger的反压
                inputChannel.notifyBufferAvailable(1);
            }
        } catch (Throwable t) {
            inputChannel.setError(t);
        }
        return notificationResult;
    }
  
  // 静态内部类,用于维护可用buffer
  static final class AvailableBufferQueue {
    
        // 从localBufferPool申请的buffer,优先使用
        final ArrayDeque<Buffer> floatingBuffers;
        // 从全局bufferPool申请的buffer,为channel独有
        final ArrayDeque<Buffer> exclusiveBuffers;

        AvailableBufferQueue() {
            this.exclusiveBuffers = new ArrayDeque<>();
            this.floatingBuffers = new ArrayDeque<>();
        }


        // 从全局buffer pool申请的该channel独占的buffer
        Buffer addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
            exclusiveBuffers.add(buffer);
            // 如果可用的buffer大于bufferManager的必须的buffer数量,则会释放掉多余的浮动buffer
            if (getAvailableBufferSize() > numRequiredBuffers) {
                return floatingBuffers.poll();
            }
            return null;
        }
            
       // 申请的浮动buffer,可以认为是独占的buffer用完了,开始申请临时buffer
        void addFloatingBuffer(Buffer buffer) {
            floatingBuffers.add(buffer);
        }

       // 在返回的时候优先返回浮动buffer
       // 为什么先请求浮动buffer呢,因为只有独有buffer用完之后才会请求浮动buffer,如果浮动buffer
       // 有buffer,则一定说明了独有buffer用完了,你们觉得呢
        Buffer takeBuffer() {
            if (floatingBuffers.size() > 0) {
                return floatingBuffers.poll();
            } else {
                return exclusiveBuffers.poll();
            }
        }
  }
}

四. tm和jm内存分配代码 或者 内存分配源码

了解即可,如果感兴趣可以自行看代码,等我想起来在补齐

public abstract class AbstractContainerizedClusterClientFactory<ClusterID>
        implements ClusterClientFactory<ClusterID> {

    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration) {
        checkNotNull(configuration);

        final int jobManagerMemoryMB =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                                configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
                        .getTotalProcessMemorySize()
                        .getMebiBytes();

        final int taskManagerMemoryMB =
                TaskExecutorProcessUtils.processSpecFromConfig(
                                TaskExecutorProcessUtils
                                        .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                                                configuration,
                                                TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                        .getTotalProcessMemorySize()
                        .getMebiBytes();

        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

        return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMB)
                .setTaskManagerMemoryMB(taskManagerMemoryMB)
                .setSlotsPerTaskManager(slotsPerTaskManager)
                .createClusterSpecification();
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容

  • 背景 1)java对象的存储密度比较低,对象主要包含 对象头,对象数据,对齐填充。 其中对齐填充是没用的,纯粹是为...
    LQC_gogogo阅读 952评论 0 1
  • 在大数据面前,JVM的内存结构和GC机制往往会成为掣肘 1. 对象开销:在HotSpot中,每个对象占用的内存空间...
    aiguang2016阅读 8,723评论 1 10
  • 如今,大数据领域的开源框架(Hadoop,Spark,Storm)都使用的 JVM,当然也包括 Flink。基于 ...
    尼小摩阅读 1,518评论 0 17
  • Flink内存管理 1.简介 自从2003-2006年,Google发表了三篇著名的大数据相关论文(Google ...
    寇寇寇先森阅读 1,266评论 0 0
  • TaskManager 的内存布局 Flink 内部并非直接将对象存储在堆上,而是将对象序列化到一个个预先分配的 ...
    专职掏大粪阅读 494评论 0 0