COMP9313_WEEK2

声明:由于本人也是处于学习阶段,有些理解可能并不深刻,甚至会携带一定错误,因此请以批判的态度来进行阅读,如有错误,请留言或直接联系本人。

WEEK2内容概要:1)MapReduce内部工作机理;2)利用Java实现MapReduce(自学)
关键词:Mapper; Reducer; Master; Combiner; Partitioner; MapReduce Framework; shuffle; sort

首先,提出问题:
Typical big data problem challenges:
1)How do we break up a large problem into smaller tasks that can be executed in parallel?
2)How do we assign tasks to workers distributed across a potentially large number of machines?
3)How do we ensure that the workers get the data they need?
4)How do we coordinate synchronization among the different workers?
5)How do we share partial results from one worker that is needed by another?
6)How do we accomplish all of the above in the face of software errors and hardware faults?

问题一:How do we break up a large problem into smaller tasks that can be executed in parallel?

在MapReduce是由一个叫做MapReduce Framework的框架结构来统筹MapReduce的各个任务协调分配,在HDFS input文件给MapReduce系统时,MapReduce Framework根据job(即input的文件)的InputFormat来做三步工作:
1)检查作业输入的有效性。
2)把输入文件切分成多个logical InputSplit instance, 并把每一实例分别分发给一个 Mapper。
3)提供RecordReader的实现,这个RecordReader从逻辑InputSplit中获得input records, 这些records将由Mapper处理。
此处对(3)进行一定修正:3)提供RecordReader的实现,这个RecordReader(RecordReader把由InputSplit 提供的字节样式的输入文件,转化成由Mapper处理的记录样式的文件。 因此RecordReader负责处理记录的边界情况和把数据表示成keys/values对形式)从InputSplit中获得input records,然后将records表示成value/key对形式交由Mapper处理。
(注:The default behavior of file-based InputFormat implementations, typically sub-classes of FileInputFormat, is to split the input into logical InputSplit instances based on the total size, in bytes, of the input files. However, the FileSystem blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapreduce.input.fileinputformat.split.minsize.)

问题二:How do we assign tasks to workers distributed across a potentially large number of machines?
在问题一中,我们已经知道了InputFormat将文件分成多个logical InputSplit instance(假设为n个),于是MapReduce Framework通过一定算法将这n个logical InputSplit instance分配给n个Mapper。(注:InputSplit instance的拆分大小注意事项:1)Small size is better for load-balancing: faster machine will be able to process more splits; 2)But if splits are too small, the overhead of managing the splits dominate the total execution time; 3)For most jobs, a good split size tends to be the size of a HDFS block, 64MB(default))
在这里我们需要介绍Mapper的工作机制:(Mapper会有一个具体的map function来指导Mapper如何将input records转换为intermediate records)
Mappers通过map function将input records转换为intermediate records。这种转换的intermediate records不需要与input records的类型一致。一个给定的input pair可以映射成0个或多个输出键值对。Mapper的数量由logical InputSplit instance的数量决定(Reducer的数量由User决定)。在Mapper将intermediate records传送给Reducer之前,需要通过shuffle和sort步骤将intermediate records整理好。这里我们详细的分析一下Mapper下的shuffle和sort的工作机制(以下内容摘自互联网http://www.cnblogs.com/qiaoqianxiong/p/4974147.html):
每个Map即(Mapper)在内存中都有一个缓存区,map的输出结果会先放到这个缓冲区中,在缓冲区中,会进行预排序(即sort和comibner),以提高效率。
缓冲区默认大小是100MB(可以通过io.sort.mb属性更改大小),当缓冲区中的数据达到特定的阈值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent默认是0.80)时,系统会启动一个后台线程把缓冲区的内容spill(溢写)到磁盘。溢出到磁盘的一个临时文件中,即80%的内容成为一个临时文件。当这80%的内容溢出时,map会继续向剩余的20%缓冲区中输出。
spill线程在把缓冲区中的数据写到磁盘前,会进行一个二次快速排序,首先根据数据所属的Partition排序,然后每个Partition中再按Key排序。输出包括一个索引文件和数据文件。如果设定了Combiner,将在排序输出的基础上进行。
Comibner就是一个Mini Reducer,在执行Map任务的节点本身运行,对Map的输出做一次简单Reduce,使得Map'de输出更紧凑,更少的数据会被写入磁盘和传送到Reduce端。
一个Map任务会产生多个spill文件,在Map任务完成前,所有的spill文件将会归并排序为一个索引文件和数据文件。当spill文件归并完成后,Map将删除所有的临时文件,并告知TaskTracker任务已完成。
对写入到磁盘的数据可以选择采取压缩的方式,如果需要压缩,则需要设置mapred.compress.map.output为true。
还有一个Partition的概念,一个临时文件是进行了分区的,并且分区的数量由reduce的数量决定,不同的分区传给不同的reduce。

