概念:
hadoop是一个开源的、可靠的、可扩展的分布式计算框架。
hadoop可以使用简单的编程模型跨集群对大数据集进行分布式的处理,hadoop被设计出来就是为了达到改变只能使单台服务提供计算和存储的状况,让其能扩展到使用成百上千台服务器都能提供计算和存储的目的,hadoop能够构建数据仓库,能对PB级的数据进行存储、处理、分析、统计
hadoop的组成:
Hadoop Common:通用的工具类,提供对hadoop各个框架的支持
HDFS:分布式文件系统,使应用数据的访问具有高吞吐量
YARN:执行hadoop作业的调度和提供hadoop集群资源的管理
MapReduce:基于YARN之上的框架,用于并行处理大数据集
HDFS
HDFS是hadoop的分布式文件系统,是使用java构建的,其特点就是具有高可扩展性、高容错性、高吞吐量、可存储海量数据的特点,可以存储超大的文件,HDFS使用了流式的数据访问方式,适合处理大数据集的场景,可以在普通廉价的硬件上运行,并支持水平横行的扩展,数据以分布式的方式存储在HDFS中
1、高可扩展性:HDFS可以运行在普通的廉价机器上,并可以支持简单的水平横行扩展的方式进行扩容,达到了高可扩展性
2、高容错性:HDFS可以将文件分成数据块,以多节点多副本方式存储在不同的节点上,实现了HDFS的高容错性
3、高吞吐量:HDFS将文件拆分成多个大小均匀的数据块存储在各个节点上,并以多机并行处理节点上的数据块,使HDFS拥有高的吞吐量
4、存储海量数据和超大文件:HDFS可以通过水平横行扩展的方式进行扩容,将数据分成块存放在集群的各个节点上,相对于单机存储,HDFS的存储数据的能力会大得多,所以HDFS可以支持存储海量的数据,并适用于超大文件的存储
5、流式的数据访问:hdfs以流式数据访问的形式,达到一次写入,多次读取,使得hdfs非常适用于大数据集的分析任务
随之而来的缺点是:
1、不适用于小的文件存储
2、不能适用于低延迟场景,hdfs追求的是高的吞吐量
3、不支持文件的修改操作
HDFS架构:
官方的图片可能不是那么好理解
这个网上随便找的这个图比较好理解点
HDFS工作概述
HDFS采用的是主从(maser/slave)架构,一个master带多个slave,HDFS的master称为namenode,slave 称为datanode,namenode作为master管理着整个hdfs文件系统目录树namespace以及控制客户端对文件的访问,而datanode主要管理数据的存储和读写操作,通常每个节点都存放一个datanode,一个文件在hdfs中将会拆分成多个数据块block,并且block是被标记有序号的,通常一个block有128M,这些block将以多副本的方式分散的存储在各个datanode中(已经存储的文件不支持修改),这也是hdfs达到高容错性的方式。namenode可以对namespace的文件及目录进行打开,关闭,重命名的操作以及确认客户端的请求映射到哪个datanode上,还负责管理block副本创建的系数及位置,而datanode则可以对文件进行读写操作以响应客户端的读写请求。block的创建和删除也是由datanode进行操作。
关键名词总结:
namenode:作为hdfs的核心主要负责接受客户端的请求,对元数据进行管理包括了记录文件的名称、副本的系数、block存放在哪个datanode上。客户端进行写操作时请求首先会到namenode处,namenode会让客户端把文件拆分成均匀大小的数据块,namenode通过计算返回给客户端存储每一个数据块对应datanode的地址,并且是以多副本的方式存储;当客户端进行读操作时,请求也是先到达namenode处,namenode会将客户端需要的文件对应的所有block的信息返回给客户端其中包括了客户端需要按照什么样的顺序进行读取以及对应的block存储在哪个datanode上,namenode还会定期收集datanode上block的信息及datanode的各种信息和健康状况
datanode:负责存储和读取用户文件对应的block,定期向namenode发送心跳信息,并汇报datanode上的所有信息包括block信息、datanode的健康状态
block:文件的数据块,一般为128M,文件被拆分成多个数据块,被分散存储在不同的datanode上,除了最后一个数据块会小于等于其他的数据块以外,每一个数据块的大小都是一样的,并以多副本的方式存储
replication:block的副本,block通常会以多副本的方式存储在不同的datanode上,而副本的系数,存放的位置由namenode决定,同一个block的多个副本通常会放置在不同机架(多个节点组成一个机架)的datanode上,这样使hdfs拥有高容错性
HDFS 高可用
Active NameNode 和 Standby NameNode:两台 NameNode 形成互备,一台处于 Active 状态,为主 NameNode,另外一台处于 Standby 状态,为备 NameNode,只有主 NameNode 才能对外提供读写服务。
主备切换控制器 ZKFailoverController:ZKFailoverController 作为独立的进程运行,对 NameNode 的主备切换进行总体控制。ZKFailoverController 能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换。
Zookeeper 集群:为主备切换控制器提供主备选举支持。共享存储系统:共享存储系统是实现 NameNode 的高可用最为关键的部分,共享存储系统保存了 NameNode 在运行过程中所产生的 HDFS 的元数据。
主 NameNode 和 备 NameNode: 通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主 NameNode 在确认元数据完全同步之后才能继续对外提供服务。
DataNode:会同时向主 NameNode 和备 NameNode 上报数据块的位置信息。
hdfs环境安装
安装hdfs的前置条件是要先安装好jdk和ssh,准备完成后,开始安装:
将hdfs的压缩包下载并解压,然后配置配置文件,先配置安装目录下的/etc/hadoop/hadoop-env.sh 配置好jdk的安装路径
然后到安装目录下的/etc/hadoop/core-site.xml,配置hadoop的相关配置,注意这里千万不要把端口配置成9000,9000是以前版本的配置,我就是被Apache官网坑了,配来配去发现是9000端口不能配
然后到安装目录下的/etc/hadoop/hdfs-site.xml,配置副本数
然后进入安装目录下的bin目录执行./hdfs namenode -format对hdfs进行初始化,
然后进入安装目录下的sbin目录执行./start-dfs.sh启动hdfs,然后浏览器输入地址 ip:50070,ip地址是你们配置的那个,我本机配置的是192.168.200.128,所以浏览器输入http://192.168.200.128:50070/
显示active表示启动完成
hdfs的常用命令:
在操作hdfs之前把hdfs的配置到环境环境变量去,这样方便后面的操作:
出现输出hdfs的版本是出现jdk版本就代表配置成功
上传文件:hadoop fs -put test.txt /
下载文件:hadoop fs -get /test.txt
查看文件:hadoop fs -ls /**
查看文件内容:hadoop fs -text /test.txt
创建文件:hadoop fs -mkdir /test
递归创建文件夹和递归查看文件:hadoop fs -mkdir -p /a/b/c/d,hadoop fs -ls -R /
删除文件:hadoop fs -rm /test.txt
在hadoop的管理网页上可以查看hdfs文件的信息
yarn
概念:
yarn是一个资源调度器,可以将更多的计算框架,运行在同一个集群中,运行多个不同的类型的作业,改变了mapreduce1.X只能运行mr作业的状况,并共享HDFS的数据,多个不同的计算框架和作业可以享受整体资源的调度,极大的提高了集群的资源利用率,yarn可以说是一个操作系统级别的资源调度框架
yarn的工作流程及组件
客户端向服务端提交的作业会到达yarn的ResourceManage处,ResourceManage会为作业分配资源,然后让集群中一个nodemanager去启动container(运行环境),在第一个启动的container中启动一个applicationMaster,applicationMaster启动完成后会将信息注册到ResourceManager中,同时向ResourceManager申请资源,applicationMaster获取资源后就会在对应的nodeManager上启动container,然后在container中运行作业
ResourceManager:负责集群资源的统一管理和调度,处理客户端请求,监控nodemanager 包括健康状态资源使用情况,监控applicationMaster包括作业的运行情况
nodeManager:接收并执行ResourceManager和applicationMaster的命令,启动container并定期向ResourceManager发送心跳和container的运行状况资源使用情况,监控container资源的使用情况,负责并管理自己的资源使用情况
applicationMaster:applicationMaster负责管理运行在yarn上的应用程序,应用程序可能是一个spark、mr等的作业,applicationMaster将作业拆分成一个个task,这些task会运行在一个或者多个容器中,applicationMaster向ResourceManager申请资源,分配资源给内部的task进行启动,task会被运行在nodeManager启动的container中,applicationMaster会运行在第一个启动的container上,applicationMaster将信息注册到ResourceManager,ResourceManager将对applicationMaster的运行情况进行监控,包括task的运行情况,
container:集成了cpu,内存等一系列,是一个任务运行环境抽象,作业和applicationMaster都是运行在其中
yarn的环境搭建:
进入hadoop的安装目录里的etc目录,将mapred-site.xml.template拷贝一份出来改名为mapred-site.xml,将配置写入:
然后编辑etc目录下的yarn-site.xml,加入配置
进入安装目录下的sbin目录中执行 ./start-yarn.sh启动yarn
jps查看yarn是否启动:
进入yarn的管理界面,浏览器访问ip:8088,如我虚拟机的ip是192.168.200.128
http://192.168.200.128:8088/通过这个页面查看yarn的作业情况
yarn作业的提交
命令 hadoop jar XXXXX.jar classname 参数1 参数2 .....
例如hadoop安装目录下的/share/hadoop/mapreduce中有一个名为hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar的包,里面有个pi的类,可以使用它进行试验执行 hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 3 4
程序在执行中
执行结果
到页面看,可以看的任务的列表记录
MapReduce
概述
mapreduce是一个能够提供轻松编写一个应用程序的框架,这个应用程序是用于以并行的方式去对一个集群上的海量数据进行离线的批处理,这个集群可以拥有上千个节点的、可以是使用廉价机器组成的集群
mapreduce工作过程:
mapreduce能够将数据集拆分成一个个独立的数据块,多个map作业会以并行的方式对数据块进行处理,mapreduce会对这些map作业的输出进行排序,并将结果作为reduce作业的输入,而这些作业的输出和输入都会被存储在文件系统hdfs中,mapreduce会对这些作业进行调度,监视和重新执行失败的作业
通过mapreduce的wc案例理解mapreduce的工作:如下图我们要统计文件中三行英文的相同单词出现的个数,mapreduce利用inputFormat将文件输入并拆分成三个输入分片split,然后对split中的数据读取,这个过程叫做splitting,利用Recordreaders将每个split数据读取出来,每个split都会产生一个作业map task,一般一个block只对应一个split;图中有三个split所以产生三个作业将split的数据读取出来进行并行的处理,将数据拆分成一个个单词,这个过程称为mapping;然后通过将各个节点上相同的单词汇聚到同一个节点上进行计算,这个过程称为shuffing;之后将数据交由对应的reduce task对各个节点上单词的个数
进行统计,这个过程称为reducing;最后各个reduce task将统计的结果通过OutputFormat输出到hdfs中。这个案例体现了mapreduce分而治之的思想
MapReduce编程:
我们还是以最经典的案例wordcount进行演示,统计英文单词,不多BB直接上代码
先看map过程的方法,这个对应的拆分(mapping)的过程
/**
* 类Mapper<LongWritable, Text, Text, IntWritable>的四个泛型分别表示
* map方法的输入的键的类型key-input、值的类型value-input;输出的键的类型key-output、输出的值的类型value-output
* key-input 指的是当前所读行行首相对于split分片开头的字节偏移量,所以是long类型,对应序列化类型LongWritable
* value-input 指的是当前所读行,类型是String,对应序列化类型Text
* key-output 输出键,指的是单词,类型是String,对应序列化类型是Text
* value-output 输出值,指的是单词的个数,1,类型是int,对应序列化类型是IntWritable
*
*/
public class MapService extends Mapper<LongWritable, Text,Text,LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
String line = value.toString();//每行句子 Dear Bear River
String[] words = line.split(" ");//每行句子的单词
for (String word : words) {
//得到的结果以键值对显示,如 Bear:1,将所有的单词都以这样的形式输出给reduce
context.write(new Text(word),new LongWritable(1));
}
}
}
/**
* 这里reduce的输入就是map的输出,reduce的key-input、value-input分别与map的key-output、value-output
* Reducer<Text, IntWritable, Text, IntWritable>的四个泛型分别表示
* reduce方法的输入的键key-input、输入的值value-input;输出的键key-output、输出的值value-output
*
*/
public class ReduceService extends Reducer<Text,LongWritable,Text,LongWritable> {
//Iterable<LongWritable> 是相同单词的 value值的集合,也就是一堆1,这个就对应shuffling过程
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum=0;
for (LongWritable value : values) {
sum+=value.get();//统计相同单词的总数
}
context.write(key,new LongWritable(sum));//输出 如:Dear:5;
}
}
public class WcTest {
public static void main(String[] args) throws Exception{
Configuration configuration=new Configuration();
Job wc = Job.getInstance(configuration, "WC");//作业的名字是WC
wc.setJarByClass(WcTest.class);//访问入口类
FileInputFormat.setInputPaths(wc, new Path(args[0])); //设置输入
FileOutputFormat.setOutputPath(wc, new Path(args[1]));//输出路径
wc.setMapperClass(MapService.class);//map类
wc.setMapOutputKeyClass(Text.class);//map输出key类型
wc.setMapOutputValueClass(LongWritable.class);//map输出value类型
wc.setReducerClass(ReduceService.class);//reduce类
wc.setMapOutputKeyClass(Text.class);//reduce输出key类型
wc.setMapOutputValueClass(LongWritable.class);//reduce输出value类型
wc.waitForCompletion(true);//提交作业
}
}
程序打包
统计这个文件内的单词,这个文件的名称是mr.txt
上传文件到hadoop的test目录
把架包传到服务器
执行命令:
hadoop jar mapreduceDemo-1.0-SNAPSHOT.jar mapreduce.WcTest hdfs://localhost:8020/test/mr.txt hdfs://localhost:8020/out/wc
执行成功
hadoop fs -ls /out/wc 进入/out/wc目录查看
hadoop fs -text /out/wc/part-r-00000 查看生成的文件part-r-00000的内容