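The questions we care about: how is data received, and how is it stored?
Data is received by a receiver running on an executor. Once received, it is handed to ReceiverSupervisorImpl, which runs on the same executor, for processing.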
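Enter one of JobScheduler's most important members: ReceiverTracker!!!
What is ReceiverTracker, in brief?
ReceiverTracker's job is to supply the input data for each batch's RDDs. It does this in three steps:
- Distribute the receivers to executors and start the threads that receive data.
- Start a ReceiverSupervisorImpl alongside each receiver on its executor to process the data, while the tracker keeps track of information about what has been received.
- When a batch's job is submitted, supply that job with the data it needs to process (ETL).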
++First, how are receivers distributed to the executors?++
```scala
def start(): Unit = synchronized {
  //....
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) // receives and handles messages sent by ReceiverTracker and by the receivers
    if (!skipReceiverLaunch) launchReceivers() // Important, remember this! Distributes the receivers to the executors
    //.....
  }
}
```
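Now let's look at launchReceivers in detail:

```scala
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map {...} // DStreamGraph holds all input DStreams; collect the receiver of each one
  endpoint.send(StartAllReceivers(receivers))   // hand the receivers to the endpoint, which does the actual distribution
}
```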
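endpoint.send is fire-and-forget. It only places StartAllReceivers in the endpoint's inbox, and the endpoint processes the message later in its receive partial function. Spark routes send to receive and ask to receiveAndReply. Spark's RPC layer is asynchronous and multi-threaded, so the following is only a rough, single-threaded illustration of the pattern. ToyEndpoint, ToyStartAll and ToyStop are hypothetical names, not Spark classes.

```scala
import scala.collection.mutable

// Hypothetical messages standing in for StartAllReceivers and similar
final case class ToyStartAll(receiverIds: Seq[Int])
final case class ToyStop(reason: String)

// A toy, single-threaded endpoint: send() only enqueues (fire-and-forget),
// and dispatchAll() later feeds each message to the receive partial function.
class ToyEndpoint {
  private val inbox = mutable.Queue.empty[Any]
  val log = mutable.ArrayBuffer.empty[String]

  def send(msg: Any): Unit = inbox.enqueue(msg)

  val receive: PartialFunction[Any, Unit] = {
    case ToyStartAll(ids) => ids.foreach(id => log += s"start receiver $id")
    case ToyStop(reason)  => log += s"stop: $reason"
  }

  def dispatchAll(): Unit =
    while (inbox.nonEmpty) {
      val msg = inbox.dequeue()
      // Messages not covered by receive are only logged here
      if (receive.isDefinedAt(msg)) receive(msg) else log += s"unhandled: $msg"
    }
}

val ep = new ToyEndpoint
ep.send(ToyStartAll(Seq(0, 1)))
ep.send("unknown")
ep.dispatchAll()
```

Nothing happens at send time. Both receivers are "started" only when dispatchAll drains the inbox, and the string message falls through the partial function.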
```scala
override def receive: PartialFunction[Any, Unit] = {
  // Decide which executors each receiver should be sent to
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  //.....
}
```
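schedulingPolicy.scheduleReceivers decides which executors each receiver may run on. Spark's ReceiverSchedulingPolicy tries to spread receivers evenly across executors while honoring each receiver's preferredLocation. The sketch below is a much simplified stand-in for that idea, not Spark's actual algorithm. Exec, Rcv and scheduleReceivers are hypothetical. Each receiver goes to the least-loaded executor on its preferred host, or to the least-loaded executor overall if it has no usable preference.

```scala
import scala.collection.mutable

// Hypothetical, simplified model: executors are (host, id) pairs and a
// receiver may carry a preferred host. This is NOT Spark's real policy.
final case class Exec(host: String, id: String)
final case class Rcv(streamId: Int, preferredHost: Option[String])

def scheduleReceivers(receivers: Seq[Rcv], executors: Seq[Exec]): Map[Int, Seq[Exec]] =
  if (executors.isEmpty) {
    // No executors known yet: no placement preference at all
    receivers.map(r => r.streamId -> Seq.empty[Exec]).toMap
  } else {
    val load = mutable.Map.empty[Exec, Int]
    executors.foreach(e => load(e) = 0)
    // Least-loaded first; ties broken by executor id so the result is deterministic
    def leastLoaded(candidates: Seq[Exec]): Exec = candidates.minBy(e => (load(e), e.id))
    receivers.map { r =>
      // Prefer executors on the receiver's preferred host, if there are any
      val onPreferredHost = executors.filter(e => r.preferredHost.contains(e.host))
      val chosen = leastLoaded(if (onPreferredHost.nonEmpty) onPreferredHost else executors)
      load(chosen) += 1
      r.streamId -> Seq(chosen)
    }.toMap
  }

val execs = Seq(Exec("h1", "e1"), Exec("h1", "e2"), Exec("h2", "e3"))
val plan = scheduleReceivers(
  Seq(Rcv(0, Some("h2")), Rcv(1, None), Rcv(2, None), Rcv(3, Some("h1"))), execs)
```

The outcome is as follows:
- Receiver 0 goes to e3, the only executor on h2.
- Receivers 1 and 2 spread over the idle e1 and e2.
- Receiver 3 goes to e1, the tie-broken least-loaded executor on h1.

With no executors, every receiver gets an empty list. This mirrors the scheduledLocations.isEmpty branch in startReceiver.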
```scala
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  // Function to start the receiver on the worker node
  // Key point, remember this! This function is submitted together with the RDD. It creates a
  // ReceiverSupervisorImpl that does the actual work of handling received data (details below).
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) // does the real handling of received data
        supervisor.start()            // start the supervisor's helpers (e.g. its BlockGenerator) and the receiver
        supervisor.awaitTermination() // Important! Blocks this task until the receiver stops, so data keeps flowing in the meantime
      }
    }
  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  // Key point, remember this! The receiver and its preferred locations are packed into a
  // single-element RDD, so the receiver runs as one task on one of the scheduled executors.
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  //.....
  // Submit! ⚠️ At this point the receiver has been shipped to a concrete executor
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  //....
}
```
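Together, attemptNumber() == 0 and supervisor.awaitTermination() mean that a receiver occupies one task slot for its whole lifetime, and a retried task attempt simply returns. Spark prefers to reschedule a failed receiver through ReceiverTracker rather than rely on ordinary task retry. The plain-JVM sketch below reproduces that shape, with a single-thread pool standing in for an executor's task slot. ToyTaskSupervisor and startReceiverTask are hypothetical names.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

// Hypothetical toy supervisor: start() marks the receiver as started and
// awaitTermination() blocks the calling thread until stop() is called.
class ToyTaskSupervisor {
  val started = new CountDownLatch(1)
  private val stopped = new CountDownLatch(1)
  def start(): Unit = started.countDown()
  def stop(): Unit = stopped.countDown()
  def awaitTermination(): Unit = stopped.await()
}

// Same shape as startReceiverFunc: only attempt 0 runs the receiver; a retried
// attempt returns at once (Spark reschedules receivers via ReceiverTracker instead).
def startReceiverTask(attemptNumber: Int, supervisor: ToyTaskSupervisor): Boolean =
  if (attemptNumber == 0) {
    supervisor.start()
    supervisor.awaitTermination() // the task thread is held for the receiver's whole lifetime
    true
  } else false

val slot = Executors.newFixedThreadPool(1) // stands in for one executor task slot
val sup = new ToyTaskSupervisor
val task = slot.submit(new Callable[Boolean] {
  def call(): Boolean = startReceiverTask(0, sup)
})
sup.started.await(5, TimeUnit.SECONDS)
val stillRunning = !task.isDone // still blocked inside awaitTermination()
sup.stop()
val ranReceiver = task.get(5, TimeUnit.SECONDS)
val retryRan = startReceiverTask(1, new ToyTaskSupervisor)
slot.shutdown()
```

The task stays unfinished until stop() is called, which is why each receiver permanently costs its executor one core.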
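++Next, how is the data actually handled on the executor?++
Part 1: how is data received?
Once a receiver has been shipped to an executor, how does its data get processed? The receiver calls the supervisor's push methods (through store)!!! In other words, a receiver only cares about where the data comes from and how to parse it. It does not care how the data is stored, or who uses it!!!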
First, let's see how a receiver hands data to ReceiverSupervisorImpl, using KafkaReceiver as the example:

```scala
class KafkaReceiver(....) extends Receiver[(K, V)](storageLevel) with Logging {
  def onStart(): Unit = {
    // Where to receive data from
    // Kafka connection properties
    // Create the connection to the cluster
    // How to decode the received data
    val keyDecoder = ...
    val valueDecoder = ...
    // Receive data on a thread pool
    val executorPool = ...
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    }
    //....
  }
  // Handles the received data by calling store()!!! Receiver.store calls supervisor.pushSingle under the hood
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run(): Unit = {
      val streamIterator = stream.iterator()
      while (streamIterator.hasNext()) {
        val msgAndMetadata = streamIterator.next()
        store((msgAndMetadata.key, msgAndMetadata.message))
      }
    }
  }
}
```
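This division of labor can be sketched without Spark: the receiver knows the source and the decoding, and the supervisor owns storage. RecordSink, ToyReceiver and ToyKeyValueReceiver are hypothetical stand-ins for ReceiverSupervisor, Receiver and KafkaReceiver. As in the real Receiver, store(item) simply forwards to pushSingle. One deliberate shortcut: this toy onStart reads its source synchronously. A real receiver's onStart must return quickly and receive on its own threads, as the KafkaReceiver thread pool does.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the supervisor side: it only receives records.
trait RecordSink { def pushSingle(data: Any): Unit }

// Hypothetical stand-in for Receiver: subclasses decide where data comes from and
// how to decode it; store() just forwards each record to the sink.
abstract class ToyReceiver[T] {
  @volatile private var sink: RecordSink = null
  def attach(s: RecordSink): Unit = { sink = s }
  def onStart(): Unit
  final def store(item: T): Unit = sink.pushSingle(item)
}

// A "Kafka-like" receiver decoding raw (key, value) byte pairs from an in-memory source.
// Simplification: it reads synchronously inside onStart.
class ToyKeyValueReceiver(source: Iterator[(Array[Byte], Array[Byte])])
    extends ToyReceiver[(String, String)] {
  private def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
  def onStart(): Unit =
    source.foreach { case (k, v) => store((decode(k), decode(v))) }
}

val collected = mutable.ArrayBuffer.empty[Any]
val receiver = new ToyKeyValueReceiver(Iterator(
  ("k1".getBytes("UTF-8"), "v1".getBytes("UTF-8")),
  ("k2".getBytes("UTF-8"), "v2".getBytes("UTF-8"))))
receiver.attach(new RecordSink { def pushSingle(data: Any): Unit = collected += data })
receiver.onStart()
```

Swapping the sink changes where the records end up without touching the receiver, which is exactly the separation described above.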
Part 2: the data has arrived, so how is it stored? That is ReceiverSupervisorImpl's job, mainly through these three members:
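```scala
// push-style methods hand records over one at a time to BlockGenerator, which aggregates them into blocks
def pushSingle(data: Any): Unit = {
  defaultBlockGenerator.addData(data)
}
// Every finished block ends up here (e.g. BlockGenerator's listener calls it via pushArrayBuffer)
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ): Unit = {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  // The actual block-storing logic
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  val numRecords = blockStoreResult.numRecords
  // Once stored, send the new block's info to ReceiverTracker
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
}
```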
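BlockGenerator sits between pushSingle and pushAndReportBlock. It buffers single records, and on a timer (every spark.streaming.blockInterval, 200 ms by default) it turns the buffer into a block that is pushed on to pushAndReportBlock. The toy below mimics that flow deterministically, with two simplifications: cutBlock is called by hand instead of by a timer, and there is no separate pushing thread. ToyBlockGenerator and ToyBlock are hypothetical, and the "input-streamId-time" id format only imitates Spark's StreamBlockId naming.

```scala
import scala.collection.mutable.ArrayBuffer

final case class ToyBlock(id: String, records: Seq[Any])

// Hypothetical toy BlockGenerator: addData buffers records; cutBlock (a timer tick
// in Spark) swaps the buffer out and hands a non-empty block to onPushBlock.
class ToyBlockGenerator(streamId: Int, onPushBlock: ToyBlock => Unit) {
  private var buffer = ArrayBuffer.empty[Any]

  def addData(data: Any): Unit = synchronized { buffer += data }

  def cutBlock(blockTime: Long): Unit = {
    val toPush = synchronized {
      val full = buffer
      buffer = ArrayBuffer.empty[Any]
      full
    }
    // Intervals in which nothing arrived produce no block
    if (toPush.nonEmpty) onPushBlock(ToyBlock(s"input-$streamId-$blockTime", toPush.toList))
  }
}

val stored = ArrayBuffer.empty[ToyBlock]
val reported = ArrayBuffer.empty[(String, Int)] // (blockId, numRecords) sent to the "tracker"
val generator = new ToyBlockGenerator(0, { block =>
  stored += block                              // ~ receivedBlockHandler.storeBlock
  reported += (block.id -> block.records.size) // ~ AddBlock(blockInfo) to ReceiverTracker
})
generator.addData("a"); generator.addData("b")
generator.cutBlock(200)
generator.cutBlock(400) // nothing arrived in this interval
generator.addData("c")
generator.cutBlock(600)
```

Batching records into blocks means the BlockManager and ReceiverTracker handle one message per block instead of one per record. The block interval also determines how many blocks, and hence partitions, each batch ends up with.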
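```scala
// Stores each block via the BlockManager in memory/on disk (per the receiver's storageLevel), the same way RDD blocks are stored
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    // WAL (write-ahead log), key point! Enabled by spark.streaming.receiver.writeAheadLog.enable; blocks are also
    // written to a log under the checkpoint directory so received data is not lost
    new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
```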
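Here is why the WAL matters. With BlockManagerBasedBlockHandler alone, a received block lives only on the executor, so if that executor dies, the block dies with it. With the WAL enabled, each block is also written to a log under the checkpoint directory, from which it can be read back. The sketch below is a toy with hypothetical names (MemoryHandler, WalHandler, replay) that writes plain text to a local file. It differs from Spark's WriteAheadLogBasedBlockHandler in three ways: Spark serializes blocks, writes to HDFS-compatible storage, and stores into the BlockManager and the WAL in parallel. The toy writes sequentially, log first.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}
import scala.collection.mutable

// Hypothetical toy handlers; names and storage format are assumptions, not Spark's API.
trait ToyBlockHandler { def storeBlock(blockId: String, records: Seq[String]): Unit }

// ~ BlockManagerBasedBlockHandler: the block lives only in the executor's store
class MemoryHandler(val store: mutable.Map[String, Seq[String]]) extends ToyBlockHandler {
  def storeBlock(blockId: String, records: Seq[String]): Unit = { store(blockId) = records }
}

// ~ WriteAheadLogBasedBlockHandler: append the block to a durable log, then keep it
// in memory. (Spark does both in parallel; this toy does them one after the other.)
class WalHandler(log: Path, memory: MemoryHandler) extends ToyBlockHandler {
  def storeBlock(blockId: String, records: Seq[String]): Unit = {
    val line = blockId + "\t" + records.mkString(",") + "\n"
    Files.write(log, line.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE, StandardOpenOption.APPEND)
    memory.storeBlock(blockId, records)
  }
}

// Rebuild all logged blocks, e.g. after the in-memory copy has been lost.
// Toy format: one "blockId<TAB>r1,r2,..." line per block (records must not contain ',').
def replay(log: Path): Map[String, Seq[String]] =
  new String(Files.readAllBytes(log), StandardCharsets.UTF_8)
    .split("\n").filter(_.nonEmpty)
    .map { line =>
      val parts = line.split("\t", 2)
      parts(0) -> parts(1).split(",").toList
    }.toMap

val log = Files.createTempFile("toy-wal", ".log")
val memory = new MemoryHandler(mutable.Map.empty)
val wal = new WalHandler(log, memory)
wal.storeBlock("input-0-200", Seq("a", "b"))
wal.storeBlock("input-0-400", Seq("c"))
memory.store.clear() // simulate losing the executor together with its in-memory blocks
val recovered = replay(log)
Files.deleteIfExists(log)
```

The trade-off is an extra durable write per block in exchange for being able to recover blocks that were acknowledged but never processed.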
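Part 3: how is the data used? After a block is stored, ReceiverTracker is told about it, but how is it used from there?
ReceiverTracker does not manage blocks itself; it delegates to its member receivedBlockTracker. Spoken like a true boss!!!

```scala
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
```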
Note ⚠️: each time the timer-driven JobGenerator generates the jobs for a batch, it calls ReceiverTracker's allocateBlocksToBatch to match blocks to that batch. As the code below shows, the actual assignment of blocks to batches is done by receivedBlockTracker!!

```scala
def allocateBlocksToBatch(batchTime: Time): Unit = {
  if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
  }
}
```
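The bookkeeping behind receivedBlockTracker can be pictured as two structures. One is a per-stream queue of blocks that have been reported but not yet allocated. The other is a map from batch time to the blocks allocated to that batch. allocateBlocksToBatch drains the queues into the batch, and a batch time that was already allocated is ignored. The sketch below is loosely modeled on that design. ToyBlockTracker and BlockInfo are hypothetical, and batch times are plain Long milliseconds. The real class can additionally record these events in a write-ahead log.

```scala
import scala.collection.mutable

final case class BlockInfo(streamId: Int, blockId: String, numRecords: Long)

// Hypothetical toy of receivedBlockTracker's bookkeeping (batch times as Long millis).
class ToyBlockTracker {
  private val unallocated = mutable.Map.empty[Int, mutable.Queue[BlockInfo]]
  private val allocated = mutable.Map.empty[Long, Map[Int, Seq[BlockInfo]]]
  private var lastAllocatedBatchTime = -1L

  // ~ addBlock: a reported block waits in its stream's queue
  def addBlock(info: BlockInfo): Boolean = synchronized {
    unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]).enqueue(info)
    true
  }

  // ~ allocateBlocksToBatch: everything reported and not yet claimed goes to this batch;
  // a batch time at or before the last allocated one is ignored.
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    if (batchTime > lastAllocatedBatchTime) {
      val claimed = unallocated.map { case (streamId, queue) =>
        streamId -> queue.dequeueAll(_ => true).toList
      }.toMap
      allocated(batchTime) = claimed
      lastAllocatedBatchTime = batchTime
    }
  }

  def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] = synchronized {
    allocated.getOrElse(batchTime, Map.empty[Int, Seq[BlockInfo]])
  }
}

val tracker = new ToyBlockTracker
tracker.addBlock(BlockInfo(0, "input-0-200", 2))
tracker.addBlock(BlockInfo(0, "input-0-400", 1))
tracker.allocateBlocksToBatch(1000)
tracker.addBlock(BlockInfo(0, "input-0-1200", 5))
tracker.allocateBlocksToBatch(2000)
tracker.allocateBlocksToBatch(2000) // already allocated: ignored, batch 2000 keeps its block
```

Batch 1000 gets the two blocks reported before it, and batch 2000 gets only the block that arrived afterwards. Each block therefore belongs to exactly one batch.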
For more on how stored data is tied to RDDs, see the article "Spark Streaming source code analysis: how do jobs, RDDs, and blocks correspond to each other?"