前言
一个消息队列,必然存在着生产者和消费者,而生产者(Producer)负责向Kafka服务节点(Broker)。
从一个示例开始
public class KafkaProducerDemo {
private static final String brokerList = "localhost:9092";
private static final String topic = "topic-demo";
public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.server", brokerList);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "id.demo");
return props;
}
public static void main(String[] args) throw Exception {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<topic, "Message Test">;
producer.send(record);
}
}
1. 生产者客户端
1.1 生产者创建及参数说明
创建Kafka生产者客户端KafkaProducer有3个参数必填项:
1)bootstrap.servers:指定生产者客户端连接Kafka集群的地址列表,多个以逗号隔开(如:127.0.0.1:9092,127.0.0.1:9093)。连接Kafka集群并不需要配置所有的broker地址,因为生产者能从broker获取到其他broker的信息,一般至少设置两个,一个broker宕机时也仍然能连接到Kafka集群。
2)key.serializer:将Key(可以用key来计算分区号,从而将消息归类到某个指定分区)序列化成字节数组。
3)value.serializer:将value序列化成字节数组。
1.2 消息发送
根据参数构建完生产者后,就是创建消息对象 ProducerRecord,属性如下:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partitoin;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
}
- 其中 topic 和 value 为必填项,其余属性可选填
- key 用来指定消息的键,属于消息的附加信息,可以用来计算分区号,让消息可以发往特定的分区,除了 topic 外消息的二次归类,即同一个key的消息会被划分到同一个分区。
发送消息的三种模式
发送消息的方法本身是异步的,同步只是在调用方法后对后续操作进行了阻塞
发送消息方法如下:
public class KafkaProducer<K, V> implements Producer<K, V> {
// ...
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return this.send(record, (Callback)null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return this.doSend(interceptedRecord, callback);
}
// ...
}
- 发后即忘
producer.send(record);
- 同步
Future<RecordMetadata> future = producer.send(record);
// 调用 get方法阻塞等待响应,从而达到同步效果
RecordMetadata metadata = future.get();
-
异步
利用send()方法的Callback,在Kafka返回响应时调用该函数实现异步的操作,如:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
log.error("send message exception:{}", exception);
} else {
log.info("send message success, topic:{} - partition:{} - offset:{}", metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
通常KafkaProducer不会只负责发送单条消息,一般是发送多条消息。对于同一分区的不同消息,先发送的消息,回调也会先执行,即回调函数的执行是分区有序的。
关于close方法
发送完消息后,需要调用KafkaProducer.close()方法回收资源。
close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。
也提供了带有超时时间的close方法,在超过等待时间后会强行关闭KafkaProducer,一般不建议使用。
1.3 序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka,同样的,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。
序列化器均实现接口:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}
byte[] serialize(String var1, T var2);
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}
default void close() {
}
}
1)configure()方法在KafkaProducer创建时调用,用来配置当前类,如编码类型的确定。
2)serialize()方法用来执行序列化操作。
3)close()方法用来关闭当前的序列化器(一般是个空方法),该方法可能会被KafkaProducer调用多次,实现的话需要保证方法的幂等性。
默认实现的序列化器如:
org.apache.kafka.common.serialization.StringSerializer
1.4 分区器
消息经过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)才会被送达broker。
如果消息ProducerRecord 指定了 partition 字段,就不需要分区器进行分区,因为分区已被 partition 指定。
分区器默认需要实现接口:
public interface Partitioner extends Configurable, Closeable {
int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
void close();
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
默认实现:org.apache.kafka.clients.producer.internals.DefaultPartitioner
public class DefaultPartitioner implements Partitioner {
// ...
public void configure(Map<String, ?> configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
public void close() {
}
}
默认分区器DefaultPartitioner的分区规则:
- 如果 key 不为null,则对 key 进行哈希,最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区(计算得到的分区号会是所有分区中的任意一个,与key为null是轮询可用分区有差别)。
- 如果 key 为null,消息会已轮询的方式发往 topic 内的各个可用分区。
1.5 拦截器
生产者拦截器可以用来在消息发送前做一些准备工作,如按指定规则过滤不符合要求的消息,对消息内容进行加工处理等;也可以用来在发送回调逻辑前做一些操作,比如统计消息发送的成功率。
public interface ProducerInterceptor<K, V> extends Configurable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
1)将消息序列化和计算分区之前会调用拦截器的onSend()方法对消息进行相应的定制化操作(一般不要对topic、key 和 partition进行修改)。
2)在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,且优先于Callback。该方法运行在Producer的IO线程,应尽量简单,不然会影响消息发送效率。
2. 原理分析
2.1 生产者客户端架构
整个生产者客户端由主线程和Sender线程协调运行。主线程中创建消息,然后通过拦截器、序列化器和分区器处理后缓存到消息累加器(RecordAccumulator),Sender线程负责从消息累加器中获取消息并发送到broker。
RecordAccumulator用来缓存消息便于Sender线程可以批量发送,减少网络传输的资源消耗,从而提高性能。如果生产者发送消息的速度超过Sender发送到服务器的速度,会导致生产者空间不足,将会阻塞一段时间后,抛出异常,与参数max.block.ms有关,默认60秒。而buffer.memory参数则可以设置缓存空间大小,默认为32MB。
RecordAccumulator中为每个分区维护了一个双端队列,队列中的内容是ProducerBatch,即Deque<ProduderBatch>,创建消息写入到尾部,发送消息从头部读取。ProducerBatch是消息发送的一个批次,里面包含了一个或多个ProducerRecord。
Sender从RecordAccumulator中获取到缓存的消息之后,会进一步将<分区,Dequeue<ProducerBatch>>
转换为<Node,List<ProruderBatch>>,Node表示的是kafka集群的broker节点。这里是一个概念的转变,对于网络连接来说,生产者客户端与具体broker节点建立的连接,也就是向具体的broker节点发送消息而不关心具体分区。而对于KafkaProducer来说,它只关心向哪个分区发送消息。所以这里做一个从应用逻辑层面到网络IO层面的转换。
请求在发送给Kafka之前还会保存到InFlightRequests中,形式为: Map<NodeId,Dequeue<Request>>
主要作用是缓存了已经发出去但是还未收到响应的请求。InFlightRequests通过配置参数max.flight.requests.per.connection可以限制每个链接最多缓存数量,默认值为5,即每个链接最多只能缓存5个未响应的请求,超过该参数之后就不能继续像这个连接发送请求。