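RDD (Resilient Distributed Dataset) is the core abstraction in Spark. This article covers the dependencies between RDDs; without understanding them, much of Spark's behavior is hard to follow.
Spark's RDD dependencies fall into two broad categories: narrow dependencies and wide dependencies.
The figure below shows how they relate.
In a narrow dependency, each partition of the parent RDD is used by at most one partition of the child RDD. In a wide dependency, a single partition of the parent RDD is depended on by multiple partitions of the child RDD, and this is where the shuffle comes in.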
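The definition above can be checked mechanically. The following is a minimal sketch in plain Scala, without Spark, that models a dependency as a map from child partition index to the parent partition indices it reads. The names `DependencyKind`, `PartitionDeps` and `isNarrow` are made up for illustration and are not part of Spark.

```scala
object DependencyKind {
  // Hypothetical model: for each child partition index, the parent partition indices it reads.
  type PartitionDeps = Map[Int, Seq[Int]]

  // Narrow if no parent partition is read by more than one child partition.
  def isNarrow(deps: PartitionDeps): Boolean = {
    val readersByParent = deps.toSeq
      .flatMap { case (child, parents) => parents.distinct.map(p => (p, child)) }
      .groupBy(_._1)
    readersByParent.values.forall(_.size <= 1)
  }

  def main(args: Array[String]): Unit = {
    // map-like: child partition i reads only parent partition i
    val oneToOne: PartitionDeps = Map(0 -> Seq(0), 1 -> Seq(1), 2 -> Seq(2))
    // coalesce-like: a child reads several parents, but no parent is shared
    val merged: PartitionDeps = Map(0 -> Seq(0, 1), 1 -> Seq(2, 3))
    // reduceByKey-like: every child partition reads every parent partition
    val shuffled: PartitionDeps = Map(0 -> Seq(0, 1), 1 -> Seq(0, 1))

    println(isNarrow(oneToOne)) // true
    println(isNarrow(merged))   // true
    println(isNarrow(shuffled)) // false
  }
}
```

Note that the second case is still narrow: a child partition may read several parent partitions, as long as no parent partition feeds more than one child.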
Next, let's use a piece of code to see which operations produce wide dependencies and which produce narrow ones.
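import org.apache.spark.{Dependency, SparkConf, SparkContext}

object DependencyDemo {
  // Print the type of a dependency and some details about its parent RDD.
  def printDependencyInfo(dep: Dependency[_]): Unit = {
    println("Dependency type : " + dep.getClass)
    println("Dependency RDD : " + dep.rdd)
    // Despite the label, the next two lines print the parent RDD's own
    // dependencies (and their count), not its partitions.
    println("Dependency partitions : " + dep.rdd.dependencies)
    println("Dependency partitions size : " + dep.rdd.dependencies.length)
  }

  def main(args: Array[String]): Unit = {
    // The master and application name are expected to be supplied at submit time.
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("/user/cloudera/barcelona/births/births.csv")
    // flatMap and map work partition by partition.
    val wordPair = rdd.flatMap(_.split(",")).map(x => (x, 1))
    wordPair.dependencies.foreach(printDependencyInfo(_))
    // reduceByKey has to bring equal keys together across partitions.
    val wordCount = wordPair.reduceByKey(_ + _)
    wordCount.dependencies.foreach(printDependencyInfo(_))
    sc.stop()
  }
}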
Running this code in spark-shell, or submitting it in yarn-client mode, produces the following output:
19/04/30 20:16:52 INFO spark.SparkContext: Created broadcast 0 from textFile at DependencyDemo.scala:23
Dependency type : class org.apache.spark.OneToOneDependency
Dependency RDD : MapPartitionsRDD[2] at flatMap at DependencyDemo.scala:24
Dependency partitions : List(org.apache.spark.OneToOneDependency@4cc2e9fc)
Dependency partitions size : 1
19/04/30 20:16:52 INFO mapred.FileInputFormat: Total input paths to process : 1
Dependency type : class org.apache.spark.ShuffleDependency
Dependency RDD : MapPartitionsRDD[3] at map at DependencyDemo.scala:24
Dependency partitions : List(org.apache.spark.OneToOneDependency@38b1b855)
Dependency partitions size : 1
As the output shows, the dependency of the first RDD, wordPair, is a OneToOneDependency, while the dependency of the second, wordCount, is a ShuffleDependency.
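Instead of reading class names off the output, the same distinction can be made by pattern matching on the dependency type. This is only a sketch: the object `DependencyClassifier`, the helper `kind`, the `local[2]` master and the in-memory sample data are assumptions chosen for a quick local run, while `NarrowDependency`, `ShuffleDependency`, `dependencies` and `toDebugString` are existing Spark APIs.

```scala
import org.apache.spark.{Dependency, NarrowDependency, ShuffleDependency, SparkConf, SparkContext}

object DependencyClassifier {
  // Hypothetical helper: label a dependency as narrow or wide by its class.
  def kind(dep: Dependency[_]): String = dep match {
    case _: ShuffleDependency[_, _, _] => "wide (shuffle)"
    case _: NarrowDependency[_]        => "narrow"
    case _                             => "unknown"
  }

  def main(args: Array[String]): Unit = {
    // local[2] and the app name are assumptions for a quick local run.
    val conf = new SparkConf().setMaster("local[2]").setAppName("DependencyClassifier")
    val sc = new SparkContext(conf)
    try {
      // Small in-memory stand-in for the CSV file used in the article.
      val pairs = sc.parallelize(Seq("a,b", "b,c")).flatMap(_.split(",")).map(x => (x, 1))
      val counts = pairs.reduceByKey(_ + _)

      pairs.dependencies.foreach(d => println("pairs  -> " + kind(d)))
      counts.dependencies.foreach(d => println("counts -> " + kind(d)))

      // toDebugString prints the lineage; indentation changes mark shuffle boundaries.
      println(counts.toDebugString)
    } finally {
      sc.stop()
    }
  }
}
```

Matching on `NarrowDependency` rather than `OneToOneDependency` also covers the other narrow dependency classes Spark defines.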
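Let's look at the Spark source code to see how Dependency is defined. The Spark version I am using here is 1.6.2.
The ShuffleDependency in the figure is the wide dependency.
Here is the code for ShuffleDependency. It is actually quite simple: the key part is generating a shuffleId and registering the shuffle with the ShuffleManager.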
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
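The code for OneToOneDependency is even simpler. It contains a single method, getParents, which maps a child partition to the parent partition with the same index.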
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
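To see what getParents amounts to, here is a small model in plain Scala that keeps only the partition mapping and drops the RDD. The names `NarrowParents`, `PartitionMapping`, `OneToOne` and `RangeMapping` are invented for this sketch. `RangeMapping` is modeled on Spark's RangeDependency, another narrow dependency not shown above, which maps a contiguous range of child partitions onto a range of parent partitions.

```scala
object NarrowParents {
  // Simplified stand-in for NarrowDependency: only the partition mapping, no RDD.
  trait PartitionMapping {
    def getParents(partitionId: Int): List[Int]
  }

  // Child partition i depends on parent partition i.
  object OneToOne extends PartitionMapping {
    def getParents(partitionId: Int): List[Int] = List(partitionId)
  }

  // Child partitions [outStart, outStart + length) map onto parent partitions
  // starting at inStart; child partitions outside that range have no parent here.
  final case class RangeMapping(inStart: Int, outStart: Int, length: Int) extends PartitionMapping {
    def getParents(partitionId: Int): List[Int] =
      if (partitionId >= outStart && partitionId < outStart + length)
        List(partitionId - outStart + inStart)
      else
        Nil
  }

  def main(args: Array[String]): Unit = {
    // Union-like case: a 3-partition parent whose partitions 0..2 become child partitions 2..4.
    val second = RangeMapping(inStart = 0, outStart = 2, length = 3)

    println(OneToOne.getParents(3)) // List(3)
    println(second.getParents(4))   // List(2)
    println(second.getParents(1))   // List()
  }
}
```

In both mappings a child partition resolves to at most one parent partition per dependency, which is why no data has to move between partitions.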
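This knowledge is very useful for understanding Spark's core. Stages are split according to RDD dependencies: every wide dependency marks a stage boundary. When time allows, I will write a separate article on how Spark's DAGScheduler splits stages.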
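As a rough illustration of the "cut at every wide dependency" rule, the toy model below splits a lineage into stages. It is not the DAGScheduler algorithm, which is considerably more involved; `StageSketch`, `Node` and `stages` are hypothetical names, and the lineage is assumed to be a simple tree with no RDD shared between branches.

```scala
object StageSketch {
  // Hypothetical lineage node: a name plus (parent, isShuffle) edges.
  final case class Node(name: String, parents: List[(Node, Boolean)] = Nil)

  // Split the lineage ending at `last` into stages; the final stage comes first.
  def stages(last: Node): List[List[String]] = {
    // Follow narrow edges to gather this stage; record where shuffle edges lead.
    def collect(n: Node): (List[String], List[Node]) = {
      val results = n.parents.map {
        case (p, true)  => (List.empty[String], List(p)) // shuffle edge: parent starts another stage
        case (p, false) => collect(p)                    // narrow edge: parent stays in this stage
      }
      (n.name :: results.flatMap(_._1), results.flatMap(_._2))
    }
    val (members, boundaries) = collect(last)
    members :: boundaries.flatMap(stages)
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the word count above: textFile -> flatMap -> map -> reduceByKey
    val text   = Node("textFile")
    val flat   = Node("flatMap", List((text, false)))
    val pairs  = Node("map", List((flat, false)))
    val counts = Node("reduceByKey", List((pairs, true)))

    // Prints: List(List(reduceByKey), List(map, flatMap, textFile))
    println(stages(counts))
  }
}
```

For the word count example this yields two stages: everything up to the map runs in one stage, and the result of reduceByKey sits on the other side of the shuffle.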