Kafka 源码解析:消费者运行机制

与上一篇介绍的 KafkaProducer 一样,Kafka 消费者 KafkaConsumer 同样是 kafka 与开发者交互的媒介之一,负责从 kafka 集群拉取消息给应用程序消费,并提交已经消费完成的 offset 值。此外,考虑到消费者上下线的场景,以及 topic 分区数目变更的需求,KafkaConsumer 还需要负责与服务端交互执行分区再分配操作,以保证消费者能够更加均衡的消费 topic 分区,从而提升消费的性能。

Kafka 定义了 group 的概念,将多个消费者实例组织成为一个 group,以丰富 kafka 的应用场景。一个 group 名下可以包含任意数量的消费者实例,并从这些消费者中选择一个消费者担任 group 中的 leader 消费者角色,负责管理 group 和其它 follower 角色的消费者的状态。当有消费者加入或离开当前 group 时,Group leader 会依据集群确定的分区分配策略,为 group 名下所有消费者重新分配分区,以保证消息消费的均衡性。

本文我们同样先回忆一下 KafkaConsumer 的使用方式,然后重点分析消息的拉取过程、分区再分配机制,以及 offset 提交机制。

一. KafkaConsumer 使用示例

我们仍然以 kafka 内置的 java 客户端为例,介绍如何消费 kafka 中的消息。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, DEFAULT_GROUP);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Collections.singleton(DEFAULT_TOPIC));

while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<Integer, String> records = consumer.poll(TimeUnit.SECONDS.toMillis(1));
try {
for (final ConsumerRecord<Integer, String> record : records) {
// ... 这里是消费逻辑
this.printResult(record);
}
// 异步提交
consumer.commitAsync();
} catch (Throwable e) {
// ... 异常
} finally {
// 同步提交
consumer.commitSync();
}
}

示例中消费消息依赖于 KafkaConsumer 对象,KafkaConsumer 类也是我们分析消费者运行机制的入口。创建 KafkaConsumer 类对象时我们需要指定 kafka 集群地址,以及消息 key 和 value 的反序列化器。接着我们可以循环调用 KafkaConsumer#poll 方法从 kafka 集群拉取消息进行消费,该方法接收一个 timeout 参数,用于设置等待消息返回的超时时间,如果设置为 0 则 kafka 会立即从本地缓存的消息集合中获取符合期望的结果进行返回。

读者可能对 timeout=0 的设置有些疑惑,认为这样的参数设置意义不大,因为一次网络请求多少都有时间上开销,这样的理解也是没有错的。但是后面在分析消息的消费过程时你将会看到,实际从集群拉取当前请求的消息的过程并不是在调用 poll 方法之后完成的。Kafka 为了性能考虑,在返回消息之前已经发送了下一次拉取消息的请求,这样处理消息的过程与请求下一轮消息的过程就是并行执行的。如果网络足够快,或者处理消息的逻辑足够耗时,则设置 timeout=0 是完全能够平滑工作的。

示例中我们关闭了 offset 的自动提交策略,并在正常运行过程中启用异步提交来提升性能,只有当出现异常的情况下才会使用同步提交,以防止 offset 丢失。

二. 消息消费过程分析

我们以 KafkaConsumer 类为入口开始分析消费者的运行机制,首先来看一下 KafkaConsumer 类的字段定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class KafkaConsumer<K, V> implements Consumer<K, V> {

/** 客户端 ID 生成器 */
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
/** 客户端 ID */
private final String clientId;
/** 控制消费者与 GroupCoordinator 之间交互 */
private final ConsumerCoordinator coordinator;
/** key 反序列化器 */
private final Deserializer<K> keyDeserializer;
/** value 反序列化器 */
private final Deserializer<V> valueDeserializer;
/** 负责从服务端拉取消息 */
private final Fetcher<K, V> fetcher;
/** 拦截器集合, 在方法返回给用户之前进行拦截修改 */
private final ConsumerInterceptors<K, V> interceptors;
/** 时间戳工具 */
private final Time time;
/** 集群网络通信客户端,对 NetworkClient 的封装 */
private final ConsumerNetworkClient client;
/** 维护消费者的消费状态 */
private final SubscriptionState subscriptions;
/** 集群元数据 */
private final Metadata metadata;
/** 重试间隔 */
private final long retryBackoffMs;
/** 请求超时时间 */
private final long requestTimeoutMs;
/** 标识当前消费者是否关闭 */
private volatile boolean closed = false;
/** 记录当前正在使用 KafkaConsumer 的线程 ID,防止多个线程同时使用同一个 KafkaConsumer 对象 */
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
/** 记录线程重入次数 */
private final AtomicInteger refcount = new AtomicInteger(0);

// ... 省略方法定义

}

KafkaConsumer 对象的构造的过程比较简单,这里不再展开。

2.1 订阅主题

在使用 KafkaConsumer 消费服务端消息之前,我们首先需要调用 KafkaConsumer#subscribe 方法订阅 topic 列表,该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// 防止一个 KafkaConsumer 对象被多个线程同时使用,以保证线程安全
this.acquire();
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
} else if (topics.isEmpty()) {
// 如果传递空的 topic 订阅列表,则视为解除订阅
this.unsubscribe();
} else {
// ... 校验输入的 topic 不为 null 或空,如果是则抛出 IllegalArgumentException 异常,省略

// 订阅当前 topic 列表
subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
}
} finally {
// 线程重入计数 refcount 减 1,如果 refcount = 0,则标记当前 KafkaConsumer 对象没有线程占用
this.release();
}
}

在开始订阅 topic 之前,会先校验 KafkaConsumer 对象是否被多个线程占用,我们知道 KafkaConsumer 不是线程安全的,KafkaConsumer 设置了两个字段 KafkaConsumer#currentThreadKafkaConsumer#refcount 用于控制访问当前 KafkaConsumer 对象的线程 ID 和线程重入次数。其中 currentThread 用于记录持有当前 KafkaConsumer 对象的线程 ID,refcount 则表示该线程的重入次数。对于这 2 个变量的控制,KafkaConsumer 一般会使用下面这样的模板代码:

1
2
3
4
5
6
this.acquire();
try {
// do somthing here
} finally {
this.release();
}

其中 KafkaConsumer#acquire 方法可以类比理解为加锁,而 KafkaConsumer#release 方法可以类比理解为释放锁。我们先来看一下 KafkaConsumer#acquire 方法,该方法首先会验证当前 KafkaConsumer 对象是否被关闭,如果没有被关闭则会继续验证当前操作线程是否是已经持有该 KafkaConsumer 对象的线程,如果不是且当前 KafkaConsumer 对象被其它线程持有,则会抛出异常,否则将重入计数 refcount 加 1。以此来保证一个 KafkaConsumer 对象在同一时段只能被同一个线程持有,但是允许同一个线程多次持有。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
private void acquire() {
// 检测当前 consumer 是否关闭,如果关闭则抛出异常
this.ensureNotClosed();
long threadId = Thread.currentThread().getId();
// 如果存在多个线程使用同一个 consumer 对象,则抛出异常
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
}
// 线程重入次数加 1
refcount.incrementAndGet();
}

再来看一下 KafkaConsumer#release 方法的实现(如下),该方法逻辑比较简单,将重入计数 refcount 减 1,意味着本次线程退出当前临界区,如果重入计数为 0,则清空 currentThread,以允许其它线程获取锁。

1
2
3
4
5
6
7
private void release() {
// 线程重入次数减 1
if (refcount.decrementAndGet() == 0) {
// 如果当前线程重入次数为 0,则表示当前 KafkaConsumer 对象没有线程占用
currentThread.set(NO_CURRENT_THREAD);
}
}

在保证线程安全的前提下,方法 KafkaConsumer#subscribe 会对传递的 topic 集合进行校验,如果当前传递的 topic 集合为空,则视为取消订阅。取消订阅主要做了 2 件事情:

  1. 清空本地订阅的 topic 集合、清除本地记录的每个 topic 分区的消费状态,以及重置一些本地的变量。
  2. 构建并发送 LeaveGroupRequest 请求,告知服务端自己已经离开当前的 group。

如果传递的 topic 集合不为空,则会调用 SubscriptionState#subscribe 方法订阅指定 topic 集合,实现如下:

