Server
-
Configure the authentication file on the broker node:
File path: kafka/config/kafka_server_jaas.conf
File contents:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_rex="123456"
    user_alice="123456"
    user_lucy="123456";
};
Here username/password are the credentials the broker itself uses for inter-broker connections, and each user_<name>="<password>" entry defines a user that clients may authenticate as. Mind the two semicolons: one after the last user entry and one after the closing brace. Exactly these two are required, no more and no fewer.
-
Modify the broker's startup configuration file:
Copy kafka/config/server.properties to kafka/config/server-sasl.properties, and append the following at the end of the file:
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
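Because authorizer.class.name and super.users are set above, only the admin super user can access topics until ACLs are granted. This guide keeps using admin, so the following is optional; as an illustrative sketch, a non-super user such as rex (defined in kafka_server_jaas.conf) could be authorized for the test topic created later with kafka-acls.sh:
# optional, illustrative: grant produce/consume rights to user rex
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:rex --producer --topic test
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:rex --consumer --topic test --group test_group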
-
Modify the broker's startup script:
Copy kafka/bin/kafka-server-start.sh to kafka/bin/kafka-server-start-sasl.sh, and pass the authentication file to the broker JVM by adding to the copy:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_server_jaas.conf"
fi
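Equivalently (an alternative not used in the rest of this guide), the JAAS file can be supplied through the KAFKA_OPTS environment variable without copying the script, since kafka-run-class.sh passes KAFKA_OPTS to the JVM:
# alternative to editing the start script
KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_server_jaas.conf" ./bin/kafka-server-start.sh config/server-sasl.properties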
Start ZooKeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
Start kafka-server with authentication enabled:
./bin/kafka-server-start-sasl.sh config/server-sasl.properties
Create a topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
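Optionally, verify that the topic was created:
./bin/kafka-topics.sh --list --zookeeper localhost:2181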
Producer + Consumer
-
Configure the authentication file for the producer/consumer:
File path: kafka/config/kafka_client_jaas.conf
File contents:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};
As in the server file, mind the two semicolons: one after the password and one after the closing brace. Exactly these two are required, no more and no fewer.
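This guide authenticates the clients as the admin super user. To connect as one of the other users defined in kafka_server_jaas.conf instead (for example rex), the KafkaClient section would contain that user's credentials; note that a non-super user also needs ACLs (see the kafka-acls.sh sketch above):
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="rex"
    password="123456";
};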
-
Modify the producer's configuration file:
Copy kafka/config/producer.properties to kafka/config/producer-sasl.properties and append at the end of the file:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
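Alternatively, the Kafka clients also accept the JAAS configuration directly as a property, so instead of the kafka_client_jaas.conf file plus the script changes described below, a line like the following could be added to producer-sasl.properties (and likewise to consumer-sasl.properties):
# alternative to kafka_client_jaas.conf + KAFKA_OPTS
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";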
-
Modify the consumer's configuration file:
Copy kafka/config/consumer.properties to kafka/config/consumer-sasl.properties and append at the end of the file:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
-
Modify the producer's startup script:
Copy kafka/bin/kafka-console-producer.sh to kafka/bin/kafka-console-producer-sasl.sh, and pass the authentication file to the producer JVM by adding to the copy:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_client_jaas.conf"
fi
-
Modify the consumer's startup script:
Copy kafka/bin/kafka-console-consumer.sh to kafka/bin/kafka-console-consumer-sasl.sh, and pass the authentication file to the consumer JVM by adding to the copy:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_client_jaas.conf"
fi
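As with the broker script, an alternative to copying the console scripts is to export KAFKA_OPTS in the shell before running the stock scripts, for example:
# alternative to the copied *-sasl.sh scripts
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_client_jaas.conf"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer-sasl.properties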
-
Start kafka-producer with authentication enabled:
./bin/kafka-console-producer-sasl.sh --broker-list localhost:9092 --topic test --producer.config config/producer-sasl.properties
Send test messages:
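For example, type a couple of messages at the producer prompt (the message text below is arbitrary, for illustration only):
> hello kafka
> sasl test message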
-
Start kafka-consumer with authentication enabled:
./bin/kafka-console-consumer-sasl.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-sasl.properties
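If the messages above were typed into the producer console, the consumer should print them back, e.g.:
hello kafka
sasl test message
(Add --from-beginning to the command above if the messages were sent before the consumer was started.)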
Java client
- Maven
pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.6.0</version>
</dependency>
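Note that kafka_2.12 is the full broker artifact; if only the client APIs are needed, the lighter kafka-clients artifact is sufficient (an SLF4J binding is also needed at runtime to see the logger output of the example below):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>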
-
Add the SASL settings in the Java client:
Note that the semicolon at the end of the sasl.jaas.config value is required.
package kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestKafkaSasl {

    private static final Logger logger = LoggerFactory.getLogger(TestKafkaSasl.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL settings; the trailing semicolon in sasl.jaas.config is required.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");

        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                            record.offset(), record.partition(), record.key(), record.value());
                    logger.info("offset = {}, partition = {}, key = {}, value = {}",
                            record.offset(), record.partition(), record.key(), record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
                logger.error(e.getMessage());
            }
        }
    }
}
Run the test:
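The example above only consumes. For completeness, a minimal producer sketch using the same SASL settings could look like the following (class name, keys, and values are illustrative and not part of the original guide):
package kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative companion to TestKafkaSasl; not from the original guide.
public class TestKafkaSaslProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Same SASL/PLAIN settings as the consumer; the trailing semicolon is required.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send a few test messages to the "test" topic.
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test", "key-" + i, "value-" + i));
            }
            producer.flush();
        }
    }
}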
Possible problems
-
The producer process reports an error on startup
Solution: start kafka-producer with authentication enabled and do not forget to pass the configuration file parameter:
./bin/kafka-console-producer-sasl.sh --broker-list localhost:9092 --topic test --producer.config config/producer-sasl.properties
-
The consumer process reports an error on startup
Solution: start kafka-consumer with authentication enabled and do not forget to pass the configuration file parameter:
./bin/kafka-console-consumer-sasl.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-sasl.properties