10.1.3 Broker、Controller与元数据管理
Broker 是 Kafka 集群中的核心节点,负责接收生产者写入、为消费者提供读取、存储分区日志以及处理副本同步。每个 Broker 维护本地分区日志、索引与段文件,并通过网络线程、I/O 线程、请求队列实现高并发读写。分区的 Leader 在某个 Broker 上承担写入与读出,Follower 负责拉取并同步数据,形成副本冗余。
Controller 是集群的“协调者”,在多个 Broker 中选举产生,负责分区 Leader 选举、副本状态变更、ISR 管理、Broker 上下线处理等关键控制逻辑。Controller 发生切换时,需重新加载元数据并触发必要的重分配与选举,短时间内可能出现元数据传播延迟。
元数据管理用于描述集群的拓扑与分区状态,包括主题、分区、副本列表、ISR、Leader、配置项等信息。传统模式下元数据存储在 ZooKeeper,Broker 启动后会读取元数据并缓存;新版 KRaft 模式由 Kafka 自身管理元数据日志。元数据更新流程通常为:控制层检测事件 → 更新元数据 → 广播到 Broker → 客户端刷新缓存。
安装与最小集群示例(KRaft 模式,3 Broker)#
以下示例在单机上模拟 3 个 Broker,便于理解 Controller 与元数据管理流程。
# 1) 下载并解压
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -zxvf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
# 2) 生成 KRaft 集群ID
KAFKA_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
echo $KAFKA_CLUSTER_ID
# 3) 准备三份配置
mkdir -p /data/kafka/{k1,k2,k3}
cp config/kraft/server.properties /data/kafka/k1/server.properties
cp config/kraft/server.properties /data/kafka/k2/server.properties
cp config/kraft/server.properties /data/kafka/k3/server.properties
编辑 /data/kafka/k1/server.properties(k2/k3 修改 node.id、端口、日志目录):
# /data/kafka/k1/server.properties
node.id=1
process.roles=broker,controller
controller.quorum.voters=1@127.0.0.1:19091,2@127.0.0.1:19092,3@127.0.0.1:19093
listeners=PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:19091
inter.broker.listener.name=PLAINTEXT
log.dirs=/data/kafka/k1/logs
num.partitions=3
# 4) 格式化存储
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /data/kafka/k1/server.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /data/kafka/k2/server.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /data/kafka/k3/server.properties
# 5) 启动三个 Broker
bin/kafka-server-start.sh -daemon /data/kafka/k1/server.properties
bin/kafka-server-start.sh -daemon /data/kafka/k2/server.properties
bin/kafka-server-start.sh -daemon /data/kafka/k3/server.properties
预期效果:任意一个 Broker 会成为 Controller,集群元数据可被查询。
元数据与 Controller 观察示例#
# 查看当前 Controller
bin/kafka-metadata-quorum.sh --bootstrap-server 127.0.0.1:9092 describe
# 查看主题、分区、Leader 与 ISR
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe
# 创建主题并观察分区副本
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
--create --topic demo --partitions 3 --replication-factor 3
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic demo
命令解释
- kafka-metadata-quorum.sh describe:显示当前 Controller 与投票节点状态。
- kafka-topics.sh --describe:列出每个分区的 Leader 与 ISR,用于判断副本同步状态。
- --replication-factor 3:每个分区 3 副本,对应 3 个 Broker。
Broker 负载与分区迁移示例#
当热点分区集中在少数 Broker 上时,可以通过分区重分配平衡负载。
# 1) 生成迁移计划
cat > /tmp/reassign.json << 'EOF'
{
"topics": [{"topic": "demo"}],
"version": 1
}
EOF
bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 \
--generate --topics-to-move-json-file /tmp/reassign.json \
--broker-list "1,2,3"
# 2) 执行迁移(将上一步输出的提案复制到此处)
cat > /tmp/reassign-exec.json << 'EOF'
{
"version":1,
"partitions":[
{"topic":"demo","partition":0,"replicas":[1,2,3]},
{"topic":"demo","partition":1,"replicas":[2,3,1]},
{"topic":"demo","partition":2,"replicas":[3,1,2]}
]
}
EOF
bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 \
--execute --reassignment-json-file /tmp/reassign-exec.json
# 3) 查看迁移状态
bin/kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 \
--verify --reassignment-json-file /tmp/reassign-exec.json
常见排错与定位#
1) 找不到 Leader 或 ISR 缩小
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic demo
# 观察 Leader=-1 或 ISR 仅剩 1 个 Broker
- 排查点:Broker 是否下线、网络是否抖动、磁盘是否满。
2) Controller 频繁切换
# 查看日志
grep -E "Controller|Election" /data/kafka/k1/logs/server.log | tail -n 50
- 排查点:节点资源耗尽、GC 过长、时钟偏移、控制通道不稳定。
3) 元数据不一致或客户端缓存旧
# 客户端强制刷新元数据(示例:kafka-console-producer)
bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic demo
- 排查点:客户端版本与 Broker 版本不兼容,或 DNS/IP 变化未更新。
关键运维关注点#
- Broker 负载均衡:避免热点分区集中在少数节点,影响写入与消费性能。
- Controller 稳定性:监控频繁切换,排查网络抖动、资源瓶颈与选举超时。
- 元数据一致性:关注 ISR 过小、Leader 频繁迁移、分区“无 Leader”。
- 客户端与 Broker 版本兼容:确保元数据协议一致,避免升级时缓存失效问题。
练习#
1) 创建 3 个分区、3 副本的主题 demo,观察 Leader 与 ISR。
2) 手动停止一个 Broker,观察 ISR 变化并记录恢复后 ISR 回补时间。
3) 执行一次分区重分配,验证迁移状态与最终 Leader 分布是否均衡。
4) 通过调整 num.network.threads 和 num.io.threads(在配置文件中),观察高并发写入时的延迟变化。