10.1.5 偏移量、日志段与保留策略
偏移量(Offset)#
- 定义:每条消息在分区中的唯一序号,单调递增,用于定位消费进度与消息顺序。
- 提交方式:
- 自动提交:周期性提交,配置
enable.auto.commit与auto.commit.interval.ms。 - 手动提交:同步或异步提交,适用于精确控制与故障恢复。
- 存储位置:偏移量保存在内部主题
__consumer_offsets,按消费组与分区维度记录。 - 重置策略:消费组首次无偏移或偏移过期时,按
auto.offset.reset(earliest/latest/none)决定起始位置。
原理草图
示例:生产、消费与偏移量观察
# 1) 创建主题(单分区便于观察offset)
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic demo-offset --partitions 1 --replication-factor 1
# 2) 生产消息(终端输入3行消息)
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 \
--topic demo-offset
# 输入:
# a
# b
# c
# 3) 消费并打印offset
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
--topic demo-offset --from-beginning --property print.offset=true
# 预期输出:
# 0 a
# 1 b
# 2 c
示例:手动提交偏移量(Java配置片段)
# /opt/kafka/config/consumer.properties
bootstrap.servers=127.0.0.1:9092
group.id=demo-group
enable.auto.commit=false
auto.offset.reset=earliest
# 运行消费者(示例为脚本/应用读取上述配置)
java -jar /opt/apps/consumer-manual-commit.jar \
--config /opt/kafka/config/consumer.properties
# 预期:消费后手动提交offset,重启不重复消费
排错要点
- 重复消费:检查 enable.auto.commit 与业务处理/提交顺序;确保处理成功后提交。
- 无消息可读:确认 auto.offset.reset;若为 latest,新组从尾部开始。
- 提交失败:查看消费者日志是否有 CommitFailedException(通常是 rebalance 引起)。
日志段(Log Segment)#
- 结构:每个分区对应一组日志段文件,包含
.log(消息)与.index/.timeindex(索引)。 - 滚动条件:
- 按大小
log.segment.bytes - 按时间
log.segment.ms - 索引与查找:稀疏索引降低存储开销,依赖二分与顺序扫描实现快速定位。
原理草图
示例:查看分区日志段
# 假设 Kafka 日志目录为 /data/kafka-logs
ls -lh /data/kafka-logs/demo-offset-0
# 预期看到 .log/.index/.timeindex 文件
示例:调整段大小并触发滚动
# /opt/kafka/config/server.properties
log.segment.bytes=10485760 # 10MB
log.segment.ms=3600000 # 1小时
# 重启broker生效
systemctl restart kafka
# 发送大量消息后查看是否生成新段文件
排错要点
- 段增长过快:降低 log.segment.bytes 或检查写入速率。
- 索引损坏:查看 broker 日志中的 CorruptIndexException,可尝试停机后删除损坏索引文件让 Kafka 重建。
保留策略(Retention)#
- 时间保留:
log.retention.ms或log.retention.hours,超过时间删除旧段。 - 大小保留:
log.retention.bytes,分区总大小超过阈值删除旧段。 - 压缩策略:
cleanup.policy=delete:按时间/大小删除。cleanup.policy=compact:保留每个 key 的最新消息,适合状态类数据。- 可同时配置为
compact,delete。 - 最小保留:
log.segment.delete.delay.ms防止段刚滚动即删除。 - 消费影响:过期删除会导致老偏移失效,触发偏移重置逻辑。
示例:为主题设置保留策略
# 时间保留 2 小时,大小 1GB
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--entity-type topics --entity-name demo-offset \
--alter --add-config "log.retention.ms=7200000,log.retention.bytes=1073741824"
# 查看配置
kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
--entity-type topics --entity-name demo-offset --describe
示例:压缩主题
# 创建压缩主题
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic demo-compact --partitions 1 --replication-factor 1 \
--config cleanup.policy=compact --config min.compaction.lag.ms=60000
# 发送含key消息
kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 \
--topic demo-compact --property parse.key=true --property key.separator=:
# 输入:
# user1:state1
# user1:state2
# 预期:压缩后仅保留 user1 的最新值 state2
排错要点
- 旧消息消失:检查 log.retention.* 与 cleanup.policy,确认是否触发删除。
- 压缩不生效:确认消息带 key,且 compaction lag 配置允许压缩执行。
- 磁盘不释放:检查 broker 日志中的 LogCleaner 是否异常,必要时重启。
实践要点#
- 大吞吐场景优先调大
log.segment.bytes与保留时间,减少段切换与删除压力。 - 使用压缩主题时,合理设置
min.compaction.lag.ms/max.compaction.lag.ms,避免频繁压缩。 - 监控
LogEndOffset与消费位点差值,评估积压与保留风险。
关键命令汇总
# 查看消费组offset与lag
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--describe --group demo-group
# 查看主题分区详细信息
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--describe --topic demo-offset
练习
1. 创建主题 retention-test,设置 log.retention.ms=300000,生产消息后等待 5 分钟观察消息是否过期。
2. 创建压缩主题并写入同一 key 的多条消息,观察压缩结果。
3. 将消费者 auto.offset.reset 分别设置为 earliest 与 latest,比较启动后的消费行为。