Kafka(2)SpringBoot2整合Kafka

1 前期准备

  • 安装并启动Zookeeper服务。
  • 安装并启动Kafka服务(可参考前一篇文章安装Kafka运行环境)。
  • 本文采用的开发工具为IDEA,版本为Spring-Boot-2.3.0.RELEASE、Kafka-clients-2.5.0、JDK8。
  • Spring-kafka是在kafka-clients基础上开发封装的项目,所以选择版本需要注意兼容对应,以下是Spring官方给出的版本兼容表。
版本兼容表

2 创建项目

  • 使用项目构建工具Maven,创建项目名为spring-kafka的父项目,包括两个子模块,一个生产端模块spring-kafka-producer,一个消费端模块spring-kafka-consumer。项目整体目录结构如下图所示。
项目整体目录结构

3 添加项目依赖

项目依赖pom.xml文件如下:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- kafka 依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- spring boot 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- json 工具包 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

4 Kafka消息生产端

  • 新建生产端spring配置文件application.yml。
server:
  port: 8081

spring:
  kafka:
    producer:
      # 生产客户端id,默认值为""
      client-id: 1
      # 连接的broker地址,如有多个用逗号隔开
      bootstrap-servers: localhost:9092
      # key序列化类,可以自定义序列化(broker端接受的消息必须以字节数组的形式)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value序列化类,可以自定义序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 重试次数(提高可靠性,会影响同步性能,需要等待上一条消息发送完成后才发送下一条)
      retries: 1
  • 新建生产端启动类KafkaProducerApplication.java。
package com.johnny.september.kafka.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * @author Johnny Lu
 */
@SpringBootApplication
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}
  • 新建消息实体类Message.java。
package com.johnny.september.kafka.producer.model;

import java.io.Serializable;

/**
 * 消息实体
 * @author Johnny Lu
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -118L;

    /** 内容 */
    private String content;

    public Message() {
    }

    public Message(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

}
  • 新建Kafka配置类KafkaConfig.java。
package com.johnny.september.kafka.producer.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

/**
 * Kafka配置类
 * @author Johnny Lu
 */
@Configuration
public class KafkaConfig {

    /**
     * 创建topic,指定主题名称,分区数量,副本数量
     *
     * @return
     */
    @Bean
    public NewTopic topicTest() {
        return TopicBuilder.name("topic_test_1").partitions(3).replicas(1).build();
    }
}
  • 新建消息生产控制器类MessageController.java。
package com.johnny.september.kafka.producer.controller;

import com.alibaba.fastjson.JSON;
import com.johnny.september.kafka.producer.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 控制器
 * @author Johnny Lu
 */
@RestController
public class MessageController {
    private static final Logger logger = LoggerFactory.getLogger(MessageController.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 发送消息
     *
     * @param content
     */
    @RequestMapping(path = "/send/{content}")
    public void sendMessage(@PathVariable String content) {
        kafkaTemplate.send("topic_test_1", JSON.toJSONString(new Message(content)));
    }

    /**
     * 发送消息,且阻塞等待broker的响应,直到消息发送成功,设置超时时间,超时异常处理
     *
     * @param content
     * @return
     */
    @RequestMapping(path = "/sendWaitResult/{content}")
    public String sendMessageWaitResult(@PathVariable String content) {
        String result = "发送成功";
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate
                .send("topic_test_1", JSON.toJSONString(new Message(content)));
        try {
            future.get(3000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            result = "发送失败";
        } catch (ExecutionException e) {
            e.printStackTrace();
            result = "执行失败";
        } catch (TimeoutException e) {
            e.printStackTrace();
            result = "发送超时";
        }
        logger.info("发送消息:{}, 结果:{}", content, result);
        return result;
    }
}

5 Kafka消息消费端

  • 新建消费端的spring配置文件application.yml。
server:
  port: 8082

spring:
  kafka:
    consumer:
      # 消费客户端id,默认值为""
      client-id: 1
      # 连接的broker地址,如有多个用逗号隔开
      bootstrap-servers: localhost:9092
      # key反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • 新建消费端启动类KafkaConsumerApplication.java。
package com.johnny.september.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * @author Johnny Lu
 */
@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}
  • 新建消息监听类MessageListener.java。
package com.johnny.september.kafka.consumer.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听
 *
 * @author Johnny Lu
 */
@Component
public class MessageListener {

