Spark提供的所有计算,不管是批处理,Spark SQL,Spark Streaming还是Spark ML,它们底层都是通过RDD计算。所以这里就以RDD方式简单上手。首先认识一下RDD:RDD(Resilient Distributed Dataset)是Spark最基础核心的概念,它表示可分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD可以缓存到内存或磁盘中,每次对RDD数据集的操作之后的结果可以复用,省去了MapReduce大量的磁盘IO操作。这对于迭代运算频繁的不同维度的批处理、机器学习算法、交互式数据挖掘来说,效率提高很多。
方便起见,这里Spark使用的本地单机模式,需要在本地安装Spark及配置环境变量,由于Spark是Scala写的,也要安装Scala及环境变量的配置,注意对应的Scala版本要匹配。代码使用Maven构建工程。项目中,数据库使用Mongdb,编程语言Java,业务意义就是统计今年申报出口业务中每家企业的报关单量。
pom文件部分:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId><u>mongo</u>-spark-connector_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
代码部分:
// 创建 一个 spark session,可以配置输入源,输出源,spark运行模式,spark应用实例名称等
SparkSession spark = SparkSession.builder().master("local").appName("MongodbSparkDemoTest")
.config("spark.mongodb.input.uri",
"mongodb://user:password@localhost:port/db.collection")
.config("spark.mongodb.output.uri",
"mongodb://user:password@localhost:port/db.collection2")
.getOrCreate();
// 创建一个javaspark 上下文
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// 使用mongodb 聚合管道获取数据 生成一个RDD
JavaMongoRDD<Document> aggregatedRdd = MongoSpark.load(jsc).withPipeline(Collections.singletonList(Aggregates
.match(Filters.and(Filters.eq("iEMark", "E"), Filters.gte("declareDate", "20180101000000")))));
// 原始数据的样子
System.out.println("原始数据的样子");
aggregatedRdd.take(5).forEach((t) -> {
System.out.println(t);
});
;
// map 将一条记录映射成 key - value 的形式,key,value都可以是多个字段
JavaPairRDD<String, Long> mapRdd = aggregatedRdd.mapPartitionsToPair((Iterator<Document> t) -> {
List<Tuple2<String, Long>> list = new ArrayList<>();
while (t.hasNext()) {
Document doc = t.next();
list.add(new Tuple2<String, Long>(doc.getString("tradeCode"), (long) 1));
}
return list.iterator();
});
// map后的样子
System.out.println("map后的样子");
mapRdd.take(5).forEach((t) -> {
System.out.println(t);
});
// reduce 将相同key的value聚合
JavaPairRDD<String, Long> reducedRdd = mapRdd.reduceByKey((Long v1, Long v2) -> {
return v1 + v2;
});
// reduce 后的样子
System.out.println("reduce后的样子,企业编码-报关单量");
reducedRdd.take(5).forEach((t) -> {
System.out.println(t);
});
// 转换成mongodb文档格式
JavaRDD<Document> reducedDocRdd = reducedRdd.map((Tuple2<String, Long> v1) -> {
return new Document().append("tradeCode", v1._1).append("entryQty", v1._2);
});
// 统计结果保存到mongodb
MongoSpark.save(reducedDocRdd);
jsc.close();
Console输出: