Flink问题

1. BDP平台建议设置

  • 并行度 Parallelism
    在分布式的运行环境中,每个 opetator(例如 source、map 等 operator)都会切分成多个 subtask,Subtask 的数量即为该 operator 的并行度。
    为了更高效的执行,Flink 对 DAG 的调度进行了优化。Flink 程序会将多个 subtask 串联起来,这些被链接到一起的算子组成一个 task,每个 task 单独在一个线程中运行。这种优化能减少线程之间的切换和跨节点的数据交换,从而达到减少时延的同时提升吞吐量。为了最大限度地使用这种优化,我们强烈建议将各节点的并行度设置为相同值。

  • JobManage 内存大小
    JobManager 是流任务的管理者、协调者。
    通过该设置可以初始化 JobManager 的内存数量,建议 JobManager 内存大小设置不小于 4096 Mb,且应为 1024Mb 的整数倍。

  • TaskManager 内存大小
    TaskManager 是流任务的处理者。每一个 TaskManager 都是一个 JVM 进程。
    通过该设置可以初始化 TaskManager 的内存数量,建议 TaskManager 内存大小设置不小于 4096 Mb,且应为 1024Mb 的整数倍。
    默认情况下,Flink 将 TaskManager 中 70% 的自由内存(总内存减去用于网络缓存的内存)作为可管理的内存。TaskManager 监控可管理内存的大小,以避免 OOM 错误以及反序列化数据带来的开销。

  • TaskManager 数量
    每一个 TaskManager 都可以处理一部分任务,当流数据很大时候,单个 TaskManager 无法处理,并且可能面临单点故障的问题。
    用户可以通过设置 TaskManager 数量达到均衡负载的目的,建议数量至少设为 2 个。

  • Task Slots 个数
    每一个 TaskManager 都是一个 JVM 进程,可以在一个或多个独立的线程中处理串联的 task。
    我们采用 TaskSlots 来控制 TaskManager 可接受的 task 数量。每个 task slot 会占用一部分可管理的内存,多个 task slots 之间平均分配内存,以此防止竞争内存。
    通过设置 TaskSlots 可以发挥 Flink 并行计算的优势,使一个完整的 operator chain 在一个 task slot 中运行,减少线程之间的切换和跨节点的数据交换。

CPU 核心数 = 1 + TaskManager 个数 * 单个 TaskManager CPU 核心数
内存使用数 = JobManager 内存 + TaskManager 个数 * 单个 TaskManager 内存数;
Slots 个数:一个 slot 对应一个线程,通过该参数平均分配 TaskManager 内存。如果有 2G 内存,2个slots,则每个 slots 平均分到 1G 内存;
为了充分发挥多核 CPU 的优势,我们通过设置 vcore 将CPU个数与CPU核心数解绑。

2. 如何处理Kafka积压?

积压告警不是错误,指的是kafka 或者其他数据源数据的积压量大于监控设置的阈值,通常是由于上游写入kafka的数据速率增大或者下游消费的速率降低所致。
如果对数据实时性要求很高,此时可以增加资源以加快数据的消费
(1)增加任务运行的并行度。任务并行度最好应该等于 kafka 分区数目。(如果是kafka节点,此节点的并行度不可大于kafka的分区数)
(2)调整 taskmanager 的slot个数,要求最大并行度必须要小于等于 taskmanager*slot的个数
(3)有脏数据,跳过或者处理
(4)Flink Web UI 自带的反压监控(直接方式),通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask

3. 问题:公司怎么提交的实时任务,flink部署方式?

yarn-cluster

  • yarn-session:在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在yarn集群中,除非手工停止。
  • yarn-cluster:每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

4. 为什么用 Flink?

主要考虑的是 flink 的低延迟、高吞吐量和对流式数据应用场景更好的支
持;另外,flink 可以很好地处理乱序数据,而且可以保证 exactly-once 的状态一致
性。

5. checkpoint 的存储?

1、默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。

2、state 的store和checkpoint的位置取决于State Backend的配置(env.setStateBackend(…))

3、一共有三种State Backend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend
(1)MemoryStateBackend:state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中,基于内存的state backend在生产环境下不建议使用
(2)FsStateBackend:state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中,可以使用hdfs等分布式文件系统一下
(3)RocksDBStateBackend:RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

6. 如何保证exactly-once?

使用 分布式快照机制 和 两阶段提交
把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统

7. 说一下 Flink 状态机制?

