10.4.6 Java客户端实战示例

本节以 Java 客户端为例,演示 Kafka 生产者与消费者的完整链路,覆盖依赖安装、核心配置、发送与消费、手动提交、幂等与事务、异常重试与关闭流程,并补充可执行命令、排错方法与练习。

1. 原理草图:生产-消费链路#

文章图片

2. 依赖安装与版本说明#

  • 推荐版本:Kafka Client 3.x(与集群版本兼容为准)
  • Maven 依赖(pom.xml):
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.6.0</version>
</dependency>

3. 环境准备与验证命令#

1) 创建主题与查看分区(运维验证)

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka1:9092 \
  --create --topic topic-demo --partitions 3 --replication-factor 2

/opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka1:9092 \
  --describe --topic topic-demo

2) 运行控制台生产/消费验证链路

/opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka1:9092 \
  --topic topic-demo

/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic topic-demo --from-beginning
  • 预期效果:在生产端输入字符串,消费端可实时收到消息。

4. 生产者示例(同步/异步+关键参数解释)#

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka 集群入口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        // 序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 可靠性与性能
        props.put(ProducerConfig.ACKS_CONFIG, "all");          // 所有副本确认
        props.put(ProducerConfig.RETRIES_CONFIG, 3);           // 失败重试次数
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);         // 批量等待时间
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);// 批量大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record =
                new ProducerRecord<>("topic-demo", "key-1", "hello kafka");

        // 异步发送
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("send failed: " + exception.getMessage());
            } else {
                System.out.printf("sent to %s-%d offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });

        // 同步发送
        RecordMetadata md = producer.send(record).get();
        System.out.println("sync sent offset=" + md.offset());

        producer.flush();
        producer.close();
    }
}

5. 消费者示例(手动提交+关键参数解释)#

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 手动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic-demo"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                            r.topic(), r.partition(), r.offset(), r.key(), r.value());
                    // 业务处理
                }
                // 批量提交
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}

6. 幂等与事务(生产端)示例#

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 幂等
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

// 事务示例
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-demo-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-demo", "k1", "v1"));
    producer.send(new ProducerRecord<>("topic-demo", "k2", "v2"));
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
} finally {
    producer.close();
}

7. 消费端隔离级别与事务配合#

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
  • 作用:只消费已提交事务的消息,避免读到未提交数据。

8. 常见异常、排错命令与处理建议#

  • TimeoutException:检查 broker 可达性、网络、防火墙、DNS
    排错命令:
nc -vz kafka1 9092
curl -s http://kafka1:9092  # 若启用明文探测,仅用于连通性测试
  • RecordTooLargeException:调整 max.request.size 与 broker message.max.bytes
    排错命令:
grep -E "message.max.bytes|replica.fetch.max.bytes" /opt/kafka/config/server.properties
  • CommitFailedException:处理再平衡,缩短处理时间或增大 max.poll.interval.ms
    排错命令:
/opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --group group-demo --describe
  • SerializationException:检查序列化一致性与 schema 版本
    排错建议:确保生产与消费使用同一序列化器,必要时记录消息头版本号。

9. 关闭与资源释放(优雅退出)#

try {
    // poll loop
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.commitSync();
    consumer.close();
}
  • 生产者:flush()close()
  • 消费者:close() 触发资源释放与偏移提交

10. 运维验证清单#

  • topic 副本因子、分区数是否合理
  • 生产者 acksretriesenable.idempotence 是否符合可靠性要求
  • 消费者是否启用手动提交并具备幂等处理
  • 客户端与 broker 版本兼容性
  • 日志中是否存在频繁重平衡或超时告警

11. 练习与实战任务#

1) 使用 Java 生产者发送 1000 条消息,统计吞吐并输出分区分布。
2) 设置 enable.idempotence=trueacks=1 对比重复消息概率。
3) 在消费者中故意 sleep(10s),观察 max.poll.interval.ms 触发的再平衡。
4) 使用 kafka-consumer-groups.sh --describe 验证 offset 提交进度。