我们将探讨一个 Flink 作业在实际运行时,不同的 Task 之间是如何进行数据交换的。由于不同的 Task 可能并非运行在同一个 TaskManager 中,因而数据传输的过程中必然涉及到网络通信,文中也会对 Flink 的网络栈的实现,包括反压机制等,进行分析。
概览
Flink 的数据交换机制在设计时遵循两个基本原则: 1. 数据交换的控制流(例如,为初始化数据交换而发出的消息)是由接收端发起的 2. 数据交换的数据流(例如,在网络中实际传输的数据被抽象为 IntermediateResult 的概念)是可插拔的。这意味着系统基于相同的实现逻辑既可以支持 Streaming 模式也可以支持 Batch 模式下数据的传输
我们知道,在一个 TaskManager 中可能会同时并行运行多个 Task,每个 Task 都在单独的线程中运行。在不同的 TaskManager 中运行的 Task 之间进行数据传输要基于网络进行通信。实际上,是 TaskManager 和另一个 TaskManager 之间通过网络进行通信,通信是基于 Netty 创建的标准的 TCP 连接,同一个 TaskManager 内运行的不同 Task 会复用网络连接。
关于 Flink 的数据交换机制的具体流程,Flink 的 wiki 中给出了一个比较详细的说明,在这里引述一下其中的内容,对我们后续分析具体的实现细节很有帮助。
数据交换的控制流
上图代表了一个简单的 map-reduce 类型的作业,有两个并行的任务。有两个 TaskManager,每个 TaskManager 都分别运行一个 map Task 和一个 reduce Task。我们重点观察 M1 和 R2 这两个 Task 之间的数据传输的发起过程。数据传输用粗箭头表示,消息用细箭头表示。首先,M1 产出了一个ResultPartition(RP1)(箭头1)。当这个 RP 可以被消费是,会告知 JobManager(箭头2)。JobManager 会通知想要接收这个 RP 分区数据的接收者(tasks R1 and R2)当前分区数据已经准备好。如果接受放还没有被调度,这将会触发对应任务的部署(箭头 3a,3b)。接着,接受方会从 RP 中请求数据(箭头 4a,4b)。这将会初始化 Task 之间的数据传输(5a,5b),数据传输可能是本地的(5a),也可能是通过 TaskManager 的网络栈进行(5b)。对于一个 RP 什么时候告知 JobManager 当前已经出于可用状态,在这个过程中是有充分的自由度的:例如,如果在 RP1 在告知 JM 之前已经完整地产出了所有的数据(甚至可能写入了本地文件),那么相应的数据传输更类似于 Batch 的批交换;如果 RP1 在第一条记录产出时就告知 JM,那么就是 Streaming 流交换。
字节缓冲区在两个 Task 之间的传输
上面这张图展示了一个细节更加丰富的流程,描述了一条数据记录从生产者传输到消费者的完整生命周期。
最初,MapDriver 生成数据记录(通过 Collector 收集)并传递给 RecordWriter 对象。RecordWriter 包含一组序列化器,每个消费数据的 Task 分别对应一个。 ChannelSelector 会选择一个或多个序列化器处理记录。例如,如果记录需要被广播,那么就会被交给每一个序列化器进行处理;如果记录是按照 hash 进行分区的,ChannelSelector 会计算记录的哈希值,然后选择对应的序列化器。
序列化器会将记录序列化为二进制数据,并将其存放在固定大小的 buffer 中(一条记录可能需要跨越多个 buffer)。这些 buffer 被交给 BufferWriter 处理,写入到 ResulePartition(RP)中。 RP 有多个子分区(ResultSubpartitions - RSs)构成,每一个子分区都只收集特定消费者需要的数据。在上图中,需要被第二个 reducer (在 TaskManager 2 中)消费的记录被放在 RS2 中。由于第一个 Buffer 已经生成,RS2 就变成可被消费的状态了(注意,这个行为实现了一个 streaming shuffle),接着它通知 JobManager。
JobManager查找RS2的消费者,然后通知 TaskManager 2 一个数据块已经可以访问了。通知TM2的消息会被发送到InputChannel,该inputchannel被认为是接收这个buffer的,接着通知RS2可以初始化一个网络传输了。然后,RS2通过TM1的网络栈请求该buffer,然后双方基于 Netty 准备进行数据传输。网络连接是在TaskManager(而非特定的task)之间长时间存在的。
一旦 Buffer 被 TM2 接收,它同样会经过一个类似的结构,起始于 InputChannel,进入 InputGate(它包含多个IC),最终进入一个反序列化器(RecordDeserializer),它会从 buffer 中将记录还原成指定类型的对象,然后将其传递给接收数据的 Task。
几个基本概念
在开始介绍 Flink 中数据交换机制的具体实现之前,我们有必要先对几个重要的概念进行一下梳理。这几个概念主要是到对 Flink 作业运行时产生的中间结果的抽象。
IntermediateDataset
IntermediateDataset 是在 JobGraph 中对中间结果的抽象。我们知道,JobGraph 是对 StreamGraph 进一步进行优化后得到的逻辑图,它尽量把可以 chain 到一起 operator 合并为一个 JobVertex,而 IntermediateDataset 就表示一个 JobVertex 的输出结果。JobVertex 的输入是 JobEdge,而 JobEdge 可以看作是 IntermediateDataset 的消费者。一个 JobVertex 也可能产生多个 IntermediateDataset。需要说明的一点是,目前一个 IntermediateDataset 实际上只会有一个 JobEdge 作为消费者,也就是说,一个 JobVertex 的下游有多少 JobVertex 需要依赖当前节点的数据,那么当前节点就有对应数量的 IntermediateDataset。
IntermediateResult 和 IntermediateResultpartition
在 JobManager 中,JobGraph 被进一步转换成可以被调度的并行化版本的执行图,即 ExecutionGraph。在 ExecutionGraph 中,和 JobVertex 对应的节点是 ExecutionJobVertex,和 IntermediateDataset 对应的则是 IntermediataResult。由于一个节点在实际运行时可能有多个并行子任务同时运行,所以 ExecutionJobVertex 按照并行度的设置被拆分为多个 ExecutionVertex,每一个表示一个并行的子任务。同样的,一个 IntermediataResult 也会被拆分为多个 IntermediateResultPartition,IntermediateResultPartition 对应 ExecutionVertex 的输出结果。一个 IntermediateDataset 只有一个消费者,那么一个 IntermediataResult 也只会有一个消费者;但是到了 IntermediateResultPartition 这里,由于节点被拆分成了并行化的节点,所以一个 IntermediateResultPartition 可能会有多个 ExecutionEdge 作为消费者。
ResultPartition 和 ResultSubpartition
ExecutionGraph 还是 JobManager 中用于描述作业拓扑的一种逻辑上的数据结构,其中表示并行子任务的 ExecutionVertex 会被调度到 TaskManager 中执行,一个 Task 对应一个 ExecutionVertex。同 ExecutionVertex 的输出结果 IntermediateResultPartition 相对应的则是 ResultPartition。IntermediateResultPartition 可能会有多个 ExecutionEdge 作为消费者,那么在 Task 这里,ResultPartition 就会被拆分为多个 ResultSubpartition,下游每一个需要从当前 ResultPartition 消费数据的 Task 都会有一个专属的 ResultSubpartition。
ResultPartitionType 指定了 ResultPartition 的不同属性,这些属性包括是否流水线模式、是否会产生反压以及是否限制使用的 Network buffer 的数量。ResultPartitionType 有三个枚举值:
BLOCKING:非流水线模式,无反压,不限制使用的网络缓冲的数量
PIPELINED:流水线模式,有反压,不限制使用的网络缓冲的数量
PIPELINED_BOUNDED:流水线模式,有反压,限制使用的网络缓冲的数量
其中是否流水线模式这个属性会对消费行为产生很大的影响:如果是流水线模式,那么在 ResultPartition 接收到第一个 Buffer 时,消费者任务就可以进行准备消费;而如果非流水线模式,那么消费者将等到生产端任务生产完数据之后才进行消费。目前在 Stream 模式下使用的类型是 PIPELINED_BOUNDED。
InputGate 和 InputChannel
在 Task 中,InputGate 是对输入的封装,InputGate 是和 JobGraph 中 JobEdge 一一对应的。也就是说,InputGate 实际上对应的是该 Task 依赖的上游算子(包含多个并行子任务),每个 InputGate 消费了一个或多个 ResultPartition。InputGate 由 InputChannel 构成,InputChannel 和 ExecutionGraph 中的 ExecutionEdge 一一对应;也就是说, InputChannel 和 ResultSubpartition 一一相连,一个 InputChannel 接收一个 ResultSubpartition 的输出。根据读取的 ResultSubpartition 的位置,InputChannel 有 LocalInputChannel 和 RemoteInputChannel 两种不同的实现。
数据交换机制的具体实现
数据交换从本质上来说就是一个典型的生产者-消费者模型,上游算子生产数据到 ResultPartition 中,下游算子通过 InputGate 消费数据。由于不同的 Task 可能在同一个 TaskManager 中运行,也可能在不同的 TaskManager 中运行:对于前者,不同的 Task 其实就是同一个 TaskManager 进程中的不同的线程,它们的数据交换就是在本地不同线程间进行的;对于后者,必须要通过网络进行通信。我们分别来介绍下这两个不同场景下数据交换的具体实现。通过合理的设计和抽象,Flink 确保本地数据交换和通过网络进行数据交换可以复用同一套代码。