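10.4.1 Producer Configuration and Send Model
On the producer side you have to trade off reliability, throughput, and latency. This section covers how the send model works, the core settings, runnable examples, troubleshooting, and exercises, so the material can be put into practice.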
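How it works (send model and acknowledgment path):
KafkaProducer.send() serializes the key and value, picks a partition (from a hash of the key when a key is set), and appends the record to a per-partition batch in an in-memory buffer (the RecordAccumulator). A batch becomes ready once it is full (batch.size) or has waited linger.ms. A background sender thread ships ready batches to each partition's leader broker, which responds according to acks; the response completes the record's Future and invokes its callback, or triggers a retry on a retriable error.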
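The partition-selection step of the send path can be illustrated with a small standalone sketch. For keyed records, the Java client hashes the serialized key bytes with murmur2 and takes the positive result modulo the partition count; the class below reimplements that idea without depending on kafka-clients. `KeyPartitionSketch` and its methods are hypothetical names, and the hash is written from the published algorithm, so treat it as an illustration rather than a byte-exact reference for every client version.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch (hypothetical class): key-based partition selection modeled on
// the murmur2 hash that the Kafka Java client applies to serialized key bytes.
final class KeyPartitionSketch {

    // 32-bit murmur2 with the seed and constants used by the Java client
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes; the fall-through between cases is intentional
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Keyed record: clear the sign bit, then take the hash modulo the partition count
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer uses UTF-8 by default
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"order-1", "order-2", "batch-key"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the mapping depends only on the key bytes and the partition count, all records sent with the key `batch-key` in the batch example land in the same partition, and adding partitions to ops_orders later changes where existing keys are routed.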
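Key settings explained:
- bootstrap.servers: initial brokers used to discover the cluster; list at least 2 so the client can still start if one is down
- acks: 0 (do not wait for any acknowledgment), 1 (wait for the partition leader only), all (wait for all in-sync replicas; pair it with min.insync.replicas on the topic or broker)
- retries, retry.backoff.ms: how many times a send is retried on retriable errors, and the wait between attempts; the total time spent is also capped by delivery.timeout.ms
- linger.ms, batch.size: how long to wait for more records before sending a batch, and the maximum size of one partition's batch in bytes; larger values trade latency for throughput
- compression.type: none/gzip/snappy/lz4/zstd; compression is applied per batch, so it pays off more with larger batches
- enable.idempotence: the broker uses a producer ID and per-partition sequence numbers to discard duplicates caused by retries; requires acks=all, retries > 0, and max.in.flight.requests.per.connection ≤ 5
- max.in.flight.requests.per.connection: maximum number of unacknowledged requests per broker connection; values above 1 can reorder records on retry unless idempotence is enabled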
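The constraints listed under enable.idempotence can be checked mechanically. The sketch below is a hypothetical helper (`IdempotenceCheck`, not part of the Kafka client) that reports which of those rules a set of producer properties breaks; its fallback values assume Kafka client 3.0+ defaults (idempotence on, acks=all). The real client enforces these rules itself and throws a ConfigException when enable.idempotence=true is set explicitly together with incompatible values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): reports which idempotence
// constraints a set of producer properties violates. Fallbacks for absent keys
// assume Kafka client 3.0+ defaults.
final class IdempotenceCheck {

    static List<String> violations(Properties p) {
        List<String> problems = new ArrayList<>();
        if (!Boolean.parseBoolean(p.getProperty("enable.idempotence", "true"))) {
            return problems; // the rules below only apply when idempotence is on
        }
        String acks = p.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all (-1), got " + acks);
        }
        int retries = Integer.parseInt(p.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        if (retries <= 0) {
            problems.add("retries must be > 0, got " + retries);
        }
        int inFlight = Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5"));
        if (inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be <= 5, got " + inFlight);
        }
        return problems;
    }

    public static void main(String[] args) {
        // The settings used in this section
        Properties current = new Properties();
        current.setProperty("enable.idempotence", "true");
        current.setProperty("acks", "all");
        current.setProperty("retries", "5");
        current.setProperty("max.in.flight.requests.per.connection", "5");
        System.out.println("section settings: " + violations(current)); // section settings: []

        // A combination the client would reject
        Properties conflicting = new Properties();
        conflicting.setProperty("enable.idempotence", "true");
        conflicting.setProperty("acks", "1");
        System.out.println("acks=1 with idempotence: " + violations(conflicting));
    }
}
```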
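Environment preparation (Kafka is assumed to be installed already; the steps below only verify the cluster and create the topic):
# 1) Check that Kafka is reachable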
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# 2) Create the topic (3 partitions, 2 replicas; a replication factor of 2 needs at least 2 brokers)
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic ops_orders --partitions 3 --replication-factor 2
# 3) Show the topic details
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--describe --topic ops_orders
Producer configuration file example (can be loaded directly by a Java producer):
# /opt/kafka/config/producer.properties
bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093
client.id=ops-producer-01
acks=all
retries=5
retry.backoff.ms=200
linger.ms=10
batch.size=32768
compression.type=lz4
enable.idempotence=true
max.in.flight.requests.per.connection=5
delivery.timeout.ms=120000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
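A Java program can load this file with java.util.Properties instead of repeating every setting in code. The sketch below (hypothetical class `ProducerConfigLoader`) reads properties from any Reader and checks one documented relationship: delivery.timeout.ms should be at least linger.ms + request.timeout.ms, where request.timeout.ms defaults to 30000 ms. Properties.load treats lines starting with # as comments, so the path header in the file is harmless.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: loads producer settings and checks the delivery timeout budget.
final class ProducerConfigLoader {

    // Loads key=value pairs from any Reader (for example a file reader)
    static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // delivery.timeout.ms should be >= linger.ms + request.timeout.ms
    static boolean deliveryBudgetOk(Properties p) {
        long delivery = Long.parseLong(p.getProperty("delivery.timeout.ms", "120000")); // client default
        long linger = Long.parseLong(p.getProperty("linger.ms", "0"));                  // illustrative fallback
        long request = Long.parseLong(p.getProperty("request.timeout.ms", "30000"));    // client default
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        // A subset of the producer.properties example, inlined so the sketch runs anywhere
        String sample = String.join("\n",
                "# /opt/kafka/config/producer.properties",
                "bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093",
                "linger.ms=10",
                "delivery.timeout.ms=120000");
        Properties p = load(new StringReader(sample));
        System.out.println("bootstrap.servers = " + p.getProperty("bootstrap.servers"));
        System.out.println("delivery budget ok: " + deliveryBudgetOk(p)); // delivery budget ok: true
    }
}
```

In a real program, open the file with `Files.newBufferedReader(Paths.get("/opt/kafka/config/producer.properties"))` and pass the resulting Properties to `new KafkaProducer<>(props)`; the serializer entries are class names, which the client accepts as strings.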
Send models in practice (synchronous, asynchronous, batch):
// File: ProducerDemo.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same settings as producer.properties above
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "ops-producer-01");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "5");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "200");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "10");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer, which also completes any buffered sends
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 1) Synchronous send: block on the Future until the broker acknowledges
            ProducerRecord<String, String> r1 =
                    new ProducerRecord<>("ops_orders", "order-1", "sync-msg-1");
            Future<RecordMetadata> f = producer.send(r1);
            RecordMetadata m = f.get(); // blocks; throws ExecutionException if the send failed
            System.out.println("sync: partition=" + m.partition() + ", offset=" + m.offset());

            // 2) Asynchronous send: the callback runs on the producer's I/O thread, so keep it short
            ProducerRecord<String, String> r2 =
                    new ProducerRecord<>("ops_orders", "order-2", "async-msg-1");
            producer.send(r2, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("async error: " + exception.getMessage());
                } else {
                    System.out.println("async: partition=" + metadata.partition()
                            + ", offset=" + metadata.offset());
                }
            });

            // 3) Batch send: records are grouped per partition according to linger.ms/batch.size
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("ops_orders", "batch-key", "batch-" + i),
                        (metadata, exception) -> {
                            if (exception != null) { // report failures instead of dropping them silently
                                System.err.println("batch error: " + exception.getMessage());
                            }
                        });
            }
            producer.flush(); // block until every buffered record has been sent and completed
        }
    }
}
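How linger.ms and batch.size together decide when a batch is closed can be explored with a toy model. The class below (hypothetical `BatchingModel`, not the client's RecordAccumulator) simulates a single partition: a batch closes when linger.ms has elapsed since its first record, or when the next record would exceed batch.size. It ignores sender backpressure, so with linger.ms=0 it sends every record alone, whereas the real client can still group records that pile up while a request is in flight.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of per-partition batching (not the client's RecordAccumulator).
// A batch closes when linger.ms has elapsed since its first record, or when the
// next record would push it past batch.size. Sender backpressure is ignored.
final class BatchingModel {

    // Returns the record count of every batch that would be sent, in order.
    static List<Integer> simulate(long[] arrivalMs, int recordBytes, int batchSize, long lingerMs) {
        List<Integer> batches = new ArrayList<>();
        int count = 0;      // records in the open batch
        int bytes = 0;      // bytes in the open batch
        long openedAt = 0;  // arrival time of the open batch's first record
        for (long t : arrivalMs) {
            boolean lingerExpired = count > 0 && t - openedAt >= lingerMs;
            boolean wouldOverflow = count > 0 && bytes + recordBytes > batchSize;
            if (lingerExpired || wouldOverflow) {
                batches.add(count); // hand the open batch to the sender
                count = 0;
                bytes = 0;
            }
            if (count == 0) {
                openedAt = t;       // this record starts a new batch
            }
            count++;
            bytes += recordBytes;
        }
        if (count > 0) {
            batches.add(count);     // flush() sends whatever is left
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[100];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i; // made-up workload: one 100-byte record per millisecond
        }
        for (long linger : new long[] {0, 10, 20}) {
            List<Integer> batches = simulate(arrivals, 100, 32768, linger);
            System.out.println("linger.ms=" + linger + " -> " + batches.size() + " batches");
        }
        // Prints 100, 10 and 5 batches respectively
    }
}
```

With this workload batch.size (32768 bytes) is never reached, so linger.ms alone sets the batch boundaries; shrinking batch.size below the bytes that arrive within one linger window makes size the limiting factor instead.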
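Compile and run (with expected results):
# Assumes a JDK is installed and the Kafka client jars are under /opt/kafka/libs
$ javac -cp "/opt/kafka/libs/*" ProducerDemo.java
$ java -cp ".:/opt/kafka/libs/*" ProducerDemo
# Expected: a "sync: partition=..., offset=..." line from the synchronous send and an "async: ..." line from the callback; the partition and offset values depend on the cluster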
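Troubleshooting (with commands):
1) Cannot connect to the broker (usually a port or listener-address problem)
# Check that the port is listening
$ ss -lntp | grep 9092
# Check that advertised.listeners contains an address the client can reach
$ grep -n "advertised.listeners" /opt/kafka/config/server.properties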
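2) TimeoutException or high send latency
# Check cluster health and the ISR (replicas missing from Isr point to lagging or down brokers)
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--describe --topic ops_orders
# Tuning: if throughput is the bottleneck, a larger linger.ms/batch.size plus compression reduces the number of requests, at the cost of some per-record latency; if brokers are overloaded, add broker resources or raise request.timeout.ms/delivery.timeout.ms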
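3) Reordering risk (caused by retries)
# Enable idempotence and keep in-flight requests at 5 or fewer (without idempotence, strict ordering requires max.in.flight.requests.per.connection=1)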
enable.idempotence=true
max.in.flight.requests.per.connection=5
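Idempotence keeps ordering with up to 5 in-flight requests because the broker checks per-partition sequence numbers. The toy model below (hypothetical `SequenceCheckModel`) keeps only the last accepted sequence for one producer and one partition; the real protocol also tracks the producer ID, epoch, and several recent batches, so this is a simplification. A gap is rejected so the producer resends in order, and a sequence that was already written is acknowledged without being appended again.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of broker-side sequence checking for ONE producer and ONE partition.
// The real broker keys state by producer ID and epoch and remembers several recent
// batches; this model keeps only the last accepted sequence number.
final class SequenceCheckModel {

    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private int lastSeq = -1; // nothing written yet
    private final List<String> appended = new ArrayList<>();

    Result append(int seq, String value) {
        if (seq <= lastSeq) {
            return Result.DUPLICATE;    // already in the log: acknowledge, do not write again
        }
        if (seq != lastSeq + 1) {
            return Result.OUT_OF_ORDER; // gap before this batch: reject, producer resends in order
        }
        lastSeq = seq;
        appended.add(value);
        return Result.APPENDED;
    }

    List<String> appended() {
        return appended;
    }

    public static void main(String[] args) {
        SequenceCheckModel broker = new SequenceCheckModel();
        // seq 0 and seq 1 are in flight; seq 0 fails transiently, so seq 1 arrives first
        System.out.println(broker.append(1, "m1")); // OUT_OF_ORDER
        System.out.println(broker.append(0, "m0")); // APPENDED (retry of seq 0)
        System.out.println(broker.append(1, "m1")); // APPENDED (resend of seq 1)
        // The ack for seq 1 is lost on the way back, so the producer retries it once more
        System.out.println(broker.append(1, "m1")); // DUPLICATE
        System.out.println(broker.appended());      // [m0, m1]
    }
}
```

Without the sequence check, the same sequence of events would write m1 before m0 and then write m1 a second time.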
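Exercises (hands-on verification):
1) Send 1000 messages with acks=1 and with acks=all and compare throughput and latency (set enable.idempotence=false for the acks=1 run, since idempotence requires acks=all).
2) Change linger.ms from 0 to 20 and observe how the batch size changes (for example via the producer JMX metric batch-size-avg, or the logs).
3) Disable enable.idempotence and check whether duplicate messages appear while network jitter is simulated.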
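For exercise 1, per-record latency can be measured as the time from send() to its callback and then summarized. The hypothetical helper below computes nearest-rank percentiles and records per second; the sample numbers in main are made-up input, not measurements.

```java
import java.util.Arrays;

// Hypothetical helper for exercise 1: summarizes latencies collected in one run,
// e.g. System.nanoTime() taken before send() and again inside its callback.
final class LatencyStats {

    // Nearest-rank percentile (p in (0, 100]) computed on a sorted copy of the samples
    static long percentile(long[] samplesMs, double p) {
        if (samplesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = samplesMs.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.min(Math.max(rank, 1), sorted.length) - 1];
    }

    // Records per second for a run that sent `records` records in `elapsedMs` milliseconds
    static double throughput(int records, long elapsedMs) {
        return records * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        long[] samples = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3}; // made-up latencies in ms
        System.out.println("p50 = " + percentile(samples, 50) + " ms"); // p50 = 3 ms
        System.out.println("p99 = " + percentile(samples, 99) + " ms"); // p99 = 9 ms
        System.out.println("throughput = " + throughput(1000, 250) + " records/s"); // throughput = 4000.0 records/s
    }
}
```

Comparing p99 rather than only the average makes the extra replication wait of acks=all easier to see, since it mostly shows up in the tail.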
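Command quick reference:
- kafka-topics.sh --create: create a topic
- --partitions: the partition count determines parallelism
- --replication-factor: the replica count determines fault tolerance
- producer.send().get(): synchronous send that returns the record metadata
- producer.send(record, callback): asynchronous send; the callback handles the result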