Hadoop-MapReduce1.x-框架
1.主要成员
1)Client
Ø用户编写的MapReduce程序通过Client提交到JobTracker端;同时,用户可通过Client提供的一些接口查看作业的运行状态。在Hadoop内部用“作业”(Job)表示MapReduce程序。一个MapReduce程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce任务(Task)。
2)JobTracker
ØJobTracke负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。
3)TaskTracker
ØTaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。
4)Task
ØTask 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split 的多少决定了Map Task 的数目,因为每个split 只会交给一个Map Task 处理。
2.主要过程
- run job
在客户端,用户编写Java程序,编写完成后,打包为jar包,然后提交。JobClient的run job()方法是用于新建JobClient实例,并调用其submitjob()方法的快捷方式 也可以用 job.waitcomplication()。提交作业后,run job()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度打印到控制台,作业完成后,如果成功,就显示作业计数器,如果失败,导致错误的原因打印到控制台。
- get new job id
-->向jobTracker请求一个job id (JobTracker.getNewJobId());
-->检查作业的输出说明,如果输出目录已经存在或者没有指定输出目录,则抛出异常给客户端,
-->检查作业的输入切片,如果输入分片不能计算如没有指定输入目录,如抛出异常给客户端。
- copy job resources
将运行作业所需要的资源包括
-->打包好的jar包(运行程序)
-->配置文件(xml)
-->计算所得的输入分片
复制到一job id 命名的目录下jobtracker的文件系统中,作业jar的副本较多(由mapred.submin.replication)控制默认为10
因此在运行作业的时候,集群中有许多tasktracker访问
- summit job
通知JobTracker 作业准备执行。(通过调用jobtracker的submitjob()方法实现)
- initlialize job (初始化)
当JobTracker接收到对其submitjob()方法的调用后,会把此调用放入一个内部队列中,交由作业调度器(job scheduler)进行调度
并对其进行初始化。初始化包括创建一个表示正在运行的作业对象--封装任务和记录信息,以便跟踪任务的状态和进程。
- retrieve input splits (检索 恢复)
--> 为了创建任务运行列表,作业调度器首先从共享文件系统中获取JobClient以计算好的输入分片信息,然后为每个分片分配一个 map 任 务
--> 创建的reduce任务数量有job的mapred.reduce.task属性决定(setNumReduceTask()设置),scheduler创建相应数量的r educe任 务,任务在此时被分配ID
--> 除了map任务和reduce任务,还有setupJob,cleanupJob需要建立,有tasktracker在所有map开始前和所有reduce结束后分别 执行,这两个方法在OutPutCommitter中(默认是FileOutputCommiter),setupjob()创建输出目录和任务的临时工作目录, cleanupjob删除临时工作目录
- heartbeat(returns task)
-->TaskTracker运行一个简单的循环来定期发送心跳给JobTasker."心跳"告知JobTracker,tasktacker是否还存活,同时也充当着两 者 之间的消息通道,作为心跳的一部分,tasktracker 还会指明他自己是否准备好运行下一次任务,如果是,jobtracker会为他分 配一 个新的任务,并通过心跳的返回值与tasktracker进行通信
-->每个tasktracker会有固定的map和reduce任务槽,数量由tasktracker核的数量和内存的大小来决定,jobtracker会先将 tasktracker的所有map槽填满,然后再填reduce槽
-->jobtracker分配map任务时,会选取与输入分片最近的tasktracker,分配reduce任务用不着考虑数据本地化。
- retrieve job resources
-->通过从共享文件系统把作业的JAR文件复制到tasktracker所在的文件系统,从而实现作业的jar文件本地化,同时 tasktracker将 应用程序所需要的全部文件从分布式缓存复制到本地磁盘。
-->tasktracker为任务新建一个本地工作目录,并把jar文件中的内容解压到这个文件夹下
- launch(发起)
Tasktracker新建一个TaskRunner实例来运行该任务
- run
TaskRunner启动一个新的JVM来运行每个任务,以便客户的map/reduce不会影响到tasktracker