FastLeaderElection.lookForLeader 选举核心方法 public Vote lookForLeader() throws InterruptedException { try { // 收票箱(节点状态是LOOKING发来的投票信息) HashMap<Long, Vote> recvVoteSet = new HashMap<>(); // 收票箱(选举结束节点发来的投票信息) HashMap<Long, Vote> outOfElection = new HashMap<>(); synchronized (this) { // 当前节点选举轮次 logicalClock.incrementAndGet(); // 初始化投票参数 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } // 群发Vote sendNotifications(); // LOOKING & not stop while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) { Notification n = recvQueue.poll(200, TimeUnit.MILLISECONDS); if (n == null) { if (manager.haveDelivered()) { sendNotifications(); } else { manager.connectAll(); } } else if (validVoter(n.sid) && validVoter(n.leader)) { switch (n.state) { // 对方选举状态(LOOKING) case LOOKING: // 如果对方选举轮次大于本节点选举轮次,投票箱直接清空,并更新本节点的投票轮次 if (n.electionEpoch > logicalClock.get()) { logicalClock.set(n.electionEpoch); recvVoteSet.clear(); // 如果对方发来的选举信息获胜 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { // 更新选举信息 updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); // 如果对方选举轮次小于本节点选举轮次 } else if (n.electionEpoch < logicalClock.get()) { // 直接忽略(不加入投票箱),WorkerReceiver里直接做了处理 break; // 如果处于同一选举轮次(这种情况比较多) } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { // 如果对方投票信息获胜,更新选举信息,并群发 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } // 投票箱存入接收到的投票信息 recvVoteSet.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 接收到的当前投票信息(统计过半) if (termPredicate(recvVoteSet, new Vote(proposedLeader, proposedZxid, logicalClock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader while ((n = recvQueue.poll(200, TimeUnit.MILLISECONDS)) != null) { if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { recvQueue.put(n); break; } } // 如果选票过半,且200ms内没新选票来,则选举初步结束 if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING : learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalClock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: break; case FOLLOWING: case LEADING: // 当前节点和对方处于同一选举轮次 if (n.electionEpoch == logicalClock.get()) { recvVoteSet.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (ooePredicate(recvVoteSet, outOfElection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } outOfElection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if (ooePredicate(outOfElection, outOfElection, n)) { synchronized (this) { logicalClock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { if (!validVoter(n.leader)) { LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid); } if (!validVoter(n.sid)) { LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid); } } } return null; } finally { try { if (self.jmxLeaderElectionBean != null) { MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } self.jmxLeaderElectionBean = null; LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount()); } }
原文:https://www.cnblogs.com/zhwcs/p/13755069.html