4.MapReduce模板开发

1. mapreduce程序介绍:

mapreduce是Google提出的一种计算框架。框架处理key、value对;数据处理过程中数据流向格式。

mapreduce程序开发包含三个部分:

(1)map:分布到很多节点上去执行;

(2)reduce:合并map的输出结果

(3)Job: job调度配置。

Input -->  Map  -->  Reduce --> Output

2. wordcount思路:

假设对下边这个文件wc.txt,进行词频统计(分隔符是制表符):

SF LUNA LION

LUNA NEC LION

SF SF POM

将该文件上传到 HDFS 上:

$cd $HADOOP_HOME

$bin/hdfs dfs -put /home/natty.ma/bigdata/hadoop/files/wordcountfiles.txt /user/natty.ma/hdfsapi/

$bin/hdfs dfs -text /user/natty.ma/hdfsapi/wordcountfiles.txt

(1)在Input阶段,MapReduce内部的FileInputFormat会将文件处理成“<偏移量,本行内容>”这种key value的格式。

那么Input阶段会生成下边的结果:

<13,SF LUNA LION> 

<26,LUNA NEC LION>

<37,SF SF POM>

(2)在Map阶段,输入是第(1)步的输出。将每行记录内容按照制表符拆分出单词,并把拆分出的单词作为key,value给1。

这样得到的key value对是:

<SF,1>

<LUNA,1>

<LION,1>

<LUNA,1>

...

(3)在Reduce阶段,合并每个key的多个值。

在MR内部,会将相同的Map阶段的key 的value值合并成list(Iterable)。得到下面的key value对:

<LION,(1,1)>

<LUNA,(1,1)>

<NEC,(1)>

<POM,(1)>

<SF,(1,1,1)>

上边的集合作为Reduce阶段的输入,在reduce阶段,将value值的list的值合并(sum)。Reduce的输出是:

<LION,2>

<LUNA,2>

<NEC,1>

<POM,1>

<SF,3>

3.程序实现:

下面是按照规范实现的示例程序:

WordCountMapReduce.java

上边程序实现了WordCount的逻辑。主要包含4个部分:创建 Mapper类、创建Reducer类、配置Job、提交Job

4.提炼模板:

MapReduce也是“八股文”变成模式,最终要的是分析MapReduce要实现的业务逻辑,从而确定Map和Reduce阶段的业务逻辑。

确定Map阶段的输入输出、Reduce阶段的输入和输出。确定Map和Reduce的参数(LongWritable、IntWritable、Text等)。

提炼了模板需要修改以下地方:

(1)Mapper和Reducer的类名(根据业务实际含义)

(2)Mapper类的输入、输出类型(输入的key、value;输出的key、value)。

(3)Reducer类的输入、输出类型(输入的key、value;输出的key、value)。

(4)Mapper类重写的map方法输入类型。map()方法的前2个参数是map的输入key、value。输出key、value写入context中。

(5)Reducer类重写的reduce方法的输入类型。reduce()方法的前2个参数是reduce的输入key、value。输出key、value写入context中。

(6)job设置mapper和reducer类 及其key、value类型。

下面是整理的模板类:

ModuleWordCountMapReduce

5. WebPV事例程序研究:

该事例使用一个样本文件,以TAB键分割数据。数据字典:


WebPV数据文件数据字典

计数每一个省份代码(provinceId)有效的URL数量。首先,清洗每条记录,保留合法记录(例如,url不能为空,必须要有provinceId字段,provinceId字段需要是数字等条件)。清洗出所有合法的记录后,统计每个省份代码出现的次数,也就是该省份PV数量了。

下面WebPV的MapReduce程序实现了上述逻辑:

WebPVMapReduce.java

6. Shuffle 阶段:

Shuffle描述的是从Map输出到Reduce输入的中间过程。Shuffle包含Map端ShuffleReduce端Shuffle。这中间的过程是:

Input  -->  map()阶段 -->  map阶段output --> map端Shuffle --> Reduce端Shuffle --> Reduce阶段Input 


MapReduce中的shuffle和sort

6.1 Map端Shuffle处理:

map()函数产生的map的Output不是直接写到磁盘,而是在内存中缓存并进行预排序。每个map任务有环形内存缓冲来保存map输出。这个内存空间默认是100MB(通过mapreduce.task.io.sort.mb来配置),但是如果这个缓冲内存占满了,就需要往磁盘上写了,这个过程叫做spill。但是一般情况下,达到一个阈值就会开始spill(溢出),默认是80%,通过mapreduce.map.sort.spill.percent 来配置。 写在运行map的节点的本地,目录有下边属性配置:mapreduce.cluster.local.dir。

