本文调度算法所采用的策略是fair策略,即公平调度算法,另外为简单起见本文只考虑了一个资源即memory.
Schedulable接口
调度器调度的是可调度类,在FairScheduler中可调度的类是实现了Schedulable接口的类,包括FSParentQueue、FSLeafQueue和FSAppAttempt.
Schedulable接口子类的重要属性:
demand
该可调度对象当前所需要资源,对于叶子队列包括正在运行的应用需要的资源和没有运行应用需要的资源;minShare
一次调度过程中调度器最少分配给当前可调度对象的资源;maxShare
一次调度过程中调度器最多分配给当前可调度对象的资源;steadyFairShare
也叫Steady Fair Share
是调度器按照权重分配给当前可调度类资源的理论值;fairShare
也叫Instantaneous Fair Share
是调度器分配给当前可调度类的实际可用资源。
Fair Share
(可以翻译为配额)指的都是Yarn根据每个队列的权重、最大最小可运行资源计算的得到的可以分配给这个队列的最大可用资源。Steady Fair Share
属于理论值,只要集群总体资源和队列配置不变,则每个队列的Steady Fair Share
值是不变的;而Instantaneous Fair Share
是动态变化值,代表了运行时每个队列可使用的最大资源,在分配资源的时候除了考虑权重,还考虑了其它因素,如果当前的叶子队列没有应用的话则分配给当前叶子节点的Instantaneous Fair Share
为0.
FairSchudler采用Instantaneous Fair Share
进行资源的调度分配。
公平分配算法
除去锁定的资源
ComputeFairShares#computeSharesInternal用于计算所有队列的Fair share,计算之前会除去一些特殊的队列及其所占用的资源。这些特殊的队列为Fixed队列,可以理解为资源已经被锁定,不需要调度器去分配资源。除去锁定的资源的主要逻辑是在ComputeFairShares#getFairShareIfFixed这个方法中
- Schedulable#getMaxShare = 0 锁定资源为0,不需要被调度了;
- 如果是计算
Instantaneous Fair Share
且当前的队列不活跃(对于叶子队列来说是没有应用),锁定资源为0,不需要被调度了;- 如果队列的权重weight <= 0, 该队列不需要被调度了,直接分配0或者sched#minShare了。
// ComputeFairShares#getFairShareIfFixed
private static int getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, ResourceType type) {
// Check if maxShare is 0
if (getResourceValue(sched.getMaxShare(), type) <= 0) {
return 0;
}
// For instantaneous fairshares, check if queue is active
if (!isSteadyShare &&
(sched instanceof FSQueue) && !((FSQueue)sched).isActive()) {
return 0;
}
// Check if weight is 0
if (sched.getWeights().getWeight(type) <= 0) {
int minShare = getResourceValue(sched.getMinShare(), type);
return (minShare <= 0) ? 0 : minShare;
}
return -1;
}
分配配额
主要逻辑是在ComputeFairShares#computeSharesInternal除去锁定资源的队列之后,计算每个队列应该分到的配额(fair share).算法的核心是确定一个常数R,使得每个队列根据其Weight按照比例获得配额,每个队列获得的配额应该满足下面几个条件:
- 各个队列获得的配额应该等于或者无限接近于totalResource(这个总资源减去了锁定的资源);
- 各个队列获得的配额应该大于等于该队列的minShare;
- 各个队列获得的配额应该小于等于该队列的maxShare。
private static void computeSharesInternal(
Collection<? extends Schedulable> allSchedulables,
Resource totalResources, ResourceType type, boolean isSteadyShare) {
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
int takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);
if (schedulables.isEmpty()) {
return;
}
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables' max shares.
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
break;
}
}
int totalResource = Math.max((getResourceValue(totalResources, type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
< totalResource) {
rMax *= 2.0;
}
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
mid, schedulables, type);
if (plannedResourceUsed == totalResource) {
right = mid;
break;
} else if (plannedResourceUsed < totalResource) {
left = mid;
} else {
right = mid;
}
}
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) {
if (isSteadyShare) {
setResourceValue(computeShare(sched, right, type),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
setResourceValue(
computeShare(sched, right, type), sched.getFairShare(), type);
}
}
}
一个简单的栗子
配置文件fair-scheduler.xml
<?xml version="1.0"?>
<allocations>
<queue name="root" >
<queue name="parentA" >
<weight>8</weight>
<queue name="childA1" />
<queue name="childA2" />
<queue name="childA3" />
<queue name="childA4" />
</queue>
<queue name="parentB" >
<weight>1</weight>
<queue name="childB1" />
<queue name="childB2" />
</queue>
</queue>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
</allocations>
加载上面配置文件之后QueueManager中存在的队列为:
- 根节点
root
- 默认节点
root.default
- FSParentQueue
root.parentA
和root.parentB
- FSLeafQueue
root.parentA.childA1
,root.parentA.childA2
,root.parentA.childA3
和root.parentA.childA4
- FSLeafQueue
root.parentB.childB1
和root.parentB.childB2
由于配置了权重root.parentA
的权重是8,root.parentB
的权重是1。这两个节点的子节点均没有配置权重取默认值1.
当前的ResourceManager一个节点没有,加入资源为<memory:16384, vCores:0>(16GB)的node1进ResourceManager,触发
NODE_ADDED
事件最终会调用queueMgr.getRootQueue().recomputeSteadyShares()
,重新计算所有节点的Steady Fair Share
,最终结果为:
root, steady_fair_share = <memory:16384, vCores:0>, fair_share = <memory:16384, vCores:0>
root.parentA, steady_fair_share = <memory:13108, vCores:0>, fair_share = <memory:0, vCores:0>
root.default, steady_fair_share = <memory:1638, vCores:0>, fair_share = <memory:0, vCores:0>
root.parentB, steady_fair_share = <memory:1638, vCores:0>, fair_share = <memory:0, vCores:0>
root.parentA.childA4, steady_fair_share = <memory:3277, vCores:0>, fair_share = <memory:0, vCores:0>
root.parentA.childA1, steady_fair_share = <memory:3277, vCores:0>, fair_share = <memory:0, vCores:0>
root.parentA.childA3, steady_fair_share = <memory:3277, vCores:0>, fair_share = <memory:0, vCores:0>
root.parentA.childA2, steady_fair_share = <memory:3277, vCores:0>, fair_share = <memory:0, vCores:0>
root.parentB.childB2, steady_fair_share = <memory:819, vCores:0>, fair_share = <memory:0, vCores:0>
root.parentB.childB1, steady_fair_share = <memory:819, vCores:0>, fair_share = <memory:0, vCores:0>
结果分析:
- 由于没有具体的应用每个可调度对象分到的
Instantaneous Fair Share
为0;root
的直接子节点root.parentA
、root.default
、root.parentB
按照 8:1:1的比例进行分配Steady >Fair Share
;root.parentA
的4个子节点按照1:1:1:1的比例得到了root.parentA
的所有资源;root.parentB
的2个子节点按照1:1的比例得到了root.parentB
的所有资源;
分配一个2GB的应用给
root.parentA.childA1
,会发现root.parentA.childA1
的Instantaneous Fair Share
为16GB,其它的queue仍然为0,因为其它的queue没有应用。
上述栗子参见Hadoop源码下hadoop-yarn-project子工程下测试类
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.TestFairSchedulerFairShare