10.3.5 主题配置参数与保留策略

主题配置参数与保留策略#

主题配置决定消息保存时长、清理方式和可靠性边界。本节给出原理草图、可执行命令示例、排错要点与练习,便于落地实施与验证。

原理草图:日志段与清理#

文章图片

核心配置参数(含示例解释)#

  • cleanup.policy: delete(按时间/大小删除)或 compact(按 key 保留最新),可组合 compact,delete
  • retention.ms/retention.bytes: 时间/大小保留阈值,任一达到即触发清理。
  • segment.ms/segment.bytes: 日志段滚动频率,过小会导致段过多。
  • min.insync.replicas: acks=all 时最小 ISR 副本数,控制写入可靠性。
  • unclean.leader.election.enable: 是否允许非 ISR 选主,建议关闭。
  • message.timestamp.type: CreateTimeLogAppendTime,决定保留时间基准。
  • max.message.bytes: 主题级消息大小上限。
  • compression.type: producer/gzip/lz4/zstd,影响吞吐与存储。

配置与查看示例(含预期效果)#

1)创建主题并设置保留策略(时间+大小):

# 创建主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic order-events --partitions 6 --replication-factor 3

# 设置保留策略:7天或50GB触发清理,delete策略
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --alter --entity-type topics --entity-name order-events \
  --add-config cleanup.policy=delete,retention.ms=604800000,retention.bytes=53687091200

预期效果:主题 order-events 将在 7 天或超过 50GB 时清理旧段。

2)设置压缩策略(状态类主题):

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic user-profile --partitions 3 --replication-factor 3

/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --alter --entity-type topics --entity-name user-profile \
  --add-config cleanup.policy=compact,min.compaction.lag.ms=3600000

预期效果:user-profile 主题按 key 压缩保留最新值,压缩滞后 1 小时。

3)查看主题配置与生效结果:

/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --entity-type topics --entity-name order-events

预期效果:输出中包含 cleanup.policy=deleteretention.ms=604800000 等。

命令解释要点#

  • --add-config 会覆盖主题级配置(仅此主题生效)。
  • retention.ms/bytes 任一触发即删除,不是“都满足才删除”。
  • segment.ms/bytes 影响清理粒度,越小段越多,清理频繁但更碎。

常见排错场景与处理#

1)保留策略未生效
症状:retention.ms 已设置,但日志未清理。
排查:

# 确认生效配置
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --entity-type topics --entity-name order-events

# 查看 broker 端是否覆盖
grep -E "log.retention|log.cleanup" /opt/kafka/config/server.properties

处理要点:
- 如果 log.retention.ms 在 broker 端更短,主题配置不会被更短值覆盖。
- 确认 message.timestamp.typeCreateTimeLogAppendTime 与生产端一致。

2)磁盘占用持续攀升
排查:

# 查看段文件数量和大小
ls -lh /data/kafka-logs/order-events-* | head

# 查看清理线程相关配置
grep -E "log.cleaner|log.retention" /opt/kafka/config/server.properties

处理要点:
- 调整 segment.bytes 增大段大小,减少碎片。
- 检查 log.cleaner.enable=truelog.cleaner.threads 足够。

3)压缩主题消费到旧值
排查:

# 查看主题策略
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --entity-type topics --entity-name user-profile

处理要点:
- cleanup.policy=compact 仅保证“最终一致性”,压缩非实时。
- 结合 min.compaction.lag.ms 与消费逻辑处理延迟。

实操练习#

1)为主题 audit-log 设置保留 90 天、大小 200GB,并禁用不洁选主。
2)创建 config-store 主题并设置 cleanup.policy=compact,delete,保留 30 天。
3)调整 order-eventssegment.bytes 为 1GB,观察段数量变化。

练习参考命令#

# 练习1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic audit-log --partitions 6 --replication-factor 3

/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --alter --entity-type topics --entity-name audit-log \
  --add-config retention.ms=7776000000,retention.bytes=214748364800,unclean.leader.election.enable=false

# 练习2
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic config-store --partitions 3 --replication-factor 3

/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --alter --entity-type topics --entity-name config-store \
  --add-config cleanup.policy=compact,delete,retention.ms=2592000000

# 练习3
/opt/kafka/bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --alter --entity-type topics --entity-name order-events \
  --add-config segment.bytes=1073741824

预期效果:
- audit-log 保留 90 天或 200GB 清理;
- config-store 同时压缩与删除;
- order-events 段文件减少,清理更高效。