10.4.5 常见异常与重试处理

常见异常与重试处理围绕“定位原因—选择策略—控制副作用”展开。目标是让生产端可控重试、消费者可恢复处理,并通过监控与日志快速定位根因。

原理草图(异常链路与重试决策)

文章图片

1. 生产者常见异常与处理示例#

典型异常TimeoutExceptionNotEnoughReplicas/NotEnoughReplicasAfterAppendRecordTooLargeExceptionInvalidTopicExceptionAuthorizationExceptionUnknownTopicOrPartition

关键配置示例(/opt/kafka/config/producer-retry.properties)

bootstrap.servers=127.0.0.1:9092
acks=all
retries=10
retry.backoff.ms=200
delivery.timeout.ms=120000
request.timeout.ms=30000
linger.ms=10
batch.size=32768
enable.idempotence=true
max.in.flight.requests.per.connection=5

命令:使用控制台生产者验证超时与重试

# 1) 创建主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic t_retry --partitions 1 --replication-factor 1

# 2) 启动生产者并使用配置文件
/opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic t_retry \
  --producer.config /opt/kafka/config/producer-retry.properties

预期效果:消息发送成功;如 Broker 停止将触发重试,日志出现 TimeoutException 后在 Broker 恢复时自动恢复发送。

排错命令与解释

# 查看 Broker 端是否有 ISR 不足
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --describe --topic t_retry

# 查看 Broker 日志(超时、ISR不足、权限)
tail -f /opt/kafka/logs/server.log
  • NotEnoughReplicas:提高副本因子或恢复宕机副本,确认 acks=all 是否符合业务要求。
  • RecordTooLargeException:调整生产端 max.request.size 与 Broker message.max.bytes

消息过大示例与修复

# /opt/kafka/config/server.properties
message.max.bytes=2097152
# /opt/kafka/config/producer-retry.properties
max.request.size=2097152

重启 Broker 使配置生效

/opt/kafka/bin/kafka-server-stop.sh
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

2. 消费者常见异常与处理示例#

典型异常CommitFailedExceptionRebalanceInProgressExceptionTimeoutExceptionWakeupExceptionSerializationExceptionOffsetOutOfRangeExceptionRecordTooLargeException

关键配置示例(/opt/kafka/config/consumer-retry.properties)

bootstrap.servers=127.0.0.1:9092
group.id=g_retry
enable.auto.commit=false
max.poll.records=100
max.poll.interval.ms=300000
session.timeout.ms=10000
request.timeout.ms=30000

命令:使用控制台消费者模拟再平衡与手动提交

# 1) 启动消费者
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic t_retry \
  --group g_retry \
  --property print.key=true \
  --property print.offset=true

预期效果:消费正常;若消费处理过慢导致再平衡,可能出现 CommitFailedException

排错命令与解释

# 查看消费者组状态与滞后
/opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --group g_retry --describe
  • OffsetOutOfRangeException:根据业务选择 --reset-offsets --to-earliest--to-latest
  • SerializationException:隔离坏消息并写入 DLQ,避免阻塞主消费

位点修复示例

/opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --group g_retry \
  --topic t_retry \
  --reset-offsets --to-earliest --execute

3. 业务层重试与DLQ设计示例#

重试拓扑草图

文章图片

示例:本地重试 + 超限写入 DLQ(伪代码)

for (record : records) {
  try {
    process(record);
    commitOffset(record);
  } catch (Exception e) {
    if (retryCount < 3) {
      retryLater(record); // 延迟重试
    } else {
      sendToDLQ(record, e.getMessage());
    }
  }
}

命令:创建重试与死信主题

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic t_retry_delay --partitions 3 --replication-factor 1

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
  --create --topic t_retry_dlq --partitions 3 --replication-factor 1

4. 监控指标与故障定位#

重点指标
- 生产端:record-error-raterecord-retry-raterequest-latency-avg
- 消费端:records-lag-maxpoll-latency-avgcommit-latency-avg
- Broker:UnderReplicatedPartitionsISR、磁盘/网络利用率

常用运维命令

# Broker 线程与磁盘压力
top -Hp $(pgrep -f kafka.Kafka)

# 查看磁盘利用率
df -h /var/lib/kafka

# 网络延迟与丢包
ping -c 5 127.0.0.1

5. 练习#

1) 超时重试演练:停止 Broker 30 秒后恢复,观察生产端 TimeoutException 与自动恢复发送。
2) 位点超界演练:消费完成后重置到 earliest,再次消费验证历史回放。
3) 大消息演练:设置 max.request.size=512,发送 1KB 消息,观察 RecordTooLargeException 并完成修复。
4) DLQ演练:将特定 key 的消息强制抛错,超过 3 次写入 t_retry_dlq,验证 DLQ 消息可追溯。