Integrating with Spark SQL
One of the most powerful aspects of Spark Streaming is that it can be combined with Spark Core and Spark SQL. We have already seen, through operators such as transform and foreachRDD, how to run Spark Core batch operations on the RDDs inside a DStream. Now let's look at how to combine the RDDs in a DStream with Spark SQL.
Example: every 10 seconds, count the clicks on each product in each category over the last 60 seconds, then compute the top 3 hottest products in each category.
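The program reads click records from a socket and splits each line on spaces, treating the second and third fields as the category and the product name. The original text does not show any sample input, so the lines below are only an assumed illustration of that layout (the first field is taken to be a user id):

user1 category1 product1
user2 category1 product2
user3 category2 product1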
Java version
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class Top3HotProduct {

    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        SparkConf conf = new SparkConf().setAppName("Top3HotProductJava").setMaster("local[2]");
        // Batch interval of 10 seconds
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
        // Checkpointing is required because reduceByKeyAndWindow is a stateful window operation
        streamingContext.checkpoint("hdfs://hadoop-100:9000/streamingCheckpoint");

        JavaReceiverInputDStream<String> productVisitDstream = streamingContext.socketTextStream("hadoop-100", 10000);

        // Map each click record to ("category_product", 1)
        JavaPairDStream<String, Integer> productVisitNumsDstream = productVisitDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] strings = s.split(" ");
                return new Tuple2<>(strings[1] + "_" + strings[2], 1);
            }
        });

        // Count clicks per "category_product" over a 60-second window, sliding every 10 seconds
        JavaPairDStream<String, Integer> tempResultDstream = productVisitNumsDstream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, Durations.seconds(60), Durations.seconds(10));

        // For each window's RDD, convert it to a DataFrame and rank products with Spark SQL
        tempResultDstream.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> productVisitRDD) throws Exception {
                // Turn each ("category_product", count) pair into a Row(category, product, visit)
                JavaRDD<Row> productVisitRowRDD = productVisitRDD.map(new Function<Tuple2<String, Integer>, Row>() {
                    @Override
                    public Row call(Tuple2<String, Integer> v1) throws Exception {
                        return RowFactory.create(v1._1.split("_")[0], v1._1.split("_")[1], v1._2);
                    }
                });

                // Define the schema of the DataFrame
                List<StructField> fieldList = new ArrayList<StructField>();
                fieldList.add(DataTypes.createStructField("category", DataTypes.StringType, true));
                fieldList.add(DataTypes.createStructField("product", DataTypes.StringType, true));
                fieldList.add(DataTypes.createStructField("visit", DataTypes.IntegerType, true));
                StructType structType = DataTypes.createStructType(fieldList);

                HiveContext hiveContext = new HiveContext(productVisitRDD.context());
                DataFrame productVisitDF = hiveContext.createDataFrame(productVisitRowRDD, structType);
                productVisitDF.show();

                // Register a temporary table and use a row_number() window function
                // to pick the top 3 products per category
                productVisitDF.registerTempTable("product_visit");
                DataFrame top3DF = hiveContext.sql("select category, product, visit " +
                        "from ( " +
                        "select category, product, visit, " +
                        "row_number() over(partition by category order by visit desc) rank " +
                        "from product_visit " +
                        ") tmp " +
                        "where rank < 4");
                top3DF.show();
                return null;
            }
        });

        streamingContext.start();
        streamingContext.awaitTermination();
        streamingContext.close();
    }
}
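One detail worth noting: the foreachRDD function above constructs a new HiveContext for every batch. A common refinement, following the lazily-instantiated-singleton pattern recommended in the Spark Streaming programming guide, is to build the context once per JVM and reuse it across batches. The sketch below is only an illustration of that idea; the helper class name JavaHiveContextSingleton is hypothetical and not part of the original example.

import org.apache.spark.SparkContext;
import org.apache.spark.sql.hive.HiveContext;

// Hypothetical helper: lazily creates a single HiveContext and reuses it
// instead of constructing a new one inside every foreachRDD invocation.
public class JavaHiveContextSingleton {

    private static HiveContext instance = null;

    public static synchronized HiveContext getInstance(SparkContext sparkContext) {
        if (instance == null) {
            instance = new HiveContext(sparkContext);
        }
        return instance;
    }
}

Inside foreachRDD, the call new HiveContext(productVisitRDD.context()) would then become JavaHiveContextSingleton.getInstance(productVisitRDD.context()).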
Scala version