Flink 会以 checkpoint 的形式对各个任务的状态进行快照,用于保证故障恢复时的状态一致性。Flink 通过状态后端来管理状态 和 checkpoint 的存储。checkpoint 机制 采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。

8. 数据倾斜问题?

步骤1:定位反压
定位反压有2种方式:Flink Web UI 自带的反压监控(直接方式)、Flink Task Metrics(间接方式)。通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask。
步骤2:确定数据倾斜
Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距,则该 Subtask 出现数据倾斜。如下图所示,红框内的 Subtask 出现数据热点。
处理方法:

  • 倾斜发生在源头:
    当我们的并发度设置的比分区数要低时,就会造成上面所说的消费不均匀的情况。解决方法可以调整并发度,source的并发度调整为kafka一致或者kafka的并发度是source并发度的整数倍。
  • key分布不均匀
    预聚合:加盐局部聚合,在原来的 key 上加随机的前缀或者后缀。
    聚合:去盐全局聚合,删除预聚合添加的前缀或者后缀,然后进行聚合统计

9. Flink什么情况下才会把Operator chain在一起形成算子链?

两个operator chain在一起的的条件:
上下游的并行度一致
下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
上下游节点都在同一个 slot group 中(下面会解释 slot group)
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
两个节点间数据分区方式是 forward(参考理解数据流的分区)
用户没有禁用 chain

10. Flink的反压?

反压(backpressure)反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。反压会影响到两项指标: checkpoint 时长和 state 大小。
解决反压:通过 Flink Web UI 自带的反压监控面板;通过 Flink Task Metrics。
分析反压的大致思路是:如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。反压情况可以根据以下表格进行对号入座(图片来自官网):

11. 你们的Flink集群规模多大?

20CU

12. Task Slot 的概念

TaskManager是实际负责执行计算的Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task或多个subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。askManager会将自己节点上管理的资源分为不同的Slot,这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。

13. Flink分区策略?

数据分区在 Flink 中叫作 Partition 。本质上来说,分布式计算就是把一个作业切分成子任务 Task, 将不同的数据交给不同的 Task 计算。

image.png

(1)GlobalPartitioner
数据会被分发到下游算子的第一个实例中进行处理。
(2)ForwardPartitioner
在API层面上ForwardPartitioner应用在 DataStream上,生成一个新的 DataStream。
该Partitioner 比较特殊,用于在同一个 OperatorChain 中上下游算子之间的数据转发,实际上数据是直接传递给下游的,要求上下游并行度一样。
(3)ShufflePartitioner
随机的将元素进行分区,可以确保下游的Task能够均匀地获得数据,使用代码如下:

dataStream.shuffle();

(4)RebalancePartitioner
以Round-robin 的方式为每个元素分配分区,确保下游的 Task 可以均匀地获得数据,避免数据倾斜。使用代码如下:
(5)RescalePartitioner
根据上下游 Task 的数量进行分区, 使用 Round-robin 选择下游的一个Task 进行数据分区,如上游有2个 Source.,下游有6个 Map,那么每个 Source 会分配3个固定的下游 Map,不会向未分配给自己的分区写人数据。这一点与 ShufflePartitioner 和 RebalancePartitioner 不同, 后两者会写入下游所有的分区。

dataStream.rescale();

(6)BroadcastPartitioner
将该记录广播给所有分区,即有N个分区,就把数据复制N份,每个分区1份,其使用代码如下:

dataStream.broadcast();

(7)KeyGroupStreamPartitioner
在API层面上,KeyGroupStreamPartitioner应用在 KeyedStream上,生成一个新的 KeyedStream。
KeyedStream根据keyGroup索引编号进行分区,会将数据按 Key 的 Hash 值输出到下游算子实例中。该分区器不是提供给用户来用的。
KeyedStream在构造Transformation的时候默认使用KeyedGroup分区形式,从而在底层上支持作业Rescale功能。
(8)CustomPartitionerWrapper
用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。

14. Flink并行度,与slot的区别

并行执行的任务。
我们在实际生产环境中可以从四个不同层面设置并行度:
操作算子层面(Operator Level)
执行环境层面(Execution Environment Level)
客户端层面(Client Level)
系统层面(System Level)
需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。
Slot的个数为并行执行的能力,并行度为实际使用的并行能力

15. Flink广播变量

可以理解为是一个公共的共享变量,我们可以把一个 dataset数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset 数据)。
注意:因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现 OOM 这样的问题
Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的
Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量

16. Flink的内存管理

Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。
理论上Flink的内存管理分为三部分:
Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容