先上图(纯手工,嘻嘻)
对于Map 端:
当Map 开始产生输出时,它并不是简单的把数据写到磁盘,因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先是写到内存中的一个缓冲区,并做了一些预排序,以提升效率。
每个Map 任务都有一个用来写入输出数据的循环内存缓冲区。这个缓冲区默认大小是100MB,可以通过io.sort.mb 属性来设置具体大小。当缓冲区中的数据量达到一个特定阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默认是0.80)时,也就是80M时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。
在spill 过程中,Map 的输出将会继续写入到缓冲区,但如果缓冲区已满,Map 就会被阻塞直到spill 完成。spill 线程在把缓冲区的数据写到磁盘前,会对它进行一个二次快速排序,首先根据数据所属的partition 排序,然后每个partition 中再按Key 排序。
Combiner 就是一个Mini Reducer,它在执行Map 任务的节点本身运行,先对Map 的输出做一次简单Reduce,使得Map 的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。
每当内存中的数据达到spill 阀值的时候,都会产生一个新的spill 文件(spill 文件保存在由mapred.local.dir指定的目录中,Map 任务结束后删除。),所以在Map任务写完它的最后一个输出记录时,可能会有多个spill 文件。在Map 任务完成前,所有的spill 文件将会被归并排序为一个索引文件和数据文件。这是一个多路归并过程,最大归并路数由io.sort.factor 控制(默认是10)。如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine 属性控制),那么Combiner 将在输出文件被写入磁盘前运行以压缩数据。
对写入到磁盘的数据进行压缩(这种压缩同Combiner 的压缩不一样)通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到Reducer 的数据量。默认输出是不被压缩的, 但可以很简单的设置mapred.compress.map.output 为true 启用该功能。
当spill 文件归并完毕后,Map 将删除所有的临时spill 文件。
对于Reduce端:
1、复制Map输出;
2、排序合并;
3、Reduce处理;
1、Reduce会定期获取map的输出位置,进而复制输出到本地(map很小会放入内存,否则放入磁盘);
2、当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成;
3、在Reduce 阶段,Reduce 函数会作用在排序输出的每一个key 上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS 中,因为TaskTracker 节点也运行着一个DataNode 进程,所以第一个块备份会直接写到本地磁盘。
Sort:
如果对hadoop的shuffle机制有所了解的人都知道,map所产生的中间数据在送给reduce进行处理之前是要经过排序的。具体的过程实际上是快速排序,堆排序和归并排序的完美结合。
首先,当map函数处理完输入数据之后,会将中间数据存在本机的一个或者几个文件当中,并且针对这些文件内部的记录进行一次快速排序,这里的排序是升序排序。在Map任务将所有的中间数据写入本地文件并进行快速排序之后,系统会对这些排好序的文件做一次归并排序,(merge)并将排好序的结果输出到一个大的文件当中。这段代码是在MapTask的内部类MapOutputBuffer中实现的,其中归并排序是调用了Merge类的merge方法,具体过程下面将会详细叙述。
当map阶段完成后,启动reduce过程之前,会把这些由map输出的中间文件copy到本地(拉取过程),然后生成一个或者几个Segment类的实例(溢出文件)。Segment类封装了这些中间数据,并且提供了一些针对这些中间数据的操作,比如读取记录等。在reduce端,这些中间数据可以存在内存中,也可以存在硬盘中。同时,系统还会启动两个merge(归并)线程,一个是针对内存中的segment进行归并,一个是针对硬盘中的segment进行归并。
Merge类的merge方法生成了一个MergeQueue类的实例,并且调用了该类的merge方法。MergeQueue类是PriorityQueue类的一个子类,同时实现了RawKeyValueIterator接口。PriorityQueue类实际上是一个小根堆,而MergeQueue的merge方法实际上就是将segment对象存储进父类的数据结构中,并且建立一个小根堆的过程。因此,hadoop的归并和排序不是两个分开的过程,而是一个过程。在将segment归并的同时进行了排序。
需要注意的是,这里针对segment排序的过程是以segment为单位的,而不是以segment中存储的记录(record)为单位的。而这里排序过程中对两个segment对象的比较是对segment中存储的第一个记录的键的比较。也就是说假设有两个segment,一个叫a,一个叫b,a<b仅仅是因为a的第一个记录的键小于b的第一个记录的键。具体的比较方法由用户定义的comparator类定义的。具体的比较过程在MergeQueue类中的lessThan方法中定义。现在,我们已经得到了一个以segment为单位,以segment中第一个记录的键为比较依据的小根堆,至此在系统中所谓的sort阶段就已经结束了。
但是实际上可以看出中间数据的排序是贯穿于整个shuffle阶段的。