MapReduce 分布式并行计算模型(二) 详细介绍

源码实现层面

Input

  • 父类:inputformat 抽象类

  • 方法:
    createRecordReader(): 创建数据读取器,负责构建一个读取器来读取数据
    返回:RecordReader 对象;实际读取数据的对象,

    isSplitable(): 判断block块是否可分割
    返回: true 表示数据会按照切分规则切成多少片
    false 表示数据会作为一个整体,不可分割

  • 分片规则:
    getminsplitSize(): 获取配置中每个分片最小值,默认是0M;
    getmaxsplitSize(): 获取配置中每个分片最大值,默认是256M;
    最终通过 Math.max(minSize,Math.min(maxSize,blockSize)) 来决定,即若无特殊配置,每个block块即是一个split,默认分片大小为128m;且每个block块若小于或等于splitSize的1.1倍即默认140M(128*1.1),就会当做一个split来做处理,不再进行拆分,只有大于1.1倍,才进行拆分;
    一个maptask最多处理的数据为分片大小的1.1倍;
    split = 文件总大小 / 分片大小 > 1.1

  • 分片方法:
    Math.max(minSize,Math.min(maxSize,blockSize)) 决定mapreduce不适合处理小文件,一个小文件就会占用一个分片,一个分片就会启用一个maptask,极其的浪费资源。如果处理数据过小,导致处理数据的时间甚至没有启动申请资源,关闭资源的长,就容易得不偿失。

  • 小文件解决方案:
    1.在hdfs上传时解决: 将多个小文件合并成大文件上传到hdfs,多个输入流,一个输出流
    2.在mapreduce任务之前做合并: 在input中实现多个小文件合并输出

  • 自定义输入:
    1.继承inputFormat类
    2.重写createRecordReader() 和 isSplitable()方法

  • 自定义读取器:
    1.继承recordReader类
    2.重写initialize(),nextKeyValue(),
    getcurrentKey(),getcurrentValue(),getProcess(),close()方法

  • 补充:SequenceFile: Hadoop中的一种二进制文件类型,文件以keyvalue形式存储在SequenceFile中;

Output

  • 父类:outputformat 抽象类

  • 方法:
    getRecordWriter(): 返回一个输出器对象

  • 自定义输出器:
    1.继承recordWriter()
    2.重写write()和close()方法

Shuffle的详细介绍

  • 概念: 洗牌
  • 内容:分为分区,排序,分组三个小阶段

Partiiton分区

  • 意义: 决定当前数据会被哪个reduce处理
    <延伸>
    1.一个reduce就是一个partition分区,默认只有一个partition分区,即只有一个reducetask;
    2.默认一个reduce会产生一个结果文件,几个partition分区就会产生几个结果文件
  • 规则: 根据key的hash值对numReduceTask取余
  • 实现类: 需要继承Partitioner,默认HashPartition

Sort排序

  • 意义:一次快速排序,两次归并排序,目的是为了减少算法复杂度
  • 规则:优先按照比较器comparator中compare的方法,若无比较器,则按照自定义类的compareto方法
    默认按照key进行字典升序(通过比较字节实现),比较器和比较方法两个必须实现一个,不然程序会报错
  • 规则值:大于1,小于-1,等于0
  • 实现类:需要继承Comparator,writableComparator

Group分组

  • 意义:聚合
  • 规则:按照key进行分组,相同的key的value放入同一个迭代器
  • 规则值: 同组0、不同组非0(不做处理)
  • 实现类:

Mapreduce优化

  • 参数优化

    • 资源
      mapreduce每个task默认1g内存、1core
      shuffle环形缓冲区默认100m,0.8溢写比
      yarn每个container默认最小1g内存,最大8g内存,最小1核,最大32核
    • 容错
      mapreduce每个task默认重试次数4次,
      默认task/job失败比例为0,
      默认无数据交互超时时间为10min
    • 稳定性
      mapreduce推测执行机制:对task执行进度靠后的任务新启动一个任务(启动备份任务):默认开启的
      公司一般都会关闭该机制,避免造成资源浪费
    • 线程与并行度
      可以提高maptask或reducetask的并行度,开启多线程等,
      提高系统的句柄数
  • 过程优化

    • Combiner

      • 本质思想:利用MapTask的并发的个数是远大于Reduce的个数,将聚合的逻辑由每个Map完成一部分,最后再由Reduce做最终的聚合,减轻Reduce的负载,也可避免数据倾斜的发生
      • 使用位置:map端的shuffle,在spill之前调用一次,在merge时候又调用一次
    • Compress

      • 意义: 减少磁盘以及网络的IO,提高数据传输和存储的效率,减少cpu负荷(压缩和解压缩的计算)

      • 类型: snappy、lz4、lzo

      • 使用位置:

        • 输入:MapReduce可以读取一个压缩的文件作为输入【不用】
          1.数据的文件类型由数据生成决定,
          2.MapReduce读压缩文件会将压缩的元数据读取进来
        • 传输:将map阶段的输出的结果经过压缩【主要】---减少网络传输io
          1.永久所有都压缩 mapred-site.xml
          2.单程序压缩 setConf(key,value)
          3.命令提交 指定参数
        • 输出:mapreduce可以输出压缩文件【少用】---减少磁盘存储

