Flink源码(一):Actor系统创建流程1

利用碎片时间阅读了一下Flink的源码,选择Flink主要出发点还是了解一个稳定的分布式计算系统的实现,另外也是由于Flink相对更加成熟的Spark有其独到的优势,相信其在下一代分布式计算中也会占有重要的地位。Flink的主要概念可以在官网了解

Flink系统作业的提交和调度都是利用AKKA的Actor通信,因此也是由此作为切入点,首先理清整个系统的启动以及作业提交的流程和数据流。

flink basic concepts

图中可以看到,一个完整的Flink系统由三个Actor System构成,包括Client、JobManager(JM)以及TaskManager(TM)。下面对三个Actor系统的创建进行分析。

JM ActorSystem

JM是Flink系统的调度中心,这部分除了会看到JM ActorSystem的创建,还会了解到整个Flink系统的各个模块的初始化与运行。

先找程序入口,从启动脚本可以追溯到,每一个启动脚本最终都会运行flink_deamon.sh 脚本,查看该脚本:

...
...

case $DAEMON in
    (jobmanager)
        CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
    ;;

    (taskmanager)
        CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac


$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
...
...

由此找到JM的程序入口:org.apache.flink.runtime.jobmanager.JobManager.scala,代码中可以找到main函数,调用runJobManager方法:

def runJobManager(
      configuration: Configuration,
      executionMode: JobManagerMode,
      listeningAddress: String,
      listeningPort: Int)
    : Unit = {


    //startActorSystemAndJobManagerActors返回jobManagerSystem
    val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors(
      configuration,
      executionMode,
      listeningAddress,
      listeningPort,
      classOf[JobManager],
      classOf[MemoryArchivist],
      Option(classOf[StandaloneResourceManager])
    )

    // 阻塞,直到系统退出
    jobManagerSystem.awaitTermination()

    webMonitorOption.foreach{
      webMonitor =>
        try {
          webMonitor.stop()
        } catch {
          case t: Throwable =>
            LOG.warn("Could not properly stop the web monitor.", t)
        }
    }
  }

runJobManager方法逻辑比较简单,调用startActorSystemAndJobManagerActors方法中创建ActorSystem和JMActor,然后阻塞等待系统退出,看具体的JM创建过程:

def startActorSystemAndJobManagerActors(
      configuration: Configuration,
      executionMode: JobManagerMode,
      listeningAddress: String,
      listeningPort: Int,
      jobManagerClass: Class[_ <: JobManager],
      archiveClass: Class[_ <: MemoryArchivist],
      resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
    : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {

    LOG.info("Starting JobManager")

    // Bring up the job manager actor system first, bind it to the given address.
    val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
    LOG.info(s"Starting JobManager actor system at $hostPortUrl")

    val jobManagerSystem = try {
      val akkaConfig = AkkaUtils.getAkkaConfig(
        configuration,
        Some((listeningAddress, listeningPort))
      )
      if (LOG.isDebugEnabled) {
        LOG.debug("Using akka configuration\n " + akkaConfig)
      }
      
      AkkaUtils.createActorSystem(akkaConfig)//创建ActorSystem全局仅有一个
    }
    catch {
       ...
       ...
    }

    ...
    ...//此处省略webMonitor的创建
    
    try {
      // bring up the job manager actor
      LOG.info("Starting JobManager actor")
      val (jobManager, archive) = startJobManagerActors(
        configuration,
        jobManagerSystem,
        jobManagerClass,
        archiveClass)

      // start a process reaper that watches the JobManager. If the JobManager actor dies,
      // the process reaper will kill the JVM process (to ensure easy failure detection)
      LOG.debug("Starting JobManager process reaper")
      jobManagerSystem.actorOf(
        Props(
          classOf[ProcessReaper],
          jobManager,
          LOG.logger,
          RUNTIME_FAILURE_RETURN_CODE),
        "JobManager_Process_Reaper")

      // bring up a local task manager, if needed
      if (executionMode == JobManagerMode.LOCAL) {
        LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")

        val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
          configuration,
          ResourceID.generate(),
          jobManagerSystem,
          listeningAddress,
          Some(TaskManager.TASK_MANAGER_NAME),
          None,
          localTaskManagerCommunication = true,
          classOf[TaskManager])

        LOG.debug("Starting TaskManager process reaper")
        jobManagerSystem.actorOf(
          Props(
            classOf[ProcessReaper],
            taskManagerActor,
            LOG.logger,
            RUNTIME_FAILURE_RETURN_CODE),
          "TaskManager_Process_Reaper")
      }
      ...
      ...

      (jobManagerSystem, jobManager, archive, webMonitor, resourceManager)
    }
    ...
    ...
  }

这里可以看到startActorSystemAndJobManagerActors方法中利用AkkaUtils和flinkConfig创建了全局的ActorSystem,AkkaUtils也是对Actor创建的简单封装,这里不再赘述。紧接着利用刚创建的jobManagerSystem和jobManager的类名:jobManagerClass创建jobManager。除了jobManager以外,该方法中还创建了Flink的其他重要模块,从返回值中可以清楚看到。另外本地模式启动方式下,还会创建本地的启动本地的taskManagerActor。继续深入到startJobManagerActors,该方法接收jobManagerSystem等参数,创建jobManager和archive并返回:

