10.3.5 主题配置参数与保留策略
主题配置参数与保留策略#
主题配置决定消息保存时长、清理方式和可靠性边界。本节给出原理草图、可执行命令示例、排错要点与练习,便于落地实施与验证。
原理草图:日志段与清理#
核心配置参数(含示例解释)#
cleanup.policy:delete(按时间/大小删除)或compact(按 key 保留最新),可组合compact,delete。retention.ms/retention.bytes: 时间/大小保留阈值,任一达到即触发清理。segment.ms/segment.bytes: 日志段滚动频率,过小会导致段过多。min.insync.replicas:acks=all时最小 ISR 副本数,控制写入可靠性。unclean.leader.election.enable: 是否允许非 ISR 选主,建议关闭。message.timestamp.type:CreateTime或LogAppendTime,决定保留时间基准。max.message.bytes: 主题级消息大小上限。compression.type:producer/gzip/lz4/zstd,影响吞吐与存储。
配置与查看示例(含预期效果)#
1)创建主题并设置保留策略(时间+大小):
# 创建主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic order-events --partitions 6 --replication-factor 3
# 设置保留策略:7天或50GB触发清理,delete策略
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--alter --entity-type topics --entity-name order-events \
--add-config cleanup.policy=delete,retention.ms=604800000,retention.bytes=53687091200
预期效果:主题 order-events 将在 7 天或超过 50GB 时清理旧段。
2)设置压缩策略(状态类主题):
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic user-profile --partitions 3 --replication-factor 3
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--alter --entity-type topics --entity-name user-profile \
--add-config cleanup.policy=compact,min.compaction.lag.ms=3600000
预期效果:user-profile 主题按 key 压缩保留最新值,压缩滞后 1 小时。
3)查看主题配置与生效结果:
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--describe --entity-type topics --entity-name order-events
预期效果:输出中包含 cleanup.policy=delete、retention.ms=604800000 等。
命令解释要点#
--add-config会覆盖主题级配置(仅此主题生效)。retention.ms/bytes任一触发即删除,不是“都满足才删除”。segment.ms/bytes影响清理粒度,越小段越多,清理频繁但更碎。
常见排错场景与处理#
1)保留策略未生效
症状:retention.ms 已设置,但日志未清理。
排查:
# 确认生效配置
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--describe --entity-type topics --entity-name order-events
# 查看 broker 端是否覆盖
grep -E "log.retention|log.cleanup" /opt/kafka/config/server.properties
处理要点:
- 如果 log.retention.ms 在 broker 端更短,主题配置不会被更短值覆盖。
- 确认 message.timestamp.type 为 CreateTime 或 LogAppendTime 与生产端一致。
2)磁盘占用持续攀升
排查:
# 查看段文件数量和大小
ls -lh /data/kafka-logs/order-events-* | head
# 查看清理线程相关配置
grep -E "log.cleaner|log.retention" /opt/kafka/config/server.properties
处理要点:
- 调整 segment.bytes 增大段大小,减少碎片。
- 检查 log.cleaner.enable=true 且 log.cleaner.threads 足够。
3)压缩主题消费到旧值
排查:
# 查看主题策略
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--describe --entity-type topics --entity-name user-profile
处理要点:
- cleanup.policy=compact 仅保证“最终一致性”,压缩非实时。
- 结合 min.compaction.lag.ms 与消费逻辑处理延迟。
实操练习#
1)为主题 audit-log 设置保留 90 天、大小 200GB,并禁用不洁选主。
2)创建 config-store 主题并设置 cleanup.policy=compact,delete,保留 30 天。
3)调整 order-events 的 segment.bytes 为 1GB,观察段数量变化。
练习参考命令#
# 练习1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic audit-log --partitions 6 --replication-factor 3
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--alter --entity-type topics --entity-name audit-log \
--add-config retention.ms=7776000000,retention.bytes=214748364800,unclean.leader.election.enable=false
# 练习2
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic config-store --partitions 3 --replication-factor 3
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--alter --entity-type topics --entity-name config-store \
--add-config cleanup.policy=compact,delete,retention.ms=2592000000
# 练习3
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--alter --entity-type topics --entity-name order-events \
--add-config segment.bytes=1073741824
预期效果:
- audit-log 保留 90 天或 200GB 清理;
- config-store 同时压缩与删除;
- order-events 段文件减少,清理更高效。