背景
前段时间做的是一个流式项目里,场景为:对于流式数据,使用过滤规则进行实时过滤并产出结果数据。流式数据为源源不断的IP,筛选出在合格IP集合中的数据,传输到下游消息中间件中。
技术选型
上游数据从消息中间件中读取,处理采用Spark Streaming,下游也采用消息中间件。
广播变量
广播变量的适用场景
在Spark这种分布式计算中,如果每个算子都需要读取一个变量,并且变量的数据量最好在百级,则采用广播变量,把这个变量广播到各个executor算子中;
广播变量相对于外部变量的优点
- 减少了各个算子间数据的网络传输;很明显,广播变量一旦广播后,在所有executor中都存在了,不随task的变化而变化。但是对于外部变量来说,其随着task的变化而变化,如果task中用到了,它则需要从各个节点中拉取/传输/删除;
- 减少了内存占用;广播变量的数据存储在executor的共享内存中,即:一个executor中只存储一份广播变量;但是外部变量则是一个task中都存储一份,一个executor中分配到了多少task,则存储多少份外部变量;
ps:Spark中executor的内存管理见之前写的文章:https://blog.csdn.net/qq_35583915/article/details/109359939
广播变量的使用
val sparkConf = new SparkConf().setAppName("broadcast-in-spark")
sparkConf.set("spark-config-key","spark-config-value")
val sparkSession = SparkSession
.builder
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()
//注意,因为一个Spark项目中只允许定义一个spark上下文,所以,后面用于广播变量的sparkContext只能从前面定义的sparkSession中获取,以保证不出现两个两个上下文定义
val sparkContext = sparkSession.getSparkContext
val broadcastUse = sparkContext.broadcast(useValue)
println(s"此次广播变量的内容为${broadcastUse.value}")
广播变量使用心得
- 最好能把广播变量当成一个常量一样去使用。当然不是说广播变量不能修改,只不过广播变量的修改步骤为:1.删除这个广播变量;2.使用当前名字重新定义新的广播变量。所以说,广播变量最符合的使用场景是常量情况下;
- 不能在算子中定义/修改/删除广播变量。除读操作外,广播变量的其他操作需要借助项目的Spark上下文环境,即sparkContext。而Spark的上下文环境只能在Driver中定义和使用,不能在Executor间进行序列化后传输。众所周知,算子的执行是在Executor中,但如外部变量、设置Spark执行环境等操作是在Driver中。[ps:之前总结的Driver/Executor等的作用见文章:https://blog.csdn.net/qq_35583915/article/details/109359346]因此,只能在算子中通过.value方法获取广播变量的值参与运算,不能对广播变量进行修改等操作。