RDD总结

1. 概述

按照官方文档,RDD是表示不可变的、分区的、可并行计算的数据集合,有五个特点

编号 特点 变量名
1 由若干个分区组成 getPartitions
2 一个函数用来计算每个分区 compute
3 有若干个依赖的RDD deps
4 对于RDD[(K,V)],可能会有一个Partitioner partitioner
5 基于移动计算优于移动数据原则,每个分区都有优先的执行位置 getPreferredLocations

RDD的操作函数分为2类,一类为transform,一类为action,前者是惰性的,不触发任务执行,每一个transform操作都会生成新的RDD,后者则相反;操作形成了下游RDD依赖于上游RDD的依赖关系,依赖通过Dependency来抽象,一类为窄依赖NarrowDependency,它有OneToOneDependencyRangeDependency两个子类,另一类称为宽依赖,由ShuffleDependency表示。
Spark通过RDD的优先计算位置,将RDD的计算本地性从高往低分为如下(在类TaskLocality中定义,在最后一个小节会详细讨论):

PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY

2. 常用的transform操作

输出的RDD 算子 描述
MapPartitionsRDD mapflatMapfilter 输入的RDD与输出的RDD分区不会变
CoalescedRDD coalescerepartition 用来合并分区
PartitionwiseSampledRDD sample 用来对输入的RDD中的分区数据进行采样
PartitionerAwareUnionRDD union 当两个RDD的Partitioner相同时会生成,分区数不变
UnionRDD union 当两个RDD的Partitioner不同或者为null时会生成,分区数为2者之和
MapPartitionsRDD intersection 取两个RDD之间的交集,有Shuffle,底层通过cogroup实现
CartesianRDD cartesian 取两个RDD的笛卡尔积
PipedRDD pipe 形参是shell命令,表示将rdd中的数据当成shell命令的输入,获取处理后的输出
MapPartitionsRDD mapPartitionsWithIndex 将该分区的索引与整个分区作为输入
ZippedPartitionsRDD* zipPartitions 将所有RDD的相同分区的数据聚合在一起,注意所有RDD分区数要相同
SubtractedRDD subtract 获取不在形参RDD中的元素
CoGroupedRDD cogroup 将多个RDD key相同的值聚合在一起

3. PairRDDFunctions

PairRDDFunctions里面包含了RDD非常核心的操作,其中以combineByKeyWithClassTag最为关键,因为它是reduceByKeyreduceBy的底层实现也是reduceByKey)、groupByKey(groupBy)、aggregateByKeyfoldByKey的底层实现

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    /*验证代码,忽略*/
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

整个流程如下:

  1. 对一个分区,创建一个map,当key不在map中是,执行createCombiner,map的类型为[K, C]
  2. 当key在map中时,调用mergeValue,将value合并到C
  3. 在不同分区的相同key进行shuffle聚合时,调用mergeCombiners,将map中的C合并为另一个C

4. OrderedRDDFunctions

OrderedRDDFunctions包含了所有的排序操作

输出的RDD 算子 描述
ShuffledRDD sortByKey 将分区内的数据按Key进行排序,Paritioner变为RangePartitioner
ShuffledRDD repartitionAndSortWithinPartitions 将分区内的数据按key进行排序,比sortByKey灵活,可以定义key的分区规则,按value排序时可以使用
filterByRange 取范围之内的数据,范围为[lower, upper],而不是(lower, upper]

其它RDDFunctions

除了上述的Functions,还有DoubleRDDFunctionsAsyncRDDActionsSequenceFileRDDFunctions.

  1. DoubleRDDFunctions可以用来作一些数学运算,比如平均值(mean)、方差(variance)、总和(sum)、直方图(histogram)
  2. AsyncRDDActions可以用来异步计算RDD
  3. SequenceFileRDDFunctions用来将RDD的数据写入到SequenceFile中

Partitioner

最常用的Partitioner是HashPartitioner(默认分区器)、RangePartitionerHashPartitioner实现逻辑简单,这里介绍下RangePartitioner

RangePartitioner的构建流程
  1. RangePartitioner尽量使每个分区的数据大致相同
  2. 对RDD的Key通过池塘采样(reservoir sample,从n个元素中取随机取k个元素作为样本,保证每个元素被选取的概率为k/n,采样总数不超过一百万,默认为3 * 20 * 原RDD的分区数)在每个分区中进行采样
  3. 对每个分区中的样本进行合并,形成一个候选集合,然后排序,根据分区数来均匀的从候选集合中取值,形成一个大小为指定分区数的数组来对key进行分区
  4. 在对某个key分区时,找到最接近自己的样本,该样本的索引既是key所在分区
  5. 所以,使用了RangePartitioner的算子(如sortByKey)得到的RDD,如果将key按照分区索引从小到大合并在一起形成一个集合,该集合的数据是有序的

5. 本地性

任务本地性级别获取分为如下几个步骤:

  1. 获取计算资源(ExecutorData)
  2. 获取待计算RDD每个分区的输入数据所在的位置(即偏好位置,用TaskLocation抽象)
  3. 通过ExecutorDataTaskLocation来获取整个待计算RDD(TaskSetManager)的可能的本地性级别
  4. 通过调度规则来决定TaskSetManager中每个任务的本地性级别
5.1 获取计算资源

这里以yarn-client模式为例


spark_获取资源时序图.PNG

大致过程如下:

  1. 在创建SparkContext时,调用YarnSchedulerstart方法,里面会接着执行YarnClientSchedulerBackend的start方法,YarnClientSchedulerBackend会创建一个Client与RM进行通信,启动AM
  2. AM启动后,则会向RM申请资源,申请到资源后,启动executor
  3. executor启动后,会向Driver端注册
  4. 此时,YarnClientSchedulerBackend的成员变量executorDataMap就有了exector的信息,其中在计算本地性最重要的就是executor所在的host
RDD优先计算位置

DAGScheduler在提交stage时,会通过方法getPreferredLocs来获取该stage的优先计算位置

// getPreferredLocs实际调用了getPreferredLocsInternal(rdd, partition. new HashSet), 深度遍历RDD的窄依赖
private def getPreferredLocsInternal(rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// 访问过的分区就不再重复访问了
    if (!visited.add((rdd, partition))) {
      return Nil
    }
    //如果该分区被缓存了,返回缓存地址,类型为ExecutorCacheTaskLocation 
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 通过子rdd重写的的preferredLocations方法获取偏好位置,比如HadoopRDD、
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
    // 从尾往头开始深度遍历,直到找到不为空的偏好位置,否则返回空
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }
    Nil
  }
计算任务本地性

spark_计算本地性级别时序图

如图,在创建TaskSetManager时,先通过addPendingTask获取任务的候选本地性,然后通过computeValidLocalityLevels获取真正的本地性

private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        // 被Executor缓存的分区
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
       // 输入的数据在HDFS上,且数据节点的host在申请的资源之中(HadoopRDD, ReliableCheckpointRDD)
        case e: HDFSCacheTaskLocation => {
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) => {
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
            case None => _
          }
        }
        case _ => Unit
      }
// 比如网络上的某个输入流或文件
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
// 如果输入数据的主机在某个机架上
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }
// 没有偏好位置
    if (tasks(index).preferredLocations == Nil) {
      pendingTasksWithNoPrefs += index
    }

    allPendingTasks += index  // 表示分区为index对应的任务还未提交
  }

计算任务的本地性

private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    if (!pendingTasksForRack.isEmpty &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }
    levels += ANY
    levels.toArray
  }

最后TaskSetManager通过resourceOffer方法来决定任务的本地性

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容