补充:

Recordreader

  • 概念: 记录读取器,实际读取数据的对象,
  • 方法:
initialize(), 初始化执行
nextKeyValue(), 获取下一个键值对
getcurrentKey(), 获取当前key
getcurrentValue(),获取当前value
getProcess(), 获取当前执行进程
close(),关闭资源

textInputFormat

  • 概念: 文本输入类
  • 方法:
createrecordReader(); 创建一个记录读取器
isSplitable(); 数据是否可切片
liststatus(); 列举状态
singlethreadlisttstatus();
getsplits(); 获取数据切片
computeSplitSize(); 计算分片大小
getblockindex(); 获取块索引

Jobhistory和日志聚集

  • 个人理解:
    每个mapreduce都有一个单独的程序运行,每次yarn重启之后,之前运行过的程序无法记录,
    使用mapreduce的jobhistory可以管理所有运行过的程序的运行日志,这样就可以记录每个运行过的程序
    使用yarn的日志聚集功能,将各个节点运行产生的日志统一存储到hdfs上,就可以看到每个运行程序的详细信息

MR中使用小数据方案

  • 概念:在mr中如何频繁高效的使用小数据?

  • 解析:
    方案1: 将小数据放在分布式缓存中,每个block块都和缓存中的数据进行join,addcachefile方法
    过程: 会启动两个task,一个task负责将小表数据转换成hashtable,写入本地文件,并加载到分布式缓 存中。第二个task会去启动一个maptask扫描大表,执行maptask任务,根据分布式缓存中的数据做关联
    方案2: 将小数据通过程序mr程序的setup方法初始化加载到内存中

Map join 和Reduce join

  • 实现:
    • reduce join
      将join关联字段作为key,在reduce方法时进行处理关联,适合大的数据join大的数据,比较时的效率非常低,
    • map join
      适合于小数据量join大数据量的场景,参考mr中使用小数据方案

分布式缓存

  • 分布式缓存原理:通过在启动maptask时的初始化方法setup中设置addcachefile加载缓存文件,将每个小文件中的数据放到datanode节点上的内存中

  • 存放方式:既可以直接放在内存中,也可以放在内存数据库redis中

MR数据倾斜

  • 概念:
    由于某节点上任务数据分配过于集中,造成该节点上task处理任务耗时较久,使得job进度卡在90%左右不动,造成任务长尾现象;

  • 表现:
    1.处理任务时个别task迟迟不能完成,
    2.结果数据在集群的各个节点上分配不均衡的问题

  • 原因:
    1.数据本身就是倾斜的,集中在某个规律上,比如 空值字段,null未做过滤等
    2.数据分配规则有问题导致数据倾斜:
    例如:mapreduce的分区规则; hql语句join,group by,count(distinct)

  • 解决:
    1.在执行任务或操作之前,做数据清洗,过滤倾斜字段(适用字段不太重要,且过滤字段不作为key)
    2.在执行任务或操作之时,将倾斜字段添加随机数,在reduce阶段时再去掉随机数
    3.在执行任务或操作之时,弃用hash分区规则,改为随机分区规则或者自定义分区规则
    4.taks处理数据量小的话,就不会容易产生数据倾斜,所有可以使用map端聚合,map join,combine

  • 补充:
    2和3的解决方案会将key相同的数据分发到两个reducetask中,如果非要key相同的数据在一个reduce中处理,那么就需要启动两个mapreduce任务进行处理

MR的小文件处理

  • 上传前:
    使用java本地合并,两个输入流一个输出流,将小文件合并成大文件,然后上传
  • 上传中:
    使用hdfs参数命令,例如appendtofile
  • 上传后:
    使用封装类CombineTextInputformat,可以设置切片大小,通过计算虚拟存储,然后重新切片

对象重用机制

  • 概念:
    mapreduce计算时,key是引用类型,key是一个公用的引用对象(即使用了对象重用机制),但是值会随着values的迭代而变化,取得与之对应的key;

  • 表现:
    如果不遍历values,直接输出,则key是取得分组中的第一个key输出
    如果遍历values并同时输出,则key是会随着遍历而变化
    如果遍历完毕,取得分组后的最后一个kv对的key

相关端口

  • jobhistory rpc端口10020
  • jobhistory http服务端口19888
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容