PartitionAssignor
定义,内置三种分配策略RangeAssignor
、RoundRobinAssignor
、StickyAssignor
,支持自定义策略。ConsumerCoordinator
和服务端的组协调器GroupCoordinator
通过心跳不断保持通信。Node
,即具有最少的InFlightRequests
的节点。JoinGroupResponseHandler
对响应进行处理,响应包含generationId
、memberId
、leaderId
、protocol
。memberId=leaderId
,则需要根据分配策略protocol
计算分区分配。SyncGroupResponseHandler
对上述请求的响应进行处理。UNJOINED
,表示未加入消费组。REBALANCING
,表示再平衡状态STABLE
,表示稳定状态,如果是失败的消息,状态重置为UNJOINED
STABLE
,则不发送心跳。SubscriptionState
。LATEST
, EARLIEST
,NONE
。Future
。boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
final long startMs = time.milliseconds();
if (!coordinator.poll(timeoutMs)) { // 获取协调器
return false;
}
// 更新偏移量
return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
}
// 获取协调器
public boolean poll(final long timeoutMs) {
final long startTime = time.milliseconds();
long currentTime = startTime;
long elapsed = 0L;
if (subscriptions.partitionsAutoAssigned()) { // 是自动分配主题类型
// 更新心跳的上一次的轮询时间
pollHeartbeat(currentTime);
if (coordinatorUnknown()) { // 协调器未知
// 确保协调器已经 ready
if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
return false;
}
}
if (rejoinNeededOrPending()) { // 需要加入消费组
// 加入组、同步组
if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
return false;
}
currentTime = time.milliseconds();
}
} else { // 指定分区类型
if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) {// 如果没有准备就绪的节点
// 阻塞等待元数据更新
final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
return false; // 更新元数据失败
}
currentTime = time.milliseconds();
}
}
maybeAutoCommitOffsetsAsync(currentTime); // 异步自动提交偏移量
return true;
}
// 确保协调器已经 ready
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
final long startTimeMs = time.milliseconds();
long elapsedTime = 0L;
while (coordinatorUnknown()) { // 如果协调器未知
final RequestFuture<Void> future = lookupCoordinator(); // 向当前请求队列最少的节点,发送获取协调器的请求
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (!future.isDone()) {
break; // 响应未完成,退出
}
}
return !coordinatorUnknown();
}
// 加入组、同步组
boolean ensureActiveGroup(long timeoutMs, long startMs) {
startHeartbeatThreadIfNeeded(); // 启动心跳线程
return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
}
boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
long elapsedTime = 0L;
while (rejoinNeededOrPending()) {
// 发送加入组请求
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (!future.isDone()) {
// we ran out of time
return false;
}
if (future.succeeded()) { // 加入成功,回调处理响应,更新缓存的分区分配
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
}
}
return true;
}
// 发送加入组请求
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
disableHeartbeatThread(); // 暂停心跳线程
state = MemberState.REBALANCING; // 状态改为 REBALANCING
joinFuture = sendJoinGroupRequest(); // 向协调器发送加入组请求
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { // 响应监听器
@Override
public void onSuccess(ByteBuffer value) { // 成功
synchronized (AbstractCoordinator.this) {
state = MemberState.STABLE; // 状态改为 STABLE
rejoinNeeded = false; // 不需要加入了
if (heartbeatThread != null)
heartbeatThread.enable(); // 启动暂停了的心跳
}
}
@Override
public void onFailure(RuntimeException e) { // 失败
synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED; // 状态改为 UNJOINED
}
}
});
}
return joinFuture;
}
// 向协调器发送加入组请求
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
groupId,
this.sessionTimeoutMs,
this.generation.memberId,
protocolType(),
metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler()); // 异步回调响应处理类
}
// 异步回调响应处理类
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) { // 如果是 REBALANCING,状态异常
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
if (joinResponse.isLeader()) { // 当前消费组是 leader
onJoinLeader(joinResponse).chain(future);
} else { // 当消费者是 follower
onJoinFollower().chain(future);
}
}
}
}
}
}
// 发送 leader 消费者同步组请求
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// 根据响应的分配策略,给消费者分配分区
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
// 发送 follower 消费者同步组请求
private RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
Collections.<String, ByteBuffer>emptyMap()); // 发送不带分配信息的请求
return sendSyncGroupRequest(requestBuilder);
}
原文:https://www.cnblogs.com/bigshark/p/11198481.html