欢迎关注我的CSDN: https://blog.csdn.net/bingque6535
1. 编写WordCount
-
Driver端
package com.hjf.mr.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 9:49 */ public class WordCountJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 创建Configuration和Job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 指定Master对应的类, 就是当前所在的类 job.setJarByClass(WordCountJob.class); // 指定Mapper和Reducer对应的类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定Mapper端输出的key 和value 的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定Reducer端输出的key 和 value的类型 // 如果Mapper端和Reducer端输出的key, value类型一样, 可省略其中一组的指定 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path inputPath = new Path("./Data/words.txt"); Path outputPath = new Path("./Data/result"); // 如果输出路径存在, 则删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)){ fs.delete(outputPath, true); } // 设置文件的输入路径 和 结果的返回路径 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 提交 job.waitForCompletion(true); // job.submit(); } }
-
Mapper端
package com.hjf.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 9:54 * * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN: Mapper端输入key的类型, 一般都是LongWritable * VALUEIN: Mapper端输入value的类型 * KEYOUT: Mapper端输出key的类型 * VALUEOUT: Mapper端输出value的类型 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将Text型转换为String 型 String lines = value.toString(); String[] words = lines.split(" "); // word --> (word, 1) for (String word: words) { context.write(new Text(word), one); } } }
- Reducer端
package com.hjf.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 9:53 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN: reducer端输入key的类型 * VALUEIN: reducer端输入value的类型 * KEYOUT: reducer端输出key的类型 * VALUEOUT: reducer端输出value的类型 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; // 相同key的value值进行累加 for (IntWritable value: values) { sum += value.get(); } IntWritable count = new IntWritable(sum); context.write(key, count); } }
二. TopN
- 题目
1、统计每门课程的平均分
2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数
3、求出每门课程参考学生成绩最高的学生的信息:课程,姓名和平均分 - 数据集
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75
1. 问题1
问题: 统计每门课程平均分
思路:
1. 先计算每名学生的每门课程的平均分
2. 再用计算得到的平均分累加计算课程平均分
-
Driver 端
package com.hjf.mr.top_n.question1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:06 * */ public class TopNJob1 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TopNJob1.class); job.setMapperClass(TopNMapper1.class); job.setReducerClass(TopNReducer1.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); Path inputPath = new Path("./Data/score.txt"); Path outputPath = new Path("./Data/result"); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); } }
-
Mapper 端
package com.hjf.mr.top_n.question1; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:12 */ public class TopNMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lines = value.toString(); String[] split = lines.split(","); Text courseName = new Text(split[0]); double sum = 0.0; int count = split.length - 2; for (int i = 2; i < split.length; i++) { sum += Integer.parseInt(split[i]); } DoubleWritable avg = new DoubleWritable(sum / count); context.write(courseName, avg); } }
-
Reducer 端
package com.hjf.mr.top_n.question1; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:13 */ public class TopNReducer1 extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { double sum = 0.0; int count = 0; for (DoubleWritable value: values){ sum += value.get(); count += 1; } DoubleWritable result = new DoubleWritable(sum / count); context.write(key, result); } }
2. 问题2
-
Driver
package com.hjf.mr.top_n.question2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:06 * */ public class TopNJob2 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 指定自定义分区类 job.setPartitionerClass(TopNPartition.class); // 指定分区数 job.setNumReduceTasks(4); job.setJarByClass(TopNJob2.class); job.setMapperClass(TopNMapper2.class); job.setReducerClass(TopNReducer2.class); job.setMapOutputKeyClass(CourseScore.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(CourseScore.class); job.setOutputValueClass(NullWritable.class); Path inputPath = new Path("./Data/score.txt"); Path outputPath = new Path("./Data/result"); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); job.waitForCompletion(true); } }
-
Mapper
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 11:12 */ public class TopNMapper2 extends Mapper<LongWritable, Text, CourseScore, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lines = value.toString(); String[] split = lines.split(","); String courseName = split[0]; String name = split[1]; double sum = 0.0; int count = split.length - 2; for (int i = 2; i < split.length; i++) { sum += Integer.parseInt(split[i]); } CourseScore courseScore = new CourseScore(courseName, name, sum / count); context.write(courseScore, NullWritable.get()); } }
-
Reducer
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.text.DecimalFormat; /** * @author Jiang锋时刻 * @create 2020-05-17 11:13 */ public class TopNReducer2 extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> { @Override protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { double sum = 0.0; for (NullWritable value: values) { // 获取 CourseScore对象中的score值, 并将其保留一位小数, // 然后再重新封装成CourseScore对象 double score = key.getScore(); DecimalFormat df = new DecimalFormat("0.0"); double format = Double.parseDouble(df.format(score)); CourseScore courseScore = new CourseScore(key.getCourse(), key.getName(), format); context.write(courseScore, value); } } }
-
Partition
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; /** * @author Jiang锋时刻 * @create 2020-05-17 15:42 * * Partitioner<KEY, VALUE> * pattition对应的key, value类型应该和Mapper端输出类型保持一致 */ public class TopNPartition extends Partitioner<CourseScore, NullWritable> { @Override public int getPartition(CourseScore courseScore, NullWritable nullWritable, int numPartitions) { String course = courseScore.getCourse(); // 将不同课程名的信息保存到不同的分区中 switch (course) { case "algorithm": return 0; case "computer": return 1; case "english": return 2; default: return 3; } } }
-
自定义类型
package com.hjf.mr.top_n.question2; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @author Jiang锋时刻 * @create 2020-05-17 15:48 */ public class CourseScore implements WritableComparable<CourseScore> { private String course; private String name; private double score; // 反序列化时, 需要反射调用空参构造函数, 所以必须要有该空参构造函数 public CourseScore() { } public CourseScore(String course, String name, double score) { this.course = course; this.name = name; this.score = score; } public String getCourse() { return course; } public double getScore() { return score; } public String getName() { return name; } @Override public int compareTo(CourseScore that) { if (this.score > that.score){ return -1; } else if (this.score < that.score) { return 1; } else { return 0; } } // 重写序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeUTF(course); out.writeUTF(name); out.writeDouble(score); } // 重写反序列化方法 // 注意: 反序列化的顺序和序列化的顺序必须完全一致 @Override public void readFields(DataInput in) throws IOException { course = in.readUTF(); name = in.readUTF(); score = in.readDouble(); } @Override public String toString() { return "课程: " + course + ", 姓名: " + name + ", 分数: " + score; } }
3. 问题三
问题3只需在问题2代码的基础上修改一下输出条件即可
public class TopNReducer3 extends Reducer<CourseScore, NullWritable, CourseScore, NullWritable> {
// 注意: 必须设置在reduce函数外部, 否则每次运行都会被重新初始化
// 设置一个变量, 用于统计输出的数据条数
int count = 0;
// 需要输出前几的数据, 这里就设置为数字几
private final int top = 3;
@Override
protected void reduce(CourseScore key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable value: values) {
double score = key.getScore();
DecimalFormat df = new DecimalFormat("0.0");
double format = Double.parseDouble(df.format(score));
CourseScore courseScore = new CourseScore(key.getCourse(), key.getName(), format);
if (count++ < top){
context.write(courseScore, value);
}
}
}
}
欢迎关注我的CSDN: https://blog.csdn.net/bingque6535