All of Spark's built-in scheduling algorithms follow the contract defined by the trait SchedulingAlgorithm:
private[spark] trait SchedulingAlgorithm {
  // Returns true if s1 should be scheduled before s2
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
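The trait declares a single method, comparator, which compares two Schedulable entities and returns true if s1 should be scheduled before s2. Each scheduling policy is implemented by overriding this one method.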
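Because comparator has the shape (A, A) => Boolean, it can be passed directly to Scala's sortWith as a "less than" function; Spark's Pool sorts its queue the same way, via sortWith(taskSetSchedulingAlgorithm.comparator). The standalone Scala script below is a minimal sketch of that idea. SimpleSchedulable, SimpleSchedulingAlgorithm, Entry and ByPriority are hypothetical names for illustration, not Spark classes, and the toy policy simply puts smaller priority values first.

```scala
// Hypothetical, simplified stand-in for Spark's Schedulable (the real trait has many more members).
trait SimpleSchedulable {
  def name: String
  def priority: Int
}

// Hypothetical mirror of SchedulingAlgorithm: one predicate, "should s1 be scheduled before s2?"
trait SimpleSchedulingAlgorithm {
  def comparator(s1: SimpleSchedulable, s2: SimpleSchedulable): Boolean
}

final case class Entry(name: String, priority: Int) extends SimpleSchedulable

// Toy policy for illustration only: a smaller priority value goes first.
object ByPriority extends SimpleSchedulingAlgorithm {
  override def comparator(s1: SimpleSchedulable, s2: SimpleSchedulable): Boolean =
    s1.priority < s2.priority
}

val queue = Seq(Entry("c", 3), Entry("a", 1), Entry("b", 2))
// sortWith takes a "less than" function, which is exactly the shape of comparator.
val ordered = queue.sortWith(ByPriority.comparator)
println(ordered.map(_.name).mkString(",")) // prints a,b,c
```

Note that comparator must return false when neither entity should precede the other (for example, equal priorities); sortWith relies on it behaving as a strict "less than".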
FIFO
First in, first out. The implementation class is:
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    // Compare priorities first; a smaller value means a higher priority
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      // Equal priorities: break the tie on the stage ID
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    // true means s1 is scheduled before s2
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
Analysis:
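Compare the priorities of the two schedulable entities s1 and s2; a smaller priority value means a higher priority. For a TaskSetManager, the priority is the ID of the job that submitted the TaskSet, so tasks of earlier jobs are scheduled first.
If the priority values are equal, compare the IDs of the stages that s1 and s2 belong to; the smaller stageId has the higher priority.
comparator returns true (s1 is scheduled first) only when s1 comes out strictly smaller; otherwise it returns false and s2 is placed ahead of s1.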
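The FIFO rules above can be exercised on a small queue. This standalone Scala script is a minimal sketch that reproduces the comparator's decision logic on a simplified type; Job and fifoBefore are hypothetical names, not Spark classes, and Job carries only the two fields FIFO reads.

```scala
// Hypothetical, simplified stand-in for a schedulable entity: only the fields FIFO reads.
final case class Job(name: String, priority: Int, stageId: Int)

// Same decision logic as FIFOSchedulingAlgorithm.comparator, applied to the simplified type.
def fifoBefore(s1: Job, s2: Job): Boolean = {
  var res = math.signum(s1.priority - s2.priority)
  if (res == 0) {
    // Equal priority values: fall back to the stage ID.
    res = math.signum(s1.stageId - s2.stageId)
  }
  res < 0 // true only when s1 is strictly "smaller"
}

val jobs = Seq(Job("j2-s5", 2, 5), Job("j1-s3", 1, 3), Job("j1-s1", 1, 1))
val fifoOrder = jobs.sortWith(fifoBefore).map(_.name)
println(fifoOrder.mkString(", ")) // prints j1-s1, j1-s3, j2-s5
```

Both entries with priority 1 come before the priority-2 entry regardless of stage ID, and within priority 1 the smaller stage ID wins.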
FAIR
Fair scheduling. The implementation class is:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    // A schedulable is "needy" while it runs fewer tasks than its minShare
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    // max(minShare, 1.0) keeps the divisor positive when minShare is 0
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0
    if (s1Needy && !s2Needy) {
      // Only s1 is below its minShare: s1 goes first
      return true
    } else if (!s1Needy && s2Needy) {
      // Only s2 is below its minShare: s2 goes first
      return false
    } else if (s1Needy && s2Needy) {
      // Both below minShare: the smaller minShareRatio goes first
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      // Neither below minShare: the smaller taskToWeightRatio goes first
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      // Equal ratios: break the tie on the name
      s1.name < s2.name
    }
  }
}
Analysis:
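If the number of running tasks in s1 is less than s1's minShare, and the number of running tasks in s2 is greater than or equal to s2's minShare, s1 is scheduled first;
If the number of running tasks in s1 is greater than or equal to s1's minShare, and the number of running tasks in s2 is less than s2's minShare, s2 is scheduled first;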
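If both s1 and s2 run fewer tasks than their respective minShare, compare minShareRatio1 with minShareRatio2: if minShareRatio1 is smaller, s1 is scheduled first; if it is larger, s2 is scheduled first. If the two ratios are equal, compare the names of s1 and s2; the smaller name is scheduled first;
minShareRatio: the number of running tasks divided by minShare (the code divides by max(minShare, 1.0), so a minShare of 0 does not cause a division by zero)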
If both s1 and s2 run at least as many tasks as their respective minShare, compare taskToWeightRatio1 with taskToWeightRatio2; the smaller value is scheduled first. If the two ratios are equal, compare the names of s1 and s2; the smaller name is scheduled first.
taskToWeightRatio: the number of running tasks divided by the weight
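To see the four FAIR cases interact, here is a minimal standalone Scala script that applies the same rules to a few pools. PoolState, fairBefore and the sample numbers are hypothetical and chosen for illustration; PoolState carries only the fields the fair comparator reads. The worked numbers: adhoc (1 of minShare 2, ratio 0.5) and etl (3 of minShare 4, ratio 0.75) are both needy; batch (2 tasks, weight 1) and report (4 tasks, weight 2) are not, and both have taskToWeightRatio 2.0, so their names decide.

```scala
// Hypothetical, simplified stand-in: only the fields the fair comparator reads.
final case class PoolState(name: String, runningTasks: Int, minShare: Int, weight: Int)

// Same decision logic as FairSchedulingAlgorithm.comparator, applied to the simplified type.
def fairBefore(s1: PoolState, s2: PoolState): Boolean = {
  val s1Needy = s1.runningTasks < s1.minShare
  val s2Needy = s2.runningTasks < s2.minShare
  if (s1Needy != s2Needy) {
    s1Needy // exactly one is below its minShare, and that one goes first
  } else {
    val compare =
      if (s1Needy) {
        // Both needy: compare runningTasks / max(minShare, 1).
        val r1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
        val r2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
        r1.compareTo(r2)
      } else {
        // Neither needy: compare runningTasks / weight.
        val r1 = s1.runningTasks.toDouble / s1.weight
        val r2 = s2.runningTasks.toDouble / s2.weight
        r1.compareTo(r2)
      }
    if (compare != 0) compare < 0 else s1.name < s2.name // tie-break on name
  }
}

val pools = Seq(
  PoolState("report", runningTasks = 4, minShare = 0, weight = 2),
  PoolState("etl", runningTasks = 3, minShare = 4, weight = 1),
  PoolState("batch", runningTasks = 2, minShare = 1, weight = 1),
  PoolState("adhoc", runningTasks = 1, minShare = 2, weight = 1)
)
val fairOrder = pools.sortWith(fairBefore).map(_.name)
println(fairOrder.mkString(", ")) // prints adhoc, etl, batch, report
```

The needy pools come first even though etl already runs more tasks than batch, which is how minShare guarantees a floor of resources before weights come into play.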