Spark RDD实现分析

RDD的概述

RDD是只读的、分区记录的集合,是Spark编程模型的最主要抽象,它是一种特殊的集合,支持多种数据源,有容错机制、衍生血缘关系、可被缓存、支持并行操作。

RDD的属性特征

1)分区(partition)

数据集的基本组成单位。包含一个数据分片列表,将数据进行切分,并决定并行的计算的粒度。其中分片的个数可由程序指定(默认值为程序分配的CPU数量)。每个分区分配的存储由BlockManager实现,分区都被逻辑映射成BlockManager的一个Block(Block被一个Task负责计算)。

RDD Partition存储和计算模型

另外,从BlockManager的源码中可以看出,把分区的数据存储需要制定的几个参数:

blockId:块id

data:分区的数据buffer

level:rdd存储的持久化等级

BlockManager存储分区数据

2)函数(compute)

一个计算每个分区的函数,RDD的计算以分片为单位,每个RDD实现compute函数。通俗来说,compute用于计算每个分片,得出一个可遍历的结果,用于描述在父RDD上执行的计算。

3)依赖(dependency)

RDD的转换都会生成新的RDD,RDD之间形成子->父的依赖关系(源RDD没有依赖),通过依赖关系描述血缘关系(lineage),在部分分区数据丢失的,通过lineage重建丢失的分区数据,提升整体运算效率。

4)优先位置(preferred location)

每个分片的优先计算位置,Spark在进行任务调度的时候,会尽可能滴将计算任务分配到所需数据块的存储位置,满足“移动计算优先移动数据”的理念。

5)分区策略(Partitioner)

RDD分片函数,描述分区模式和数据分片粒度。Partitioner函数决定RDD本身的分片数据,同事决定了parent RDD Shuffle输出时的分片数据。

Spark实现两种类型的分片函数:基于哈希的HashPartitioner和基于范围的RangePartitioner。区别在于只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None的。

具体区别详看下一篇《Spark RDD分区策略

RDD创建

细分来说,Spark有二种方式创建RDD

1、并行化已存在的Scala集合

scala> val data = Array(1,2,3,4,5)

data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data) //这里关注slices参数,指定数据集切分成几个分区

distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :29

scala> distData.reduce((a,b) => a + b)

res1: Int = 15

2、通过外部文件系统的数据集创建

SparkContext类中的textFile用于创建文本类型的RDD

def textFile(path:String, minPartitions: Int = defaultMinPartitions)

