Let's write a WordCount-style example.
We have a data source; its format is shown in step 9 below.
Goal: find out what the item categories are and how many records each category has.
1. Create a new Map/Reduce project
File - New - Other - Map/Reduce Project
2. SalesItemCategoryMapper.java
//SalesItemCategoryMapper.java
package SalesProduct;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesItemCategoryMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // With TextInputFormat each map() call already receives a single line,
        // so there is no need to split the value by "\n" again.
        String valueString = value.toString();
        // Fields are tab-separated: date, time, store_name, item, price, payment_method
        String[] items = valueString.split("\t");
        // items[3] is the item category; emit (category, 1)
        Text itemName = new Text(items[3]);
        output.collect(itemName, one);
    }
}
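For a sample record such as "2012-01-01 09:00 San Jose Men's Clothing 214.05 Amex" (taken from the data set in step 9, with the fields assumed to be tab-separated), this mapper emits the pair (Men's Clothing, 1).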
3. SalesItemCategoryReducer.java
//SalesItemCategoryReducer.java
package SalesProduct;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SalesItemCategoryReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Emitting a constant 1 per key simply lists the distinct item categories.
        // To count how many records each category has, sum the values instead:
        // int count = 0;
        // while (values.hasNext()) {
        //     count += values.next().get();
        // }
        // output.collect(key, new IntWritable(count));
        output.collect(key, new IntWritable(1));
    }
}
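Note that the reducer's output types (Text, IntWritable) must match what the driver sets with setOutputKeyClass and setOutputValueClass in step 4.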
4. SalesItemDriver.java
//SalesItemDriver.java
package SalesResult;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class SalesItemDriver {

    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        JobConf job_conf = new JobConf(SalesItemDriver.class);
        job_conf.setJobName("SaleCategory");

        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);
        // job_conf.setOutputValueClass(DoubleWritable.class);

        // get category
        job_conf.setMapperClass(SalesProduct.SalesItemCategoryMapper.class);
        job_conf.setReducerClass(SalesProduct.SalesItemCategoryReducer.class);

        // get category sum (these classes are not shown in this post; a sketch follows below)
        // job_conf.setMapperClass(SalesProduct.SalesCategorySumMapper.class);
        // job_conf.setReducerClass(SalesProduct.SalesCategorySumReducer.class);

        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        // args[0]: input path; args[1]: output path (must not exist yet)
        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
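The driver's commented-out lines refer to a SalesCategorySumMapper and a SalesCategorySumReducer that are not shown in this post. Here is a minimal sketch of my own of what they might look like, assuming the same tab-separated fields with the price at index 4 and DoubleWritable output values:
//SalesCategorySumMapper.java (sketch, not the original code)
package SalesProduct;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCategorySumMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, DoubleWritable> {

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        // Fields: date, time, store_name, item, price, payment_method
        String[] items = value.toString().split("\t");
        // Emit (category, price) so the reducer can sum the sales value per category
        output.collect(new Text(items[3]), new DoubleWritable(Double.parseDouble(items[4])));
    }
}

//SalesCategorySumReducer.java (sketch, not the original code)
package SalesProduct;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesCategorySumReducer extends MapReduceBase
        implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        // Sum the prices of all records that share this category
        double sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new DoubleWritable(sum));
    }
}
If you switch the driver to these classes, remember to also switch setOutputValueClass to DoubleWritable.class, which is what the commented-out DoubleWritable line in the driver hints at.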
5. Add a log4j.properties file for logging
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
6. Right-click the project, then Run as - Run configurations…
Set two program arguments.
The first argument is the path of your input file.
The second argument is the folder where the results are saved. Note: do not create this output folder by hand; the program creates it itself, and the job fails if the folder already exists.
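For example, if the input file was uploaded to HDFS as in step 10, the two arguments might be /wordcount/word.txt and /wordcount/output (the output folder name here is just an illustration).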
7. Click Run.
8. Note that the Hadoop classes you need to import live in the following folders:
9. Want some practice?
Data Set: https://pan.baidu.com/s/1c2J15Qw password: 4xkd
The format goes like this:
date time store_name item price payment_method
2012-01-01 09:00 San Jose Men's Clothing 214.05 Amex
2012-01-01 09:00 Fort Worth Women's Clothing 153.57 Visa
2012-01-01 09:00 San Diego Music 66.08 Cash
......
......
......
Use mapreduce programming model to find out:
- What are the item categories? What is the total sales value for each item category?
- What are the total sales values for the following store names: "Reno", "Toledo", "Chandler"?
- How many items in total were sold?
- What is the total sales value for all stores?
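For the last two questions, one common pattern (a sketch of my own, not code from this post) is to map every record to a single constant key so that one reducer sees all the values. For example, to count the total number of items sold:
//TotalItemsMapper.java (hypothetical class name, sketch only)
package SalesProduct;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class TotalItemsMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final Text TOTAL = new Text("total items");
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Every input line is one sold item, so emit 1 under a single shared key.
        output.collect(TOTAL, ONE);
    }
}
A reducer that sums the values (like the commented-out counting code in SalesItemCategoryReducer) then produces the grand total. The total sales value for all stores works the same way, except the mapper emits the price (index 4) as a DoubleWritable instead of 1.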
10. Commands that may come in handy
Format namenode:
bin/hdfs namenode -format
Start Hadoop:
sbin/start-dfs.sh
sbin/start-yarn.sh
Stop Hadoop:
sbin/stop-dfs.sh
sbin/stop-yarn.sh
Check report:
bin/hdfs dfsadmin -report
Allow ports:
sudo ufw allow from 192.168.9.4
#(allow access from this IP)
sudo ufw allow 22
#(allow port 22)
Create an HDFS folder:
bin/hadoop fs -mkdir /wordcount
Copy a local file to HDFS with copyFromLocal:
bin/hadoop fs -copyFromLocal /home/ubuntu/word.txt /wordcount/word.txt
Check Hadoop status:
http://<target IP>:50070/dfshealth.html