15、Spark Streaming源码解读之No Receivers彻底思考

在前几期文章里讲了带Receiver的Spark Streaming 应用的相关源码解读,但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性
其实No Receivers的方式更符合我们读取数据,操作数据的思路的。因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式。 如果要操作数据来源,肯定要有一个封装器,这个封装器一定是RDD类型。 以直接访问Kafka中的数据为例,看一下源码中直接读写Kafka中数据的例子代码:

object DirectKafkaWordCount {

 def main(args: Array[String]) {

 if (args.length < 2) {

 System.err.println(s"""

 |Usage: DirectKafkaWordCount <brokers> <topics>

 | <brokers> is a list of one or more Kafka brokers

 | <topics> is a list of one or more kafka topics to consume from

 |

 """.stripMargin)

 System.exit(1)

 }

 

 StreamingExamples.setStreamingLogLevels()

 

 val Array(brokers, topics) = args

 

 // Create context with 2 second batch interval

 val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")

 val ssc = new StreamingContext(sparkConf, Seconds(2))

 

 // Create direct kafka stream with brokers and topics

 val topicsSet = topics.split(",").toSet

 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

 ssc, kafkaParams, topicsSet)

 

 // Get the lines, split them into words, count the words and print

 val lines = messages.map(_._2)

 val words = lines.flatMap(_.split(" "))

 val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

 wordCounts.print()

 

 // Start the computation

 ssc.start()

 ssc.awaitTermination()

 }

}

Spark streaming 会将数据源封装成一个RDD,也就是KafkaRDD:


/**

 * A batch-oriented interface for consuming from Kafka.

 * Starting and ending offsets are specified in advance,

 * so that you can control exactly-once semantics.

 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">

 * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set

 * with Kafka broker(s) specified in host1:port1,host2:port2 form.

 * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD

 * @param messageHandler function for translating each message into the desired type

 */

private[kafka]

class KafkaRDD[

 K: ClassTag,

 V: ClassTag,

 U <: Decoder[_]: ClassTag,

 T <: Decoder[_]: ClassTag,

 R: ClassTag] private[spark] (

 sc: SparkContext,

 kafkaParams: Map[String, String],

 val offsetRanges: Array[OffsetRange],//该RDD的数据偏移量

 leaders: Map[TopicAndPartition, (String, Int)],

 messageHandler: MessageAndMetadata[K, V] => R

 ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges

可以看到KafkaRDD 混入了HasOffsetRanges,它是一个trait:

trait HasOffsetRanges {

 def offsetRanges: Array[OffsetRange]

}

其中OffsetRange,标识了RDD的数据的主题、分区、开始偏移量和结束偏移量:


inal class OffsetRange private(

 val topic: String,

 val partition: Int,

 val fromOffset: Long,

 val untilOffset: Long) extends Serializable

回到KafkaRDD,看一下KafkaRDD的getPartitions方法:

 override def getPartitions: Array[Partition] = {

 offsetRanges.zipWithIndex.map { case (o, i) =>

 val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))

 new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)

 }.toArray

 }

返回KafkaRDDPartition:

private[kafka]

class KafkaRDDPartition(

 val index: Int,

 val topic: String,

 val partition: Int,

 val fromOffset: Long,

 val untilOffset: Long,

 val host: String,

 val port: Int

) extends Partition {

 /** Number of messages this partition refers to */

 def count(): Long = untilOffset - fromOffset

}

KafkaRDDPartition清晰的描述了数据的具体位置,每个KafkaRDDPartition分区的数据交给KafkaRDD的compute方法计算:

 override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {

 val part = thePart.asInstanceOf[KafkaRDDPartition]

 assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))

 if (part.fromOffset == part.untilOffset) {

 log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +

 s"skipping ${part.topic} ${part.partition}")

 Iterator.empty

 } else {

 new KafkaRDDIterator(part, context)

 }

 }

