Sub-services of the RM
[TOC]
RMApplicationHistoryWriter
AsyncDispatcher
AdminService
RMActiveServices
–RMSecretManagerService
–ContainerAllocationExpirer
–AMLivelinessMonitor
–RMNodeLabelsManager
–RMStateStore
–RMApplicationHistoryWriter
–SystemMetricsPublisher
–NodesListManager
–ResourceScheduler
–SchedulerEventDispatcher
–NMLivelinessMonitor
–ResourceTrackerService
–ApplicationMasterService
–ClientRMService
–ApplicationMasterLauncher
–DelegationTokenRenewer
————————————————
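Copyright notice: This is an original article by CSDN blogger "Jerry Shao", licensed under CC 4.0 BY-SA. When reposting, please include a link to the original and this notice.
Original link: https://blog.csdn.net/weixin_44630798/article/details/89005651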
1 ClientRMService initialization
ClientRMService is created in ResourceManager#RMActiveServices#serviceInit():
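```java
// ResourceManager.RMActiveServices#serviceInit() (excerpt)
@Override
protected void serviceInit(Configuration configuration) throws Exception {
  // ...
  clientRM = createClientRMService();
  addService(clientRM);
  rmContext.setClientRMService(clientRM);
  // ...
}
```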
ClientRMService#serviceInit() itself is simple: it only sets the client bind address, i.e. the IP and port of the RM's client RPC server.
```java
@Override
protected void serviceInit(Configuration conf) throws Exception {
  clientBindAddress = getBindAddress(conf);
  super.serviceInit(conf);
}

InetSocketAddress getBindAddress(Configuration conf) {
  // Port from yarn.resourcemanager.address (default 0.0.0.0:8032);
  // the host is replaced by yarn.resourcemanager.bind-host when that is set
  return conf.getSocketAddr(
      YarnConfiguration.RM_BIND_HOST,
      YarnConfiguration.RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_ADDRESS,
      YarnConfiguration.DEFAULT_RM_PORT);
}
```
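The host/port resolution done by getBindAddress() can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Hadoop code: a `Map` stands in for `Configuration`, and `BindAddressSketch` is an illustrative name. It assumes the behaviour of `Configuration#getSocketAddr(hostProperty, addressProperty, defaultAddress, defaultPort)`: the port always comes from the address property, and a non-empty bind-host only overrides the host.

```java
import java.util.Map;

// Hypothetical plain-Java model of how getBindAddress() picks the RM client
// RPC bind address. A Map stands in for Hadoop's Configuration.
final class BindAddressSketch {
  static final String RM_BIND_HOST = "yarn.resourcemanager.bind-host";
  static final String RM_ADDRESS = "yarn.resourcemanager.address";
  static final String DEFAULT_RM_ADDRESS = "0.0.0.0:8032";
  static final int DEFAULT_RM_PORT = 8032;

  /** Returns the "host:port" string the client RPC server would bind to. */
  static String resolve(Map<String, String> conf) {
    // 1. Parse yarn.resourcemanager.address (or its default) into host and port.
    String address = conf.getOrDefault(RM_ADDRESS, DEFAULT_RM_ADDRESS);
    int colon = address.lastIndexOf(':');
    String host = (colon >= 0) ? address.substring(0, colon) : address;
    int port = (colon >= 0)
        ? Integer.parseInt(address.substring(colon + 1))
        : DEFAULT_RM_PORT;

    // 2. A non-empty bind-host replaces only the host; the port is kept.
    String bindHost = conf.get(RM_BIND_HOST);
    if (bindHost != null && !bindHost.isEmpty()) {
      host = bindHost;
    }
    return host + ":" + port;
  }
}
```

A common use of this split is setting bind-host to `0.0.0.0` so the RM listens on all interfaces, while clients keep connecting to the hostname given in `yarn.resourcemanager.address`.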
2 ResourceScheduler
ResourceScheduler is created in ResourceManager#RMActiveServices#serviceInit().
createScheduler() instantiates, through reflection, the scheduler class named by yarn.resourcemanager.scheduler.class; by default this is CapacityScheduler.
```java
// ResourceManager.RMActiveServices#serviceInit() (excerpt)
@Override
protected void serviceInit(Configuration configuration) throws Exception {
  // ...
  // Initialize the scheduler
  scheduler = createScheduler();
  scheduler.setRMContext(rmContext);
  addIfService(scheduler);
  rmContext.setScheduler(scheduler);
  // ...
}

// ResourceManager#createScheduler()
protected ResourceScheduler createScheduler() {
  // yarn.resourcemanager.scheduler.class, CapacityScheduler by default
  String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
      YarnConfiguration.DEFAULT_RM_SCHEDULER);
  LOG.info("Using Scheduler: " + schedulerClassName);
  try {
    Class<?> schedulerClazz = Class.forName(schedulerClassName);
    if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) {
      return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz,
          this.conf);
    } else {
      throw new YarnRuntimeException("Class: " + schedulerClassName
          + " not instance of " + ResourceScheduler.class.getCanonicalName());
    }
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate Scheduler: "
        + schedulerClassName, e);
  }
}
```
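createScheduler() is a standard reflective factory: look a class up by name, check that it implements the expected interface, then instantiate it. The sketch below reproduces that pattern in plain Java so it runs without Hadoop. `SketchScheduler`, `SketchCapacityScheduler`, `NotAScheduler` and `SchedulerFactorySketch` are hypothetical names, and `getDeclaredConstructor().newInstance()` stands in for Hadoop's `ReflectionUtils.newInstance(clazz, conf)` (which additionally injects the Configuration).

```java
// Hypothetical stand-ins for ResourceScheduler and its implementations.
interface SketchScheduler {
  String name();
}

class SketchCapacityScheduler implements SketchScheduler {
  @Override
  public String name() {
    return "capacity";
  }
}

class NotAScheduler {
}

final class SchedulerFactorySketch {
  /** Mirrors createScheduler(): load by name, type-check, then instantiate. */
  static SketchScheduler create(String className) {
    try {
      Class<?> clazz = Class.forName(className);
      // Same guard as createScheduler(): reject classes that are not schedulers.
      if (!SketchScheduler.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException("Class: " + className
            + " not instance of " + SketchScheduler.class.getName());
      }
      // Stand-in for ReflectionUtils.newInstance(clazz, conf): no-arg constructor.
      return (SketchScheduler) clazz.getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException(
          "Could not instantiate Scheduler: " + className, e);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Failed to construct " + className, e);
    }
  }
}
```

Usage: `SchedulerFactorySketch.create(SketchCapacityScheduler.class.getName())`. In the real RM, the same mechanism is what lets an administrator switch schedulers purely through configuration, for example by setting yarn.resourcemanager.scheduler.class to `org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`.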
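Because the scheduler is registered with addIfService(scheduler) (CapacityScheduler is itself a Service), initializing RMActiveServices also runs CapacityScheduler#serviceInit().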
3 CapacityScheduler.java
```java
// Stores all queues, keyed by queue name
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
```
CapacityScheduler#serviceInit() first calls super.serviceInit(conf) (AbstractYarnScheduler) and then initScheduler(configuration); we look at them in that order.
```java
// CapacityScheduler.java
@Override
public void serviceInit(Configuration conf) throws Exception {
  Configuration configuration = new Configuration(conf);
  super.serviceInit(conf);
  initScheduler(configuration);
}

// AbstractYarnScheduler.java
@Override
public void serviceInit(Configuration conf) throws Exception {
  // NM expiry interval: yarn.nm.liveness-monitor.expiry-interval-ms, default 600 s
  nmExpireInterval =
      conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
  // Wait before allocating new containers during work-preserving recovery:
  // yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms, default 10 s
  configuredMaximumAllocationWaitTime =
      conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
  // Schedules a one-shot Timer that, after nmExpireInterval, clears the cache
  // of pending release requests for containers not yet recovered
  createReleaseCache();
  super.serviceInit(conf);
}
```
```java
private synchronized void initScheduler(Configuration configuration) throws
    IOException {
  this.conf = loadCapacitySchedulerConfiguration(configuration);
  // Check the min/max allocation settings for memory and vcores
  validateConf(this.conf);
  // Minimum allocation (memory and vcores) wrapped in a Resource
  this.minimumAllocation = this.conf.getMinimumAllocation();
  // Initialize the maximum allocation: under a ReentrantReadWriteLock the
  // Resource is cloned into configuredMaxAllocation, which is used by
  // ClusterNodeTracker.setConfiguredMaxAllocation(Resource)
  initMaximumResourceCapability(this.conf.getMaximumAllocation());
  // Resource calculator, DefaultResourceCalculator by default
  this.calculator = this.conf.getResourceCalculator();
  // yarn.scheduler.include-port-in-node-name, default false
  this.usePortForNodeName = this.conf.getUsePortForNodeName();
  // Applications managed by the scheduler
  this.applications =
      new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
  // Node label manager
  this.labelManager = rmContext.getNodeLabelManager();
  authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
  // Build the queues
  initializeQueues(this.conf);
  // Asynchronous scheduling:
  // yarn.scheduler.capacity.schedule-asynchronously.enable, default false
  scheduleAsynchronously = this.conf.getScheduleAynschronously();
  // Async scheduling interval:
  // yarn.scheduler.capacity.schedule-asynchronously.scheduling-interval-ms, default 5 ms
  asyncScheduleInterval =
      this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, DEFAULT_ASYNC_SCHEDULER_INTERVAL);
  // The async scheduling thread is created only if async scheduling is enabled
  if (scheduleAsynchronously) {
    asyncSchedulerThread = new AsyncScheduleThread(this);
  }
  LOG.info("Initialized CapacityScheduler with " +
      "calculator=" + getResourceCalculator().getClass() + ", " +
      "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
      "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
      "asynchronousScheduling=" + scheduleAsynchronously + ", " +
      "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
}
```
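Now let's focus on queue initialization, initializeQueues(this.conf), which does two things:
(1) parses the configuration and builds the queue tree;
(2) builds the mappings from users/groups to queues.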
```java
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
    throws IOException {
  // Build the queue tree; its root is the "root" queue
  root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
      queues, queues, noop);
  labelManager.reinitializeQueueLabels(getQueueToLabels());
  LOG.info("Initialized root queue " + root);
  initializeQueueMappings();
  setQueueAcls(authorizer, queues);
}
```
(1) Parsing the configuration and building the queue tree
parseQueue() parses one queue and, if it has children, recurses into them:
```java
@Lock(CapacityScheduler.class)
static CSQueue parseQueue(
    CapacitySchedulerContext csContext,
    CapacitySchedulerConfiguration conf,
    CSQueue parent, String queueName, Map<String, CSQueue> queues,
    Map<String, CSQueue> oldQueues,
    QueueHook hook) throws IOException {
  CSQueue queue;
  // Full queue path: "root" for the root queue, otherwise e.g. "root.a.b"
  String fullQueueName = (parent == null) ? queueName
      : (parent.getQueuePath() + "." + queueName);
  // Child queue names: yarn.scheduler.capacity.<queue-path>.queues
  // (e.g. yarn.scheduler.capacity.root.queues)
  String[] childQueueNames = conf.getQueues(fullQueueName);
  // Whether the queue is reservable:
  // yarn.scheduler.capacity.<queue-path>.reservable, default false
  boolean isReservableQueue = conf.isReservable(fullQueueName);
  // No children: create a LeafQueue (or a PlanQueue)
  if (childQueueNames == null || childQueueNames.length == 0) {
    if (null == parent) {
      throw new IllegalStateException(
          "Queue configuration missing child queue names for " + queueName);
    }
    // Check if the queue will be dynamically managed by the Reservation
    // system: a reservable queue becomes a PlanQueue
    if (isReservableQueue) {
      queue =
          new PlanQueue(csContext, queueName, parent,
              oldQueues.get(queueName));
    } else {
      queue = new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName));
      // Used only for unit tests
      queue = hook.hook(queue);
    }
  } else { // Has children: create a ParentQueue
    if (isReservableQueue) {
      throw new IllegalStateException(
          "Only Leaf Queues can be reservable for " + queueName);
    }
    // Build this parent queue (the root queue on the first call)
    ParentQueue parentQueue =
        new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
    // Used only for unit tests
    queue = hook.hook(parentQueue);
    // Build the children by calling parseQueue recursively
    List<CSQueue> childQueues = new ArrayList<CSQueue>();
    for (String childQueueName : childQueueNames) {
      CSQueue childQueue =
          parseQueue(csContext, conf, queue, childQueueName,
              queues, oldQueues, hook);
      childQueues.add(childQueue);
    }
    // Attach the children; childQueues is a TreeSet ordered by queueComparator
    parentQueue.setChildQueues(childQueues);
  }
  // Queues are keyed by short name, so two leaf queues may not share a name
  if (queue instanceof LeafQueue == true && queues.containsKey(queueName)
      && queues.get(queueName) instanceof LeafQueue == true) {
    throw new IOException("Two leaf queues were named " + queueName
        + ". Leaf queue names must be distinct");
  }
  queues.put(queueName, queue);
  LOG.info("Initialized queue: " + queue);
  return queue;
}
```
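The recursion in parseQueue() can be modelled without Hadoop. In this hypothetical sketch, `QueueNode` and `QueueTreeSketch` are illustrative names, a `Map<String, String>` stands in for CapacitySchedulerConfiguration, reservable queues and the test hook are left out, and an unchecked `IllegalStateException` replaces the original `IOException` for duplicate leaf names. It keeps the three behaviours shown above: leaves have no `.queues` entry, parents recurse depth first, and queues are registered by short name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of CSQueue for the parseQueue() sketch.
final class QueueNode {
  final String name;
  final String path;
  final List<QueueNode> children = new ArrayList<>();

  QueueNode(String name, String path) {
    this.name = name;
    this.path = path;
  }

  boolean isLeaf() {
    return children.isEmpty();
  }
}

final class QueueTreeSketch {
  static final String PREFIX = "yarn.scheduler.capacity.";

  static QueueNode parse(Map<String, String> conf, QueueNode parent,
      String name, Map<String, QueueNode> queues) {
    String path = (parent == null) ? name : parent.path + "." + name;

    // Child names come from yarn.scheduler.capacity.<path>.queues.
    List<String> childNames = new ArrayList<>();
    String raw = conf.get(PREFIX + path + ".queues");
    if (raw != null) {
      for (String s : raw.split(",")) {
        if (!s.trim().isEmpty()) {
          childNames.add(s.trim());
        }
      }
    }

    QueueNode node = new QueueNode(name, path);
    if (childNames.isEmpty()) {
      // Leaf: only legal below root.
      if (parent == null) {
        throw new IllegalStateException(
            "Queue configuration missing child queue names for " + name);
      }
    } else {
      // Parent: build every child recursively, depth first.
      for (String child : childNames) {
        node.children.add(parse(conf, node, child, queues));
      }
    }

    // Queues are registered by short name, so leaf names must be unique.
    QueueNode existing = queues.get(name);
    if (node.isLeaf() && existing != null && existing.isLeaf()) {
      throw new IllegalStateException("Two leaf queues were named " + name
          + ". Leaf queue names must be distinct");
    }
    queues.put(name, node);
    return node;
  }
}
```

For example, with `root.queues = prod,dev` and `root.dev.queues = team1,team2`, the map ends up holding five queues, and `team2` has the path `root.dev.team2`. Adding a second leaf called `team1` anywhere else in the tree is rejected, which is the same restriction the real code enforces.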
ParentQueue construction
```java
public ParentQueue(CapacitySchedulerContext cs,
    String queueName, CSQueue parent, CSQueue old) throws IOException {
  super(cs, queueName, parent, old);
  // Comparator used to order the child queues
  this.queueComparator = cs.getQueueComparator();
  // Only the root queue has no parent
  this.rootQueue = (parent == null);
  // Configured capacity in percent: yarn.scheduler.capacity.<queue-path>.capacity
  // (e.g. yarn.scheduler.capacity.root.rec.capacity); the root queue must be 100
  float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
  if (rootQueue &&
      (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
    throw new IllegalArgumentException("Illegal " +
        "capacity of " + rawCapacity + " for queue " + queueName +
        ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
  }
  // Percent -> fraction, then scale by the parent's absolute capacity
  float capacity = (float) rawCapacity / 100;
  float parentAbsoluteCapacity =
      (rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
  float absoluteCapacity = parentAbsoluteCapacity * capacity;
  float maximumCapacity =
      (float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
  float absoluteMaxCapacity =
      CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
  QueueState state = cs.getConfiguration().getState(getQueuePath());
  Map<AccessType, AccessControlList> acls =
      cs.getConfiguration().getAcls(getQueuePath());
  // Apply the computed settings to this queue
  setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
      maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
      defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels,
      cs.getConfiguration().getReservationContinueLook());
  // Child queues are kept sorted by queueComparator
  this.childQueues = new TreeSet<CSQueue>(queueComparator);
  LOG.debug("Initialized parent-queue " + queueName +
      " name=" + queueName +
      ", fullname=" + getQueuePath());
}
```
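The capacity arithmetic in this constructor is easy to check by hand: each level divides its configured percentage by 100 and multiplies by the parent's absolute value, with 1.0 for root. The helper below is a hypothetical sketch of that rule. For maximum capacity it assumes that `CSQueueUtils.computeAbsoluteMaximumCapacity` multiplies by the parent's absolute maximum capacity (1.0 when there is no parent).

```java
// Hypothetical helper reproducing the capacity arithmetic of the queue
// constructors: percentages become fractions and are multiplied down the tree.
final class CapacityMath {
  /** capacityPercent / 100, scaled by the parent's absolute capacity (1.0 above root). */
  static float absoluteCapacity(float parentAbsoluteCapacity, float capacityPercent) {
    return parentAbsoluteCapacity * (capacityPercent / 100f);
  }

  /** Assumed rule for maximum-capacity: scaled by the parent's absolute maximum. */
  static float absoluteMaxCapacity(float parentAbsoluteMax, float maxCapacityPercent) {
    return parentAbsoluteMax * (maxCapacityPercent / 100f);
  }
}
```

Worked example: root is 100% (1.0). `root.dev` with capacity 40 gets 0.4, and `root.dev.team1` with capacity 50 gets 0.4 × 0.5 = 0.2, so it is guaranteed 20% of the cluster. If `dev` has maximum-capacity 60 and `team1` has maximum-capacity 100, `team1` can still grow to at most 0.6 of the cluster.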
LeafQueue construction
```java
public LeafQueue(CapacitySchedulerContext cs,
    String queueName, CSQueue parent, CSQueue old) throws IOException {
  super(cs, queueName, parent, old);
  this.scheduler = cs;
  this.activeUsersManager = new ActiveUsersManager(metrics);
  this.minimumAllocationFactor =
      Resources.ratio(resourceCalculator,
          Resources.subtract(maximumAllocation, minimumAllocation),
          maximumAllocation);
  float capacity = getCapacityFromConf();
  float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
  float maximumCapacity =
      (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
  float absoluteMaxCapacity =
      CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
  // Initially set to absoluteMax, will be updated to more accurate
  // max avail value during assignContainers
  absoluteMaxAvailCapacity = absoluteMaxCapacity;
  int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
  float userLimitFactor =
      cs.getConfiguration().getUserLimitFactor(getQueuePath());
  int maxApplications =
      cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
  // No per-queue limit: take this queue's share of the system-wide limit
  if (maxApplications < 0) {
    int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
    maxApplications = (int)(maxSystemApps * absoluteCapacity);
  }
  maxApplicationsPerUser =
      (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
  float maxAMResourcePerQueuePercent = cs.getConfiguration()
      .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
  int maxActiveApplications =
      CSQueueUtils.computeMaxActiveApplications(
          resourceCalculator,
          cs.getClusterResource(), this.minimumAllocation,
          maxAMResourcePerQueuePercent, absoluteMaxCapacity);
  this.maxActiveAppsUsingAbsCap =
      CSQueueUtils.computeMaxActiveApplications(
          resourceCalculator,
          cs.getClusterResource(), this.minimumAllocation,
          maxAMResourcePerQueuePercent, absoluteCapacity);
  int maxActiveApplicationsPerUser =
      CSQueueUtils.computeMaxActiveApplicationsPerUser(
          maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
  QueueState state = cs.getConfiguration().getState(getQueuePath());
  Map<AccessType, AccessControlList> acls =
      cs.getConfiguration().getAcls(getQueuePath());
  // Apply the computed settings to this queue
  setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
      maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
      maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
      maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
      .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
      defaultLabelExpression, this.capacitiyByNodeLabels,
      this.maxCapacityByNodeLabels,
      cs.getConfiguration().getReservationContinueLook());
  if(LOG.isDebugEnabled()) {
    LOG.debug("LeafQueue:" + " name=" + queueName
        + ", fullname=" + getQueuePath());
  }
  // Application comparator: orders applications by ApplicationId
  Comparator<FiCaSchedulerApp> applicationComparator = cs.getApplicationComparator();
  // Applications waiting to be activated
  this.pendingApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
  // Applications that have been activated
  this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
}
```
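The two application limits in this constructor are plain arithmetic. The helper below is a hypothetical sketch that reproduces just those two formulas. The usage example assumes the defaults yarn.scheduler.capacity.maximum-applications = 10000, minimum-user-limit-percent = 100 and user-limit-factor = 1.

```java
// Hypothetical helper reproducing the application limits computed in the
// LeafQueue constructor.
final class LeafLimitsSketch {
  /**
   * A negative per-queue maximum-applications means "not set": fall back to
   * the system-wide limit scaled by the queue's absolute capacity.
   */
  static int maxApplications(int perQueueSetting, int maxSystemApps,
      float absoluteCapacity) {
    if (perQueueSetting >= 0) {
      return perQueueSetting;
    }
    return (int) (maxSystemApps * absoluteCapacity);
  }

  /** maxApplications * (user-limit percent / 100) * user-limit-factor, truncated. */
  static int maxApplicationsPerUser(int maxApplications, int userLimitPercent,
      float userLimitFactor) {
    return (int) (maxApplications * (userLimitPercent / 100.0f) * userLimitFactor);
  }
}
```

With those defaults, a queue whose absolute capacity is 0.5 gets `maxApplications = 10000 * 0.5 = 5000` and `maxApplicationsPerUser = 5000`. Lowering the user limit to 25% with a user-limit-factor of 2 gives `5000 * 0.25 * 2 = 2500` per user.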
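A note on ParentQueue and LeafQueue: together they form a tree. Applications can only request resources from a LeafQueue, which keeps its applications in two sets, pendingApplications and activeApplications. Both sets are ordered by ApplicationId, so within a single queue CapacityScheduler schedules applications in FIFO order.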
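FIFO ordering follows from using a `TreeSet` with an ID comparator: iteration order is ID order, regardless of insertion order. The sketch below is hypothetical. `SketchAppId` stands in for ApplicationId and is assumed to compare by cluster timestamp first and then by sequence number, matching the "orders applications by ApplicationId" comment in the constructor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for ApplicationId: cluster start timestamp plus a
// per-RM sequence number.
final class SketchAppId {
  final long clusterTimestamp;
  final int id;

  SketchAppId(long clusterTimestamp, int id) {
    this.clusterTimestamp = clusterTimestamp;
    this.id = id;
  }

  @Override
  public String toString() {
    return String.format("application_%d_%04d", clusterTimestamp, id);
  }
}

final class FifoOrderSketch {
  // Order by cluster timestamp first, then by sequence number.
  static final Comparator<SketchAppId> BY_APP_ID =
      Comparator.comparingLong((SketchAppId a) -> a.clusterTimestamp)
          .thenComparingInt(a -> a.id);

  /** Returns the IDs in the order a TreeSet with BY_APP_ID iterates them. */
  static List<String> order(List<SketchAppId> submitted) {
    TreeSet<SketchAppId> pending = new TreeSet<>(BY_APP_ID);
    pending.addAll(submitted);
    List<String> out = new ArrayList<>();
    for (SketchAppId appId : pending) {
      out.add(appId.toString());
    }
    return out;
  }
}
```

If applications 3, 1 and 2 are added in that order, iteration still yields 1, 2, 3. Because IDs are handed out in submission order, the earliest submitted application is always considered first.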
(2) Building the user/group-to-queue mappings
initializeQueueMappings() maps users and groups to queues:
```java
private void initializeQueueMappings() throws IOException {
  // If a mapping exists, does it override the queue the user asked for?
  // Admins can use this to place jobs in a queue other than the one the user
  // specified (yarn.scheduler.capacity.queue-mappings-override.enable)
  overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
  LOG.info("Initialized queue mappings, override: "
      + overrideWithQueueMappings);
  // Get new user/group mappings from yarn.scheduler.capacity.queue-mappings
  List<QueueMapping> newMappings = conf.getQueueMappings();
  // check if mappings refer to valid queues
  for (QueueMapping mapping : newMappings) {
    if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
        !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
      CSQueue queue = queues.get(mapping.queue);
      if (queue == null || !(queue instanceof LeafQueue)) {
        throw new IOException(
            "mapping contains invalid or non-leaf queue " + mapping.queue);
      }
    }
  }
  // apply the new mappings since they are valid
  mappings = newMappings;
  // initialize groups if mappings are present
  if (mappings.size() > 0) {
    groups = new Groups(conf);
  }
}
```
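To make the mapping rules concrete, here is a hypothetical model of yarn.scheduler.capacity.queue-mappings entries such as `u:alice:dev` (user) or `g:analysts:bi` (group). `validate()` mirrors the loop above: concrete targets must be existing leaf queues, while the `%user` and `%primary_group` placeholders are skipped. `resolve()` is an assumption of this sketch about how mappings are applied at submission time: the first matching entry wins, and the first group in the list is treated as the primary group. The override flag is not modelled.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of one yarn.scheduler.capacity.queue-mappings entry.
final class QueueMappingSketch {
  static final String CURRENT_USER = "%user";
  static final String PRIMARY_GROUP = "%primary_group";

  final char type;      // 'u' = user mapping, 'g' = group mapping
  final String source;  // user name, group name, or %user
  final String queue;   // target queue, or a placeholder

  QueueMappingSketch(String spec) {
    String[] parts = spec.trim().split(":");
    if (parts.length != 3 || !(parts[0].equals("u") || parts[0].equals("g"))) {
      throw new IllegalArgumentException("Illegal queue mapping " + spec);
    }
    this.type = parts[0].charAt(0);
    this.source = parts[1];
    this.queue = parts[2];
  }

  /** Like initializeQueueMappings(): concrete targets must be leaf queues. */
  static void validate(List<QueueMappingSketch> mappings, Set<String> leafQueues) {
    for (QueueMappingSketch m : mappings) {
      boolean placeholder =
          m.queue.equals(CURRENT_USER) || m.queue.equals(PRIMARY_GROUP);
      if (!placeholder && !leafQueues.contains(m.queue)) {
        throw new IllegalArgumentException(
            "mapping contains invalid or non-leaf queue " + m.queue);
      }
    }
  }

  /** Assumed rule: first matching entry wins; groups.get(0) is the primary group. */
  static String resolve(List<QueueMappingSketch> mappings, String user,
      List<String> groups) {
    for (QueueMappingSketch m : mappings) {
      boolean matches = (m.type == 'u')
          ? (m.source.equals(CURRENT_USER) || m.source.equals(user))
          : groups.contains(m.source);
      if (!matches) {
        continue;
      }
      if (m.queue.equals(CURRENT_USER)) {
        return user;
      }
      if (m.queue.equals(PRIMARY_GROUP)) {
        return groups.isEmpty() ? null : groups.get(0);
      }
      return m.queue;
    }
    return null; // no mapping: keep the queue the user asked for
  }
}
```

With the mappings `u:alice:dev,g:analysts:bi,u:%user:%user`, alice lands in `dev`, a member of `analysts` lands in `bi`, and any other user falls through to a queue named after themselves. Validating the same list against leaf queues that do not include `bi` fails, just as the real method throws an IOException.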
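Important classes in the CapacityScheduler
CSQueue interface
CSQueue represents one node in the CapacityScheduler's hierarchical queue tree; the scheduler uses it to hold each queue's information.
ClusterNodeTracker
A helper that tracks the state of all SchedulerNodes in the cluster and provides convenience methods to filter and sort nodes.
ResourceCalculator
Compares resources and performs arithmetic on them.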
ParentQueue
A non-leaf queue: it has child queues and does not hold applications itself.
PlanQueue
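This represents a dynamic queue managed by the ReservationSystem. From the user's perspective it is equivalent to a LeafQueue that respects reservations, but functionally it is a subclass of ParentQueue.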
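Which ResourceCalculator is used matters when requests are compared. The calculator is chosen by yarn.scheduler.capacity.resource-calculator, and initScheduler() defaults to DefaultResourceCalculator. The hypothetical sketch below contrasts two rules. `compareMemoryOnly` follows the idea of DefaultResourceCalculator, which looks only at memory. `compareDominant` follows the dominant-share idea behind DominantResourceCalculator but leaves out that class's tie-breaking on the non-dominant resource. `SketchResource` and `CalculatorSketch` are illustrative names, not Hadoop classes.

```java
// Hypothetical resource vector: memory in MB plus virtual cores.
final class SketchResource {
  final long memoryMb;
  final int vcores;

  SketchResource(long memoryMb, int vcores) {
    this.memoryMb = memoryMb;
    this.vcores = vcores;
  }
}

final class CalculatorSketch {
  /** Memory-only comparison, as in the idea behind DefaultResourceCalculator. */
  static int compareMemoryOnly(SketchResource a, SketchResource b) {
    return Long.compare(a.memoryMb, b.memoryMb);
  }

  /** Largest fraction of the cluster that r asks for, over memory and vcores. */
  static double dominantShare(SketchResource cluster, SketchResource r) {
    return Math.max((double) r.memoryMb / cluster.memoryMb,
        (double) r.vcores / cluster.vcores);
  }

  /** Compares dominant shares; simplified, without tie-breaking. */
  static int compareDominant(SketchResource cluster, SketchResource a,
      SketchResource b) {
    return Double.compare(dominantShare(cluster, a), dominantShare(cluster, b));
  }
}
```

On a cluster with 102400 MB and 100 vcores, a request for 4096 MB and 50 vcores is "smaller" than one for 8192 MB and 1 vcore under the memory-only rule. Under the dominant-share rule it is larger, because 50% of the CPU outweighs 8% of the memory.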
4 AMLivelinessMonitor
The AMLivelinessMonitor instances are created through createAMLivelinessMonitor() in ResourceManager#RMActiveServices#serviceInit():
```java
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
rmContext.setAMLivelinessMonitor(amLivelinessMonitor);

AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
```
Two AMLivelinessMonitor instances are created: amLivelinessMonitor tracks running AMs, and amFinishingMonitor tracks AMs that are finishing.
Inside createAMLivelinessMonitor():
```java
// ResourceManager#createAMLivelinessMonitor()
protected AMLivelinessMonitor createAMLivelinessMonitor() {
  return new AMLivelinessMonitor(this.rmDispatcher);
}

// AMLivelinessMonitor constructor
public AMLivelinessMonitor(Dispatcher d) {
  super("AMLivelinessMonitor", new SystemClock());
  this.dispatcher = d.getEventHandler();
}

// AMLivelinessMonitor#serviceInit()
public void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);
  // AM expiry interval: yarn.am.liveness-monitor.expiry-interval-ms, default 10 min
  int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
      YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
  // An AM that has not pinged for this long is expired (10 min by default)
  setExpireInterval(expireIntvl);
  // Check every expireIntvl / 3 (200 s by default)
  setMonitorInterval(expireIntvl/3);
}
```
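The base class, AbstractLivelinessMonitor.java, starts the checker thread in serviceStart():
```java
@Override
protected void serviceStart() throws Exception {
  assert !stopped : "starting when already stopped";
  // Reset the last-ping time of every monitored object to the current time
  resetTimer();
  // Create the checker thread
  checkerThread = new Thread(new PingChecker());
  checkerThread.setName("Ping Checker");
  // Start it; it keeps checking whether any monitored AM has expired
  checkerThread.start();
  super.serviceStart();
}
```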
Here is what the checker thread does:
```java
private class PingChecker implements Runnable {
  @Override
  public void run() {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      synchronized (AbstractLivelinessMonitor.this) {
        // running maps each monitored object (here an AM attempt) to its last ping time
        Iterator<Map.Entry<O, Long>> iterator = running.entrySet().iterator();
        // avoid calculating current time everytime in loop
        long currentTime = clock.getTime();
        // Compare the current time with each AM's last heartbeat; an AM that has
        // not reported within expireInterval is considered expired
        while (iterator.hasNext()) {
          Map.Entry<O, Long> entry = iterator.next();
          if (currentTime > entry.getValue() + expireInterval) {
            iterator.remove();
            // Start the expiry flow
            expire(entry.getKey());
            LOG.info("Expired:" + entry.getKey().toString() +
                " Timed out after " + expireInterval/1000 + " secs");
          }
        }
      }
      try {
        Thread.sleep(monitorInterval);
      } catch (InterruptedException e) {
        LOG.info(getName() + " thread interrupted");
        break;
      }
    }
  }
}
```
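The expiry check can be exercised deterministically by passing time in explicitly instead of reading a clock and sleeping. The sketch below is a hypothetical, single-threaded model of AbstractLivelinessMonitor: `checkOnce()` corresponds to one pass of the PingChecker loop. Having `receivedPing()` ignore objects that were never registered is an assumption about the real method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded model of AbstractLivelinessMonitor: time is
// passed in explicitly, and checkOnce() is one pass of PingChecker's loop.
final class LivenessSketch<O> {
  private final Map<O, Long> running = new HashMap<>();
  private final long expireIntervalMs;

  LivenessSketch(long expireIntervalMs) {
    this.expireIntervalMs = expireIntervalMs;
  }

  /** Start monitoring o, treating nowMs as its first ping. */
  synchronized void register(O o, long nowMs) {
    running.put(o, nowMs);
  }

  /** Heartbeat: refresh the timestamp only for objects still monitored. */
  synchronized void receivedPing(O o, long nowMs) {
    if (running.containsKey(o)) {
      running.put(o, nowMs);
    }
  }

  /** Remove and return every object whose last ping is older than the interval. */
  synchronized List<O> checkOnce(long nowMs) {
    List<O> expired = new ArrayList<>();
    Iterator<Map.Entry<O, Long>> it = running.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<O, Long> entry = it.next();
      if (nowMs > entry.getValue() + expireIntervalMs) {
        it.remove();
        expired.add(entry.getKey()); // the real monitor calls expire(key) here
      }
    }
    return expired;
  }
}
```

A consequence of checking only every monitorInterval: with the defaults (600 s expiry, 200 s check interval), an AM that stops heartbeating is noticed roughly 600 to 800 seconds after its last heartbeat, depending on where the silence falls relative to the next check.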
The expiry flow, AMLivelinessMonitor.expire():
```java
@Override
protected void expire(ApplicationAttemptId id) {
  dispatcher.handle(new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));
}
```
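In other words, it sends an RMAppAttemptEventType.EXPIRE event for that attempt through the RM's dispatcher, and the event is handled by the corresponding RMAppAttemptImpl.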
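Summary
The CapacityScheduler keeps its queues in Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>(); each CSQueue holds the details of one queue.
Queues are parsed recursively, and each child queue is created with a reference to its parent, so ParentQueue and LeafQueue objects form a tree. Applications can only request resources from a LeafQueue, which keeps its applications in a pending list and an active list. Both lists are ordered by ApplicationId, so within a queue CapacityScheduler schedules in FIFO order.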
Question 1: queue = hook.hook(parentQueue);
Question 2: addService()
Reference: https://blog.csdn.net/weixin_44630798/article/details/89005651