/**
 * Performs one gossip round.
 *
 * Calls waitUntilListening() on the messaging service, updates the local
 * heartbeat, builds a digest list, and - only when that list is non-empty -
 * sends a GOSSIP_DIGEST_SYN message via the live-member, unreachable-member
 * and (conditionally) seed helpers before invoking the status check.
 */
public void run() {
    MessagingService.instance().waitUntilListening();

    // Update the local heartbeat counter (the entry keyed by this node's broadcast address).
    endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();

    final List<GossipDigest> digests = new ArrayList<GossipDigest>();
    Gossiper.instance.makeRandomGossipDigest(digests);
    if (digests.isEmpty()) {
        // No digests were produced: nothing is sent and the status check is skipped too.
        return;
    }

    GossipDigestSyn synPayload = new GossipDigestSyn(
            DatabaseDescriptor.getClusterName(),
            DatabaseDescriptor.getPartitionerName(),
            digests);
    MessageOut<GossipDigestSyn> synMessage = new MessageOut<GossipDigestSyn>(
            MessagingService.Verb.GOSSIP_DIGEST_SYN,
            synPayload,
            GossipDigestSyn.serializer);

    // Gossip to some random live member.
    boolean seedAlreadyContacted = doGossipToLiveMember(synMessage);

    // Gossip to some unreachable member with some probability to check whether it is back up.
    doGossipToUnreachableMember(synMessage);

    // The seed is skipped only when one was already contacted AND there are at
    // least as many live endpoints as seeds.
    boolean seedGossipNotNeeded = seedAlreadyContacted && liveEndpoints.size() >= seeds.size();
    if (!seedGossipNotNeeded) {
        doGossipToSeed(synMessage);
    }

    doStatusCheck();
}
主要做了几下几件事:
// Register a new handler instance with the messaging service for each of the
// three gossip verbs: GOSSIP_DIGEST_SYN, GOSSIP_DIGEST_ACK and GOSSIP_DIGEST_ACK2.
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
可见这三种消息分别对应三个消息类型GOSSIP_DIGEST_SYN、GOSSIP_DIGEST_ACK、GOSSIP_DIGEST_ACK2.
public void doVerb(MessageIn<GossipDigestSyn> message, int id){ //...check List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests(); doSort(gDigestList); List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>(); Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>(); Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap); logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size()); MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap), GossipDigestAck.serializer); Gossiper.instance.checkSeedContact(from); MessagingService.instance().sendOneWay(gDigestAckMessage, from); }1、对接受到的消息排序:先按generation排序,如果generation相同,按maxVersion与本地版本差排序;
/**
 * Handles a GOSSIP_DIGEST_ACK message.
 *
 * Applies any endpoint states carried by the ACK, then answers with a one-way
 * GOSSIP_DIGEST_ACK2 holding, for each digest in the ACK, whatever
 * Gossiper.getStateForVersionBiggerThan returns for that endpoint and max
 * version (entries for which it returns null are omitted).
 *
 * NOTE(review): `gDigestAckMessage` and `from` are not declared in this
 * excerpt - presumably derived from `message` in code the article elided.
 *
 * @param message the incoming ACK message
 * @param id      message id (not used in the visible part of this excerpt)
 */
public void doVerb(MessageIn<GossipDigestAck> message, int id) {
    List<GossipDigest> digests = gDigestAckMessage.getGossipDigestList();
    Map<InetAddress, EndpointState> receivedStates = gDigestAckMessage.getEndpointStateMap();

    // Only touch the failure detector / local state when the ACK carried states.
    if (!receivedStates.isEmpty()) {
        Gossiper.instance.notifyFailureDetector(receivedStates);
        Gossiper.instance.applyStateLocally(receivedStates);
    }

    // Collect the states to return in the ACK2, keyed by endpoint.
    Map<InetAddress, EndpointState> statesToSend = new HashMap<InetAddress, EndpointState>();
    for (GossipDigest digest : digests) {
        InetAddress endpoint = digest.getEndpoint();
        EndpointState localState =
                Gossiper.instance.getStateForVersionBiggerThan(endpoint, digest.getMaxVersion());
        if (localState == null) {
            continue;
        }
        statesToSend.put(endpoint, localState);
    }

    GossipDigestAck2 ack2Payload = new GossipDigestAck2(statesToSend);
    MessageOut<GossipDigestAck2> ack2Message = new MessageOut<GossipDigestAck2>(
            MessagingService.Verb.GOSSIP_DIGEST_ACK2,
            ack2Payload,
            GossipDigestAck2.serializer);
    MessagingService.instance().sendOneWay(ack2Message, from);
}
1、epStateMap 是from节点想要告诉它的消息,调用applyStateLocally方法进行更新;
Cassandra学习笔记之Gossip协议,布布扣,bubuko.com
原文:http://blog.csdn.net/aesop_wubo/article/details/20401431