3.6 Shuffle机制

3.6 Shuffle机制

在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节,Shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了Shuffle的逻辑。对于大数据计算框架而言,Shuffle阶段的效率是决定性能好坏的关键因素之一。

3.6.1 什么是Shuffle

Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。

在MapReduce计算框架中,Shuffle连接了Map阶段和Reduce阶段,即每个Reduce Task从每个Map Task产生的数据中读取一片数据,极限情况下可能触发M*R个数据拷贝通道(M是Map Task数目,R是Reduce Task数目)。通常Shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。首先,Map阶段需根据Reduce阶段的Task数量决定每个Map Task输出的数据分片数目,有多种方式存放这些数据分片:

1)保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上)。

2)每个分片对应一个文件(现在Spark采用的方式,以及以前MapReduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在MapReduce采用的方式)。

因此可以认为Spark Shuffle与Mapreduce Shuffle的设计思想相同,但在实现细节和优化方式上不同。

在Spark中,任务通常分为两种,Shuffle mapTask和reduceTask,具体逻辑如图3-11所示:

[插图]

图3-11 Spark Shuffl e

图3-11中的主要逻辑如下:

1)首先每一个MapTask会根据ReduceTask的数量创建出相应的bucket, bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。

2)其次MapTask产生的结果会根据设置的partition算法填充到每个bucket中。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中。

当ReduceTask启动时,它会根据自己task的id和所依赖的Mapper的id从远端或本地的block manager中取得相应的bucket作为Reducer的输入进行处理。

这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。Spark shuffle可以分为两部分:

1)将数据分成bucket,并将其写入磁盘的过程称为Shuffle Write。

2)在存储Shuffle数据的节点Fetch数据,并执行用户定义的聚集操作,这个过程称为Shuffle Fetch。

3.6.2 Shuffle历史及细节

下面介绍Shuffle Write与Fetch。

1. Shuffle Write

在Spark的早期版本实现中,Spark在每一个MapTask中为每个ReduceTask创建一个bucket,并将RDD计算结果放进bucket中。

但早期的Shuffle Write有两个比较大的问题。

1)Map的输出必须先全部存储到内存中,然后写入磁盘。这对内存是非常大的开销,当内存不足以存储所有的Map输出时就会出现OOM(Out of Memory)。

2)每个MapTask会产生与ReduceTask数量一致的Shuffle文件,如果MapTask个数是1k, ReduceTask个数也是1k,就会产生1M个Shuffle文件。这对于文件系统是比较大的压力,同时在Shuffle数据量不大而Shuffle文件又非常多的情况下,随机写也会严重降低IO的性能。

后来到了Spark 0.8版实现时,显著减少了Shuffle的内存压力,现在Map输出不需要先全部存储在内存中,再flush到硬盘,而是record-by-record写入磁盘中。对于Shuffle文件的管理也独立出新的ShuffleBlockManager进行管理,而不是与RDD cache文件在一起了。

但是Spark 0.8版的Shuffle Write仍然有两个大的问题没有解决。

1)Shuffle文件过多的问题。这会导致文件系统的压力过大并降低IO的吞吐量。

2)虽然Map输出数据不再需要预先存储在内存中然后写入磁盘,从而显著减少了内存压力。但是新引入的DiskObjectWriter所带来的buffer开销也是不容小视的内存开销。假定有1k个MapTask和1k个ReduceTask,就会有1M个bucket,相应地就会有1M个write handler,而每一个write handler默认需要100KB内存,那么总共需要100GB内存。这样仅仅是buffer就需要这么多的内存。因此当ReduceTask数量很多时,内存开销会很大。

为了解决shuffle文件过多的情况,Spark后来引入了新的Shuffle consolidation,以期显著减少Shuffle文件的数量。

Shuffle consolidation的原理如图3-12所示:

[插图]

图3-12 Shuffl e consolidation

在图3-12中,假定该job有4个Mapper和4个Reducer,有2个core能并行运行两个task。可以算出Spark的Shuffle Write共需要16个bucket,也就有了16个write handler。在之前的Spark版本中,每个bucket对应一个文件,因此在这里会产生16个shuffle文件。

而在Shuffle consolidation中,每个bucket并非对应一个文件,而是对应文件中的一个segment。同时Shuffle consolidation产生的Shuffle文件数量与Spark core的个数也有关系。在图3-12中,job中的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个Shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到之前的8个文件后面,这样一共就只有8个Shuffle文件,而在文件内部共有16个不同的segment。因此从理论上讲Shuffle consolidation产生的Shuffle文件数量为C×R,其中C是Spark集群的core number, R是Reducer的个数。

