10.4.4 手动提交与幂等消费

手动提交用于精细控制位移提交时机,避免自动提交导致的丢失或重复。典型流程是先拉取消息、处理完成后再提交位移,确保“先处理、后提交”。在高可靠场景下应关闭自动提交并采用同步/异步提交结合重试机制。幂等消费用于在“至少一次”投递下避免重复处理的副作用,核心是对同一业务消息重复消费时结果一致。

原理草图(手动提交 + 幂等消费):

文章图片

手动提交关键点:
- 关闭自动提交:enable.auto.commit=false,由应用显式提交。
- 提交范围:按批次提交本次处理完成的最大位移,避免逐条提交带来的性能损耗。
- 处理失败策略:失败消息重试、投递到死信队列或持久化后补偿处理。
- 异步提交回调:记录失败并触发补提或回滚策略。

幂等消费实践要点:
- 业务唯一标识:消息包含全局唯一ID(如订单号、事务ID)。
- 去重存储:使用数据库唯一索引或缓存集合(如Redis Set)记录已处理ID。
- 幂等写入:数据库使用INSERT ... ON DUPLICATE KEYUPDATE实现幂等落库。
- 事务一致性:幂等检查与业务写入在同一事务或可重放逻辑中完成。
- 重试安全:重试只会触发幂等判断,不会产生副作用。

安装与环境准备(单机演示):

# 1) 启动 Kafka(示例使用 docker-compose)
cat > docker-compose.yml <<'EOF'
version: "3.8"
services:
  zookeeper:
    image: bitnami/zookeeper:3.9
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    ports: ["2181:2181"]
  kafka:
    image: bitnami/kafka:3.6
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
    ports: ["9092:9092"]
    depends_on: [zookeeper]
EOF

docker compose up -d

# 2) 创建主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic demo-orders --partitions 3 --replication-factor 1

Java 消费者示例(手动提交 + 幂等去重,带参数解释):

// 文件: src/main/java/com/demo/ManualCommitIdempotentConsumer.java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("demo-orders"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        for (ConsumerRecord<String, String> r : records) {
            // 假设消息格式: orderId,amount
            String orderId = r.value().split(",")[0];

            // 幂等检查:伪代码,实际应调用DB/Redis
            if (IdempotentStore.alreadyProcessed(orderId)) {
                // 已处理,跳过但仍推进位移
            } else {
                // 业务处理
                OrderService.process(r.value());
                IdempotentStore.markProcessed(orderId);
            }

            // 按分区记录最大位移,提交时需要+1
            offsets.put(
                new TopicPartition(r.topic(), r.partition()),
                new OffsetAndMetadata(r.offset() + 1)
            );
        }

        if (!offsets.isEmpty()) {
            // 同步提交,失败可捕获异常重试
            consumer.commitSync(offsets);
        }
    }
}

数据库幂等写入示例(MySQL):

-- 表结构:订单号唯一
CREATE TABLE orders (
  order_id VARCHAR(64) PRIMARY KEY,
  amount DECIMAL(10,2),
  status VARCHAR(20),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 幂等写入:重复订单号不会导致重复插入
INSERT INTO orders(order_id, amount, status)
VALUES('A001', 100.00, 'PAID')
ON DUPLICATE KEY UPDATE status=VALUES(status);

Redis 去重示例(命令 + 解释):

# 使用 SET 记录已处理的订单ID
redis-cli SADD processed_orders A001     # 返回 1 表示新增
redis-cli SADD processed_orders A001     # 返回 0 表示已存在
redis-cli SISMEMBER processed_orders A001  # 返回 1 表示已处理

提交失败排错与处理:

# 1) 查看消费者组位移与延迟
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group demo-consumer-group

# 2) 常见异常处理
# - CommitFailedException:消费组再平衡导致提交失败
#   处理:缩短处理时间或调大 max.poll.interval.ms
# - Rebalance频繁:检查分区数量与消费者数量
# - 处理卡顿:提升并发、优化下游DB写入

手动提交与幂等消费组合策略示例(伪流程):

poll -> 幂等检查 -> 业务处理 -> 写幂等记录 -> commitSync
失败 -> 不提交位移 -> 记录日志/重试/投递DLQ

练习:
1. 启动本节的 Kafka 环境,创建主题 demo-orders,编写生产者发送 100 条订单消息,包含重复订单号。
2. 实现消费者手动提交位移,并在 MySQL 中实现幂等写入,验证重复订单不会重复插入。
3. 人为制造提交失败(例如停止消费者后快速重启),观察 kafka-consumer-groups.sh 输出并解释位移变化。
4. 将 commitSync 改为 commitAsync 并加入回调,记录失败次数,比较两种提交方式的表现。