首页 > 其他 > 详细

zookeeper选举-lookForLeader

时间:2020-09-30 16:45:18      阅读:37      评论:0      收藏:0      [点我收藏+]
FastLeaderElection.lookForLeader
选举核心方法

public Vote lookForLeader() throws InterruptedException {
        try {
            // 收票箱(节点状态是LOOKING发来的投票信息)
            HashMap<Long, Vote> recvVoteSet = new HashMap<>();
            // 收票箱(选举结束节点发来的投票信息)
            HashMap<Long, Vote> outOfElection = new HashMap<>();

            synchronized (this) {
                // 当前节点选举轮次
                logicalClock.incrementAndGet();
                // 初始化投票参数
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
            // 群发Vote
            sendNotifications();

            // LOOKING & not stop
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                Notification n = recvQueue.poll(200, TimeUnit.MILLISECONDS);
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    switch (n.state) {
                        // 对方选举状态(LOOKING)
                        case LOOKING:
                            // 如果对方选举轮次大于本节点选举轮次,投票箱直接清空,并更新本节点的投票轮次
                            if (n.electionEpoch > logicalClock.get()) {
                                logicalClock.set(n.electionEpoch);
                                recvVoteSet.clear();
                                // 如果对方发来的选举信息获胜
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                    // 更新选举信息
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                } else {
                                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                                }
                                sendNotifications();
                                // 如果对方选举轮次小于本节点选举轮次
                            } else if (n.electionEpoch < logicalClock.get()) {
                                // 直接忽略(不加入投票箱),WorkerReceiver里直接做了处理
                                break;
                                // 如果处于同一选举轮次(这种情况比较多)
                            } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                // 如果对方投票信息获胜,更新选举信息,并群发
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                                sendNotifications();
                            }
                            // 投票箱存入接收到的投票信息
                            recvVoteSet.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                            // 接收到的当前投票信息(统计过半)
                            if (termPredicate(recvVoteSet, new Vote(proposedLeader, proposedZxid,
                                    logicalClock.get(), proposedEpoch))) {

                                // Verify if there is any change in the proposed leader
                                while ((n = recvQueue.poll(200, TimeUnit.MILLISECONDS)) != null) {
                                    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                            proposedLeader, proposedZxid, proposedEpoch)) {
                                        recvQueue.put(n);
                                        break;
                                    }
                                }
                                // 如果选票过半,且200ms内没新选票来,则选举初步结束
                                if (n == null) {
                                    self.setPeerState((proposedLeader == self.getId()) ?
                                            ServerState.LEADING : learningState());

                                    Vote endVote = new Vote(proposedLeader, proposedZxid, logicalClock.get(), proposedEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }
                            break;
                        case OBSERVING:
                            break;
                        case FOLLOWING:
                        case LEADING:
                            // 当前节点和对方处于同一选举轮次
                            if (n.electionEpoch == logicalClock.get()) {
                                recvVoteSet.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                                if (ooePredicate(recvVoteSet, outOfElection, n)) {
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING : learningState());

                                    Vote endVote = new Vote(n.leader,
                                            n.zxid,
                                            n.electionEpoch,
                                            n.peerEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                            }

                            outOfElection.put(n.sid, new Vote(n.version,
                                    n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch,
                                    n.state));

                            if (ooePredicate(outOfElection, outOfElection, n)) {
                                synchronized (this) {
                                    logicalClock.set(n.electionEpoch);
                                    self.setPeerState((n.leader == self.getId()) ?
                                            ServerState.LEADING : learningState());
                                }
                                Vote endVote = new Vote(n.leader,
                                        n.zxid,
                                        n.electionEpoch,
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                            break;
                        default:
                            LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid);
                            break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

  

zookeeper选举-lookForLeader

原文:https://www.cnblogs.com/zhwcs/p/13755069.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!