10.5.2 生产者可靠性机制(acks、重试、幂等、事务)

生产者可靠性依赖 acks、重试、幂等与事务四类机制组合。核心目标是平衡吞吐与一致性,并在网络抖动、Broker 故障时避免丢失与重复。

原理草图(生产者可靠性机制协作关系):

文章图片

1) 关键机制与参数解释(含配置示例)#

生产者关键参数建议配置如下(config/producer.properties):

# 可靠性
acks=all
min.insync.replicas=2

# 重试与超时
retries=10
retry.backoff.ms=100
request.timeout.ms=30000
delivery.timeout.ms=120000

# 幂等
enable.idempotence=true
max.in.flight.requests.per.connection=5

# 性能
linger.ms=5
batch.size=65536
compression.type=snappy

参数说明:
- acks:写入成功判定条件。0 不等待确认,1 仅 leader 确认,all 等待 ISR 副本确认。
- min.insync.replicas:ISR 最小副本数,配合 acks=all 避免副本不足导致写入成功但不可靠。
- retries/retry.backoff.ms/delivery.timeout.ms:控制重试次数、间隔与总投递超时窗口。
- enable.idempotence:启用幂等,避免重试导致重复。
- transactional.id:开启事务需设置(示例见后文)。

2) 生产者可靠性示例(命令可执行)#

2.1 创建主题与副本#

# 3 副本,保证 ISR 有空间
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 3 --replication-factor 3

2.2 发送消息(acks=all+幂等+重试)#

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders --producer.config config/producer.properties

预期效果:网络波动时控制台仍可持续发送,重复概率大幅降低。

2.3 事务生产者示例(Java 伪代码)#

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("enable.idempotence", "true");
props.put("transactional.id", "order-txn-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();

producer.send(new ProducerRecord<>("orders", "k1", "v1"));
producer.send(new ProducerRecord<>("orders", "k2", "v2"));

producer.commitTransaction();
// 异常场景
// producer.abortTransaction();

解释:
- initTransactions():初始化事务并申请 PID。
- commitTransaction():原子提交,消费者设置 read_committed 才可读取。

3) 相关安装/配置补充(最小可运行环境)#

Kafka 单机示例(Linux):

# 1) 解压
tar -xf kafka_2.13-3.6.0.tgz -C /opt/
cd /opt/kafka_2.13-3.6.0

# 2) 启动(KRaft 单机示例)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
bin/kafka-server-start.sh -daemon config/kraft/server.properties

4) 排错与验证#

4.1 常见故障与命令#

  1. 写入失败:NotEnoughReplicas
# 查看 ISR 状态
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

处理建议:检查 min.insync.replicasreplication.factor,确保 ISR 足够。

  1. 生产者超时:TimeoutException
# 查看 broker 日志
tail -f /opt/kafka_2.13-3.6.0/logs/server.log

处理建议:调大 delivery.timeout.ms,降低网络抖动影响。

  1. 重复消息
    - 未开启幂等或 max.in.flight.requests.per.connection 过大。
    - 检查 enable.idempotence 是否生效(生产者日志输出)。

4.2 验证事务隔离#

消费者配置 read_committed

# config/consumer.properties
isolation.level=read_committed
enable.auto.commit=false

运行后仅能读到已提交事务的消息。

5) 实战练习#

  1. 创建 orders 主题(3 副本)并设置 min.insync.replicas=2,验证在停止一个 broker 时仍可写入。
  2. acks=1acks=all 的延迟对比,记录吞吐与失败率。
  3. 开启幂等后模拟网络抖动(限制带宽或短暂重启 broker),观察是否出现重复消息。
  4. 使用事务生产者与 read_committed 消费者,验证未提交事务消息不可见。

综合建议:一般场景使用 acks=all + 幂等 + 合理重试;端到端一致性需求才启用事务;通过批量发送、压缩与合理分区设计降低可靠性带来的性能损耗。