分区策略
1.更改分区策略
如何指定分区器?
- application.properties形式
// 指定自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
- 编码形式(部分代码)
public ProducerFactory<Object, Object> kafkaProducerFactory() {
// Build the producer configuration map from Spring Boot's spring.kafka.* properties.
Map<String, Object> configurationProperties = kafkaProperties.buildProducerProperties();
// Override the partitioning strategy.
// kafka-clients 2.7.1 ships RoundRobinPartitioner and UniformStickyPartitioner;
// a custom Partitioner implementation can be specified here as well.
configurationProperties.put("partitioner.class","org.apache.kafka.clients.producer.RoundRobinPartitioner");
...
}
以编码形式配置 Kafka config(完整代码)
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaTemplateConfig {

    /** Spring Boot's auto-bound {@code spring.kafka.*} configuration properties. */
    @Autowired
    private KafkaProperties kafkaProperties;

    /**
     * Builds the {@link KafkaTemplate} used for sending messages, wiring in the
     * custom producer factory, the logging producer listener, and — when exactly
     * one candidate bean exists — the record message converter.
     */
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(@Qualifier("defaultFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> template = new KafkaTemplate<>(kafkaProducerFactory);
        template.setProducerListener(kafkaProducerListener);
        // Apply a converter only if the context holds a single unique candidate.
        messageConverter.ifUnique(template::setMessageConverter);
        template.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
        return template;
    }

    /** Listener that logs the outcome of sends (errors, and optionally successes). */
    @Bean
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }

    /**
     * Producer factory whose partitioning strategy is overridden to round-robin.
     * kafka-clients 2.7.1 also ships UniformStickyPartitioner, and a custom
     * Partitioner implementation may be configured here instead.
     */
    @Bean(name = "defaultFactory")
    public ProducerFactory<Object, Object> kafkaProducerFactory() {
        // Start from the Spring Boot derived producer configuration.
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        props.put("partitioner.class","org.apache.kafka.clients.producer.RoundRobinPartitioner");
        DefaultKafkaProducerFactory<Object, Object> producerFactory =
                new DefaultKafkaProducerFactory<>(props);
        // Enable transactions only when a transaction id prefix is configured.
        String txIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
        if (txIdPrefix != null) {
            producerFactory.setTransactionIdPrefix(txIdPrefix);
        }
        return producerFactory;
    }
}
2.kafka分区器源码示例
RoundRobin源码 (kafka-clients 2.7.1)
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
// kafka-clients-2.7.1
public class RoundRobinPartitioner implements Partitioner {
// Per-topic monotonically increasing counter driving the round-robin rotation.
// (Raw ConcurrentHashMap: artifact of the decompiled kafka-clients source.)
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public RoundRobinPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
// Picks the target partition for a record, ignoring the key entirely:
// partitions are used strictly in rotation per topic.
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// All known partitions of the topic, see 1)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// The ConcurrentMap holds one AtomicInteger per topic; fetch and post-increment it.
int nextValue = this.nextValue(topic);
// Only the currently available (leader-elected) partitions, see 2)
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// At least one partition is available
if (!availablePartitions.isEmpty()) {
// Counter modulo available-partition count selects the slot for this record
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
// No partition available: fall back to rotating over all partitions, see 3)
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
// Lazily create the counter (starting at 0) the first time a topic is seen
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
// Atomic post-increment: returns the previous value
return counter.getAndIncrement();
}
public void close() {
}
}
// 1)
public List<PartitionInfo> partitionsForTopic(String topic) {
// Cluster metadata lookup: every known partition of the topic, or an empty list.
return (List)this.partitionsByTopic.getOrDefault(topic, Collections.emptyList());
}
// 2)
public List<PartitionInfo> availablePartitionsForTopic(String topic) {
// Same lookup, restricted to partitions that currently have a leader.
return (List)this.availablePartitionsByTopic.getOrDefault(topic, Collections.emptyList());
}
// 3)
public static int toPositive(int number) {
// Clears the sign bit (mask 0x7FFFFFFF) so a negative counter — e.g. after
// AtomicInteger overflow — still maps to a non-negative partition index.
return number & 2147483647;
}
3.自定义分区策略
- 仿照RoundRobin或者UniformSticky,编写自定义分区器,实现Partitioner接口
- 依照前文指定自定义分区器
二、自定义拦截器
/**
* @Author: LiMingshan
* @Description: Kafka自定义拦截器
*/
@Slf4j
public class countInterceptor implements ProducerInterceptor<String, String> {

    // Running totals of acknowledged sends. Declared long so they cannot
    // overflow on a long-lived producer. onAcknowledgement is typically driven
    // by the producer's single sender (I/O) thread — TODO confirm no concurrent
    // access in your deployment before relying on these plain fields.
    private long numOfSuccess = 0, numOfFailure = 0;

    /**
     * Called inside KafkaProducer.send(), on the user's calling thread, before
     * the record is serialized and its partition computed. Returns an
     * equivalent copy of the record, leaving its contents unchanged.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // NOTE(review): if any interceptor in a chain returns null from onSend,
        // the next interceptor (or the serializer) dereferences null — the most
        // common cause of the "multiple interceptors -> NullPointerException"
        // question raised in the original article.
        return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(),
                producerRecord.key(), producerRecord.value(), producerRecord.headers());
    }

    /**
     * Called after the record was acknowledged by the broker, or when the send
     * failed before/while reaching it. A non-null exception marks a failure.
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // Classify the outcome by the presence of an exception.
        if (null != e) {
            numOfFailure++;
        } else {
            numOfSuccess++;
        }
        log.info("成功发送消息数目: {}", numOfSuccess);
        log.info("失败发送消息数目: {}", numOfFailure);
    }

    /** Called when the producer shuts the interceptor down; nothing to release. */
    @Override
    public void close() {
    }

    /** Called with the producer configuration at initialization time. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
编码形式添加配置
configurationProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.roy.something.constant.countInterceptor");
另外:也可通过 application.properties 形式配置拦截器链,例如: spring.kafka.producer.properties.interceptor.classes=com.roy.something.constant.countInterceptor