版本
- kafka_2.12-3.5.0
- apache-zookeeper-3.6.4
一、配置zk
1. vim apache-zookeeper-3.6.4-bin/conf/zoo.cfg
dataDir=/data/zookeeper/data
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
# 设置为 true 时,ZooKeeper 服务器只接受已通过 SASL 完成身份验证的客户端的连接和请求
#sessionRequireClientSASLAuth=false
2. 新建 apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf
// 实际配置时删除所有注释
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeepersecret"
user_zookeeper="zookeepersecret" // 这项必须有
user_kafka="kafkasecret";
};
// zk client连接时需要用此配置
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zookeeper"
password="zookeepersecret";
};
3. 新建 apache-zookeeper-3.6.4-bin/conf/java.env 或直接修改启动脚本,通过 JVM 参数指定 zk_jaas.conf
export JVMFLAGS="-Djava.security.auth.login.config=/opt/apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf ${JVMFLAGS}"
4. 重启zk ./bin/zkServer.sh restart
5. 测试是否可用 ./bin/zkCli.sh
2023-12-07 17:39:46,380 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1181] - Opening socket connection to server localhost/127.0.0.1:2181.
2023-12-07 17:39:46,381 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1183] - SASL config status: Will attempt to SASL-authenticate using Login Context section 'Client'
2023-12-07 17:39:46,402 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1013] - Socket connection established, initiating session, client: /127.0.0.1:54168, server: localhost/127.0.0.1:2181
2023-12-07 17:39:46,435 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1452] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x100a114e4fa0008, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
WATCHER::
WatchedEvent state:SaslAuthenticated type:None path:null
[zk: localhost:2181(CONNECTED) 0]
有较明显的Will attempt to SASL-authenticate using Login Context section 'Client'的提示
二、配置kafka
1. 创建 SCRAM 凭证(即 admin 用户的认证信息,并非证书)
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=adminsecret]' --entity-type users --entity-name admin
2. 创建 kafka_2.12-3.5.0/config/kafka_server_jaas.conf
// 实际配置时删除所有注释
// broker -> broker认证时使用
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="adminsecret"
user_admin="adminsecret"
user_kafka="kafkasecret";
};
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="adminsecret";
};
// broker -> zk认证时使用
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkasecret";
};
3. 修改启动脚本
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka_2.12-3.5.0/config/kafka_server_jaas.conf kafka.Kafka "$@"
或
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.12-3.5.0/config/kafka_server_jaas.conf"
4. 修改 config/server.properties
listeners=SASL_PLAINTEXT://对外IP:9092
advertised.listeners=SASL_PLAINTEXT://对外IP:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
#ACL
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
5. 启动kafka
三、客户端连接
1. 创建配置文件config/cmd-config
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="adminsecret";
2. 创建topic
./bin/kafka-topics.sh --bootstrap-server IP:9092 --create --topic dial002 --partitions 1 --command-config config/cmd-config
3. 生产与消费,创建配置文件config/client_jaas.conf
Client{
org.apache.zookeeper.server.auth.DigestLoginModule required
username="kafka"
password="kafkasecret";
};
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="adminsecret";
};
4. 修改启动脚本 ./bin/kafka-console-producer.sh、./bin/kafka-console-consumer.sh
增加
export KAFKA_HEAP_OPTS="$KAFKA_HEAP_OPTS -Djava.security.auth.login.config=/opt/kafka_2.12-3.5.0/config/client_jaas.conf"
5. 生产与消费
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dial002 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=SCRAM-SHA-256
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dial002 --group gflink --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-256