10.1.5 偏移量、日志段与保留策略

偏移量(Offset)#

  • 定义:每条消息在分区中的唯一序号,单调递增,用于定位消费进度与消息顺序。
  • 提交方式
  • 自动提交:周期性提交,配置 enable.auto.commitauto.commit.interval.ms
  • 手动提交:同步或异步提交,适用于精确控制与故障恢复。
  • 存储位置:偏移量保存在内部主题 __consumer_offsets,按消费组与分区维度记录。
  • 重置策略:消费组首次无偏移或偏移过期时,按 auto.offset.reset(earliest/latest/none)决定起始位置。

原理草图

文章图片

示例:生产、消费与偏移量观察

# 1) 创建主题(单分区便于观察offset)
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic demo-offset --partitions 1 --replication-factor 1

# 2) 生产消息(终端输入3行消息)
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 \
  --topic demo-offset
# 输入:
# a
# b
# c

# 3) 消费并打印offset
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
  --topic demo-offset --from-beginning --property print.offset=true
# 预期输出:
# 0  a
# 1  b
# 2  c

示例:手动提交偏移量(Java配置片段)

# /opt/kafka/config/consumer.properties
bootstrap.servers=127.0.0.1:9092
group.id=demo-group
enable.auto.commit=false
auto.offset.reset=earliest
# 运行消费者(示例为脚本/应用读取上述配置)
java -jar /opt/apps/consumer-manual-commit.jar \
  --config /opt/kafka/config/consumer.properties
# 预期:消费后手动提交offset,重启不重复消费

排错要点
- 重复消费:检查 enable.auto.commit 与业务处理/提交顺序;确保处理成功后提交。
- 无消息可读:确认 auto.offset.reset;若为 latest,新组从尾部开始。
- 提交失败:查看消费者日志是否有 CommitFailedException(通常是 rebalance 引起)。


日志段(Log Segment)#

  • 结构:每个分区对应一组日志段文件,包含 .log(消息)与 .index/.timeindex(索引)。
  • 滚动条件
  • 按大小 log.segment.bytes
  • 按时间 log.segment.ms
  • 索引与查找:稀疏索引降低存储开销,依赖二分与顺序扫描实现快速定位。

原理草图

文章图片

示例:查看分区日志段

# 假设 Kafka 日志目录为 /data/kafka-logs
ls -lh /data/kafka-logs/demo-offset-0
# 预期看到 .log/.index/.timeindex 文件

示例:调整段大小并触发滚动

# /opt/kafka/config/server.properties
log.segment.bytes=10485760   # 10MB
log.segment.ms=3600000       # 1小时
# 重启broker生效
systemctl restart kafka
# 发送大量消息后查看是否生成新段文件

排错要点
- 段增长过快:降低 log.segment.bytes 或检查写入速率。
- 索引损坏:查看 broker 日志中的 CorruptIndexException,可尝试停机后删除损坏索引文件让 Kafka 重建。


保留策略(Retention)#

  • 时间保留log.retention.mslog.retention.hours,超过时间删除旧段。
  • 大小保留log.retention.bytes,分区总大小超过阈值删除旧段。
  • 压缩策略
  • cleanup.policy=delete:按时间/大小删除。
  • cleanup.policy=compact:保留每个 key 的最新消息,适合状态类数据。
  • 可同时配置为 compact,delete
  • 最小保留log.segment.delete.delay.ms 防止段刚滚动即删除。
  • 消费影响:过期删除会导致老偏移失效,触发偏移重置逻辑。

示例:为主题设置保留策略

# 时间保留 2 小时,大小 1GB
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --entity-type topics --entity-name demo-offset \
  --alter --add-config "log.retention.ms=7200000,log.retention.bytes=1073741824"

# 查看配置
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
  --entity-type topics --entity-name demo-offset --describe

示例:压缩主题

# 创建压缩主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic demo-compact --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact --config min.compaction.lag.ms=60000

# 发送含key消息
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 \
  --topic demo-compact --property parse.key=true --property key.separator=:
# 输入:
# user1:state1
# user1:state2
# 预期:压缩后仅保留 user1 的最新值 state2

排错要点
- 旧消息消失:检查 log.retention.*cleanup.policy,确认是否触发删除。
- 压缩不生效:确认消息带 key,且 compaction lag 配置允许压缩执行。
- 磁盘不释放:检查 broker 日志中的 LogCleaner 是否异常,必要时重启。


实践要点#

  • 大吞吐场景优先调大 log.segment.bytes 与保留时间,减少段切换与删除压力。
  • 使用压缩主题时,合理设置 min.compaction.lag.ms/max.compaction.lag.ms,避免频繁压缩。
  • 监控 LogEndOffset 与消费位点差值,评估积压与保留风险。

关键命令汇总

# 查看消费组offset与lag
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --group demo-group

# 查看主题分区详细信息
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --topic demo-offset

练习
1. 创建主题 retention-test,设置 log.retention.ms=300000,生产消息后等待 5 分钟观察消息是否过期。
2. 创建压缩主题并写入同一 key 的多条消息,观察压缩结果。
3. 将消费者 auto.offset.reset 分别设置为 earliestlatest,比较启动后的消费行为。