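Preemption and its source code are already covered quite thoroughly in https://blog.csdn.net/zhanyuanlin/article/details/71516286, so I won't repeat that work here. This post only adds a few notes on top of that article.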
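Summary of the preemption mechanism
- Before each preemption round, FairScheduler computes the total amount of resources that must be preempted across the whole cluster. It does not keep this amount per queue.
- Preempted resources are not handed to any particular queue. The preempted containers are released back to their nodes, and the next scheduling pass distributes the freed resources among the queues according to the scheduling policy.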
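To make the first point concrete, here is a minimal sketch that models memory only and uses a hypothetical map from queue name to memory deficit in place of the real FSLeafQueue and Resource types. In the Hadoop 2.x sources I am familiar with, FairScheduler#preemptTasksIfNecessary does roughly this: it adds resToPreempt(queue, now) for every leaf queue into one Resource and passes that single total to preemptResources, so the per-queue breakdown is lost.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, memory-only sketch: per-queue deficits (MB) are folded into
// one cluster-wide total, and the per-queue breakdown is thrown away.
class PreemptionTotal {

    static long totalToPreempt(Map<String, Long> deficitMbByQueue) {
        long total = 0;
        for (long deficitMb : deficitMbByQueue.values()) {
            total += Math.max(0L, deficitMb); // a queue with no deficit adds nothing
        }
        return total; // only this number reaches the preemption step
    }

    public static void main(String[] args) {
        Map<String, Long> deficits = new LinkedHashMap<>();
        deficits.put("root.a", 4096L);
        deficits.put("root.b", 0L);
        deficits.put("root.c", 2048L);
        System.out.println(totalToPreempt(deficits)); // prints 6144
    }
}
```

Because only the total survives, nothing ties the freed memory back to root.a or root.c; which queue eventually gets it is decided by the next normal scheduling pass.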
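Priority calculation
"Priority calculation" here means the ordering algorithm used by both normal scheduling and preemption. For FairSharePolicy this is FairShareComparator, which decides the order in which Schedulables are served.
The main logic is in FairSharePolicy$FairShareComparator#compare. If the first Schedulable s1 needs to be scheduled more urgently than the second Schedulable s2, the method returns a negative number. As with Priority in FifoPolicy, a smaller value is scheduled earlier.
1. Compute the min share of s1 and s2 (the minimum amount each one needs): the smaller of Schedulable#minShare and Schedulable#demand.
2. Check whether the actual usage of s1 and s2 reaches that min share. s1Needy = true (or s2Needy = true) means usage is below the min share, so the Schedulable needs more resources to reach it.
3. Compute the ratio of current allocation to min share for s1 and s2, minShareRatio1 and minShareRatio2. The smaller the ratio, the more the Schedulable needs resources.
4. Compute the ratio of current allocation to weight for s1 and s2, useToWeightRatio1 and useToWeightRatio2. For the same usage, a larger weight gives a smaller ratio, so the Schedulable is treated as needing resources more.
5. Decide the order of s1 and s2 from the values of steps 2, 3 and 4: a needy Schedulable goes before a non-needy one; if both are needy, the smaller minShareRatio goes first; if neither is needy, the smaller useToWeightRatio goes first; ties are broken by start time and then by name.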
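The quantities in steps 1 to 4 can be written compactly as below. Here $u_i$ is the memory usage of $s_i$ and $w_i$ its memory weight; everything is measured in memory, since the code only reads getMemory(). The numbers in the example are illustrative, not taken from a real cluster.

$$
\begin{aligned}
m_i &= \min(\mathit{minShare}_i,\ \mathit{demand}_i) \\
\mathit{needy}_i &= (u_i < m_i) \\
\mathit{minShareRatio}_i &= \frac{u_i}{\max(m_i,\ 1)} \\
\mathit{useToWeightRatio}_i &= \frac{u_i}{w_i}
\end{aligned}
$$

For example, suppose neither Schedulable is needy, $s_1$ uses 2048 MB with $w_1 = 1$ and $s_2$ uses 3072 MB with $w_2 = 2$. Then $\mathit{useToWeightRatio}_1 = 2048$ and $\mathit{useToWeightRatio}_2 = 1536$, so compare(s1, s2) returns $\operatorname{signum}(2048 - 1536) = 1$ and $s_2$ is scheduled first even though it holds more memory.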
// FairSharePolicy$FairShareComparator#compare
public int compare(Schedulable s1, Schedulable s2) {
double minShareRatio1, minShareRatio2;
double useToWeightRatio1, useToWeightRatio2;
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
s1.getMinShare(), s1.getDemand());
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
s2.getMinShare(), s2.getDemand());
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
else if (s2Needy && !s1Needy)
res = 1;
else if (s1Needy && s2Needy)
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
else
// Neither schedulable is needy
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0)
res = s1.getName().compareTo(s2.getName());
}
return res;
}
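The comparator above depends on YARN types (Schedulable, Resources, ResourceType). The following is a self-contained, memory-only restatement that follows the same five steps so the ordering can be tried out directly. Sched is a hypothetical stand-in for Schedulable, all amounts are in MB, and the tie-break uses Long.compare, which gives the same sign as signum(start1 - start2) without risking overflow.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, memory-only restatement of FairShareComparator#compare.
// Sched is a hypothetical stand-in for Schedulable; all amounts are in MB.
class FairShareOrder implements Comparator<FairShareOrder.Sched> {

    static final class Sched {
        final String name;
        final long minShare, demand, usage, startTime;
        final double weight;

        Sched(String name, long minShare, long demand, long usage,
              double weight, long startTime) {
            this.name = name;
            this.minShare = minShare;
            this.demand = demand;
            this.usage = usage;
            this.weight = weight;
            this.startTime = startTime;
        }
    }

    @Override
    public int compare(Sched s1, Sched s2) {
        // Step 1: effective min share = min(minShare, demand)
        long minShare1 = Math.min(s1.minShare, s1.demand);
        long minShare2 = Math.min(s2.minShare, s2.demand);
        // Step 2: "needy" means usage is below the effective min share
        boolean s1Needy = s1.usage < minShare1;
        boolean s2Needy = s2.usage < minShare2;
        // Step 3: usage / min share, denominator floored at 1
        double minShareRatio1 = (double) s1.usage / Math.max(minShare1, 1);
        double minShareRatio2 = (double) s2.usage / Math.max(minShare2, 1);
        // Step 4: usage / weight
        double useToWeight1 = s1.usage / s1.weight;
        double useToWeight2 = s2.usage / s2.weight;

        int res;
        if (s1Needy && !s2Needy) {
            res = -1;
        } else if (s2Needy && !s1Needy) {
            res = 1;
        } else if (s1Needy) { // both needy
            res = (int) Math.signum(minShareRatio1 - minShareRatio2);
        } else {              // neither needy
            res = (int) Math.signum(useToWeight1 - useToWeight2);
        }
        if (res == 0) {
            // Tie-break by start time, then by name
            res = Long.compare(s1.startTime, s2.startTime);
            if (res == 0) {
                res = s1.name.compareTo(s2.name);
            }
        }
        return res;
    }

    public static void main(String[] args) {
        List<Sched> apps = new ArrayList<>(List.of(
                new Sched("a", 0, 8192, 2048, 1.0, 1),
                new Sched("b", 0, 8192, 3072, 2.0, 2),
                new Sched("c", 4096, 8192, 1024, 1.0, 3)));
        apps.sort(new FairShareOrder());
        for (Sched s : apps) {
            System.out.println(s.name); // prints c, then b, then a
        }
    }
}
```

c comes first because it is below its min share (1024 < 4096); a and b are not needy, and b's usage-to-weight ratio (3072 / 2 = 1536) is smaller than a's (2048 / 1 = 2048), so b precedes a.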
A question
Why does FairScheduler#preemptResources call queue.resetPreemptedResources() before preempting containers and queue.clearPreemptedResources() afterwards?
// FairScheduler#preemptResources
protected void preemptResources(Resource toPreempt) {
// ...
// ...
// ...
try {
// Reset preemptedResource for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.resetPreemptedResources();
}
while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
// Preempt a container
RMContainer container =
getQueueManager().getRootQueue().preemptContainer();
// ...
// ...
// ...
}
} finally {
// Clear preemptedResources for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.clearPreemptedResources();
}
}
// ...
// ...
// ...
}
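When getQueueManager().getRootQueue().preemptContainer() selects a container to preempt, it calls FSAppAttempt#getResourceUsage, which returns the app's current consumption minus its already-preempted resources (getPreemptedResources()).
Normal scheduling and preemption use the same algorithm (Comparator). Normal scheduling does not take preempted resources into account, because at the end of every preemption call queue.clearPreemptedResources() resets them to 0.
Why does preemption take the already-preempted resources into account? Subtracting them lowers the app's effective usage, so under the comparator the app looks needier and moves toward the front of the normal scheduling order. Preemption picks the Schedulable that sorts last (the one most over its share), so this pushes the app toward the back of the preemption order and avoids repeatedly preempting containers from the same app.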
// FSAppAttempt#getResourceUsage
public Resource getResourceUsage() {
// Here the getPreemptedResources() always return zero, except in
// a preemption round
// In the common case where preempted resource is zero, return the
// current consumption Resource object directly without calling
// Resources.subtract which creates a new Resource object for each call.
return getPreemptedResources().equals(Resources.none()) ?
getCurrentConsumption() :
Resources.subtract(getCurrentConsumption(), getPreemptedResources());
}
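To see why subtracting already-preempted resources spreads preemption across apps, here is a minimal, memory-only sketch of one round. Everything in it (App, pickVictim, preempt) is hypothetical, and it simplifies the comparator to the usage/weight ratio of step 4, i.e. it assumes no app is needy. The victim is the app that would sort last under that ratio, which is how I understand FSParentQueue/FSLeafQueue#preemptContainer to choose the schedulable most over its fair share. The reset step assumes, as in the Hadoop 2.x FSAppAttempt#resetPreemptedResources I have read, that each app's total is recomputed from containers already marked for preemption in earlier rounds (warned but not yet killed); that would also explain why reset runs before the loop instead of simply zeroing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, memory-only model of one preemption round (not YARN code).
class PreemptionRoundSketch {

    static final class App {
        final String name;
        final long currentMb;   // memory the app currently holds
        final double weight;
        long preemptedMb;       // non-zero only inside a round
        final List<Long> markedMb = new ArrayList<>(); // containers marked for preemption

        App(String name, long currentMb, double weight) {
            this.name = name;
            this.currentMb = currentMb;
            this.weight = weight;
        }

        // Same idea as FSAppAttempt#getResourceUsage: subtract what is being preempted.
        long usage() {
            return currentMb - preemptedMb;
        }

        double usageToWeight() {
            return usage() / weight;
        }
    }

    // Victim = the app with the largest usage/weight ratio, i.e. the one that
    // would be scheduled last. Ties go to the app earlier in the list.
    static App pickVictim(List<App> apps) {
        App candidate = null;
        for (App a : apps) {
            if (candidate == null || a.usageToWeight() > candidate.usageToWeight()) {
                candidate = a;
            }
        }
        return candidate;
    }

    // Returns the app chosen for each preempted container, in order.
    static List<String> preempt(List<App> apps, long toPreemptMb, long containerMb) {
        List<String> victims = new ArrayList<>();
        try {
            // Assumed reset semantics: start from containers marked in earlier rounds.
            for (App a : apps) {
                a.preemptedMb = 0;
                for (long mb : a.markedMb) {
                    a.preemptedMb += mb;
                }
            }
            while (toPreemptMb > 0) {
                App victim = pickVictim(apps);
                victim.markedMb.add(containerMb);
                victim.preemptedMb += containerMb; // lowers its effective usage for the next pick
                victims.add(victim.name);
                toPreemptMb -= containerMb;
            }
        } finally {
            // Clear semantics: normal scheduling sees real usage again.
            for (App a : apps) {
                a.preemptedMb = 0;
            }
        }
        return victims;
    }

    public static void main(String[] args) {
        List<App> apps = List.of(new App("a", 8192, 1.0), new App("b", 6144, 1.0));
        System.out.println(preempt(apps, 4096, 1024)); // prints [a, a, a, b]
    }
}
```

With two equally weighted apps holding 8192 MB and 6144 MB, preempting 4096 MB in 1024 MB containers picks a, a, a, b. If usage() ignored preemptedMb, a's usage would never drop during the round and all four containers would come from a. The finally block mirrors the try/finally in preemptResources: once the round ends, preemptedMb is back to 0.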