1. 概述
按照官方文档,RDD
是表示不可变的、分区的、可并行计算的数据集合,有五个特点
编号 | 特点 | 变量名 |
---|---|---|
1 | 由若干个分区组成 | getPartitions |
2 | 一个函数用来计算每个分区 | compute |
3 | 有若干个依赖的RDD
|
deps |
4 | 对于RDD[(K,V)] ,可能会有一个Partitioner |
partitioner |
5 | 基于移动计算优于移动数据原则,每个分区都有优先的执行位置 | getPreferredLocations |
RDD
的操作函数分为2类,一类为transform
,一类为action
,前者是惰性的,不触发任务执行,每一个transform
操作都会生成新的RDD,后者则相反;操作形成了下游RDD依赖于上游RDD的依赖关系,依赖通过Dependency
来抽象,一类为窄依赖NarrowDependency
,它有OneToOneDependency
、RangeDependency
两个子类,另一类称为宽依赖,由ShuffleDependency
表示。
Spark通过RDD的优先计算位置,将RDD的计算本地性从高往低分为如下(在类TaskLocality
中定义,在最后一个小节会详细讨论):
PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY
2. 常用的transform操作
输出的RDD | 算子 | 描述 |
---|---|---|
MapPartitionsRDD |
map 、flatMap 、filter
|
输入的RDD与输出的RDD分区不会变 |
CoalescedRDD |
coalesce 、repartition
|
用来合并分区 |
PartitionwiseSampledRDD | sample |
用来对输入的RDD中的分区数据进行采样 |
PartitionerAwareUnionRDD | union |
当两个RDD的Partitioner相同时会生成,分区数不变 |
UnionRDD | union |
当两个RDD的Partitioner不同或者为null时会生成,分区数为2者之和 |
MapPartitionsRDD | intersection |
取两个RDD之间的交集,有Shuffle,底层通过cogroup实现 |
CartesianRDD | cartesian |
取两个RDD的笛卡尔积 |
PipedRDD | pipe |
形参是shell命令,表示将rdd中的数据当成shell命令的输入,获取处理后的输出 |
MapPartitionsRDD | mapPartitionsWithIndex |
将该分区的索引与整个分区作为输入 |
ZippedPartitionsRDD* | zipPartitions |
将所有RDD的相同分区的数据聚合在一起,注意所有RDD分区数要相同 |
SubtractedRDD | subtract |
获取不在形参RDD中的元素 |
CoGroupedRDD | cogroup |
将多个RDD key相同的值聚合在一起 |
3. PairRDDFunctions
PairRDDFunctions里面包含了RDD非常核心的操作,其中以combineByKeyWithClassTag
最为关键,因为它是reduceByKey
(reduceBy
的底层实现也是reduceByKey
)、groupByKey
(groupBy
)、aggregateByKey
、foldByKey
的底层实现
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
/*验证代码,忽略*/
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
整个流程如下:
- 对一个分区,创建一个map,当key不在map中是,执行
createCombiner
,map的类型为[K, C]
- 当key在map中时,调用
mergeValue
,将value合并到C
- 在不同分区的相同key进行shuffle聚合时,调用
mergeCombiners
,将map中的C
合并为另一个C
4. OrderedRDDFunctions
OrderedRDDFunctions包含了所有的排序操作
输出的RDD | 算子 | 描述 |
---|---|---|
ShuffledRDD | sortByKey |
将分区内的数据按Key进行排序,Paritioner变为RangePartitioner |
ShuffledRDD | repartitionAndSortWithinPartitions |
将分区内的数据按key进行排序,比sortByKey 灵活,可以定义key的分区规则,按value排序时可以使用 |
filterByRange |
取范围之内的数据,范围为[lower, upper],而不是(lower, upper] |
其它RDDFunctions
除了上述的Functions,还有DoubleRDDFunctions
、AsyncRDDActions
、SequenceFileRDDFunctions
.
DoubleRDDFunctions
可以用来作一些数学运算,比如平均值(mean)、方差(variance)、总和(sum)、直方图(histogram)AsyncRDDActions
可以用来异步计算RDDSequenceFileRDDFunctions
用来将RDD的数据写入到SequenceFile中
Partitioner
最常用的Partitioner是HashPartitioner
(默认分区器)、RangePartitioner
,HashPartitioner
实现逻辑简单,这里介绍下RangePartitioner
RangePartitioner的构建流程
- RangePartitioner尽量使每个分区的数据大致相同
- 对RDD的Key通过池塘采样(reservoir sample,从n个元素中取随机取k个元素作为样本,保证每个元素被选取的概率为k/n,采样总数不超过一百万,默认为3 * 20 *
原RDD的分区数
)在每个分区中进行采样- 对每个分区中的样本进行合并,形成一个候选集合,然后排序,根据分区数来均匀的从候选集合中取值,形成一个大小为指定分区数的数组来对key进行分区
- 在对某个key分区时,找到最接近自己的样本,该样本的索引既是key所在分区
- 所以,使用了RangePartitioner的算子(如sortByKey)得到的RDD,如果将key按照分区索引从小到大合并在一起形成一个集合,该集合的数据是有序的
5. 本地性
任务本地性级别获取分为如下几个步骤:
- 获取计算资源(
ExecutorData
)- 获取待计算RDD每个分区的输入数据所在的位置(即偏好位置,用
TaskLocation
抽象)- 通过
ExecutorData
与TaskLocation
来获取整个待计算RDD(TaskSetManager
)的可能的本地性级别- 通过调度规则来决定
TaskSetManager
中每个任务的本地性级别
5.1 获取计算资源
这里以yarn-client模式为例
大致过程如下:
- 在创建
SparkContext
时,调用YarnScheduler
的start
方法,里面会接着执行YarnClientSchedulerBackend
的start方法,YarnClientSchedulerBackend
会创建一个Client
与RM进行通信,启动AM- AM启动后,则会向RM申请资源,申请到资源后,启动executor
- executor启动后,会向Driver端注册
- 此时,
YarnClientSchedulerBackend
的成员变量executorDataMap
就有了exector的信息,其中在计算本地性最重要的就是executor所在的host
RDD优先计算位置
DAGScheduler
在提交stage时,会通过方法getPreferredLocs
来获取该stage的优先计算位置
// getPreferredLocs实际调用了getPreferredLocsInternal(rdd, partition. new HashSet), 深度遍历RDD的窄依赖
private def getPreferredLocsInternal(rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// 访问过的分区就不再重复访问了
if (!visited.add((rdd, partition))) {
return Nil
}
//如果该分区被缓存了,返回缓存地址,类型为ExecutorCacheTaskLocation
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// 通过子rdd重写的的preferredLocations方法获取偏好位置,比如HadoopRDD、
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// 从尾往头开始深度遍历,直到找到不为空的偏好位置,否则返回空
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
计算任务本地性
如图,在创建TaskSetManager时,先通过
addPendingTask
获取任务的候选本地性,然后通过computeValidLocalityLevels
获取真正的本地性
private def addPendingTask(index: Int) {
for (loc <- tasks(index).preferredLocations) {
loc match {
// 被Executor缓存的分区
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
// 输入的数据在HDFS上,且数据节点的host在申请的资源之中(HadoopRDD, ReliableCheckpointRDD)
case e: HDFSCacheTaskLocation => {
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) => {
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
case None => _
}
}
case _ => Unit
}
// 比如网络上的某个输入流或文件
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
// 如果输入数据的主机在某个机架上
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
// 没有偏好位置
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
}
allPendingTasks += index // 表示分区为index对应的任务还未提交
}
计算任务的本地性
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
levels += ANY
levels.toArray
}
最后TaskSetManager
通过resourceOffer
方法来决定任务的本地性