1
2
3
4
5
6
7
8
9
10
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
if (listener == null) {
throw new IllegalArgumentException("RebalanceListener cannot be null");
}
// 设置 topic 订阅模式为 AUTO_TOPICS
this.setSubscriptionType(SubscriptionType.AUTO_TOPICS);
this.listener = listener;
// 更新本地缓存的订阅的信息
this.changeSubscription(topics);
}

同一个 KafkaConsumer 对象订阅主题的模式有 3 种,定义在 SubscriptionType 枚举类中(其中 NONE 指代未订阅任何主题):

1
2
3
4
5
6
7
8
9
private enum SubscriptionType {
NONE,
/** 按照指定的 topic 的名字进行订阅,自动分配分区 */
AUTO_TOPICS,
/** 按照正则匹配 topic 名称进行订阅,自动分配分区 */
AUTO_PATTERN,
/** 用户手动指定消费的 topic 以及分区 */
USER_ASSIGNED
}

并且这些订阅模式之间是互斥的,即一个 KafkaConsumer 对象不允许同时使用多种模式进行订阅,相关控制位于 SubscriptionState#setSubscriptionType 方法中:

1
2
3
4
5
6
7
8
9
private void setSubscriptionType(SubscriptionType type) {
if (this.subscriptionType == SubscriptionType.NONE) {
// NONE 表示没有设置过,设置为目标模式
this.subscriptionType = type;
} else if (this.subscriptionType != type) {
// 如果之前设置过,且目标模式不是之前的模式,则抛出异常
throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
}
}

如果当前的订阅模式合法,则会继续调用 SubscriptionState#changeSubscription 方法,依据本次订阅的 topic 集合更新 SubscriptionState#subscriptionSubscriptionState#groupSubscription 字段。其中 subscription 字段用于记录当前消费者订阅的 topic 集合,而 groupSubscription 字段则依据当前消费者是 leader 还是 follower 有所不同。如果是 leader 则记录当前消费者所属 group 中所有消费者订阅的 topic 集合,如果是 follower 则仅保存其自身订阅的 topic 集合。到这里,订阅 topic 的过程就算完成了,整个过程还未涉及到与集群的交互,这会在执行 KafkaConsumer#poll 时发生。

2.2 拉取消息

在完成了对目标 topic 的订阅之后,下面继续分析从集群拉取消息的过程,位于 KafkaConsumer#poll 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public ConsumerRecords<K, V> poll(long timeout) {
this.acquire();
try {
// 超时时间不允许设置为负数,但是允许设置为 0
if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative");
}
// 当前消费者未订阅任何 topic
if (subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

long start = time.milliseconds();
long remaining = timeout;
do {
// 拉取消息,优先从本地缓存中获取,如果没有则会请求服务端,期间会尝试执行分区再分配策略,以及异步提交 offset
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.pollOnce(remaining);
if (!records.isEmpty()) {
/*
* 为了提升效率,在对响应的消息处理之前,先发送下一次 fetch 请求,
* 从而让处理消息的过程与拉取消息的过程并行,以减少等待网络 IO 的时间
*/
if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0) {
// 如果有待发送的请求,执行一次不可中断的 poll 请求
client.pollNoWakeup();
}

if (this.interceptors == null) {
return new ConsumerRecords<>(records);
} else {
// 如果注册了拦截器,则在返回之前先应用拦截器
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
}

long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
this.release();
}
}

上述方法返回一个 ConsumerRecords 对象,用于对从每个 topic 分区拉取回来的 ConsumerRecord 对象集合进行封装。前面我们在分析 KafkaProducer 时介绍了 ProducerRecord 类,用于封装 Producer 发送的每条消息,而 ConsumerRecord 类则与之对应,用于封装 Consumer 消费的每条消息。

从集群拉取消息时需要指定响应超时时间 timeout 参数,该参数允许设置为非负数,前面我们已经介绍了 timeout=0 的意义,相应的逻辑位于这里实现。方法首先会调用 KafkaConsumer#pollOnce 从本地或服务端拉取一批消息,如果拉取成功(即返回结果不为空),方法并不会立即将结果返回,而是在返回之前尝试发送下一次拉取消息的请求。因为拉取消息涉及网络通信,需要与远端集群进行交互,比较耗时,而业务处理消息也是一个耗时的过程,Kafka 的设计者巧妙的将这两步并行执行,以提升效率。

如果设置了 Consumer 拦截器,那么在返回待消费消息数据之前会先对消息执行拦截修改。Kafka 定义了 ConsumerInterceptor 接口,该接口定义如下:

1
2
3
4
5
public interface ConsumerInterceptor<K, V> extends Configurable {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
void close();
}

其中 ConsumerInterceptor#onConsume 方法会在消息数据被返回给应用程序之前执行,如 KafkaConsumer#poll 方法所示,而方法 ConsumerInterceptor#onCommit 会在 offset 成功提交后被调用。

下面先跳过 KafkaConsumer#pollOnce 方法,来看一下 Fetcher#sendFetches 方法的实现,因为在 pollOnce 中同样调用了该方法,所以先了解其执行逻辑,从而能够更好的理解 pollOnce 所做的工作。方法 Fetcher#sendFetches 的主要工作就是构建并向集群发送 FetchRequest 请求,以拉取指定 offset 的消息,Fetcher 类主要负责从服务端拉取消息,其字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Fetcher<K, V> implements SubscriptionState.Listener {

/** 网络客户端 */
private final ConsumerNetworkClient client;
/** 时间戳工具 */
private final Time time;
/** 服务端返回的消息并不是立即响应,而是累积到 minBytes 再响应 */
private final int minBytes;
/** 请求时指定的服务端最大响应字节数 */
private final int maxBytes;
/** 累积等待的最大时长,达到该时间时,即使消息数据量不够,也会执行响应 */
private final int maxWaitMs;
/** 每次 fetch 操作的最大字节数 */
private final int fetchSize;
/** 重试间隔时间戳 */
private final long retryBackoffMs;
/** 每次获取 record 的最大数量 */
private final int maxPollRecords;
/** 是否对结果执行 CRC 校验 */
private final boolean checkCrcs;
/** 集群元数据 */
private final Metadata metadata;
/** 记录每个 topic 分区的消息消费情况 */
private final SubscriptionState subscriptions;
/** 每个响应在解析之前都会先转换成 CompletedFetch 对象记录到该队列中 */
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
/** key 反序列化器 */
private final Deserializer<K> keyDeserializer;
/** value 反序列化器 */
private final Deserializer<V> valueDeserializer;
/** 缓存,用于对响应结果进行解析 */
private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
/** 保存响应的分区、消息 起始位移等 */
private PartitionRecords<K, V> nextInLineRecords = null;
/** 封装在解析指定 offset 时的异常信息 */
private ExceptionMetadata nextInLineExceptionMetadata = null;

// ... 省略方法定义

}

下面来看一下 Fetcher#sendFetches 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public int sendFetches() {
// 获取可以 fetch 的 topic 分区,并创建到分区 leader 副本所在节点的 FetchRequest 请求
Map<Node, FetchRequest.Builder> fetchRequestMap = this.createFetchRequests();
// 遍历并往各个目标节点发送 FetchRequest 请求
for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();

// 往目标节点发送 FetchRequest 请求
log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
client.send(fetchTarget, request)
// 添加监听器用于处理 FetchResponse 响应
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = (FetchResponse) resp.responseBody();
// 响应中的 topic 分区集合与请求的 topic 分区集合不匹配,忽略本次响应
if (!matchesRequestedPartitions(request, response)) {
log.warn("Ignoring fetch response containing partitions {} since it does not match the requested partitions {}", response.responseData().keySet(), request.fetchData().keySet());
return;
}

// 响应中的 topic 分区集合
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
// 遍历处理响应中的数据
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
// 当前 topic 分区对应请求的 offset
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
// 将结果包装成 CompletedFetch 缓存到 completedFetches 队列中
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, request.version()));
}

sensors.fetchLatency.record(resp.requestLatencyMs());
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}

@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request to {} for partitions {} failed", fetchTarget, request.fetchData().keySet(), e);
}
});
}
// 返回本次发送的请求数目
return fetchRequestMap.size();
}

