在Spark中,我们通过将数据分区的方式,避免任务之间的数据通信,使每个任务都可以独立执行,通信只有在shuffle的时候才会发生。
在接下来的两节中,我们将介绍两种在Spark中共享内存的方式。在分布式编程中,共享内存是非常有用的,但容易被滥用的特性。它可以使得编程更简单,但额外的同步会让应用的性能下降,因此要在一定限制下使用。
广播变量与闭包
广播变量是指将一个变量发送到所有的executors中,这和将数据放入到闭包中发送给executors看似实现的是相同功能,但它们是有区别的。
最主要的区别在于分发速度的差距很大。当使用闭包将数据发送给executors时,这是一对多的关系;而使用广播变量时,数据是以多对多的方式传播的,类似于BT(bit-torrent)的方式。
举例1
例如我们想要用Spark解析IP对应的国家,我们有一个IP和国家的列表,假设这个列表的数据大小为1TB。一种方式是直接以join的方式实现,需要shuffle的数据量为1TB,开销比较大。因此更好的方式是使用map join的方式,将列表发送到每个task中,在task中直接完成join的操作。
如果使用闭包的方式将列表发送给每个executors,假如我们有1000个tasks,那需要传输的数据量是1000 * 1TB = 1000TB,所有的数据都是由driver向外传输的,driver的负载是很高的。
如果使用广播变量的话,driver只需要传输稍微多于1TB的数据,可能是2TB,这将提升500到1000倍的速度。这个例子中的数据也可以换成训练好的模型,或者参数等等。
举例2
假设我们先通过transformations计算出我们想要的字典RDD,并将这个字典进行广播,然后在另一个transformations过程中,使用这个广播变量进行计算。
sc = SparkContext(conf = ...)
# compute the dictionary
my_dict_rdd = sc.textFile(...).map(...).filter(...)
my_dict_data = my_dict_rdd.collect()
# distributed the dictionary via the broadcast variable
boardcast_var = sc.boardcast(my_dict_data)
# use the broadcast variable within the task
my_data_rdd = sc.textFile(...).filter(lambda x : x in broadcast_var.value)
这里相当于将计算上传到executors中,并使用driver作为coordinator。
注意广播变量都是只读的,不可修改的,这是它在使用中的一个缺点。在使用中,要注意内存是否足够,防止内存溢出。