1 前言
这里是基于已经看过或者本身已经掌握了 一文带你快速入门kafka 知识点为前提,继续往下写的。另外,这里涉及到的源码分析,笔者所分析的版本如下
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
2 springboot & Kafka
接下来进入主题。在 一文带你快速入门kafka 中,笔者搭建的是 springboot 项目,我们知道 springboot 是通过自动装配给我们直接使用 kafka 的,所以我们先来看下自动装配这块的代码。
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
@AutoConfiguration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
// 省略代码
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
// 这里被 new 了出来,是用了 DefaultKafkaProducerFactory
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
return kafkaTemplate;
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
// 省略代码
}
知道 KafkaTemplate 是在哪里构建后,再看下发送消息的方法(org.springframework.kafka.core.KafkaTemplate#send)
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
ApplicationListener<ContextStoppedEvent>, DisposableBean {
// 省略代码
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
}
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
// 这里拿到了Producer对象
final Producer<K, V> producer = getTheProducer(producerRecord.topic());
this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
Object sample = null;
if (this.micrometerEnabled && this.micrometerHolder == null) {
this.micrometerHolder = obtainMicrometerHolder();
}
if (this.micrometerHolder != null) {
sample = this.micrometerHolder.start();
}
Future<RecordMetadata> sendFuture =
producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
// May be an immediate failure
if (sendFuture.isDone()) {
try {
sendFuture.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KafkaException("Interrupted", e);
}
catch (ExecutionException e) {
throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
}
}
if (this.autoFlush) {
flush();
}
this.logger.trace(() -> "Sent: " + KafkaUtils.format(producerRecord));
return future;
}
protected Producer<K, V> getTheProducer(@SuppressWarnings("unused") @Nullable String topic) {
boolean transactionalProducer = this.transactional;
if (transactionalProducer) {
boolean inTransaction = inTransaction();
Assert.state(this.allowNonTransactional || inTransaction,
"No transaction is in process; "
+ "possible solutions: run the template operation within the scope of a "
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
+ "before invoking the template method, "
+ "run in a transaction started by a listener container when consuming a record");
if (!inTransaction) {
transactionalProducer = false;
}
}
if (transactionalProducer) {
Producer<K, V> producer = this.producers.get();
if (producer != null) {
return producer;
}
KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
.getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
return holder.getProducer();
}
else if (this.allowNonTransactional) {
return this.producerFactory.createNonTransactionalProducer();
}
else if (topic == null) {
return this.producerFactory.createProducer();
}
else {
// 其实走的是这里生成 Producer 实例的,而这个 ProducerFactory 就是用了DefaultKafkaProducerFactory
return getProducerFactory(topic).createProducer();
}
}
// 省略代码
}
org.springframework.kafka.core.DefaultKafkaProducerFactory#createProducer()
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
implements ProducerFactory<K, V>, ApplicationContextAware,
BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {
// 省略代码
@Override
public Producer<K, V> createProducer() {
return createProducer(this.transactionIdPrefix);
}
@Override
public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
// 在《一文带你快速入门kafka》的示例代码中,txIdPrefix 为 null
return doCreateProducer(txIdPrefix);
}
private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
synchronized (this) {
if (this.producer != null && expire(this.producer)) {
this.producer = null;
}
if (this.producer == null) {
// 在《一文带你快速入门kafka》的示例代码中,会走到这里来
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
this.physicalCloseTimeout, this.beanName, this.epoch.get());
this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
}
return this.producer;
}
}
// 省略代码
}
CloseSafeProducer 的类图如下
可以看出 spring 自己定义了一个 CloseSafeProducer 的类管理起来,并提供 Producer 的方法给我们使用。
另外,还需要关注下 ProducerRecord
org.apache.kafka.clients.producer.ProducerRecord
public class ProducerRecord<K, V> {
// 主题
private final String topic;
// 分区
private final Integer partition;
// 消息头
private final Headers headers;
// 我们的消息可以根据topic归类,而这个key可以对消息进行二次归类,同一个key的消息会放在同一个partition,有key的消息还能进行日志压缩
private final K key;
// 我们发送的消息,注意不要为空,因为为空表示特定的消息——墓碑消息
private final V value;
// 消息的时间戳
private final Long timestamp;
// 省略代码
}
从这里很容易知道,我们发送时不仅仅可以指定主题,还可以指定分区等信息。
必要参数
必要参数是指要创建生产者实例所必需用到的参数,这里指的是 kafka-client 原生态写法所需要的参数
-
key.serializer
对 key 进行序列化的类,可以自己实现
-
value.serializer
对 value 进行序列化的类,可以自己实现
spring:
kafka:
# 必填,不填使用默认值 localhost:9092 连接 kafka 集群的地址,多个用逗号隔开;单节点就填一个就好
bootstrap-servers: "192.168.226.140:9092"
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
我这里这样配置是为了给 spring 自动装配,但是 spring 这些配置无非也就是给 kafka-client 使用,如果不怕麻烦完全可以自己去集成 kafka-client 然后创建 Producer 使用的。
消息的发送
我们看发送消息的源码时曾看到这么一段源码,看下图
ProducerRecord 是 kafka-client 对消息元数据封装的类,看上图,这里构建时就用了 topic 和 我们发送的消息数据,其它默认为空,请看下图
在实际开发过程中,有时我们需要更多信息,比如:指定 partition,指定 key等等。甚至,可能需要给消息增加一些有特殊业务含义的信息,但是又不能存放在我们的业务消息上(即 value),这时我们就需要使用 headers 了。
上图大家可以看到 Producer 发送消息的方法只有两个,由于 Future 本身就是支持异步和同步的,所以这里我们可以根据自己实际情况选择。另外,如果想异步,并且获取发送完成后做一些业务处理,可以用第二个方法,可以使用 Callback。
生产者客户端的整体架构
前面给出了 spring 集成 kafka-licent 的源码,知道了 spring 是如何简化我们对 kafka-client 的使用过程。接下来我们来看下生产者客户端与服务端之间的交互。下图来自《深入理解Kafka:核心设计与实践原理》
从这个架构图我们可以清晰看到,Producer 发送消息到 kafka服务端前要经历许多元件:
-
拦截器
Kafka 拦截器有两种:生产者拦截器和消费者拦截器。这里指的是生产者拦截器,可以根据自己业务需要在消息发送前,对消息进行业务处理。比如:统计。
-
序列化器
对 key 和 value 进行序列化使用的。
-
分区器
当我们发送消息时有明确指定发送的分区,那么用不上分区器。当我们没有指定时,就是靠分区器帮我们计算当前消息应该发往哪个分区的。
-
消息累加器(或者叫消息收集器)
Producer 发送消息不是一下就发送到了 Kafka 服务端的,而是会先放在我们本地内存缓存 —— 消息累加器。它会为每个分区创建一个双端队列,而里面的 ProducerBatch 是消息批次,多个 ProducerRecord 会放在一个 ProducerBatch 中一起发送,从而减少网络IO。缓存大小可以通过
buffer.memory
配置。buffer.memory
这个配置需要注意,它默认是 32 MB,如果发送速度太快,就会造成缓存区域被塞满。这时就会造成我们调用 send 方法时要么被阻塞,要么抛异常,这个主要需要看我们配置的阻塞时间max.block.ms
,默认 60 秒这里面其实还有一个 BufferPool,在上图没有体现出来的。
-
BufferPool
网络发送数据都是一个一个数据包发送的,我们发送时反复创建和释放存放内存的空间,在 Kafka 的应用场景中就是很大的开销了,所以 Kafka 就搞了一个 BufferPool,它会对特定大小的 ByteBuffer 进行复用。这个特定大小可以通过
batch.size
指定,默认 16 KB。
-
-
Sender
从图就可以看出,它会从消息累加器中拉取 ProducerBatch,并发送给 Kafka 服务端。只不过在发送前,Sender 会把 Deque<ProducerBatch> 转成 <Node, List<ProducerBatch>> → <Node, Request>,Node 表示这个消息批次需要发往的 broker,而 Request 是因为发送数据包不仅仅是只有消息数据,还要包含 Kafka 通信的协议
-
除此之外,它还有一个责任,就是更新元数据信息。举个列子,请看下图(图来自《深入理解Kafka:核心设计与实践原理》)
判定leastLoadedNode.png
这里明显可以看到 node1 的压力是最小的(这时 node1 就是 leastLoadedNode),在需要快速消费消息,充分利用资源的情况下,发送给 node1 是最为合理的,那么 Sender 怎么知道 Node1 压力是最小的呢?这就需要维护元数据信息了!一些元数据变动,客户端能感知到,如 broker 地址变化。这些需要使用到的元数据的变动是可以感知的,但是那些不需要使用的呢?配置metadata.max.age.ms
,默认 5 分钟,超过5分钟,没有更新的元数据都会触发元数据更新。这个动作是由 Sender 负责的,它会向 leastLoadedNode 发出请求。
生产者拦截器
拦截器(Interceptor)是早在 Kafka 0.10.0.0 中就引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器
要实现生产者拦截器十分的简单,只要实现 org.apache.kafka.clients.producer.ProducerInterceptor
即可。该接口只有三个抽象方法需要实现:
-
onSend
这个方法会在 KafkaProducer.send(ProducerRecord) 和KafkaProducer.send(ProducerRecord, Callback) 调用时触发,onSend 在 key 和 value 被序列化和计算 partition 之前。
-
onAcknowledgement
这个方法会在消息被应答失败时被调用,它优先执行于 Callback。
-
close
主要用于关闭拦截器在执行时所用到的一些需要关闭或清理的资源。
注意:这三个方法抛出的异常均会被捕获并记录到日志中,并不会再向上传递。
分区器
分区器主要是在我们没有指定 partition 时,给我们计算当前消息应该发往哪个 partition
public class KafkaProducer<K, V> implements Producer<K, V> {
// 省略代码
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
// 序列化器对key序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
// 序列化器对value序列化
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// 计算 partition 的方法
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 省略代码
}
// 分区器
private final Partitioner partitioner;
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
// 省略代码
}
Kafka 中默认的分区器是:org.apache.kafka.clients.producer.internals.DefaultPartitioner
对于 org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
当 key 不为 null 时,默认分区器会对 key 进行哈希,根据哈希值来计算分区号,因此拥有相同 key 的消息会落到同一分区;如果 key 为 null,那么消息将会以轮询的方式发送给该消息对应的 topic 的可用的 partition。
序列化器
对 key 和 value 进行序列化,转成字节数组才能通过网络发送给 Kafka 服务端
spring-kafka 默认采用的是 org.apache.kafka.common.serialization.StringSerializer。
如果想自定义,就要实现 org.apache.kafka.common.serialization.Serializer 接口
public interface Serializer<T> extends Closeable {
/**
* 配置当前类
* @param configs configs 存放了配置信息
* @param isKey true-是key false-value
*/
default void configure(Map<String, ?> configs, boolean isKey) {
// configs 中有3个参数用来配置序列化的编码类型:key.serializer.encoding、value.serializer.encoding 和 serializer.encoding
}
/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param data typed data
* @return serialized bytes
*/
byte[] serialize(String topic, T data);
/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param headers headers associated with the record
* @param data typed data
* @return serialized bytes
*/
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
/**
* Close this serializer.
* <p>
* 该方法必须是幂等的,因为它可能会被多次调用。
*/
@Override
default void close() {
// intentionally left blank
}
}
重要的参数配置
温馨提示:kafka 大部分参数都有默认值并且较为合理,如果不是特别熟悉,请不要擅自改动。
acks
这个参数用来指定分区中必须要有多少个副本收到这条消息后,生产者才会认为消息写入成功
- acks=1
默认值是1,只要消息写入 leader,服务端就会返回成功的 ack 给生产者,生产者才认为投递成功
-
acks=0
生产者只要发送成功,无需等待服务端的响应,直接认为成功
-
acks=-1 或 acks=all
需要所有 ISR 均成功写入消息,服务端才会返回成功的 ack 给生产者,生产者才认为投递成功。需要注意的是,当 ISR 中只有 leader 这种情况,那么将会退化成 acks=1。因此,还需要配合
min.insync.replicas=n
(这个参数设置了 ISR 副本数量最少要有多少个)使用。
max.request.size
这个参数是用来限制生产者能发送的消息的最大值,默认值是 1 MB
这里需要知道的是,客户端有客户端的配置,服务端也有服务端的配置,这个当客户端允许的最大值超过了服务端的 message.max.bytes
就会抛异常。除了上面这个例子以外,还有一些其它的配置和 max.request.size
也有联动,所以最好不要胡乱改。
retries 和 retry.backoff.ms
retries 用来配置生产者重试投递的次数,默认为 0。
retry.backoff.ms 用来配置重试之间的时间间隔,默认 100 ms。
compression.type
这个参数用来指定消息的压缩方式,默认为 “none”,即默认不压缩消息。可配置类型:
- none
- gzip
- snappy
- lz4
connections.max.idle.ms
这个参数用来指定在多久之后关闭闲置的连接,默认值是 540000 ms,即 9 分钟。
linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认为 0。生产者会在 ProducerBatch 被填满或者等待时间超过 linger.ms
值时发送出去。这个参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。
receive.buffer.bytes
这个参数用来设置 Socket 接收消息缓冲区(SO_RECEBUF)的大小,默认为 32 KB。如果设置为 -1,则使用操作系统的默认值。
send.buffer.bytes
这个参数用来设置 Socket 发送消息缓冲区(SO_RECEBUF)的大小,默认为 128 KB。如果设置为 -1,则使用操作系统的默认值。