针对0.10及以上版本的kafka, spark推出了更简洁的模式进行数据读取, jar包名称为spark-streaming-kafka-0-10_2.12
. 这种方式可以使得读取的rdd的分区和kafka的分区保持一致, 从而实现高效地读取. 本文对这种读取方式进行解析, 不关注过多源码细节, 主要关注官方是如何利用KafkaConsumer
这个类来实现这种读取模式的, 本文的源码版本为2.4.0
先说结论 :
(1) 在启动的时候, spark在driver端生成一个consumer, 并获得所订阅主题及分区的当前offset值
(2) 根据设置的参数去判断每个分区需要拉取的数据量, 即每个分区的untilOffset值, 此时driver端获得了每个分区需要被消费的数据量, spark将这些值保存在OffsetRange的类里面, 该类的定义如下
final class OffsetRange private(
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long) extends Serializable {
(3) 在executor端创建新的consumer (和driver端的consumer属于不同的消费组), 然后将上一步获得的 OffsetRange
分配到各个executor, 这些executor上的consumer根据指定的分区及起始offset进行消费.
(4) 待executor消费成功后, driver端的consumer即seek
每个分区的offset到最新的位置, 然后重复第二步的过程, 同时提交当前offset(可自动/手动提交)
总的来说, spark分别在driver端和executor端创建consumer连接, driver端的consumer负责确定当前需要消费数据offset范围, 并分配到各个executor, 然后executor端的consumer根据分配到的offset范围进行消费, 最后在driver端进行offset的提交
下面的详细流程:
- 创建
DirectKafkaInputDStream
官方的实例代码如下:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
这里创建的stream
对象为DirectKafkaInputDStream
, 这个类还是继承类InputDStream
, 所以我们直接看它重写的start
和compute
方法即可
start
方法定义如下:
override def start(): Unit = {
// 这里的consume类型为KafkaConsumer, 通过上一步传入的Subscribe类型来设置不同的KafkaConsumer属性
val c = consumer
// 对offset进行矫正, 主要是为了考虑缓存中有数据, 但是还未poll的情况
paranoidPoll(c)
if (currentOffsets.isEmpty) {
currentOffsets = c.assignment().asScala.map { tp =>
tp -> c.position(tp)
}.toMap
}
}
start
方法主要就是在driver端创建了一个KafkaConsumer
, 并对当前的offset进行更新/矫正
接下来是compute
方法, 主要是生成KafkaRDD
, 代码如下:
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
val untilOffsets = clamp(latestOffsets())
val offsetRanges = untilOffsets.map { case (tp, uo) =>
val fo = currentOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo)
}
val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
true)
val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
getPreferredHosts, useConsumerCache)
// Report the record number and metadata of this batch interval to InputInfoTracker.
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map(
"offsets" -> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
currentOffsets = untilOffsets
commitAll()
Some(rdd)
}
KafkaRDD`的构造方法为
val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, useConsumerCache)
除了包含每个分区需要消费的数据范围offsetRanges
参数外, 还包含一个executorKafkaParams
, 此为executor端创建consumer时的参数,相关代码在
val executorKafkaParams = {
val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
KafkaUtils.fixKafkaParams(ekp)
ekp
}
点进去我们可以发现, executor端的kafkaParams
主要针对driver端的做了4点修改, 如下:
private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId) {
logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
}
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
// possible workaround for KAFKA-3135
val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
}
}
在成功poll到数据后, 会将currentOffsets
更新为untilOffsets
, 然后在下一次compute
之前的latestOffsets()
方法中会执行
c.seekToEnd(currentOffsets.keySet.asJava)
, 将driver端consumer的offset刷到最新.
最后还会有一个commitAll()
方法, 这个方法只在手动调用stream.commitAsync(offsetRanges)
后才会起作用, 因为这个方法是从一个队列里取出要提交的offset
值, 然后调用kafkaConsumer.commitAsync()
进行提交, 而stream.commitAsync(offsetRanges)
方法会将要提交的offset
保存至队列.
-
KafkaRDD
的实现
KafkaRDD
是继承自RDD
的类, 用于在上一步的compute
方法中返回的RDD
类型, 构造如下
private[spark] class KafkaRDD[K, V](
sc: SparkContext,
val kafkaParams: ju.Map[String, Object],
val offsetRanges: Array[OffsetRange],
val preferredHosts: ju.Map[TopicPartition, String],
useConsumerCache: Boolean
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
先看它重写的getPartition
方法, 这是定义RDD分区的依据
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
}.toArray
}
在这里我们可以看到, RDD的分区数是根据offsetRanges
来的, 和订阅Kafka的分区数是保持一致.
接下来是compute
方法的重写, 这一步返回的是一个KafkaRDDIterator
, 而在KafkaRDDIterator
的next()
方法如下
override def next(): ConsumerRecord[K, V] = {
if (!hasNext) {
throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
}
val r = consumer.get(requestOffset, pollTimeout)
requestOffset += 1
r
}
我们可以看到, 在这一步就是调用consumer.get()
方法来返回数据, 每次返回一个ConsumerRecord[K, V]
, 这里的consumer
对象是KafkaDataConsumer
, get
方法就是从buffer
缓冲区中返回数据, buffer
数据来源在poll
方法中, 定义如下:
private def poll(timeout: Long): Unit = {
val p = consumer.poll(timeout)
val r = p.records(topicPartition)
logDebug(s"Polled ${p.partitions()} ${r.size}")
buffer = r.listIterator
}
这里的consumer即是真正的KafkaConsumer
了, 其定义在
/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[K, V] = {
val c = new KafkaConsumer[K, V](kafkaParams)
val topics = ju.Arrays.asList(topicPartition)
c.assign(topics)
c
}
在这里我们可以看到, 这里采用的其实就是assign
指定分区的方式进行数据拉取.
大致的流程就是这些了, 主要设计到了DirectKafkaInputDStream
, KafkaRDD
, KafkaRDDIterator
这么几个类, 其中还涉及到了是否允许数据compacted、consumer缓存等概念, 因为这里主要关注KafkaConsumer消费的过程, 在这里不做进一步的讨论了, 如果不对的地方欢迎一起交流.