下载 kafka

我这里使用的是 kafka 3.6.1

默认的 Kafka 不受认证约束,可不用账号就可以连接到服务,也就是默认的 PLAIN 方式,不需要认证;配置了 SASL 认证之后,连接Kafka只能用凭证连接登录。

SASL 支持的认证方式有多种:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER;其中 GSSAPI / OAUTHBEARER 都需要额外的独立服务,显得麻烦。
那本文讲述的是比较简单的 SASL_PLAINTEXT 方式,认证机制统一为:SCRAM-SHA-512,ACL/SCRAM机制可追加新用户并授权到TOPIC。

启动 kafka

首先,需要为kafka集群生成一个唯一的id,如果使用的是集群部署,必须保证每个节点使用的是相同的id

bin/kafka-storage.sh random-uuid

我这里生成的id是 h91audLXSlOG-DgPuCFoIQ
初始化数据存储目录(第一次启动)

bin/kafka-storage.sh format -t h91audLXSlOG-DgPuCFoIQ -c ./config/kraft/server.properties

启动 kafka

./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

创建新用户

因为我们在后续创建用户的时候需要使用到一个用户,所以需要提前创建一个

# 创建用户
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type users --entity-name admin --add-config 'SCRAM-SHA-512=[password=a123456]'

# 查看所有用户
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users
# 查看指定用户
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name admin

在这里我创建了一个用户名为:admin、密码为:a123456的账号

SASL 认证授权配置

修改kafka根目录下./config/kraft/server.properties文件的以下配置

listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://localhost:9092

增加以下配置

# SASL 启用的认证机制
sasl.enabled.mechanisms=SCRAM-SHA-512
# Broker节点之间的认证机制用已启用的机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
######## ACL 账号限制 配置
# 定义超级管理员
super.users=User:admin
# true:支持账号黑名单;false:支持账号白名单
allow.everyone.if.no.acl.found=true
# ACL 对于 KRaft 模式的授权方式(类名)
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

这样配置让内部、外部都使用sasl_plaintext的方式

配置 BRODER JAAS 凭证文件

首先我们需要配置 broker 之前连接所用的 JAAS 凭证
之前我们以及配置了一个超级管理员账号、在这里面我们需要添加该账号
创建名称为 broker-jaas 的文件,内容如下:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="a123456";
};

然后将 jaas 文件配置到环境变量
编辑bin/kafka-server-start.sh,在exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"前面一行添加:

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-3.6.1/broker-jaas"

重启 kafka 服务
至此、broker之间连接认证就已经完成了

配置 client 的配置文件

broker 之前连接的配置文件已经创建好了,但是我们要使用命令行来操作 kafka 也是需要权限的,所以还需要配置一个 client 的jaas凭证
创建一个 console-jaas 文件,内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="a123456";

使用命令行创建一个 producerA用户,该用户用于:
如果不携带 jaas 文件:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type users --entity-name producerA --add-config 'SCRAM-SHA-512=[password=a123456]'

服务端就会报错:

[2023-12-19 22:40:49,707] INFO [SocketServer listenerType=BROKER, nodeId=1] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:60556-16) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

携带 jaas 文件,就可以正常执行了

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type users --entity-name producerA --add-config 'SCRAM-SHA-512=[password=a123456]' --command-config console-jaas

ACL 授权用户到TOPIC

首先我们先使用 producerA 用户向 topicA 测试写入数据
先创建一个 producerA-jaas

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producerA" password="a123456";

然后向topicA推送数据

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA --producer.config producerA-jaas

发现连接不上,报错

WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

然后我们给 producerA 用户 topicA 主题的写入权限

# 授权用户到指定 topic 的生产端(写入)权限
# --allow-principal:指定用户
# --topic {topic-name}:指定某个主题
# --producer:指定为(生产端的)写入权限
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --producer \
  --allow-principal User:producerA --topic topicA --command-config console-jaas

再次尝试向 topicA 推送数据

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA --producer.config producerA-jaas
A
AWAW
FAWFAW
AWFAWFAWF
FAWFAWFWA
AAAAAAAAA

查看对应topic是否有刚刚发送的数据

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicA --from-beginning --consumer.config console-jaas

异常

如果集群启动时报错

[2021-11-19 19:19:08,916] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalStateException: Configured voter set: [1, 2] is different from the voter set read from the state file: [1]. Check if the quorum configuration is up to date, or wipe out the local state file if necessary
        at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:132)
        at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:362)
        at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:203)
        at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:125)
        at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:73)
        at kafka.Kafka$.buildServer(Kafka.scala:79)
        at kafka.Kafka$.main(Kafka.scala:87)
        at kafka.Kafka.main(Kafka.scala)

找到config/kraft/server.properties配置文件中的log.dirs目录地址,删除里面的__cluster_metadata-0目录即可。

至此、针对 producerA 账号对于 topicA 的推送权限已经配置好了

最后修改:2024 年 01 月 10 日
如果觉得我的文章对你有用,请随意赞赏