源码解析之checkpoint:rdd/streaming都是如何实现的?以及作用都是什么?

前言

之前知道checkpoint是在job执行后完成的,一直没理解原理。后来用spark streaming后更迷惑了,众所周知有些时候需要保存每一条数据的状态,或者我需要维持一个7天/30天的窗口,那么做checkpoint的时候难道我要把流里所有的数据都存下来吗?那这个数据也太大了。所以产出了这些文章。

首先,checkpoint是做什么的?

checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)

(这一段是百度的,如有雷同,就是我抄的)

然后,checkpoint要怎么用?
  1. rdd中:
val sc = ....
sc.setCheckpointDir("")

val rdd = ....
rdd.checkpoint() 

  1. spark streaming中:
val ssc = ....
ssc.checkpoint("") 

然后,checkpoint存了什么内容?
  1. rdd

将checkpoint的last rdd中每个partition写入checkpoint

  1. spark streaming

两部分,第一部分,写入了一个checkpoint对象,driver存的各种配置信息,具体包括相关配置信息,checkpoint目录,DStreamGraph等。以及DStreamGraph中inputstream/outputstream包含的计算函数也存入checkpoint了,所以我们更改了代码,但是不删除checkpoint的话,无法生效

第二部分,worker存的data数据,每个rdd又一个checkpointdata对象,同spark core一样存入。

注意⚠️:spark streaming的每一个rdd的data都会存入checkpoint,因为每一个rdd都调用了docheckpoint(后面会详细讲),而spark score只会存入调用了checkpoint的rdd数据

rdd的checkpoint实现

首先,看你在代码里调用了rdd.checkpoint()之后做了什么?

def checkpoint(): Unit = RDDCheckpointData.synchronized {

    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

可以看到定义了一个CheckpointData,这个对象的作用是什么呢?暂时先搁置,我们看下spark context这里提交job提交的代码:

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    ....
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    
    ....
    rdd.doCheckpoint()//do了do了
  }

可以看到在提交完job之后,调用了rdd的doCheckpoint方法,我们跟进去看下:

private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          checkpointData.get.checkpoint()
        } else {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }

只有定义了checkpointData的rdd才会执行checkpoint,否则沿着血缘关系的向上找,直到找到为止。这里就和前面呼应了,只有对该rdd调用了checkpoint方法,checkpointData才会被定义。接着看checkpointData.get.checkpoint():

final def checkpoint(): Unit = {
    

    val newRDD = doCheckpoint()//这里调用的是子类ReliableRDDCheckpointData实现的doCheckpoint()实现checkpoint

    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()//重点考点!⚠️
    }
  }

这里做了两件事,第一docheckpoint,第二调用rdd的markCheckpointed将该rdd对上游的依赖都置空

protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    ....
  }
  
  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    

    .....
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    ....
  }
streaming的checkpoint实现

分为两部分,一部分是存metadata,另一部分存data,首先看下metadata:

定时器JobGenerator提交job时,可以看到post了DoCheckpoint信息,之后会调用doCheckpoint方法:

private def generateJobs(time: Time) {
    ....
      graph.generateJobs(time) // generate jobs using 
    ...
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}



private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
     
      ssc.graph.updateCheckpointData(time) //每个ds更新每个ds的checkpoint信息
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
      ////将Checkpoint信息写到Checkpoint目录  
    } 
    
  }

重点看下ssc.graph.updateCheckpointData(time)

def updateCheckpointData(time: Time) {
    
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    
  }
  
  private[streaming] def updateCheckpointData(currentTime: Time) {
    
    checkpointData.update(currentTime)
    dependencies.foreach(_.updateCheckpointData(currentTime))
    
  }

可以看到每个outputDS执行了updateCheckpointData,我们看下checkpointData.update的做了哪些事情。这里的例子是DirectKafkaInputDStreamCheckpointData

def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {  
      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]  
    }  
  
    override def update(time: Time) {  
      batchForTime.clear()//删除老的Checkpoint信息  
      generatedRDDs.foreach { kv =>  
        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray//a是一个数组,数组的一个元素是一个RDD的所有分区信息,一个分区信息包含了这个分区的起始数据和终止数据在Kafka的某个分区的offset  
        batchForTime += kv._1 -> a//将分区信息存储在DirectKafkaInputDStreamCheckpointData.data中  
      }  
    }  

到这里为止,我们就清楚streaming中checkpoint metapdata是怎么存储的。接下来,看第二部分,data是怎么存储的?DS在执行action算子的时候,比如getOrCompute

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      】
      if (isTimeValid(time)) {

        val rddOption = ....

        rddOption.foreach { case newRDD =>
         
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } 
    }
  }

!!!!可以看到调用了rdd.checkpoint!那就是说接下来的逻辑就和rdd做checkpoint的逻辑一致了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容