Kafka 源码解析:Broker 节点的启动与关闭

从本篇开始我们分析 kafka 的服务端组件实现,Kafka 集群由多个 broker 节点构成,每一个节点上都运行着一个 kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。

Kafka 提供了 kafka-server-start.sh 脚本来简化服务的启动操作,脚本中通过调用 kafka.Kafka 类来启动 kafka 服务,这也是 kafka 整个服务端的驱动类。在 kafka 服务启动过程中,首先会解析并封装命令行传递的参数,然后创建负责 kafka 服务启动和关闭操作的 KafkaServerStartable 类对象,并调用 KafkaServerStartable#startup 方法启动服务。

Kafka 驱动类的 main 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def main(args: Array[String]): Unit = {
try {
// 解析命令行参数
val serverProps = getPropsFromArgs(args)
// 创建 kafkaServerStartable 对象,期间会初始化监控上报程序
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

// 注册一个钩子方法,当 JVM 被关闭时执行 shutdown 逻辑,本质上是在执行 KafkaServer#shutdown 方法
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
kafkaServerStartable.shutdown()
}
})

// 本质上调用的是 KafkaServer#startup 方法
kafkaServerStartable.startup()
// 阻塞等待 kafka server 运行线程关闭
kafkaServerStartable.awaitShutdown()
} catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}

KafkaServerStartable 实际只是对 KafkaServer 的简单封装,相应方法实现都只是简单调用了 KafkaServer 类中同名的方法,所以下文我们主要分析 KafkaServer 类的实现。KafkaServer 是对单个 broker 节点生命周期的描绘,其主要逻辑是用来启动和关闭单个 broker 节点,KafkaServer 类字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class KafkaServer(val config: KafkaConfig, // 配置信息对象
time: Time = Time.SYSTEM, // 时间戳工具
threadNamePrefix: Option[String] = None,
kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() // 监控上报程序
) extends Logging with KafkaMetricsGroup {

/** 标识节点已经启动完成 */
private val startupComplete = new AtomicBoolean(false)
/** 标识节点正在执行关闭操作 */
private val isShuttingDown = new AtomicBoolean(false)
/** 标识节点正在执行启动操作 */
private val isStartingUp = new AtomicBoolean(false)
/** 阻塞主线程等待 KafkaServer 的关闭 */
private var shutdownLatch = new CountDownLatch(1)
/** 记录 broker 节点的当前状态 */
val brokerState: BrokerState = new BrokerState
/** Api 接口类,用于分发各种类型的请求 */
var apis: KafkaApis = _
/** 权限控制相关 */
var authorizer: Option[Authorizer] = None
var credentialProvider: CredentialProvider = _
/** 网络 socket 服务 */
var socketServer: SocketServer = _
/** 简单的连接池实现,用于管理所有的 KafkaRequestHandler */
var requestHandlerPool: KafkaRequestHandlerPool = _
/** 日志数据管理 */
var logManager: LogManager = _
/** 管理当前 broker 节点上的分区副本 */
var replicaManager: ReplicaManager = _
/** topic 增删管理 */
var adminManager: AdminManager = _
/** 动态配置管理 */
var dynamicConfigHandlers: Map[String, ConfigHandler] = _
var dynamicConfigManager: DynamicConfigManager = _
/** group 协调管理组件 */
var groupCoordinator: GroupCoordinator = _
/** 集群控制组件 */
var kafkaController: KafkaController = _
/** 定时任务调度器 */
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
/** broker 节点活跃性检查 */
var kafkaHealthcheck: KafkaHealthcheck = _
/** broker 缓存整个集群中全部分区的状态信息 */
var metadataCache: MetadataCache = _
/** ZK 操作工具类 */
var zkUtils: ZkUtils = _

// ... 省略方法定义

}

在开始分析 KafkaServer 的启动和关闭逻辑之前,我们首先看一下最简单的 KafkaServer#awaitShutdown 方法实现,在 KafkaServer 中定义了一个 CountDownLatch 类型的 KafkaServer#shutdownLatch 字段,初始 count 值设置为 1,而 KafkaServer#awaitShutdown 方法只是简单的调用了 CountDownLatch#await 方法来阻塞主线程。当 KafkaServer#shutdown 方法执行完成后,会调用 CountDownLatch#countDown 方法将 count 值设置为 0,从而让主线程从阻塞态中恢复,并最终关闭整个服务。

一. 服务启动过程分析