上述方法的主要执行逻辑就是获取需要拉取消息的 topic 分区集合,并为每个分区创建对应的 FetchRequest 请求对象,同时将这些请求按照分区对应的 leader 副本所在 broker 节点组成 Map<Node, FetchRequest.Builder> 集合。接着,方法会遍历处理该集合向对应节点发送 FetchRequest 请求,并注册 RequestFutureListener 监听器对响应结果进行处理。如果响应中的分区集合与请求时的分区集合能够匹配,则方法会遍历响应结果,并将每个分区 offset 对应的响应结果对象封装成 CompletedFetch 对象,记录到 Fetcher#completedFetches 同步队列中。CompletedFetch 仅仅是对响应的一个简单的封装,后面会消费该队列,并将获取到的 CompletedFetch 对象解析成 ConsumerRecord 对象封装到 PartitionRecords 中,该类记录了消息对应的分区、offset、消息集合,以及客户端消费的位置等信息。消费逻辑的实现如下(位于 Fetcher#fetchedRecords 方法中):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// 获取并移除队首元素
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null) break; // completedFetches 已空
try {
// 解析 CompletedFetch 成 PartitionRecords 对象
nextInLineRecords = this.parseCompletedFetch(completedFetch);
} catch (KafkaException e) {
if (drained.isEmpty()) {
throw e;
}
// 封装当前分区 offset 对应的异常信息,在下次获取该分区 offset 消息时抛出
nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e);
}

private PartitionRecords<K, V> parseCompletedFetch(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
int bytes = 0;
int recordsCount = 0;
PartitionRecords<K, V> parsedRecords = null;
// 解析获取响应错误码
Errors error = Errors.forCode(partition.errorCode);
try {
// 当前 topic 分区不允许 fetch 消息,一般是因为当前正在执行分区再分配,或者消费者被暂停
if (!subscriptions.isFetchable(tp)) {
log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
}
// 正常响应
else if (error == Errors.NONE) {
// 获取 topic 分区对应的下次获取消息的 offset
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) {
// 请求的 offset 与响应的不匹配
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {}", tp, fetchOffset, position);
return null;
}

List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
boolean skippedRecords = false;
for (LogEntry logEntry : partition.records.deepEntries(decompressionBufferSupplier)) {
// 跳过请求 offset 位置之前的消息
if (logEntry.offset() >= position) {
// 封装成 ConsumerRecord 对象
parsed.add(this.parseRecord(tp, logEntry));
bytes += logEntry.sizeInBytes();
} else {
skippedRecords = true;
}
}
recordsCount = parsed.size();

log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
// 封装结果为 PartitionRecords 对象
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);

// ... 省略一些异常情况的处理

// 更新本地记录的对应 topic 分区最新的 HW 值
if (partition.highWatermark >= 0) {
log.trace("Received {} records in fetch response for partition {} with offset {}", parsed.size(), tp, position);
subscriptions.updateHighWatermark(tp, partition.highWatermark);
}
}
// ... 省略对于错误响应的处理
} finally {
completedFetch.metricAggregator.record(tp, bytes, recordsCount);
}

/*
* we move the partition to the end if we received some bytes or if there was an error.
* This way, it's more likely that partitions for the same topic can remain together (allowing for more efficient serialization).
*/
if (bytes > 0 || error != Errors.NONE) {
subscriptions.movePartitionToEnd(tp);
}

return parsedRecords;
}

从队列中获取到的 CompletedFetch 对象,会调用 Fetcher#parseCompletedFetch 方法将其解析封装成 PartitionRecords 对象,并记录到 Fetcher#nextInLineRecords 字段中,等待后续处理。下面来完整看一下 Fetcher#fetchedRecords 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
// 如果之前解析当前分区 offset 存在异常,处理该异常
if (nextInLineExceptionMetadata != null) {
ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
nextInLineExceptionMetadata = null;
TopicPartition tp = exceptionMetadata.partition;
// 如果当前消费者处于运行状态,但是期望的 offset 在解析响应时存在异常,则直接抛出
if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset) {
throw exceptionMetadata.exception;
}
}

Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords; // 剩余获取 record 的数量
while (recordsRemaining > 0) {
// 如果当前 topic 分区没有可以处理的记录
if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
// ... 解析 CompletedFetch 成 PartitionRecords 对象,上面已经分析过
} else {
TopicPartition partition = nextInLineRecords.partition;
// 从之前解析得到的 PartitionRecords 对象中拉取指定数量的消息
List<ConsumerRecord<K, V>> records = this.drainRecords(nextInLineRecords, recordsRemaining);
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
if (currentRecords == null) {
drained.put(partition, records);
} else {
/*
* 合并同一个分区的记录(发生的概率很小)
*
* this case shouldn't usually happen because we only send one fetch at a time per partition,
* but it might conceivably happen in some rare cases (such as partition leader changes).
* we have to copy to a new list because the old one may be immutable
*/
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
drained.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}

// 返回每个 topic 分区拉取到的消息
return drained;
}

前面我们在分析消费 Fetcher#completedFetches 同步队列时,对于解析过程中出现异常的 topic 分区对应的 offset,会将异常信息封装成 ExceptionMetadata 对象,记录到 Fetcher#nextInLineExceptionMetadata 字段中,方法 Fetcher#fetchedRecords 一开始会先处理该异常。如果之前解析过程能够正常拿到消息并封装成 PartitionRecords 对象,那么接下来将会从对象中拉取指定数量的消息返回给用户。该过程位于 Fetcher#drainRecords 方法中,该方法会校验目标分区是否被分配给当前消费者,因为在执行分区再分配时,一个消费者消费的分区是可能变化的,我们将在后面对分区再分配的逻辑进行针对性分析。如果目标 topic 分区仍然是分配给当前消费者的,那么方法会在对应分区允许拉取消息的情况下,从之前解析得到的 PartitionRecords 中获取指定数量的消息,并更新客户端记录的位置信息,包括对应分区的 offset,以及 PartitionRecords 中缓存的消息集合的消费位置。最后将每个分区对应的消息集合封装成 Map<TopicPartition, List<ConsumerRecord<K, V>>> 集合返回。

下面我们回到 KafkaConsumer 类,看一下 KafkaConsumer#pollOnce 方法的执行逻辑,了解了 Fetcher#sendFetchesFetcher#fetchedRecords 的实现,再来看 pollOnce 方法会简单很多。该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// 执行分区再分配策略,以及异步提交 offset
coordinator.poll(time.milliseconds());

if (!subscriptions.hasAllFetchPositions()) {
/*
* 如果存在没有分配 offset 的 topic 分区,则执行更新:
* 1. 如果需要重置,则按照指定策略重置 offset
* 2. 否则,尝试获取上次提交的 offset,如果结果为空则按照默认重置策略进行重置
* 3. 否则,使用上次提交的 offset 更新本地记录的 offset 值
*/
this.updateFetchPositions(subscriptions.missingFetchPositions());
}

// 尝试从本地获取缓存的消息
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}

// 如果本地没有直接可用的消息,则创建 FetchRequest 请求,从集群拉取消息数据
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
// 发送 FetchRequest 请求
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
return !fetcher.hasCompletedFetches();
}
});

// 检查是否需要执行分区再分配,如果是则返回空的结果,以保证尽快对分区执行再平衡操作
if (coordinator.needRejoin()) {
return Collections.emptyMap();
}

// 获取 FetchRequest 请求返回的消息
return fetcher.fetchedRecords();
}

在拉取消息之前会先尝试执行分区再分配策略,以及异步提交 offset,这 2 步我们先不展开,留到后面的小节中针对性分析。

由于 kafka 在设计上由消费者自己维护自身消费状态,所以在拉取消息之前需要确定是否需要更新消费者维护的分区 offset 信息,一方面是为了支持分区重置策略,另一方面也是配合分区再分配操作。KafkaConsumer 在拉取消息数据之前会调用 SubscriptionState#hasAllFetchPositions 方法检测分配给当前消费者的分区在本地是不是都记录着对应的 offset 值,如果存在没有记录 offset 值的分区,则需要调用 KafkaConsumer#updateFetchPositions 方法对这些分区进行更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void updateFetchPositions(Set<TopicPartition> partitions) {
// 对于需要重置 offset 的分区,请求分区 leader 副本所在节点获取对应的 offset 值
fetcher.resetOffsetsIfNeeded(partitions);

