在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。
在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
HashShuffleManager(spark 2.x弃用)
未经优化的HashShuffleManager
优化后的HashShuffleManager
优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。
SortShuffleManager
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
在基于排序的shuffle中,根据目标分区ID对传入的记录进行排序,然后写入一个单一的map output file 。reducers 拉取该文件的相邻区域,以便读取他们的map output 部分。在map output数据太大而无法加载内存的情况下,已排序的输出子集可以溢出到磁盘,而磁盘文件上的子集被合并,生成最终输出文件。
基于sort的洗牌有两种不同的生成路径来产生它的map output 文件:
1、Serialized sorting
当以下三个条件都成立时使用:
shuffle依赖项不指定聚集或输出排序;
支持序列化值的重新定位(当前支持KryoSerializer 和 Spark SQL’s custom serializers);
shuffle产生少于16777216个输出分区
2、Deserialized sorting
用于处理所有其他情况。
Serialized sorting mode序列化排序模式
在序列化排序模式中,传入的记录只要传递到shuffle写入器,并在排序过程中以串行形式缓冲,做了几处优化:
1、排序操作的是二进制序列化数据而不是java对象,从而降低了内存消耗和GC开销。
这种优化要求记录序列化器具有一定的属性允许序列化记录被重新排序,而不需要反序列化。
2、它使用一种专门的缓存高效排序器(ShuffleExternalSorter)排序,压缩记录指针和分区ID的数组。在排序数组中,每条记录只使用8字节的空间,这可以将更多的数组放入缓存中。
3、 溢出合并过程对属于同一个分区的序列化记录块进行操作。在合并过程中不需要反序列化记录。
4、当溢出压缩编解码器支持压缩数据的级联时,溢出合并简单地串联序列化和压缩溢出分区以生成最终输出分区。
这允许使用高效的数据复制方法,如NIO的“transferTo”。为了避免在合并过程中分配解压或复制缓冲区。
ByPassSortShuffleManager
该类实现基于排序的 shuffle hash-style shuffle fallback 路径。写入路径将传入记录写入单独的文件,每个reduce分区一个文件,然后将这些文件串联起来。每个分区文件形成单个输出文件,其中的区域被提供给reducers.记录不缓存在内存中,基本和HashShuffleWriter类似,
除了除了它以格式写入输出,可以通过IndexShuffleBlockResolver服务或消费,这个写入路径对于大量的reduce分区是无效的,因为
同时为所有分区打开单独的序列化程序和文件流。
启动bypass机制的条件:
no Ordering is specified,
no Aggregator is specific
the number of partitions is less than spark.shuffle.sort.bypassMergeThreshold
Spark Shuffle调优
spark.shuffle.file.buffer 32k buffer大小默认是32K maptask端的shuffle 降低磁盘IO .
spark.reducer.MaxSizeFlight 48M shuffle read拉取数据量的大小
spark.shuffle.memoryFraction 0.2 shuffle聚合内存的比例
spark.shuffle.io.maxRetries 3 拉取数据重试次数
spark.shuffle.io.retryWait 5s 调整到重试间隔时间60s
spark.shuffle.manager hash|sort Spark Shuffle的种类
spark.shuffle.consolidateFiles false 针对HashShuffle HashShuffle 合并机制
spark.shuffle.sort.bypassMergeThreshold 200 针对SortShuffle SortShuffle bypass机制 200次