本节重点讨论 Kafka 的消息拉起流程。
@(本节目录)
消息拉起主要入口为:KafkaConsumer#poll方法,其声明如下:
~java~
public ConsumerRecords<K, V> poll(final Duration timeout) { // @1
return poll(time.timer(timeout), true); // @2
}
代码@1:参数为超时时间,使用 java 的 Duration 来定义。
代码@2:调用内部的 poll 方法。
KafkaConsumer#poll
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) { // @1
acquireAndEnsureOpen(); // @2
try {
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { // @3
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
// poll for new data until the timeout expires
do { // @4
client.maybeTriggerWakeup(); //@5
if (includeMetadataInTimeout) { // @6
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); // @7
if (!records.isEmpty()) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { // @8
client.pollNoWakeup();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records)); // @9
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}
代码@1:首先先对其参数含义进行讲解。
代码@2:检查是否可以拉取消息,其主要判断依据如下:
代码@3:如果当前消费者未订阅任何主题或者没有指定队列,则抛出错误,结束本次消息拉取。
代码@4:使用 do while 结构循环拉取消息,直到超时或拉取到消息。
代码@5:避免在禁止禁用wakeup时,有请求想唤醒时则抛出异常,例如在下面的@8时,会禁用wakeup。
代码@6:更新相关元数据,为真正向 broker 发送消息拉取请求做好准备,该方法将在下面详细介绍,现在先简单介绍其核心实现点:
这里会有一个更新元数据是否占用消息拉取的超时时间,默认为 true。
代码@7:调用 pollForFetches 向broker拉取消息,该方法将在下文详细介绍。
代码@8:如果拉取到的消息集合不为空,再返回该批消息之前,如果还有挤压的拉取请求,可以继续发送拉取请求,但此时会禁用warkup,主要的目的是用户在处理消息时,KafkaConsumer 还可以继续向broker 拉取消息。
代码@9:执行消费拦截器。
接下来对上文提到的代码@6、@7进行详细介绍。
KafkaConsumer#updateAssignmentMetadataIfNeeded
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (coordinator != null && !coordinator.poll(timer)) { // @1
return false;
}
return updateFetchPositions(timer); // @2
}
要理解这个方法实现的用途,我们就必须依次对 coordinator.poll 方法与 updateFetchPositions 方法。
public boolean poll(Timer timer) {
invokeCompletedOffsetCommitCallbacks(); // @1
if (subscriptions.partitionsAutoAssigned()) { // @2
pollHeartbeat(timer.currentTimeMs()); // @21
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { //@22
return false;
}
if (rejoinNeededOrPending()) { // @23
if (subscriptions.hasPatternSubscription()) { // @231
if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
this.metadata.requestUpdate();
}
if (!client.ensureFreshMetadata(timer)) {
return false;
}
}
if (!ensureActiveGroup(timer)) { // @232
return false;
}
}
} else { // @3
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
}
maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); // @4
return true;
}
代码@1:执行已完成的 offset (消费进度)提交请求的回调函数。
代码@2:队列负载算法为自动分配(即 Kafka 根据消费者个数与分区书动态负载分区)的相关的处理逻辑。其实现关键点如下:
代码@3:用户手动为消费组指定负载的队列的相关处理逻辑,其实现关键如下:
代码@4:如果开启了自动提交消费进度,并且已到下一次提交时间,则提交。Kafka 消费者可以通过设置属性 enable.auto.commit 来开启自动提交,该参数默认为 true,则默认会每隔 5s 提交一次消费进度,提交间隔可以通过参数 auto.commit.interval.ms 设置。
接下来继续探讨 updateAssignmentMetadataIfNeeded (更新元数据)的第二个步骤,更新拉取位移。
KafkaConsumer#updateFetchPositions
private boolean updateFetchPositions(final Timer timer) {
cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();
if (cachedSubscriptionHashAllFetchPositions) { // @1
return true;
}
if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) // @2
return false;
subscriptions.resetMissingPositions(); // @3
fetcher.resetOffsetsIfNeeded(); // @4
return true;
}
代码@1:如果订阅关系中的所有分区都有有效的位移,则返回 true。
代码@2:如果存在任意一个分区没有有效的位移信息,则需要向 broker 发送请求,从broker 获取该消费组,该分区的消费进度。相关的实现细节将在后续文章【Kafka 消费进度】专题文章中详细介绍。
代码@3:如果经过第二步,订阅关系中还某些分区还是没有获取到有效的偏移量,则使用偏移量重置策略进行重置,如果未配置,则抛出异常。
代码@4:发送一个异步请求去重置那些正等待重置位置的分区。有关 Kafka 消费消费进度、重平衡等知识将会在后续文章中深入探讨,本文只需了解 poll 消息的核心处理流程。
从 KafkaConsumer#poll 中流程可以看到,通过 updateAssignmentMetadataIfNeeded 对元数据、重平衡,更新拉取偏移量等工作处理完成后,下一步就是需要向 broker 拉取消息了,其实现入口为:KafkaConsumer 的 pollForFetches 方法。
KafkaConsumer#pollForFetches
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); // @1
// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // @2
if (!records.isEmpty()) {
return records;
}
fetcher.sendFetches(); // @3
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { // @4
pollTimeout = retryBackoffMs;
}
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
return !fetcher.hasCompletedFetches();
}); // @5
timer.update(pollTimer.currentTimeMs()); // @6
if (coordinator != null && coordinator.rejoinNeededOrPending()) { // @7
return Collections.emptyMap();
}
return fetcher.fetchedRecords(); // @8
}
代码@1:计算本次拉取的超时时间,其计算逻辑如下:
代码@2:如果数据已经拉回到本地,直接返回数据。将在下文详细介绍 Fetcher 的 fetchedRecords 方法。
代码@3:组装发送请求,并将存储在待发送请求列表中。
代码@4:如果已缓存的分区信息中存在某些分区缺少偏移量,如果拉取的超时时间大于失败重试需要阻塞的时间,则更新此次拉取的超时时间为失败重试需要的间隔时间,主要的目的是不希望在 poll 过程中被阻塞【后续会详细介绍 Kafka 拉取消息的线程模型,再来回顾一下这里】。
代码@5:通过调用NetworkClient 的 poll 方法发起消息拉取操作(触发网络读写)。
代码@6:更新本次拉取的时间。
代码@7:检查是需要重平衡。
代码@8:将从 broker 读取到的数据返回(即封装成消息)。
从上面消息拉取流程来看,有几个比较重要的方法,例如 Fetcher 类相关的方法,NetworkClient 的 poll 方法,那我们接下来来重点探讨。
我们先用一张流程图总结一下消息拉取的全过程:
接下来我们将重点看一下 KafkaConsumer 的 pollForFetches 详细过程,也就是需要详细探究 Fetcher 类的实现细节。
Fetcher 封装消息拉取的方法,可以看成是消息拉取的门面类。
我们首先一一介绍一下 Fetcher 的核心属性与核心方法。
接下来我们将按照消息流程,一起来看一下 Fetcher 的核心方法。
Fetcher#fetchedRecords
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); // @1
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) { // @2
if (nextInLineRecords == null || nextInLineRecords.isFetched) { // @3
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
try {
nextInLineRecords = parseCompletedFetch(completedFetch);
} catch (Exception e) {
FetchResponse.PartitionData partition = completedFetch.partitionData;
if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
completedFetches.poll();
}
throw e;
}
completedFetches.poll();
} else { // @4
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
TopicPartition partition = nextInLineRecords.partition;
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
}
return fetched;
}
代码@1:首先先解释两个局部变量的含义:
代码@2:循环去取已经完成了 Fetch 请求的消息,该 while 循环有两个跳出条件:
代码@3、@4 主要完成从缓存中解析数据的两个步骤,初次运行的时候,会进入分支@3,然后从 调用 parseCompletedFetch 解析成 PartitionRecords 对象,然后代码@4的职责就是从解析 PartitionRecords ,将消息封装成 ConsumerRecord,返回给消费端线程处理。
代码@3的实现要点如下:
从上面可知,上述方法的核心方法是:parseCompletedFetch。
代码@4的实现要点无非就是调用 fetchRecords 方法,按分区组装成 Map<TopicPartition, List<ConsumerRecord<K, V>>>,供消费者处理,例如供业务处理。
接下来将重点探讨上述两个方法的实现细节。
在尝试探讨该方法之前,我们首先对其入参进行一个梳理,特别是先认识其主要数据结构。
1、CompletedFetch 相关类图
从上图可以看出,CompleteFetch 核心属性主要如下:
分区的数据是使用 PartitionData 来进行封装的。我们也来简单的了解一下其内部数据结果。
2、parseCompletedFetch 详解
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData<Records> partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
PartitionRecords partitionRecords = null;
Errors error = partition.error;
try {
if (!subscriptions.isFetchable(tp)) { // @1
log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
} else if (error == Errors.NONE) { // @2
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) { // @21
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
partition.records.sizeInBytes(), tp, position);
Iterator<? extends RecordBatch> batches = partition.records.batches().iterator(); // @22
partitionRecords = new PartitionRecords(tp, completedFetch, batches);
if (!batches.hasNext() && partition.records.sizeInBytes() > 0) { // @23
if (completedFetch.responseVersion < 3) {
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
" and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
"newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
recordTooLargePartitions);
} else {
// This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
fetchOffset + ". Received a non-empty fetch response from the server, but no " +
"complete records were found.");
}
}
if (partition.highWatermark >= 0) { // @24
log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
subscriptions.updateHighWatermark(tp, partition.highWatermark);
}
if (partition.logStartOffset >= 0) { // @25
log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
}
if (partition.lastStableOffset >= 0) { // @26
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
}
} else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR) { // @3
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
this.metadata.requestUpdate();
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { // @4
log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
this.metadata.requestUpdate();
} else if (error == Errors.OFFSET_OUT_OF_RANGE) { // @5
if (fetchOffset != subscriptions.position(tp)) {
log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
"does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
} else if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
subscriptions.requestOffsetReset(tp);
} else {
throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { // @6
log.warn("Not authorized to read from topic {}.", tp.topic());
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
}
} finally { // @7
if (partitionRecords == null)
completedFetch.metricAggregator.record(tp, 0, 0);
if (error != Errors.NONE)
// we move the partition to the end if there was an error. This way, it's more likely that partitions for
// the same topic can remain together (allowing for more efficient serialization).
subscriptions.movePartitionToEnd(tp);
}
return partitionRecords;
}
上面的代码虽然比较长,其实整体还是比较简单,只是需要针对各种异常处理,打印对应的日志,接下来详细介绍该方法的实现关键点。
代码@1:判断该分区是否可拉取,如果不可拉取,则忽略这批拉取的消息,判断是可拉取的要点如下:
代码@2:该分支是处理正常返回的相关逻辑。其关键点如下:
从代码@3到@8 是多种异常信息的处理。
代码@3:如果出现如下3种错误码,则使用 debug 打印错误日志,并且向服务端请求元数据并更新本地缓存。
Kafka 认为上述错误是可恢复的,而且对消费不会造成太大影响,故只是用 debug 打印日志,然后更新本地缓存即可。
代码@4:如果出现 UNKNOWN_TOPIC_OR_PARTITION 未知主题与分区时,则使用 warn 级别输出错误日志,并更新元数据。
代码@5:针对 OFFSET_OUT_OF_RANGE 偏移量超过范围异常的处理逻辑,其实现关键点如下:
代码@6:如果是 TOPIC_AUTHORIZATION_FAILED 没有权限(ACL)则抛出异常。
代码@7:如果本次拉取的结果不是NONE(成功),并且是可恢复的,将该队列的订阅关系移动到消费者缓存列表的末尾。如果成功,则返回拉取到的分区数据,其封装对象为 PartitionRecords。
接下来我们再来看看 2.1.1 fetchedRecords 中的另外一个核心方法。
在介绍该方法之前同样先来看一下参数 PartitionRecords 的内部结构。
1、PartitionRecords 类图
主要的核心属性如下:
2、fetchRecords 详解
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
if (!subscriptions.isAssigned(partitionRecords.partition)) { // @1
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned",
partitionRecords.partition);
} else if (!subscriptions.isFetchable(partitionRecords.partition)) { // @2
// this can happen when a partition is paused before fetched records are returned to the consumer's
// poll call or if the offset is being reset
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
partitionRecords.partition);
} else {
long position = subscriptions.position(partitionRecords.partition); // @3
if (partitionRecords.nextFetchOffset == position) { // @4
List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
subscriptions.position(partitionRecords.partition, nextOffset);
Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
if (partitionLag != null)
this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
Long lead = subscriptions.partitionLead(partitionRecords.partition);
if (lead != null) {
this.sensors.recordPartitionLead(partitionRecords.partition, lead);
}
return partRecords;
} else { // @5
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
partitionRecords.partition, partitionRecords.nextFetchOffset, position);
}
}
partitionRecords.drain();
return emptyList();
}
代码@1:从 PartitionRecords 中提取消息之前,再次判断订阅消息中是否包含当前分区,如果不包含,则使用 debug 打印日志,很有可能是发生了重平衡。
代码@2:是否允许拉取,如果用户主动暂停消费,则忽略本次拉取的消息。备注:Kafka 消费端如果消费太快,可以进行限流。
代码@3:从本地消费者缓存中获取该队列已消费的偏移量,在发送拉取消息时,就是从该偏移量开始拉取的。
代码@4:如果本地缓存已消费偏移量与从服务端拉回的起始偏移量相等的话,则认为是一个有效拉取,否则则认为是一个过期的拉取,该批消息已被消费,见代码@5。如果是一个有效请求,则使用 sensors 收集统计信息,并返回拉取到的消息, 返回结果被封装在 List<ConsumerRecord<K, V>> 。
“发送” fetch 请求,注意这里并不会触发网络操作,而是组装拉取请求,将其放入网络缓存区。
Fetcher#sendFetches
```java
public synchronized int sendFetches() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); // @1
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) { // @2
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget()); // @3
client.send(fetchTarget, request) // @4
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) { // @5
synchronized (Fetcher.this) {
@SuppressWarnings("unchecked")
FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler == null) {
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());
return;
}
if (!handler.handleResponse(response)) {
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry :
response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
FetchResponse.PartitionData<Records> fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
} // @6
sensors.fetchLatency.record(resp.requestLatencyMs());
}
}
public void onFailure(RuntimeException e) { // @7
synchronized (Fetcher.this) {
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler != null) {
handler.handleError(e);
}
}
}
});
}
return fetchRequestMap.size();
}
```java
上面的方法比较长,其实现的关键点如下:
代码@1:通过调用 Fetcher 的 prepareFetchRequests 方法按节点组装拉取请求,将在后面详细介绍。
代码@2:遍历上面的待发请求,进一步组装请求。下面就是分节点发送拉取请求。
代码@3:构建 FetchRequest 拉取请求对象。
代码@4:调用 NetworkClient 的 send 方法将其发送到发送缓存区,本文不会详细介绍网络方面的实现,但下文会截图说明拉取请求发送缓存区的一个关键点。
代码@5:这里会注册事件监听器,当消息从 broker 拉取到本地后触发回调,即消息拉取请求收到返回结果后会将返回结果放入到completedFetches 中(代码@6),这就和上文消息拉取时 Fetcher 的 fetchedRecords 方法形成闭环。
代码@7:消息拉取一次处理。
接下来详细介绍 prepareFetchRequests 方法。
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
for (TopicPartition partition : fetchablePartitions()) { // @1
Node node = metadata.partitionInfoIfCurrent(partition).map(PartitionInfo::leader).orElse(null); // @2
if (node == null) { // @3
metadata.requestUpdate();
} else if (client.isUnavailable(node)) { // @4
client.maybeThrowAuthFailure(node);
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
} else if (client.hasPendingRequests(node)) { // @5
log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
} else {
// if there is a leader and no in-flight requests, issue a new fetch
FetchSessionHandler.Builder builder = fetchable.get(node); // @7
if (builder == null) {
FetchSessionHandler handler = sessionHandler(node.id());
if (handler == null) {
handler = new FetchSessionHandler(logContext, node.id());
sessionHandlers.put(node.id(), handler);
}
builder = handler.newBuilder();
fetchable.put(node, builder);
}
long position = this.subscriptions.position(partition);
builder.add(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,
this.fetchSize, Optional.empty()));
log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel,
partition, position, node);
}
}
Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>();
for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) {
reqs.put(entry.getKey(), entry.getValue().build());
}
return reqs;
}
代码@1:首先通过调用 fetchablePartitions() 获取可发起拉取任务的分区信息,下文简单介绍一下。
代码@2:如果该分区在客户端本地缓存中获取该分区的 Leader 节点信息。
代码@3:如果其 Leader 节点信息为空,则发起更新元数据请求,本次拉取任务将不会包含该分区。
代码@4:如果客户端与该分区的 Leader 连接为完成,如果是因为权限的原因则抛出ACL相关异常,否则打印日志,本次拉取请求不会包含该分区。
代码@5:判断该节点是否有挂起的拉取请求,即发送缓存区中是待发送的请求,如果有,本次将不会被拉取。
代码@6:构建拉取请求,分节点组织请求。
NetworkClient 的 send 方法只是将其放入 unsent 中。
与上文的 client.hasPendingRequests(node) 方法遥相呼应。
3、总结
上面的源码分析有点长,也有点枯燥,我们还是画一张流程图来进行总结。
Kafka 的消息拉取流程还是比较复杂的,后面会基于上述流程,重点进行拆解,例如消费进度提交,负载队列重平衡等等。
作者介绍:
丁威,《RocketMQ技术内幕》作者,RocketMQ 社区布道师,公众号:中间件兴趣圈 维护者,目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。欢迎加入我的知识星球,构建一个高质量的技术交流社群。
原文:https://www.cnblogs.com/dingwpmz/p/12198667.html