// 如果仍然存在没有分配 offset 的分区
if (!subscriptions.hasAllFetchPositions(partitions)) {
// 如果需要从 GroupCoordinator 获取上次提交的 offset,则发送 OffsetFetchRequest 请求更新
coordinator.refreshCommittedOffsetsIfNeeded();
/*
* 再次尝试对未分配 offset 的分区进行更新:
* 1. 如果需要重置,则按照指定策略重置 offset
* 2. 如果获取到的上次提交的 offset 为空,则按照默认重置策略进行重置
* 3. 使用上次提交的 offset 更新本地记录的 offset 值
*/
fetcher.updateFetchPositions(partitions);
}
}

然后 KafkaConsumer 会调用 Fetcher#fetchedRecords 方法尝试从本地获取每个 topic 分区对应的缓存消息。如果本地缓存不命中,则会继续调用 Fetcher#sendFetches 方法构建并发送 FetchRequest 请求,尝试从集群拉取消息。

在调用 Fetcher#fetchedRecords 方法解析并返回服务端响应的消息之前,消费者会先检测当前是否需要执行分区再分配操作,如果需要则直接返回空的结果,这样在不超时的情况下,方法 KafkaConsumer#pollOnce 会立即被再次调用,从而开始对当前 topic 分区执行再分配,即调用 ConsumerCoordinator#poll 方法。我们会在后面的小节中对分区再分配和自动提交 offset 操作的逻辑展开分析,这里我们先来看一下 ConsumerCoordinator#poll 方法,了解这 2 个步骤的触发过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void poll(long now) {
// 触发执行注册的监听 offset 提交完成的方法
this.invokeCompletedOffsetCommitCallbacks();

// 确保当前是 AUTO_TOPICS 或 AUTO_PATTERN(USER_ASSIGNED 不需要再平衡)订阅模式,
// 且目标 GroupCoordinator 节点可达,如果不可达,则会尝试寻找一个可用的节点
if (subscriptions.partitionsAutoAssigned() && this.coordinatorUnknown()) {
this.ensureCoordinatorReady();
now = time.milliseconds();
}

// 需要执行再平衡
if (this.needRejoin()) {
/*
* due to a race condition between the initial metadata fetch and the initial rebalance,
* we need to ensure that the metadata is fresh before joining initially.
* This ensures that we have matched the pattern against the cluster's topics at least once before joining.
*
* 如果是 AUTO_PATTERN 订阅模式,则检查是否需要更新集群元数据
*/
if (subscriptions.hasPatternSubscription()) {
client.ensureFreshMetadata();
}

/*
* 1. 检查目标 GroupCoordinator 节点是否准备好接收请求
* 2. 启动心跳线程
* 3. 执行分区再分配操作
*/
this.ensureActiveGroup();
now = time.milliseconds();
}

// 发送心跳
this.pollHeartbeat(now);
// 异步提交 offset
this.maybeAutoCommitOffsetsAsync(now);
}

我们前面介绍了 kafka 有 3 种订阅模式:AUTO_TOPICSAUTO_PATTERN,和 USER_ASSIGNED。其中 USER_ASSIGNED 订阅模式是由用户手动指定消费的分区,所以这种模式下不需要执行分区再分配操作对消费者消费的分区进行动态再分配。对于另外 2 种订阅模式来说,如果需要执行分区再分配,则方法首先需要确保与服务端交互的 GroupCoordinator 实例所在 broker 节点是可用的,然后调用 AbstractCoordinator#ensureActiveGroup 方法执行具体的分区再分配操作,我们将在 2.3 小节对这一过程进行深入分析。如果启用了自动 offset 提交策略,上述方法在最后还会调用 ConsumerCoordinator#maybeAutoCommitOffsetsAsync 方法尝试提交当前消费完成的 offset 值,我们将在 2.4 小节对自动提交 offset 的过程展开分析。

2.3 分区再分配机制

当我们使用 AUTO_TOPICSAUTO_PATTERN 模式订阅 kafka topic 时,我们并不需要考虑当前消费者具体消费哪个分区,Kafka 会依据分区分配策略为消费者分配一个或多个分区进行消费(一个分区至多被一个消费者消费,不允许多个消费者同时消费同一个分区)。但是消费者可能会中途加入,也可能会中途退出,topic 的分区数目也是允许改变的,此时就需要依赖分区再分配机制为注册的消费者重新分配分区。

当一个消费者发送心跳信息时,如果在集群的响应中侦测到 REBALANCE_IN_PROGRESS 错误码,则该消费者会意识到所属 group 正在执行分区再分配操作,于是会停下手头上的工作加入到这一进程中来。分区再分配操作分为 3 个阶段,并且是一个与集群交互联动的过程,这里我们以客户端视角,当消费者检测到需要重新分配分区时会触发执行:

  1. 发送 GroupCoordinatorRequest 请求获取目标可用的 GroupCoordinator 实例所在的 broker 节点,如果没有则选择负载最小的节点并尝试建立连接;
  2. 向 GroupCoordinator 实例所在节点发送 JoinGroupRequest 请求申请加入目标 group,GroupCoordinator 实例会在既定时间范围内等待消费者的申请加入请求,如果提前检测到已经接收到 group 名下所有消费者的申请,或者等待时间超时,则会返回 JoinGroupResponse 响应,主要目的是告知谁是新的 group leader 消费者,以及最终确定的分区分配策略;
  3. Group leader 依据指定的分区分配策略为当前 group 名下的消费者分配分区,并向目标 GroupCoordinator 实例所在节点发送 SyncGroupRequest 请求以告知最终的分区分配结果。

触发分区再分配操作的场景主要有以下 2 种:

  1. 有消费者加入或离开 group,这里的离开可能是主动离开,也可能是宕机,或者是取消了对目标 topic 的订阅等。
  2. 消费者订阅的 topic 的分区数目发生变化。

在 2.2 小节我们最后简单分析了 ConsumerCoordinator#poll 方法,该方法会调用 ConsumerCoordinator#needRejoin 检测是否需要执行分区再分配,并在需要的情况下予以执行。ConsumerCoordinator 是消费者执行分区再分配操作和 offset 提交的核心类,该类继承自 AbstractCoordinator 抽象类,首先来看一下这两个类的字段定义:

  • AbstractCoordinator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public abstract class AbstractCoordinator implements Closeable {

/** 分区再分配操作超时时间 */
protected final int rebalanceTimeoutMs;
/** 消费者与服务端会话超时时间,超过该时间则认为与服务端断开连接 */
private final int sessionTimeoutMs;
/** 指定消费者被关闭时是否离开所属 group,如果为 true 的话会触发分区再分配操作 */
private final boolean leaveGroupOnClose;
/** 心跳机制 */
private final Heartbeat heartbeat;
/** 执行心跳机制的线程 */
private HeartbeatThread heartbeatThread = null;
/** 当前消费者所属的 group */
protected final String groupId;
/** 网络通信客户端 */
protected final ConsumerNetworkClient client;
/** 时间戳工具 */
protected final Time time;
/** 重试时间间隔 */
protected final long retryBackoffMs;
/** 标记是否需要重新发送 {@link JoinGroupRequest} 的请求条件之一 */
private boolean rejoinNeeded = true;
/** 标记是否需要执行发送 {@link JoinGroupRequest} 请求前的准备工作 */
private boolean needsJoinPrepare = true;
/** 记录当前消费者的运行状态 */
private MemberState state = MemberState.UNJOINED;
/** 分区再分配操作请求对应的 future 对象,避免多个请求同时执行 */
private RequestFuture<ByteBuffer> joinFuture = null;
/** 服务端 GroupCoordinator 所在节点 */
private Node coordinator = null;
/** 服务端 GroupCoordinator 返回的年代信息,用于区分两次分区再分配操作 */
private Generation generation = Generation.NO_GENERATION;
/** 获取可用 GroupCoordinator 节点请求对应的 future,避免多个请求同时执行 */
private RequestFuture<Void> findCoordinatorFuture = null;

// ... 省略方法定义

}
  • ConsumerCoordinator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final class ConsumerCoordinator extends AbstractCoordinator {

/**
* 消费者在发送 JoinGroupRequest 请求时会传递自己支持的分区分配策略,服务端会从所有消费者都支持的策略中选择一种,
* 并通知 group leader 使用此分配策略进行分配
*/
private final List<PartitionAssignor> assignors;
/** 集群元数据信息 */
private final Metadata metadata;
/** 记录 topic 分区和 offset 的对应关系 */
private final SubscriptionState subscriptions;
/** 默认的 offset 提交完成时的 callback */
private final OffsetCommitCallback defaultOffsetCommitCallback;
/** 是否启用 offset 自动提交策略 */
private final boolean autoCommitEnabled;
/** offset 自动提交时间间隔 */
private final int autoCommitIntervalMs;
/** 注册的拦截器集合 */
private final ConsumerInterceptors<?, ?> interceptors;
/** 是否排除内部 topic,即 offset topic */
private final boolean excludeInternalTopics;
/** 记录正在等待异步提交 offset 的请求数目 */
private final AtomicInteger pendingAsyncCommits;
/** 记录每个 offset 提交对应的响应 callback */
private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
/** 标记当前消费者是不是 group leader */
private boolean isLeader = false;
/** 当前消费者成功订阅的 topic 集合 */
private Set<String> joinedSubscription;
/** 元数据快照,用于检测 topic 分区数量是否发生变化 */
private MetadataSnapshot metadataSnapshot;
/** 元数据快照,用于检测分区分配过程中分区数量是否发生变化 */
private MetadataSnapshot assignmentSnapshot;
/** 下一次自动提交 offset 的截止时间 */
private long nextAutoCommitDeadline;

// ... 省略方法定义

}

