10.5.1 消息可靠性模型与投递语义(at-most-once/at-least-once/exactly-once)

消息可靠性模型与投递语义(at-most-once/at-least-once/exactly-once)#

消息可靠性模型描述从生产到消费的丢失与重复风险。Kafka 通过生产者确认、Broker 副本复制与消费者提交策略组合形成不同投递语义。运维需在可靠性、吞吐与延迟之间权衡。

原理草图(投递语义与关键点)

文章图片

1)At-most-once(至多一次)#

  • 定义:消息可能丢失,但不会重复。
  • 实现条件:生产端不重试或弱确认;消费者先提交再处理。
  • 适用:日志采集、指标统计。

示例:消费者先提交后处理(可能丢失)

# /opt/kafka/config/consumer-at-most-once.properties
bootstrap.servers=127.0.0.1:9092
group.id=demo-at-most-once
enable.auto.commit=true
auto.commit.interval.ms=1000
# 运行消费者(自动提交,处理前提交offset)
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic demo-topic \
  --consumer.config /opt/kafka/config/consumer-at-most-once.properties

2)At-least-once(至少一次)#

  • 定义:消息不丢失,但可能重复。
  • 实现条件:生产端 acks=all + 重试;消费者处理完成后再提交。
  • 适用:订单、账务、关键业务事件。

示例:生产端 acks=all + 重试

# /opt/kafka/config/producer-at-least-once.properties
bootstrap.servers=127.0.0.1:9092
acks=all
retries=5
linger.ms=10
/opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic demo-topic \
  --producer.config /opt/kafka/config/producer-at-least-once.properties

示例:消费后手动提交

# 使用kafka-consumer-groups查看滞后
/opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --describe --group demo-at-least-once
// 伪代码:处理完成后手动提交
while (true) {
  records = consumer.poll()
  process(records)
  consumer.commitSync()
}

3)Exactly-once(恰好一次)#

  • 定义:消息既不丢失也不重复(端到端)。
  • 实现条件:生产端幂等+事务;消费端 read_committed;下游支持幂等/事务。
  • 适用:金融交易、库存扣减。

示例:事务生产者(幂等+事务)

# /opt/kafka/config/producer-exactly-once.properties
bootstrap.servers=127.0.0.1:9092
enable.idempotence=true
acks=all
retries=5
transactional.id=tx-producer-1
# 以事务方式发送(示例,需应用代码支持)
/opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic tx-topic \
  --producer.config /opt/kafka/config/producer-exactly-once.properties

示例:消费者仅读已提交事务

# /opt/kafka/config/consumer-exactly-once.properties
bootstrap.servers=127.0.0.1:9092
group.id=demo-exactly-once
isolation.level=read_committed
enable.auto.commit=false
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic tx-topic \
  --consumer.config /opt/kafka/config/consumer-exactly-once.properties

安装与环境准备(本节示例用)#

单机快速安装(Kafka + ZooKeeper)

# 1) 下载与解压
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz -O /tmp/kafka.tgz
mkdir -p /opt/kafka && tar -zxvf /tmp/kafka.tgz -C /opt/kafka --strip-components=1

# 2) 启动 ZooKeeper 与 Kafka(简化演示)
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

# 3) 创建测试主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic demo-topic --partitions 1 --replication-factor 1

验证

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

语义选择建议(简表)#

  • 允许丢失:at-most-once(低延迟)。
  • 不允许丢失但可去重:at-least-once(默认推荐)。
  • 强一致:exactly-once(配置复杂、吞吐降低)。

排错清单(可靠性相关)#

  1. 重复消费
    - 检查 enable.auto.commit 是否为 true
    - 重平衡频繁:查看 session.timeout.msmax.poll.interval.ms
  2. 消息丢失
    - 检查生产端 acks=allretries
    - ISR 不足:min.insync.replicas 过高或副本故障。
  3. 事务消息不可见
    - 消费端 isolation.level 是否为 read_committed
    - 事务未 commitTransaction
  4. 发送超时
    - 检查 request.timeout.ms 与 broker 负载。
    - 查看 server.log 是否有 ISR 收缩或网络抖动。

快速诊断命令

# 查看 ISR 情况
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --topic demo-topic

# 查看消费滞后
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --group demo-at-least-once

练习#

  1. acks=0acks=all 对比发送延迟与丢失率。
  2. 将消费者从自动提交改为手动提交,观察重复消费变化。
  3. 在事务生产者中故意不提交事务,验证 read_committed 下是否不可见。
  4. 调整 min.insync.replicas,模拟 ISR 收缩与发送失败。