10.4.6 Java客户端实战示例
本节以 Java 客户端为例,演示 Kafka 生产者与消费者的完整链路,覆盖依赖安装、核心配置、发送与消费、手动提交、幂等与事务、异常重试与关闭流程,并补充可执行命令、排错方法与练习。
1. 原理草图:生产-消费链路#
2. 依赖安装与版本说明#
- 推荐版本:Kafka Client 3.x(与集群版本兼容为准)
- Maven 依赖(
pom.xml):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
3. 环境准备与验证命令#
1) 创建主题与查看分区(运维验证)
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka1:9092 \
--create --topic topic-demo --partitions 3 --replication-factor 2
/opt/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka1:9092 \
--describe --topic topic-demo
2) 运行控制台生产/消费验证链路
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server kafka1:9092 \
--topic topic-demo
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka1:9092 \
--topic topic-demo --from-beginning
- 预期效果:在生产端输入字符串,消费端可实时收到消息。
4. 生产者示例(同步/异步+关键参数解释)#
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// Kafka 集群入口
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// 序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 可靠性与性能
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试次数
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 批量等待时间
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);// 批量大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =
new ProducerRecord<>("topic-demo", "key-1", "hello kafka");
// 异步发送
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("send failed: " + exception.getMessage());
} else {
System.out.printf("sent to %s-%d offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
// 同步发送
RecordMetadata md = producer.send(record).get();
System.out.println("sync sent offset=" + md.offset());
producer.flush();
producer.close();
}
}
5. 消费者示例(手动提交+关键参数解释)#
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-demo");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic-demo"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records) {
System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
r.topic(), r.partition(), r.offset(), r.key(), r.value());
// 业务处理
}
// 批量提交
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
6. 幂等与事务(生产端)示例#
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 幂等
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// 事务示例
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-demo-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic-demo", "k1", "v1"));
producer.send(new ProducerRecord<>("topic-demo", "k2", "v2"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
} finally {
producer.close();
}
7. 消费端隔离级别与事务配合#
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
- 作用:只消费已提交事务的消息,避免读到未提交数据。
8. 常见异常、排错命令与处理建议#
TimeoutException:检查 broker 可达性、网络、防火墙、DNS
排错命令:
nc -vz kafka1 9092
curl -s http://kafka1:9092 # 若启用明文探测,仅用于连通性测试
RecordTooLargeException:调整max.request.size与 brokermessage.max.bytes
排错命令:
grep -E "message.max.bytes|replica.fetch.max.bytes" /opt/kafka/config/server.properties
CommitFailedException:处理再平衡,缩短处理时间或增大max.poll.interval.ms
排错命令:
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group group-demo --describe
SerializationException:检查序列化一致性与 schema 版本
排错建议:确保生产与消费使用同一序列化器,必要时记录消息头版本号。
9. 关闭与资源释放(优雅退出)#
try {
// poll loop
} catch (org.apache.kafka.common.errors.WakeupException e) {
// ignore for shutdown
} finally {
consumer.commitSync();
consumer.close();
}
- 生产者:
flush()后close() - 消费者:
close()触发资源释放与偏移提交
10. 运维验证清单#
- topic 副本因子、分区数是否合理
- 生产者
acks、retries、enable.idempotence是否符合可靠性要求 - 消费者是否启用手动提交并具备幂等处理
- 客户端与 broker 版本兼容性
- 日志中是否存在频繁重平衡或超时告警
11. 练习与实战任务#
1) 使用 Java 生产者发送 1000 条消息,统计吞吐并输出分区分布。
2) 设置 enable.idempotence=true 与 acks=1 对比重复消息概率。
3) 在消费者中故意 sleep(10s),观察 max.poll.interval.ms 触发的再平衡。
4) 使用 kafka-consumer-groups.sh --describe 验证 offset 提交进度。