10.8.4 安全审计与合规实践
安全审计与合规实践围绕“可追溯、可验证、可报告”展开,目标是对访问、配置变更、权限使用与数据流向形成闭环证据链。Kafka中需要重点审计的对象包括:集群与Broker配置变更、Topic/Partition创建与删除、ACL增删改、生产与消费行为、管理接口与客户端认证结果、跨集群复制与镜像任务。
原理与证据链草图(审计数据流):
安装与启用审计日志(以Kafka自带日志为基础,结合采集器):
1)Broker端配置日志级别与位置,确保授权日志可审计
文件:/opt/kafka/config/log4j.properties
# 关键:开启授权与认证相关日志
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=/var/log/kafka/authorizer.log
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
2)启用ACL Authorizer(确保授权日志生效)
文件:/opt/kafka/config/server.properties
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 管理员账号,避免误锁
super.users=User:admin
# 可选:允许无ACL时拒绝
allow.everyone.if.no.acl.found=false
3)重启服务并验证日志生成
# 以systemd为例
sudo systemctl restart kafka
sudo tail -n 20 /var/log/kafka/authorizer.log
预期效果:出现包含principal、operation、resource、result的记录。
命令示例:审计关键操作(Topic/ACL)
# 创建Topic(触发审计)
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.0.11:9092 \
--create --topic audit_demo --partitions 3 --replication-factor 2
# 添加ACL(触发审计)
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.0.11:9092 \
--add --allow-principal User:app1 \
--operation Read --topic audit_demo --group app1_group
命令解释:
- --bootstrap-server:指定Broker连接入口
- --create:创建Topic
- --add:添加ACL规则
- --allow-principal:被授权主体
- --operation:授权操作类型(Read/Write/Describe等)
- --group:消费者组授权
审计日志采集示例(Filebeat)
安装:
sudo apt-get install -y filebeat
配置:/etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/kafka/authorizer.log
fields:
service: kafka
log_type: authorizer
output.elasticsearch:
hosts: ["http://10.0.0.21:9200"]
index: "kafka-audit-%{+yyyy.MM.dd}"
启动:
sudo systemctl enable filebeat
sudo systemctl restart filebeat
预期效果:ES中出现kafka-audit-*索引。
合规基线与控制措施(可落地操作):
- 最小权限:按业务域与环境隔离,ACL以Topic与Group为最小授权单元,禁止通配过大。
- 可追溯身份:统一认证入口,禁止匿名或共享账户;生产/消费使用独立principal。
- 变更管理:关键资源操作纳入审批与变更窗口,自动化工具账号与权限隔离。
- 数据保留与脱敏:设置审计数据保留周期,对敏感字段脱敏或哈希处理。
监控与告警示例(基于日志告警规则):
# 伪示例:匹配授权拒绝
rule:
name: kafka_acl_denied
match:
file: /var/log/kafka/authorizer.log
contains: "Authorization failed"
threshold: 5 per 1m
action: notify_ops
排错清单与定位命令:
1)没有授权日志
- 检查Authorizer配置与日志级别
grep -E "authorizer.class.name|allow.everyone" /opt/kafka/config/server.properties
grep -E "authorizerAppender|kafka.authorizer" /opt/kafka/config/log4j.properties
2)ACL生效但日志无principal
- 检查客户端是否使用SASL
grep -E "SASL" /opt/kafka/config/server.properties
3)日志生成但采集不到
- 检查Filebeat状态与权限
sudo systemctl status filebeat
sudo ls -l /var/log/kafka/authorizer.log
合规落地检查清单(可直接执行):
# 1. 校验SASL/SSL启用
grep -E "listener.name|sasl|ssl" /opt/kafka/config/server.properties
# 2. 列出过宽ACL
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.0.11:9092 \
--list | grep -E "\*|All"
# 3. 变更记录是否存在(示例查日志)
grep -E "CreateTopics|AlterConfigs|AddAcls" /var/log/kafka/authorizer.log | tail -n 20
练习:
1)为audit_demo创建只读ACL,并验证拒绝写入的审计记录。
2)模拟错误principal连接,观察Authorization failed日志并生成告警。
3)设置审计日志保留7天,验证采集系统索引滚动策略是否生效。