将数据导入HBase的方式有很多,其中之一就是采用mapreduce来批量写入,最近所在的小组有这样的需求,大家又都还属于学习阶段,于是查阅了很多的资料(感谢http://www.cnblogs.com/dongdone/p/5689370.html的作者为我的第一次尝试提供了宝贵的经验),在这里做个记录,希望对想我一样的初学者有帮助。
要想做好这个事情大致要分4步:
1、在Hbase中创建一张表(我的表名叫做pn,要写入的列族的名字叫做gcf)
在hbase shell中执行一下命令:
*** a、create 'pn', 'gcf'***
2、上传要导入的csv到hdfs上面
a、在Linux命令行中执行:hadoop fs -put 2222.csv /inputDir
3、编写mapreduce程序:
我是用idea写的java代码,关于idea打包jar包的方式网上有很多,我就不说了,只是有一点要注意的是关于打包方式的,在yarn上运行代码的时候,yarn上是没有hbase的jar包的,所以打包jar的时候要把你工程所依赖的jar包也一并打包进生成的jar里。我打包的时候不是直接打包进去的,而是先打包一个只有class文件的jar包,然后在随便一个地方新建一个lib文件夹并将工程所依赖的jar包都放进去,然后用winrar打开生成的jar包,将lib文件夹拖进去,这样jar包就生成好。
还有一个要注意的就是我用的maven来构建工程,这里做依赖的有:
org.apache.hadoop
hadoop-common
2.7.2
org.apache.hadoop
hadoop-hdfs
2.7.2
org.apache.hadoop
hadoop-mapreduce-client-core
2.7.2
org.apache.hbase
hbase
1.2.0
org.apache.hbase
hbase-client
1.2.0
org.apache.hbase
hbase-common
1.2.0
org.apache.hbase
hbase-server
1.2.0
这是打包过程,代码在后面:
打包一个只有class文件的jar包
然后在随便一个地方新建一个lib文件夹并将工程所依赖的jar包都放进去
然后用winrar打开生成的jar包
这是生成好的jar的内容
我是代码:
packagecn.com.hbase;
importcn.com.utils.MD5;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.hbase.HBaseConfiguration;
importorg.apache.hadoop.hbase.client.Put;
importorg.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
importorg.apache.hadoop.hbase.mapreduce.TableOutputFormat;
importorg.apache.hadoop.hbase.mapreduce.TableReducer;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.NullWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importjava.io.IOException;
/**
* Created by Asher on 2016/10/20.
*/
public classHdfsToHBase {
public static voidmain(String[] args)throwsException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","master01:2181,worker01:2181,worker02:2181");//当然这些都可以作为参数传入,这里只是实验,所以写死在了代码里,实际成产过程中肯定要用参数的方式
conf.set("hbase.rootdir","hdfs://master01:9000/hbase");
conf.set(TableOutputFormat.OUTPUT_TABLE,"pn");
Job job = Job.getInstance(conf,HdfsToHBase.class.getSimpleName());
TableMapReduceUtil.addDependencyJars(job);
job.setJarByClass(HdfsToHBase.class);
job.setMapperClass(HdfsToHBaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(HdfsToHBaseReducer.class);
FileInputFormat.setInputPaths(job,"hdfs://master01:9000/inputDir/2222.csv");
job.setOutputFormatClass(TableOutputFormat.class);
job.waitForCompletion(true);
}
public static classHdfsToHBaseMapperextendsMapper {
privateTextoutKey=newText();
privateTextoutValue=newText();
@Override
protected voidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException {
String[] splited = value.toString().split(",");
outKey.set(MD5.GetMD5Code(splited[0]));//我将第一个分片的MD5值作为rowkey
outValue.set(splited[1] +"\t"+ splited[2] +"\t"+ splited[3]);
context.write(outKey,outValue);
}
}
public static classHdfsToHBaseReducerextendsTableReducer {
@Override
protected voidreduce(Text k2,Iterable v2s,Context context)throwsIOException,InterruptedException {
Put put =newPut(k2.getBytes());
for(Text v2 : v2s) {
String[] splited = v2.toString().split("\t");
if(splited[0] !=null&& !"NULL".equals(splited[0])) {
put.add("gcf".getBytes(),"name".getBytes(),splited[0].getBytes());
}else{
put.add("gcf".getBytes(),"name".getBytes(), null);
}
if(splited[1] !=null&& !"NULL".equals(splited[1])) {
put.add("gcf".getBytes(),"sex".getBytes(),splited[1].getBytes());
}else{
put.add("gcf".getBytes(),"sex".getBytes(), null);
}
if(splited[2] !=null&& !"NULL".equals(splited[2])) {
put.add("gcf".getBytes(),"mobile".getBytes(),splited[2].getBytes());
}else{
put.add("gcf".getBytes(),"mobile".getBytes(), null);
}
}
context.write(NullWritable.get(),put);
}
}
}
4、打包并执行:打包在上面先说了,下面说说执行的过程:
a、很简单,在Linux终端执行:hadoop jar /home/cat/hdfs2hbase.jar cn.com.hbase.HdfsToHBase
就能正常运行了,运行的结果如下:
看到成功插入了数据
本次笔记结束!