kafka应用最少需要两部分,一部分是producer,另外一部分是consumer,这两部分可以在一个应用中,也可以不在一个应用中。在通常情况下,为了消费性能,可能需要多个消费者,也可能需要多个生产者,而消费者和生产者可能处于不同的位置或者环境,所以本示例将生产者和消费者放在不同的应用中。
生产者端
引入依赖
在spring boot中使用kafka生产者端,需要引入如下依赖
<dependencies>
......
<!--Spring 的kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
......
</dependencies>
启用kafka支持
在项目中的配置类,或者启动类上增加@EnableKafka,它会帮助我们创建一些必要的Bean,包括KafkaTemplate,KafkaMessageListenerContainer等
@EnableKafka
public class KafkaConfig{
}
修改连接配置
修改producer的配置文件,配置连接地址
spring:
kafka:
# 指定kafka集群地址,多为地址用逗号分割
bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
创建Topic
如果你的topic还未在kafka中创建,则可以使用spring-boot自动创建主题,只需创建一个类型为NewTopic的bean,并指定topic相关的信息即可
/**
* 新建一个主题
*
* @return
*/
@Bean
public NewTopic testTopic() {
return TopicBuilder.name("test")// 指定主题名称
.partitions(30) // 指定分区数量,这个数量通常要大于消费者的数量,按消费者线程数计算
.replicas(2) // 指定副本数量
.compact()
.build();
}
基本的配置已经完成,接下来就是发送消息了。
使用KafkaTemplate发送消息
接下来使用kafkaTempalte发送消息到服务端,以下是一个极简示例
/**
* 使用Spring boot test测试消息发送
*/
@SpringBootTest
class KafkaDemoApplicationTests {
/**
* 注入KafkaTemplate,用于发送消息
*/
@Autowired
private KafkaTemplate template;
@Test
public void newMessage() {
System.out.println("start at " + ZonedDateTime.now() + "");
for (int i = 0; i < 1000000; i++) {
long now = System.currentTimeMillis();
// 调用template,将消息发送到kafka
// 第一个参数是topic名称,第二个参数是要发送的消息内容
template.send("test", "adg" + now);
}
}
}
消费者端
根据上面的生产者,需要一个消费者来消费生产者生产的数据。spring boot整合kafka的消费者也非常方便
引入依赖
生产者和消费者的依赖是一致的。在此不再赘述
启用kafka支持
该操作和生产者应用一致,不再赘述。
修改连接配置
消费者的配置需要除了需要指定连之外,最好指定一些额外的配置参数,以便提高消费者性能
spring:
kafka:
# 指定kafka集群地址
bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
consumer:
# 如果两个应用程序为并行消费某个topic的消息,需要将两个应用的group-id指定一致
group-id: "message-group"
listener:
# 指定消息消费的模式,type=batch代表可以批量消费
type: batch
# 指定消费者的并发数,也就是可以同时有多少个消费者线程在监听数据,默认为1,
# 更具情况设置并行数据,通常建议最小为Cpu的核心数
concurrency: 16
创建消费者
消费者的就是一个普通的Spring bean.在对应的方法上添加@KafkaListener注解,并指定需要消费的topic即可开始消费者监听。
@Component
public class Consumer {
/**
* 注入repository,用户数据持久化(略)
*/
@Autowired
private MessageRepository repository;
/**
* 使用@KafkaListener注解标记消费方法,指定topics属性指定监听的待消费topic
*
* @param messages 待消费的数据,由于启用了批量消费模式,所以监听获取到的是一个集合
*/
@KafkaListener(topics = {"test"})
@Transactional
public void test(List<String> messages) {
List<Message> result = messages.stream().map(Message::new).collect(Collectors.toList());
repository.saveAll(result);
System.out.println("save message [" + messages.size() + "] 条 at" + ZonedDateTime.now().toString());
}
}
测试项目
当项目构建完成之后,可以按照如下步骤来测试项目
- 启动消费者程序
- 执行生产者测试代码,观察生产者执行结果