下面我们开始分析分区再分配机制,首先来看一下判定需要执行分区再分配操作的条件,位于 ConsumerCoordinator#needRejoin 中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean needRejoin() {
// USER_ASSIGNED 订阅模式不需要执行分区再分配
if (!subscriptions.partitionsAutoAssigned()) {
return false;
}
// 再平衡过程中分区数量发生变化
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
return true;
}
// 消费者 topic 订阅信息发生变化
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
return true;
}
// 其它标识需要再平衡的操作,例如分区再分配执行失败、重置年代信息等
return super.needRejoin();
}

如果判定需要执行分区再分配操作,消费者接下去会调用 AbstractCoordinator#ensureActiveGroup 方法确认所属 group 对应的目标 GroupCoordinator 实例所在节点是否准备好接收请求,如果对应节点不可用,则会发送 GroupCoordinatorRequest 请求查找负载较小且可用的节点,并与之建立连接。接着会调用 AbstractCoordinator#joinGroupIfNeeded 方法开始执行分区再分配策略,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
void joinGroupIfNeeded() {
// 如果需要执行分区再分配,且目前正在进行中
while (this.needRejoin() || this.rejoinIncomplete()) {
// 再次检查目标 GroupCoordinator 节点是否准备好接收请求
this.ensureCoordinatorReady();

// 执行前期准备工作
if (needsJoinPrepare) {
/*
* 1. 如果开启了 offset 自动提交,则同步提交 offset
* 2. 调用注册的 ConsumerRebalanceListener 监听器的 onPartitionsRevoked 方法
* 3. 取消当前消费者的 leader 身份(如果是的话),恢复成为一个普通的消费者
*/
this.onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}

// 创建并发送 JoinGroupRequest 请求,申请加入目标 group
RequestFuture<ByteBuffer> future = this.initiateJoinGroup();
client.poll(future);
// 申请加入 group 完成,将 joinFuture 置为 null,表示允许发送下一次 JoinGroupRequest 请求
this.resetJoinGroupFuture();

// 执行分区分配成功
if (future.succeeded()) {
needsJoinPrepare = true;
this.onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
}
// 执行分区分配失败,依据失败类型考虑是否重试
else {
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException) {
continue;
} else if (!future.isRetriable()) {
throw exception;
}
time.sleep(retryBackoffMs);
}
}
}

在开始执行分区再分配操作之前需要执行一些前期准备工作,这里使用了 needsJoinPrepare 字段进行控制,如果当前正在执行分区再分配,则 needsJoinPrepare 字段会被标记为 false,以防止重复执行。准备工作的逻辑实现位于 ConsumerCoordinator#onJoinPrepare 方法中,主要做了 3 件事情:

  1. 如果开启了 offset 自动提交,则同步提交 offset 到集群。
  2. 激活注册的 ConsumerRebalanceListener 监听器的 onPartitionsRevoked 方法。
  3. 取消当前消费者的 leader 身份(如果是的话),将其恢复成为一个普通的消费者。

分区再分配操作之前需要提交当前消费者消费完成的 offset,因为当分区再分配完成之后,相应的分区可能会被分配给其它消费者,新的消费者需要依赖于前任消费者提交的 offset 来确定接下去消费的起始位置。所以,为了防止消息的遗漏或重复消费,在开始执行分区再分配之前,需要先提交当前消费者已经完成消费的 offset 值。

ConsumerRebalanceListener 监听器用于监听分区再分配操作,接口定义如下:

1
2
3
4
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

其中 onPartitionsRevoked 方法会在分区再分配操作之前被触发,也就是我们当前分析的位置,而 onPartitionsAssigned 方法则会在分区再分配操作完成之后被触发,调用的位置位于 ConsumerCoordinator#onJoinComplete 方法中,我们后面会对该方法进行分析。

因为接下去要执行分区再分配操作,当操作完成之后会有新的 group leader 消费者被选出,如果当前消费者是 leader 角色,那么此时需要剥夺其 leader 身份,同时将其 SubscriptionState#groupSubscription 字段中记录的所属 group 名下所有消费者订阅的 topic 集合重置为当前消费者自己订阅的 topic 集合。

完成了前期准备工作之后,消费者将正式开始执行分区再分配,这是一个客户端与服务端交互配合的过程,消费者需要构造并发送 JoinGroupResult 请求到对应的 GroupCoordinator 实例所在节点申请加入目标 group,这一过程位于 AbstractCoordinator#initiateJoinGroup 方法中,该方法的主要工作就是切换当前消费者的状态为 REBALANCING,创建并缓存 JoinGroupRequest 请求,并处理申请加入的结果。如果申请加入成功,则会切换当前消费者的状态为 STABLE,并重启心跳机制(为了避免心跳机制干扰分区再分配,在开始执行分区再分配之前会临时关闭心跳机制);如果申请加入失败,则会切换当前消费者的状态为 UNJOINED。

JoinGroupRequest 请求中包含了当前消费者的 ID,消费者所属 group 的 ID、消费者支持的分区策略、协议类型、以及会话超时时间等信息。构造、发送,以及处理 JoinGroupRequest 请求及其响应的过程位于 AbstractCoordinator#sendJoinGroupRequest 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (this.coordinatorUnknown()) {
// 如果目标 GroupCoordinator 节点不可达,则返回异常
return RequestFuture.coordinatorNotAvailable();
}

log.info("(Re-)joining group {}", groupId);
// 构建 JoinGroupRequest 请求
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
groupId,
sessionTimeoutMs,
generation.memberId,
protocolType(),
metadata()).setRebalanceTimeout(rebalanceTimeoutMs);

log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
// 发送 JoinGroupRequest 请求,并注册结果处理器 JoinGroupResponseHandler
return client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());
}

消费者通过注册结果处理器 JoinGroupResponseHandler 对请求的响应结果进行处理,如果是正常响应则会执行分区分配操作,核心逻辑实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) {
// 在接收到响应之前,消费者的状态发生变更(可能已经从所属 group 离开),抛出异常
future.raise(new UnjoinedGroupException());
} else {
// 基于响应,更新 group 的年代信息
generation = new Generation(
joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
rejoinNeeded = false;
// 如果当前消费者是 group 中的 leader 角色
if (joinResponse.isLeader()) {
/*
* 基于分区分配策略执行分区分配,leader 需要关注当前 group 中所有消费者订阅的 topic,
* 并发送 SyncGroupRequest 请求反馈分区分配结果给 GroupCoordinator 节点
*/
onJoinLeader(joinResponse)
// 这里调用 chain 方法,是希望当 SyncGroupResponse 处理完成之后,能够将结果传递给 future
.chain(future);
} else {
// 如果是 follower 消费者,则只关注自己订阅的 topic,这一步仅发送 SyncGroupRequest 请求
onJoinFollower().chain(future);
}
}
}

