Hadoop开发--MapReduce分析(五)

一、Mapreduce简介

  Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。
  Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

二、MapReduce并行处理的基本过程

Hadoop2.0之前和Hadoop2.0之后的区别:
2.0之前只有MapReduce的运行框架,那么它里面有只有两种节点,一个是master,一个是worker。
master既做资源调度又做程序调度,worker只是用来参与计算的。
在2.0之后加入了YARN集群,Yarn集群的主节点承担了资源调度,Yarn集群的从节点中会选出一个节点(这个由redourcemanager决定)。
用作类似于2.0之前的master的工作,来进行应用程序的调度。
资源调度: 处理程序所需要的cpu、内存资源,以及存储数据所需要的硬盘资源都是resourcemanager去分配的。

MapReduce并行处理的基本过程

  一切都是从最上方的user program开始的,user program链接了MapReduce库,实现了最基本的Map函数和Reduce函数。
  图中执行的顺序都用数字标记了。
  1)MapReduce库先把user program的输入文件划分为M份(M为用户定义),如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。
  2)user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业或者Reduce作业),worker的数量也是可以由用户指定的。
  3)被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。
  4)缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker
  5)master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业,所以排序是必须的。
  6)reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。
  7)当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码。
  8)所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFFS)的。而且我们要注意Map/Reduce作业map/reduce函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入键值对;Reduce作业处理一个分区的中间键值对,期间要对每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。

MapReduce并行处理的基本过程

三、MapRrduce输入与输出问题

  Map/Reduce框架运转在<key, value>键值对上,也就是说,框架把作业的输入看为是一组<key, value>键值对,同样也产出一组 <key, value>键值对做为作业的输出,这两组键值对的类型可能不同。
  框架需要对key和value的类(classes)进行序列化操作,因此,这些类需要实现Writable接口。另外,为了方便框架执行排序操作,key类必须实现 WritableComparable接口。
  注意:不管是哪里的序列化,最主要的作用就是持久化存储或者是用于网络传输
  一个Map/Reduce作业的输入和输出类型如下所示:
  (input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。
  其实在前面讲解Hadoop IO的时候已经知道了解了Writale接口:  
  Writable接口是一个实现了序列化协议的序列化对象。
  在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。

四、MapReduce实际处理流程

  mapreduce 其实是分治算法的一种现,所谓分治算法就是“就是分而治之 ,将大的问题分解为相同类型的子问题(最好具有相同的规模),对子问题进行求解,然后合并成大问题的解。
  mapreduce就是分治法的一种,将输入进行分片,然后交给不同的task进行处理,然后合并成最终的解。
  mapreduce实际的处理过程可以理解为:Input->Map->Sort->Combine->Partition->Reduce->Output
  1)Input阶段
    数据以一定的格式传递给Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以设置,也可以自定义分片函数。
  2)map阶段
    对输入的(key,value)进行处理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass进行设置。
  3)Sort阶段
    对于Mapper的输出进行排序,使用Job.setOutputKeyComparatorClass进行设置,然后定义排序规则。
  4)Combine阶段
    这个阶段对于Sort之后又相同key的结果进行合并,使用Job.setCombinerClass进行设置,也可以自定义Combine Class类。
  5)Partition阶段
    将Mapper的中间结果按照key的范围划分为R份(Reduce作业的个数),默认使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也可以自定义划分的函数。
    使用Job.setPartitionClass设置。
  6)Reduce阶段
    对于Mapper阶段的结果进行进一步处理,Job.setReducerClass进行设置自定义的Reduce类。
  7)Output阶段
    Reducer输出数据的格式。

五、一个job的运行流程

  一个mapreduce作业的执行流程是:作业提交->作业初始化->任务分配->任务执行->更新任务执行进度和状态->作业完成。


一个job的运行流程

