Structured Streaming 与0.10及以上版本的Kafka整合来对Kafka中的读书进行读取和写入操作。
Linking
对于使用SBT/Maven定义的Scala/Java应用程序,请将你的应用程序与如下的artifact相连接:
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0
对于Python引用程序,你需要在发布应用程序时添加上述的库及其依赖,详情请参考下面的发布模块介绍。
从Kafka中读取数据
为流式查询创建一个Kafka Source
Scala 代码:
// 订阅一个topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅多个topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅满足一定正则式的topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Java代码:
// 订阅一个topic
DataFrame<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// 订阅多个topic
DataFrame<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// 订阅满足一定正则式的topic
DataFrame<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Python代码:
# 订阅一个topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅多个topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅满足一定正则式的topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
为批量查询定义一个Kafak Source
如果你的用例更适用于批处理的话,你可以根据既定的offset范围来创建一个DataSet/DataFrame
Scala代码:
// 订阅一个topic,默认从topic最早的offset到最近的offset
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅多个topic,并指定每个topic的订阅范围
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Java代码:
// 订阅一个topic,默认从topic最早的offset到最近的offset
DataFrame<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅多个topic,并指定每个topic的订阅范围
DataFrame<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
DataFrame<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
Python代码:
# 订阅一个topic,默认从topic最早的offset到最近的offset
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅多个topic,并指定每个topic的订阅范围
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Source中的每一行都遵循下列模式:
Column | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
不论是批处理还是流式处理都必须为Kafka Source设置如下选项:
选项 | 值 | 意义 |
---|---|---|
assign | json值{"topicA":[0,1],"topicB":[2,4]} | 指定消费的TopicPartition,Kafka Source只能指定"assign","subscribe","subscribePattern"选项中的一个 |
subscribe | 一个以逗号隔开的topic列表 | 订阅的topic列表,Kafka Source只能指定"assign","subscribe","subscribePattern"选项中的一个 |
subscribePattern | Java正则表达式 | 订阅的topic列表的正则式,Kafka Source只能指定"assign","subscribe","subscribePattern"选项中的一个 |
kafka.bootstrap.servers | 以逗号隔开的host:port列表 | Kafka的"bootstrap.servers"配置 |
下面的配置是可选的:
选项 | 值 | 默认值 | 支持的查询类型 | 意义 |
---|---|---|---|---|
startingOffsets | "earliest","lates"(仅streaming支持);或者json 字符"""{"topicA":{"0":23,"1":-1},"TopicB":{"0":-2}}""" | 对于流式处理来说是"latest",对于批处理来说是"earliest" | streaming和batch | 查询开始的位置可以是"earliest"(从最早的位置开始),"latest"(从最新的位置开始),或者通过一个json为每个TopicPartition指定开始的offset。通过Json指定的话,json中-2可以用于表示earliest,-1可以用于表示latest。注意:对于批处理而言,latest值不允许使用的。 |
endingOffsets | latest or json string{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | batch | 一个批查询的结束位置,可以是"latest",即最近的offset,或者通过json来为每个TopicPartition指定一个结束位置,在json中,-1表示latest,而-2是不允许使用的 |
failOnDataLoss | true or false | true | streaming query | 数据丢失之后(topic被删除,或者offset不在可用范围内时)查询是否失败,这可能会引起一个告警,如果你觉得不适用于你的应用程序时,你可以禁用掉。批查询如果在读出数据的时候发现数据丢失了总会失败。 |
kafkaConsumer.pollTimeoutMs | long | 512 | streaming 和batch | executor轮询kafka中的数据的超时时间 |
fetchOffset.numRetries | int | 3 | streaming 和batch | 获取Kafka offset的重试次数 |
fetchOffset.retryIntervalMs | long | 10 | 获取Kafka offset的时间间隔 | |
maxOffsetPerTrigger | long | none | streaming 和batch | 不想翻译了。。。 |
写数据到Kafka
这里我们讨论对鞋流式查询或者批查询数据到Apache Kafka的支持,请注意Apache Kafka只支持至少一次语义,所以当写流式查询数据或者批查询数据到Kafka时,有些记录可能会重复,这是有可能发生的,例如,kafka需要重新获取还未被一个Broker识别的消息记录,即使这条消息已经被Broker接收并将消息写入记录中了。由于Kafka本身的这些写语义,Structured Streaming无法避免写入的重复记录的发生。如果一个查询的写入成功,你就可以认为是至少写入了一次,一个删除写入重复记录的解决方案就是引入一个唯一主键,这就可以在读取时执行去重操作了。
Column | Type |
---|---|
key(可选) | string或者binary |
value(必选) | string或者binary |
topic(*可选) | string |
写入Kafka中的DataFrame必须遵循如下格式:
Column | Type |
---|---|
key(可选) | string或者binary |
value(必选) | string或者binary |
topic(*可选) | string |
注意:如果topic配置项没有指定的话,topic列是需要指定的
value列是唯一必选的选项,如果key列没有指定的话,系统会指定为null。如果topic列指定的话,会将给定的数据写入到Kafak中对应的topic中,除非"topic"配置项已经指定,如果配置项已经指定的话,配置项中的配置会覆盖掉topic列中的配置。
选项 | 值 | 意义 |
---|---|---|
kafka.bootstrap.server | 逗号分隔的host:port列表 | Kafka的"bootstrap.servers"配置 |
无论是批查询还是流式查询,下面的选项必须得为Kafka sink指定;
选项 | 值 | 意义 |
---|---|---|
kafka.bootstrap.server | 逗号分隔的host:port列表 | Kafka的"bootstrap.servers"配置 |
选项 | 值 | 默认值 | 支持的查询类型 | 意义 |
---|---|---|---|---|
topic | string | none | streaming和batch | 设置允许写入所有行到Kafka中的topic列表,这个配置会覆盖数据中存在的topic列 |
下面的配置是可选的:
选项 | 值 | 默认值 | 支持的查询类型 | 意义 |
---|---|---|---|---|
topic | string | none | streaming和batch | 设置允许写入所有行到Kafka中的topic列表,这个配置会覆盖数据中存在的topic列 |
为Streaming查询创建一个Kafka Sink
Scala代码:
// 写DataFrame中的key-value数据到option指定的kafka topic中
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
Java代码:
// 写DataFrame中的key-value数据到option指定的kafka topic中
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
Python代码:
# 写DataFrame中的key-value数据到option指定的kafka topic中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# 写DataFrame中的key-value数据到数据中指定的Kafka topic中
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
写Batch查询的结果到Kafka中
Scala代码:
//写DataFrame中的key-value数据到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
Java代码:
// 写DataFrame中的key-value数据到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
Python代码:
# 写DataFrame中的key-value数据到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# 写DataFrame中的key-value数据到数据中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
Kafka特殊配置
Kafka自身的配置可以通过DataStreamReader.option中以kafka.为前缀来指定,如:stream.option("kafka.bootstrap.servers","host:port"),有关Kafka的可配参数,请参阅Kafka Consumer COnfig文档中关于读数据的参数以及Kafka Producer COnfig文档中关于写数据的参数。
注意:下面的Kafka参数是不能设置的,如果设置的话Kafka的Source或者Sink会抛出异常:
group.id: Kafka source will create a unique group id for each query automatically.
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
key.serializer: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
value.serializer: values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame oeprations to explicitly serialize the values into either strings or byte arrays.
enable.auto.commit: Kafka source doesn’t commit any offset.
interceptor.classes: Kafka source always read keys and values as byte arrays. It’s not safe to use ConsumerInterceptor as it may break the query.
发布
作为Spark应用程序,通过spark-submit
来启动你的应用程序。spark-sql-kafka-0-10_2.11
以及它的依赖可以使用--packages
添加到'spark-submit'中,如:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...
请参考应用程序提交指南来获取更多关于提交带有外部依赖的应用程序的信息。