10.5.1 消息可靠性模型与投递语义(at-most-once/at-least-once/exactly-once)
消息可靠性模型与投递语义(at-most-once/at-least-once/exactly-once)#
消息可靠性模型描述从生产到消费的丢失与重复风险。Kafka 通过生产者确认、Broker 副本复制与消费者提交策略组合形成不同投递语义。运维需在可靠性、吞吐与延迟之间权衡。
原理草图(投递语义与关键点)
1)At-most-once(至多一次)#
- 定义:消息可能丢失,但不会重复。
- 实现条件:生产端不重试或弱确认;消费者先提交再处理。
- 适用:日志采集、指标统计。
示例:消费者先提交后处理(可能丢失)
# /opt/kafka/config/consumer-at-most-once.properties
bootstrap.servers=127.0.0.1:9092
group.id=demo-at-most-once
enable.auto.commit=true
auto.commit.interval.ms=1000
# 运行消费者(自动提交,处理前提交offset)
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic demo-topic \
--consumer.config /opt/kafka/config/consumer-at-most-once.properties
2)At-least-once(至少一次)#
- 定义:消息不丢失,但可能重复。
- 实现条件:生产端 acks=all + 重试;消费者处理完成后再提交。
- 适用:订单、账务、关键业务事件。
示例:生产端 acks=all + 重试
# /opt/kafka/config/producer-at-least-once.properties
bootstrap.servers=127.0.0.1:9092
acks=all
retries=5
linger.ms=10
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic demo-topic \
--producer.config /opt/kafka/config/producer-at-least-once.properties
示例:消费后手动提交
# 使用kafka-consumer-groups查看滞后
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server 127.0.0.1:9092 \
--describe --group demo-at-least-once
// 伪代码:处理完成后手动提交
while (true) {
records = consumer.poll()
process(records)
consumer.commitSync()
}
3)Exactly-once(恰好一次)#
- 定义:消息既不丢失也不重复(端到端)。
- 实现条件:生产端幂等+事务;消费端 read_committed;下游支持幂等/事务。
- 适用:金融交易、库存扣减。
示例:事务生产者(幂等+事务)
# /opt/kafka/config/producer-exactly-once.properties
bootstrap.servers=127.0.0.1:9092
enable.idempotence=true
acks=all
retries=5
transactional.id=tx-producer-1
# 以事务方式发送(示例,需应用代码支持)
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic tx-topic \
--producer.config /opt/kafka/config/producer-exactly-once.properties
示例:消费者仅读已提交事务
# /opt/kafka/config/consumer-exactly-once.properties
bootstrap.servers=127.0.0.1:9092
group.id=demo-exactly-once
isolation.level=read_committed
enable.auto.commit=false
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic tx-topic \
--consumer.config /opt/kafka/config/consumer-exactly-once.properties
安装与环境准备(本节示例用)#
单机快速安装(Kafka + ZooKeeper)
# 1) 下载与解压
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz -O /tmp/kafka.tgz
mkdir -p /opt/kafka && tar -zxvf /tmp/kafka.tgz -C /opt/kafka --strip-components=1
# 2) 启动 ZooKeeper 与 Kafka(简化演示)
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# 3) 创建测试主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic demo-topic --partitions 1 --replication-factor 1
验证
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
语义选择建议(简表)#
- 允许丢失:at-most-once(低延迟)。
- 不允许丢失但可去重:at-least-once(默认推荐)。
- 强一致:exactly-once(配置复杂、吞吐降低)。
排错清单(可靠性相关)#
- 重复消费
- 检查enable.auto.commit是否为true。
- 重平衡频繁:查看session.timeout.ms与max.poll.interval.ms。 - 消息丢失
- 检查生产端acks=all、retries。
- ISR 不足:min.insync.replicas过高或副本故障。 - 事务消息不可见
- 消费端isolation.level是否为read_committed。
- 事务未commitTransaction。 - 发送超时
- 检查request.timeout.ms与 broker 负载。
- 查看server.log是否有 ISR 收缩或网络抖动。
快速诊断命令
# 查看 ISR 情况
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--describe --topic demo-topic
# 查看消费滞后
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--describe --group demo-at-least-once
练习#
- 用
acks=0与acks=all对比发送延迟与丢失率。 - 将消费者从自动提交改为手动提交,观察重复消费变化。
- 在事务生产者中故意不提交事务,验证
read_committed下是否不可见。 - 调整
min.insync.replicas,模拟 ISR 收缩与发送失败。