10.1.4 生产者、消费者与消费组

生产者、消费者与消费组#

原理草图(Producer/Consumer/Group)#

文章图片

生产者(Producer)职责与工作流(含示例)#

核心流程:分区选择 → 批量/压缩 → 发送确认 → 重试/幂等。

安装与准备(二进制示例)

# 1) 下载并解压
tar -zxvf kafka_2.13-3.6.1.tgz -C /opt/
ln -s /opt/kafka_2.13-3.6.1 /opt/kafka

# 2) 启动本地单机(开发测试)
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

# 3) 创建主题
/opt/kafka/bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
  --partitions 2 --replication-factor 1

生产者 CLI 示例

# 发送单条消息
echo "order_id=1001 amount=99.9" | /opt/kafka/bin/kafka-console-producer.sh \
  --topic demo-topic --bootstrap-server 127.0.0.1:9092

# 发送带 Key 的消息(触发 hash 分区)
/opt/kafka/bin/kafka-console-producer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
  --property "parse.key=true" --property "key.separator=:" <<EOF
user1:login
user2:logout
user1:buy
EOF

生产者配置示例(properties 文件)
/opt/kafka/config/producer.properties

bootstrap.servers=127.0.0.1:9092
acks=all
retries=5
retry.backoff.ms=100
enable.idempotence=true
max.in.flight.requests.per.connection=5
linger.ms=10
batch.size=32768
compression.type=lz4

命令解释
- acks=all:等待所有 ISR 副本确认,提升可靠性。
- enable.idempotence=true:启用幂等避免重复写入。
- linger.ms/batch.size:控制批量合并,提升吞吐。

消费者(Consumer)职责与工作流(含示例)#

关键点:拉取模型、偏移量管理、并行消费、反压控制。

消费 CLI 示例

# 读取消息(从最新开始)
/opt/kafka/bin/kafka-console-consumer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092

# 读取消息(从最早)
/opt/kafka/bin/kafka-console-consumer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
  --from-beginning

# 指定消费组
/opt/kafka/bin/kafka-console-consumer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
  --group demo-group --from-beginning

消费者配置示例(properties 文件)
/opt/kafka/config/consumer.properties

bootstrap.servers=127.0.0.1:9092
group.id=demo-group
enable.auto.commit=false
auto.offset.reset=earliest
max.poll.records=200
max.poll.interval.ms=300000
fetch.min.bytes=1
fetch.max.wait.ms=500

命令解释
- enable.auto.commit=false:业务处理成功后手动提交。
- auto.offset.reset=earliest:无偏移时从最早开始。
- max.poll.records:单次拉取条数,控制批量大小。

手动提交偏移量示例(伪代码逻辑)

# 1) 业务消费
# 2) 处理成功
# 3) 提交偏移量
# 在 CLI 仅演示概念,实际需使用客户端 SDK

消费组(Consumer Group)机制(含示例)#

组内分配原则:一个分区同一时间只能分配给一个消费者实例。

查看消费组状态

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --group demo-group

输出示例与解释

TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
demo-topic   0          10              12              2    consumer-1
demo-topic   1          8               8               0    consumer-2
  • LAG:积压量,持续增大说明消费跟不上。
  • CONSUMER-ID:当前分区归属实例。

分配策略示例
/opt/kafka/config/consumer.properties

partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor

消息消费语义(含示例)#

  • 至少一次:处理成功后提交偏移,可能重复。
  • 至多一次:先提交后处理,可能丢失。
  • 恰好一次:幂等生产者 + 事务 + 精准偏移。

事务示例(命令思路)

# 1) 生产端启用事务 (client 代码中)
# 2) 消费端处理并提交 offset 与结果在同一事务中
# CLI 无法模拟,需使用 SDK

常见故障与排错#

1) 生产失败:TimeoutException

# 查看 broker 端口与状态
ss -lntp | grep 9092
tail -n 100 /opt/kafka/logs/server.log

排查点:bootstrap.servers 是否可达、acks=all 时 ISR 是否齐全。

2) 消费组频繁 Rebalance

# 查看组状态与成员
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --group demo-group

排查点:max.poll.interval.ms 是否过小、消费者处理是否阻塞。

3) 消费延迟持续增加

# 查看主题末尾 offset
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list 127.0.0.1:9092 --topic demo-topic --time -1

排查点:分区数不足、消费者实例不足、业务处理耗时过长。

练习#

1) 创建 demo-topic 分区数为 3,并启动 2 个消费者实例,观察分区分配结果。
2) 将 enable.auto.commit 关闭,模拟“处理失败不提交”的场景,验证重复消费现象。
3) 设置 acks=0acks=all 分别发送 10000 条消息,比较吞吐与丢失风险。