
Kafka Controller Module (4): Controller Election

Published: 2024-02-23

All brokers in the cluster watch the /controller node in ZooKeeper in real time. Let us start with the KafkaController class.

```scala
class KafkaController(val config: KafkaConfig,   // Kafka configuration; exposes the values of all broker-side parameters
                      zkClient: KafkaZkClient,   // ZooKeeper client; all interaction between the Controller and ZooKeeper goes through it
                      time: Time,                // utility class providing time services (e.g. the current time)
                      metrics: Metrics,          // utility class implementing metrics monitoring (e.g. creating metrics)
                      initialBrokerInfo: BrokerInfo,        // broker node information: hostname, port, listeners, etc.
                      initialBrokerEpoch: Long,             // broker epoch, used to fence requests sent by an old Controller
                      tokenManager: DelegationTokenManager, // delegation token management; delegation tokens are a lightweight authentication mechanism
                      threadNamePrefix: Option[String] = None) // name prefix for the Controller's event-processing thread
  extends ControllerEventProcessor with Logging with KafkaMetricsGroup {

  this.logIdent = s"[Controller id=${config.brokerId}] "

  @volatile private var brokerInfo = initialBrokerInfo
  @volatile private var _brokerEpoch = initialBrokerEpoch

  private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
  // Cluster metadata class; holds all metadata for the cluster
  val controllerContext = new ControllerContext
  // Controller-side channel manager; sends requests from the Controller to brokers
  var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
    stateChangeLogger, threadNamePrefix)

  // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
  // visible for testing
  // Thread scheduler; currently only used for periodic preferred-leader re-election
  private[controller] val kafkaScheduler = new KafkaScheduler(1)

  // visible for testing
  // Controller event manager; manages the event-processing thread
  private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
    controllerContext.stats.rateAndTimeMetrics)

  private val brokerRequestBatch = new ControllerBrokerRequestBatch(config, controllerChannelManager,
    eventManager, controllerContext, stateChangeLogger)
  // Replica state machine; drives replica state transitions
  val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
    new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
  // Partition state machine; drives partition state transitions
  val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
    new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
  // Topic deletion manager; deletes topics and their logs
  val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
    partitionStateMachine, new ControllerDeletionClient(this, zkClient))

  // ZooKeeper listener for the Controller node
  private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
  // ZooKeeper listener for changes in the set of brokers
  private val brokerChangeHandler = new BrokerChangeHandler(eventManager)
  // Collection of ZooKeeper listeners for broker information changes
  private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
  // ZooKeeper listener for changes in the set of topics
  private val topicChangeHandler = new TopicChangeHandler(eventManager)
  // ZooKeeper listener for topic deletion
  private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
  // Collection of ZooKeeper listeners for per-topic partition changes
  private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
  // ZooKeeper listener for partition reassignment
  private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
  // ZooKeeper listener for preferred-leader election
  private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
  // ZooKeeper listener for ISR changes; once triggered, the Controller fetches the partitions whose ISR
  // changed and updates its cached leader and ISR metadata accordingly
  private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
  // ZooKeeper listener for log-dir events; once triggered, the Controller fetches the affected brokers
  // and handles the failed log directories on them
  private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)

  // Id of the broker on which the current Controller runs
  @volatile private var activeControllerId = -1
  // Number of offline partitions
  @volatile private var offlinePartitionCount = 0
  // Number of partitions eligible for preferred-leader election
  @volatile private var preferredReplicaImbalanceCount = 0
  // Total number of topics
  @volatile private var globalTopicCount = 0
  // Total number of partitions
  @volatile private var globalPartitionCount = 0
  // Number of topics awaiting deletion
  @volatile private var topicsToDeleteCount = 0
  // Number of replicas awaiting deletion
  @volatile private var replicasToDeleteCount = 0
  // Number of topics that cannot currently be deleted
  @volatile private var ineligibleTopicsToDeleteCount = 0
  // Number of replicas that cannot currently be deleted
  @volatile private var ineligibleReplicasToDeleteCount = 0
}
```
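The constructor comment above notes that the broker epoch is used to fence requests sent by an old Controller. A minimal, self-contained sketch of that fencing idea, not Kafka's actual API (the class and method names here are illustrative):

```java
// Hypothetical sketch of controller-epoch fencing: a broker remembers the
// highest controller epoch it has seen and rejects requests carrying an
// older epoch, which is how requests from a deposed controller get ignored.
public class EpochFence {
    private int highestSeenControllerEpoch = 0;

    // Returns true if the request is accepted; stale epochs are rejected.
    public synchronized boolean accept(int requestControllerEpoch) {
        if (requestControllerEpoch < highestSeenControllerEpoch) {
            return false; // request from an old controller: drop it
        }
        highestSeenControllerEpoch = requestControllerEpoch;
        return true;
    }

    public static void main(String[] args) {
        EpochFence fence = new EpochFence();
        System.out.println(fence.accept(1)); // true
        System.out.println(fence.accept(2)); // true  (a new controller was elected)
        System.out.println(fence.accept(1)); // false (the stale controller is fenced)
    }
}
```

Because the epoch only ever grows, a broker that has seen epoch N can safely drop anything tagged with an epoch below N.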

The ControllerChangeHandler Listener

KafkaController defines over a dozen ZooKeeper listeners. The one tied to the Controller itself is ControllerChangeHandler, which watches for Controller changes. It is defined as follows:

```scala
class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  // Path of the Controller node in ZooKeeper, i.e. /controller
  override val path: String = ControllerZNode.path

  // Watches for creation of the /controller node and enqueues a ControllerChange event.
  // Handling ControllerChange only requires the current broker to run the
  // "resign as Controller" logic.
  override def handleCreation(): Unit = eventManager.put(ControllerChange)

  // Watches for deletion of the /controller node and enqueues a Reelect event.
  // Deletion means /controller no longer exists in ZooKeeper, i.e. the cluster is
  // temporarily without a Controller. Since this state differs from Creation and
  // DataChange and must be treated separately, the Reelect event does more work
  // than ControllerChange.
  override def handleDeletion(): Unit = eventManager.put(Reelect)

  // Watches for data changes on the /controller node and enqueues a ControllerChange
  // event, handled the same way as Creation.
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
```
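The three callbacks above boil down to a mapping from znode events to controller events. A toy sketch of that dispatch, with a plain list standing in for the event queue (all names here are illustrative, mirroring but not reproducing the source):

```java
// Minimal sketch of how ControllerChangeHandler maps ZooKeeper znode events on
// /controller to controller events. The list stands in for the event queue
// managed by ControllerEventManager in the real code.
import java.util.ArrayList;
import java.util.List;

public class ControllerChangeHandlerSketch {
    enum ControllerEvent { ControllerChange, Reelect }

    static final List<ControllerEvent> eventQueue = new ArrayList<>();

    static void handleCreation()   { eventQueue.add(ControllerEvent.ControllerChange); }
    static void handleDeletion()   { eventQueue.add(ControllerEvent.Reelect); }
    static void handleDataChange() { eventQueue.add(ControllerEvent.ControllerChange); }

    public static void main(String[] args) {
        handleCreation();   // another broker won the election
        handleDataChange(); // the controller changed hands
        handleDeletion();   // the controller is gone: trigger re-election
        System.out.println(eventQueue); // [ControllerChange, ControllerChange, Reelect]
    }
}
```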

The Controller Election Flow

Three scenarios can trigger a Controller election:

  • when the cluster starts from scratch;
  • when a broker detects that the /controller node has disappeared;
  • when a broker detects that the data of the /controller node has changed.
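All three scenarios resolve through the same primitive: whichever broker creates the ephemeral /controller znode first wins. A self-contained simulation of that race, with an AtomicReference standing in for ZooKeeper's create-if-absent semantics (purely illustrative; no real ZooKeeper is involved):

```java
// Simulation of the /controller election race. compareAndSet(null, brokerId)
// succeeds only for the first caller, just as only one broker can create the
// ephemeral /controller node; deleting the node re-opens the race.
import java.util.concurrent.atomic.AtomicReference;

public class ElectionRace {
    // null means the /controller node does not exist
    static final AtomicReference<Integer> controllerZNode = new AtomicReference<>(null);

    // Try to create the ephemeral node; return the winner's broker id.
    static int elect(int brokerId) {
        if (controllerZNode.compareAndSet(null, brokerId)) {
            return brokerId;          // this broker won the election
        }
        return controllerZNode.get(); // someone else already won
    }

    public static void main(String[] args) {
        System.out.println(elect(0)); // 0: broker 0 creates /controller first
        System.out.println(elect(1)); // 0: broker 1 loses and observes broker 0
        controllerZNode.set(null);    // session expiry deletes the ephemeral node
        System.out.println(elect(1)); // 1: on re-election, broker 1 now wins
    }
}
```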

Scenario 1: Starting the Cluster from Scratch

When the cluster starts for the first time, no Controller has been elected yet. After a broker starts, it first writes the Startup ControllerEvent into the event queue, then starts the corresponding event-processing thread and the ControllerChangeHandler ZooKeeper listener, and finally relies on the event-processing thread to carry out the Controller election.

```scala
def startup() = {
  // Step 1: Register the ZooKeeper state-change handler, which watches for ZooKeeper session expiration
  zkClient.registerStateChangeHandler(new StateChangeHandler {
    override val name: String = StateChangeHandlers.ControllerHandler
    override def afterInitializingSession(): Unit = {
      eventManager.put(RegisterBrokerAndReelect)
    }
    override def beforeInitializingSession(): Unit = {
      val queuedEvent = eventManager.clearAndPut(Expire)
      // Block initialization of the new session until the expiration event is being handled,
      // which ensures that all pending events have been processed before creating the new session
      queuedEvent.awaitProcessing()
    }
  })
  // Step 2: Write the Startup event into the event queue
  eventManager.put(Startup)
  // Step 3: Start the ControllerEventThread, which begins processing ControllerEvents from the queue
  eventManager.start()
}
```

KafkaController's process method handles the Startup event:

```scala
// KafkaController's process method
override def process(event: ControllerEvent): Unit = {
  try {
    event match {
      ......
      case Startup =>
        processStartup() // handle the Startup event
    }
  }
  ......
}

private def processStartup(): Unit = {
  // Register the ControllerChangeHandler ZooKeeper listener
  zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  // Run the Controller election
  elect()
}
```
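Note that startup does not call elect directly: it enqueues Startup and lets a single dedicated thread process all events serially, so controller state never needs fine-grained locking. A toy event manager in that spirit (illustrative only, far simpler than the real ControllerEventManager):

```java
// A toy single-threaded event manager: events go onto a blocking queue and are
// processed one at a time by a single worker thread, mirroring the design in
// which ControllerEventThread serializes all controller work.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniEventManager {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(() -> {
        try {
            while (true) queue.take().run(); // process events strictly one at a time
        } catch (InterruptedException e) {
            // interrupted: shut down the worker
        }
    });

    void start() { worker.start(); }
    void put(Runnable event) { queue.add(event); }
    void shutdown() { worker.interrupt(); }

    public static void main(String[] args) throws InterruptedException {
        MiniEventManager mgr = new MiniEventManager();
        // Queued before start, just as Startup is enqueued before eventManager.start()
        mgr.put(() -> System.out.println("Startup"));
        mgr.start();
        Thread.sleep(100);
        mgr.shutdown();
    }
}
```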

Scenario 2: The /controller Node Disappears
Every broker that detects the disappearance of the /controller node immediately calls the elect method to run for Controller.

Scenario 3: The Data of the /controller Node Changes
The Controller has changed hands, which breaks down into two cases:

  • If the broker was previously the Controller, it must first resign and then run for election again;
  • If the broker was not previously the Controller, it simply runs for the new Controller role.
```scala
private def maybeResign(): Unit = {
  // A crucial step: this is the basis for deciding whether the resignation logic must run!
  // Determine whether this broker was the Controller before the change
  val wasActiveBeforeChange = isActive
  // Register the ControllerChangeHandler listener
  zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
  // Get the id of the broker currently hosting the Controller; -1 if there is no Controller
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  // If this broker was the Controller but no longer is
  if (wasActiveBeforeChange && !isActive) {
    onControllerResignation() // run the resignation logic
  }
}
```
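The heart of maybeResign is the comparison between the broker's role before and after re-reading /controller. Reduced to a pure function (the names here are illustrative, not Kafka's):

```java
// The resignation decision from maybeResign as a standalone predicate:
// resign exactly when the broker was the Controller and no longer is.
public class ResignCheck {
    static boolean shouldResign(boolean wasActiveBeforeChange, boolean isActiveNow) {
        return wasActiveBeforeChange && !isActiveNow;
    }

    public static void main(String[] args) {
        System.out.println(shouldResign(true, false));  // true:  was Controller, lost the role -> resign
        System.out.println(shouldResign(true, true));   // false: still the Controller, keep going
        System.out.println(shouldResign(false, false)); // false: never was the Controller
    }
}
```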

The resignation logic is carried out by the onControllerResignation method. It mainly clears various data structures, unregisters ZooKeeper listeners, and shuts down the state machines and managers, among other things:

```scala
private def onControllerResignation(): Unit = {
  debug("Resigning")
  // de-register listeners
  // Unregister the ZooKeeper listeners
  zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
  zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
  zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
  zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
  unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)

  // shutdown leader rebalance scheduler
  // Shut down the Kafka scheduler, i.e. cancel the periodic preferred-leader re-election
  kafkaScheduler.shutdown()

  // Reset all statistics fields to 0
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0
  topicsToDeleteCount = 0
  replicasToDeleteCount = 0
  ineligibleTopicsToDeleteCount = 0
  ineligibleReplicasToDeleteCount = 0

  // stop token expiry check scheduler
  // Shut down the token expiration check scheduler
  if (tokenCleanScheduler.isStarted)
    tokenCleanScheduler.shutdown()

  // de-register partition ISR listener for on-going partition reassignment task
  // Unregister the partition-reassignment ISR change listeners
  unregisterPartitionReassignmentIsrChangeHandlers()
  // shutdown partition state machine
  // Shut down the partition state machine
  partitionStateMachine.shutdown()
  // Unregister the topic change listener
  zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
  // Unregister the partition modification listeners
  unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
  // Unregister the topic deletion listener
  zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
  // shutdown replica state machine
  // Shut down the replica state machine
  replicaStateMachine.shutdown()
  // Unregister the broker change listener
  zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
  // Shut down the Controller channel manager
  controllerChannelManager.shutdown()
  // Clear the cluster metadata
  controllerContext.resetContext()
  info("Resigned")
}
```

The Election Logic

```scala
private def elect(): Unit = {
  // Step 1: Get the id of the broker currently hosting the Controller; if there is none, mark it as -1
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  // Step 2: If a Controller has already been elected, simply return
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
    return
  }

  try {
    // Step 3: Register the Controller information,
    // which mainly means creating the /controller node
    val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
    controllerContext.epoch = epoch
    controllerContext.epochZkVersion = epochZkVersion
    activeControllerId = config.brokerId

    info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
      s"and epoch zk version is now ${controllerContext.epochZkVersion}")

    // Step 4: Run the follow-up logic for becoming the Controller
    onControllerFailover()
  } catch {
    case e: ControllerMovedException =>
      maybeResign()
      if (activeControllerId != -1)
        debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
      else
        warn("A controller has been elected but just resigned, this will result in another round of election", e)
    case t: Throwable =>
      error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
        s"Trigger controller movement immediately", t)
      triggerControllerMove()
  }
}
```

 