很显然,当M=C时,Shuffle consolidation产生的文件数和之前的实现相同。

Shuffle consolidation显著减少了Shuffle文件的数量,解决了Spark之前实现中一个比较严重的问题。但是Writer handler的buffer开销过大依然没有减少,若要减少Writer handler的buffer开销,只能减少Reducer的数量,但是这又会引入新的问题。

2. Shuffle Fetch与Aggregator

Shuffle Write写出去的数据要被Reducer使用,就需要Shuffle Fetch将所需的数据Fetch过来。这里的Fetch操作包括本地和远端,因为Shuffle数据有可能一部分是存储在本地的。在早期版本中,Spark对Shuffle Fetcher实现了两套不同的框架:NIO通过socket连接Fetch数据;OIO通过netty server去fetch数据。分别对应的类是Basic-BlockFetcherIterator和NettyBlockFetcherIterator。

目前在Spark1.5.0中做了优化。新版本定义了类ShuffleBlockFetcherIterator来完成数据的fetch。对于local的数据,ShuffleBlockFetcherIterator会通过local的BlockMan-ager来fetch。对于远端的数据块,它通过BlockTransferService类来完成。具体实现参见如下代码:

              [ShuffleBlockFetcherIterator.scala]

/* fetch local数据块 */

private[this] def fetchLocalBlocks() {

在MapReduce的Shuffle过程中,Shuffle fetch过来的数据会进行归并排序(merge sort),使得相同key下的不同value按序归并到一起供Reducer使用,这个过程如图3-13所示:

[插图]

图3-13 Fetch merge

这些归并排序都是在磁盘上进行的,这样做虽然有效地控制了内存使用,但磁盘IO却大幅增加了。虽然Spark属于MapReduce体系,但是对传统的MapReduce算法进行了一定的改变。Spark假定在大多数应用场景下,Shuffle数据的排序不是必须的,如word count。强制进行排序只会使性能变差,因此Spark并不在Reducer端做归并排序。既然没有归并排序,那Spark是如何进行reduce的呢?这就涉及下面要讲的Shuffle Aggregator了。

Aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。

在做word count reduce计算count值时,它会将Shuffle fetch到的每一个key-value对更新或是插入hashmap中(若在hashmap中没有查找到,则插入其中;若查找到,则更新value值)。这样就不需要预先把所有的key-value进行merge sort,而是来一个处理一个,省去了外部排序这一步骤。但同时需要注意的是,reducer的内存必须足以存放这个partition的所有key和count值,因此对内存有一定的要求。

在上面word count的例子中,因为value会不断地更新,而不需要将其全部记录在内存中,因此内存的使用还是比较少的。考虑一下如果是groupByKey这样的操作,Reducer需要得到key对应的所有value。在Hadoop MapReduce中,由于有了归并排序,因此给予Reducer的数据已经是group by key了,而Spark没有这一步,因此需要将key和对应的value全部存放在hashmap中,并将value合并成一个array。可以想象为了能够存放所有数据,用户必须确保每一个partition小到内存能够容纳,这对于内存是非常严峻的考验。因此在Spark文档中,建议用户涉及这类操作时尽量增加partition,也就是增加Mapper和Reducer的数量。

增加Mapper和Reducer的数量固然可以减小partition的大小,使内存可以容纳这个partition。但是在Shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增加得更多,由此带来write handler所需的buffer也会更多。在一方面我们为了减少内存的使用采取了增加task数量的策略,另一方面task数量增多又会带来buffer开销更大的问题,因此陷入了内存使用的两难境地。

为了减少内存的使用,只能将Aggregator的操作从内存移到磁盘上进行,因此Spark新版本中提供了外部排序的实现,以解决这个问题。

Spark将需要聚集的数据分为两类:不需要归并排序和需要归并排序的数据。对于前者,在内存中的AppendOnlyMap中对数据聚集。对于需要归并排序的数据,现在内存中进行聚集,当内存数据达到阈值时,将数据排序后写入磁盘。事实上,磁盘上的数据只是全部数据的一部分,最后将磁盘数据全部进行归并排序和聚集。具体Aggregator的逻辑可以参见Aggregator类的实现。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,978评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,954评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,623评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,324评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,390评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,741评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,892评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,655评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,104评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,451评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,569评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,254评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,834评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,725评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,950评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,260评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,446评论 2 348

推荐阅读更多精彩内容