Spark将数据拆分为分区并并行执行分区上的计算。您应该了解数据的分区方式以及何时需要手动调整分区以使Spark计算有效运行。
分区介绍
创建一个数值型的DataFrame来说明数据是如何分区的
val x = (1 to 10).toList
val numbersDf = x.toDF(“number”)
实验机器上,这个numbersDf被分为2个分区
scala> numbersDf.rdd.partitions.size
res0: Int = 2
将DataFrame写入磁盘时,每个分区都是一个单独的CSV文件。
numbersDf.write.csv("/usr/local/spark_output/numbers")
结果如下:
coalesce
coalesce方法减少了DataFrame中的分区数量。以下是如何合并两个分区中的数据:
val numbersDf2 = numbersDf.coalesce(1)
我们可以验证coalesce是否只创建了一个只有一个分区的新DataFrame:
scala> numbersDf2.rdd.partitions.size
res2: Int = 1
numbersDf2将作为一个文本文件写入磁盘:
numbersDf2.write.csv("/usr/local/spark_output/numbers")
结果如下:
合并算法将数据从分区B移动到分区A,并将数据从分区D移动到分区C.分区A和分区C中的数据不随合并算法一起移动。该算法在某些情况下很快,因为它最小化了数据移动。
增加分区
我们尝试使用coalesce来增加分区,但是并不生效:
val numbersDf3 = numbersDf.coalesce(4)
scala> numbersDf3.rdd.partitions.size
res6: Int = 2
合并算法通过将数据从某些分区移动到现有分区来更改节点数。该算法显然不能增加分区数量。
repartition
repartition方法可用于增加或减少DataFrame中的分区数。
让我们用numbersDf创建一个带有两个分区的homerDf。
scala> val homerDf = numbersDf.repartition(1)
scala> homerDf.rdd.partitions.size
res7: Int = 1
让我们检查homerDf中每个分区的数据:
scala> homerDf.write.csv("/usr/local/spark_output/numbers")
重分区算法执行完整数据混洗,并在分区之间平均分配数据。它不会尝试像合并算法那样最小化数据移动。
增加分区
重分区方法也可用于增加分区数
scala> val bartDf = numbersDf.repartition(6)
scala> bartDf.rdd.partitions.size
res11: Int = 6
以下是如何在bartDf中的分区之间拆分数据
重新分区方法可以完全重排数据,因此可以增加分区数。
coalesce和repartition之间的区别
重新分区算法对数据进行完全重排,并创建相同大小的数据分区。coalesce结合现有分区以避免完全洗牌。
按列repartition
让我们使用以下数据来检查特定列如何重新对DataFrame进行分区。先创建DataFrame
val people = List(
(10,"blue"),
(13,"red"),
(15,"blue"),
(99,"red"),
(67,"blue")
)
val peopleDf = people.toDF("age","color")
让我们通过color列重新分区DataFrame:
val colorDf = peopleDf.repartition($"color")
按列分区时,Spark默认会创建至少200个分区。查看分区数据,只有两个分区有数据,且同一个分区中的数据的color字段是一致的。
scala> colorDf.rdd.partitions.size
res15: Int = 200
colorDf包含每种color的不同分区,并针对color提取进行了优化。按列分区类似于索引关系数据库中的列。
考虑分区
DataFrames的分区似乎是一个底层实现细节,应该由框架来管理,但事实并非如此。在将大数据解析为小数据时,几乎总是应该重新分区数据。
你可能会经常把大数据通过过滤成小数据,所以习惯重新分区。