什么是shuffer
- 宽依赖之间会划分stage,而Stage之间就是Shuffle
Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,这期间涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等,所以说Shuffle是整个应用程序运行过程中非常昂贵的一个阶段,理解Spark Shuffle原理有助于优化Spark应用程序。
在DAG阶段以shuffle为界,划分stage,上游stage做map task,每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;下游stage做reduce task,每个reduce task通过网络拉取上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。
- 基于Hash的Shuffle实现
- 基于Sort的Shuffle实现(现在采用的机制)
1.2之前是HashShuffleManager
1.2版本之前这时候每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是M R ,其中M是Map的个数,R是Reduce的个数。这样会产生大量的小文件,对文件系统压力很大,而且也不利于IO吞吐量。后面忍不了了就做了优化,把在同一core上运行的多个Mapper 输出的合并到同一个文件,这样文件数目就变成了 cores R 个了
2.1版本 分为三种writer机制, 分为 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter。
-上面是使用哪种 writer 的判断依据, 是否开启 mapSideCombine 这个判断,是因为有些算子会在 map 端先进行一次 combine, 减少传输数据。
-因为 BypassMergeSortShuffleWriter 会临时输出Reducer个(分区数目)小文件,所以分区数必须要小于一个阀值,默认是小于200。
-UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。
BypassMergeSortShuffleWriter 实现细节
BypassMergeSortShuffleWriter和Hash Shuffle中的HashShuffleWriter实现基本一致, 唯一的区别在于,map端的多个输出文件会被汇总为一个文件。 所有分区的数据会合并为同一个文件,会生成一个索引文件,是为了索引到每个分区的起始地址,可以随机 access 某个partition的所有数据。
SortShuffleWriter 实现细节
我们可以先考虑一个问题,假如我有 100亿条数据,但是我们的内存只有1M,但是我们磁盘很大, 我们现在要对这100亿条数据进行排序,是没法把所有的数据一次性的load进行内存进行排序的,这就涉及到一个外部排序的问题,我们的1M内存只能装进1亿条数据,每次都只能对这 1亿条数据进行排序,排好序后输出到磁盘,总共输出100个文件,最后怎么把这100个文件进行merge成一个全局有序的大文件。我们可以每个文件(有序的)都取一部分头部数据最为一个 buffer, 并且把这 100个 buffer放在一个堆里面,进行堆排序,比较方式就是对所有堆元素(buffer)的head元素进行比较大小, 然后不断的把每个堆顶的 buffer 的head 元素 pop 出来输出到最终文件中, 然后继续堆排序,继续输出。如果哪个buffer 空了,就去对应的文件中继续补充一部分数据。最终就得到一个全局有序的大文件。
如果你能想通我上面举的例子,就差不多搞清楚sortshufflewirter的实现原理了,因为解决的是同一个问题。
SortShuffleWriter 中的处理步骤就是
使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在内存中进行排序, 排序的 K 是(partitionId, hash(key)) 这样一个元组。
如果超过内存 limit, 我 spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根据 hash(key)进行比较排序
如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件 和 当前内存中的数据结构中的数据进行 merge sort, 进行全局排序
和我们开始提的那个问题基本类似,不同的地方在于,需要对 Key 相同的元素进行 aggregation, 就是使用定义的 func 进行聚合, 比如你的算子是 reduceByKey(+), 这个func 就是加法运算, 如果两个key 相同, 就会先找到所有相同的key 进行 reduce(+) 操作,算出一个总结果 Result,然后输出数据(K,Result)元素。
SortShuffleWriter 中使用 ExternalSorter 来对内存中的数据进行排序,ExternalSorter内部维护了两个集合PartitionedAppendOnlyMap、PartitionedPairBuffer, 两者都是使用了 hash table 数据结构, 如果需要进行 aggregation, 就使用 PartitionedAppendOnlyMap(支持 lookup 某个Key,如果之前存储过相同key的K-V 元素,就需要进行 aggregation,然后再存入aggregation后的 K-V), 否则使用 PartitionedPairBuffer(只进行添K-V 元素),
1.2之后是SortShuffleManager
创建磁盘文件上的区别在SHUFFER上游阶段进行两个机制从而减少临时文件的产生。
shuffleFileGroup机制在上游的每一个Group磁盘文件的数量与下游stage的task数量是相同的
consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量
主要区别: SortShuffleManager
- 在上游通过两个机制改变产生的临时文件数.
Shuffle阶段划分:
shuffle write:mapper阶段,上一个stage得到最后的结果写出
- SortShuffle-write普通机制
- 排序,先写入5M的内存模型(Map or Aaary),不够在申请,排序是根据kyro序列化支持排序的序列化
根据公式
applyMemory=nowMenory*2-oldMemory
申请的内存=当前的数据内存情况*2-上一次的内嵌情况
溢写,通过JAVA的流写入磁盘BufferedOutputStream, 默认默认的batch数量是10000条.
merge,一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中. 还包括一个索引文件 记录这
标识了下游各个task的数据在文件中的start offset与end offset。
- bypassShuffle-write机制(不排序)
shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并
触发条件:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
不是聚合类的shuffle算子(比如reduceByKey)。
也是根据KEY 的hash取余来写入磁盘,跟未经优化的HashShuffleManager是一样,只是最后做了带索引的文件跟合并文件, 也是有 溢写跟merge
磁盘写机制不同,第二不会进行排序
UnsafeShuffle-write机制满足条件.
- 先序列化在排序机制,可以序列化排序,
- kryo排序& partition数量不能大于指定阙值2的24次方.因为partition number使用的是24bit.
-
一般满足上述条件才会执行,
1617505599.png
1617505971(1).png