10.5.5 端到端一致性方案与实践(事务、幂等、去重)

端到端一致性要覆盖生产者、Broker、消费者与下游存储,核心是“可重试+可去重”。生产侧用幂等与事务避免重复写入,消费侧用幂等落库与去重保证重复消费不影响结果,并用监控指标验证一致性闭环。

原理草图(EOS与外部存储对比):

文章图片

一、Kafka事务+EOS(Exactly-Once)示例
目标:消费消息并将结果写回Kafka,同时提交offset,保证原子性。

1)生产者开启幂等与事务(客户端配置)

# file: /opt/kafka/config/producer-eos.properties
bootstrap.servers=127.0.0.1:9092
enable.idempotence=true
acks=all
retries=5
transactional.id=tx-eos-001
max.in.flight.requests.per.connection=5

2)消费者使用“读已提交”

# file: /opt/kafka/config/consumer-eos.properties
bootstrap.servers=127.0.0.1:9092
group.id=group-eos
enable.auto.commit=false
isolation.level=read_committed
auto.offset.reset=earliest

3)可执行示例(伪代码,关键调用顺序)

// 关键调用顺序:initTransactions -> begin -> send -> sendOffsetsToTransaction -> commit
producer.initTransactions();
while (true) {
  ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
  producer.beginTransaction();
  for (ConsumerRecord<String,String> r : records) {
    // 业务处理后写回Kafka
    producer.send(new ProducerRecord<>("topic-out", r.key(), r.value()));
  }
  // 将offset加入同一事务
  Map<TopicPartition, OffsetAndMetadata> offsets = currentOffsets(records);
  producer.sendOffsetsToTransaction(offsets, "group-eos");
  producer.commitTransaction();
}

4)验证命令(查看事务状态)

# 列出事务主题
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list | grep __transaction_state

# 查看主题消费是否只读已提交
bin/kafka-console-consumer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic topic-out --from-beginning \
  --consumer.config /opt/kafka/config/consumer-eos.properties

二、幂等消费+外部存储去重示例(MySQL)
目标:至少一次投递,幂等落库,重复消费不产生副作用。

1)表结构(唯一约束去重)

-- file: /opt/sql/order.sql
CREATE TABLE orders (
  order_id VARCHAR(64) PRIMARY KEY,
  amount DECIMAL(10,2),
  status VARCHAR(16),
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE msg_dedup (
  msg_id VARCHAR(64) PRIMARY KEY,
  processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

2)幂等写入流程(伪代码)

BEGIN;
INSERT INTO msg_dedup(msg_id) VALUES(:msg_id); -- 若重复会主键冲突
UPDATE orders SET status=:status, amount=:amount WHERE order_id=:order_id;
COMMIT;

3)重复处理的预期效果
- 重复msg_id插入失败 → 回滚或忽略
- orders更新仅一次生效 → 业务结果不变

4)消费端Offset提交策略(命令解释)

# 关闭自动提交,业务处理成功后再提交
# 关键参数含义:
# enable.auto.commit=false:手动提交offset
# max.poll.interval.ms:避免处理慢导致重平衡
# session.timeout.ms:心跳超时

三、Outbox+CDC方案示例
适合“数据库为源”的业务,避免分布式事务。

1)Outbox表

CREATE TABLE outbox (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  aggregate_id VARCHAR(64),
  payload JSON,
  state VARCHAR(16) DEFAULT 'NEW',
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

2)Debezium示例配置(截取关键)

{
  "name": "mysql-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "127.0.0.1",
    "database.user": "dbz",
    "database.password": "dbz",
    "database.server.id": "1001",
    "table.include.list": "app.outbox",
    "topic.prefix": "outbox"
  }
}

四、关键命令与配置检查

# 检查Broker事务状态与ISR
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic topic-out

# 查看消费者组offset提交情况
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
  --group group-eos --describe

# 检查生产者幂等是否开启(查看客户端配置)
grep -E "enable.idempotence|transactional.id" /opt/kafka/config/producer-eos.properties

五、排错清单(常见问题→定位→修复)
- 事务卡住/超时
- 定位:查看Broker日志 transaction.timeout.ms 与客户端 transaction.timeout.ms
- 修复:提高超时或缩短处理时间
- 读到未提交消息
- 定位:消费者 isolation.level 是否为 read_committed
- 修复:修改配置并重启消费者
- 重复消费落库
- 定位:去重表/唯一索引是否缺失,或者插入未放在同一事务
- 修复:添加唯一约束,写入与业务更新放在同一DB事务
- offset未提交导致重复
- 定位:enable.auto.commit 配置与提交逻辑
- 修复:处理成功后显式提交offset

六、练习
1)配置一个开启幂等与事务的生产者,并用消费者读取 read_committed,验证重试不会产生重复。
2)用MySQL唯一索引做去重,构造重复msg_id,验证仅一次生效。
3)模拟消费者处理异常,确认事务回滚后消息不会出现在下游topic。
4)观察 kafka-consumer-groups.sh --describe 的lag变化,记录事务失败时的offset状态。