本篇文章介绍一下MapReduce[1]分布式计算
先回顾一下Hadoop架构:
Hadoop由HDFS分布式存储、MR分布式计算、Yarn资源调度三部分组成
MR
- MR是采用一种分而治之[2]的思想设计出来的分布式计算框架
- MR由两个阶段组成:
- Map阶段(切分成一个个小的任务)
- Reduce阶段(汇总小任务的结果)
Map阶段
- map阶段有一个关键的map()函数。
- 此函数的输入是键值对。
- 输出是一系列的键值对,输出写到本地磁盘。
Reduce阶段
- reduce阶段有一个关键的函数reduce函数。
- 此函数的输入也是键值对(即map的输出kv对)。
- 输出也是一系列的键值对,结果最终写入HDFS。
Map&Reduce工作流程图如下:
下面以MR的词频统计为例,详细介绍MR工作流程。
需求:统计一批英文文章中,每个单词出现的总次数。
假设:现在有一个输入文件"Gone With The Wind",这个文件有三个block:block1, block2, block3。三个block的内容依次如下图。
- Map阶段
- 每一个block对应一个分片split[3] (默认split与block一一对应)。
- 每一个split对应一个map任务(map task)。所以这里三个block将会对应三个map task(map1, map2, map3),这3个任务的逻辑完全一样。
- 以map1为例。map1会读取block1的数据,一次读取block1的一行数据,然后会产生一个kv对(其中,key是当前所读行的行首相对于当前block开始处的字节偏移量;value是当前行的内容;如假设当前所读行是第一行,那么当前行的内容是"Dear Bear River",则kv对是(0, "Dear Bear River")),作为map()的参数传入,调用map()。
- map()方法。将value当前行内容按空格切分,得到三个单词Dear|Bear|River,然后将每个单词变成键值对(Dear, 1)|(Bear, 1)|(River, 1),最终结果输出为文件,写入map任务所在节点的本地磁盘中(其中还有一个Shuffle的过程,下文会详细讲解)。
- block的第一行数据被处理完后,接着处理第二行,当map任务将当前block所有的数据全部处理完后,此map任务即运行结束。
- Reduce阶段
- reduce任务(reduce task)的个数由用户程序指定,main()内的job.setNumReduceTask(4)指定reduce任务是4个(reduce1, reduce2, reduce3, reduce4)。
- 以reduce1为例。reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端(拷贝阶段)。同样地,也会连接到map2,map3获取数据。最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一个组[4]。4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入(其中还有一个Shuffle的过程,下文会详细讲解),调用reduce()。
- reduce()方法。计算Dear的总数为4,并将(Dear, 4)作为键值对输出,最终结果输出成文件,写入HDFS。
MR中key的作用
MR编程中,key有特殊的作用:
① 数据中,若要针对某个值进行分组、聚合时,需将此值作为MR中的reduce的输入的key。 如上边词频统计例子,按单词进行分组,每组中对出现次数做聚合(计算总和);所以需要将每个单词作为reduce输入的key,MapReduce框架自动按照单词分组,进而求出每组即每个单词的总次数。
② 另外,key还具有可排序的特性,因为MR中的key类需要实现WritableComparable接口;而此接口又继承Comparable接口(可查看源码)。
MR编程时,要充分利用以上两点;结合实际业务需求,设置合适的key。
Shuffle
前面在讲map和reduce的工作原理的时候,对于map的处理结果只是简单地说保存在磁盘,而对于reduce,也只是简单地说了从map端获取处理结果作为其输入。这两个过程其实并不是那么那么简单,当中还有一个shuffle的过程。
其中,分区用到了分区器,默认分区器是HashPartitioner,源码:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
-
map端
- 每个map任务都有一个对应的环形内存缓冲区(如上图),对于map()输出的kv对并不是立马就写入磁盘,而是先写入到一个环形缓冲区(默认大小是100M),当内容占据80%的缓冲区空间后,由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件。
- 在溢写的过程中,map任务可以继续向环形缓冲区写入数据;但是若写入速度大于溢出写的速度,最终造成100m占满后,map任务会暂停向环形缓冲区中写数据的过程;只执行溢出写的过程;直到环形缓冲区的数据全部溢出写到磁盘,才恢复向缓冲区写入。
- 后台线程溢写磁盘过程,有以下几个步骤:
- 先对每个溢写的kv对做分区;分区的个数由MR程序的reduce任务数决定; 默认使用HashPartitioner计算当前kv对属于哪个分区;计算公式:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks(见上面源码)
- 每个分区中,根据kv对的key做内存中排序。
- 若设置了map端本地聚合combiner,则对每个分区中,排好序的数据做combine[5]操作。
- 若设置了对map输出压缩的功能,会对溢写数据压缩。
- 随着不断的向环形缓冲区中写入数据,会多次触发溢写(每当环形缓冲区写满100m),本地磁盘最终会生成多个溢出文件。
- 合并溢写文件:在map task完成之前,所有溢出文件会被合并成一个大的溢出文件;且是已分区、已排序的输出文件(如上图,每个计算节点上保存的合并后的文件都有4个分区,每个分区内的kv对都是已经排好了序)。
- 在合并溢写文件时,如果至少有3个溢写文件,并且设置了map端combine的话,会在合并的过程中触发combine操作;
- 但是若只有2个或1个溢写文件,则不触发combine操作(因为combine操作,本质上是一个reduce,需要启动JVM虚拟机,有一定的开销)
- reduce端
- reduce task会在每个map task运行完成后,通过HTTP获得map task输出中,属于自己的分区数据(许多kv对),如果map输出数据比较小,先保存在reduce的jvm内存中,否则直接写入reduce磁盘。一旦内存缓冲区达到阈值(默认0.66)或map输出数的阈值(默认1000),则触发归并merge,结果写到本地磁盘。
- 若MR编程指定了combine,在归并过程中会执行combine操作。
- 随着溢出写的文件的增多,后台线程会将它们合并大的、排好序的文件。
- reduce task将所有map task复制完后,将合并磁盘上所有的溢出文件,默认一次合并10个,最后一批合并,部分数据来自内存,部分来自磁盘上的文件。
- 合并完成之后,进入“归并、排序、分组阶段”。
- 最后每组数据调用一次reduce方法。
小结:
shuffle主要指的是map端的输出作为reduce端输入的过程。
- map端
- map()输出结果先写入环形缓冲区。
- 缓冲区100M;写满80M后,开始溢出写磁盘文件。
- 此过程中,会进行分区、排序、combine(可选)、压缩(可选)
- map任务完成前,会将多个小的溢出文件,合并成一个大的溢出文件(已分区、排序)。
- reduce端
- 拷贝阶段:reduce任务通过http将map任务属于自己的分区数据拉取过来。
- 开始merge及溢出写磁盘文件。
- 所有map任务的分区全部拷贝过来后,进行阶段合并、排序、分组阶段。
- 每组数据调用一次reduce()。
- 结果写入HDFS。
拓展阅读:
-
MapReduce可简称为MR。 ↩
-
如果有一组大任务(复杂,计算量大,耗时较长的任务),使用单台服务器无法计算或者叫段时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同服务器上并行的执行,最终再汇总每个小任务的结果。 ↩
-
split 是一个逻辑概念,它只包含一些元数据信息,比如 、数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。 ↩
-
实际情况是存在多个不同的键,然后会根据键分组,相同的键分到一个组。 ↩
-
将如kv对("poem", 3)和("poem", 5)键值对合并的过程,叫combine操作,将map()结果写入磁盘之前进行combine可以减少带宽消耗。 ↩