如何工作
先将用户构建的DAG转化为分布式任务,结合分布式集群资源的可用性,基于调度规则依序把分布式任务分发到执行器。
执行步骤
调度的核心组件
DAGScheduler
主要职责:
1.把用户DAG拆分为Stages
2.在Stage内部创建Tasks,这些任务囊括了用户通过组合不同算子实现的数据转换逻辑,然后执行器Executors接受到Tasks,将其中封装的计算函数应用于分布式数据分片,去执行分布式的计算过程。
在分发任务之前,调度系统得先判断哪些节点得计算资源空闲,然后再把任务分发过去。
TaskScheduler
职责:基于既定规则于策略达成供需双方的匹配于撮合,核心是任务调度的规则与策略
TaskScheduler的调度分为两个层次,一个是Stages之间的调度优先级,一个是Stages内不同任务之间的调度优先级。
不同Stages之间
首先,对于两个或者多个Stages,如果它们之间不存在依赖关系,互相独立,在面对同一份可用资源的时候,就会存在竞争关系,这个时候,先调度谁,或者说谁有限享用这份计算资源。就得基于既定规则与协议。
TaskScheduler提供 两种调度模式,分别是FIFO和FAIR(公平调度),FAIR取决于配置文件failscheduler.xml定义。
同一个Stage之间
当TaskScheduler接受到来自于SchedulerBackend的WorkerOffer后,TaskScheduler会优先选择那些满足本地性级别的要求的任务进行分发。本地性级别有4种,process local < Node local <Rack local < Any,分别是进程本地性,节点本地性,机架本地性和跨机架本地性。从左到右,计算任务访问所需数据的效率越来越差。
进程本地性表示计算任务所需的输入数据就在某一个Executor进程内,因此把这样的计算任务调度到目标进程内最划算。同理,如果数据源还未加载到Executor进程,而是存储在某一计算节点的磁盘中,那么把任务调度到目标节点上去,也是一个不错的选择。再次,如果我们无法确定输入源在哪台机器,但可以肯定它一定在某个机架上,本地性级别就会退化到Racklocal。
DAGScheduler划分Stages、创建分布式任务的过程中,会为每一个任务指定本地性级别,本地性级别中会记录该任务有意向的计算节点地址,甚至是Executor进程 ID。换句话说,任务自带调度意愿,它通过本地性级别告诉TaskScheduler自己更乐意被调度到哪里去。
由此可见,Spark调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销。毕竟,分发代码的开销要比分发数据的代价低太多,这也正是“数据不动代码动”这个说法的由来。
SchedulerBackend
对于资源调度器得封装和抽象,支持Standalone,yarn等,SchedulerBackend使用ExecutorDataMap得数据结构记录Executor得资源状态。是一种HashMap,key标记Executor得字符串,value是一种ExecutorData得数据机构,ExecutorData封装了Executor得资源状态,例如RPC地址,主机地址,可用CPU核数,满配CPU核数等等。是Executor得“资源画像”。
// any protection. But accessing `executorDataMap` out of the inherited methods must be
// protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
// be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
// `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]
private[cluster] class ExecutorData(
val executorEndpoint: RpcEndpointRef,
val executorAddress: RpcAddress,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int,
override val logUrlMap: Map[String, String],
override val attributes: Map[String, String],
override val resourcesInfo: Map[String, ExecutorResourceInfo],
override val resourceProfileId: Int,
val registrationTs: Long
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
resourcesInfo, resourceProfileId)
SchedulerBackend对内是Executor的资源画像,对外是SchedulerBackend以WorkerOffer为粒度提供计算资源。WorkerOffer封装了Executor ID,主机地址、CPU核数,用来表示一份可用于调度任务的空闲资源。
private[spark]
case class WorkerOffer(
executorId: String,
host: String,
cores: Int,
// `address` is an optional hostPort string, it provide more useful information than `host`
// when multiple executors are launched on the same host.
address: Option[String] = None,
resources: Map[String, Buffer[String]] = Map.empty,
resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
总的来说,TaskScheduler根据本地性级别遴选出待计算任务之后,先对这些任务进行序列化。然后,交给SchedulerBackend,SchedulerBackend根据ExecutorData 中记录的RPC地址和主机地址,再将序列化的任务通过网络分发到目的主机的Executor中去。最后,Executor接收到任务之后,把任务交由内置的线程池,线程池中的多线程则并发地在不同数据分片之上执行任务中封装的数据处理函数,从而实现分布式计算。
DAGScheduler在创建Tasks 的过程中,是如何设置每一个任务的本地性级别?
位置信息通过特定的字符串前缀格式标识 executor_[hostname][executorid] [hostname] hdfs_cache[hostname] DAGScheduler会尝试获取RDD的每个Partition的偏好位置信息,a.如果RDD被缓存,通过缓存的位置信息获取每个分区的位置信息;b.如果RDD有preferredLocations属性,通过preferredLocations获取每个分区的位置信息;c. 遍历RDD的所有是NarrowDependency的父RDD,找到第一个满足a,b条件的位置信息 DAGScheduler将生成好的TaskSet提交给TaskSetManager进行任务的本地性级别计算
参考资料:吴磊老师的Spark调优