在开始重新分配分区之前,消费者会确认当前状态是不是 REBALANCING,前面在发送 JoinGroupRequest 请求之前会将消费者状态变更为 REBALANCING,这里再次确认以防止在请求的过程中消费者的状态发生了变更,例如消费者因某种原理离开了所属的 group,这种情况下不应该再继续执行下去。如果状态未发生变更,那么会依据响应更新本地记录的状态信息(包括年代信息、标识不需要执行分区再分配等),然后依据当前消费者的角色(leader/follower)执行相应的逻辑。

对于 follower 消费者而言,响应 JoinGroupRequest 请求的逻辑只是构造一个包含空的分区分配结果的 SyncGroupRequest 请求,并附带上所属的 group 和自身 ID,以及 group 年代信息,发送给对应的 GroupCoordinator 节点,如果此时所属的 group 已经处于正常运行的状态,则该消费者会拿到分配给自己的分区信息。

如果当前消费者是 leader 角色,那么需要依据 GroupCoordinator 最终确定的分区分配策略为当前 group 名下所有的消费者分配分区,并发送 SyncGroupRequest 请求向对应的 GroupCoordinator 节点反馈最终的分区分配结果。方法 AbstractCoordinator#onJoinLeader 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// 基于分区分配策略分配分区
Map<String, ByteBuffer> groupAssignment = this.performAssignment(
joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());

// 创建 SyncGroupRequest 请求,反馈分区分配结果给 GroupCoordinator 节点
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, requestBuilder);
// 发送 SyncGroupRequest 请求
return this.sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}

分配分区的具体过程位于 ConsumerCoordinator#performAssignment 方法中,这是一个长长的方法实现,但是逻辑并不复杂,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
protected Map<String, ByteBuffer> performAssignment(String leaderId, // leader 消费者 ID
String assignmentStrategy, // 服务端最终确定的分区分配策略
Map<String, ByteBuffer> allSubscriptions // group 名下所有消费者的 topic 订阅信息
) {
// 从消费者支持的分区分配策略集合中选择指定策略对应的分区分配器
PartitionAssignor assignor = this.lookupAssignor(assignmentStrategy);
if (assignor == null) {
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
}

// 解析封装 topic 订阅信息
Set<String> allSubscribedTopics = new HashSet<>(); // 记录 group 名下所有消费者订阅的 topic 集合
Map<String, Subscription> subscriptions = new HashMap<>(); // Map<String, ByteBuffer> -> Map<String, Subscription>
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
// ByteBuffer -> Subscription
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}

/*
* 对于 leader 消费者来说,需要关注 group 名下所有消费者订阅的 topic,
* 以保证当相应 topic 对应的元数据发生变化,能够感知
*/
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());

// 分区再分配之后,检测是否需要更新集群元数据信息,如果需要则立即更新
client.ensureFreshMetadata();
// 标记当前消费者为 leader 角色
isLeader = true;

log.debug("Performing assignment for group {} using strategy {} with subscriptions {}", groupId, assignor.name(), subscriptions);

/*
* 基于分区分配器(range/round-robin)执行分区分配,
* 返回结果:key 是消费者 ID,value 是对应的分区分配结果
*/
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

// 记录所有完成分配的 topic 集合
Set<String> assignedTopics = new HashSet<>();
for (Assignment assigned : assignment.values()) {
for (TopicPartition tp : assigned.partitions())
assignedTopics.add(tp.topic());
}
// 如果 group 中存在一些已经订阅的 topic 并未分配,则日志记录
if (!assignedTopics.containsAll(allSubscribedTopics)) {
Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
notAssignedTopics.removeAll(assignedTopics);
log.warn("The following subscribed topics are not assigned to any members in the group {} : {} ", groupId, notAssignedTopics);
}

// 如果分配的 topic 集合包含一些未订阅的 topic 集合
if (!allSubscribedTopics.containsAll(assignedTopics)) {
// 日志记录这些未订阅的 topic
Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
newlyAddedTopics.removeAll(allSubscribedTopics);
log.info("The following not-subscribed topics are assigned to group {}, and their metadata will be fetched from the brokers : {}", groupId, newlyAddedTopics);

// 将这些已分配但是未订阅的 topic 添加到 group 集合中
allSubscribedTopics.addAll(assignedTopics);
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());
client.ensureFreshMetadata(); // 更新元数据信息
}

// 更新本地缓存的元数据信息快照
assignmentSnapshot = metadataSnapshot;

log.debug("Finished assignment for group {}: {}", groupId, assignment);

// 对分区分配结果进行序列化,后续需要反馈给集群
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}

return groupAssignment;
}

整个分区分配的执行流程可以概括为:

  1. 校验服务端确定的最终分区策略,获取对应的分区分配器 PartitionAssignor 对象;
  2. 反序列化解析并封装服务端返回的当前 group 名下所有消费者的 topic 订阅信息;
  3. 更新消费者本地缓存的状态信息,包括 group 名下所有消费者订阅的 topic 集合,leader 角色自己订阅的 topic 集合等;
  4. 检查是否需要更新本地集群元数据信息,如果需要则立即执行更新;
  5. 依据具体的分区分配策略执行分区分配操作;
  6. 校验分区分配结果,如果 group 已经订阅的一些 topic 并未被分配,则记录到日志;
  7. 校验分配结果,如果分配了一些并未订阅的 topic,则将其加入到 group 集合中,并更新本地集群元数据信息;
  8. 序列化封装并返回分区分配结果,后续需要反馈给集群。

Kafka 目前主流的分区分配策略分为 2 种(默认是 range,可以通过 partition.assignment.strategy 参数指定):

  • range:在保证均衡的前提下,将连续的分区分配给消费者,对应的实现是 RangeAssignor。
  • round-robin:在保证均衡的前提下,轮询分配,对应的实现是 RoundRobinAssignor。

在 0.11.0.0 版本引入了一种新的分区分配策略 StickyAssignor,相对于上面两种分区分配策略的优势在于能够在保证分区均衡的前提下尽量保持原有的分区分配结果,从而避免许多冗余的分区分配操作,减少分区再分配的执行时间。不过据反映 StickyAssignor 目前还存在一些小 bug,所以在你的应用中具体是否采用还需要斟酌。

说到这里我们插点题外话,聊聊分区再分配机制的缺点。我们知道分区再分配机制设计的出发点是好的,也确实解决了实际面临的一些问题,但是缺点在于执行过程效率太低,究其根本可以概括为以下 2 方面的原因:

  1. 在执行分区再分配过程中,对应 group 名下的所有消费者都需要暂停手头上的工作加入到分区再分配过程中来,外在的表现就是整个 group 在此期间不消费新的消息,会出现一段时间的消息堆积,有点 Stop The World 的意思。
  2. 基于 RangeAssignor 或 RoundRobinAssignor 分区分配策略会对 group 名下所有消费者的分区分配方案重新洗牌,实际上较好的策略是尽量复用原有的分区分配结果,并在此基础上进行微调,从而最大利用原有的状态信息,避免一些冗余的工作量。

实际中因为订阅的 topic 数目发生变更,或者 topic 分区数目的变化导致触发的分区再分配操作我们无法避免,但是此类情况发生的概率较小,大部分的分区再分配都是由于消费者上下线导致的,而且是被 kafka 误判为下线。Kafka 基于心跳机制来对具体的一个消费者进行判活,如果对应的参数设置不当会极大增加误判率,所以在这一块的参数配置上需要仔细斟酌。

继续接着分析分区再分配机制的实现,步骤 5 会依据具体的分区分配策略对分区执行分配操作,即执行 PartitionAssignor#assign 方法,我们已经知晓了每种分配算法的思想,具体分配细节这里不再深入。

完成了分配分区之后,消费者(不管是 leader,还是 follower)会构建 SyncGroupRequest 请求,将分区分配结果信息发送给对应的 GroupCoordinator 实例所在节点,并最终保存在服务端。如果请求异常,则会调用 AbstractCoordinator#requestRejoin 方法标记需要再次执行分区再分配操作。

