10.7.3 日志与审计(Broker/Controller/Client)

日志与审计是Kafka运维与故障排查的第一手证据,必须覆盖Broker、Controller与Client,并保证可追溯、可检索、可审计。本节给出统一规范、采集落地、排错流程与练习,配套命令与示例。

一、原理草图(日志产生与采集链路)

文章图片

二、日志类型与关键字段(规范化)
- Broker:server.log、controller.log、state-change.log、log-cleaner.log、request.log
- Client:producer/consumer日志
- 统一字段建议(JSON):timestamp、level、component、brokerId、topic、partition、clientId、traceId

示例:log4j2 配置为 JSON(Broker)

# /opt/kafka/config/log4j2.properties 关键片段
appender.kafkaAppender.type = RollingFile
appender.kafkaAppender.name = kafkaAppender
appender.kafkaAppender.fileName = /opt/kafka/logs/server.log
appender.kafkaAppender.filePattern = /opt/kafka/logs/server.log.%d{yyyy-MM-dd}.gz
appender.kafkaAppender.layout.type = JsonLayout
appender.kafkaAppender.layout.compact = true
appender.kafkaAppender.layout.eventEol = true

rootLogger.level = INFO
rootLogger.appenderRefs = kafkaAppender
rootLogger.appenderRef.kafkaAppender.ref = kafkaAppender

效果:日志按天滚动,JSON结构化,便于集中采集与检索。


三、安装与采集示例(Filebeat → Elasticsearch)
1)安装与配置 Filebeat

# 安装
sudo rpm -ivh filebeat-8.11.3-x86_64.rpm

# 配置采集 Kafka 日志
sudo tee /etc/filebeat/filebeat.yml >/dev/null <<'EOF'
filebeat.inputs:
  - type: filestream
    id: kafka-logs
    paths:
      - /opt/kafka/logs/*.log
    fields:
      service: kafka
      env: prod
    fields_under_root: true

output.elasticsearch:
  hosts: ["http://10.0.0.10:9200"]
  index: "kafka-logs-%{+yyyy.MM.dd}"
EOF

# 启动
sudo systemctl enable --now filebeat

2)验证采集

curl -s "http://10.0.0.10:9200/_cat/indices?v" | grep kafka-logs

四、关键排错命令与解释
1. Broker/Controller 关键事件检索

# 选主/ISR变化
grep -E "LeaderAndIsr|ISR|UnderReplicatedPartitions" /opt/kafka/logs/server.log

# Controller切换
grep -E "Controller|ActiveController" /opt/kafka/logs/controller.log

解释:频繁 Controller 切换通常指向 ZooKeeper 抖动或网络问题。

2. 磁盘与LogSegment异常

grep -E "LogSegment|IOException|disk" /opt/kafka/logs/server.log
df -h /kafka-logs
iostat -x 1 5

解释:IO高等待和磁盘满会导致ISR收缩与写入延迟。

3. 客户端错误定位

# Producer 常见错误
grep -E "TimeoutException|OutOfOrderSequence|RecordTooLarge" /var/log/app/producer.log

# Consumer Rebalance 或位点异常
grep -E "Rebalance|CommitFailed|OffsetOutOfRange" /var/log/app/consumer.log

五、审计与变更记录示例
1. 记录Topic创建/删除与配额调整

# 变更前记录现状
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.0.2:9092 --describe > /var/log/kafka_audit/topics_before.txt

# 创建Topic并写入审计记录
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.0.2:9092 \
  --create --topic orders --partitions 6 --replication-factor 3

echo "$(date '+%F %T') user=ops action=create topic=orders ticket=CHG-2024-001" \
  >> /var/log/kafka_audit/changes.log

解释:审计日志记录变更人、变更动作、工单号,便于追溯。


六、常见日志模式 → 诊断 → 处理
1. ISR频繁收缩
- 日志:UnderReplicatedPartitions
- 诊断:磁盘IO、网络丢包、Broker负载
- 处理示例:

iostat -x 1 5
sar -n DEV 1 5
# 临时增加副本或降低写入速率

2. Controller频繁切换
- 日志:ActiveController频繁变化
- 诊断:ZooKeeper会话超时
- 处理示例:

# ZooKeeper 会话与网络排查
echo stat | nc 10.0.0.3 2181
ping -c 5 10.0.0.3

3. 客户端大量超时
- 日志:TimeoutException
- 诊断:Broker请求处理能力不足
- 处理示例:

# 查看Broker请求队列与处理时间(JMX/Exporter)
curl -s http://10.0.0.2:9308/metrics | grep request

七、练习(可执行)
1)模拟Controller切换
- 操作:停止当前Controller Broker

# 找出Controller
/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 10.0.0.2:9092

# 停止该Broker
sudo systemctl stop kafka
  • 预期:controller.log 出现新的 ActiveController 记录。

2)模拟磁盘告警
- 操作:写入大文件占满日志盘

dd if=/dev/zero of=/kafka-logs/fill.bin bs=100M count=5
  • 预期:server.log 出现磁盘空间不足提示。

3)客户端超时
- 操作:设置极小 request.timeout.ms

# producer.properties 添加
request.timeout.ms=1
  • 预期:producer.log 出现 TimeoutException。

八、运维建议
- 生产环境INFO为主,排障临时DEBUG并设置回退计划
- 审计日志长期留存(≥180天),普通运行日志短期滚动
- 与CMDB/工单系统对接,形成“变更—日志—告警—回滚”闭环