方法 KafkaServer#shutdown 的实现我们稍后进行分析,下面首先看一下 kafka 服务的启动过程,即 KafkaServer#startup 方法的实现。该方法实现较长,这里先对方法的整体执行流程进行概括,然后挑一些重点的步骤进行进一步分析:

  1. 运行状态校验,如果当前 broker 节点正在执行关闭操作,则此时不允许再次启动服务,所以抛出异常;如果当前服务已经启动完成,即处于运行状态,则直接返回,不需要重复启动;否则设置正在启动标记;
  2. 设置当前 broker 节点的状态为 Starting,标识 broker 节点正在启动;
  3. 初始化定时任务调度器 KafkaScheduler;
  4. 创建 ZkUtils 工具类对象,用于操作 ZK,期间会在 ZK 上创建一些基本的节点;
  5. 从 ZK 上获取当前 broker 所属集群的 clusterId,如果不存在则创建一个;
  6. 获取当前 broker 节点的 brokerId;
  7. 初始化一些监控相关的配置;
  8. 创建并启动 LogManager,用于管理记录在本地的日志数据;
  9. 创建 MetadataCache 对象,用于为当前 broker 节点缓存整个集群中全部分区的状态信息;
  10. 创建并启动 SocketServer,用于接收并处理来自客户端和其他 broker 节点的请求;
  11. 创建并启动 ReplicaManager,用于管理当前 broker 节点上的分区副本信息;
  12. 创建并启动 KafkaController,每个 broker 节点都会创建并启动一个 KafkaController 实例,但是只有一个 broker 会成为 leader 角色,负责管理集群中所有的分区和副本的状态,也是集群与 ZK 进行交互的使者;
  13. 创建并启动 GroupCoordinator,负责管理分配给当前 broker 节点的消费者 group 的一个子集;
  14. 创建并初始化 Authorizer 对象,用于权限管理;
  15. 创建 KafkaApis 对象,用于分发接收到的各种类型请求;
  16. 创建 KafkaRequestHandlerPool 线程池对象,用于管理所有 KafkaRequestHandler 线程;
  17. 创建并启动动态配置管理器,用于监听 ZK 的变更;
  18. 将自己的 brokerId 注册到 ZK 中(/brokers/ids/{brokerId} 路径,临时节点),用于标记当前 broker 节点是否存活;
  19. 设置当前 broker 节点的状态为 RunningAsBroker,表示当前 broker 节点已经启动完成,可以对外提供服务;
  20. 更新相关状态标记,标识当前节点的 kafka 服务启动完成。

下面针对上述流程中的 2、3、4 和 6 几个步骤进行进一步说明,对于流程中涉及到的相关类(LogManager、SocketServer、ReplicaManager、KafkaController,以及 GroupCoordinator 等)的实例化和启动的过程会在后续的文章中针对性的分析。

首先来看一下 步骤 2,这一步本身的逻辑比较简单,就是将当前 broker 节点的状态设置为 Starting,标识当前 broker 节点正在执行启动操作。我们主要来看一下 broker 节点的状态定义和状态转换,Kafka 为 broker 节点定义了 6 种状态,如下:

1
2
3
4
5
6
7
8
sealed trait BrokerStates { def state: Byte }

case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }

关于每种状态的解释和状态转换图如下:

  • NotRunning:初始状态,标识当前 broker 节点未运行。
  • Starting:标识当前 broker 节点正在启动中。
  • RecoveringFromUncleanShutdown:标识当前 broker 节点正在从上次非正常关闭中恢复。
  • RunningAsBroker:标识当前 broker 节点启动成功,可以对外提供服务。
  • PendingControlledShutdown:标识当前 broker 节点正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown:标识当前 broker 节点正在执行 shutdown 操作。
image

所谓 controlled shutdown,实际上是 kafka 提供的一种友好的关闭 broker 节点的机制,除了因为硬件等原因导致的节点非正常关闭,一些场景下管理员也需要通过命令行发送 ControlledShutdownRequest 请求来主动关闭指定的 broker 节点,例如迁移机房、升级软件,修改 kafka 配置等。关于 controlled shutdown 机制,我们将在后面分析 KafkaController 组件时再展开分析。

下面继续来分析一下 步骤 3,KafkaScheduler 是一个基于 ScheduledThreadPoolExecutor 的定时任务调度器实现,实现了 Scheduler 特质:

1
2
3
4
5
6
trait Scheduler {
def startup()
def shutdown()
def isStarted: Boolean
def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
}

其中 startup 和 shutdown 方法分别用于启动和关闭调度器,而 isStarted 方法用于检测当前调度器是否已经启动,方法 schedule 用于注册需要进行周期性调度的任务。

