前言
之前知道checkpoint是在job执行后完成的,一直没理解原理。后来用spark streaming后更迷惑了,众所周知有些时候需要保存每一条数据的状态,或者我需要维持一个7天/30天的窗口,那么做checkpoint的时候难道我要把流里所有的数据都存下来吗?那这个数据也太大了。所以产出了这些文章。
首先,checkpoint是做什么的?
checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)
(这一段是百度的,如有雷同,就是我抄的)
然后,checkpoint要怎么用?
- rdd中:
val sc = ....
sc.setCheckpointDir("")
val rdd = ....
rdd.checkpoint()
- spark streaming中:
val ssc = ....
ssc.checkpoint("")
然后,checkpoint存了什么内容?
- rdd
将checkpoint的last rdd中每个partition写入checkpoint
- spark streaming
两部分,第一部分,写入了一个checkpoint对象,driver存的各种配置信息,具体包括相关配置信息,checkpoint目录,DStreamGraph等。以及DStreamGraph中inputstream/outputstream包含的计算函数也存入checkpoint了,所以我们更改了代码,但是不删除checkpoint的话,无法生效
第二部分,worker存的data数据,每个rdd又一个checkpointdata对象,同spark core一样存入。
注意⚠️:spark streaming的每一个rdd的data都会存入checkpoint,因为每一个rdd都调用了docheckpoint(后面会详细讲),而spark score只会存入调用了checkpoint的rdd数据
rdd的checkpoint实现
首先,看你在代码里调用了rdd.checkpoint()之后做了什么?
def checkpoint(): Unit = RDDCheckpointData.synchronized {
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
可以看到定义了一个CheckpointData,这个对象的作用是什么呢?暂时先搁置,我们看下spark context这里提交job提交的代码:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
....
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
....
rdd.doCheckpoint()//do了do了
}
可以看到在提交完job之后,调用了rdd的doCheckpoint方法,我们跟进去看下:
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.checkpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
只有定义了checkpointData的rdd才会执行checkpoint,否则沿着血缘关系的向上找,直到找到为止。这里就和前面呼应了,只有对该rdd调用了checkpoint方法,checkpointData才会被定义。接着看checkpointData.get.checkpoint():
final def checkpoint(): Unit = {
val newRDD = doCheckpoint()//这里调用的是子类ReliableRDDCheckpointData实现的doCheckpoint()实现checkpoint
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
cpState = Checkpointed
rdd.markCheckpointed()//重点考点!⚠️
}
}
这里做了两件事,第一docheckpoint,第二调用rdd的markCheckpointed将该rdd对上游的依赖都置空
protected override def doCheckpoint(): CheckpointRDD[T] = {
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
....
}
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()
.....
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
....
}
streaming的checkpoint实现
分为两部分,一部分是存metadata,另一部分存data,首先看下metadata:
定时器JobGenerator提交job时,可以看到post了DoCheckpoint信息,之后会调用doCheckpoint方法:
private def generateJobs(time: Time) {
....
graph.generateJobs(time) // generate jobs using
...
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
ssc.graph.updateCheckpointData(time) //每个ds更新每个ds的checkpoint信息
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
////将Checkpoint信息写到Checkpoint目录
}
}
重点看下ssc.graph.updateCheckpointData(time)
def updateCheckpointData(time: Time) {
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
}
private[streaming] def updateCheckpointData(currentTime: Time) {
checkpointData.update(currentTime)
dependencies.foreach(_.updateCheckpointData(currentTime))
}
可以看到每个outputDS执行了updateCheckpointData,我们看下checkpointData.update的做了哪些事情。这里的例子是DirectKafkaInputDStreamCheckpointData
def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
}
override def update(time: Time) {
batchForTime.clear()//删除老的Checkpoint信息
generatedRDDs.foreach { kv =>
val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray//a是一个数组,数组的一个元素是一个RDD的所有分区信息,一个分区信息包含了这个分区的起始数据和终止数据在Kafka的某个分区的offset
batchForTime += kv._1 -> a//将分区信息存储在DirectKafkaInputDStreamCheckpointData.data中
}
}
到这里为止,我们就清楚streaming中checkpoint metapdata是怎么存储的。接下来,看第二部分,data是怎么存储的?DS在执行action算子的时候,比如getOrCompute
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
】
if (isTimeValid(time)) {
val rddOption = ....
rddOption.foreach { case newRDD =>
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
}
generatedRDDs.put(time, newRDD)
}
rddOption
}
}
}
!!!!可以看到调用了rdd.checkpoint!那就是说接下来的逻辑就和rdd做checkpoint的逻辑一致了