path参数:指定文件的URI地址(hdfs://、本地等等)

minPartitions参数:指定分片数

scala> val distFile = sc.textFile("/Users/irwin/zookeeper.out")

distFile: org.apache.spark.rdd.RDD[String] = /Users/irwin/zookeeper.out MapPartitionsRDD[4] at textFile at :27

scala> distFile.map(s => s.length).reduce((a, b) => (a+b))

res3: Int = 102295

当然SparkContext中包含其他创建RDD的方法,如:

def wholeTextFiles(path:String, minPartitions: Int = defaultMinPartitions)

defhadoopRDD[K,V](conf: JobConf, inputFormatClass:Class[_ <: InputFormat[K,V]], keyClass:Class[K], valueClass:Class[V], minPartitions: Int = defaultMinPartitions)

RDD操作

RDD包含一系列的转换(Transformation)与执行(Action)。

转换

所有转换操作都是惰性的,指定处理相互依赖关系,是数据集的逻辑操作,并未真正计算。

执行

该操作指定数据的形式,当发生Action操作时,Spark将Action之间的所有Transformation组成的Job会并行计算。

例如从上面例子可以看到,只有在distFile.map的时候,Job才会真正执行,且返回最后Action的结构。

这里有个关键点:当每个Job计算完成,其内部所有RDD会被清除。所以有RDD需要重复使用,则使用Persist(或Cache)的方法将RDD持久化,详见下文RDD缓存介绍。

RDD操作

通过上图RDD操作列表可以看到,有以下内容:

RDD的Transformation操作、Action操作(常用执行操作、存储执行操作)、缓存操作、checkpoint操作,具体用法及意义可以详见官方文档。

RDD缓存

Spark持久化指的是在不同Transformation过程中,将数据集缓存在内存中,实现快速重用、故障快速恢复。

主动持久化

程序主动通过persist()或cache()方法操作标记需被持久化的RDD,事实上cache()使用的是persist()的默认方法。下面我们来看看持久化的等级:

持久化等级

根据名称可以理解对应Level的意义,其中带“SER”表示将RDD序列化为Java对象,带“2”表示将每个分区赋值到两个集群节点。Persist持久化RDD,会修改原来RDD的 meta info 中的StorageLevel,可以看到最后返回的是this,说明返回的是修改的RDD对象本身,而非产生了新的RDD。

newLevel替换原来storageLevel值

另外,从RDD类的源码中我们可以看到:

RDD

默认persist()使用内存存储,而cache()调用的是persist()默认实现。

自动持久化

指的是Spark自动保存一些Shuffle操作的中间结果。很容易理解,Spark为了表面Shuffle过程中出现异常的快速恢复。

再说一点,persist()后不一定说就不丢失,在内存不足的情况也是可能被删除。但是用户不用关心这块,RDD的容错机制保证了丢失也能计算正确执行。RDD通过 meta info 中的 Lineage 可以重算丢失的数据。

RDD依赖关系

Spark根据提交任务的计算逻辑(亦即是RDD的Transformation和Action)生成RDD之间的依赖关系,同时也生成逻辑上的DAG。这里说到的依赖关系指的是对父RDD依赖,这个关系包含两种类型:narrow dependency 和 wide dependency。

窄依赖(narrow dependency)

指每一个 parent RDD 的 Partition 最多被子RDD的一个Partition使用。从数据的角度来看,窄依赖的RDD整个操作都可以在同一个集群节点执行,以pinpeline的方式计算所有父分区,不会造成网络之间的数据混合。

窄依赖

宽依赖(wide dependency)

与窄依赖相反,指的是子RDD的Partition会依赖所有parent RDD的所有或多个Partition。宽依赖RDD会涉及数据混合,宽依赖需要首先计算好所有父分区的数据,然后在节点间进行Shuffle。

宽依赖

所有依赖的基类是traid Dependency[T],这是一个纯虚类:

class Dependency

其中rdd就是依赖的Parent RDD

对于窄依赖的实现是:

窄依赖实现

窄依赖有两种具体的实现

1、OneToOneDependency

OneToOneDependency

2、RangeDependency

RangeDependency

UnionRDD将多个RDD合成一个RDD,从上述分析,合成后的RDD每个parent RDD的partition的相对顺序是不会变。对于合并后的UnionRDD而言,每个parent RDD与其Partition的其实位置不同。

从RangeDependency类中可以看到,partitionId - outStart(UnionRDD起始位置) + inStart(parent RDD的起始位置),通过上述计算方式找到parent RDD对应的Partition。

对于宽依赖的实现是:

宽依赖实现

子RDD依赖parent RDD的所有Partition,因此需求shuffle过程。Shuffle过程由ShuffleManager控制,而宽依赖包含以下两种:基于Hash的HashShuffleManager以及基于排序的SortShuffleManager

ShuffleManager

DAG的构建

上述已经提到RDD通过一系列Transformation和Action形成了DAG,Spark根据DAG生成计算任务:

第一步:划分Stage

根据依赖关系的不同将DAG划分不同的阶段。对于窄依赖,由于Partition的去定型,窄依赖划分到同一个执行阶段;对于宽依赖,需等待parent RDD Shuffle处理完成,所以Spark根据宽依赖将DAG划分不同的Stage。

第二步:Stage内部分配Task

每个Partition都会分配一个计算Task(并行执行),Stage之间根据依赖关系编程一个粗粒度的DAG。

第三步:执行顺序

DAG的执行的顺序是从前往后,亦即是Stage只有其parent Stage执行完成后才执行(当然起始的stage不需要)。

Task的执行

上述提到,RDD的Transformation操作是惰性的,只有发生Action才会生成Job,而这个Job会映射成一个粗粒度的DAG,DAG执行每一个Stage会将Partition分配计算Task,这些Task会被提交到集群上执行计算,执行计算的逻辑部分为:Executor。

Spark的Task有两类:

org.apache.spark.scheduler.ShuffleMapTask与org.apache.spark.scheduler.ResultTask

回想一下,Transformation和Action的区别,Action会返回数据,而Transformation只进行RDD的转换。Spark的Task类型分别对应这两类,下面简单分析一下Task的执行过程。

org.apache.spark.scheduler.Task的run()方法开始执行Task

Task
runTask
ShuffleMapTask#runTask
RDD#iterator

runTask最终调用RDD的iterator,Task的计算从这里开始。

小结

至此,RDD的实现分析已经介绍完毕,我们回顾一下:

RDD的属性特征:分区、函数、依赖、优先位置、分区策略;

RDD缓存:持久化等级、主动持久化、自动持久化

RDD的操作:Transformation、Action

RDD依赖:窄依赖、宽依赖

DAG与Task

RDD是Spark最基本、最根本的数据抽象。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,914评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,935评论 2 383
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,531评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,309评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,381评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,730评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,882评论 3 404
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,643评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,095评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,448评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,566评论 1 339
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,253评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,829评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,715评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,945评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,248评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,440评论 2 348

推荐阅读更多精彩内容