10.7.3 日志与审计(Broker/Controller/Client)
日志与审计是Kafka运维与故障排查的第一手证据,必须覆盖Broker、Controller与Client,并保证可追溯、可检索、可审计。本节给出统一规范、采集落地、排错流程与练习,配套命令与示例。
一、原理草图(日志产生与采集链路)
二、日志类型与关键字段(规范化)
- Broker:server.log、controller.log、state-change.log、log-cleaner.log、request.log
- Client:producer/consumer日志
- 统一字段建议(JSON):timestamp、level、component、brokerId、topic、partition、clientId、traceId
示例:log4j2 配置为 JSON(Broker)
# /opt/kafka/config/log4j2.properties 关键片段
appender.kafkaAppender.type = RollingFile
appender.kafkaAppender.name = kafkaAppender
appender.kafkaAppender.fileName = /opt/kafka/logs/server.log
appender.kafkaAppender.filePattern = /opt/kafka/logs/server.log.%d{yyyy-MM-dd}.gz
appender.kafkaAppender.layout.type = JsonLayout
appender.kafkaAppender.layout.compact = true
appender.kafkaAppender.layout.eventEol = true
rootLogger.level = INFO
rootLogger.appenderRefs = kafkaAppender
rootLogger.appenderRef.kafkaAppender.ref = kafkaAppender
效果:日志按天滚动,JSON结构化,便于集中采集与检索。
三、安装与采集示例(Filebeat → Elasticsearch)
1)安装与配置 Filebeat
# 安装
sudo rpm -ivh filebeat-8.11.3-x86_64.rpm
# 配置采集 Kafka 日志
sudo tee /etc/filebeat/filebeat.yml >/dev/null <<'EOF'
filebeat.inputs:
- type: filestream
id: kafka-logs
paths:
- /opt/kafka/logs/*.log
fields:
service: kafka
env: prod
fields_under_root: true
output.elasticsearch:
hosts: ["http://10.0.0.10:9200"]
index: "kafka-logs-%{+yyyy.MM.dd}"
EOF
# 启动
sudo systemctl enable --now filebeat
2)验证采集
curl -s "http://10.0.0.10:9200/_cat/indices?v" | grep kafka-logs
四、关键排错命令与解释
1. Broker/Controller 关键事件检索
# 选主/ISR变化
grep -E "LeaderAndIsr|ISR|UnderReplicatedPartitions" /opt/kafka/logs/server.log
# Controller切换
grep -E "Controller|ActiveController" /opt/kafka/logs/controller.log
解释:频繁 Controller 切换通常指向 ZooKeeper 抖动或网络问题。
2. 磁盘与LogSegment异常
grep -E "LogSegment|IOException|disk" /opt/kafka/logs/server.log
df -h /kafka-logs
iostat -x 1 5
解释:IO高等待和磁盘满会导致ISR收缩与写入延迟。
3. 客户端错误定位
# Producer 常见错误
grep -E "TimeoutException|OutOfOrderSequence|RecordTooLarge" /var/log/app/producer.log
# Consumer Rebalance 或位点异常
grep -E "Rebalance|CommitFailed|OffsetOutOfRange" /var/log/app/consumer.log
五、审计与变更记录示例
1. 记录Topic创建/删除与配额调整
# 变更前记录现状
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.0.2:9092 --describe > /var/log/kafka_audit/topics_before.txt
# 创建Topic并写入审计记录
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.0.2:9092 \
--create --topic orders --partitions 6 --replication-factor 3
echo "$(date '+%F %T') user=ops action=create topic=orders ticket=CHG-2024-001" \
>> /var/log/kafka_audit/changes.log
解释:审计日志记录变更人、变更动作、工单号,便于追溯。
六、常见日志模式 → 诊断 → 处理
1. ISR频繁收缩
- 日志:UnderReplicatedPartitions
- 诊断:磁盘IO、网络丢包、Broker负载
- 处理示例:
iostat -x 1 5
sar -n DEV 1 5
# 临时增加副本或降低写入速率
2. Controller频繁切换
- 日志:ActiveController频繁变化
- 诊断:ZooKeeper会话超时
- 处理示例:
# ZooKeeper 会话与网络排查
echo stat | nc 10.0.0.3 2181
ping -c 5 10.0.0.3
3. 客户端大量超时
- 日志:TimeoutException
- 诊断:Broker请求处理能力不足
- 处理示例:
# 查看Broker请求队列与处理时间(JMX/Exporter)
curl -s http://10.0.0.2:9308/metrics | grep request
七、练习(可执行)
1)模拟Controller切换
- 操作:停止当前Controller Broker
# 找出Controller
/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 10.0.0.2:9092
# 停止该Broker
sudo systemctl stop kafka
- 预期:controller.log 出现新的 ActiveController 记录。
2)模拟磁盘告警
- 操作:写入大文件占满日志盘
dd if=/dev/zero of=/kafka-logs/fill.bin bs=100M count=5
- 预期:server.log 出现磁盘空间不足提示。
3)客户端超时
- 操作:设置极小 request.timeout.ms
# producer.properties 添加
request.timeout.ms=1
- 预期:producer.log 出现 TimeoutException。
八、运维建议
- 生产环境INFO为主,排障临时DEBUG并设置回退计划
- 审计日志长期留存(≥180天),普通运行日志短期滚动
- 与CMDB/工单系统对接,形成“变更—日志—告警—回滚”闭环