Spark Streaming(1) - 基本原理

本文基于spark 2.11

1. 前言

spark使用RDD来抽象的表示数据,用户使用RDD提供的一些算子编写自己的spark application,使用RDD抽象表示数据要求对于输入数据是静态的,但是在流式数据处理中数据如同流水一样不停的在管道中产生,这不符合RDD的要求。Spark Streaming的处理方式是,从输入流中读区数据,将数据作为一个个batch保存起来,这样就有了静态的数据,就可以用RDD来表示这些数据,然后就可以基于RDD 创建任务了。

2. 基本原理

下面是一个从kafka读取数据处理的代码:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
// batchDuration 设置为 1 秒,然后创建一个 streaming 入口
// 每1秒依据RDD中创建一次job,输入RDD就从已经已经收集的batch中取。
val ssc = new StreamingContext(conf, Seconds(1))

val kafkaParams: Map[String, String] = Map("group.id" -> "test",...)
val topics = Map("test" -> 1)
val lines = KafkaUtils.createStream(
      ssc,
      kafkaParams,
      topics,  StorageLevel.MEMORY_AND_DISK)
val words = lines.flatMap(_.split(" "))     
val pairs = words.map(word => (word, 1))    
val wordCounts = pairs.reduceByKey(_ + _)   
wordCounts.print()                      
wordCounts.foreachRDD(...)     
ssc.start()
ssc.awaitTermination()

和之前基于RDD的的wordcount程序不同:

  1. KafkaUtils.createStream(...)创建出来的不是RDD,和是一个DStream的类,
  2. DStream同样存在map、flatMap、reduceByKey这样的转换操作,但是它是从DStream到DStream的转换。
  3. print在RDD里表示一种action,会触发job的创建和提交,但是DStream的action操作不会,它的处理方式不同,后续会介绍。
  4. ssc.start会启动一下组建:
    • JobScheduler, 调度和追踪job
    • JobGenerator,由JobScheduler启动,定时(初始化StreamingContext指定的时间)从DStream创建出job
    • ReceiverTracker, 运行在Driver上,收集 从各个receiver上报的流数据batch信息
    • ReceiverSupervisor,由ReceiverTracker运行发送消息使其在executor上运行,接收Receiver汇报的batch数据,然后将数据信息汇报给ReceiverTracker。
    • Receiver, 运行在executor上,由ReceiverSupervisor启动,负责着流中读区数据,分成batch,汇报给ReceiverSupervisor。

从上介绍可以看出,job是在ssc.start过程中创建的,而且在运行期间会根据用户设置的duration不断的创建。

下图表示了使用kafka作为输入源时的streaming工作期间的流程:

图1 streaming工作流程
  1. Receiver1从kafka中读入数据,将数据转发给ReceiverSupervisor
  2. ReceiverSupervisor,使用BlockManager存储并管理数据信息。
  3. ReceiverSupervisor,将数据信息发送给运行在Driver上的ReceiverTracker。
  4. JobGenerator,假设上面wordCount代码中DStream之间的转换看作一张DAG,DStreamGraph保存了所有的DAG。JobGenerate每隔一段时间从DStreamGraph中的DStream DAG生成RDD的DAG,然后提交RDD的job。RDD的数据则来自Generator根据ReceiverTracker中收集的batch数据信息。

2.1 DStream 到RDD的转换

由于基于RDD计算的基于静态的数据,而数据是不断产生的,spark streaming将输入数据切成一个个batch,因此需要不断的产生job去计算batch中的数据。

上面wordCount程序,描述了DStream之间的转换,看起来几乎和RDD之间的转换是一样的,JobGenerator运行期间根据DStream不停创建RDD,再由RDD生成job 经SparkContext提交运行。DStream相当于模板,RDD相当于使用模板创造出的零件,而JobGenerator则相当于操作模板的工人了。

下图描述了DStream和RDD在运行期间的关系:

DStream和RDD

可以看到DStream的子类都有一个RDD的对应类,一句DStream生成的RDD DAG和DStream拥有一样的转换和依赖。采集输入流中的一段数据作为RDD的源数据。

RDD#compute方法中完成输入数据的计算,DStream也存在compute方法,但是其compute方法这是完成DStream到RDD的转换。

2.2 ReceiverInputDStream

所有继承DStream的类中,ReceiverInputDStream除了像其他DStream一样创建出RDD以外,还需要返回一个Receiver负责接收收据,例如ReceiverInputDStream的子类SocketInputDStream就能返回一个SocketReceiver的Receiver的实现类。

ReceiverInputDStream一般都是一个DStream DAG的源头。

当ReceiverTracker调用start启动时,它会从DStreamGraph持有的DStream DAG中获得所有的ReceiverInputDStream,然后取得Receiver,通过巧妙的方式将Reciver包装成Task,然后发送到executor上执行,然后在receiver端,Receiver和ReceiverSupervisor启动接收数据。

在SparkStreaming(3) ReceiverTracker和Receiver中,启动receiver时,receiver就是按上面方式获得的。

2.3 output 操作

DStream和RDD有着类似的操作,map这种使得RDD转换成新的RDD的操作称为Transformation,foreach这种触发job的创建和提交的操作称为Action, DStream类似,Dstream到DStream的称为Transformation, DStream的output操作有点类似rdd中的action操作,一个action意味着一个新的job被创建提交。DStream的output操作意味着一个DStream DAG模板的创建,也意味着到此处DStream转换成RDD应该触发job,DStream常见的output操作有:

  1. saveAsTextFiles
  2. saveAsObjectFiles
  3. print
  4. foreachRDD

3 DStreamGraph

DStreamGraph用来保存所有output操作生成DStream DAG。
比如下面是DStream#foreachRDD的代码:

private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
   // 调用了DStream#register()方法
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

//register方法向DStreamGraph注册当前DStream
// 由于DStream保存了所有的父依赖,因此注册当前DStream
// 就能追溯出整个DStream DAG,相当于注册了DStream DAG
private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }

上面说DStream的output操作相当于触发一个DStream DAG模板的创建,而一个模板对应一种job。 第一节wordcount代码中分别有print和foreachRDD两个output操作,因此DStreamGraph可以理解持有两个DStream DAG,如下图:


尽管创建出来的DStream DAG是一样的,但是依然会创建出两份RDD DAG,生成两类job,

DStreamGraph还肩负着从根据注册的DStreamDAG创建job的任务,后续JobGenerator就是调用DStreamGraph完成创建job。下面是DstreamGraph创建job的方法:

def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      // 对每一个因output操作而注册的DStream DAG生成job
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }


这是DStream#generateJob方法,time表示每一次生成job的时间

private[streaming] def generateJob(time: Time): Option[Job] = {
   // getOrCompute将DStream转换成RDD,转换操作是从当前
   // DStream往上游追溯,追溯到源头后在一次往下生成RDD的过程
   // 是一次DFS的过程。
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
         // 创建到RDD后提交job
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容