Spark 任务调度系统

如何工作

先将用户构建的DAG转化为分布式任务,结合分布式集群资源的可用性,基于调度规则依序把分布式任务分发到执行器。

执行步骤

image

调度的核心组件

image

DAGScheduler

主要职责:

1.把用户DAG拆分为Stages

2.在Stage内部创建Tasks,这些任务囊括了用户通过组合不同算子实现的数据转换逻辑,然后执行器Executors接受到Tasks,将其中封装的计算函数应用于分布式数据分片,去执行分布式的计算过程。

在分发任务之前,调度系统得先判断哪些节点得计算资源空闲,然后再把任务分发过去。

TaskScheduler

职责:基于既定规则于策略达成供需双方的匹配于撮合,核心是任务调度的规则与策略

image

TaskScheduler的调度分为两个层次,一个是Stages之间的调度优先级,一个是Stages内不同任务之间的调度优先级。

不同Stages之间

首先,对于两个或者多个Stages,如果它们之间不存在依赖关系,互相独立,在面对同一份可用资源的时候,就会存在竞争关系,这个时候,先调度谁,或者说谁有限享用这份计算资源。就得基于既定规则与协议。

TaskScheduler提供 两种调度模式,分别是FIFO和FAIR(公平调度),FAIR取决于配置文件failscheduler.xml定义。

同一个Stage之间

当TaskScheduler接受到来自于SchedulerBackend的WorkerOffer后,TaskScheduler会优先选择那些满足本地性级别的要求的任务进行分发。本地性级别有4种,process local < Node local <Rack local < Any,分别是进程本地性,节点本地性,机架本地性和跨机架本地性。从左到右,计算任务访问所需数据的效率越来越差。

进程本地性表示计算任务所需的输入数据就在某一个Executor进程内,因此把这样的计算任务调度到目标进程内最划算。同理,如果数据源还未加载到Executor进程,而是存储在某一计算节点的磁盘中,那么把任务调度到目标节点上去,也是一个不错的选择。再次,如果我们无法确定输入源在哪台机器,但可以肯定它一定在某个机架上,本地性级别就会退化到Racklocal。

DAGScheduler划分Stages、创建分布式任务的过程中,会为每一个任务指定本地性级别,本地性级别中会记录该任务有意向的计算节点地址,甚至是Executor进程 ID。换句话说,任务自带调度意愿,它通过本地性级别告诉TaskScheduler自己更乐意被调度到哪里去。

由此可见,Spark调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。毕竟,分发代码的开销要比分发数据的代价低太多,这也正是“数据不动代码动”这个说法的由来。

SchedulerBackend

对于资源调度器得封装和抽象,支持Standalone,yarn等,SchedulerBackend使用ExecutorDataMap得数据结构记录Executor得资源状态。是一种HashMap,key标记Executor得字符串,value是一种ExecutorData得数据机构,ExecutorData封装了Executor得资源状态,例如RPC地址,主机地址,可用CPU核数,满配CPU核数等等。是Executor得“资源画像”。

// any protection. But accessing `executorDataMap` out of the inherited methods must be
// protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
// be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
// `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]
private[cluster] class ExecutorData(
    val executorEndpoint: RpcEndpointRef,
    val executorAddress: RpcAddress,
    override val executorHost: String,
    var freeCores: Int,
    override val totalCores: Int,
    override val logUrlMap: Map[String, String],
    override val attributes: Map[String, String],
    override val resourcesInfo: Map[String, ExecutorResourceInfo],
    override val resourceProfileId: Int,
    val registrationTs: Long
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
  resourcesInfo, resourceProfileId)

SchedulerBackend对内是Executor的资源画像,对外是SchedulerBackend以WorkerOffer为粒度提供计算资源。WorkerOffer封装了Executor ID,主机地址、CPU核数,用来表示一份可用于调度任务的空闲资源。

private[spark]
case class WorkerOffer(
    executorId: String,
    host: String,
    cores: Int,
    // `address` is an optional hostPort string, it provide more useful information than `host`
    // when multiple executors are launched on the same host.
    address: Option[String] = None,
    resources: Map[String, Buffer[String]] = Map.empty,
    resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)

总的来说,TaskScheduler根据本地性级别遴选出待计算任务之后,先对这些任务进行序列化。然后,交给SchedulerBackend,SchedulerBackend根据ExecutorData 中记录的RPC地址和主机地址,再将序列化的任务通过网络分发到目的主机的Executor中去。最后,Executor接收到任务之后,把任务交由内置的线程池,线程池中的多线程则并发地在不同数据分片之上执行任务中封装的数据处理函数,从而实现分布式计算。

DAGScheduler在创建Tasks 的过程中,是如何设置每一个任务的本地性级别?

位置信息通过特定的字符串前缀格式标识 executor_[hostname][executorid] [hostname] hdfs_cache[hostname] DAGScheduler会尝试获取RDD的每个Partition的偏好位置信息,a.如果RDD被缓存,通过缓存的位置信息获取每个分区的位置信息;b.如果RDD有preferredLocations属性,通过preferredLocations获取每个分区的位置信息;c. 遍历RDD的所有是NarrowDependency的父RDD,找到第一个满足a,b条件的位置信息 DAGScheduler将生成好的TaskSet提交给TaskSetManager进行任务的本地性级别计算

参考资料:吴磊老师的Spark调优

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,265评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,078评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,852评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,408评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,445评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,772评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,921评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,688评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,130评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,467评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,617评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,276评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,882评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,740评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,967评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,315评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,486评论 2 348

推荐阅读更多精彩内容