备注:(当缓冲区快满时,会写到磁盘生成一个临时文件。当这个map task结束后,会对磁盘上这个map task产生的所有临时文件进行排序、合并,并生成最终的输出文件,等待reduce task来拉取数据。)

Partition、Sort、Combine、Merge、Compress:

在写入磁盘前,会将数据分为不同的分区,每个分区对应一个reducer(也就是说分配到同一个partition的数据会由同一个reducer来处理)。在同一个分区内,数据会按照key来排序,如果有combiner函数的话,也会执行在sort的输出结果上。Combiner函数会先对map结果进行合并(例如sum),这样使得临时文件中的数据大大减少,临时文件也就会变小,进而减少在集群中数据转移量,减少I/O,提高效率。由于每次缓冲器达到阈值时,都会生成一个新的临时文件,所以很可能有很多临时文件,这些文件最终会合并成一个单独的分区的、排序的output文件。

生成的spill文件数量如果大于3个,会再次调用combiner。如果只有1个或者2个spill文件,那么combiner就不再值得使用来降低map output文件的大小,所以combiner就不再执行了。

在map output写入磁盘前,做压缩可以优化性能。文件压缩后变小,写入磁盘更快,文件在拷贝到其他reducer的节点的时间也会缩短。默认是不做压缩的。可以修改属性 :mapreduce.map.output.compress

6.2 Reduce端Shuffle处理:

1.Copy阶段:

Map阶段的mapoutput放在执行map任务的机器的本地目录(local dir)上。如果mapoutput分了多个partition。假如,mapoutput输出文件中分区到partition1的数据,就需要拷贝到reducer1执行所在的机器。并且,reducer1需要的文件,会来自多个不同的map tasks(同时也就来自多个节点),这些map tasks完成时间不同,只要map task完成,reducer就会立即copy数据。

Reduce Task在copy data时,如果数据较小会直接copy在内存中,如果超过阈值,存在磁盘上。当拷贝在磁盘上累积时,后台程序会做merge和sort。(当然,这个是在同一个Reduce Task中做的处理,也肯定是同一个分区的数据)

2. Sort阶段:

map outputs拷贝完成后,进入sort阶段。该阶段合并来自不同 map tasks的 map outputs并保持排序。Sort阶段分多轮进行 ,假如有50个map outputs,并且 merge参数设为10(通过mapreduce.task.io.sort.factor 属性设定)。那么会有5个中间临时文件。Merge阶段不会把这些文件合成为一个文件传递给reduce阶段,而是把这5个中间临时文件直接传给reduce,reduce阶段再对这5个中间临时文件执行reduce函数。

3. Reduce阶段:

在Reduce阶段,Reduce函数在每个已排序的output文件上的每个key使用,reduce的output直接写到HDFS。该阶段进行group,相同key的value组合在一起 ,例如:<hadoop,(1,1,1)>。

6.3 mapreduce优化:

shuffle是mapreduce的核心,优化mapreduce程序也就是优化shuffle阶段,优化一些shuffle阶段使用的参数。在优化时,可以重点优化shuffle过程中的下面几个阶段:

partition

sort

combine

compress

group

7. 自定义数据类型

Writable接口(org.apache.hadoop.io.Writable):一个实现简单、高效、序列化协议的序列化对象。

在MapReduce中,value必须实现Writable接口。

在MapReduce中,key必须实现WritableComparable接口。 WritableComparable接口继承Writable和Comparable<T>。

下面的类自定义了数据类型 CustomDataWritable。包含5个属性:上下行数据包、上下行流量、电话号码个数计数。

CustomDataWritable.java

8. 手机流量案例统计

手机流量案例(日志文件),数据字典:

日志文件数据字典

如上边数据文件的数据字典,统计每一个手机号的 上行数据包总数、下行数据包总数、上行总流量数、下行总流量数。 

那么其实思路非常清晰,我们筛选出符合条件的数据记录。按照手机号码(msisdn)来sum上边4个指标就可以了。非常简单! 在用mapreduce来实现时,我们需要创建一个“自定义数据类型” , 这个类型包含以下四个属性:

upPackNum;

downPackNum;

upPayLoad;

downPayLoad;

这样一简化之后,我们就可以按照key(手机号码),Reduce我们的自定义数据类型就可以了。

下面程序实现了按手机号统计流量的逻辑(最后一位是该手机号出现的次数):

PhoneStatisticsMapReduce.java

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,817评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,329评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,354评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,498评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,600评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,829评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,979评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,722评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,189评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,519评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,654评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,940评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,762评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,993评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,382评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,543评论 2 349

推荐阅读更多精彩内容