10.4.4 手动提交与幂等消费
手动提交用于精细控制位移提交时机,避免自动提交导致的丢失或重复。典型流程是先拉取消息、处理完成后再提交位移,确保“先处理、后提交”。在高可靠场景下应关闭自动提交并采用同步/异步提交结合重试机制。幂等消费用于在“至少一次”投递下避免重复处理的副作用,核心是对同一业务消息重复消费时结果一致。
原理草图(手动提交 + 幂等消费):
手动提交关键点:
- 关闭自动提交:enable.auto.commit=false,由应用显式提交。
- 提交范围:按批次提交本次处理完成的最大位移,避免逐条提交带来的性能损耗。
- 处理失败策略:失败消息重试、投递到死信队列或持久化后补偿处理。
- 异步提交回调:记录失败并触发补提或回滚策略。
幂等消费实践要点:
- 业务唯一标识:消息包含全局唯一ID(如订单号、事务ID)。
- 去重存储:使用数据库唯一索引或缓存集合(如Redis Set)记录已处理ID。
- 幂等写入:数据库使用INSERT ... ON DUPLICATE KEY或UPDATE实现幂等落库。
- 事务一致性:幂等检查与业务写入在同一事务或可重放逻辑中完成。
- 重试安全:重试只会触发幂等判断,不会产生副作用。
安装与环境准备(单机演示):
# 1) 启动 Kafka(示例使用 docker-compose)
cat > docker-compose.yml <<'EOF'
version: "3.8"
services:
zookeeper:
image: bitnami/zookeeper:3.9
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ports: ["2181:2181"]
kafka:
image: bitnami/kafka:3.6
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- ALLOW_PLAINTEXT_LISTENER=yes
ports: ["9092:9092"]
depends_on: [zookeeper]
EOF
docker compose up -d
# 2) 创建主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic demo-orders --partitions 3 --replication-factor 1
Java 消费者示例(手动提交 + 幂等去重,带参数解释):
// 文件: src/main/java/com/demo/ManualCommitIdempotentConsumer.java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList("demo-orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> r : records) {
// 假设消息格式: orderId,amount
String orderId = r.value().split(",")[0];
// 幂等检查:伪代码,实际应调用DB/Redis
if (IdempotentStore.alreadyProcessed(orderId)) {
// 已处理,跳过但仍推进位移
} else {
// 业务处理
OrderService.process(r.value());
IdempotentStore.markProcessed(orderId);
}
// 按分区记录最大位移,提交时需要+1
offsets.put(
new TopicPartition(r.topic(), r.partition()),
new OffsetAndMetadata(r.offset() + 1)
);
}
if (!offsets.isEmpty()) {
// 同步提交,失败可捕获异常重试
consumer.commitSync(offsets);
}
}
}
数据库幂等写入示例(MySQL):
-- 表结构:订单号唯一
CREATE TABLE orders (
order_id VARCHAR(64) PRIMARY KEY,
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 幂等写入:重复订单号不会导致重复插入
INSERT INTO orders(order_id, amount, status)
VALUES('A001', 100.00, 'PAID')
ON DUPLICATE KEY UPDATE status=VALUES(status);
Redis 去重示例(命令 + 解释):
# 使用 SET 记录已处理的订单ID
redis-cli SADD processed_orders A001 # 返回 1 表示新增
redis-cli SADD processed_orders A001 # 返回 0 表示已存在
redis-cli SISMEMBER processed_orders A001 # 返回 1 表示已处理
提交失败排错与处理:
# 1) 查看消费者组位移与延迟
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group demo-consumer-group
# 2) 常见异常处理
# - CommitFailedException:消费组再平衡导致提交失败
# 处理:缩短处理时间或调大 max.poll.interval.ms
# - Rebalance频繁:检查分区数量与消费者数量
# - 处理卡顿:提升并发、优化下游DB写入
手动提交与幂等消费组合策略示例(伪流程):
poll -> 幂等检查 -> 业务处理 -> 写幂等记录 -> commitSync
失败 -> 不提交位移 -> 记录日志/重试/投递DLQ
练习:
1. 启动本节的 Kafka 环境,创建主题 demo-orders,编写生产者发送 100 条订单消息,包含重复订单号。
2. 实现消费者手动提交位移,并在 MySQL 中实现幂等写入,验证重复订单不会重复插入。
3. 人为制造提交失败(例如停止消费者后快速重启),观察 kafka-consumer-groups.sh 输出并解释位移变化。
4. 将 commitSync 改为 commitAsync 并加入回调,记录失败次数,比较两种提交方式的表现。