    private final Logger logger = LoggerFactory.getLogger(MessageListener.class);

    /**
     * 监听消息,接受消息后处理业务逻辑
     * 消费组:messageGroup
     *
     * @param message
     */
    @KafkaListener(id = "messageGroup", topics = "topic_test_1")
    public void listen(String message) {
        logger.info("接受消息: " + message);
    }
}

6 启动项目验证

  • Run KafkaProducerApplication.java类的main方法。
  • Run KafkaConsumerApplication.java类的main方法。
  • 打开浏览器,执行发送消息请求http://localhost:8081/send/kafka,然后在消费端的控制台会打印接收到的消息,如下图所示。
    消费端控制台

    能发送消息和完整的接收消息了,表示整合初步OK了。

7 自定义拦截器

Kafka共有两种拦截器:生产者拦截器和消费者拦截器。

生产者拦截器

  • 生产者拦截器可以用来在消息发送前做一些准备工作,比如日志打印、不符合条件的消息过滤等,也可以在回调逻辑前做一些数据统计等工作。
  • 自定义拦截器主要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。这里新建一个自定义拦截器CustomProducerInterceptor.java,具体代码如下。
package com.johnny.september.kafka.producer.config;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * 自定义生产端拦截器
 * @author Johnny Lu
 */
public class CustomProducerInterceptor implements ProducerInterceptor {

    private static final Logger logger = LoggerFactory.getLogger(CustomProducerInterceptor.class);

    /**
     * 发送前做一些处理
     *
     * @param record
     * @return
     */
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        logger.info("发送消息 :{}", record.toString());
        return record;
    }

    /**
     * 这个方法在应答前或消息发送失败时被调用
     *
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    /**
     * 关闭这个拦截器时被调用
     */
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
  • 自定义拦截器后,需要进行配置 interceptor.classes才能生效。支持多个拦截器,用逗号隔开,多个拦截器会形成拦截链,按配置的顺序一一调用。在application.yml文件增加如下配置。
spring:
  kafka:
    producer:
      properties:
        interceptor.classes: com.johnny.september.kafka.producer.config.CustomProducerInterceptor

消费者拦截器

  • 消费者拦截器主要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口。这里新建一个自定义拦截器CustomConsumerInterceptor.java,具体代码如下。
package com.johnny.september.kafka.consumer.config;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;

/**
 * 自定义消费端拦截器
 * @author Johnny Lu
 */
public class CustomConsumerInterceptor implements ConsumerInterceptor {

    /**
     * 这个方法,在拉取到消息调用
     * @param records
     * @return
     */
    @Override
    public ConsumerRecords onConsume(ConsumerRecords records) {
        return records;
    }

    /**
     * 这个方法,在提交请求响应成功时被调用
     * @param offsets
     */
    @Override
    public void onCommit(Map offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

  • 自定义拦截器后,也需要进行配置 interceptor.classes才能生效。在application.yml文件增加如下配置。
spring:
  kafka:
    consumer:
      properties:
        # 支持多个拦截器,用逗号隔开,多个形成拦截链,按顺序一一调用
        interceptor.classes: com.johnny.september.kafka.consumer.config.CustomConsumerInterceptor

8 自定义分区器

  • 生产者发送消息可能需要经过生产者拦截器、序列化器、分区器一系列过程后才会发往broker。拦截器一般是非必须的,序列化器是必须的。如果消息发送参数指定了分区partition字段,就不要分区器。如果消息发送参数没有指定partition字段,那么需要分区器为消息分配发往的分区。
  • 默认分区器org.apache.kafka.clients.producer.internals.DefaultPartitioner。
  • 自定义分区器主要实现org.apache.kafka.clients.producer.Partitioner接口。这里新建一个自定义拦截器CustomPartitioner.java,具体代码如下。
package com.johnny.september.kafka.producer.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 自定义分区器
 * @author Johnny Lu
 */
public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // 随机分区
            return Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNums;
        } else {
            // 保持和 DefaultPartitioner 一样采用murmur2算法分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionNums;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

  • 自定义分区器后,需要进行配置partitioner.class才能生效。在生产端application.yml文件增加如下配置。
spring:
  kafka:
    producer:
      properties:
        partitioner.class: com.johnny.september.kafka.producer.config.CustomPartitioner

重新启动项目进行验证。

9 结语

到这里,Spring Boot2整合Kafka简易版完成了。以后会继续记录Kafka其他功能及用法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355