Shuffle
MapReduce执行过程中,有一个很关键的过程--shuffle
- shuffle过程即为在map结束,对数据进行处理、排序、分区的一个过程
-
以下为shuffle过程的一个简单图形
粗略说明:
- map程序写出结果数据到缓冲区(大小默认100M)
- 容量达到缓冲区的80%时,缓冲区开始将部分数据溢写到磁盘,在溢写之前,mapreduce会对数据进行分区,并使数据在分区内有序排序,分区默认按照hash后对reduce数取模进行,但此时数据量较小,因此一次溢写只写一个文件,一个文件中可能会有很多分区的数据,此过程只保证了数据分区内有序及分区间有序
- 随着map task的不停运行,磁盘上的溢出文件越来越多,框架会把磁盘中多次溢写的文件不同文件同一partition的数据合并成一个文件,按照reduce个数分区,数据在分区内是有序的,此过程执行的是归并排序
- mapreduce的执行reduce任务的节点将分区好的数据通过网络将所有map任务属于自己reduce的分区数据远程copy到本地磁盘的工作目录
- reduce节点将本地磁盘的数据文件通过归并排序进一步合并文件,并将相同key的数据分为一组,使不同key之间有序
- shuffle阶段结束,执行reduce任务,最终生成的是一个key对应一组值得数据集,一个key对应的一组数据会调用一次reduce方法
Combiner优化
在map阶段还可以对数据进行预合并,主要应用在数据量特别大的场景,这样的场景由于数据量大,为了节省网络带宽及传输时间,在合适的时候可以使用combiner对数据进行预合并,combiner一般为reduce方法
- combiner聚合执行的地方:
- 一些的时候进行聚合
- 执行完毕,对分区后的数据文件进行聚合
- 使用combiner的好处
- 减少Map Task输出的数据量,由于临时结果写入到本地磁盘,所以可以减少磁盘IO
- 减少Reduce-Map网络传输的数据量,由于Reduce需要远程通过网络从Map拷贝数据,这样可以提高拷贝速度
- 应用场景
- 结果可以叠加,比如求和,但求平均的就不可以
- 设置方法
job.setCombinerClass(WordCountReducer.class)
(与reduce的类一样)
YARN内置调度器
数据本地性
如果任务运行在与它需要处理的数据在同一个节点,则称该任务具有数据本地性
- 本地性级别:同节点>同机架>跨机架
- 优点:避免通过网络远程读取数据,提高数据读取效率
推测执行
- 作业完成时间取决于最慢的任务完成时间
- 为了解决此问题,hadoop引入了推测执行机制:
- 发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度
- 为拖后腿的任务启动一个备份任务,同时运行
- 谁先执行完,就用谁的结果
- 有些场景,比如任务存在严重倾斜,某些特殊任务(比如向数据库中写入数据),就不适合推测执行
FIFO调度器
FIFO调度器:即队列调度器
- 将所有应用程序放入一个队列中,先进入队列排在前面的先获得资源
- 局限性
- 由于应用程序独占整个运行资源,资源利用率低,无法交叉利用资源
- 不够灵活,比如紧急任务无法插队,耗时长的作业拖慢耗时短的作业
多队列分开调度
- 所有资源按照比例划分到不同的队列
- 每个队列可以实现单独的调度策略
- 优点:
- 按照不同的资源使用情况将资源划分到不同的队列
- 能让更多的应用程序获得资源
- 使用灵活,资源利用率高
- 调度器:
- CapacityScheduler调度器
- FairScheduler调度器
CapacityScheduler调度器
- Yahoo开源的共享集群调度器
- 以队列方式组织作业
- 每个队列内部采用FIFO调度策略
- 每个队列分配一定比例的资源
- 可限制每个用户使用资源量
配置方法:
- 在yarn-site.xml配置文件中设置使用CapacityScheduler调度器:
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
- 在hadoop配置文件目录下创建capacity-sheduler.xml文件,添加各队列资源分配情况:
<configuration>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,data_bi</value>
</property>
<!--队列占用集群资源的百分比,所有队列占用集群资源之和不能超过100-->
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>60</value>
</property>
<!--资源上限,最多可使用的资源容量-->
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>80</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.data_bi.capacity</name>
<value>40</value>
</property>
</configuration>
配置完成无需重启YARN,使用管理命令刷新调度配置:bin/yarn rmadmin -refreshQueues
,但只能添加调度器,如果原来有调度器,想要修改,只能重启ResourceManager
FairScheduler调度器
- 由Facebook开源的共享集群调度器
- 以队列方式组织队列
- 基于最小资源和公平共享量进行调度:本队列资源可共享给其他队列
- 支持资源抢占(等待一段时间后,回收本队列共享给其他队列的资源)
- 内部队列中可使用的策略:
- FIFO
- fair(默认),基于内存使用量调度分配资源
- 任务延时调度
- 提高数据本地性
- 提高系统整体吞吐率
- 公平调度器的目的:
- 允许多用户共享集群资源
- 允许短时的临时作业与长时作业共享集群资源
- 根据比例来管理集群资源,确保集群资源的有效利用
配置方法
- 在yarn-site.xml文件中设置调度器类型,指定公平调度器配置文件路径
<!--yarn使用的调度器类型-->
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
<!--公平调度器自定义配置文件路径,该文件每隔10秒就会被加载一次,可以在集群运行过程中改变队列的配置-->
<property>
<name>yarn.scheduler.fair.allocation.file</name>
<value>/usr/local/hadoop/etc/hadoop/fair-scheduler.xml</value>
</property>
<!--应用程序未指定队列名时,是否指定用户名作为应用程序所在的队列名,如果设置为false或未设置,所有未知队列的应用程序将会被提交到default队列中,默认为true-->
<property>
<name>yarn.scheduler.fair.user-as-default-queue</name>
<value>true</value>
</property>
<!--如果一个队列占用的资源量少于最小资源量限制,是否启用资源抢占,默认false,抢占机制可以使其他队列的作业容器终止,从而使占用的资源让出,将资源分配给占用资源量少于最小资源量限制的队列-->
<property>
<name>yarn.scheduler.fair.preemption</name>
<value>true</value>
</property>
- 创建fair-scheduler.xml配置文件:
<allocations>
<!--配置队列名-->
<queue name="data_bi">
<!--分配给该队列的最小资源,设置格式为"X mb, Y vcores",当调度策略属性schedulingPolicy的属性值是fair时,其cores值会被忽略,仅按照申请的内存大小来调度-->
<minResources>800 mb,1 vcores</minResources>
<!--分配给该队列的最大资源,设置格式为"X mb, Y vcores",当调度策略属性schedulingPolicy的属性值是fair时,其cores值会被忽略,仅按照申请的内存大小来调度-->
<maxResources>1000 mb,2 vcores</maxResources>
<!--最多同时运行的应用程序数目,通过限制该数目,可以防止超量Map Task同时运行时产生的中间输出结果撑爆磁盘-->
<maxRunningApps>2</maxRunningApps>
<!--标记了资源池的权重,当资源池中有任务等待,并且集群中有空闲资源的时候,每个资源池可以根据权重获得不同比例的空闲资源,默认为1-->
<weight>1.0</weight>
</queue>
</allocations>
hadoop2.7.4配置公平调度器时,访问resourcemanager的8080端口会出现问题,官方已有解决方案,具体为将编译好的hadoop-yarn-server-resourcemanager-2.7.4.jar包替换安装目录下share/hadoop/yarn目录下的jar包
调度器简单实践
- 修改yarn-site.xml,添加上述调度器配置
- 添加相应的调度器配置文件
- 重启resourcemanager
yarn-daemon.sh stop resourcemanager
(hadoop3.0中,两种调度器区别已经不大) - 访问相应的resourcemanager端口页面,查看调度器情况