Flink on Yarn模式启动流程源代码分析

Flink on yarn的启动流程可以参见前面的文章 Flink on Yarn启动流程,下面主要是从源码角度看下这个实现,可能有的地方理解有误,请给予指正,多谢。

--> 1.命令行启动yarn session

bin/yarn-session.sh -n 3 -jm 1024 -nm 1024 -st
我们去看下启动脚本

  $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR" $log_setting  org.apache.flink.yarn.cli.FlinkYarnSessionCli  -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

主要是用java -cp的方式启动主类** *org.apache.flink.yarn.cli.FlinkYarnSessionCli * $@ 就是我们传入的哪些参数 " -n 3 -jm 1024 -nm 1024 -st" **。

1. FlinkYarnSessionCli 的启动流程分析

首先看下Main函数

public static void main(String[] args) {   
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
   System.exit(cli.run(args));
}

主要是构造FlinkYarnSessionCli,然后执行其run方法,这里主要介绍主要流程的代码。

public int run(String[] args)

  • 1.解析命令行参数

    cmd = parser.parse(options, args)
    
  • 2.根据命令行参数决定执行那种模式。

     # 第一种,判断命令是否包含 -q
    


** 如: **

# 第二种,判断是否有-id参数

这里我们看下交互模式是啥样的,共有两个可选项,help和stop,如果我们敲入stop,则应用对应的所有进程会退出

# 第三种,为正常模式


** 这里主要为构造YarnClusterDescriptor,然后调用其deploy方法启动集群 ,接着将Jobmanager和web ui地址写入到out文件中去,如果采用分离模式,则等待集群启动之后yarn session自动退出,如果不是则进入交互模式,我们可以通过交互控制这个Applitcation **

接着看下是如何构造YarnClusterDescriptor的

----------------- **1 creat YarnClusterDescriptor ** ----------------------

直接new YarnClusterDescriptor对象,然后将依赖jar地址,配置参数如taskmanager个数,jar地址,配置文件地址,配置参数等设置到YarnClusterDescriptor对象中去,然后返回这个对象。

------------** 2 YarnClusterDescriptor deploy ** -------------------------

由于YarnClusterDescriptor没有重写depoy方法则直接调用其父类AbstractYarnClusterDescriptor的deploy方法,但是最终调用的是其deployInternal方法.

接着看下deployInternal方法,简单的描述下流程,后续代码分析下面的github地址

  • 检查是否具备Deploy的条件,如配置文件,jar路径是否为空
  • 获取yarn的client,用户和RM进行通信
  • 增加动态的配置属性到配置conf对象中去,解析配置conf对象为kv对
  • 获取HDFS FileSyetem,这里用于将本地jar及配置文件上传到HDFS,
  • 判断JobManager和TaskManager申请的资源是否满足yarn分配单个container的最小分配,如果小于则将container最小分配用来初始化jobMananger和TaskMananer
  • 通过yarn client创建Application,返回GetNewApplicationResponse对象用于跟RM进行RPC通信。
  • 通过GetNewApplicationResponse对象获取RM能够为这个应用分配的最大资源,如果最大资源不能够满足jobManagerMemoryMb和taskManagerMemoryMb则报错,计算总的jobmanager和所有taskmanager总共需要的资源(jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount),计算RM中总共的空闲资源,判断空闲资源是否满足前面计算需要的需求,如果不满足,则可能先启动yarn session,task manager等到有资源再进行启动;先为jobManager分配一个nm,然后再在其他的nm上启动taskmanager
  • 设置启动ApplicationMaster的 lanchcontext,这里主要是设置java home,主类,jvm参数数,log文件配置。ApplicationMaster的主类 YarnApplicationMasterRunner ** YarnApplicationMasterRunner **。
