This article walks the reader through a small demo showing how to implement word count with Spark.
Creating the project
Create a Maven project and add the Spark core dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.0</version>
</dependency>
If you are running on Java 8, you also need to add:
<dependency>
    <groupId>com.thoughtworks.paranamer</groupId>
    <artifactId>paranamer</artifactId>
    <version>2.8</version>
</dependency>
Otherwise, the following exception is thrown when reading the file:
JavaRDD rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt");
java.lang.ArrayIndexOutOfBoundsException: 10582
Word count implementation
The main method:
public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setAppName("WordCountDemon");
    // set the master property to run locally
    conf.setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    wordCount1(sc);
}
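For reference, here is a minimal sketch of the imports and class skeleton the snippets in this post rely on; the class name WordCountDemon is my assumption, not something given in the original code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang3.StringUtils;   // used in the filter variant further down
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class WordCountDemon {
    // main and the wordCount methods shown in this post go here
}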
The counting method:
public static void wordCount1(JavaSparkContext sc) {
    JavaRDD<String> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt");
    // flatten each line into individual words
    JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            List<String> list = new ArrayList<String>();
            String[] arr = s.split(" ");
            for (String ss : arr) {
                list.add(ss);
            }
            return list.iterator();
        }
    });
    // map each word to a (word, 1) pair
    JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    // aggregate the counts by key
    JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    // collect and print the results
    for (Object o : rdd4.collect()) {
        System.out.println(o);
    }
}
The same logic can be written more elegantly with lambda expressions:
public static void wordCount2(JavaSparkContext sc) {
    JavaPairRDD<String, Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt")
            .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
            .mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((v1, v2) -> v1 + v2);
    rdd1.collect().forEach(t -> System.out.println(t));
}
The output is as follows:
(are,1)
(you,1)
(how,1)
(,1)
(river,3)
(hello,3)
(boy,1)
(good,1)
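For context, output like this would come from a hello.txt roughly like the one below; this is a hypothetical reconstruction for illustration, not the original file. Note the doubled space in the third line, which is what produces the empty-string key.

hello river
hello river
hello river  how are you
good boy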
Notice that the empty key has been counted as well; we can use filter to drop the entries we don't want (StringUtils below comes from Apache Commons Lang 3, which spark-core already pulls in transitively):
public static void wordCount2(JavaSparkContext sc) {
    JavaPairRDD<String, Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt")
            .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
            .mapToPair(s -> new Tuple2<>(s, 1))
            // drop empty tokens before aggregating
            .filter(t -> StringUtils.isNotBlank(t._1))
            .reduceByKey((v1, v2) -> v1 + v2);
    rdd1.collect().forEach(t -> System.out.println(t));
}
Now the result is as expected:
(are,1)
(you,1)
(how,1)
(river,3)
(hello,3)
(boy,1)
(good,1)
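If you want to experiment without a local file, the same pipeline also works against an in-memory collection via parallelize. This is just a sketch; the method name and the hard-coded lines (which mirror the hypothetical hello.txt above) are my own additions.

public static void wordCountInMemory(JavaSparkContext sc) {
    List<String> lines = Arrays.asList(
            "hello river",
            "hello river",
            "hello river  how are you",
            "good boy");
    JavaPairRDD<String, Integer> counts = sc.parallelize(lines)   // build an RDD from the in-memory list
            .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) // flatten lines into words
            .mapToPair(s -> new Tuple2<>(s, 1))                   // (word, 1) pairs
            .filter(t -> StringUtils.isNotBlank(t._1))            // drop empty tokens
            .reduceByKey((v1, v2) -> v1 + v2);                    // sum the counts per word
    counts.collect().forEach(t -> System.out.println(t));
}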
Thanks for reading.