What is Spark Partition?
分区(Partitioning)实质上是将数据划分为多个部分。在分布式系统中,分区被定义为将大数据集分割后存储为集群的多个文件块。
基于数据局部性的原则,Worker节点拉取靠近自己的数据执行计算任务。通过分区,网络IO请求量大大降低,从而使得数据处理的速度大大提高。
数据的分区是基于 RDD 实现的,不同分区的数据可以在不同的节点上被不同计算任务。RDD 中单个分区内的数据只会被同一个任务(Task)处理。
Partition Techniques
HashPartitioner
HashPartitioner 是 Spark 中的默认分区方法,基于 Java 中 Object.hashcode()
实现分区功能. hashcode()
的作用是保证相等的对象拥有相同的 hashcode.
RangePartitioner
如果 RDD 中的数据是可排序的,则可以选择 RangePartitioner 将数据记录(Record)划分为几乎相等的区间。Spark 通过对 RDD 内数据进行采样的方法来确定区间边界。
RangePartitioner 首先基于键值对数据记录排序,然后依据传入的分区数参数对记录进行分区。
CustomPartitioner
通过继承 Partitioner 类,可以创建新的分区方法,以实现自定义分区数量并决定每个分区中存储什么数据记录。
Code Snippets
Partition
分区在 Spark 中被定义为特质(Trait),如下所示:
/**
* An identifier for a partition in an RDD.
*/
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
Partition 的定义中,我们发现,分区中最重要的几个元素是:
- index, 即该记录在父 RDD 中的分区编号,通常一条记录所属的分区可由其在父 RDD 中的分区编号推导得到;
- hashCode(), 该分区的 hashCode;
- equals(other), 用于判断两个分区是否相等的方法。
Partitioner
Partitioner 类在 Spark 中的定义如下:
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
从定义中可以看到,一个具体的分区方法必须实现两个方法:
- numPartitions, 返回该分区方法最终将划分出的类别总数量;
- getPartition(key), 根据传入的键值,确定该记录所属的分区。
Experiences
Spark 中通过 spark.default.parallelism
确定了 RDD 中分区的数量,通常取值为集群中可用核心(Core)数量的 2~3 倍。