一个完整的mapreduce作业流程,包括4个独立的实体:
  客户端:client,编写mapreduce程序,配置作业,提交作业。
  JobTracker:协调这个作业的运行,分配作业,初始化作业,与TaskTracker进行通信。
  TaskTracker:负责运行作业,保持与JobTracker进行通信。
  HDFS:分布式文件系统,保持作业的数据和结果。

  1. 提交作业
      JobClient使用runjob方法创建一个JobClient实例,然后调用submitJob()方法进行作业的提交,提交作业的具体过程如下:
        1)通过调用JobTracker对象的getNewJobId()方法从JobTracker处获得一个作业ID。
        2)检查作业的相关路径。如果输出路径存在,作业将不会被提交(保护上一个作业运行结果)。
        3)计算作业的输入分片,如果无法计算,例如输入路径不存在,作业将不被提交,错误返回给mapreduce程序。
        4)将运行作业所需资源(作业jar文件,配置文件和计算得到的分片)复制到HDFS上。
        5)告知JobTracker作业准备执行(使用JobTracker对象的submitJob()方法来真正提交作业)。
  2. 作业初始化
      当JobTracker收到Job提交的请求后,将Job保存在一个内部队列,并让Job Scheduler(作业调度器)处理并初始化。初始化涉及到创建一个封装了其tasks的job对象,并保持对task的状态和进度的跟踪(step 5)。当创建要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(step 6),然后再为每个split创建map task。
  3. 任务的分配
      TaskTracker和JobTracker之间的通信和任务分配是通过心跳机制完成的。TaskTracker作为一个单独的JVM,它执行一个简单的循环,主要实现每隔一段时间向JobTracker
      发送心跳,告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。如果有待分配的任务,它就会为TaskTracker分配一个任务。
  4. 任务的执行
      TTaskTracker申请到新的任务之后,就要在本地运行了。首先,是将任务本地化(包括运行任务所需的数据、配置信息、代码等),即从HDFS复制到本地。调用localizeJob()完成的。
      T对于使用Streaming和Pipes创建Map或者Reduce程序的任务,Java会把key/value传递给外部进程,然后通过用户自定义的Map或者Reduce进行处理,然后把key/value传回到java中。
      T其中就好像是TaskTracker的子进程在处理Map和Reduce代码一样。
  5. 更新任务的执行进度和状态
      进度和状态是通过heartbeat(心跳机制)来更新和维护的。对于Map Task,进度就是已处理数据和所有输入数据的比例。对于Reduce Task,情况就有点复杂,包括3部分,拷贝中间结果文件、排序、reduce调用,每部分占1/3。
  6. 任务完成
      当Job完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为successful,同时JobClient也会轮循获知提交的Job已经完成,将信息显示给用户。
      最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操作(比如删除中间结果文件)

六、MapTask并行度决定机制

maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。
那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢?

  1. mapTask并行度的决定机制
    一个job的map阶段并行度由客户端在提交job时决定而客户端对map阶段并行度的规划的基本逻辑为:
    将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理。
    默认128M一块。

2.FileInputFormat切片机制
1)FileInputFormat切片机制切片定义在InputFormat类中的getSplit()方法
  2)FileInputFormat中默认的切片机制:
    简单地按照文件的内容长度进行切片
    切片大小,默认等于block大小
    切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
  比如待处理数据有两个文件:
    file1.txt 320M
    file2.txt 10M
  经过FileInputFormat的切片机制运算后,形成的切片信息如下:
    file1.txt.split1-- 0~128
    file1.txt.split2-- 128~256
    file1.txt.split3-- 256~320
    file2.txt.split1-- 0~10M
  3)FileInputFormat中切片的大小的参数配置
    通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定
    minsize:默认值:1
    配置参数: mapreduce.input.fileinputformat.split.minsize
    maxsize:默认值:Long.MAXValue
    配置参数:mapreduce.input.fileinputformat.split.maxsize
    blocksize
  因此,默认情况下,切片大小=blocksize
  maxsize(切片最大值):
  参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值
  minsize (切片最小值):
  参数调的比blockSize大,则可以让切片变得比blocksize还大
  选择并发数的影响因素:
    运算节点的硬件配置
    运算任务的类型:CPU密集型还是IO密集型
    运算任务的数据量

  1. ReduceTask并行度的决定
    reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
  //默认值是1,手动设置为4 
  job.setNumReduceTasks(4);

如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜
  注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask
  尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。

  1. mapreduce的shuffle机制
    mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle。
      shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存)。
      具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序。
    分区partition(确定哪个数据进入哪个reduce)
    Sort根据key排序

详细流程  
    1、 maptask收集我们的map()方法输出的kv对,放到内存缓冲区中
    2、 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
    3、 多个溢出文件会被合并成大的溢出文件
    4、 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序
    5、 reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
    6、 reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
    7、 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
  Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快
  缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100MCombiner进行局部value的合并。

七、MapReduce与YARN

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序。

YARN中的重要概念
  1) yarn并不清楚用户提交的程序的运行机制
  2) yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)
  3) yarn中的主管角色叫ResourceManager
  4) yarn中具体提供运算资源的角色叫NodeManager
  5) 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序,tez ……
  6) 所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可
  7) Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • MapReduce框架结构## MapReduce是一个用于大规模数据处理的分布式计算模型MapReduce模型主...
    Bloo_m阅读 3,730评论 0 4
  • 一、MapReduce应用场景 Hadoop的Mapreduce是一个使用简单的框架,基于它写出来的程序可以运行在...
    老实李阅读 1,413评论 0 9
  • MapReduce工作流程 流程图如下 解释上面的流程是整个mapreduce最全工作流程,但是shuffle过程...
    ZFH__ZJ阅读 556评论 0 3
  • 数据切片和MapTask并行度决定机制 1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定 2)每...
    bullion阅读 788评论 0 1
  • 盼望着长大成熟的我,在某一刻想要回到二十岁,就好像我还有幻想的机会一样 看芝加哥打字机刘亚仁有感
    singler碎碎念阅读 145评论 0 0