protected Class<?> getApplicationMasterClass() {   
return YarnApplicationMasterRunner.class;
}```
- 设置ApplicationSubmissionContext,获取ApplicationId
- 设置session需要的hdfs路径,然后将本地jar包及配置文件,配置文件上传到HDFS
- 设置AM启动的token信息,设置AM启动的过程中需要从hdfs下载那些依赖的jar和配置文件,设置ApplicationMaster及Flink及其他进程的classpath,不多说
- 设置钩子函数在deploy的时候清理上传到hdfs的文件及本地下载的依赖文件
- *** 重点,提交Applicaiton到RM;设置这个Application的状态为NEW,然后监控这个应用,如果不是之前的NEW状态,则打印当前状态,如果Running状态则跳出这个循环,如果是其他状态,则抛出YarnDeploymentException异常,上层调用捕获处理吧,不然250ms判断一次 ***
- depoly成功,钩子函数删除临时文件,如依赖的jar包和配置文件等,返回YarnClusterClient对象,包含了这YarnClusterDescriptor,ApplicationReport等重要的属性。
***
***deploy 成功以后进入交互模式,在runInteractiveCli里面最重要的一步是构造ApplicationClient Actor用于和JobManager Actor进行通信,但是如果发送 RegisterInfoMessageListener、UnRegisterInfoMessageListener等消息,将会由jobmanager actor将forward方法路由到flink resource manager actor去处理,此时jobmanager作为flink resource manager的代理,此时收到这两个消息的时候,由于是forward的方法,sender仍然是application client actor,所以flink manager resource actor可以直接给application client返回消息***
***
> ------------ ** 3 代码展示主要流程**------
![](http://upload-images.jianshu.io/upload_images/3249301-d22456f0939a8365.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-9c80aa18467d4e10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-85a2f462ff96e5fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-a3c81e3dc9b23db0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-bf190e6a72366f0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-57eb01f090d38dd3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-d548d544dbd1b713.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-2013feca33032c46.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-d0d8c8c1a56f28ff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-be3a228edeffad9d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
*** ---- ApplicationClient 和JobManager Actor通信代码 --***

![](http://upload-images.jianshu.io/upload_images/3249301-56371ec18930ba4f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-ed28091d44dc3906.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-1a0df11e1a57941d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-913d82bf6d5825b8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6309a49886d0cc4e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-47f86bcfdee4f967.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6a483d4af26931cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-4de478f435cb1356.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-27986c3659bd96cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-a27f89acbe3406de.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-f04a3da4f97a08dc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

## 2. YarnApplicationMasterRunner 启动流程分析
*** RM首先分配一个NM的container去启动YarnApplicationMasterRunner ,接着下来我们看下是怎么做的***
首先是进入main函数里面,构造一个YarnApplicationMasterRunner对象,直接调用其Run方法。
> run方法主要步骤
- 获取当前用户的UGI及远端UGI
- 将当前用户ugi里面的token传递到远端的UGI中,用于数据和服务访问
- 在远端的UGI里面执行runApplicationMaster启动ApplicationMaster
![](http://upload-images.jianshu.io/upload_images/3249301-7d3ac2af1bf091f6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

> runApplicationMaster主要过程,这里注释很清楚,我只捡重要的提示下
- 1) load and parse / validate all configurations
- 2) start the actor system,try to start the actor system, JobManager and JobManager actor system
- 3) Generate the configuration for the TaskManagers,这里主要是JobManager的地址,taskManager注册的超时时间,slot个数,这里还有最重要的一步是构造TaskManager的ContainerLaunchContext,这个context里面包含了启动TaskManager的启动命令,***主类是YarnTaskManager***。
-  start the actors and components in this order:
  1) JobManager & Archive (in non-HA case, the leader service takes this),启动JobManagerActor,这里主类是***YarnJobManager***
  2) Web Monitor (we need its port to register) 启动WEB监控页面,创建LeaderRetrievalService对象,这个主要用于启动TaskManager的时候,告诉TaskManager JobManager得akka url,用于TaskManager向JobManager进行注册。
  3) Resource Master for YARN   启动YarnFlinkResourceManager Actor,这里主要用于Flink container资源的管理包括申请与释放等。
  4) Process reapers for the JobManager and Resource Master
***这里主要介绍YarnApplicationMasterRunner 是如何通过YarnFlinkResourceManager去完成container的申请与启动TaskManager的,这里相对来说,比较复杂,我跟到Yarn的代码里才算整明白***

![YarnFlinkResourceManager的继承关系](http://upload-images.jianshu.io/upload_images/3249301-cbc8215f8c356913.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
说明YarnFlinkResourceManager其实是一个actor,在runApplicationMaster方法中,通过下面的代码启动这个Actor

Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
getResourceManagerClass(),//YarnFlinkResourceManager
config,
yarnConfig,
leaderRetriever,
appMasterHostname,
webMonitorURL,
taskManagerParameters,
taskManagerContext,
numInitialTaskManagers,
LOG);
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);//启动YarnFlinkResourceManager actor

接着看下YarnFlinkResourceManager 的构造方法,这里主要有三个成员变量比较重要

//在yarn 的rm端会调用该回对象的回调函数进行container申请,resourceManagerCallbackHandler里面只有该actor的actor ref,所以回调的过程中能够与该actor进行通信
/** Callback handler for the asynchronous resourceManagerClient /
private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
//AM与RM通信的client,resourceManagerClient对象持有resourceManagerCallbackHandler
/
* Client to communicate with the Resource Manager (YARN's master) /
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
//AM与NM的通信client
/
* Client to communicate with the Node manager and launch TaskManager processes */
private NMClient nodeManagerClient;

YarnFlinkResourceManager 启动的过程先执行preStart方法,自己没有实现则执行其父类FlinkResourceManager的preStart方法。接着调用YarnFlinkResourceManager 的initialize方法。
> ***在initialize方法里面***
*** resourceManagerClient.start() ----> 
  AMRMClientAsyncImpl.serviceStart()--->
  CallbackHandlerThread.start()(守护线程)--->
YarnResourceManagerCallbackHandler.onContainersAllocated(allocated)---> yarnFrameworkMaster.tell(new ContainersAllocated(containers),ActorRef.noSender())(yarnFrameworkMaster为YarnFlinkResourceManager ActorRef) -->
YarnFlinkResourceManager .containersAllocated -->
NMClient.startContainer(container, taskManagerLaunchContext)
至此通知各个NM启动container。***
![](http://upload-images.jianshu.io/upload_images/3249301-3a5b6377e93742b9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-2c532578dc7a508d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6a2cd3067970fe39.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-e14cfee9289c57bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-2e3b73f252030ea8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-a3239382a6bde777.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-0a52a463c263bbd7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-52cecffb2670cc67.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

***至此,YarnApplicationMasterRunner 重要的流程已经说完,细节东西太多,就不再说了,有时间再看,接下来看YarnTaskManager的部分***

## 3. YarnTaskManager启动流程分析

***接上面nodeManagerClient.startContainer(container, taskManagerLaunchContext)将通知NM去启动container,NM根据taskManagerLaunchContext的启动信息,从HDFS下载YarnTaskManager启动过程依赖的jar和配置文件
(container_tokens  default_container_executor_session.sh  default_container_executor.sh  flink-conf.yaml  flink.jar  launch_container.sh  lib  log4j.properties  logback.xml),然后shell执行launch_container.sh,最终用java -cp启动YarnTaskManager进程,启动进程的时候首先执行YarnTaskManager run方法,TaskManager会拿到JobManager的akka地址,然后发送注册消息,JobManager收到注册消息以后,注册成功之后就发送ack确认注册信息给TaskManager,然后TaskManger根据配置以及JobManager返回过来的信息构建一些真正干活的成员变量。过程:***
> 
YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager])-->
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager)-->
TaskManager.runTaskManager -->
TaskManager.startTaskManagerComponentsAndActor-->
actorSystem.actorOf(tmProps, actorName)-->
TaskManager.preStart-->
StandaloneLeaderRetrievalService.start(TaskManager)-->
TaskManger.notifyLeaderAddress-->
TaskManager.handleJobManagerLeaderAddress-->
TaskManager.triggerTaskManagerRegistration()
TaskManager.handleRegistrationMessage-->
instanceManager.registerTaskManager-->
jobManager 发送消息AcknowledgeRegistration给TaskManager
TaskManager.associateWithJobManager-->



![](http://upload-images.jianshu.io/upload_images/3249301-8797abe4fead540e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6ada492f0e25718b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-7d9e8a6893af056a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-e4116544f6c7a677.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-03e551a6ebf785c1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-155d80ccd8b5bed4.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-40bbe88a86f090c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-2226948e829a5add.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-b9c444d706737b6f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-3ff9f4e9243c1264.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-08823728261241ec.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![associateWithJobManager](http://upload-images.jianshu.io/upload_images/3249301-f3aa8048869975c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-1bb10a80436aa199.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-442effae29613f2c.png)




###基本上Flink on yarn的流程就是这样,细节需要深入,有不正确的地方,希望给予指正。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容