在Spark中,完成计算主要依赖RDD数据结构,RDD(弹性分布式数据集)是一个重要的API,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销。
当一个Spark应用被提交时,首先会根据Spark中的RDD对象构建一个DAG(有向无环图),再将DAG交给DAGScheduler解析。而DAG的构建就需要通过DAG之间的依赖来构建了。
依赖分为宽依赖和窄依赖两种。Spark工作在集群上时,被提交的应用程序会被显示或隐式地划分为几个不同的分区,方便master将工作分配给各个worker,一个应用程序会被分为几个不同的阶段,我们以最简单的WordCount为例
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object Count {
def main(args : Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile("file:///Users/apple/codelib/text.txt")
val wordCount = textFile.
flatMap(line => line.split(" ")).
map(word => (word, 1)).
reduceByKey((a, b) => a + b)
wordCount.foreach(println)
}
}
一般如果没有显示地定义的话,会根据你集群的数量和cpu核数来确定分区数。在这个程序中,textFile中的文本先被按行分开,然后将单词变为(word,1)格式,最后用reduceByKey完成reduce操作,在这之中map操作并不会出发shuffle,故不会触发分区的变化,并且前一阶段到后一阶段的会形成窄依赖。如果触发了reduce操作,则会形成宽依赖。Spark通过依赖构建起DAG(有向无环图),完成之后的调度。
对于依赖来说,并不需要复杂的功能,只需要通过依赖找到父rdd即可,在窄依赖之中,由于一个或多个父分区只和一个子分区关联,故较容易实现。这是OneToOneDependency类里的实现:
def getParents(partitionId: Int): List[Int] = List(partitionId)
除此之外,窄依赖还有一种实现方法,被称为范围依赖,仅在UnionRDD中有使用,被用作范围依赖。具体用法可以参考UnionRDD的实现。
而宽依赖的实现稍微复杂一些
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
//获取新的shuffleId
val shuffleId: Int = _rdd.context.newShuffleId()
//向ShuffleManager注册Shuffle的信息
val shuffleHandle: ShuffleHandle =
_rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
因为牵涉到shuffle操作,所以宽依赖的实现中加入了shuffle相关的信息。
在构建好依赖后,spark会将任务划分为几个stage,而划分stage的依据就是宽依赖,spark会将宽依赖的地方断开,形成阶段,从而规划任务的执行。