10.8.4 安全审计与合规实践

安全审计与合规实践围绕“可追溯、可验证、可报告”展开,目标是对访问、配置变更、权限使用与数据流向形成闭环证据链。Kafka中需要重点审计的对象包括:集群与Broker配置变更、Topic/Partition创建与删除、ACL增删改、生产与消费行为、管理接口与客户端认证结果、跨集群复制与镜像任务。

原理与证据链草图(审计数据流):

文章图片

安装与启用审计日志(以Kafka自带日志为基础,结合采集器):
1)Broker端配置日志级别与位置,确保授权日志可审计
文件:/opt/kafka/config/log4j.properties

# 关键:开启授权与认证相关日志
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=/var/log/kafka/authorizer.log
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

2)启用ACL Authorizer(确保授权日志生效)
文件:/opt/kafka/config/server.properties

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 管理员账号,避免误锁
super.users=User:admin
# 可选:允许无ACL时拒绝
allow.everyone.if.no.acl.found=false

3)重启服务并验证日志生成

# 以systemd为例
sudo systemctl restart kafka
sudo tail -n 20 /var/log/kafka/authorizer.log

预期效果:出现包含principal、operation、resource、result的记录。

命令示例:审计关键操作(Topic/ACL)

# 创建Topic(触发审计)
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.0.11:9092 \
  --create --topic audit_demo --partitions 3 --replication-factor 2

# 添加ACL(触发审计)
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.0.11:9092 \
  --add --allow-principal User:app1 \
  --operation Read --topic audit_demo --group app1_group

命令解释:
- --bootstrap-server:指定Broker连接入口
- --create:创建Topic
- --add:添加ACL规则
- --allow-principal:被授权主体
- --operation:授权操作类型(Read/Write/Describe等)
- --group:消费者组授权

审计日志采集示例(Filebeat)
安装:

sudo apt-get install -y filebeat

配置:/etc/filebeat/filebeat.yml

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/kafka/authorizer.log
    fields:
      service: kafka
      log_type: authorizer

output.elasticsearch:
  hosts: ["http://10.0.0.21:9200"]
  index: "kafka-audit-%{+yyyy.MM.dd}"

启动:

sudo systemctl enable filebeat
sudo systemctl restart filebeat

预期效果:ES中出现kafka-audit-*索引。

合规基线与控制措施(可落地操作):
- 最小权限:按业务域与环境隔离,ACL以Topic与Group为最小授权单元,禁止通配过大。
- 可追溯身份:统一认证入口,禁止匿名或共享账户;生产/消费使用独立principal。
- 变更管理:关键资源操作纳入审批与变更窗口,自动化工具账号与权限隔离。
- 数据保留与脱敏:设置审计数据保留周期,对敏感字段脱敏或哈希处理。

监控与告警示例(基于日志告警规则):

# 伪示例:匹配授权拒绝
rule:
  name: kafka_acl_denied
  match:
    file: /var/log/kafka/authorizer.log
    contains: "Authorization failed"
  threshold: 5 per 1m
  action: notify_ops

排错清单与定位命令:
1)没有授权日志
- 检查Authorizer配置与日志级别

grep -E "authorizer.class.name|allow.everyone" /opt/kafka/config/server.properties
grep -E "authorizerAppender|kafka.authorizer" /opt/kafka/config/log4j.properties

2)ACL生效但日志无principal
- 检查客户端是否使用SASL

grep -E "SASL" /opt/kafka/config/server.properties

3)日志生成但采集不到
- 检查Filebeat状态与权限

sudo systemctl status filebeat
sudo ls -l /var/log/kafka/authorizer.log

合规落地检查清单(可直接执行):

# 1. 校验SASL/SSL启用
grep -E "listener.name|sasl|ssl" /opt/kafka/config/server.properties

# 2. 列出过宽ACL
/opt/kafka/bin/kafka-acls.sh --bootstrap-server 10.0.0.11:9092 \
  --list | grep -E "\*|All"

# 3. 变更记录是否存在(示例查日志)
grep -E "CreateTopics|AlterConfigs|AddAcls" /var/log/kafka/authorizer.log | tail -n 20

练习:
1)为audit_demo创建只读ACL,并验证拒绝写入的审计记录。
2)模拟错误principal连接,观察Authorization failed日志并生成告警。
3)设置审计日志保留7天,验证采集系统索引滚动策略是否生效。