Disclaimer: I am still learning this material myself, so some of my understanding may be shallow or even wrong. Please read critically, and if you spot an error, leave a comment or contact me directly.
Week 2 overview: 1) how MapReduce works internally; 2) implementing MapReduce in Java (self-study)
Keywords: Mapper; Reducer; Master; Combiner; Partitioner; MapReduce Framework; shuffle; sort
First, the questions we want to answer:
Typical big data problem challenges:
1)How do we break up a large problem into smaller tasks that can be executed in parallel?
2)How do we assign tasks to workers distributed across a potentially large number of machines?
3)How do we ensure that the workers get the data they need?
4)How do we coordinate synchronization among the different workers?
5)How do we share partial results from one worker that are needed by another?
6)How do we accomplish all of the above in the face of software errors and hardware faults?
Question 1: How do we break up a large problem into smaller tasks that can be executed in parallel?
In MapReduce, a framework component called the MapReduce Framework coordinates and assigns all of the tasks. When HDFS feeds input files into the MapReduce system, the MapReduce Framework uses the job's InputFormat (the job being the input files) to carry out three steps:
1) Validate the input specification of the job.
2) Split the input files into multiple logical InputSplit instances, each of which is then assigned to an individual Mapper.
3) Provide the RecordReader implementation used to extract input records from the logical InputSplit; these records are then processed by the Mapper.
A refinement of (3): the RecordReader converts the byte-oriented view of the input supplied by the InputSplit into the record-oriented view that the Mapper consumes. It is therefore responsible for handling record boundaries and presenting the data as key/value pairs; in other words, it reads input records from the InputSplit and hands them to the Mapper as key/value pairs.
(Note: The default behavior of file-based InputFormat implementations, typically sub-classes of FileInputFormat, is to split the input into logical InputSplit instances based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapreduce.input.fileinputformat.split.minsize.)
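To make this concrete, here is a minimal sketch of configuring the input side of a job with the standard Hadoop Java API (the path /user/demo/input and the 64 MB minimum split size are illustrative choices, not values from the lecture):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputSetup {
    public static Job configureInput(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "input-format-demo");
        // TextInputFormat validates the input, computes the logical InputSplits,
        // and supplies a LineRecordReader that turns each byte-oriented split
        // into (byte offset, line text) records for the Mapper.
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/user/demo/input")); // hypothetical path
        // Lower bound on the split size, i.e. mapreduce.input.fileinputformat.split.minsize:
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
        return job;
    }
}
```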
Question 2: How do we assign tasks to workers distributed across a potentially large number of machines?
From Question 1 we already know that the InputFormat splits the input into a number of logical InputSplit instances (say n of them); the MapReduce Framework then assigns these n InputSplit instances to n Mappers via its scheduling algorithm. (Notes on choosing the split size: 1) smaller splits are better for load balancing, since a faster machine can process more splits; 2) but if the splits are too small, the overhead of managing them dominates the total execution time; 3) for most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default.)
Here we need to introduce how the Mapper works. (Each Mapper has a concrete map function that dictates how it turns input records into intermediate records.)
Mappers transform input records into intermediate records via the map function. The intermediate records need not be of the same type as the input records, and a given input pair may map to zero or more output key/value pairs. The number of Mappers is determined by the number of logical InputSplit instances (the number of Reducers, by contrast, is chosen by the user). A minimal sketch of a map function is shown below.
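As a concrete illustration, here is a minimal WordCount-style map function written against the standard org.apache.hadoop.mapreduce API (a sketch: the class name and the whitespace tokenization are our own illustrative choices, not part of the lecture material):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input records are (byte offset, line of text); intermediate records are (word, 1).
// Note the intermediate types differ from the input types, and one input record
// may emit zero or more intermediate key/value pairs.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) { // illustrative tokenization
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE); // emit one intermediate (word, 1) pair
            }
        }
    }
}
```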
Before the Mapper's intermediate records reach the Reducers, they are organized by the shuffle and sort steps, whose map-side machinery we now analyze in detail (the following is adapted from http://www.cnblogs.com/qiaoqianxiong/p/4974147.html). Each Map task (i.e., Mapper) has a buffer in memory; the map output is first written into this buffer, where it is pre-sorted (i.e., sort and combine) for efficiency.
The buffer is 100 MB by default (configurable via the io.sort.mb property). When the data in the buffer reaches a threshold (io.sort.mb * io.sort.spill.percent, where io.sort.spill.percent defaults to 0.80), a background thread starts spilling the buffer contents to a temporary file on disk; that is, the 80% of the buffer becomes one temporary file. While that 80% is spilling, the map keeps writing output into the remaining 20% of the buffer.
Before the spill thread writes the buffered data to disk, it performs a two-level quicksort: first by the Partition the data belongs to, then by Key within each Partition. The output consists of an index file and a data file. If a Combiner has been configured, it runs on this sorted output.
The Combiner is essentially a mini Reducer that runs on the node executing the Map task itself. It performs a simple reduce over the Map output, making it more compact, so that less data is written to disk and shipped to the Reduce side.
A Map task produces multiple spill files; before the Map task finishes, all of its spill files are merge-sorted into a single index file and data file. Once this merge completes, the Map task deletes all the temporary files and tells the TaskTracker that the task is done.
The data written to disk can optionally be compressed; to enable this, set mapred.compress.map.output to true.
There is also the notion of a Partition: each temporary file is partitioned, the number of partitions is determined by the number of reduce tasks, and different partitions are delivered to different reduce tasks. A sketch of these configuration knobs follows.
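For reference, a minimal sketch of setting the spill-related properties just discussed (these are the classic property names quoted in this post; newer Hadoop releases rename them, e.g. mapreduce.task.io.sort.mb, and the 200 MB figure is purely illustrative):

```java
import org.apache.hadoop.conf.Configuration;

public class MapSideTuning {
    // Apply the map-side buffer/spill settings discussed above.
    public static Configuration tune(Configuration conf) {
        conf.setInt("io.sort.mb", 200);                      // buffer size: 200 MB instead of the 100 MB default
        conf.setFloat("io.sort.spill.percent", 0.80f);       // spill threshold (0.80 is already the default)
        conf.setBoolean("mapred.compress.map.output", true); // compress the spilled map output
        return conf;
    }
}
```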
Next, we look at how the Reducer works. (Each Reducer has a concrete reduce function that dictates how it performs the final reduce over the intermediate records.)
(The following is likewise adapted from http://www.cnblogs.com/qiaoqianxiong/p/4974147.html.)
The Reduce side fetches the Map side's data over HTTP. As soon as any one map task completes, the Reduce tasks begin copying its output; this is called the copy phase.
The JobTracker knows the mapping between Map outputs and TaskTrackers; a thread on the Reduce side periodically asks the JobTracker for the locations of Map outputs until all the data has been fetched.
If the map outputs are small, they are copied into the Reduce task's memory; if the buffer space is insufficient, they are copied to disk. For the copied data sitting on disk, a background thread merges it into larger sorted files; compressed files are automatically decompressed into memory for merging.
Once all the Map outputs have been copied, the Reduce task enters the sort phase (more precisely, a merge phase), which repeats over multiple rounds. The merge takes three forms: memory-to-memory, memory-to-disk, and disk-to-disk.
Memory-to-memory merging is disabled by default. Memory-to-disk merging also spills, and if a Combiner is configured it runs here as well, producing multiple spill files on disk. Disk-to-disk merging produces a single final file that becomes the Reduce input. A sketch of a reduce function is given below.
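As the counterpart to the map sketch above, here is a minimal WordCount-style reduce function (again a sketch against the standard Hadoop API; after the merge phases, each call receives one intermediate key together with all of its values):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the (word, 1) pairs produced by the map side into (word, total) pairs.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get(); // accumulate all values for this key
        }
        total.set(sum);
        context.write(word, total); // final output, written back to HDFS
    }
}
```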
Combiner: a Combiner is essentially a mini Reducer that runs on the node executing the Map task itself. It performs a simple reduce over the Map output, making it more compact, so that less data is written to disk and shipped to the Reduce side. (See the figure below.)
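Because the WordCount reduce function above just sums counts, which is commutative and associative, it can double as the Combiner; a minimal sketch (WordCountReducer is the hypothetical class from the previous sketch):

```java
import org.apache.hadoop.mapreduce.Job;

public class CombinerSetup {
    // Registering the reduce class as the Combiner is safe here because
    // summing is commutative and associative; the Combiner then runs on the
    // map node and shrinks what is spilled to disk and shipped to Reducers.
    public static void enableCombiner(Job job) {
        job.setCombinerClass(WordCountReducer.class);
    }
}
```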
Partitioner:
1)Partitioner controls the partitioning of the keys of the intermediate map-outputs.
1.1)The key (or a subset of the key) is used to derive the partition, typically by a hash function.
1.2)The total number of partitions is the same as the number of reduce tasks for the job.
1.2.1)This controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
2)System uses HashPartitioner by default: hash(key) mod R
3)Sometimes useful to override the hash function:
Example: hash(hostname(URL)) mod R ensures that all URLs from one host end up in the same output file. (Note: R here is the number of Reducers.)
https://www.unsw.edu.au/faculties; https://www.unsw.edu.au/about-us; https://www.anu.edu.au/faculties; https://www.anu.edu.au/about-us; https://www.unimelb.edu.au/faculties; https://www.unimelb.edu.au/about-us
For these six URLs, we can make the Partitioner's hash(key) look only at the host portion in the middle of the URL, grouping the URLs by host (the unsw pages in one group, the anu pages in another, and the unimelb pages in a third). A sketch of such a Partitioner follows.
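A minimal sketch of such a custom Partitioner (the Text key is assumed to be the URL itself, and the IntWritable value type is an arbitrary illustrative choice):

```java
import java.net.URI;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Implements hash(hostname(URL)) mod R: every URL from the same host lands in
// the same partition, hence in the same reducer and the same output file.
public class HostPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text url, IntWritable value, int numReduceTasks) {
        String host = URI.create(url.toString().trim()).getHost();
        // Same masking trick as the default HashPartitioner, but applied to
        // the host rather than to the whole key:
        return (host.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

It would be registered on the job with job.setPartitionerClass(HostPartitioner.class).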
In the diagram, the in-memory buffer goes through partitioning three times because the file is too large: it has to be split further, with each piece partitioned and then sorted, and finally all the pieces are merged on disk into 3 partitions. The diagram also shows the 3 reducers each fetching the partition they need, which works because "The total number of partitions is the same as the number of reduce tasks for the job."
Question 3: How do we ensure that the workers get the data they need?
Analysis: a Mapper passively receives its data and then processes it according to its map function, while what a Reducer receives is actively configured by the user. Hypothesis 1: if the workers here are Reducers, then a Reducer fetches the data it wants via HTTP requests, and the assignment of data to Reducers is computed by the MapReduce Framework; so if a Reducer receives data it should not have, it reports this back to the framework. Likewise, Hypothesis 2: if the workers are Mappers, then a Mapper that discovers its input is not something it can process reports this back to the MapReduce Framework.
Question 4: How do we coordinate synchronization among the different workers? First, the definition of the Master (PPT p. 20):
1)Master node takes care of coordination:
1.1)Task status: (idle, in-progress, completed)
1.2)Idle tasks get scheduled as workers become available
1.3)When a map task completes, it sends the master the location and sizes of its R intermediate files, one for each reducer
1.4)Master pushes this info to reducers
2)Master pings workers periodically to detect failures
So it is the Master that coordinates the tasks among the workers.
(Note: the relationship between the Master and the MapReduce Framework can be confusing. The official documentation summarizes it as follows: the Map/Reduce Framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling all the tasks that make up a job across the slaves, monitoring their execution, and re-executing any tasks that fail; the slaves simply execute the tasks the master assigns to them.)
Question 5: How do we share partial results from one worker that are needed by another?
Analysis: the partial results being shared here are the Mapper's output, and the "another" worker is a Reducer.
From the MapReduce mechanics described above, we know that it is the Reduce side that obtains the specific intermediate records it needs via HTTP requests.
Question 6: How do we accomplish all of the above in the face of software errors and hardware faults? (PPT p. 21)
1)Map worker failure
1.1)Its task is reassigned to another map worker
1.2)Reduce workers are notified when task is rescheduled on another worker
2)Reduce worker failure
2.1)Its task is reassigned to another reduce worker
2.2)Reduce task is restarted (usually requires restarting mapper tasks as well)
3)Master failure
3.1)MapReduce task is aborted and client is notified
Finally, a few remaining questions: what happens between the map and reduce phases? How are the intermediate records handled? What happens to the Reducer output? (PPT p. 22 gives the answers.)
What happens between the map and reduce phases?
Implicit between the map and reduce phases is a parallel “group by” operation on intermediate keys
- Intermediate data arrive at each reducer in order, sorted by the key
- No ordering is guaranteed across reducers
How are the intermediate records handled?
Intermediate keys (used in shuffle and sort) are transient:
- They are not stored on the distributed filesystem
- They are “spilled” to the local disk of each machine in the cluster
What happens to the Reducer output?
Output keys from reducers are written back to HDFS
- The output may consist of r distinct files, where r is the number of reducers
- Such output may be the input to a subsequent MapReduce phase
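Putting the pieces together, here is a minimal driver sketch (reusing the hypothetical WordCountMapper and WordCountReducer classes from the sketches above; with r = 3 the output directory holds part-r-00000 through part-r-00002 on HDFS, and that directory can serve directly as the input path of a subsequent MapReduce job):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class); // optional mini-reduce on the map side
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(3);                     // r = 3 => 3 output files on HDFS
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. the output dir of a previous job
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // results written back to HDFS
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```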