1. BDP平台建议设置
并行度 Parallelism
在分布式的运行环境中,每个 opetator(例如 source、map 等 operator)都会切分成多个 subtask,Subtask 的数量即为该 operator 的并行度。
为了更高效的执行,Flink 对 DAG 的调度进行了优化。Flink 程序会将多个 subtask 串联起来,这些被链接到一起的算子组成一个 task,每个 task 单独在一个线程中运行。这种优化能减少线程之间的切换和跨节点的数据交换,从而达到减少时延的同时提升吞吐量。为了最大限度地使用这种优化,我们强烈建议将各节点的并行度设置为相同值。JobManage 内存大小
JobManager 是流任务的管理者、协调者。
通过该设置可以初始化 JobManager 的内存数量,建议 JobManager 内存大小设置不小于 4096 Mb,且应为 1024Mb 的整数倍。TaskManager 内存大小
TaskManager 是流任务的处理者。每一个 TaskManager 都是一个 JVM 进程。
通过该设置可以初始化 TaskManager 的内存数量,建议 TaskManager 内存大小设置不小于 4096 Mb,且应为 1024Mb 的整数倍。
默认情况下,Flink 将 TaskManager 中 70% 的自由内存(总内存减去用于网络缓存的内存)作为可管理的内存。TaskManager 监控可管理内存的大小,以避免 OOM 错误以及反序列化数据带来的开销。TaskManager 数量
每一个 TaskManager 都可以处理一部分任务,当流数据很大时候,单个 TaskManager 无法处理,并且可能面临单点故障的问题。
用户可以通过设置 TaskManager 数量达到均衡负载的目的,建议数量至少设为 2 个。Task Slots 个数
每一个 TaskManager 都是一个 JVM 进程,可以在一个或多个独立的线程中处理串联的 task。
我们采用 TaskSlots 来控制 TaskManager 可接受的 task 数量。每个 task slot 会占用一部分可管理的内存,多个 task slots 之间平均分配内存,以此防止竞争内存。
通过设置 TaskSlots 可以发挥 Flink 并行计算的优势,使一个完整的 operator chain 在一个 task slot 中运行,减少线程之间的切换和跨节点的数据交换。
CPU 核心数 = 1 + TaskManager 个数 * 单个 TaskManager CPU 核心数
内存使用数 = JobManager 内存 + TaskManager 个数 * 单个 TaskManager 内存数;
Slots 个数:一个 slot 对应一个线程,通过该参数平均分配 TaskManager 内存。如果有 2G 内存,2个slots,则每个 slots 平均分到 1G 内存;
为了充分发挥多核 CPU 的优势,我们通过设置 vcore 将CPU个数与CPU核心数解绑。
2. 如何处理Kafka积压?
积压告警不是错误,指的是kafka 或者其他数据源数据的积压量大于监控设置的阈值,通常是由于上游写入kafka的数据速率增大或者下游消费的速率降低所致。
如果对数据实时性要求很高,此时可以增加资源以加快数据的消费
(1)增加任务运行的并行度。任务并行度最好应该等于 kafka 分区数目。(如果是kafka节点,此节点的并行度不可大于kafka的分区数)
(2)调整 taskmanager 的slot个数,要求最大并行度必须要小于等于 taskmanager*slot的个数
(3)有脏数据,跳过或者处理
(4)Flink Web UI 自带的反压监控(直接方式),通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask
3. 问题:公司怎么提交的实时任务,flink部署方式?
yarn-cluster
- yarn-session:在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在yarn集群中,除非手工停止。
- yarn-cluster:每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
4. 为什么用 Flink?
主要考虑的是 flink 的低延迟、高吞吐量和对流式数据应用场景更好的支
持;另外,flink 可以很好地处理乱序数据,而且可以保证 exactly-once 的状态一致
性。
5. checkpoint 的存储?
1、默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。
2、state 的store和checkpoint的位置取决于State Backend的配置(env.setStateBackend(…))
3、一共有三种State Backend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend
(1)MemoryStateBackend:state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中,基于内存的state backend在生产环境下不建议使用
(2)FsStateBackend:state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中,可以使用hdfs等分布式文件系统一下
(3)RocksDBStateBackend:RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
6. 如何保证exactly-once?
使用 分布式快照机制 和 两阶段提交
把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统
7. 说一下 Flink 状态机制?
Flink 会以 checkpoint 的形式对各个任务的状态进行快照,用于保证故障恢复时的状态一致性。Flink 通过状态后端来管理状态 和 checkpoint 的存储。checkpoint 机制 采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。
8. 数据倾斜问题?
步骤1:定位反压
定位反压有2种方式:Flink Web UI 自带的反压监控(直接方式)、Flink Task Metrics(间接方式)。通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask。
步骤2:确定数据倾斜
Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距,则该 Subtask 出现数据倾斜。如下图所示,红框内的 Subtask 出现数据热点。
处理方法:
- 倾斜发生在源头:
当我们的并发度设置的比分区数要低时,就会造成上面所说的消费不均匀的情况。解决方法可以调整并发度,source的并发度调整为kafka一致或者kafka的并发度是source并发度的整数倍。 - key分布不均匀
预聚合:加盐局部聚合,在原来的 key 上加随机的前缀或者后缀。
聚合:去盐全局聚合,删除预聚合添加的前缀或者后缀,然后进行聚合统计
9. Flink什么情况下才会把Operator chain在一起形成算子链?
两个operator chain在一起的的条件:
上下游的并行度一致
下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
上下游节点都在同一个 slot group 中(下面会解释 slot group)
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
两个节点间数据分区方式是 forward(参考理解数据流的分区)
用户没有禁用 chain
10. Flink的反压?
反压(backpressure)反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。反压会影响到两项指标: checkpoint 时长和 state 大小。
解决反压:通过 Flink Web UI 自带的反压监控面板;通过 Flink Task Metrics。
分析反压的大致思路是:如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。反压情况可以根据以下表格进行对号入座(图片来自官网):
11. 你们的Flink集群规模多大?
20CU
12. Task Slot 的概念
TaskManager是实际负责执行计算的Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task或多个subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。askManager会将自己节点上管理的资源分为不同的Slot,这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。
13. Flink分区策略?
数据分区在 Flink 中叫作 Partition 。本质上来说,分布式计算就是把一个作业切分成子任务 Task, 将不同的数据交给不同的 Task 计算。
(1)GlobalPartitioner
数据会被分发到下游算子的第一个实例中进行处理。
(2)ForwardPartitioner
在API层面上ForwardPartitioner应用在 DataStream上,生成一个新的 DataStream。
该Partitioner 比较特殊,用于在同一个 OperatorChain 中上下游算子之间的数据转发,实际上数据是直接传递给下游的,要求上下游并行度一样。
(3)ShufflePartitioner
随机的将元素进行分区,可以确保下游的Task能够均匀地获得数据,使用代码如下:
dataStream.shuffle();
(4)RebalancePartitioner
以Round-robin 的方式为每个元素分配分区,确保下游的 Task 可以均匀地获得数据,避免数据倾斜。使用代码如下:
(5)RescalePartitioner
根据上下游 Task 的数量进行分区, 使用 Round-robin 选择下游的一个Task 进行数据分区,如上游有2个 Source.,下游有6个 Map,那么每个 Source 会分配3个固定的下游 Map,不会向未分配给自己的分区写人数据。这一点与 ShufflePartitioner 和 RebalancePartitioner 不同, 后两者会写入下游所有的分区。
dataStream.rescale();
(6)BroadcastPartitioner
将该记录广播给所有分区,即有N个分区,就把数据复制N份,每个分区1份,其使用代码如下:
dataStream.broadcast();
(7)KeyGroupStreamPartitioner
在API层面上,KeyGroupStreamPartitioner应用在 KeyedStream上,生成一个新的 KeyedStream。
KeyedStream根据keyGroup索引编号进行分区,会将数据按 Key 的 Hash 值输出到下游算子实例中。该分区器不是提供给用户来用的。
KeyedStream在构造Transformation的时候默认使用KeyedGroup分区形式,从而在底层上支持作业Rescale功能。
(8)CustomPartitionerWrapper
用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。
14. Flink并行度,与slot的区别
并行执行的任务。
我们在实际生产环境中可以从四个不同层面设置并行度:
操作算子层面(Operator Level)
执行环境层面(Execution Environment Level)
客户端层面(Client Level)
系统层面(System Level)
需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。
Slot的个数为并行执行的能力,并行度为实际使用的并行能力
15. Flink广播变量
可以理解为是一个公共的共享变量,我们可以把一个 dataset数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset 数据)。
注意:因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现 OOM 这样的问题
Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的
Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量
16. Flink的内存管理
Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。
理论上Flink的内存管理分为三部分:
Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。