在Hadoop中,用于执行MapReduce任务的机器角色有两个:
- JobTracker:用于调度工作的,初始化作业,分配作业,与TaskTracker进行通信,协调整个作业的执行
- TaskTracker:用于执行工作的;保持与JobTracker的通信,在分配的数据片段上执行map或reduce任务
- HDFS:保存作业的数据,配置信息,保存作业结果
- 客户端:编写mapreduce,配置作业,提交作业
一个Hadoop集群只有一个JobTracker.
MapReduce是一种编程模型,是一种编程方法。
- 输入一个大文件,通过split之后,将其分为多个分片
- 每个文件分片由单独的机器去处理,这就是map方法
- 将各个机器计算的结果进行汇总并得到最终的结果,这就是reduce方法
input->split->map->shuffle->reduce->output
在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job又可以分为两个阶段:map阶段和reduce阶段,这两个阶段分别用两个函数来表示,即map函数和reduce函数,map函数接受一个<key,value>形式的输入,然后同样产生一个key-value形式的中间输出,Hadoop会负责将所有具有相同中间key值的value集合到一起传递给reduce函数,reduce函数接受一个如<key,(list of value)>的形式输入,然后对这个value集合进行处理,每个reduce产生0个或者1个输出,reduce的输出结果也是key-value
在MR的标准模型中,reduce阶段在map阶段完成之前无法启动,而且在下载到reducer之前,所有处理过程的中间数据都保存在磁盘中,所有这些都显著增加了处理的延迟
shuffle
shuffle过程包含在map和reduce两端,在Map端的shuffle过程是对map的结果进行划分partition,排序sort和分割spill,然后将属于同一个划分的输出合并在一起merge,并写在磁盘上,同时按照不同的划分将结果发送给对应的reduce(map的输出的划分和reduce的对应关系由JobTracker确定)。reduce端又会将各个map送来的属于同一个划分的输出进行合并,然后对merge的结果进行排序,最后交给reduce处理
map端
map端的shuffle过程包含在collect函数对map输出结果的处理过程中,
reduce端
reduce端shuffle阶段可以分为三个阶段:复制map输出,排序合并,reduce处理
旧API
map方法
map函数继承于MapReduceBase,并且实现了Mapper接口,此接口是一个泛型类型,有4个形式的参数,分别是
- 输入key值类型
- 输入value值类型
- 输出Key值类型
- 输出value值类型
reduce方法
reduce函数继承于MapReduceBase,并且实现了Reducer接口,reduce函数是以map函数的输出作为输入
新API
- 在新API中,Mapper与Reducer已经不是接口 而是抽象类,而且map函数和reduce函数已经不再实现Mapper和Reducer接口,而是继承。
- 广泛使用context对象,并使用MapContext进行MapReduce之间的通信,MapContext同时充当OutCollector和Reporter
角色 - Job的配置统一由configuration来完成,不需要额外使用JobConf对守护进程进行配置
- 由Job类来负责Job的控制,而不是JobClient,JobClient在新API中被删除
数据流
数据首先按照TextInputFormat形式被处理成两个InputSplit,然后输入到两个map中,map程序会读取inputSplit指定位置的数据,然后按照设定的方式处理批数据,最后写入本地磁盘中。注意,这里不是写到hdfs上,因为map的输出在job完成之后即可删除,因此不需要存储在hdfs上。但是由于网络传输降低了mapreduce任务的执行效率,因此map的输出文件是写在磁盘上的,如果map程序在没有来得及将数据传送到reduce就崩溃了,那么JobTracker只需要另外选取一台机器重新执行这个task就可以了。
Reduce会读取map的输出数据,合并Value,然后将他们输出到hdfs上,reduce的输出会占用很多的网络带宽,不过这与上传数据一样,是不可避免的。
在这,需要注意:
- MapReduce在执行过程中往往不止一个reduce task,reduce task的数量是可以通过程序指定的,当存在多个reduce task时,每个reduce会收集一个或者多个key值,当出现多个reduce task时,每个reducetask都会生成一个输出文件
- 在没有reduce任务时,系统会直接将map的输出结果作为最终的结果,同时map task的数量可以看成是reduce task的数量,即有多少个maptask就有多少个输出文件
MR任务优化
MapReduce计算模型的优化主要集中在两个方面:计算性能方面的优化,IO操作方面的优化
- 任务的调度
计算方面,hadoop总是优先将任务分配给空闲的机器,使得所有的任务能公平分享资源;IO方面,hadoop尽量将Map任务分配给InputSplit的机器,减少网络IO的消耗 - 数据预处理与InputSplit的大小
MR任务擅长处理少量的大数据,不擅长大量的小数据,因此可以通过设置map的输入数据大小来调整map运行时间,可以设置块block的大小,也可以设置map任务的梳理来调整map任务的数据输入 - maph和reduce任务的数量
- combine函数
- 压缩
- 自定义comparator
Hadoop流
基本工作原理:
InputSplit->map>stdin>executable->stout->map->key/value
当一个可执行文件作为Mapper时,每一个map任务以一个独立的进程启动这个可执行文件,然后在map任务运行时,会把输入切分成行提供给可执行文件,并作为它的标准输入stdin内容,当可执行文件运行出结果
几个问题
- 新旧API之间的差别
- 如何去重
- 如何排序
- reduce卡死
reduce过程的百分比与对应的处理如下:- 0~33%是shuffle的过程,数据从mapper已到了reducer
- 33~67%是sort的过程,这个过程只会在mapper完成后才会执行
- 67~100%才是reducer程序执行的过程。如果reduce卡在了67%,那么说明reducer一个也没有执行。可能是输入数据太大,超过了限制,也可能是reducer有死循环的bug