MetadataCache 是指 Broker 上的元数据缓存,这些数据是 Controller 通过 UpdateMetadataRequest 请求发送给 Broker 的。换句话说,Controller 实现了一个异步更新机制,能够将最新的集群信息广播给所有 Broker,Kafka 通过异步更新机制来保证所有 Broker 上的元数据缓存实现最终一致性。
每台 Broker 上都要保存这份相同的数据有两个原因。
- 保存了这部分数据,Broker 就能够及时响应客户端发送的元数据请求,也就是处理 Metadata 请求。Metadata 请求是为数不多的能够被集群任意 Broker 处理的请求类型之一,也就是说,客户端程序能够随意地向任何一个 Broker 发送 Metadata 请求,去获取集群的元数据信息,这完全得益于 MetadataCache 的存在。
- Kafka 的一些重要组件会用到这部分数据。比如副本管理器会使用它来获取 Broker 的节点信息,事务管理器会使用它来获取分区 Leader 副本的信息,等等。
MetadataCache的定义如下:
class MetadataCache(brokerId: Int) extends Logging {// 保护它写入的锁对象private val partitionMetadataLock = new ReentrantReadWriteLock()//this is the cache state. every MetadataSnapshot instance is immutable, and updates (performed under a lock)//replace the value with a completely new one. this means reads (which are not under any lock) need to grab//the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.//multiple reads of this value risk getting different snapshots.// 保存了实际的元数据信息@volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)// 仅仅用于日志输出this.logIdent = s"[MetadataCache brokerId=$brokerId] "pr
上面的 metadataSnapshot 字段保存了实际的元数据信息,它的定义如下
case class MetadataSnapshot(// 这是一个 Map 类型。Key 是主题名称,Value 又是一个 Map 类型,其 Key 是分区号,Value 是一个 UpdateMetadataPartitionState 类型的字段。// UpdateMetadataPartitionState 类型是 UpdateMetadataRequest 请求内部所需的数据结构。partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],controllerId: Option[Int], // Controller 所在 Broker 的 ID。aliveBrokers: mutable.LongMap[Broker], // 当前集群中所有存活着的 Broker 对象列表。aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) // 也是一个 Map 的 Map 类型。其 Key 是 Broker ID 序号,Value 是 Map 类型,其 Key 是 ListenerName,// 即 Broker 监听器类型,而 Value 是 Broker 节点对象。
其中UpdateMetadataPartitionState的定义如下:
static public class UpdateMetadataPartitionState implements Message {private String topicName; // 主题名称private int partitionIndex; // 分区号private int controllerEpoch; // Controller Epoch值private int leader; // Leader副本所在Broker IDprivate int leaderEpoch; // Leader Epoch值private List<Integer> isr; // ISR列表private int zkVersion; // ZooKeeper节点Stat统计信息中的版本号private List<Integer> replicas; // 副本列表private List<Integer> offlineReplicas; // 离线副本列表private List<RawTaggedField> _unknownTaggedFields; // 未知字段列表
}
下面看看MetadataCache 类的重要方法。最重要的方法就是操作 metadataSnapshot 字段的方法
我把 MetadataCache 类的方法大致分为三大类:判断类;获取类;更新类。
判断类方法
判断给定主题或主题分区是否包含在元数据缓存中的方法,比如:
// 判断给定主题是否包含在元数据缓存中def contains(topic: String): Boolean = {metadataSnapshot.partitionStates.contains(topic)}
// 判断给定主题分区是否包含在元数据缓存中def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
获取类方法
// 获取给定主题分区的详细数据信息。如果没有找到对应记录,返回Nonedef getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {metadataSnapshot.partitionStates.get(topic).flatMap(_.get(partitionId))}
// 返回当前集群元数据缓存中的所有主题。private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {snapshot.partitionStates.keySet}
//参数为主题分区和 ListenerName,以获取指定监听器类型下该主题分区所有副本的 Broker 节点对象def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {// 使用局部变量获取当前元数据缓存val snapshot = metadataSnapshot// 获取给定主题分区的数据snapshot.partitionStates.get(tp.topic).flatMap(_.get(tp.partition)).map { partitionInfo =>val replicaIds = partitionInfo.replicasreplicaIds.asScala.map(replicaId => replicaId.intValue() -> {// 获取副本所在的Broker Idsnapshot.aliveBrokers.get(replicaId.longValue()) match {case Some(broker) =>// 根据Broker Id去获取对应的Broker节点对象broker.getNode(listenerName).getOrElse(Node.noNode())case None =>// 如果找不到节点Node.noNode()}}).toMap.filter(pair => pair match {case (_, node) => !node.isEmpty})}.getOrElse(Map.empty[Int, Node])}
更新类方法
updateMetadata 方法的主要逻辑,就是读取 UpdateMetadataRequest 请求中的分区数据,然后更新本地元数据缓存。
// 基于 UpdateMetadataRequest 请求更新每个分区的状态信息,并返回需要被移除的分区集合def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {inWriteLock(partitionMetadataLock) {// 第一部分,给后面的操作准备数据,即 aliveBrokers 和 aliveNodes 两个字段中保存的数据。// 保存存活Broker对象。Key是Broker ID,Value是Broker对象val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)// 保存存活节点对象。Key是Broker ID,Value是监听器->节点对象val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)// 从UpdateMetadataRequest中获取Controller所在的Broker ID// 如果当前没有Controller,赋值为Noneval controllerId = updateMetadataRequest.controllerId match {case id if id < 0 => Nonecase id => Some(id)}// 遍历UpdateMetadataRequest请求中的所有存活Broker对象updateMetadataRequest.liveBrokers.asScala.foreach { broker =>// `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which// is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could// move to `AnyRefMap`, which has comparable performance.val nodes = new java.util.HashMap[ListenerName, Node]val endPoints = new mutable.ArrayBuffer[EndPoint]// 遍历它的所有EndPoint类型,也就是为Broker配置的监听器broker.endpoints.asScala.foreach { ep =>val listenerName = new ListenerName(ep.listener)endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol))// 将<监听器,Broker节点对象>对保存起来nodes.put(listenerName, new Node(broker.id, ep.host, ep.port))}// 将Broker加入到存活Broker对象集合aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))// 将Broker节点加入到存活节点对象集合aliveNodes(broker.id) = nodes.asScala}// 确保集群 Broker 配置了相同的监听器,同时初始化已删除分区数组对象,等待下一部分代码逻辑对它进行操作。// 使用上一部分中的存活Broker节点对象,// 获取当前Broker所有的<监听器,节点>对aliveNodes.get(brokerId).foreach { listenerMap =>val listeners = listenerMap.keySet// 如果发现当前Broker配置的监听器与其他Broker有不同之处,记录错误日志if (!aliveNodes.values.forall(_.keySet == listeners))error(s"Listeners are not identical across brokers: $aliveNodes")}// 构造已删除分区数组,将其作为方法返回结果val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]// UpdateMetadataRequest请求没有携带任何分区信息if (!updateMetadataRequest.partitionStates.iterator.hasNext) {// 构造新的MetadataSnapshot对象,使用之前的分区信息和新的Broker列表信息metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)} else {// 第三部分:提取 UpdateMetadataRequest 请求中的数据,然后填充元数据缓存//since kafka may do partial metadata updates, we start by copying the previous state// 备份现有元数据缓存中的分区数据val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)copy ++= oldPartitionStatespartitionStates += (topic -> copy)}updateMetadataRequest.partitionStates.asScala.foreach { info =>val controllerId = updateMetadataRequest.controllerIdval controllerEpoch = updateMetadataRequest.controllerEpochval tp = new TopicPartition(info.topicName, info.partitionIndex)// 如果分区处于被删除过程中if (info.leader == LeaderAndIsr.LeaderDuringDelete) {// 将分区从元数据缓存中移除removePartitionInfo(partitionStates, tp.topic, tp.partition)stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")// 将分区加入到返回结果数据deletedPartitions += tp} else {// 将分区加入到元数据缓存addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")}}// 使用更新过的分区元数据,和第一部分计算的存活Broker列表及节点列表,构建最新的元数据缓存metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)}// 返回已删除分区列表数组deletedPartitions}}