10.5.5 端到端一致性方案与实践(事务、幂等、去重)
端到端一致性要覆盖生产者、Broker、消费者与下游存储,核心是“可重试+可去重”。生产侧用幂等与事务避免重复写入,消费侧用幂等落库与去重保证重复消费不影响结果,并用监控指标验证一致性闭环。
原理草图(EOS与外部存储对比):
一、Kafka事务+EOS(Exactly-Once)示例
目标:消费消息并将结果写回Kafka,同时提交offset,保证原子性。
1)生产者开启幂等与事务(客户端配置)
# file: /opt/kafka/config/producer-eos.properties
bootstrap.servers=127.0.0.1:9092
enable.idempotence=true
acks=all
retries=5
transactional.id=tx-eos-001
max.in.flight.requests.per.connection=5
2)消费者使用“读已提交”
# file: /opt/kafka/config/consumer-eos.properties
bootstrap.servers=127.0.0.1:9092
group.id=group-eos
enable.auto.commit=false
isolation.level=read_committed
auto.offset.reset=earliest
3)可执行示例(伪代码,关键调用顺序)
// 关键调用顺序:initTransactions -> begin -> send -> sendOffsetsToTransaction -> commit
producer.initTransactions();
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
producer.beginTransaction();
for (ConsumerRecord<String,String> r : records) {
// 业务处理后写回Kafka
producer.send(new ProducerRecord<>("topic-out", r.key(), r.value()));
}
// 将offset加入同一事务
Map<TopicPartition, OffsetAndMetadata> offsets = currentOffsets(records);
producer.sendOffsetsToTransaction(offsets, "group-eos");
producer.commitTransaction();
}
4)验证命令(查看事务状态)
# 列出事务主题
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list | grep __transaction_state
# 查看主题消费是否只读已提交
bin/kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic topic-out --from-beginning \
--consumer.config /opt/kafka/config/consumer-eos.properties
二、幂等消费+外部存储去重示例(MySQL)
目标:至少一次投递,幂等落库,重复消费不产生副作用。
1)表结构(唯一约束去重)
-- file: /opt/sql/order.sql
CREATE TABLE orders (
order_id VARCHAR(64) PRIMARY KEY,
amount DECIMAL(10,2),
status VARCHAR(16),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE msg_dedup (
msg_id VARCHAR(64) PRIMARY KEY,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
2)幂等写入流程(伪代码)
BEGIN;
INSERT INTO msg_dedup(msg_id) VALUES(:msg_id); -- 若重复会主键冲突
UPDATE orders SET status=:status, amount=:amount WHERE order_id=:order_id;
COMMIT;
3)重复处理的预期效果
- 重复msg_id插入失败 → 回滚或忽略
- orders更新仅一次生效 → 业务结果不变
4)消费端Offset提交策略(命令解释)
# 关闭自动提交,业务处理成功后再提交
# 关键参数含义:
# enable.auto.commit=false:手动提交offset
# max.poll.interval.ms:避免处理慢导致重平衡
# session.timeout.ms:心跳超时
三、Outbox+CDC方案示例
适合“数据库为源”的业务,避免分布式事务。
1)Outbox表
CREATE TABLE outbox (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_id VARCHAR(64),
payload JSON,
state VARCHAR(16) DEFAULT 'NEW',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
2)Debezium示例配置(截取关键)
{
"name": "mysql-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.user": "dbz",
"database.password": "dbz",
"database.server.id": "1001",
"table.include.list": "app.outbox",
"topic.prefix": "outbox"
}
}
四、关键命令与配置检查
# 检查Broker事务状态与ISR
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic topic-out
# 查看消费者组offset提交情况
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--group group-eos --describe
# 检查生产者幂等是否开启(查看客户端配置)
grep -E "enable.idempotence|transactional.id" /opt/kafka/config/producer-eos.properties
五、排错清单(常见问题→定位→修复)
- 事务卡住/超时
- 定位:查看Broker日志 transaction.timeout.ms 与客户端 transaction.timeout.ms
- 修复:提高超时或缩短处理时间
- 读到未提交消息
- 定位:消费者 isolation.level 是否为 read_committed
- 修复:修改配置并重启消费者
- 重复消费落库
- 定位:去重表/唯一索引是否缺失,或者插入未放在同一事务
- 修复:添加唯一约束,写入与业务更新放在同一DB事务
- offset未提交导致重复
- 定位:enable.auto.commit 配置与提交逻辑
- 修复:处理成功后显式提交offset
六、练习
1)配置一个开启幂等与事务的生产者,并用消费者读取 read_committed,验证重试不会产生重复。
2)用MySQL唯一索引做去重,构造重复msg_id,验证仅一次生效。
3)模拟消费者处理异常,确认事务回滚后消息不会出现在下游topic。
4)观察 kafka-consumer-groups.sh --describe 的lag变化,记录事务失败时的offset状态。