下面继续回到 AbstractCoordinator#joinGroupIfNeeded 方法,如果分区分配操作失败,则消费者会依据异常类型决定是否继续重试。如果分区分配成功,则接下来需要对本地记录的相关信息重新初始化,因为分配给当前消费者的分区很可能已经变化,消费者需要知晓上一任消费者对当前分区的消费情况,从而找到合适的 offset 位置继续消费,相关实现位于 ConsumerCoordinator#onJoinComplete 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
// only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader) {
assignmentSnapshot = null;
}

// 获取最终确定的分区分配策略对应的分区分配器
PartitionAssignor assignor = this.lookupAssignor(assignmentStrategy);
if (assignor == null) {
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
}

// 反序列化获取分区分配信息
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
// 标记需要从 GroupCoordinator 节点获取最近提交的 offset 值
subscriptions.needRefreshCommits();
// 设置每个 topic 分区对应的消费状态
subscriptions.assignFromSubscribed(assignment.partitions());

// 遍历获取新分配的 topic,并更新本地记录的订阅信息
Set<String> addedTopics = new HashSet<>();
for (TopicPartition tp : subscriptions.assignedPartitions()) {
if (!joinedSubscription.contains(tp.topic())) {
addedTopics.add(tp.topic()); // 新分配的分区
}
}
if (!addedTopics.isEmpty()) {
Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
newSubscription.addAll(addedTopics);
newJoinedSubscription.addAll(addedTopics);

// 使用 AUTO_PATTERN 模式进行订阅
subscriptions.subscribeFromPattern(newSubscription);
joinedSubscription = newJoinedSubscription;
}

// 更新本地缓存的集群元数据信息
metadata.setTopics(subscriptions.groupSubscription());
client.ensureFreshMetadata();

// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment);

// 重置下次自动提交 offset 的截止时间
nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;

// 应用监听分区再分配操作完成的监听器
ConsumerRebalanceListener listener = subscriptions.listener();
log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
} catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition assignment", listener.getClass().getName(), groupId, e);
}
}

上述方法主要做了以下 4 件事情:

  1. 标记需要重新从集群获取最近一次提交的 offset 值。
  2. 重置本地记录的每个分区的消费状态,并更新本地记录的 topic 订阅信息。
  3. 条件性更新本地记录的集群元数据信息。
  4. 激活 ConsumerRebalanceListener 监听器的 onPartitionsAssigned 方法。

前面我们介绍了 ConsumerRebalanceListener 接口的定义,在分区再分配操作执行之前会调用 ConsumerRebalanceListener#onPartitionsRevoked 方法,而另外一个方法 ConsumerRebalanceListener#onPartitionsAssigned 的调用时机则是位于这里。

2.4 分区消费 offset 提交策略

提交已经消费完成的消息对应的 offset 是保证消息不重复消费和遗漏消费的最重要的措施,这里提交的 offset 是下一条待消费消息的 offset,而非当前已经消费的最后一条消息的 offset。Kafka 默认会按照指定时间间隔自动提交消费者消费完成的 offset 值,同时也允许开发者手动控制 offset 的提交时机。提交操作分为同步(阻塞)和异步(非阻塞)两种,Kafka 为手动提交分别提供了对应 KafkaConsumer#commitSyncKafkaConsumer#commitAsync 方法实现,这些方法都存在多个重载版本,相应的实现均位于 ConsumerCoordinator 类中。

ConsumerCoordinator 类提供了多个提交 offset 的方法,区分同步和异步,这些方法之间的调用关系如下所示:

  • 同步 offset 提交
1
2
+ maybeAutoCommitOffsetsSync
| ---- + commitOffsetsSync
  • 异步 offset 提交
1
2
3
4
+ maybeAutoCommitOffsetsNow / maybeAutoCommitOffsetsAsync
| ---- + doAutoCommitOffsetsAsync
| ---- | ---- + commitOffsetsAsync
| ---- | ---- | ---- + doCommitOffsetsAsync

整个方法调用链路按照同步和异步提交方式分为两条独立的路线,自动提交和手动提交在底层实现上其实是复用的,下面的篇幅中我们分别分析同步提交和异步提交的具体实现细节。

2.4.1 同步 offset 提交策略

我们从 ConsumerCoordinator#maybeAutoCommitOffsetsSync 方法切入,该方法的调用时机有两个地方:

  1. 执行分区再分配操作的准备阶段(ConsumerCoordinator#onJoinPrepare 方法);
  2. 关闭消费者的时候(ConsumerCoordinator#close 方法,KafkaConsumer#close 方法在执行时会调用该方法)。

前面曾提到过,当分区再分配操作完成之后,分区与消费者之间的订阅关系可能会发生变化,而 kafka 又依赖于消费者自己去记录分区的消费状态,所以在执行分区再分配操作之前需要让每个消费者将自己维护的分区消费状态信息上报给集群,这样在完成分区重新分配之后,消费者可以通过请求就集群以知晓新分配的分区的消费 offset 位置,消费者关闭的过程本质上也是如此。这些场景下提交 offset 的过程必须是同步的,否则存在丢失消费状态的可能,最终将导致消息被重复消费。

方法 ConsumerCoordinator#maybeAutoCommitOffsetsSync 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void maybeAutoCommitOffsetsSync(long timeoutMs) {
if (autoCommitEnabled) {
// 获取当前消费者订阅的所有 topic 分区,以及分区对应的消费状态信息
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
try {
log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
// 执行同步 offset 提交
if (!this.commitOffsetsSync(allConsumedOffsets, timeoutMs)) {
log.debug("Auto-commit of offsets {} for group {} timed out before completion", allConsumedOffsets, groupId);
}
}
// ... 省略异常处理
}
}

如果允许自动提交 offset,则上述方法首先会从本地获取当前消费者被分配的分区的消费状态,然后调用 ConsumerCoordinator#commitOffsetsSync 方法向集群提交 offset 值,方法 KafkaConsumer#commitSync 本质上也是调用了该方法实现对 offset 的提交操作。方法 ConsumerCoordinator#commitOffsetsSync 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
// 触发注册的监听 offset 提交完成的方法
this.invokeCompletedOffsetCommitCallbacks();
// 如果提交的 offset 数据为空,则直接返回
if (offsets.isEmpty()) {
return true;
}

long now = time.milliseconds();
long startMs = now;
long remainingMs = timeoutMs;
do {
// 如果目标 GroupCoordinator 节点不可用
if (this.coordinatorUnknown()) {
// 尝试寻找负载最小且可用的 GroupCoordinator 节点
if (!this.ensureCoordinatorReady(now, remainingMs)) {
// 如果目标 GroupCoordinator 节点未准备好接收请求
return false;
}
remainingMs = timeoutMs - (time.milliseconds() - startMs);
}

// 创建并发送 OffsetCommitRequest 请求,提交 offset 值
RequestFuture<Void> future = this.sendOffsetCommitRequest(offsets);
client.poll(future, remainingMs);

// 提交成功
if (future.succeeded()) {
if (interceptors != null) {
interceptors.onCommit(offsets);
}
return true;
}
// 提交失败,且不可重试,则抛出异常
if (!future.isRetriable()) {
throw future.exception();
}

time.sleep(retryBackoffMs);

now = time.milliseconds();
remainingMs = timeoutMs - (now - startMs);
} while (remainingMs > 0);

return false;
}

同步 offset 提交的执行流程可以概括为:

  1. 触发注册的监听 offset 提交完成的回调方法;
  2. 校验待提交的 offset 数据是否为空,如果为空则直接返回;
  3. 校验目标 GroupCoordinator 实例所在节点是否可用,如果不可用则尝试寻找负载最小且可用的节点;
  4. 创建并发送提交 offset 的 OffsetCommitRequest 请求;
  5. 处理请求的响应结果。

所有的监听 offset 提交操作的 OffsetCommitCallback 都会被封装成 OffsetCommitCompletion 对象,记录到 ConsumerCoordinator#completedOffsetCommits 字段中,并在每次提交 offset 时触发调用。下面来看一下创建并发送 OffsetCommitRequest 请求的逻辑,实现位于 ConsumerCoordinator#sendOffsetCommitRequest 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty()) {
// 如果没有请求的数据,则直接返回
return RequestFuture.voidSuccess();
}