接着,我们介绍Reducer的工作机制:(Reducer会有一个具体的reduce function来指导Reducer如何将intermediate records进行最后的reduce)
(以下内容摘自互联网http://www.cnblogs.com/qiaoqianxiong/p/4974147.html
Reduce端通过HTTP获取Map端的数据,只要有一个map任务完成,Reduce任务就开始复制它的输出,这称为copy阶段。
JobTracker知道Map输出与TaskTracker的映射关系,Reduce端有一个线程间歇地向JobTracker询问Map输出的地址,直到把所有的数据都获取到。
如果map输出比较小,他们被复制到Reduce的内存中,如果缓冲区空间不足,会被复制到磁盘上。复制的数据放在磁盘上,后台线程会进行归并为更大的排序文件,对于压缩文件,系统会自动解压到内存方便归并。
当所有的Map输出被复制后,Reduce任务进入排序阶段(确切的说是归并阶段),这个过程会重复多次。Merge有三种形式:内存到内存,内存到磁盘,磁盘到磁盘。
内存到内存默认不启用;内存到磁盘的方式也会产生溢写,如果设置了Combiner,此时也会启用,在磁盘上生成多个溢写文件;磁盘到磁盘会生成一个最终的文件作为Reduce的输入。
Combiner:Comibner就是一个Mini Reducer,在执行Map任务的节点本身运行,对Map的输出做一次简单Reduce,使得Map'de输出更紧凑,更少的数据会被写入磁盘和传送到Reduce端。(参照下图)

Combiner说明

Partitioner:
1)Partitioner controls the partitioning of the keys of the intermediate map-outputs.
1.1)The key (or a subset of the key) is used to derive the partition, typically by a hash function.
1.2)The total number of partitions is the same as the number of reduce tasks for the job.
1.2.1)This controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
2)System uses HashPartitioner by default: hash(key) mod R
3)Sometimes useful to override the hash function:

举例:hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file(注,这里的R指的是Reducer的数量)
<u>https://www.unsw.edu.au/faculties;</u><u>https://www.unsw.edu.au/about-us;</u> <u>https://www.anu.edu.au/faculties;</u><u>https://www.anu.edu.au/about-us</u>; <u>https://www.unimelb.edu.au/faculties;</u> <u>https://www.unimelb.edu.au/about-us</u>
这6个网址,我们可以通过设置partitioner的hash(key)来识别中间网段的不同来进行分类(unsw归类为一组,anu归类为一组,unimelb归类为一组)

Partitioner说明

这里buffer in memory分成三次partition是因为文件太大,需要再将文件split后进行partition然后sort,最后将所有的partition merge到disk中得到3个partition。这里可以看到到3个reducer fetch了自己所要的partition,那是因为”The total number of partitions is the same as the number of reduce tasks for the job. ”

问题三:How do we ensure that the workers get the data they need?
对于问题三的分析,Mapper是被动接受数据的,然后根据map function来处理数据,Reducer是由用户主动设定想要接收什么数据的。假设一,这里的workers是Reducers,Reducer是通过http请求来获取想要的数据的,Reducer的设定是MapReduce Framework通过一定的算法分配得到的,所以如果reducer得到的数据是自己不想要的,它会向MapReduce Framework反馈信息。同理,假设二,这里的workers是Mappers,Mapper在得到input后发现这个input不是自己能够处理的文件,向MapReduce Framework反馈信息。

问题四: How do we coordinate synchronization among the different workers? 首先我们给出Master的定义(PPT P.20):
1)Master node takes care of coordination:
1.1)Task status: (idle, in-progress, completed)
1.2)Idle tasks get scheduled as workers become available
1.3)When a map task completes, it sends the master the location and sizes of its R intermediate files, one for each reducer
1.4)Master pushes this info to reducers
2)Master pings workers periodically to detect failures
所以,是Master来协调各个worker之间的任务。
(注:这里的Master和MapReduce Framework的关系可能会让人有些困惑,官方文档概述:Map/Reduce Framework由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。)

问题五:How do we share partial results from one worker that is needed by another?
问题分析,这里share的partial result是Mapper的output,another是Reducer。
通过MapReduce的工作机制,我们可以知道,是Reduce通过HTTP请求来获得它所需要的特定的intermediate records。

问题六:How do we accomplish all of the above in the face of software errors and hardware faults?(PPT P.21)
1)Map worker failure
1.1)Its task is reassigned to another map worker
1.2)Reduce workers are notified when task is rescheduled on another worker
2)Reduce worker failure
2.1)Its task is reassigned to another reduce worker
2.2)Reduce task is restarted (usually require restarting mapper tasks as well)
3)Master failure
3.1)MapReduce task is aborted and client is notified

最后还有问题,在map和reduce中间的操作是什么?intermediate records怎么处理?Reducer的output怎么处理?(PPT P.22给出了答案)

在map和reduce中间的操作是什么?
Implicit between the map and reduce phases is a parallel “group by” operation on intermediate keys

  1. Intermediate data arrive at each reducer in order, sorted by the key
  2. No ordering is guaranteed across reducers

intermediate records怎么处理?
Intermediate keys (used in shuffle and sort) are transient:

  1. They are not stored on the distributed filesystem
  2. They are “spilled” to the local disk of each machine in the cluster

Reducer的output怎么处理?
Output keys from reducers are written back to HDFS

  1. The output may consist of r distinct files, where r is the number of reducers
  2. Such output may be the input to a subsequent MapReduce phase
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容