Spark:Dynamic Resource Allocation【动态资源分配】

1. 问题背景
2. 原理分析
   2.1 Executor生命周期
   2.2 ExecutorAllocationManager上下游调用关系
3. 总结与反思
4. Community Feedback

1.问题背景

用户提交Spark应用到Yarn上时,可以通过spark-submit的num-executors参数显示地指定executor个数,随后,ApplicationMaster会为这些executor申请资源,每个executor作为一个Container在Yarn上运行。Spark调度器会把Task按照合适的策略分配到executor上执行。所有任务执行完后,executor被杀死,应用结束。在job运行的过程中,无论executor是否领取到任务,都会一直占有着资源不释放。很显然,这在任务量小且显示指定大量executor的情况下会很容易造成资源浪费。

在探究Spark如何实现之前,首先思考下如果自己来解决这个问题,需要考虑哪些因素?大致的方案很容易想到:如果executor在一段时间内一直处于空闲状态,那么就可以kill该executor,释放其占用的资源。当然,一些细节及边界条件需要考虑到:

  • executor动态调整的范围?无限减少?无限制增加?
  • executor动态调整速率?线性增减?指数增减?
  • 何时移除Executor?
  • 何时新增Executor了?只要由新提交的Task就新增Executor吗?
  • Spark中的executor不仅仅提供计算能力,还可能存储持久化数据,这些数据在宿主executor被kill后,该如何访问?
  • 。。。

2.原理分析

2.1 Executor生命周期

首先,先简单分析下Spark静态资源分配中Executor的生命周期,以spark-shell中的wordcount为例,执行命令如下:

# 以yarn模式执行,并指定executor个数为1
$ spark-shell --master=yarn --num-executors=1

# 提交Job1 wordcount
scala> sc.textFile("file:///etc/hosts").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# 提交Job2 wordcount
scala> sc.textFile("file:///etc/profile").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# Ctrl+C Kill JVM

上述的Spark应用中,以yarn模式启动spark-shell,并顺序执行两次wordcount,最后Ctrl+C退出spark-shell。此例中Executor的生命周期如下图:


static-allocation

从上图可以看出,Executor在整个应用执行过程中,其状态一直处于Busy(执行Task)或Idle(空等)。处于Idle状态的Executor造成资源浪费这个问题已经在上面提到。下面重点看下开启Spark动态资源分配功能后,Executor如何运作。


spark_dynamic_allocation_executor_lifecycle

下面分析下上图中各个步骤:

  1. spark-shell Start:启动spark-shell应用,并通过--num-executor指定了1个执行器。
  2. Executor1 Start:启动执行器Executor1。注意:Executor启动前存在一个AM向ResourceManager申请资源的过程,所以启动时机略微滞后与Driver。
  3. Job1 Start:提交第一个wordcount作业,此时,Executor1处于Busy状态。
  4. Job1 End:作业1结束,Executor1又处于Idle状态。
  5. Executor1 timeout:Executor1空闲一段时间后,超时被Kill。
  6. Job2 Submit:提交第二个wordcount,此时,没有Active的Executor可用。Job2处于Pending状态。
  7. Executor2 Start:检测到有Pending的任务,此时Spark会启动Executor2。
  8. Job2 Start:此时,已经有Active的执行器,Job2会被分配到Executor2上执行。
  9. Job2 End:Job2结束。
  10. Executor2 End:Ctrl+C 杀死Driver,Executor2也会被RM杀死。

上述流程中需要重点关注的几个问题:

  • Executor超时:当Executor不执行任何任务时,会被标记为Idle状态。空闲一段时间后即被认为超时,会被kill。该空闲时间由spark.dynamicAllocation.executorIdleTimeout决定,默认值60s。对应上图中:Job1 End到Executor1 timeout之间的时间。
  • 资源不足时,何时新增Executor:当有Task处于pending状态,意味着资源不足,此时需要增加Executor。这段时间由spark.dynamicAllocation.schedulerBacklogTimeout控制,默认1s。对应上述step6和step7之间的时间。
  • 该新增多少Executor:新增Executor的个数主要依据是当前负载情况,即running和pending任务数以及当前Executor个数决定。用maxNumExecutorsNeeded代表当前实际需要的最大Executor个数,maxNumExecutorsNeeded和当前Executor个数的差值即是潜在的新增Executor的个数。注意:之所以说潜在的个数,是因为最终新增的Executor个数还有别的因素需要考虑,后面会有分析。下面是maxNumExecutorsNeeded计算方法:
  private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
              tasksPerExecutorForFullParallelism)
      .toInt
  }
  • 其中numRunningOrPendingTasks为当前running和pending任务数之和。
  • executorAllocationRatio:最理想的情况下,有多少待执行的任务,那么我们就新增多少个Executor,从而达到最大的任务并发度。但是这也有副作用,如果当前任务都是小任务,那么这一策略就会造成资源浪费。可能最后申请的Executor还没启动,这些小任务已经被执行完了。该值是一个系数值,范围[0~1]。默认1.
  • tasksPerExecutorForFullParallelism:每个Executor的最大并发数,简单理解为:cpu核心数(spark.executor.cores)/ 每个任务占用的核心数(spark.task.cpus)。
