广播数据变量
在App中经常会用到List、MaP等变量。如果不适用广播变量,默认每个task都会拉取一份副本到本地。广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,就可以让变量产生的副本大大减少。
广播变量,初始的时候,就在Drvier上有一份副本。 task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中,尝试获取变量副本;如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;此后这个executor上的task,都会直接使用本地的BlockManager中的副本。 executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,举例越近越好。
广播RDD
对于类似join等,两个RDD聚合的操作,通常会引起Shuffle,所以对于两个RDD一大一小的话,就可以使用广播变量:将数据量小的RDD "collect" 到driver本地,再广播出去。
对RDD进行广播,可以避免两个RDD之间的Shuffle。
SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
JavaSparkContext sc = new JavaSparkContext(conf);
//设置一个广播变量(只读)
int value = 5;
final Broadcast<Integer> broadcast = sc.broadcast(value);
//创建一个RDD
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(list);
//遍历RDD中的数据,并从广播变量中获取数据
JavaRDD<Integer> newRdd = rdd.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer x) throws Exception {
//获取广播变量的数据
int value = broadcast.value();
return x * value;
}
});
//遍历新的RDD
newRdd.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer x) throws Exception {
System.out.println(x);
}
});