1、概述
flink提供了一个特有的kafka connector去读写kafka topic的数据。flink消费kafka数据,并不是完全通过跟踪kafka消费组的offset来实现去保证exactly-once的语义,而是flink内部去跟踪offset和做checkpoint去实现exactly-once的语义
flink与kafka整合,相应版本对于的maven依赖如下表
maven依赖举例
<flink.version>1.7.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
2、代码整合
2.1、添加source:Kafka Consumer
flink利用FlinkKafkaConsumer来读取访问kafka, 根据kafka版本不同FlinkKafkaConsumer的类名也会变化,会变为FlinkKafkaConsumer
[08,09,10...]后面的数字就是对于的kafka的大版本号 。
初始化FlinkKafkaConsumer 需要如下参数
1、topic名字,用来指定消费一个或者多个topic的数据
2、kafka的配置信息,如zk地址端口,kafka地址端口等
3、反序列化器(schema),对消费数据选择一个反序列化器进行反序列化。
flink kafka的消费端需要知道怎么把kafka中消息数据反序列化成java或者scala中的对象。用户通过使用DeserializationSchema,每一条kafka的消息都会作用于DeserializationSchema的eserialize(byte[] message)方法。来将kafka的消息转换成用户想要的结构。
用户通过自定义schema将接入数据转换成自定义的数据结构,主要通过实现KeyedDeserializationSchema或者DeserializationSchema接口来完成,可以自定义。flink内置的 对DeserializationSchema 的实现有
public class SimpleStringSchema implements DeserializationSchema<String>
public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>
对 KeyedDeserializationSchema的实现有
public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>
public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode>
例如:
val myConsumer = new FlinkKafkaConsumer010[String]("topic",new SimpleStringSchema,p)
2.2、自定义schema举例
public class MySchema implements KeyedDeserializationSchema<KafkaMsgDTO> {
@Override
public KafkaMsgDTO deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
String msg = new String(message, StandardCharsets.UTF_8);
String key = null;
if(messageKey != null){
key = new String(messageKey, StandardCharsets.UTF_8);
}
return new KafkaMsgDTO(msg,key,topic,partition,offset);
}
@Override
public boolean isEndOfStream(KafkaMsgDTO nextElement) {
return false;
}
@Override
public TypeInformation<KafkaMsgDTO> getProducedType() {
return getForClass(KafkaMsgDTO.class);
}
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>1.7.0</version>
</dependency>
public class KafkaMsgDTO {
private String topic;
private int partition;
private long offset;
private String mesg;
@Override
public String toString() {
return "KafkaMsgDTO{" +
"topic='" + topic + '\'' +
", partition=" + partition +
", offset=" + offset +
", mesg='" + mesg + '\'' +
", key='" + key + '\'' +
'}';
}
private String key;
public KafkaMsgDTO(){
}
public KafkaMsgDTO(String mesg,String key,String topic,int partition,long offset){
this.mesg = mesg;
this.key = key;
this.topic = topic;
this.partition = partition;
this.offset = offset;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getPartition() {
return partition;
}
public void setPartition(int partition) {
this.partition = partition;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public String getMesg() {
return mesg;
}
public void setMesg(String mesg) {
this.mesg = mesg;
}
}
2.3、指定offset位置进行消费
val myConsumer = new FlinkKafkaConsumer010[KafkaMsgDTO]("topic",new MySchema(),p)
// myConsumer.setStartFromEarliest()
//从最早开始消费,消费过的数据会重复消费,从kafka来看默认不提交offset.
// myConsumer.setStartFromLatest()
//从最新开始消费,不消费流启动前未消费的数据,从kafka来看默认不提交offset.
myConsumer.setStartFromGroupOffsets()
//从消费的offset位置开始消费,kafka有提交offset,这是默认消费方式
//如果没有做checkpoint 数据进入sink就会提交offset,如果sink里面逻辑失败。offset照样会提交,程序退出,如果重启流,消费失败的数据不会被重新消费
//如果做了checkpoint 会保证数据的端到端精准一次消费。sink里面逻辑失败不会提交offset
2.4、checkpointing
env.enableCheckpointing(5000);
val stream = env.addSource(myConsumer)
2.5、sink逻辑
stream.addSink(x=>{
println(x)
println(1/(x.getMesg.toInt%2))//消息是偶数就会报错,分母为0
println(x)
})
val stream = env.addSource(myConsumer)
//实验表明如果sink处理逻辑有一部线程在跑,如果异步线程失败。offset照样会提交。
stream.addSink(x=>{
println(x)
new Thread(new Runnable {
override def run(): Unit = {
println(1/(x.getMesg.toInt%2))//消息是偶数就会报错,分母为0
}
}).start()
println(x)
})
2.6、指定到某个topic的offset位置进行消费
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)