[Spark源码剖析]Spark 延迟调度策略

本文旨在说明 Spark 的延迟调度及其是如何工作的

什么是延迟调度

在 Spark 中,若 task 与其输入数据在同一个 jvm 中,我们称 task 的本地性为 PROCESS_LOCAL,这种本地性(locality level)是最优的,避免了网络传输及文件 IO,是最快的;其次是 task 与输入数据在同一节点上的 NODE_LOCAL,数据在哪都一样的 NO_PREF,数据与 task 在同一机架不同节点的 RACK_LOCAL 及最糟糕的不在同一机架的 ANY

本地性越好,对于 task 来说,花在网络传输及文件 IO 的时间越少,整个 task 执行耗时也就更少。而对于很多 task 来说,执行 task 的时间往往会比网络传输/文件 IO 的耗时要短的多。所以 Spark 希望尽量以更优的本地性启动 task。延迟调度就是为此而存在的。

Spark的位置优先(1): TaskSetManager 的有效 Locality Levels这篇文章中,我们可以知道,假设一个 task 的最优本地性为 N,那么该 task 同时也具有其他所有本地性比 N 差的本地性。

假设调度器上一次以 locality level(本地性) M 为某个 taskSetManager 启动 task 失败,则说明该 taskSetManager 中包含本地性 M 的 tasks 的本地性 M 对应的所有节点均没有空闲资源。此时,只要当期时间与上一次以 M 为 taskSetManager 启动 task 时间差小于配置的值,调度器仍然会以 locality level M 来为 taskSetManager 启动 task

延时调度如何工作

函数TaskSetManager#getAllowedLocalityLevel是实现延时调度最关键的地方,用来返回当前该 taskSetManager 中未执行的 tasks 的最高可能 locality level。以下为其实现

/**
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        currentLocalityIndex += 1
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }

代码有点小长,好在并不复杂,一些关键注释在以上源码中都有注明。

循环条件为while (currentLocalityIndex < myLocalityLevels.length - 1)
其中myLocalityLevels: Array[TaskLocality.TaskLocality]是当前 TaskSetManager 的所有 tasks 所包含的本地性(locality)集合,本地性越高的 locality level 在 myLocalityLevels 中的下标越小(具体请参见//www.greatytc.com/p/05034a9c8cae

currentLocalityIndex 是 getAllowedLocalityLevel 前一次返回的 locality level 在 myLocalityLevels 中的索引(下标),若 getAllowedLocalityLevel 是第一次被调用,则 currentLocalityIndex 为0

整个循环体都在做这几个事情:

  1. 判断 myLocalityLevels(currentLocalityIndex) 这个级别的本地性对应的待执行 tasks 集合中是否还有待执行的 task

  2. 若无;则将 currentLocalityIndex += 1 进行下一次循环,即将 locality level 降低一级回到第1步

  3. 若有,且当前时间与上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex) 时间间隔小于 myLocalityLevels(currentLocalityIndex) 对应的延迟时间(通过spark.locality.wait.process或spark.locality.wait.node或spark.locality.wait.rack配置),则 currentLocalityIndex 不变,返回myLocalityLevels(currentLocalityIndex)。这里是延迟调度的关键,只要当前时间与上一次以某个 locality level 启动 task 的时间只差小于配置的值,不管上次是否成功启动了 task,这一次仍然以上次的 locality level 来启动 task。说的更明白一些:比如上次以 localtyX 为 taskSetManager 启动 task 失败,说明taskSetManager 中 tasks 对应 localityX 的节点均没有空闲资源来启动 task,但 Spark 此时仍然会以 localityX 来为 taskSetManager 启动 task。为什么要这样做?一般来说,task 执行耗时相对于网络传输/文件IO 要小得多,调度器多等待1 2秒可能就可以以更好的本地性执行 task,避免了更耗时的网络传输或文件IO,task 整体执行时间会降低

  4. 若有,且当前时间与上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex) 时间间隔大于 myLocalityLevels(currentLocalityIndex) 对应的延迟时间,则将 currentLocalityIndex += 1 进行下一次循环,即将 locality level 降低一级回到第1步


下面为帮助理解代码的部分说明

判断是否还有当前 locality level 的 task 需要执行

val moreTasks = myLocalityLevels(currentLocalityIndex) match {
    case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
    case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
    case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
    case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
  }

moreTasksToRunIn就不进行过多解释了,主要作用有两点:

  1. 对于不同等级的 locality level 的 tasks 列表,将已经成功执行的或正在执行的该 locality level 的 task 从对应的列表中移除
  2. 判断对应的 locality level 的 task 是否还要等待执行的,若有则返回 true,否则返回 false

myLocalityLevels(currentLocalityIndex) 等于 PROCESS_LOCAL 为例,这一段代码用来判断该 taskSetManager 中的 tasks 是否还有 task 的 locality levels 包含 PROCESS_LOCAL

if (!moreTasks)

若!moreTasks,则对currentLocalityIndex加1,即 locality level 变低一级,再次循环。

根据 //www.greatytc.com/p/05034a9c8cae 的分析我们知道,若一个 task 存在于某个 locality level 为 level1 待执行 tasks 集合中,那么该 task 也一定存在于所有 locality level 低于 level1 的待执行 tasks 集合。

从另一个角度看,对于每个 task,总是尝试以最高的 locality level 去启动,若启动失败且下次以该 locality 启动时间与上次以该 locality level 启动时间超过配置的值,则将 locality level 降低一级来尝试启动 task

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,141评论 5 461
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,101评论 2 372
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,318评论 0 321
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,357评论 1 266
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,205评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,194评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,595评论 3 384
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,285评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,578评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,642评论 2 311
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,402评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,266评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,641评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,933评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,220评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,591评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,780评论 2 335

推荐阅读更多精彩内容