KafkaRDD的compute方法返回了KafkaIterator对象:

 private class KafkaRDDIterator(

 part: KafkaRDDPartition,

 context: TaskContext) extends NextIterator[R] {

 

 context.addTaskCompletionListener{ context => closeIfNeeded() }

 

 log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +

 s"offsets ${part.fromOffset} -> ${part.untilOffset}")

 

 val kc = new KafkaCluster(kafkaParams)

 val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])

 .newInstance(kc.config.props)

 .asInstanceOf[Decoder[K]]

 val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])

 .newInstance(kc.config.props)

 .asInstanceOf[Decoder[V]]

 val consumer = connectLeader

 var requestOffset = part.fromOffset

 var iter: Iterator[MessageAndOffset] = null

    
//..................

}

KafkaIterator中创建了一个KakfkaCluster对象用于与Kafka集群进行交互,获取数据。

回到开头的例子,我们使用 KafkaUtils.createDirectStream 创建了InputDStream:

 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

 ssc, kafkaParams, topicsSet)

看一下createDirectStream源码:

 def createDirectStream[

 K: ClassTag,

 V: ClassTag,

 KD <: Decoder[K]: ClassTag,

 VD <: Decoder[V]: ClassTag] (

 ssc: StreamingContext,

 kafkaParams: Map[String, String],

 topics: Set[String]

 ): InputDStream[(K, V)] = {

 val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)

//创建KakfaCluster对象

 val kc = new KafkaCluster(kafkaParams)

//更具kc的信息获取数据偏移量

 val fromOffsets = getFromOffsets(kc, kafkaParams, topics)

 new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](

 ssc, kafkaParams, fromOffsets, messageHandler)

 }

首先通过KafkaCluster从Kafka集群获取信息,创建DirectKafkaInputDStream对象返回

DirectKafkaInputDStream的compute方法源码:

 override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {

    
//计算最近的数据终止偏移量

 val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

    
//利用数据的偏移量创建KafkaRDD

 val rdd = KafkaRDD[K, V, U, T, R](

 context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

 

 // Report the record number and metadata of this batch interval to InputInfoTracker.

 val offsetRanges = currentOffsets.map { case (tp, fo) =>

 val uo = untilOffsets(tp)

 OffsetRange(tp.topic, tp.partition, fo, uo.offset)

 }

 val description = offsetRanges.filter { offsetRange =>

 // Don't display empty ranges.

 offsetRange.fromOffset != offsetRange.untilOffset

 }.map { offsetRange =>

 s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +

 s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"

 }.mkString("\n")

 // Copy offsetRanges to immutable.List to prevent from being modified by the user

 val metadata = Map(

 "offsets" -> offsetRanges.toList,

 StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)

 val inputInfo = StreamInputInfo(id, rdd.count, metadata)

 ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

 

 currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

 Some(rdd)

 }

可以看到DirectKafkaInputDStream的compute方法中,首先从Kafka集群获取数据的偏移量,然后利用获取偏移量创建RDD,这个Receiver的RDD创建方式不同。


总结:
而且KafkaRDDPartition只能属于一个topic,不能让partition跨多个topic,直接消费一个kafkatopic,topic不断进来、数据不断偏移,Offset代表kafka数据偏移量指针。
数据不断流进kafka,batchDuration假如每十秒都会从配置的topic中消费数据,每次会消费一部分直到消费完,下一个batchDuration会再流进来的数据,又可以从头开始读或上一个数据的基础上读取数据。
思考直接抓取kafka数据和receiver读取数据:
好处一:
直接抓取fakfa数据的好处,没有缓存,不会出现内存溢出等之类的问题。但是如果kafka Receiver的方式读取会存在缓存的问题,需要设置读取的频率和block interval等信息。
好处二:
采用receiver方式的话receiver默认情况需要和worker的executor绑定,不方便做分布式,当然可以配置成分布式,采用direct方式默认情况下数据会存在多个worker上的executor。Kafkardd数据默认都是分布在多个executor上的,天然数据是分布式的存在多个executor,而receiver就不方便计算。
好处三:
数据消费的问题,在实际操作的时候采用receiver的方式有个弊端,消费数据来不及处理即操作数据有deLay多才时,Spark Streaming程序有可能奔溃。但如果是direct方式访问kafka数据不会存在此类情况。因为diect方式直接读取kafka数据,如果delay就不进行下一个batchDuration读取。
好处四:
完全的语义一致性,不会重复消费数据,而且保证数据一定被消费,跟kafka进行交互,只有数据真正执行成功之后才会记录下来。
生产环境下强烈建议采用direct方式读取kafka数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容