Hadoop 架构
Hadoop组成部分
- HDFS
管理者:namenode
工作者:DataNode
辅助管理者:secondaryNameNode - MapReduce
- YARN
管理者:ResourceManage
工作者:NodeManage
Hadoop 运行机制
MapReduce 详解
运行原理
- 在客户端执行
submit()
方法之前,会先去获取一下待读取文件的信息 - 将
job
提交给yarn
,这时候会带着三个信息过去(job.split(文件的切片信息),jar.job.xml) - yarn会根据文件的切片信息去计算将要启动的
maptask
的数量,然后去启动maptask
-
maptask
会调用InPutFormat()
方法区HDFS上面读取文件,InPutFormat()
方法会再去调用RecordRead()
方法,将数据以行首字母的偏移量为key,一行数据为value传给mapper()方法 -
mapper
方法做一些逻辑处理后,将数据传到分区方法中,对数据进行一个分区标注后,发送到环形缓冲区中 - 环形缓冲区默认的大小是100M,达到80%的阈值将会溢写
- 在溢写之前会做一个排序的动作,排序的规则是按照key进行字典序排序,排序的手段是快排
- 溢写会产生出大量的溢写文件,会再次调用
merge()
方法,使用归并排序,默认10个溢写文件合并成一个大文件, - 也可以对溢写文件做一次
localReduce
也就是combiner
的操作,但前提是combiner
的结果不能对最终的结果有影响 - 等待所有的maptask结束之后,会启动一定数量的
reduce task
-
reduce task
会发取拉取线程到map端拉取数据,拉取到的数据会先加载到内存中,内存不够会写到磁盘里,等待所有的数据拉取完毕,会将这些输出在进行一次归并排序 - 归并后的文件会再次进行一次分组的操作,然后将数据以组为单位发送到reduce()方法
- reduce方法做一些逻辑判断后,最终调用
OutputFormat()
方法,Outputformat()
会再去调用RecordWrite()
方法将数据以KV的形式写出到HDFS
上
环形缓冲区的作用以及数据结构
map task 数量受什么影响
- 输入文件大小,需注意 hdfs是块存储,如果hdfs迷人设置的是 128M块大小,一个文件是 200M,那么将会占用两个块,maptask就是 2个
- 文件数量 ,不同的文件,也会新启动一个maptask
简述 MapReduce 中的 shuffle
Map 端shuffle
文件split 之后,经过 mapper处理后,加上分区标记,存入环形缓冲区,达到阈值后会落入磁盘,由环形缓冲区写入磁盘时是根据 key 排序的,使用的是快速排序。
Map 和 Reduce 中间的 shuffle
此时Map阶段产生了一些小文件,此时需要将小文件合并起来,使用归并排序
Reduce端 shuffle
recuce 端会启动 reduce task 去拉取数据,注意这里是拉取数据,拉取过来的也是多个文件,需要做一个归并排序,并根据key做好分组
此处补充两种排序方法的代码
快速排序
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# @Time : 2020/2/29 上午10:20
# @Author : lixinsong
# @File : quick_sort.py
# @desc :
def quick_sort(alist, start, end):
"""快速排序"""
if start >= end:
return
mid = alist[start]
low = start
high = end
while low < high:
while low < high and alist[high] >= mid:
high -= 1
alist[low] = alist[high]
while low < high and alist[low] < mid:
low += 1
alist[high] = alist[low]
alist[low] = mid
quick_sort(alist, start, low - 1)
quick_sort(alist, low + 1, end)
if __name__ == '__main__':
a = [2, 1, 4, 8, 2]
quick_sort(a, 0, len(a)-1)
print(a)
归并排序
def merge(a, b):
c = []
h = j = 0
while j < len(a) and h < len(b):
if a[j] < b[h]:
c.append(a[j])
j += 1
else:
c.append(b[h])
h += 1
if j == len(a):
for i in b[h:]:
c.append(i)
else:
for i in a[j:]:
c.append(i)
return c
def merge_sort(lists):
if len(lists) <= 1:
return lists
middle = len(lists)/2
left = merge_sort(lists[:middle])
right = merge_sort(lists[middle:])
return merge(left, right)
if __name__ == '__main__':
a = [4, 7, 8, 3, 5, 9]
print merge_sort(a)