记一次Flink写入Kafka坑点

最近做了一个将结果数据写入到Kafka的需求,sink部分代码如下:

  1. val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

  2. sinkTopic, new StringKeyedSerializationSchema,producerConfig, sinkSemantic)

  3. ds.addSink(kafkaProducer).setParallelism(sinkParallelism)

其中StringKeyedSerializationSchema是自定义的实现KeyedSerializationSchema的序列化器,用于序列化写入kafka的key/value, 任务也上线了,在flink web页面看到任务各项指标一切正常,也测试消费写入kafka的数据,得到的结果也如预期一样,想着万事大吉了,so easy~
过了一会kafka中间件的同事找过来说:你这个写入topic的数据怎么只有这几个分区,其他分区都没有数据写入~

image

什么情况?任务看着一切都ok啊,怎么就有分区没有数据写入呢?马上google一下数据写入kafka的分区策略:

  1. 如果指定写入分区,就将数据写入分区

  2. 如果没有指定分区,指定了key, 那么就会按照key hash对分区取模方式发送

  3. 如果既没指定分区又没指定key,那么就会以轮序的方式发送

而实际情况是有几个分区一条数据都没有写入,并且在StringKeyedSerializationSchema也指定了每条写入数据的key, 那么就一定是第一种情况了,在FlinkKafkaProducer011中指定了数据写入的分区,马上翻看源码,在FlinkKafkaProducer011的invoke方法里面有这么一个逻辑:

  1. if (flinkKafkaPartitioner != null) {

  2. record = new ProducerRecord<>(

  3. targetTopic,

  4. flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),

  5. timestamp,

  6. serializedKey,

  7. serializedValue);

  8. } else {

  9. record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);

  10. }

很明显就是执行了if逻辑,也是就flinkKafkaPartitioner不为空,在构建ProducerRecord时调用了flinkKafkaPartitioner.partition的方法,指定写入的partition,而flinkKafkaPartitioner是在FlinkKafkaProducer011初始化的时候给的默认值FlinkFixedPartitioner,在看下其partition方式:

  1. partitions[parallelInstanceId % partitions.length]

parallelInstanceId表示当前task的index,partitions表示kafka的topic的分区,该逻辑求得的分区就是根据当前task index 对partition取余得到的,而我设置的sinkParallelism是4,topic的分区数是6,到这里就比较明朗,取余永远不会得到4、5,所以就导致分区4、5一直没有数据写入。如果设置的parallism设置比kafka的分区数还要大,就会导致得到的partition值大于topic实际partition。
那么解决方式有一下几种:

  1. parallism设置成为与kafka topic 分区数一致大小

  2. 将flinkKafkaPartitioner指定为空,并且制定写入kafka的key

  3. 将flinkKafkaPartitioner与写入的key都置为空

  4. 自定义一个FlinkKafkaPartitioner,重写partition方法

最终选择第三种较为简单的方案,修改代码:

  1. val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

  2. sinkTopic, new StringKeyedSerializationSchema,producerConfig,Optional.ofNullable(null), sinkSemantic,5)

同时将StringKeyedSerializationSchema的serializeKey返回值设置为null. 再次运行任务,查看kafka 数据写入情况,所有分区都有数据写入。最终破案。

image
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,638评论 0 10
  • 在软件项目的生命周期中,开发只占开始的一小部分,大部分时间我们要对项目进行运行维护,Kafka相关的项目也不例外。...
    柴诗雨阅读 8,203评论 0 7
  • 一.Kafka发送消息的整体流程: 步骤:1.ProducerInterceptors对消息进行拦截。2.Seri...
    陈阳001阅读 3,811评论 0 5
  • 一、Kafka简介 Kafka (科技术语)。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规...
    边学边记阅读 1,788评论 0 14
  • Kafka的基本概念 BrokerKafka集群中包含多个服务器,其中每个服务器称为一个broker。有一点需要注...
    frmark阅读 385评论 0 0