// 获取 GroupCoordinator 节点,并检查其可达性
Node coordinator = this.coordinator();
if (coordinator == null) {
return RequestFuture.coordinatorNotAvailable();
}

// 封装每个分区对应提交的 offset 数据
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
OffsetAndMetadata offsetAndMetadata = entry.getValue();
if (offsetAndMetadata.offset() < 0) {
// 非法的 offset 值
return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
}
// key 是分区,value 是分区对应的请求数据
offsetData.put(entry.getKey(),
new OffsetCommitRequest.PartitionData(offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
}

// 获取当前消费者所属 group 的年代信息
final Generation generation;
if (subscriptions.partitionsAutoAssigned()) {
// 如果是 AUTO_TOPICS 或 AUTO_PATTERN 订阅模式,则获取年代信息
generation = this.generation();
} else {
// 对于 USER_ASSIGNED 模式,因为不涉及到分区再分配操作,所以没有年代信息
generation = Generation.NO_GENERATION;
}
if (generation == null) {
// 如果获取 group 年代信息失败,则说明当前消费者并不属于该 group,抛出异常,需要执行分区再分配
return RequestFuture.failure(new CommitFailedException());
}

// 创建 OffsetCommitRequest 请求
OffsetCommitRequest.Builder builder =
new OffsetCommitRequest.Builder(groupId, offsetData)
.setGenerationId(generation.generationId)
.setMemberId(generation.memberId)
.setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);

log.trace("Sending OffsetCommit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

// 发送 OffsetCommitRequest 请求,并注册响应处理器
return client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(offsets));
}

上述方法的执行流程如下:

  1. 校验待发送的请求数据是否为空,如果为空则直接返回成功;
  2. 获取目标 GroupCoordinator 实例所在 broker 节点,并校验其可用性;
  3. 封装每个目标分区的待提交 offset 数据;
  4. 获取并校验当前 group 的年代信息,防止提交一些已经离开 group 的消费者的 offset 数据;
  5. 创建并缓存 OffsetCommitRequest 请求,同时注册响应结果处理器。

整体流程都比较简单和直观,集群 GroupCoordinator 实例在收到 OffsetCommitRequest 请求之后,会依据请求指定的版本号决定将 offset 消费信息记录到 ZK 还是最新实现的 offset topic,我们将在后面分析 GroupCoordinator 组件的篇章中针对 OffsetCommitRequest 请求的处理过程进行深入分析。下面来看一下对于响应结果的处理过程,实现位于 OffsetCommitResponseHandler#handle 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();

// 遍历对所有 topic 分区的响应
for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
long offset = offsetAndMetadata.offset();

// 获取当前分区对应的响应错误码
Errors error = Errors.forCode(entry.getValue());
// 正常响应
if (error == Errors.NONE) {
log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
if (subscriptions.isAssigned(tp)) {
// 更新分区对应的消费状态
subscriptions.committed(tp, offsetAndMetadata);
}
}
// ... 省略异常响应处理的过程
}

if (!unauthorizedTopics.isEmpty()) {
log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
future.raise(new TopicAuthorizationException(unauthorizedTopics));
} else {
future.complete(null);
}
}

这里主要看一下对于正常响应的处理过程,一个消费者可能同时消费多个 topic 分区,前面我们已经说过对于每个 topic 分区的消费状态都是独立维护和提交的,所以对于响应的处理也需要针对每个 topic 分区进行单独处理。如果是正常响应,方法首先会确认对应 topic 分区是否分配给当前消费者,如果是的话则会更新对应的分区消费状态的 TopicPartitionState#committed 字段,本质上也就是将 TopicPartitionState#position 字段记录的消费 offset 值封装成 OffsetAndMetadata 对象进行赋值。

2.4.2 异步 offset 提交策略

下面继续看一下异步 offset 提交的过程,我们从 ConsumerCoordinator#maybeAutoCommitOffsetsAsync 方法切入。前面我们在分析 ConsumerCoordinator#poll 方法时曾提到了对于该方法的调用,方法的实现比较简单(如下),即判断是否启用了自动提交,如果启用了的话则首先判断对应的 GroupCoordinator 实例所在节点是否可用,如果不可用则修改下一次自动提交的时间戳,延迟到下一次执行,如果目标 GroupCoordinator 节点是可用的,同时自动提交的时间已到,则执行异步提交操作。

1
2
3
4
5
6
7
8
9
10
11
12
private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (this.coordinatorUnknown()) {
// 目标 GroupCoordinator 节点不可达,稍后再试
this.nextAutoCommitDeadline = now + retryBackoffMs;
} else if (now >= nextAutoCommitDeadline) {
// 时间已到,执行异步自动提交
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
this.doAutoCommitOffsetsAsync();
}
}
}

其中,方法 ConsumerCoordinator#doAutoCommitOffsetsAsync 除了在这里被调用之外,也会在 ConsumerCoordinator#maybeAutoCommitOffsetsNow 方法中被调用,用于立即发起一次异步提交 offset 请求,实现比较简单,不在此贴出。方法 ConsumerCoordinator#doAutoCommitOffsetsAsync 的实现也比较简单,核心逻辑就是获取当前消费者消费的所有 topic 分区对应的消费状态信息,然后调用 ConsumerCoordinator#commitOffsetsAsync 方法执行异步提交操作。该方法首先会触发注册的监听 offset 提交完成的监听器,然后判断目标 GroupCoordinator 实例所在节点是否可用。如果可用则继续执行异步提交操作,如果不可用则会尝试寻找可用且负载最小的节点,并在找到的前提下继续执行异步提交,否则返回异常。

执行异步提交的核心实现位于 ConsumerCoordinator#doCommitOffsetsAsync 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
// 标记需要从 GroupCoordinator 节点获取最近提交的 offset 值
subscriptions.needRefreshCommits();
// 创建并发送 OffsetCommitRequest 请求
RequestFuture<Void> future = this.sendOffsetCommitRequest(offsets);
// 封装 callback,用于监听 offset 提交结果
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
if (interceptors != null) {
interceptors.onCommit(offsets);
}
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}

@Override
public void onFailure(RuntimeException e) {
Exception commitException = e;
if (e instanceof RetriableException) {
commitException = new RetriableCommitFailedException(e);
}
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
}
});
}

执行提交 OffsetCommitRequest 请求之前,方法首先会标记需要从 GroupCoordinator 实例所在节点获取最近提交的 offset 值,这里的标记主要用于通知那些本地未有效记录分区消费状态的消费者。然后构造并缓存 OffsetCommitRequest 请求对象,等待下次 poll 操作时一并发送,方法 ConsumerCoordinator#sendOffsetCommitRequest 的执行逻辑在前面已经分析过,这里不再重复撰述。

异步发送和同步发送主要的区别在于是否立即调用 ConsumerNetworkClient#poll 方法阻塞发送请求,并处理响应结果。前面在介绍同步提交(ConsumerCoordinator#commitOffsetsSync 方法)时可以看到在构建完 OffsetCommitRequest 请求之后会立即执行 poll 方法,而在异步提交时,构建完 OffsetCommitRequest 请求之后并不会立即发送请求,而是会等到下一次执行 poll 方法时一并发送,并通过回调的方式处理响应结果。

三. 总结

本文我们介绍了 java 版本的 KafkaConsumer 的使用,并深入分析了相关设计和实现,了解一个 group 名下的所有消费者区分 leader 和 follower 角色,其中 leader 角色除了肩负普通消费者的职责外,还需要负责管理整个 group 的运行状态。此外,消费者在拉取消息时的预取策略虽然在设计上很简单,却很好的利用了消息拉取和消费这两者之间能够并行执行的特点,极大提升了消费者的运行性能。而分区再分配机制则为 kafka 提供了良好的扩展性,保证在 topic 分区数据发生变化,以及消费者上下线时能够不停服,继续正常对外提供服务。

到此为止,关于 kafka SDK 的相关实现已经基本介绍完了,从下一篇开始我们将转战服务端,分析 kafka 集群各个组件的设计与实现。


转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议
Powered by hexo & Theme by hiero   Copyright © 2015-2019 浙ICP备 16010916  号,指 · 间 All Rights Reserved.