下载 kafka
我这里使用的是 kafka 3.6.1
默认的 Kafka 不受认证约束,可不用账号就可以连接到服务,也就是默认的 PLAIN 方式,不需要认证;配置了 SASL 认证之后,连接Kafka只能用凭证连接登录。
SASL 支持的认证方式有多种:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER;其中 GSSAPI / OAUTHBEARER 都需要额外的独立服务,显得麻烦。
那本文讲述的是比较简单的 SASL_PLAINTEXT 方式,认证机制统一为:SCRAM-SHA-512,ACL/SCRAM机制可追加新用户并授权到TOPIC。
启动 kafka
首先,需要为kafka集群生成一个唯一的id,如果使用的是集群部署,必须保证每个节点使用的是相同的id
bin/kafka-storage.sh random-uuid
我这里生成的id是 h91audLXSlOG-DgPuCFoIQ
初始化数据存储目录(第一次启动)
bin/kafka-storage.sh format -t h91audLXSlOG-DgPuCFoIQ -c ./config/kraft/server.properties
启动 kafka
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
创建新用户
因为我们在后续创建用户的时候需要使用到一个用户,所以需要提前创建一个
# 创建用户
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--entity-type users --entity-name admin --add-config 'SCRAM-SHA-512=[password=a123456]'
# 查看所有用户
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users
# 查看指定用户
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name admin
在这里我创建了一个用户名为:admin、密码为:a123456的账号
SASL 认证授权配置
修改kafka根目录下./config/kraft/server.properties文件的以下配置
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://localhost:9092
增加以下配置
# SASL 启用的认证机制
sasl.enabled.mechanisms=SCRAM-SHA-512
# Broker节点之间的认证机制用已启用的机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
######## ACL 账号限制 配置
# 定义超级管理员
super.users=User:admin
# true:支持账号黑名单;false:支持账号白名单
allow.everyone.if.no.acl.found=true
# ACL 对于 KRaft 模式的授权方式(类名)
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
这样配置让内部、外部都使用sasl_plaintext的方式
配置 BRODER JAAS 凭证文件
首先我们需要配置 broker 之前连接所用的 JAAS 凭证
之前我们以及配置了一个超级管理员账号、在这里面我们需要添加该账号
创建名称为 broker-jaas 的文件,内容如下:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="a123456";
};
然后将 jaas 文件配置到环境变量
编辑bin/kafka-server-start.sh,在exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
前面一行添加:
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-3.6.1/broker-jaas"
重启 kafka 服务
至此、broker之间连接认证就已经完成了
配置 client 的配置文件
broker 之前连接的配置文件已经创建好了,但是我们要使用命令行来操作 kafka 也是需要权限的,所以还需要配置一个 client 的jaas凭证
创建一个 console-jaas 文件,内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="a123456";
使用命令行创建一个 producerA用户,该用户用于:
如果不携带 jaas 文件:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--entity-type users --entity-name producerA --add-config 'SCRAM-SHA-512=[password=a123456]'
服务端就会报错:
[2023-12-19 22:40:49,707] INFO [SocketServer listenerType=BROKER, nodeId=1] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:60556-16) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
携带 jaas 文件,就可以正常执行了
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--entity-type users --entity-name producerA --add-config 'SCRAM-SHA-512=[password=a123456]' --command-config console-jaas
ACL 授权用户到TOPIC
首先我们先使用 producerA 用户向 topicA 测试写入数据
先创建一个 producerA-jaas
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producerA" password="a123456";
然后向topicA推送数据
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA --producer.config producerA-jaas
发现连接不上,报错
WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
然后我们给 producerA 用户 topicA 主题的写入权限
# 授权用户到指定 topic 的生产端(写入)权限
# --allow-principal:指定用户
# --topic {topic-name}:指定某个主题
# --producer:指定为(生产端的)写入权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --producer \
--allow-principal User:producerA --topic topicA --command-config console-jaas
再次尝试向 topicA 推送数据
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA --producer.config producerA-jaas
A
AWAW
FAWFAW
AWFAWFAWF
FAWFAWFWA
AAAAAAAAA
查看对应topic是否有刚刚发送的数据
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicA --from-beginning --consumer.config console-jaas
异常
如果集群启动时报错
[2021-11-19 19:19:08,916] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalStateException: Configured voter set: [1, 2] is different from the voter set read from the state file: [1]. Check if the quorum configuration is up to date, or wipe out the local state file if necessary
at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:132)
at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:362)
at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:203)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:125)
at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:73)
at kafka.Kafka$.buildServer(Kafka.scala:79)
at kafka.Kafka$.main(Kafka.scala:87)
at kafka.Kafka.main(Kafka.scala)
找到config/kraft/server.properties配置文件中的log.dirs目录地址,删除里面的__cluster_metadata-0目录即可。
至此、针对 producerA 账号对于 topicA 的推送权限已经配置好了
4 条评论
这篇文章写得深入浅出,让我这个小白也看懂了!
大佬好强!!!orz orz orz
重启 kafka 服务
至此、broker之间连接认证就已经完成了
到这一步时候 我遇到了这个报错
[2024-01-10 00:09:11,383] INFO [SocketServer listenerType=BROKER, nodeId=1] Failed authentication with /192.168.1.111 (channelId=192.168.1.111:90
1.配置了 sasl 过后,如果启动出现问题,可能是在 kafka-server-start.sh中没有正确配置 broker-jaas 环境变量。
2.如果启动成功,使用命令行工具报错,可能是因为没有指定jaas配置文件、或者jaas配置文件中的用户本身就没有权限。
3.建议先检查一下 /kraft/server.properties 文件、以及创建用户的时候加密方式是否和配置中一致