一、广播变量
Driver端new一个list,假设给每个Excutor发送100个task,每个task带一个list过来,100个task带100个list,放在Excutor端的内存里,这样会存在问题,所以使用广播变量来解决。
使用广播变量,在每个Excutor端有blockManager来管理这个list,当task执行时先去blockManager内寻找list,不必获取Driver的list,如此100个task共享使用这个list即可。
广播变量使用
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
sc.stop()
注意事项:
- 能不能将一个RDD使用广播变量广播出去?
不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 - 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
二、累加器
下图中代码内部给i+1是在Excutor端执行的,并不改变外面Driver端i的值,所以最终打印结果i=0,在Driver端定义一个累加器来解决此问题。
累加器相当于把每个RDD分区处理的结果进行累加。
累加器使用
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()
注意事项:
- 累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新