Eclipse编写MapReduce程序

写一个WordCount例子

有一个数据源,格式如下

data-source.png

求出 item 该分类有多少?

1.新建一个Map/Reduce Project

File - New - Other - Map/Reduce Project

2.SalesItemCategoryMapper.class

//SalesItemCategoryMapper.class
package SalesProduct;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class SalesItemCategoryMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // 将输入的纯文本的数据转换成String
        String valueString = value.toString();
        // 将输入的数据先按行进行分割
        StringTokenizer tokenizerArticle = new StringTokenizer(valueString, "\n");

        while (tokenizerArticle.hasMoreTokens()) {
            // 每行按空格划分
            //  StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken());
            System.out.println("-tokenizer-->"+tokenizerArticle.nextToken());
            System.out.println("-valueString-->"+valueString);
            String[] items = valueString.split("\t");
            String itemName = items[3];
            Text name = new Text(itemName);
            output.collect(name, one);
        }
    }

}

3.SalesItemCategoryReducer.class

//SalesItemCategoryReducer.class
package SalesProduct;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SalesItemCategoryReducer  extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable> {

    @Override
    public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // TODO Auto-generated method stub
        Text key = t_key;
//      int frequencyForCountry = 0;
//      while(values.hasNext()){
//          IntWritable value = (IntWritable)values.next();
//          frequencyForCountry += value.get();
//          
//      }
        
        output.collect(key, new IntWritable(1));
    }

}

4.SalesItemDriver.class

//SalesItemDriver.class
package SalesResult;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class SalesItemDriver {
    public static void main(String[] args) {
        JobClient my_client = new JobClient();
        JobConf job_conf = new JobConf(SalesItemDriver.class);
        job_conf.setJobName("SaleCategory");
        job_conf.setOutputKeyClass(Text.class);
        job_conf.setOutputValueClass(IntWritable.class);
//      job_conf.setOutputValueClass(DoubleWritable.class);

        // get category
        job_conf.setMapperClass(SalesProduct.SalesItemCategoryMapper.class);
        job_conf.setReducerClass(SalesProduct.SalesItemCategoryReducer.class);

        // get category sum
//      job_conf.setMapperClass(SalesProduct.SalesCategorySumMapper.class);
//      job_conf.setReducerClass(SalesProduct.SalesCategorySumReducer.class);

        job_conf.setInputFormat(TextInputFormat.class);
        job_conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

        my_client.setConf(job_conf);
        try {
            JobClient.runJob(job_conf);
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
}

5.添加日志 log4j.properties

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  

6.右键项目 Run as - Run configurations …

java-application-config-1

设置两个参数

java-mapreduce-params

第一个参数表示你的目标文件
第二参数是运行结果保存在指定文件,注意保存所在文件夹不能手动创建,程序会自动创建。

7.点击Run

demo-result.png
demo-result-2.png
demo-result-3.png

8.注意导入hadoop所需的类在如下文件夹内

hadoop-jars

9.练练手?
Data Set: https://pan.baidu.com/s/1c2J15Qw 密码: 4xkd

The format goes like this:
date time store_name item price payment_method

2012-01-01 09:00 San Jose Men's Clothing 214.05 Amex
2012-01-01 09:00 Fort Worth Women's Clothing 153.57 Visa
2012-01-01 09:00 San Diego Music 66.08 Cash
......
......
......

Use mapreduce programming model to find out:

  1. What are the item categories? What is the total sales value for each item category?
  2. What are the sales name for the following store name? "Reno" "Toledo" "Chandler"
  3. How many items in total were sold?
  4. What is the total sales value for all stores?

10.可能会有用的命令
Format namenode:

bin/hdfs namenode -format

Start Hadoop:

sbin/start-dfs.sh
shin/start-yarn.sh

Stop Hadoop:

sbin/stop-dfs.sh
sbin/stop-yarn.sh

Check Report

bin/hdfs dfsadmin -report

Allow port:

sudo ufw allow from 192.168.9.4 
#(allow access from this ip)
sudo ufw allow 22 
#(here allow 22 port)

HDFS create folder:

bin/hadoop fs -mkdir  /wordcount
#Use copyFromLocal to copy files to HDFS:
bin/hadoop fs -copyFromLocal /home/ubuntu/word.txt /wordcount/word.txt

check hadoop status :

http://目标IP:50070/dfshealth.html
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容