Spring Boot整kafka的极简操作

kafka应用最少需要两部分,一部分是producer,另外一部分是consumer,这两部分可以在一个应用中,也可以不在一个应用中。在通常情况下,为了消费性能,可能需要多个消费者,也可能需要多个生产者,而消费者和生产者可能处于不同的位置或者环境,所以本示例将生产者和消费者放在不同的应用中。

生产者端

引入依赖

在spring boot中使用kafka生产者端,需要引入如下依赖

 <dependencies>
    ......
    <!--Spring 的kafka依赖-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    ......
  </dependencies>

启用kafka支持

在项目中的配置类,或者启动类上增加@EnableKafka,它会帮助我们创建一些必要的Bean,包括KafkaTemplate,KafkaMessageListenerContainer等

@EnableKafka
public class KafkaConfig{
}

修改连接配置

修改producer的配置文件,配置连接地址

spring:
  kafka:
    # 指定kafka集群地址,多为地址用逗号分割
    bootstrap-servers: dm105:9092,dm106:9092,dm107:9092

创建Topic

如果你的topic还未在kafka中创建,则可以使用spring-boot自动创建主题,只需创建一个类型为NewTopic的bean,并指定topic相关的信息即可

    /**
     * 新建一个主题
     *
     * @return
     */
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")// 指定主题名称
            .partitions(30) // 指定分区数量,这个数量通常要大于消费者的数量,按消费者线程数计算
            .replicas(2) // 指定副本数量
            .compact()
            .build();
    }

基本的配置已经完成,接下来就是发送消息了。

使用KafkaTemplate发送消息

接下来使用kafkaTempalte发送消息到服务端,以下是一个极简示例

/**
 * 使用Spring boot test测试消息发送
 */
@SpringBootTest
class KafkaDemoApplicationTests {

    /**
     * 注入KafkaTemplate,用于发送消息
     */
    @Autowired
    private KafkaTemplate template;

    @Test
    public void newMessage() {
        System.out.println("start at " + ZonedDateTime.now() + "");

        for (int i = 0; i < 1000000; i++) {
            long now = System.currentTimeMillis();
            // 调用template,将消息发送到kafka
            // 第一个参数是topic名称,第二个参数是要发送的消息内容
            template.send("test", "adg" + now);
        }
    }
}

消费者端

根据上面的生产者,需要一个消费者来消费生产者生产的数据。spring boot整合kafka的消费者也非常方便

引入依赖

生产者和消费者的依赖是一致的。在此不再赘述

启用kafka支持

该操作和生产者应用一致,不再赘述。

修改连接配置

消费者的配置需要除了需要指定连之外,最好指定一些额外的配置参数,以便提高消费者性能

spring:
  kafka:
    # 指定kafka集群地址
    bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
    consumer:
      # 如果两个应用程序为并行消费某个topic的消息,需要将两个应用的group-id指定一致
      group-id: "message-group"
    listener:
      # 指定消息消费的模式,type=batch代表可以批量消费
      type: batch
      # 指定消费者的并发数,也就是可以同时有多少个消费者线程在监听数据,默认为1,
      # 更具情况设置并行数据,通常建议最小为Cpu的核心数
      concurrency: 16

创建消费者

消费者的就是一个普通的Spring bean.在对应的方法上添加@KafkaListener注解,并指定需要消费的topic即可开始消费者监听。

@Component
public class Consumer {
    
    /**
     * 注入repository,用户数据持久化(略)
     */
    @Autowired
    private MessageRepository repository;

    /**
     * 使用@KafkaListener注解标记消费方法,指定topics属性指定监听的待消费topic
     *
     * @param messages 待消费的数据,由于启用了批量消费模式,所以监听获取到的是一个集合
     */
    @KafkaListener(topics = {"test"})
    @Transactional
    public void test(List<String> messages) {
        List<Message> result = messages.stream().map(Message::new).collect(Collectors.toList());
        repository.saveAll(result);
        System.out.println("save message [" + messages.size() + "] 条 at" + ZonedDateTime.now().toString());
    }
}

测试项目

当项目构建完成之后,可以按照如下步骤来测试项目

  1. 启动消费者程序
  2. 执行生产者测试代码,观察生产者执行结果

项目仓库地址

完整项目参考https://github.com/ldwqh0/hadoop-demo

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345