I have recently been reading Spark's task scheduling code, in particular the delay scheduling part. Most of the articles and blog posts I found describe delay scheduling roughly like this:
"When assigning a task to a node (executor), first check whether the task's best node is idle, i.e. the node where this task has the highest data locality. If it is idle, assign the task right away; if the best node does not have enough resources, wait for a certain amount of time. If resources become available within the wait time, the task runs on that node; otherwise the next-best node is chosen instead..."
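That "wait for a certain amount of time" is governed by the spark.locality.wait family of settings (3s by default), which can also be tuned per locality level. A minimal sketch (the values shown are just the defaults, not recommendations):

import org.apache.spark.SparkConf

// Delay-scheduling wait times; the per-level settings fall back to the base value.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait used for every locality level
  .set("spark.locality.wait.process", "3s")  // wait before giving up PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // wait before giving up NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // wait before giving up RACK_LOCAL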
Reading the source code afterwards I still did not quite get it, so I contacted the author of one of those posts, BIGUFO, who very kindly explained a great deal to me; I benefited a lot, and realized my earlier understanding had been rather narrow.
Here is my understanding, starting from resourceOffers in TaskSchedulerImpl:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
......
// get the list of available executors and record them in the various bookkeeping collections
.....
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue // sort the TaskSets we got from the pool
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) { // take each TaskSet in turn
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) { // walk through the TaskSet's locality levels
do { // schedule this single TaskSet
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
The main strategy is:
- 1. Randomly shuffle the executors (offers) so that tasks are not always assigned to the same worker (see the sketch after this list).
- 2. Sort the TaskSets.
- 3. Allocate resources to each TaskSet, starting from the TaskSet's best (highest) locality level.
- 4. Take the first TaskSet and its current highest locality level, and call resourceOfferSingleTaskSet.
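For step 1, a sketch of what TaskSchedulerImpl.shuffleOffers boils down to (WorkerOffer is Spark's internal case class holding executorId, host and free cores):

import scala.util.Random

// Randomly permute the incoming offers so that tasks are not always packed
// onto the first worker in the list.
def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] =
  Random.shuffle(offers)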
Now suppose there are three worker nodes, each with a single executor and one CPU core (so each can run only one task at a time), and the current TaskSet has four tasks, as in the figure below:
task1 and task2 have their data cached in executor1, so for executor1 their current locality is PROCESS_LOCAL; task3's and task4's data live on work2 and work3 respectively, so for those workers they are NODE_LOCAL.
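To make the walkthrough easier to follow, here is a rough sketch of what this scenario looks like in the scheduler's data structures (the class and field names come from the Spark source; the concrete host names and task indexes are just this example):

import scala.collection.mutable.{ArrayBuffer, HashMap}

// The three offers handed to resourceOffers: one executor per worker, 1 free core each.
val offers = IndexedSeq(
  WorkerOffer("executor1", "work1", 1),
  WorkerOffer("executor2", "work2", 1),
  WorkerOffer("executor3", "work3", 1))

// How the four task indexes would be bucketed in the TaskSetManager's pending lists:
// a task cached in executor1 is PROCESS_LOCAL for that executor and NODE_LOCAL for its host.
val pendingTasksForExecutor = HashMap(
  "executor1" -> ArrayBuffer(0, 1))   // task1, task2
val pendingTasksForHost = HashMap(
  "work1" -> ArrayBuffer(0, 1),       // task1, task2 (NODE_LOCAL on work1)
  "work2" -> ArrayBuffer(2),          // task3
  "work3" -> ArrayBuffer(3))          // task4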
Scheduling now starts. The TaskSet's currentMaxLocality is PROCESS_LOCAL, so resourceOfferSingleTaskSet(taskSet, PROCESS_LOCAL, ....) is invoked.
Scheduling a single TaskSet
The function is TaskSchedulerImpl#resourceOfferSingleTaskSet:
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
for (i <- 0 until shuffledOffers.size) { // iterate over the executors we were offered
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) { // does this executor have enough free cores?
try { // if so, try to launch a task on this executor at the given locality
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToRunningTaskIds(execId).add(tid)
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}
So, given a set of executors and a locality level, scheduling a single TaskSet works as follows:
- 1. A for loop walks over the executors. Suppose we pick executor1 from the figure above; its free CPU count is sufficient, so resourceOffer is called.
- 2. resourceOffer(executor1, work1, PROCESS_LOCAL) is executed (the locality is the one passed down from TaskSchedulerImpl#resourceOffers).
In other words, this step is: given an executor and a locality level, find the most suitable task. Let's look inside the function:
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
......
if (!isZombie && !offerBlacklisted) {
val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality // start from the locality we were offered (PROCESS_LOCAL here)
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime) // the locality level currently allowed by delay scheduling
if (allowedLocality > maxLocality) {
// if the allowed level is worse than the one we were offered, keep the offered one
// e.g. if getAllowedLocalityLevel returns NODE_LOCAL, which is worse than PROCESS_LOCAL, keep PROCESS_LOCAL
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
......
So resourceOffer, given an executor and a locality level, looks for a suitable task. We passed in executor1 and PROCESS_LOCAL. It first calls getAllowedLocalityLevel, which returns the locality level the TaskSet is currently allowed to schedule at. Let's step into that function:
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
......
while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
if (!moreTasks) {
// This is a performance optimization: if there are no more tasks that can
// be scheduled at a particular locality level, there is no point in waiting
// for the locality wait timeout (SPARK-4939).
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
// Jump to the next locality level, and reset lastLaunchTime so that the next locality
// wait timer doesn't immediately expire
lastLaunchTime += localityWaits(currentLocalityIndex)
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
currentLocalityIndex += 1
} else {
return myLocalityLevels(currentLocalityIndex)
}
}
myLocalityLevels(currentLocalityIndex)
}
Among our four tasks, task1 and task2 are PROCESS_LOCAL with respect to executor1, i.e. there are still two tasks at PROCESS_LOCAL level, so this branch fires: case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) (see the sketch right below for roughly what moreTasksToRunIn checks).
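moreTasksToRunIn is a small helper defined inside getAllowedLocalityLevel. Roughly speaking it checks whether any of the given pending lists still contains a runnable task; a simplified sketch (the real code also lazily removes entries for tasks that are already running or finished; copiesRunning and successful are fields of TaskSetManager):

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Is there any pending task index in these lists that has no running copy
// and has not completed successfully yet?
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean =
  pendingTasks.values.exists { taskIndexes =>
    taskIndexes.exists(index => copiesRunning(index) == 0 && !successful(index))
  }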
So the returned locality is still PROCESS_LOCAL, and resourceOffer then calls dequeueTask(executor1, work1, PROCESS_LOCAL), which looks like this:
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
// TaskLocality <= maxLocality, i.e. only levels at least as good as maxLocality are allowed -- note added by fyz
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL, false))
}
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
}
}
// find a speculative task if all others tasks have been scheduled
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
When the first for loop runs, both task1 and task2 qualify. Suppose task1 is chosen as the task to launch; it is returned immediately, and task1 now runs on executor1.
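For completeness, dequeueTaskFromList, which every branch of dequeueTask relies on, works roughly like this (a simplified sketch; the real method also skips indexes that are blacklisted for this executor or host, and again copiesRunning and successful are TaskSetManager fields):

import scala.collection.mutable.ArrayBuffer

// Scan the pending list from the back, dropping indexes as we go, and return
// the first task that is neither running nor already finished successfully.
def dequeueTaskFromList(execId: String, host: String, list: ArrayBuffer[Int]): Option[Int] = {
  var indexOffset = list.size
  while (indexOffset > 0) {
    indexOffset -= 1
    val index = list(indexOffset)
    list.remove(indexOffset)
    if (copiesRunning(index) == 0 && !successful(index)) {
      return Some(index)
    }
  }
  None
}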
Note where a successful dequeueTask returns us to: TaskSchedulerImpl#resourceOfferSingleTaskSet. So far only executor1 has been given a task, and that function's for loop iterates over all executors, looking for a suitable task on each of them.
Back there, the second executor is picked, say executor2. The locality is still PROCESS_LOCAL (the locality only changes once we return all the way to resourceOffers). The flow is now:
- Given executor2 and PROCESS_LOCAL, resourceOffer is called.
- getAllowedLocalityLevel is called; since task2 is still pending at PROCESS_LOCAL for executor1, it would normally return PROCESS_LOCAL, but if the wait has exceeded the default 3s it moves the allowed level down one level instead.
- dequeueTask(executor2, work2, PROCESS_LOCAL) is called. The first for finds no PROCESS_LOCAL task for executor2, and every later block is guarded by TaskLocality.isAllowed(), which fails because PROCESS_LOCAL is the strictest level (see the isAllowed sketch after this list), so None is returned.
- Then the next executor, executor3, is tried, with the same first three steps as above.
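The isAllowed guard that keeps blocking executor2 here is just a comparison on the enum ordering (PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY of Spark's TaskLocality enumeration); roughly:

// A locality "condition" is allowed only if it is at least as good as
// (numerically <=) the "constraint" we were offered. With constraint =
// PROCESS_LOCAL, only PROCESS_LOCAL itself passes, so every later block in
// dequeueTask is skipped for executor2.
def isAllowed(constraint: TaskLocality.TaskLocality,
              condition: TaskLocality.TaskLocality): Boolean =
  condition <= constraint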
Note that in step 2, if the wait has timed out, getAllowedLocalityLevel downgrades the allowed level from PROCESS_LOCAL to NODE_LOCAL; back in resourceOffer the following if then clamps it right back to PROCESS_LOCAL. So one might ask: why bother downgrading at all if it just gets overridden again, isn't that wasted effort?
The reason is that the downgrade is remembered. Even though this offer is still handled at PROCESS_LOCAL level, the next time we come back to the locality for loop in TaskSchedulerImpl#resourceOffers and it moves on to NODE_LOCAL, there is no longer a PROCESS_LOCAL task for the executor being offered, and a NODE_LOCAL task can be dequeued. Without the downgrade, no matter which level the loop reached for this TaskSet, getAllowedLocalityLevel would keep finding a pending PROCESS_LOCAL task and keep forcing the level back to PROCESS_LOCAL.
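To make the bookkeeping concrete, for this example TaskSet the two parallel arrays that drive the downgrade would plausibly look as follows (assuming the default spark.locality.wait of 3s and no rack information; this is an illustration, not Spark output):

// currentLocalityIndex only ever walks forward through these arrays; clamping
// allowedLocality back to maxLocality inside resourceOffer does not move it back,
// which is exactly why the downgrade is not wasted.
val myLocalityLevels = Array(
  TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY)
val localityWaits = Array(3000L, 3000L, 0L)   // ANY never waits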
So after all three executors have been offered, only task1 has been launched, and we return again, this time to the do-while loop inside the two nested for loops in TaskSchedulerImpl#resourceOffers. Because resourceOfferSingleTaskSet returned true, the do-while keeps running, and it only ends once one of the following holds:
- No executor has enough free CPU left, so resourceOfferSingleTaskSet can no longer flip launchedTask from false to true.
- There are no tasks left to launch, so the for loop inside resourceOfferSingleTaskSet assigns nothing and launchedTask again stays false.
The task timeout case
In that first call to TaskSchedulerImpl#resourceOffers, task1, task3 and task4 were launched. Since each of the three workers has a single core, each worker can run only one task at a time, so task2, which is PROCESS_LOCAL for work1, is left unscheduled while all workers are busy; this round of resourceOffers finishes with no resources left. (Once a task is assigned resources it is handed to CoarseGrainedSchedulerBackend's launchTasks method, which sends it to the Coarse...Backend on the worker node for execution.) One might ask: with no resources left, does the remaining task2 simply never run?
In fact the scheduler keeps every TaskSet in a pool. If a TaskSet is not finished this time, it is offered resources again in the next resourceOffers call, and if there are still no resources it just stays in the pool. So let's analyse what happens when resources become available and the TaskSet is offered again.
Suppose on the next offer there is a free slot, task2 is still pending, and it is a PROCESS_LOCAL task for executor1, while the free resource is executor2 on work2. Scheduling now proceeds as follows:
- resourceOffers is called, which calls TaskSchedulerImpl#resourceOfferSingleTaskSet starting at PROCESS_LOCAL and finds executor2 available.
- Then TaskSetManager#resourceOffer is called. There are two cases: either less than 3s has passed since the last task launch, or more than 3s.
- 2.1. If less than 3s: dequeueTask finds no executor holding task2 at PROCESS_LOCAL, so we return to TaskSchedulerImpl#resourceOffers and keep going down the locality levels.
- 2.2. If more than 3s: TaskSetManager#getAllowedLocalityLevel downgrades the allowed level to NODE_LOCAL, but since that is worse than PROCESS_LOCAL, the allowedLocality passed to dequeueTask is still PROCESS_LOCAL, so task2 is still being looked for at PROCESS_LOCAL.
- In case 2.2 the dequeue still fails, so we return to resourceOffers, which now moves from PROCESS_LOCAL to NODE_LOCAL and hands the offers to resourceOfferSingleTaskSet again; if task2 still cannot be launched, the steps above repeat from step 1.
- Suppose task2's allowed level eventually drops all the way to ANY while executor2 still has a free slot: the next time it is offered, the task is dequeued at ANY and assigned straight to executor2.
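As an aside, this whole downgrade dance can be watched through the debug messages already present in getAllowedLocalityLevel ("No tasks for locality level ..." / "Moving to ... after waiting for ...ms"). A minimal way to surface them, assuming a log4j 1.x based Spark build, is to raise the log level of the two scheduler classes on the driver:

import org.apache.log4j.{Level, Logger}

// Enable DEBUG output for the classes discussed above so the locality-wait
// messages from getAllowedLocalityLevel show up in the driver log.
Logger.getLogger("org.apache.spark.scheduler.TaskSetManager").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.scheduler.TaskSchedulerImpl").setLevel(Level.DEBUG)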
Adding some logging of my own also helped me understand quite a bit. My memory is not great, so treat this post as a note to myself; if anything here is wrong, corrections are very welcome.