源码实现层面
Input
父类:inputformat 抽象类
-
方法:
createRecordReader(): 创建数据读取器,负责构建一个读取器来读取数据
返回:RecordReader 对象;实际读取数据的对象,isSplitable(): 判断block块是否可分割
返回: true 表示数据会按照切分规则切成多少片
false 表示数据会作为一个整体,不可分割 分片规则:
getminsplitSize(): 获取配置中每个分片最小值,默认是0M;
getmaxsplitSize(): 获取配置中每个分片最大值,默认是256M;
最终通过 Math.max(minSize,Math.min(maxSize,blockSize)) 来决定,即若无特殊配置,每个block块即是一个split,默认分片大小为128m;且每个block块若小于或等于splitSize的1.1倍即默认140M(128*1.1),就会当做一个split来做处理,不再进行拆分,只有大于1.1倍,才进行拆分;
一个maptask最多处理的数据为分片大小的1.1倍;
split = 文件总大小 / 分片大小 > 1.1分片方法:
Math.max(minSize,Math.min(maxSize,blockSize)) 决定mapreduce不适合处理小文件,一个小文件就会占用一个分片,一个分片就会启用一个maptask,极其的浪费资源。如果处理数据过小,导致处理数据的时间甚至没有启动申请资源,关闭资源的长,就容易得不偿失。小文件解决方案:
1.在hdfs上传时解决: 将多个小文件合并成大文件上传到hdfs,多个输入流,一个输出流
2.在mapreduce任务之前做合并: 在input中实现多个小文件合并输出自定义输入:
1.继承inputFormat类
2.重写createRecordReader() 和 isSplitable()方法自定义读取器:
1.继承recordReader类
2.重写initialize(),nextKeyValue(),
getcurrentKey(),getcurrentValue(),getProcess(),close()方法
- 补充:SequenceFile: Hadoop中的一种二进制文件类型,文件以keyvalue形式存储在SequenceFile中;
Output
父类:outputformat 抽象类
方法:
getRecordWriter(): 返回一个输出器对象自定义输出器:
1.继承recordWriter()
2.重写write()和close()方法
Shuffle的详细介绍
- 概念: 洗牌
- 内容:分为分区,排序,分组三个小阶段
Partiiton分区
- 意义: 决定当前数据会被哪个reduce处理
<延伸>
1.一个reduce就是一个partition分区,默认只有一个partition分区,即只有一个reducetask;
2.默认一个reduce会产生一个结果文件,几个partition分区就会产生几个结果文件 - 规则: 根据key的hash值对numReduceTask取余
- 实现类: 需要继承Partitioner,默认HashPartition
Sort排序
- 意义:一次快速排序,两次归并排序,目的是为了减少算法复杂度
- 规则:优先按照比较器comparator中compare的方法,若无比较器,则按照自定义类的compareto方法
默认按照key进行字典升序(通过比较字节实现),比较器和比较方法两个必须实现一个,不然程序会报错 - 规则值:大于1,小于-1,等于0
- 实现类:需要继承Comparator,writableComparator
Group分组
- 意义:聚合
- 规则:按照key进行分组,相同的key的value放入同一个迭代器
- 规则值: 同组0、不同组非0(不做处理)
- 实现类:
Mapreduce优化
-
参数优化
- 资源
mapreduce每个task默认1g内存、1core
shuffle环形缓冲区默认100m,0.8溢写比
yarn每个container默认最小1g内存,最大8g内存,最小1核,最大32核 - 容错
mapreduce每个task默认重试次数4次,
默认task/job失败比例为0,
默认无数据交互超时时间为10min - 稳定性
mapreduce推测执行机制:对task执行进度靠后的任务新启动一个任务(启动备份任务):默认开启的
公司一般都会关闭该机制,避免造成资源浪费 - 线程与并行度
可以提高maptask或reducetask的并行度,开启多线程等,
提高系统的句柄数
- 资源
-
过程优化
-
Combiner
- 本质思想:利用MapTask的并发的个数是远大于Reduce的个数,将聚合的逻辑由每个Map完成一部分,最后再由Reduce做最终的聚合,减轻Reduce的负载,也可避免数据倾斜的发生
- 使用位置:map端的shuffle,在spill之前调用一次,在merge时候又调用一次
-
Compress
意义: 减少磁盘以及网络的IO,提高数据传输和存储的效率,减少cpu负荷(压缩和解压缩的计算)
类型: snappy、lz4、lzo
-
使用位置:
- 输入:MapReduce可以读取一个压缩的文件作为输入【不用】
1.数据的文件类型由数据生成决定,
2.MapReduce读压缩文件会将压缩的元数据读取进来 - 传输:将map阶段的输出的结果经过压缩【主要】---减少网络传输io
1.永久所有都压缩 mapred-site.xml
2.单程序压缩 setConf(key,value)
3.命令提交 指定参数 - 输出:mapreduce可以输出压缩文件【少用】---减少磁盘存储
- 输入:MapReduce可以读取一个压缩的文件作为输入【不用】
-
补充:
Recordreader
- 概念: 记录读取器,实际读取数据的对象,
- 方法:
initialize(), 初始化执行
nextKeyValue(), 获取下一个键值对
getcurrentKey(), 获取当前key
getcurrentValue(),获取当前value
getProcess(), 获取当前执行进程
close(),关闭资源
textInputFormat
- 概念: 文本输入类
- 方法:
createrecordReader(); 创建一个记录读取器
isSplitable(); 数据是否可切片
liststatus(); 列举状态
singlethreadlisttstatus();
getsplits(); 获取数据切片
computeSplitSize(); 计算分片大小
getblockindex(); 获取块索引
Jobhistory和日志聚集
- 个人理解:
每个mapreduce都有一个单独的程序运行,每次yarn重启之后,之前运行过的程序无法记录,
使用mapreduce的jobhistory可以管理所有运行过的程序的运行日志,这样就可以记录每个运行过的程序
使用yarn的日志聚集功能,将各个节点运行产生的日志统一存储到hdfs上,就可以看到每个运行程序的详细信息
MR中使用小数据方案
概念:在mr中如何频繁高效的使用小数据?
解析:
方案1: 将小数据放在分布式缓存中,每个block块都和缓存中的数据进行join,addcachefile方法
过程: 会启动两个task,一个task负责将小表数据转换成hashtable,写入本地文件,并加载到分布式缓 存中。第二个task会去启动一个maptask扫描大表,执行maptask任务,根据分布式缓存中的数据做关联
方案2: 将小数据通过程序mr程序的setup方法初始化加载到内存中
Map join 和Reduce join
- 实现:
- reduce join
将join关联字段作为key,在reduce方法时进行处理关联,适合大的数据join大的数据,比较时的效率非常低, - map join
适合于小数据量join大数据量的场景,参考mr中使用小数据方案
- reduce join
分布式缓存
分布式缓存原理:通过在启动maptask时的初始化方法setup中设置addcachefile加载缓存文件,将每个小文件中的数据放到datanode节点上的内存中
存放方式:既可以直接放在内存中,也可以放在内存数据库redis中
MR数据倾斜
概念:
由于某节点上任务数据分配过于集中,造成该节点上task处理任务耗时较久,使得job进度卡在90%左右不动,造成任务长尾现象;表现:
1.处理任务时个别task迟迟不能完成,
2.结果数据在集群的各个节点上分配不均衡的问题原因:
1.数据本身就是倾斜的,集中在某个规律上,比如 空值字段,null未做过滤等
2.数据分配规则有问题导致数据倾斜:
例如:mapreduce的分区规则; hql语句join,group by,count(distinct)解决:
1.在执行任务或操作之前,做数据清洗,过滤倾斜字段(适用字段不太重要,且过滤字段不作为key)
2.在执行任务或操作之时,将倾斜字段添加随机数,在reduce阶段时再去掉随机数
3.在执行任务或操作之时,弃用hash分区规则,改为随机分区规则或者自定义分区规则
4.taks处理数据量小的话,就不会容易产生数据倾斜,所有可以使用map端聚合,map join,combine补充:
2和3的解决方案会将key相同的数据分发到两个reducetask中,如果非要key相同的数据在一个reduce中处理,那么就需要启动两个mapreduce任务进行处理
MR的小文件处理
- 上传前:
使用java本地合并,两个输入流一个输出流,将小文件合并成大文件,然后上传 - 上传中:
使用hdfs参数命令,例如appendtofile - 上传后:
使用封装类CombineTextInputformat,可以设置切片大小,通过计算虚拟存储,然后重新切片
对象重用机制
概念:
mapreduce计算时,key是引用类型,key是一个公用的引用对象(即使用了对象重用机制),但是值会随着values的迭代而变化,取得与之对应的key;表现:
如果不遍历values,直接输出,则key是取得分组中的第一个key输出
如果遍历values并同时输出,则key是会随着遍历而变化
如果遍历完毕,取得分组后的最后一个kv对的key
相关端口
- jobhistory rpc端口10020
- jobhistory http服务端口19888