1. mapreduce程序介绍:
mapreduce是Google提出的一种计算框架。框架处理key、value对;数据处理过程中数据流向格式。
mapreduce程序开发包含三个部分:
(1)map:分布到很多节点上去执行;
(2)reduce:合并map的输出结果
(3)Job: job调度配置。
Input --> Map --> Reduce --> Output
2. wordcount思路:
假设对下边这个文件wc.txt,进行词频统计(分隔符是制表符):
SF LUNA LION
LUNA NEC LION
SF SF POM
将该文件上传到 HDFS 上:
$cd $HADOOP_HOME
$bin/hdfs dfs -put /home/natty.ma/bigdata/hadoop/files/wordcountfiles.txt /user/natty.ma/hdfsapi/
$bin/hdfs dfs -text /user/natty.ma/hdfsapi/wordcountfiles.txt
(1)在Input阶段,MapReduce内部的FileInputFormat会将文件处理成“<偏移量,本行内容>”这种key value的格式。
那么Input阶段会生成下边的结果:
<13,SF LUNA LION>
<26,LUNA NEC LION>
<37,SF SF POM>
(2)在Map阶段,输入是第(1)步的输出。将每行记录内容按照制表符拆分出单词,并把拆分出的单词作为key,value给1。
这样得到的key value对是:
<SF,1>
<LUNA,1>
<LION,1>
<LUNA,1>
...
(3)在Reduce阶段,合并每个key的多个值。
在MR内部,会将相同的Map阶段的key 的value值合并成list(Iterable)。得到下面的key value对:
<LION,(1,1)>
<LUNA,(1,1)>
<NEC,(1)>
<POM,(1)>
<SF,(1,1,1)>
上边的集合作为Reduce阶段的输入,在reduce阶段,将value值的list的值合并(sum)。Reduce的输出是:
<LION,2>
<LUNA,2>
<NEC,1>
<POM,1>
<SF,3>
3.程序实现:
下面是按照规范实现的示例程序:
上边程序实现了WordCount的逻辑。主要包含4个部分:创建 Mapper类、创建Reducer类、配置Job、提交Job
4.提炼模板:
MapReduce也是“八股文”变成模式,最终要的是分析MapReduce要实现的业务逻辑,从而确定Map和Reduce阶段的业务逻辑。
确定Map阶段的输入输出、Reduce阶段的输入和输出。确定Map和Reduce的参数(LongWritable、IntWritable、Text等)。
提炼了模板需要修改以下地方:
(1)Mapper和Reducer的类名(根据业务实际含义)
(2)Mapper类的输入、输出类型(输入的key、value;输出的key、value)。
(3)Reducer类的输入、输出类型(输入的key、value;输出的key、value)。
(4)Mapper类重写的map方法输入类型。map()方法的前2个参数是map的输入key、value。输出key、value写入context中。
(5)Reducer类重写的reduce方法的输入类型。reduce()方法的前2个参数是reduce的输入key、value。输出key、value写入context中。
(6)job设置mapper和reducer类 及其key、value类型。
下面是整理的模板类:
5. WebPV事例程序研究:
该事例使用一个样本文件,以TAB键分割数据。数据字典:
计数每一个省份代码(provinceId)有效的URL数量。首先,清洗每条记录,保留合法记录(例如,url不能为空,必须要有provinceId字段,provinceId字段需要是数字等条件)。清洗出所有合法的记录后,统计每个省份代码出现的次数,也就是该省份PV数量了。
下面WebPV的MapReduce程序实现了上述逻辑:
6. Shuffle 阶段:
Shuffle描述的是从Map输出到Reduce输入的中间过程。Shuffle包含Map端Shuffle和Reduce端Shuffle。这中间的过程是:
Input --> map()阶段 --> map阶段output --> map端Shuffle --> Reduce端Shuffle --> Reduce阶段Input
6.1 Map端Shuffle处理:
map()函数产生的map的Output不是直接写到磁盘,而是在内存中缓存并进行预排序。每个map任务有环形内存缓冲来保存map输出。这个内存空间默认是100MB(通过mapreduce.task.io.sort.mb来配置),但是如果这个缓冲内存占满了,就需要往磁盘上写了,这个过程叫做spill。但是一般情况下,达到一个阈值就会开始spill(溢出),默认是80%,通过mapreduce.map.sort.spill.percent 来配置。 写在运行map的节点的本地,目录有下边属性配置:mapreduce.cluster.local.dir。
备注:(当缓冲区快满时,会写到磁盘生成一个临时文件。当这个map task结束后,会对磁盘上这个map task产生的所有临时文件进行排序、合并,并生成最终的输出文件,等待reduce task来拉取数据。)
Partition、Sort、Combine、Merge、Compress:
在写入磁盘前,会将数据分为不同的分区,每个分区对应一个reducer(也就是说分配到同一个partition的数据会由同一个reducer来处理)。在同一个分区内,数据会按照key来排序,如果有combiner函数的话,也会执行在sort的输出结果上。Combiner函数会先对map结果进行合并(例如sum),这样使得临时文件中的数据大大减少,临时文件也就会变小,进而减少在集群中数据转移量,减少I/O,提高效率。由于每次缓冲器达到阈值时,都会生成一个新的临时文件,所以很可能有很多临时文件,这些文件最终会合并成一个单独的分区的、排序的output文件。
生成的spill文件数量如果大于3个,会再次调用combiner。如果只有1个或者2个spill文件,那么combiner就不再值得使用来降低map output文件的大小,所以combiner就不再执行了。
在map output写入磁盘前,做压缩可以优化性能。文件压缩后变小,写入磁盘更快,文件在拷贝到其他reducer的节点的时间也会缩短。默认是不做压缩的。可以修改属性 :mapreduce.map.output.compress
6.2 Reduce端Shuffle处理:
1.Copy阶段:
Map阶段的mapoutput放在执行map任务的机器的本地目录(local dir)上。如果mapoutput分了多个partition。假如,mapoutput输出文件中分区到partition1的数据,就需要拷贝到reducer1执行所在的机器。并且,reducer1需要的文件,会来自多个不同的map tasks(同时也就来自多个节点),这些map tasks完成时间不同,只要map task完成,reducer就会立即copy数据。
Reduce Task在copy data时,如果数据较小会直接copy在内存中,如果超过阈值,存在磁盘上。当拷贝在磁盘上累积时,后台程序会做merge和sort。(当然,这个是在同一个Reduce Task中做的处理,也肯定是同一个分区的数据)
2. Sort阶段:
map outputs拷贝完成后,进入sort阶段。该阶段合并来自不同 map tasks的 map outputs并保持排序。Sort阶段分多轮进行 ,假如有50个map outputs,并且 merge参数设为10(通过mapreduce.task.io.sort.factor 属性设定)。那么会有5个中间临时文件。Merge阶段不会把这些文件合成为一个文件传递给reduce阶段,而是把这5个中间临时文件直接传给reduce,reduce阶段再对这5个中间临时文件执行reduce函数。
3. Reduce阶段:
在Reduce阶段,Reduce函数在每个已排序的output文件上的每个key使用,reduce的output直接写到HDFS。该阶段进行group,相同key的value组合在一起 ,例如:<hadoop,(1,1,1)>。
6.3 mapreduce优化:
shuffle是mapreduce的核心,优化mapreduce程序也就是优化shuffle阶段,优化一些shuffle阶段使用的参数。在优化时,可以重点优化shuffle过程中的下面几个阶段:
partition
sort
combine
compress
group
7. 自定义数据类型
Writable接口(org.apache.hadoop.io.Writable):一个实现简单、高效、序列化协议的序列化对象。
在MapReduce中,value必须实现Writable接口。
在MapReduce中,key必须实现WritableComparable接口。 WritableComparable接口继承Writable和Comparable<T>。
下面的类自定义了数据类型 CustomDataWritable。包含5个属性:上下行数据包、上下行流量、电话号码个数计数。
8. 手机流量案例统计
手机流量案例(日志文件),数据字典:
如上边数据文件的数据字典,统计每一个手机号的 上行数据包总数、下行数据包总数、上行总流量数、下行总流量数。
那么其实思路非常清晰,我们筛选出符合条件的数据记录。按照手机号码(msisdn)来sum上边4个指标就可以了。非常简单! 在用mapreduce来实现时,我们需要创建一个“自定义数据类型” , 这个类型包含以下四个属性:
upPackNum;
downPackNum;
upPayLoad;
downPayLoad;
这样一简化之后,我们就可以按照key(手机号码),Reduce我们的自定义数据类型就可以了。
下面程序实现了按手机号统计流量的逻辑(最后一位是该手机号出现的次数):