第03章 Hadoop API操作
HDFS操作
Maven配置
进行haddop HDFS相关开发首先需要引入下面两个开发依赖,当然需要根据开发环境中的Hadoop版本选择相关依赖的版本号。此处Hadoop的版本为2.9.2,且使用maven作为开发依赖管理工具。
Apache Hadoop Common
Apache Hadoop HDFS
下面为Hadoop为pom.xml
的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ermao</groupId>
<artifactId>hadoop</artifactId>
<version>1.0</version>
<properties>
<hadoop.version>2.9.2</hadoop.version>
<testNG.version>7.1.0</testNG.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.testng/testng -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${testNG.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
关于用户权限问题
当执行文件操作出现
org.apache.hadoop.security.AccessControlException: Permission denied: user=futianyu, access=WRITE, inode="/input":root:supergroup:drwxr-xr-x
报错信息,说明登录Hadoop的用户权限与操作HDFS中的文件权限不一致导致。
由于水平有限,所以简单叙述下hadoop中的权限问题,当连接到hadoop的HDFS文件系统时,Hadoop Common组件将默认获取当前登录系统的用户名作为登录hadoop的用户。而这将导致的问题是,如果在不同电脑环境进行开发时,在A电脑中登录Hadoop用户为admin,而在linux系统中用户为root。那么root上传的文件,admin用户将无法对root创建的文件进行操作。
下面给出简单分析:
// 创建与hdfs的链接
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.0.20:9000");
FileSystem fs = null;
// 获取文件系统
try {
fs = FileSystem.get(configuration);
} catch (IOException e) {
e.printStackTrace();
} finally {
if(fs != null){
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
当运行到FileSystem.get(configuration);
时,可以看下FileSystem.java
源码。
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) {
return get(conf);
} else {
if (scheme != null && authority == null) {
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
return get(defaultUri, conf);
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
} else {
return CACHE.get(uri, conf);
}
}
}
当执行CACHE.get(uri, conf)
时,此时已经生成了登录Hadoop的用户名。具体描述可参考下面两篇博客的分析
所以需要统一设置登录Hadoop的用户名,代替默认登录,这个地方可以添加代码如下代码(可添加一个静态代码块):
{
System.setProperty("HADOOP_USER_NAME","root");
}
开发应用
第1步,需要连接到大数据环境,并设置相关连接属性。设置链接名称以及链接地址为配置hadoop中的配置core-site.xml
所设置的名称以及链接地址,此处所设置的名称以及链接地址分别是:fs.defaultFS
以及hdfs://192.168.0.20:9000
。
第2步,获取HDFS文件系统FileSystem.get(configuration)
;
第3步,从文件系统中获取相关文件信息。
完整demo如下所示:
package com.ermao.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
/**
* @Project: learn_hadoop
* @Author: Mr.ErMao
* @Date: 2020/5/12 18:05
* @Description: Hadoop的主要程序
*/
public class HadoopApplication {
public static void main (String[] args) {
// 创建与hdfs的链接
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.0.20:9000");
FileSystem fs = null;
// 获取文件系统
try {
fs = FileSystem.get(configuration);
Path path = new Path("/");
// 判断是否是文件
boolean res = fs.isFile(path);
System.out.println(res);
// 文件系统列表查看文件状态
FileStatus[] list = fs.listStatus(path);
for (int i = 0; i < list.length; i++) {
FileStatus fileStatus = list[i];
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getOwner());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(fs != null){
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
其他的HDFS API操作可参见《Hadoop 系列(七)—— HDFS Java API》,这篇博客比较全,还不错。
关于HDFS流操作
当我们进行web开发时,可能涉及到文件上传操作,HDFS提供了两种操作方式:
-
从服务器将文件上传至HDFS系统以及从HDFS系统中将文件下载到服务器中。
-
fs.copyFromLocalFile(Path src, Path dst)
从服务器本地上传至HDFS系统。fs = FileSystem.get(Configuration conf)
-
src
为服务器本地路径 -
dst
为HDFS系统文件路径
-
fs.copyToLocalFile(Path src, Path dst)
从HDFS系统下载到服务器本地。fs = FileSystem.get(Configuration conf)
-
src
为HDFS系统文件路径 -
dst
为服务器本地路径
下面给出相关代码仅做参考:
@Test public void testUploadAndDownload(){ // 下面的代码做了一定的简化 HDFS hdfs = HDFS.getInstance(HadoopConfiguration.connectionName,HadoopConfiguration.connectionUri); // 上传文件操作 // 创建上传Path对象 Path uploadSrc = new Path("/Users/futianyu/Downloads/文件获取流程.jpg"); Path uploadDst = new Path("/input/文件获取流程.jpg"); // 创建下载Path对象 Path downloadSrc = new Path("/input/nginx-1.18.0.tar.gz"); Path downloadDst = new Path("/Users/futianyu/Downloads/nginx-1.18.0.tar.gz"); try { // 上传文件操作 hdfs.fs.copyFromLocalFile(uploadSrc,uploadDst); // 下载文件操作 hdfs.fs.copyToLocalFile(downloadSrc,downloadDst); } catch (IOException e) { e.printStackTrace(); }finally { hdfs.close(); } }
-
-
直接将客户端流写入到HDFS系统,服务器不做任何保存工作以及从HDFS系统中直接将文件流下载到客户端,同样服务器不做任何保存操作。
下面给出相关代码:
@Test public void testUploadIOStream(){ HDFS hdfs = HDFS.getInstance(HadoopConfiguration.connectionName,HadoopConfiguration.connectionUri); // 创建上传Path对象相对较大的文件 Path uploadSrc = new Path("/Users/futianyu/Downloads/jdk-8u251-linux-x64.tar.gz"); Path uploadDst = new Path("/input/jdk-8u251-linux-x64.tar.gz"); FSDataOutputStream outputStream = null; FileInputStream localInputStream = null; // 上传操作 try { // 允许覆盖 outputStream = hdfs.fs.create(uploadDst,true); // 打开本地文件 localInputStream = new FileInputStream("/Users/futianyu/Downloads/jdk-8u251-linux-x64.tar.gz"); int localFileSize = localInputStream.available(); System.out.println("文件大小为:"+localFileSize); // 读取文件字节流 byte[] bytes = new byte[localFileSize]; int res = localInputStream.read(bytes); if(res == localFileSize ){ System.out.println("读取输入流成功!"); } // 将字节流写入HDFS outputStream.write(bytes); outputStream.flush(); } catch (IOException e) { e.printStackTrace(); }finally { // 关闭文件输出流 if(outputStream != null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } // 关闭文件输入流 if(localInputStream != null){ try { localInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } // 最后关闭HDFS hdfs.close(); } } @Test public void testDownloadIOStream(){ HDFS hdfs = HDFS.getInstance(HadoopConfiguration.connectionName,HadoopConfiguration.connectionUri); // 创建下载Path对象 Path downloadSrc = new Path("/input/jdk-8u251-linux-x64.tar.gz"); File dwonloadDst = new File("/Users/futianyu/Downloads/jdk-8u251-linux.tar.gz"); FSDataInputStream hdfsInputStream = null; FileOutputStream fileOutputStream = null; // 打开HDFS文件 try { // 打开一个文件 hdfsInputStream = hdfs.fs.open(downloadSrc); // 获取文件大小 int hdfsFileSize = hdfsInputStream.available(); byte[] bytes = new byte[hdfsFileSize]; // 这个地方踩了一个坑 // 如果使用read()方法,并不能完全读取到文件。 // readFully则可以。 hdfsInputStream.readFully(bytes); // 将文件流写入本地文件 fileOutputStream = new FileOutputStream(dwonloadDst); fileOutputStream.write(bytes); fileOutputStream.flush(); } catch (IOException e) { e.printStackTrace(); }finally { // 关闭文件输入流 if(hdfsInputStream != null){ try { hdfsInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } // 关闭文件输出流 if(fileOutputStream != null){ try { fileOutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } // 关闭文件系统 hdfs.close(); } }
从上面看出一般情况下我们采用的是第二种操作流程,这样的目的可以减少服务器的相关IO操作,二来减少请求次数。相对而言将流程简单化,服务器上的应用可以解决更多关于安全认证等业务。使业务更好的分离。
MapReduce相关操作
MapReduce简介
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。MapReduce的核心,就是对一个需要计算的任务进行拆分,然后并行处理。
MapReduce是简单一致模型。如何理解,正常情况下,mysql当中能够完成的查询,都可以使用MR来查询。主要是针对于大数据量的数据查询,但是他的查询效率很低(但是这个是相对的,同样的数据量,mysql完成不了这样的查询)。
MapReduce原理以及架构
原理概述
比如,需要统计分析3000万分财务数据,然而有10个人可以进行统计分析财务数据。这10个人当中,有的人工作效率低,有的人工作效率高。那么整个统计工作什么时间完成,取决于效率低的人(木桶原理,一个木桶由10个木板组成,然而这个桶能装多少水,却取决于最短的模板)。
在Hadoop 1.0当中(没有加入yarn时),Hadoop将计算任务平均分配任务,是由最慢的服务器决定整个任务什么时候完成。
在Hadoop 2.0时,加入了协调机制,时刻关注各个节点完成情况,互相进行资源协调。保证各个节点任务同时完成。
MapReduce有意识的分配工作。简单说就是不再简单平均分配任务,根据不同计算机能力合理分配任务,保证任务同时完成。
将整个计算过程大致分为两个阶段:
- Map阶段——拆分计算任务,计算各自结果,整理排序
- Reduce阶段——将Map计算结果进一步整理归类,并对各个类别进行统计生成结果文件。
Map阶段
在上图中,File(文件)被HDFS分成若干block(数据块),将File的数据块交给Split进一步拆分(逻辑划分,不包含具体数据,只包含数据的位置信息)。需要注意以下两点:
- 一个split包含一个或者多个block(数据块),默认是一对一的关系。
- 一个split不包含两个文件的block(数据块),不会跨越file边界,也就是说split是不会跨越文件进行划分的。
当分片完成后,MapReduce程序会将split中的数据以K/V(Key/Value)的形式读取出来,然后将这些数据交给用户定义的Map函数进行处理。
- 一个Map处理一个split
用户用Map函数处理完数据后,同样将结果以K/V(Key/Value)的形式交给MR的计算框架。
MR计算框架会将不同的数据划分不同的partition,数据相同的多个partition最后会分到同一个reduce节点上面进行处理。也就是说一类partition对应一个reduce。
Map默认使用Hash算法对key值进行Hash计算,这样保证了相同key值的数据能够划分到相同的partition中,同时也保证了不同的parttion之间的数据量是大致相当的。
一个程序中Map和Reduce的数量是split和partition的数据决定的。
Reduce阶段
Map处理完成后,Reduce处理程序在各个Map节点将属于自己的数据拷贝到自己的内存缓冲区。
最后将这些数据合并成一个大的数据及,并且按照Key值进行聚合,把聚合后的value值作为一个迭代器给用户使用。
用户使用自定义的Reduce函数处理完迭代器中的数据后,把结果以K/V的格式存储到HDFS上的文件中。
总结
- Map阶段
- split进行逻辑划分,<font style="color:red">一般情况是对输入文件的每一行进行划分。</font>
- Map(由用户决定如何Map函数如何运行),简单来讲就是将小任务计算出来——整理成为一个个集合。
- Map-shuffle[1]阶段(集合整理阶段),将用户的计算结果进行整理合并。
- Collect阶段键数据放在环形缓冲区,环形缓冲区分为数据区和索引区。
- sort阶段对在统一partition内的索引按照key值排序。
- spill(溢写)阶段根据拍好序的索引将数据按顺序写到文件中。
- Merge阶段将Spill生成的小文件分批合并排序成一个大文件。
- Reduce(归一)阶段
- Reduce中shuffle阶段(重新整理阶段)。
- Copy阶段将各个Map中计算数据分批拷贝到Reduce的缓冲区。
- Spill阶段将内存缓冲区的数据按照顺序写到文件中去。
- Merge阶段啊将移除文件合并成一个排序好的数据集合。
- 调用用户自定义函数,计算结果Map整理出来的集合。
- 生成结果文件。
- Reduce中shuffle阶段(重新整理阶段)。
大致的过程可以看做是一个:统一
——>分裂
——>整理集合
——>统计结果
——>提交报告
的过程。
实际开发过程
在下面的开发过程中将进一步理解上面所叙述的原理以及过程。当然用户是无法直观观察到完整过程的。
Maven配置
进行MapReduce开发同样需要两个核心依赖,同样需要根据Hadoop的版本来确定使用依赖的版本。
Apache Hadoop MapReduce Core
Apache Hadoop MapReduce Common
<!-- MapReduce -->
<properties>
<hadoop.version>2.9.2</hadoop.version>
<testNG.version>7.1.0</testNG.version>
</properties>
<dependencies>
<!-- MapReduce -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
实际应用
下面将做一个简单的词频统计(wordcount——相当于MR的hello word)。即统计下面一段简单的文字中hello
、world
、a
等出现次数。
hello world! a
hello world! b
hello world! c
hello world! d
hello world!
开发步骤如下:
第一步,创建Mapper。这个Mapper必须继承Hadoop的org.apache.hadoop.mapreduce.Mapper
类。
package com.ermao.hadoop.mapreduce;
import org.apache.hadoop.mapreduce.Mapper;
public class TestMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
}
在上面的代码中,有四个泛型<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
。其中有两个输入泛型以及两个输出泛型。且必须使用Hadoop自己的包装类。
- 输入泛型
KEYIN
以及VALUEIN
:-
KEYIN
代表偏移量,我们已经知道在split阶段,对数据块的划分一般情况下是按照行进行划分的。比如:hello world! a
其第一次读起始位置为1以及14。那么第二次读的偏移量则是15到28。KEYIN
所代表的泛型就是LongWritable
。 -
VALUEIN
代表读取的数据类型,我们读取到是个hello world! a
字符串,那么VALUEIN
的包装类就是Text
。
-
- 输出泛型
KEYOUT
以及VALUEOUT
:-
KEYOUT
代表输出的是一个个的单词(字符串),其泛型就是Text
; -
VALUEOUT
代表的是统计的次数,是一个整数型数据,其泛型则为IntWritable
。每个单词的次数。
-
第二步,重写Mapper中的map
方法。这个map
调用机制类似线程中的run
方法,有MR调用来。用户则只用关心map
方法中的业务代码。
package com.ermao.hadoop.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TestMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map (LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
}
}
第三步,下面将上面的代码补充完整。并做点阐述作用以及理解上述的MapReduce过程。
package com.ermao.hadoop.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TestMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map (LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 读取到的某一行数据,但是不确定哪一行的数据。
String line = value.toString();
// hello world! a 中的数据是由空格进行分割的。
// 严格意义应该将!剔除出去。才是正确的单词。
String[] values = line.split(" ");
// 遍历单词数组
for (String v : values) {
context.write(new Text(v),new IntWritable(1));
}
}
}
context.write(new Text(v),new IntWritable(1));
相当于我们之前分析的partition
过程中的前奏。
第四步,Map阶段输出后,会根据索引Key值进行排序(也就对应Map 的Shuffle阶段,并分到不同的内存当中去,合并成一个排序好的结果集)。再来创建一个Reuce类并继承Reducer
并重写reduce
方法中的相关逻辑。
package com.ermao.hadoop.mapreduce;
public class TestReduce extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
}
Reducer
类同样有两个输入类型以及输出泛型。这个时候,已经在Map的Shuffle阶段后的中间结果。Hadoop已经将context.write(new Text(v),new IntWritable(1));
整理(经过了Key值排序后)成了如下形式(需要自己脑补画面):
{a:{1}}
{b:{1}}
{c:{1}}
{d:{1}}
{hello:{1,1,1,1,1}}
{world!:{1,1,1,1,1}}
-
KEYIN
Reduce过程中的KeyIn其实是Map阶段的输出,所以其泛型应该与Map阶段的输出匹配。KEYIN
就相当于是a
或者b
或者其他单词。 -
VALUEIN
则是我们在Map阶段设置的输出次数1。 -
KEYOUT
则是我们输出的结果键值,比如a
; -
VALUEOUT
则是我们输出的单次出现的总次数,如:4;
第五步,重写Reduce的reduce方法。如下所示, reduce
参数Key
值其实可以暂时不管(world1
)。主要对集合Iterable
进行计算,其形式类似{world!:{1,1,1,1,1}}
。
package com.ermao.hadoop.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TestReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce (Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 统计次数
// Iterable<IntWritable>是一个整理好的数据集合。
// 在shuffle阶段除了排序,还会进行分组
// a:{1}
// b:{1}
// c:{1}
// d:{1}
// hello:{1,1,1,1}
// world!:{1,1,1,1}
int sum = 0;
for (IntWritable i: values) {
// 累计的总数
sum += i.get();
}
// 将结果输出
// 如果你高兴,你还有重新修改下结果。比如:
key = new Text(String.format("%s出现了",key.toString()));
context.write(key,new IntWritable(sum));
}
}
第六步,创建驱动类MapReduceDemo.java
,代码如下:
package com.ermao.hadoop.mapreduce;
import com.ermao.hadoop.config.HadoopConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MapReduceDemo {
static {
System.setProperty("HADOOP_USER_NAME","root");
}
public static void main (String[] args) {
Configuration conf = new Configuration();
conf.set(HadoopConfiguration.connectionName,HadoopConfiguration.connectionUri);
try {
// 创建词频统计任务,任务名称自定义即可
Job job = Job.getInstance(conf, "WordCount1");
//指定主执行类
job.setJarByClass(MapReduceDemo.class);
//指定map执行类
job.setMapperClass(TestMapper.class);
//指定reducer执行类
job.setReducerClass(TestReduce.class);
//指定map阶段输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定reducer阶段输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//输入文件的地址
FileInputFormat.setInputPaths(job , new Path("/input/test.txt"));
//输出文件的目录,输出路径必须是不存在的路径
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
String dir = String.format("/output/%s-result",simpleDateFormat.format(new Date()));
Path path = new Path(dir);
FileOutputFormat.setOutputPath(job , path);
//执行
System.out.println(job.waitForCompletion(true));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
Hadoop将结果生成了了一个目录,其下有两个文件:
_SUCCESS
-
part-r-00000
输出的结果。a出现了 1 b出现了 1 c出现了 1 d出现了 1 hello出现了 5 world!出现了 5
这个驱动类可以直接使用。
参考资料
- 《Hadoop与大数据挖掘》 作者:张良均 樊哲 位文超 刘名军 许国杰 周龙 焦正升
-
shuffle——洗牌 ↩