10.1.4 生产者、消费者与消费组
生产者、消费者与消费组#
原理草图(Producer/Consumer/Group)#
生产者(Producer)职责与工作流(含示例)#
核心流程:分区选择 → 批量/压缩 → 发送确认 → 重试/幂等。
安装与准备(二进制示例)
# 1) 下载并解压
tar -zxvf kafka_2.13-3.6.1.tgz -C /opt/
ln -s /opt/kafka_2.13-3.6.1 /opt/kafka
# 2) 启动本地单机(开发测试)
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# 3) 创建主题
/opt/kafka/bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
--partitions 2 --replication-factor 1
生产者 CLI 示例
# 发送单条消息
echo "order_id=1001 amount=99.9" | /opt/kafka/bin/kafka-console-producer.sh \
--topic demo-topic --bootstrap-server 127.0.0.1:9092
# 发送带 Key 的消息(触发 hash 分区)
/opt/kafka/bin/kafka-console-producer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
--property "parse.key=true" --property "key.separator=:" <<EOF
user1:login
user2:logout
user1:buy
EOF
生产者配置示例(properties 文件)
/opt/kafka/config/producer.properties
bootstrap.servers=127.0.0.1:9092
acks=all
retries=5
retry.backoff.ms=100
enable.idempotence=true
max.in.flight.requests.per.connection=5
linger.ms=10
batch.size=32768
compression.type=lz4
命令解释
- acks=all:等待所有 ISR 副本确认,提升可靠性。
- enable.idempotence=true:启用幂等避免重复写入。
- linger.ms/batch.size:控制批量合并,提升吞吐。
消费者(Consumer)职责与工作流(含示例)#
关键点:拉取模型、偏移量管理、并行消费、反压控制。
消费 CLI 示例
# 读取消息(从最新开始)
/opt/kafka/bin/kafka-console-consumer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092
# 读取消息(从最早)
/opt/kafka/bin/kafka-console-consumer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
--from-beginning
# 指定消费组
/opt/kafka/bin/kafka-console-consumer.sh --topic demo-topic --bootstrap-server 127.0.0.1:9092 \
--group demo-group --from-beginning
消费者配置示例(properties 文件)
/opt/kafka/config/consumer.properties
bootstrap.servers=127.0.0.1:9092
group.id=demo-group
enable.auto.commit=false
auto.offset.reset=earliest
max.poll.records=200
max.poll.interval.ms=300000
fetch.min.bytes=1
fetch.max.wait.ms=500
命令解释
- enable.auto.commit=false:业务处理成功后手动提交。
- auto.offset.reset=earliest:无偏移时从最早开始。
- max.poll.records:单次拉取条数,控制批量大小。
手动提交偏移量示例(伪代码逻辑)
# 1) 业务消费
# 2) 处理成功
# 3) 提交偏移量
# 在 CLI 仅演示概念,实际需使用客户端 SDK
消费组(Consumer Group)机制(含示例)#
组内分配原则:一个分区同一时间只能分配给一个消费者实例。
查看消费组状态
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--describe --group demo-group
输出示例与解释
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
demo-topic 0 10 12 2 consumer-1
demo-topic 1 8 8 0 consumer-2
LAG:积压量,持续增大说明消费跟不上。CONSUMER-ID:当前分区归属实例。
分配策略示例
/opt/kafka/config/consumer.properties
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor
消息消费语义(含示例)#
- 至少一次:处理成功后提交偏移,可能重复。
- 至多一次:先提交后处理,可能丢失。
- 恰好一次:幂等生产者 + 事务 + 精准偏移。
事务示例(命令思路)
# 1) 生产端启用事务 (client 代码中)
# 2) 消费端处理并提交 offset 与结果在同一事务中
# CLI 无法模拟,需使用 SDK
常见故障与排错#
1) 生产失败:TimeoutException
# 查看 broker 端口与状态
ss -lntp | grep 9092
tail -n 100 /opt/kafka/logs/server.log
排查点:bootstrap.servers 是否可达、acks=all 时 ISR 是否齐全。
2) 消费组频繁 Rebalance
# 查看组状态与成员
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 \
--describe --group demo-group
排查点:max.poll.interval.ms 是否过小、消费者处理是否阻塞。
3) 消费延迟持续增加
# 查看主题末尾 offset
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list 127.0.0.1:9092 --topic demo-topic --time -1
排查点:分区数不足、消费者实例不足、业务处理耗时过长。
练习#
1) 创建 demo-topic 分区数为 3,并启动 2 个消费者实例,观察分区分配结果。
2) 将 enable.auto.commit 关闭,模拟“处理失败不提交”的场景,验证重复消费现象。
3) 设置 acks=0 与 acks=all 分别发送 10000 条消息,比较吞吐与丢失风险。