10.4.5 常见异常与重试处理
常见异常与重试处理围绕“定位原因—选择策略—控制副作用”展开。目标是让生产端可控重试、消费者可恢复处理,并通过监控与日志快速定位根因。
原理草图(异常链路与重试决策)
1. 生产者常见异常与处理示例#
典型异常:TimeoutException、NotEnoughReplicas/NotEnoughReplicasAfterAppend、RecordTooLargeException、InvalidTopicException、AuthorizationException、UnknownTopicOrPartition
关键配置示例(/opt/kafka/config/producer-retry.properties)
bootstrap.servers=127.0.0.1:9092
acks=all
retries=10
retry.backoff.ms=200
delivery.timeout.ms=120000
request.timeout.ms=30000
linger.ms=10
batch.size=32768
enable.idempotence=true
max.in.flight.requests.per.connection=5
命令:使用控制台生产者验证超时与重试
# 1) 创建主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic t_retry --partitions 1 --replication-factor 1
# 2) 启动生产者并使用配置文件
/opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic t_retry \
--producer.config /opt/kafka/config/producer-retry.properties
预期效果:消息发送成功;如 Broker 停止将触发重试,日志出现 TimeoutException 后在 Broker 恢复时自动恢复发送。
排错命令与解释
# 查看 Broker 端是否有 ISR 不足
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--describe --topic t_retry
# 查看 Broker 日志(超时、ISR不足、权限)
tail -f /opt/kafka/logs/server.log
- 若
NotEnoughReplicas:提高副本因子或恢复宕机副本,确认acks=all是否符合业务要求。 - 若
RecordTooLargeException:调整生产端max.request.size与 Brokermessage.max.bytes。
消息过大示例与修复
# /opt/kafka/config/server.properties
message.max.bytes=2097152
# /opt/kafka/config/producer-retry.properties
max.request.size=2097152
重启 Broker 使配置生效
/opt/kafka/bin/kafka-server-stop.sh
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
2. 消费者常见异常与处理示例#
典型异常:CommitFailedException、RebalanceInProgressException、TimeoutException、WakeupException、SerializationException、OffsetOutOfRangeException、RecordTooLargeException
关键配置示例(/opt/kafka/config/consumer-retry.properties)
bootstrap.servers=127.0.0.1:9092
group.id=g_retry
enable.auto.commit=false
max.poll.records=100
max.poll.interval.ms=300000
session.timeout.ms=10000
request.timeout.ms=30000
命令:使用控制台消费者模拟再平衡与手动提交
# 1) 启动消费者
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic t_retry \
--group g_retry \
--property print.key=true \
--property print.offset=true
预期效果:消费正常;若消费处理过慢导致再平衡,可能出现 CommitFailedException。
排错命令与解释
# 查看消费者组状态与滞后
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server 127.0.0.1:9092 \
--group g_retry --describe
OffsetOutOfRangeException:根据业务选择--reset-offsets --to-earliest或--to-latestSerializationException:隔离坏消息并写入 DLQ,避免阻塞主消费
位点修复示例
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server 127.0.0.1:9092 \
--group g_retry \
--topic t_retry \
--reset-offsets --to-earliest --execute
3. 业务层重试与DLQ设计示例#
重试拓扑草图
示例:本地重试 + 超限写入 DLQ(伪代码)
for (record : records) {
try {
process(record);
commitOffset(record);
} catch (Exception e) {
if (retryCount < 3) {
retryLater(record); // 延迟重试
} else {
sendToDLQ(record, e.getMessage());
}
}
}
命令:创建重试与死信主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic t_retry_delay --partitions 3 --replication-factor 1
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic t_retry_dlq --partitions 3 --replication-factor 1
4. 监控指标与故障定位#
重点指标
- 生产端:record-error-rate、record-retry-rate、request-latency-avg
- 消费端:records-lag-max、poll-latency-avg、commit-latency-avg
- Broker:UnderReplicatedPartitions、ISR、磁盘/网络利用率
常用运维命令
# Broker 线程与磁盘压力
top -Hp $(pgrep -f kafka.Kafka)
# 查看磁盘利用率
df -h /var/lib/kafka
# 网络延迟与丢包
ping -c 5 127.0.0.1
5. 练习#
1) 超时重试演练:停止 Broker 30 秒后恢复,观察生产端 TimeoutException 与自动恢复发送。
2) 位点超界演练:消费完成后重置到 earliest,再次消费验证历史回放。
3) 大消息演练:设置 max.request.size=512,发送 1KB 消息,观察 RecordTooLargeException 并完成修复。
4) DLQ演练:将特定 key 的消息强制抛错,超过 3 次写入 t_retry_dlq,验证 DLQ 消息可追溯。