步骤 4 调用了 KafkaServer#initZk 方法创建 ZkUtils 对象,ZkUtils 是对 zkclient 的封装,用于操作 ZK。方法 KafkaServer#initZk 会基于 zookeeper.connect 配置获取对应的 ZK 连接,并在 ZK 上创建一些基本的节点。主要的 ZK 节点包括:

  • /brokers/ids/{id}: 记录集群中可用的 broker 的 ID。
  • /brokers/topics/{topic}/partitions: 记录一个 topic 中所有分区的分配信息,以及 AR 集合。
  • /brokers/topics/{topic}/partitions/{partition_id}/state: 记录分区 leader 副本所在的 broker 节点 ID、年代信息、ISR 集合,以及 zkVersion 等。
  • /controller: 记录集群 controller leader 所在 broker 节点的 ID。
  • /controller_epoch: 记录集群 controller leader 的年代信息。
  • /admin/reassign_partitions: 记录需要执行副本重新分配的分区。
  • /admin/preferred_replica_election: 记录需要进行优先副本选举的分区,优先副本是在创建分区时指定的第一个副本。
  • /admin/delete_topics: 记录待删除的 topic 集合。
  • /isr_change_notification: 记录一段时间内 ISR 集合发生变化的分区。
  • /config: 记录一些配置信息。

最后来看一下 步骤 6 获取当前 broker 节点的 brokerId 的过程。我们在启动 kafka 服务之前,可以在配置中通过 broker.id 配置项为当前 broker 节点设置全局唯一的 ID,也可以指定让 kafka 自动生成。解析 brokerId 的过程位于 KafkaServer#getBrokerId 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private def getBrokerId: Int = {
// 获取配置的 brokerId
var brokerId = config.brokerId
val brokerIdSet = mutable.HashSet[Int]()

// 遍历 log.dirs 配置的 log 目录列表
for (logDir <- config.logDirs) {
// 在每一个 log 目录下面创建一个 meta.properties 文件,内容包含当前 broker 节点的 ID 和版本信息
val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
brokerMetadataOpt.foreach { brokerMetadata =>
brokerIdSet.add(brokerMetadata.brokerId)
}
}

if (brokerIdSet.size > 1) {
// 不允许多个 broker 节点共享同一个 log 目录
// ... 抛出 InconsistentBrokerIdException 异常,略
} else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId) {
// 配置的 brokerId 与 meta.properties 中记录的 brokerId 不一致
// ... 抛出 InconsistentBrokerIdException 异常,略
} else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) {
// 如果没有配置,则自动创建 brokerId,通过 ZK 保证 brokerId 的全局唯一性
brokerId = generateBrokerId
} else if (brokerIdSet.size == 1) {
// 从 meta.properties 中获取 brokerId
brokerId = brokerIdSet.last
}

brokerId
}

在 broker 节点的每个 log 目录下有一个 meta.properties 文件,记录了当前 broker 节点的 ID 和版本信息。如果当前 broker 节点不是第一次启动,那么 kafka 可以通过该文件约束 broker.id 配置需要前后保持一致。此外,Kafka 还通过该文件保证一个 log 目录不被多个 broker 节点共享。

二. 服务关闭过程分析

Broker 节点在关闭对应的 kafka 服务时,首先会设置状态为 BrokerShuttingDown,表示正在执行关闭操作,然后开始关闭注册的相关组件,并在这些组件全部关闭成功之后,更新 broker 状态为 NotRunning。相关实现位于 KafkaServer#shutdown 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def shutdown() {
try {
info("shutting down")

// 如果正在启动,则不允许关闭
if (isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")

if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown())
// 设置 broker 状态为 BrokerShuttingDown,表示当前 broker 正在执行关闭操作
brokerState.newState(BrokerShuttingDown)

/* 依次关闭相应注册的组件 */

if (socketServer != null) CoreUtils.swallow(socketServer.shutdown())
if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown())
CoreUtils.swallow(kafkaScheduler.shutdown())
if (apis != null) CoreUtils.swallow(apis.close())
CoreUtils.swallow(authorizer.foreach(_.close()))
if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown())
if (adminManager != null) CoreUtils.swallow(adminManager.shutdown())
if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown())
if (logManager != null) CoreUtils.swallow(logManager.shutdown())
if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown())
if (zkUtils != null) CoreUtils.swallow(zkUtils.close())
if (metrics != null) CoreUtils.swallow(metrics.close())

// 设置 broker 状态为 NotRunning,表示关闭成功
brokerState.newState(NotRunning)

// 设置状态标记
startupComplete.set(false)
isShuttingDown.set(false)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
shutdownLatch.countDown()
info("shut down completed")
}
} catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
isShuttingDown.set(false)
throw e
}
}

整体执行流程如代码注释,比较简单,相关组件的关闭逻辑我们将在后续文章分析具体组件时再进行分析。

三. 总结

本文我们主要分析了 kafka 服务启动和关闭的过程,Kafka 在设计上将各个主要功能模块都拆分成了一个个组件进行实现,服务启动的过程实际上就是实例化并启动各个组件的过程,关闭过程也是如此。到目前为止,我们主要是分析了服务整体启动的执行流程,关于各个组件的启动逻辑,将在后面的文章中分析具体组件时再针对性分析。


转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议
Powered by hexo & Theme by hiero   Copyright © 2015-2019 浙ICP备 16010916  号,指 · 间 All Rights Reserved.