Shuffle过程是MapReduce的核心,描述着数据从map task输出到reduce task输入的这段过程。
Hadoop的集群环境,大部分的map task和reduce task是执行在不同的节点上的,那么reduce就要取map的输出结果。那么集群中运行多个Job时,task的正常执行会对集群内部的网络资源消耗严重。虽说这种消耗是正常的,是不可避免的,但是,我们可以采取措施尽可能的减少不必要的网络资源消耗。另一方面,每个节点的内部,相比于内存,磁盘IO对Job完成时间的影响相当的大,。
所以:从以上分析,shuffle过程的基本要求:
1.完整地从map task端拉取数据到reduce task端
2.在拉取数据的过程中,尽可能地减少网络资源的消耗
3.尽可能地减少磁盘IO对task执行效率的影响
那么,Shuffle的设计目的就要满足以下条件:
1.保证拉取数据的完整性
2.尽可能地减少拉取数据的数据量
3.尽可能地使用节点的内存而不是磁盘
Shuffle的执行阶段流程:
1).Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value序列化数据,Partition分区信息等。
2).Spill 阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
3).Merge 阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
4).Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程(一个是内存到磁盘的合并,一个是磁盘到磁盘的合并)对内存到本地的数据文件进行合并操作。
6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask 阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可
处理过程:
1.map进程不基于block进行,而是基于一个抽象的切片split,map task的并发数是由切片的数量决定的,有多少个切片就启动多少个map task。
2.切片是一个逻辑概念,指的是文件的数据偏移量范围
3.切片的具体大小应该根据处理的文件的大小来调整
4.每个map都有一个环形内存缓冲区,用于存储任务的输出,默认大小100M,到大阈值0.8后,一个后台线程把内容写到(spill)磁盘的指定目录中。
5.写入磁盘前,要进行partition,sort,如果有combiner,combine排序后数据
6.然后把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。此时还需要重新排序。
7.reducer 通过http方式得到输出文件的分区
8.reduce stask接受到多个map输出的中间数据文件,这些中间数据文件分别有序,但是整体无序,因此还需要重新进行排序操作。然后进行merge合并操作。
9.最终由一个reduce task处理,结果输出到一个文件中。
整个shuffle过程都是由MRAPPMaster进行控制
MRAPPMaster的任务监控调度机制处理过程:
生产集群调优
1、每个map缓存内存大小设置(io.sort.mb属性),
2、溢出的阈值设置(io.sort.splill.percent)
3、每个spill切片大小