10.4.1 Producer Configuration and Send Model

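On the producer side you have to trade off reliability, throughput, and latency. This section covers how the send model works, the core configuration, runnable examples, troubleshooting, and exercises, so the material can be put into practice directly.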

[Figure: principle sketch of the send model and acknowledgment path]

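Key configuration parameters:
- bootstrap.servers: initial contact points for the cluster; list at least 2 brokers so the client can still bootstrap if one is down
- acks: 0/1/all, the acknowledgment level (0: do not wait; 1: wait for the partition leader; all: wait for all in-sync replicas)
- retries, retry.backoff.ms: number of retries and the wait between retries
- linger.ms, batch.size: how long to wait for more records before sending a batch, and the maximum batch size in bytes per partition
- compression.type: snappy/lz4/zstd (gzip and none are also accepted)
- enable.idempotence: idempotence; prevents retries from writing duplicate records (per partition, within one producer session)
- max.in.flight.requests.per.connection: maximum number of unacknowledged requests per connection; must be at most 5 when idempotence is enabled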
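As a rough illustration of how linger.ms and batch.size interact, the sketch below models when a per-partition batch becomes eligible to send: once it is full, once linger.ms has elapsed since it was opened, or while flush() is running. This is a simplified model, not the client's RecordAccumulator code; the class BatchReadyModel and its methods are hypothetical, and the model ignores effects such as buffer.memory exhaustion and the fact that, under load, records pile up while the sender thread is busy even with linger.ms=0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the producer's per-partition batching decision.
// Not Kafka client code: it only mirrors the idea that a batch is sent when it is
// full, when linger.ms has elapsed since it was opened, or when flush() is in progress.
class BatchReadyModel {
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms

    BatchReadyModel(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** True if a batch in this state would be eligible to send. */
    boolean isReady(int batchBytes, long batchOpenedMs, long nowMs, boolean flushing) {
        boolean full = batchBytes >= batchSizeBytes;
        boolean lingerExpired = nowMs - batchOpenedMs >= lingerMs;
        return full || lingerExpired || flushing;
    }

    /**
     * Groups records (arrival time in ms, size in bytes) into batches and returns the
     * number of records in each batch. The open batch is closed before appending a record
     * that would not fit, or when it had already become ready by that record's arrival.
     */
    List<Integer> simulate(long[] arrivalMs, int[] sizes) {
        List<Integer> batches = new ArrayList<>();
        int count = 0;
        int bytes = 0;
        long openedAt = 0;
        for (int i = 0; i < arrivalMs.length; i++) {
            if (count > 0 && (bytes + sizes[i] > batchSizeBytes
                    || isReady(bytes, openedAt, arrivalMs[i], false))) {
                batches.add(count); // current batch goes out before this record
                count = 0;
                bytes = 0;
            }
            if (count == 0) {
                openedAt = arrivalMs[i]; // this record opens a new batch
            }
            count++;
            bytes += sizes[i];
        }
        if (count > 0) {
            batches.add(count); // last batch leaves on linger expiry or flush()
        }
        return batches;
    }
}
```

For ten 100-byte records arriving 1 ms apart with batch.size=32768, the model yields ten single-record batches at linger.ms=0 and one ten-record batch at linger.ms=20, which is the trend exercise 2 asks you to observe on a real cluster.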

Environment preparation (Kafka installation is not covered here; the commands below only verify the cluster and create the topic):

# 1) Check that Kafka is reachable
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

# 2) Create the topic (3 partitions, replication factor 2; needs at least 2 brokers)
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic ops_orders --partitions 3 --replication-factor 2

# 3) Describe the topic
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --topic ops_orders

Example producer configuration file (usable directly with the Java producer):

# /opt/kafka/config/producer.properties
bootstrap.servers=127.0.0.1:9092,127.0.0.1:9093
client.id=ops-producer-01
acks=all
retries=5
retry.backoff.ms=200
linger.ms=10
batch.size=32768
compression.type=lz4
enable.idempotence=true
max.in.flight.requests.per.connection=5
delivery.timeout.ms=120000
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
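The file above can be loaded with java.util.Properties and handed to the producer. The hypothetical helper below does that and also flags combinations that the Kafka producer documentation describes as invalid: idempotence together with acks other than all, retries of 0, or more than 5 in-flight requests, and a delivery.timeout.ms smaller than linger.ms + request.timeout.ms. Defaults for missing keys are an assumption based on Kafka 3.x clients; adjust them for other versions.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: loads producer.properties-style text and reports config
// combinations the Kafka producer documentation describes as invalid.
// Defaults for missing keys assume a Kafka 3.x client.
class ProducerConfigCheck {

    static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader); // standard key=value format; '#' lines are comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    static List<String> check(Properties p) {
        List<String> problems = new ArrayList<>();
        // Idempotence constraints are only checked when it is explicitly enabled.
        if ("true".equalsIgnoreCase(p.getProperty("enable.idempotence"))) {
            String acks = p.getProperty("acks", "all");
            if (!acks.equals("all") && !acks.equals("-1")) { // -1 is equivalent to all
                problems.add("enable.idempotence=true requires acks=all");
            }
            if (Integer.parseInt(p.getProperty("retries", "2147483647")) <= 0) {
                problems.add("enable.idempotence=true requires retries > 0");
            }
            if (Integer.parseInt(p.getProperty("max.in.flight.requests.per.connection", "5")) > 5) {
                problems.add("enable.idempotence=true requires max.in.flight.requests.per.connection <= 5");
            }
        }
        // delivery.timeout.ms should be at least linger.ms + request.timeout.ms.
        long delivery = Long.parseLong(p.getProperty("delivery.timeout.ms", "120000"));
        long linger = Long.parseLong(p.getProperty("linger.ms", "0"));
        long request = Long.parseLong(p.getProperty("request.timeout.ms", "30000"));
        if (delivery < linger + request) {
            problems.add("delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        return problems;
    }
}
```

In an application you would open the file with something like Files.newBufferedReader(Path.of("/opt/kafka/config/producer.properties")), fail fast if check() returns anything, and pass the resulting Properties to new KafkaProducer<String, String>(props).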

The send model in practice (synchronous, asynchronous, batched):

// File: ProducerDemo.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "ops-producer-01");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "5");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "200");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "10");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 1) Synchronous send
            ProducerRecord<String, String> r1 =
                new ProducerRecord<>("ops_orders", "order-1", "sync-msg-1");
            Future<RecordMetadata> f = producer.send(r1);
            RecordMetadata m = f.get(); // blocks until the send is acknowledged (per acks) or fails
            System.out.println("sync: partition=" + m.partition() + ", offset=" + m.offset());

            // 2) Asynchronous send: the callback runs once the send completes or fails
            ProducerRecord<String, String> r2 =
                new ProducerRecord<>("ops_orders", "order-2", "async-msg-1");
            producer.send(r2, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("async error: " + exception.getMessage());
                } else {
                    System.out.println("async: partition=" + metadata.partition()
                            + ", offset=" + metadata.offset());
                }
            });

            // 3) Batched send: records are grouped automatically according to linger.ms/batch.size
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("ops_orders", "batch-key", "batch-" + i));
            }

            producer.flush(); // block until all buffered records have been sent and completed
        }
    }
}
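The 100 batch records in the demo all share the key batch-key, so they land in one partition and keep their relative order. To our understanding, for records with a non-null key the Java client's default partitioning hashes the serialized key bytes with murmur2, clears the sign bit, and takes the result modulo the partition count; StringSerializer encodes keys as UTF-8 by default. The sketch below re-implements that hash for illustration only. KeyPartitionSketch and partitionFor are hypothetical names, and the mapping should be checked against your client version before relying on it.

```java
import java.nio.charset.StandardCharsets;

// Illustrative re-implementation of murmur2-based key partitioning (hypothetical class).
class KeyPartitionSketch {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995; // mixing constants of the murmur2 algorithm
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes; the case fall-through is intentional.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Partition for a String key serialized as UTF-8. */
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions; // mask keeps the hash non-negative
    }
}
```

Because the result depends on numPartitions, adding partitions to ops_orders later changes which partition a given key maps to, and per-key ordering across that change is no longer guaranteed.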

Compile and run (with expected results):

# Assumes a JDK is installed and the Kafka client jars are in /opt/kafka/libs
$ javac -cp "/opt/kafka/libs/*" ProducerDemo.java
$ java  -cp ".:/opt/kafka/libs/*" ProducerDemo

# Expected output: the synchronous send prints its partition/offset, and the asynchronous send prints from its callback

Troubleshooting (with commands):
1) Cannot connect to the broker (usually a port or listener-address problem)

# Check that the broker is listening on the port
$ ss -lntp | grep 9092

# Verify that advertised.listeners is set to an address the client can reach
$ grep -n "advertised.listeners" /opt/kafka/config/server.properties
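After the first metadata request, the client connects to the addresses the brokers advertise rather than the bootstrap address it started with, so it helps to test those advertised endpoints from the client host itself. The sketch below is a hypothetical troubleshooting helper, not part of any Kafka tool: it parses an advertised.listeners value of the form LISTENER_NAME://host:port[,...] and attempts a plain TCP connect to each endpoint. A successful connect only shows that the port is reachable; it does not validate the listener's security protocol, and bracketed IPv6 literals are not handled.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical troubleshooting helper; not part of any Kafka tool.
class ListenerReachability {

    /** Parses "NAME://host:port,NAME2://host:port" into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String advertisedListeners) {
        List<InetSocketAddress> endpoints = new ArrayList<>();
        for (String raw : advertisedListeners.split(",")) {
            String entry = raw.trim();
            String hostPort = entry.substring(entry.indexOf("://") + 3); // drop "NAME://"
            int colon = hostPort.lastIndexOf(':');
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            endpoints.add(InetSocketAddress.createUnresolved(host, port));
        }
        return endpoints;
    }

    /** True if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host name not resolvable
        }
    }

    public static void main(String[] args) {
        // args[0]: the value of advertised.listeners (the part after '=')
        for (InetSocketAddress ep : parse(args[0])) {
            System.out.println(ep.getHostString() + ":" + ep.getPort() + " reachable="
                    + reachable(ep.getHostString(), ep.getPort(), 3000));
        }
    }
}
```

Run it on the client machine, not on the broker: an endpoint that is reachable from the broker host but not from the client is exactly the misconfiguration this step is looking for.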

2) TimeoutException or high send latency

# Check cluster health: every partition should have a leader and a complete ISR
$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --topic ops_orders

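# Tuning: larger linger.ms/batch.size improve batching and throughput (at some cost in per-record latency); if the brokers themselves are saturated, add broker resources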

3) Reordering risk (caused by retries)

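# Enable idempotence and keep in-flight requests at 5 or fewer (without idempotence, set it to 1 to preserve order under retries)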
enable.idempotence=true
max.in.flight.requests.per.connection=5
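Why idempotence addresses both duplicates and reordering: to our understanding, an idempotent producer tags what it sends with a producer ID and per-partition sequence numbers, and the broker appends data only if its sequence is the next one expected, treating an already-written sequence as a duplicate and refusing a gap. The toy model below illustrates that rule for one producer on one partition. IdempotentLogModel and its Result values are hypothetical, and real brokers track sequence ranges per batch rather than one number per message.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of broker-side de-duplication for one idempotent producer on one partition.
// Simplification: one sequence number per message instead of per-batch ranges.
class IdempotentLogModel {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final List<String> log = new ArrayList<>();
    private int lastSeq = -1; // highest sequence number appended so far

    Result append(int seq, String value) {
        if (seq <= lastSeq) {
            return Result.DUPLICATE;    // a retry of data that is already in the log
        }
        if (seq != lastSeq + 1) {
            return Result.OUT_OF_ORDER; // an earlier sequence is still missing
        }
        log.add(value);
        lastSeq = seq;
        return Result.APPENDED;
    }

    List<String> log() {
        return log;
    }
}
```

Replaying a lost acknowledgment against this model (append seq 1, then retry seq 1) leaves m1 in the log once. Without idempotence there is no sequence to compare, so the retried record would be written a second time, which is what exercise 3 looks for; with more than one request in flight, a retried earlier batch could also land after a later one.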

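Exercises (hands-on):
1) Send 1,000 messages with acks=1 and again with acks=all, and compare throughput and latency.
2) Change linger.ms from 0 to 20 and observe how batch sizes change (for example via JMX metrics or logs).
3) Disable enable.idempotence, simulate network jitter, and check whether duplicate messages appear.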
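For exercise 1, latency is easiest to compare if each send records the time from just before producer.send() until its callback fires. The hypothetical LatencyStats class below collects those samples and reports nearest-rank percentiles and overall throughput; it is a measurement aid under these assumptions, not a Kafka API. Kafka also ships bin/kafka-producer-perf-test.sh, which reports similar figures.

```java
import java.util.Arrays;

// Hypothetical measurement helper for exercise 1 (not part of the Kafka client).
// Methods are synchronized because producer callbacks run on the producer's I/O thread
// while results are usually read from the main thread.
class LatencyStats {
    private long[] samples = new long[16];
    private int count = 0;

    synchronized void record(long latencyNanos) {
        if (count == samples.length) {
            samples = Arrays.copyOf(samples, count * 2); // grow the buffer
        }
        samples[count++] = latencyNanos;
    }

    /** Nearest-rank percentile in milliseconds, for p in (0, 100]. */
    synchronized double percentileMs(double p) {
        if (count == 0) {
            throw new IllegalStateException("no samples recorded");
        }
        long[] sorted = Arrays.copyOf(samples, count);
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * count / 100.0); // 1-based rank
        return sorted[Math.max(rank, 1) - 1] / 1_000_000.0;
    }

    /** Records per second, given the wall-clock duration of the whole run. */
    synchronized double throughputPerSec(long elapsedNanos) {
        return count / (elapsedNanos / 1_000_000_000.0);
    }
}
```

In the demo loop this means capturing long start = System.nanoTime() before each send and calling stats.record(System.nanoTime() - start) inside the callback; run once with acks=1 and once with acks=all and compare percentileMs(50), percentileMs(99), and throughputPerSec(...).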

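Quick reference:
- kafka-topics.sh --create: create a topic
- --partitions: number of partitions; affects parallelism
- --replication-factor: number of replicas; affects durability
- producer.send().get(): synchronous send that returns the record metadata
- producer.send(record, callback): asynchronous send; the callback handles the result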