spark streaming源码分析之ReceiverTracker详解

我们关注的问题是数据是怎么被接收的?又是怎么存储的?

数据是被executor上的线程receiver接收的,接收之后交由executor上的线程ReceiverSupervisorImpl处理。

JobScheduler的重要成员之一登场!!ReceiverTracker!!!
ReceiverTracker的简单介绍?

ReceiverTracker的目的是为每个batch的RDD提供输入数据。通过以下三步完成:

  1. 分发receiver到executor,启动接收的线程。
  2. 分发ReceiverSupervisorImpl到executor,启动处理数据的线程,并掌握数据的信息
  3. 一个job提交了,它是怎么为其提供数据进行etl的?

++首先看下Receiver是怎么被分发到各个executor上的++

def start(): Unit = synchronized {
    //....

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))//用来接收和处理来自 ReceiverTracker 和 receivers 发送的消息
      if (!skipReceiverLaunch) launchReceivers() //重要!考点!!!将receiver分发到executers
      //.....
    }
  }
//来!具体来看launchReceivers
private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map {...}//DStreamGraph持有所有的inputDS,获取到这些inputDS的receiver

    
    endpoint.send(StartAllReceivers(receivers))//拿到receivers后分发的具体实现
}

override def receive: PartialFunction[Any, Unit] = {
      // 确定了每个 receiver 要分发到哪些 executors 
      case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }
      //.....  
}

private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {

      // Function to start the receiver on the worker node
      //重点!考点!!这个函数会和rdd一起提交,它new了一个ReceiverSupervisorImpl用来具体处理接收的数据,后面会具体讲!!
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)//真正处理接收到的数据
            supervisor.start()//启动线程
            supervisor.awaitTermination()//重要!堵塞线程,源源不断的从reciver处获取数据!
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      //重点!考点!!这里把recever和location打包成一个rdd了,所以recevier可以在多个executor上运行!!!
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      
      //.....

     //提交啦!⚠️ 到这里recevier就被分发到具体的executor上了
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      
      //....
    }
  

++来,再看一下具体在executor上是怎么实现处理数据的?++

第一部分,怎么接收数据?

recevier被分发到具体的executor上之后会怎么实现数据的处理呢?reciver会调用supervisor的put方法!!!也就是说recevier其实只关心从哪儿接数据以及数据接过来怎么解析,而并不关心数据怎么存!!!谁在用!!!

//先看下recevier怎么把数据给ReceiverSupervisorImpl,比如KafkaReceiver
class KafkaReceiver(....) extends Receiver[(K, V)](storageLevel) with Logging {

  def onStart() {

  
    //去哪儿接收数据
    // Kafka connection properties
    // Create the connection to the cluster

    //接收到的数据怎么解析
    val keyDecoder = ...
    val valueDecoder = ...


    //线程池接收数据
    val executorPool = ...
    topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
  }

  // 处理接收到的数据,store!!!这里会调用supervisor.pushSingle!!!!
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          store((msgAndMetadata.key, msgAndMetadata.message))
        }
    }
  }
}

第二部分,那么数据接过来了,怎么存储呢?这里是ReceiverSupervisorImpl实现的,主要有三个方法:

//put类,会把一条条的数据交给BlockGenerator,汇聚成block
def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
}


def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    
    //存储block的具体逻辑
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    
    //存储成功之后,发送新增的blockInfo到ReceiverTracker
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    
}

//把每个block通过blockManager存到内存/硬盘,同rdd逻辑一致
private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
     //wal,重点!预写!!防丢数据
      new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }

第三部分,数据怎么被用呢?数据被存储之后告知了ReceiverTracker,但是怎么用呢?

//ReceiverTracker自己是不管block的,它有一个成员receivedBlockTracker来处理!它是个老板!!!
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
}


//注意⚠️定时器JobGenerate在定时提交job的时候会调用ReceiverTracker的allocateBlocksToBatch方法来把block和batch对应起来,可以看到block怎么被分配到batch这个过程是receivedBlockTracker处理的!!
def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }

关于数据被存储之后,是怎么和rdd关联起来的,更多的内容在spark streaming源码分析之job、rdd、blocks之间是如何对应的?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容