问题1:executor动态调整的范围?无限减少?无限制增加?调整速率?

要实现资源的动态调整,那么限定调整范围是最先考虑的事情,Spark通过下面几个参数实现:

  • spark.dynamicAllocation.minExecutors:Executor调整下限。(默认值:0)
  • spark.dynamicAllocation.maxExecutors:Executor调整上限。(默认值:Integer.MAX_VALUE)
  • spark.dynamicAllocation.initialExecutors:Executor初始数量(默认值:minExecutors)。

三者的关系必须满足:minExecutors <= initialExecutors <= maxExecutors

注意:如果显示指定了num-executors参数,那么initialExecutors就是num-executor指定的值。

问题2:Spark中的Executor既提供计算能力,也提供存储能力。这些因超时被杀死的Executor中持久化的数据如何处理?

如果Executor中缓存了数据,那么该Executor的Idle-timeout时间就不是由executorIdleTimeout决定,而是用spark.dynamicAllocation.cachedExecutorIdleTimeout控制,默认值:Integer.MAX_VALUE。如果手动设置了该值,当这些缓存数据的Executor被kill后,我们可以通过NodeManannger的External Shuffle Server来访问这些数据。这就要求NodeManager中spark.shuffle.service.enabled必须开启。

2.2 ExecutorAllocationManager上下游调用关系

Spark动态分配的主要逻辑由ExecutorAllocationManager类实现,首先分析下与其交互的上下游关系,如下图所示:


spark_dynamic_allocation

主要的逻辑很简单:ExecutorAllocationManager中启动一个周期性任务,监控当前Executor是否超时,如果超时就将其移除。当然Executor状态的收集主要依赖于Spark提供的SparkListener机制。周期性任务逻辑如下:

private[spark] class ExecutorAllocationManager {

  // Executor that handles the scheduling task.
  private val executor =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

  def start(): Unit = {
    。。。
    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {...}
      }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    。。。
  }
  
  private def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis
    // 同步当前所需要的Executor数
    updateAndSyncNumExecutorsTarget(now)

    val executorIdsToBeRemoved = ArrayBuffer[String]()
    // removeTimes是<executorId, expireTime>的映射。
    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        initializing = false
        executorIdsToBeRemoved += executorId
      }
      !expired
    }
    // 移除所有超时的Executor
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)
    }
  }
}

以上就是对于Spark的动态资源分配的原理分析,相关源码可以参考Apache Spark:ExecutorAllocationManager。完整的配置参数见:Spark Configuration: Dynamic Allocation

3.总结与反思

  1. Pascal之父Nicklaus Wirth曾经说过一句名言:程序=算法+数据结构。对于Spark动态资源分配来说,我们应更加关注算法方面,即其动态行为。如何分配?如何伸缩?上下游关系如何?等等。
  2. 回馈社区:回馈是一种输出,就迫使我们输入的质量要足够高。这是一种很有效的技能提升方式。万事开头难,从最简单的typo fix/docs improvement起步。

4. Community Feedback

  1. 完善Executor相关参数的文档说明。SPARK-26446: Add cachedExecutorIdleTimeout docs at ExecutorAllocationManager
  2. fix bug:SPARK-26588:Idle executor should properly be killed when no job is submitted

参考

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2zo9qc4727c4s

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,194评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,058评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,780评论 0 346
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,388评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,430评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,764评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,907评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,679评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,122评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,459评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,605评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,270评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,867评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,734评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,961评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,297评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,472评论 2 348

推荐阅读更多精彩内容