def startJobManagerActors(
      configuration: Configuration,
      actorSystem: ActorSystem,
      jobManagerActorName: Option[String],
      archiveActorName: Option[String],
      jobManagerClass: Class[_ <: JobManager],
      archiveClass: Class[_ <: MemoryArchivist])
    : (ActorRef, ActorRef) = {

    val (executorService: ExecutorService,
    instanceManager,
    scheduler,
    libraryCacheManager,
    restartStrategy,
    timeout,
    archiveCount,
    leaderElectionService,
    submittedJobGraphs,
    checkpointRecoveryFactory,
    savepointStore,
    jobRecoveryTimeout,
    metricsRegistry) = createJobManagerComponents(
      configuration,
      None)

    val archiveProps = Props(archiveClass, archiveCount)

    // start the archiver with the given name, or without (avoid name conflicts)
    val archive: ActorRef = archiveActorName match {
      case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
      case None => actorSystem.actorOf(archiveProps)
    }

    val jobManagerProps = Props(
      jobManagerClass,
      configuration,
      executorService,
      instanceManager,
      scheduler,
      libraryCacheManager,
      archive,
      restartStrategy,
      timeout,
      leaderElectionService,
      submittedJobGraphs,
      checkpointRecoveryFactory,
      savepointStore,
      jobRecoveryTimeout,
      metricsRegistry)

    val jobManager: ActorRef = jobManagerActorName match {
      case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
      case None => actorSystem.actorOf(jobManagerProps)
    }

    (jobManager, archive)
 }

这里首先createJobManagerComponents方法创建了jobManager的重要组成模块,包括了存储、备份等策略的组件实现,还包括以后会遇到的scheduler、submittedJobGraphs,分别负责job的调度和作业的提交,这里暂不深入。
jobManagerActor已经成功创建,但是Scala中一个Actor会继承Actor类,并重写receive方法接受信息并处理,由此可以发现.JobManager类继承FlinkActor,再看FlinkActor:

trait FlinkActor extends Actor {
  val log: Logger

  override def receive: Receive = handleMessage

  /** Handle incoming messages
    * @return
    */
  def handleMessage: Receive 

  def decorateMessage(message: Any): Any = {
    message
  }
}

可以看到receive方法被重写,并赋值为handleMessage,所以处理消息的操作被放在FlinkActor子类Jobmanager的handleMessage方法中:

override def handleMessage: Receive = {

    ...
    ...
    case SubmitJob(jobGraph, listeningBehaviour) =>
      val client = sender()

      val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
        jobGraph.getSessionTimeout)

      submitJob(jobGraph, jobInfo)

    ...
    ...

handleMessage方法中处理的消息很多,包括了诸如作业恢复,leader决策,TM注册,作业的提交、恢复与取消,这里暂时只关注消息SubmitJob(jobGraph, listeningBehaviour),消息的定义很简单,不再追溯。而SubmitJob消息的主要获取Client传来的jobGraph以及listeningBehaviour。Flink的作业最后都会抽象为jobGraph交给JM处理。关于jobGraph的生成,会在后面的Job生成的过程中进行分析。
JM对job的处理函数submitJob(jobGraph, jobInfo),参数jobInfo中包括了Client端的ActorRef,用以Job处理结果的返回,该函数中实现了JM对作业的提交与处理的细节,为突出重点,放在作业处理部分分析。但从该方法的注释来看:

 /**
   * Submits a job to the job manager. The job is registered at the libraryCacheManager which
   * creates the job's class loader. The job graph is appended to the corresponding execution
   * graph and the execution vertices are queued for scheduling.
   *
   * @param jobGraph representing the Flink job
   * @param jobInfo the job info
   * @param isRecovery Flag indicating whether this is a recovery or initial submission
   */

在该方法中将Job注册到libraryCacheManager,并将Job执行饿的DAG加入到调度队列。

小结

这里仅仅就JM Actor的创建过程对flink的源码进行了分析,主要了解到flink系统JM部分ActorSystem的组织方式,main函数最终创建JM 监听客户端的消息,并对作业进行调度和Job容错处理,最终交由TaskManager进行处理。对于具体的调度和处理策略,JM和TM的通信会在以后进行分析。接下来首先看Client端的逻辑。

原创文章,原文到我的博客

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • 简单之美 | Apache Flink:特性、概念、组件栈、架构及原理分析http://shiyanjun.cn/...
    葡萄喃喃呓语阅读 7,360评论 0 27
  • 介绍 概述 Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Fli...
    stephen_k阅读 50,733评论 0 22
  • 说后来也许有些遥远。其实认识你不过一周有余。 只是相见恨晚。 当然我从没冒然的去问过你,是否也是如此,我怕你不回消...
    凌阿深阅读 1,045评论 10 15
  • 作者 言嘉 四月的春正浓 树树樱花热烈地开着 几片飘飞的花瓣 完美地点缀了明媚的春 似少...
    言嘉阅读 185评论 0 2