1.JobManager 内存调优
1.1 JobManager 内存模型的介绍
JobManager 是 Java 进程,从 JVM 角度来看,总体分为 Heap 内存和 Off-Heap 内存。
① Total Process Memory 是 JobManager 的总内存,即 Master 进程的内存。
② Total Flink Memory 是 JobManager 框架的内存,包括 JVM Heap 内存和 Off-Heap 部分内存。
③ JVM Heap 内存 用于 Flink 框架、用户代码(作业提交及 Checkpoint 完成的回调函数)
④ Off-Heap 内存用于 Flink 框架依赖(例如 Akka 的网络通信)和包含 Metaspace,其值等于 Total Flink Memory 减去 JVM Heap。
⑤ JVM Metaspace 主要是加载用户程序中的类。
1.2 如何配置 JobManager 内存
(1)配置 JVM Heap 内存
作业运行的数量、作业的结构(算子数量、复杂度等)、用户代码(类数量、复杂度),决定 JVM Heap 内存。
通过配置参数 jobmanager.heap.size
,设置 JVM Heap 内存的大小。如果需要设置年轻代、老年代等参数,在启动 JobManager 进程时,可以通过启动脚本设置 -Xms、-Xmx、-Xmn 等参数。
JVM 参数 | TaskManager 内存 | JobManager 内存 |
---|---|---|
-Xms 和 -Xmx | 框架堆内存 + 任务堆内存 | JVM Heap 内存 |
-XX:MaxDirectMemorySize | 框架堆外内存 + 任务堆外内存(用户代码) + 网络内存 | Off-Heap 内存 |
-XX:MaxMetaspaceSize | JVM Metaspace | JVM Metaspace |
JVM 内存参数说明:
① -Xms 设置堆内存初始值,堆内存 = 新生代 + 老年代,默认是物理内存的 1/64
② -Xmx 设置堆内存最大值,默认是物理内存的 1/4
③ -Xmn 设置年轻代的大小
④ -XX:SurvivorRatio=8 设置新生代中 Eden 和 Survivor 的比值。8 表示Survivor : Eden = 1 : 8,两个 FromSuv 和 ToSuv 占比相等,即 Eden : FromSuv : FromSuv = 8 : 1 : 1
⑤ -XX:PretenureSizeThreshold=1024 设置对象进入老年区的阈值,即该对象大小超过 1024 KB 直接进入老年区
⑥ -XX:MetaspaceSize 设置元空间初始值,达到 MetaspaceSize,JVM 会进行 Metaspace GC。
⑦ -XX:MaxMetaspaceSize 元空间最大值,超过 MaxMetaspaceSize,会导致错误 java.lang.OutOfMemoryError: Metaspace。
如下所示,Flink on Kubernetes,Session 模式,通过 Jobmanager-deployment.yaml 设置 JVM 参数。
containers:
- name: jobmanager
image: flink:latest
workingDir: /opt/flink
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
while :;
do
if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
then tail -f -n +1 log/*jobmanager*.log;
fi;
done"]
env:
- name: JVM_ARGS
value: -Xms1024m -Xmx4096m -XX:MetaspaceSize=256M
(2)配置 JVM Metaspace 内存
作业中的类数量、作业提交的次数(Session 模式)决定 JVM Metaspace 内存的大小。
注意事项:
① 设置 -XX:MetaspaceSize、-XX:MaxMetaspaceSize。如果 MetaspaceSize 使用默认值,可能导致 Metaspace 频繁 GC。如果 没有设置 MaxMetaspaceSize,其值是没有上限的,可能超过物理内存导致程序挂了。
② 集群在 Session 模式下,作业的类数量较多 + 提交次数较多。由于每次运行作业,会创建新的 URLClassLoader,使得 JobManager 加载的类数量不断增加,最终导致 java.lang.OutOfMemoryError: Metaspace。因此,建议采用 Pre-job 模式或者将作业与 Flink 打包在一起。
2.TaskManager 内存调优
2.1 TaskManager 内存模型的介绍
TaskManager 是 Java 进程,从 JVM 角度来看,总体分为 Heap 内存和 Off-Heap 内存。Flink-1.10 TaskManager 可以精细化管理内存,例如指定 Heap 内存中的任务堆内存(
taskmanager.memory.task.heap.size
)。
① Total Process Memory 是 TaskManager 的总内存,即 Master 进程的内存。
② Total Flink Memory 是 TaskManager 框架的内存,包括 JVM Heap 内存和 Off-Heap 部分内存。
③ JVM Heap 内存包含作业 Task 和框架的 JVM 堆内存。
④ Managed Memory 是 Flink 管理的托管内存,用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。
⑤ Direct Memory 是 JVM 直接内存,主要包含 Network Memory(网络数据交换缓冲区)、Framework Off-Heap Memory(不计入 slot 的 Flink 框架内存)、Task Off-HeapMemory(Task 执行用户代码所使用的对外内存)
⑥ JVM specific memory 是 JVM 本身使用的内存,主要是 JVM Metaspace
2.2 如何配置 TaskManager 内存
(1)设置 Total Process Memory 和 Total Flink Memory
- 进程总内存(
taskmanager.memory.process.size
) - Flink 总内存(
taskmanager.memory.flink.size
)
Flink 总内存的示例,如下图所示。
① 虚拟机部署:建议指定 Flink 总内存。
② 容器化部署:建议指定进程总内存,这等于容器的内存限制。详情请参考如何配置容器内存(Kubernetes、Yarn 或 Mesos)。
注意:同时设置进程总内存和 Flink 总内存,可能使得内存配置冲突,导致集群部署失败。
(2)设置 JVM Heap 内存
① taskmanager.memory.task.heap.size
,即作业 Task 的 JVM 堆内存,用于算子和用户代码。
② taskmanager.memory.framework.heap.size
,默认值是 128 MB,用于框架的 JVM 堆内存。
注意:在容器化部署下,不建议设置 taskmanager.memory.task.heap.size。该值等于 Total Flink Memory 减去 (Framework Heap Memory + Task Off-Heap Memory + Managed Memory + Network Memory),由于 Framework Heap Memory 默认值为 0,Task Off-Heap Memory
、Managed Memory 和 Network Memory 都没有默认值,所以 Task Heap 可以最大化。
技巧:如果 Total Flink Memory 设置超过容器内存限制,当 Task Heap 超过内存限制时,K8s 会重启拉起容器,快速恢复业务。
(3)设置 Direct Memory
① taskmanager.memory.managed.size
是托管内存,用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存
② taskmanager.memory.network.min
、taskmanager.memory.network.max
、taskmanager.memory.network.fraction
,设置网络数据交换缓冲区。
③ taskmanager.memory.task.off-heap.size
,设置 Task 执行用户代码所使用的堆外内存,例如用户代码读写 I/O。
④ taskmanager.memory.framework.off-heap.size
,默认值是 128MB,即不计入 slot 的 Flink 框架内存,不建议修改该配置。
(4)设置 JVM Metaspace
通过配置 taskmanager.memory.jvm-metaspace.size
,可以设置 Flink JVM 进程的 Metaspace,即等同于 -XX:MaxMetaspaceSize,如上 1.2 所示。
2.3 注意容器的内存限制
(1)Flink 进程内存超过容器内存
当没有设置Total Process Memory、Total Flink Memory 和 JVM Metaspace。Flink 进程内存可能超过容器内存,导致其进程被重启。因此,在容器化部署模式下,建议主动设置 Total Process Memory 和 JVM Metaspace。
(2)JVM Metaspace 需要额外的内存
预留 Metaspace,主动设置 -XX:MaxMetaspaceSize,否则可能无限增长。Flink on K8s,在 Session 模式下,JobManager 的 Metaspace 可能超过容器的内存限制,导致 JobManager 进程被 K8s 重启。
(3)用户代码或其依赖项会消耗大量的 Off-heap 内存
用户代码消耗大量的 Off-heap 内存,例如读写文件的 I/O。因此,需要额外分配 JVM Direct 内存。
(4)注意 RocksDB 的大状态
RocksDB 状态后端主要是消耗 Managed Memory。如果需要保存大状态数据,例如长窗口,可以增加 Managed Memory 的大小,提高 RocksDB 的性能。