/**
 * Runs one gossip pass: bumps the local heartbeat, asks the Gossiper for a
 * digest list and, if that list is non-empty, sends a GOSSIP_DIGEST_SYN
 * message to selected peers before running the status check.
 */
public void run(){
// Do nothing until MessagingService.waitUntilListening() has returned.
MessagingService.instance().waitUntilListening();
/* Update the local heartbeat counter. */
endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
// gDigests starts empty and is handed to makeRandomGossipDigest to be filled.
final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
Gossiper.instance.makeRandomGossipDigest(gDigests);
// Everything below, including doStatusCheck(), is skipped when no digests were produced.
if (gDigests.size() > 0){
// SYN payload: cluster name, partitioner name and the digest list.
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
DatabaseDescriptor.getPartitionerName(),
gDigests);
// Wrap the payload with the GOSSIP_DIGEST_SYN verb and its serializer.
MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
digestSynMessage,
GossipDigestSyn.serializer);
/* Gossip to some random live member */
// NOTE(review): the result is presumably true when the chosen live member was a seed -- confirm in doGossipToLiveMember.
boolean gossipedToSeed = doGossipToLiveMember(message);
/* Gossip to some unreachable member with some probability to check if he is back up */
doGossipToUnreachableMember(message);
// Gossip to a seed as well when the live-member step reported false,
// or when there are fewer live endpoints than seeds.
if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
doGossipToSeed(message);
doStatusCheck();
}
// The brace closing run() is followed on the same line by article text and by
// the registration of GossipDigestSynVerbHandler for the GOSSIP_DIGEST_SYN verb.
}主要做了几下几件事: MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
// Register GossipDigestAckVerbHandler as the handler for the GOSSIP_DIGEST_ACK verb.
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
// Register GossipDigestAck2VerbHandler for the GOSSIP_DIGEST_ACK2 verb. The same
// line then carries article text and the header of the doVerb that takes a
// MessageIn<GossipDigestSyn>.
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());可见这三种消息分别对应三个消息类型GOSSIP_DIGEST_SYN、GOSSIP_DIGEST_ACK、GOSSIP_DIGEST_ACK2.public void doVerb(MessageIn<GossipDigestSyn> message, int id){
// SYN handler: builds a GossipDigestAck from the received digests and sends it
// one-way to `from` under the GOSSIP_DIGEST_ACK verb.
// NOTE(review): `gDigestMessage` and `from` are not declared in this excerpt --
// presumably they are set up in the elided section below; confirm against the full source.
//...check
List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
// doSort is called for its effect on gDigestList; its ordering rule is not shown here
// (the article text after this method describes it as by generation, then by version difference).
doSort(gDigestList);
// Two empty collections handed to examineGossiper together with the digests;
// both are then used as the ACK's contents, so they are presumably filled by that call.
List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
// Wrap the ACK payload with the GOSSIP_DIGEST_ACK verb and its serializer.
MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
GossipDigestAck.serializer);
// checkSeedContact(from) is invoked before the reply is sent.
Gossiper.instance.checkSeedContact(from);
MessagingService.instance().sendOneWay(gDigestAckMessage, from);
// The brace closing the SYN handler is followed on the same line by article text
// and the header of the doVerb that takes a MessageIn<GossipDigestAck>.
}1、对接受到的消息排序:先按generation排序,如果generation相同,按maxVersion与本地版本差排序; public void doVerb(MessageIn<GossipDigestAck> message, int id){
// ACK handler: applies the endpoint states carried by the ACK, then replies to
// `from` with a GOSSIP_DIGEST_ACK2 holding local state for the digests listed in the ACK.
// NOTE(review): `gDigestAckMessage` and `from` are not declared in this excerpt -- confirm against the full source.
List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
// Only notify the failure detector and apply state when the ACK carried at least one endpoint state.
if (epStateMap.size() > 0)
{
Gossiper.instance.notifyFailureDetector(epStateMap);
Gossiper.instance.applyStateLocally(epStateMap);
}
// Collect the reply: one entry per digest for which getStateForVersionBiggerThan
// returns a non-null state; digests that yield null are left out.
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
for (GossipDigest gDigest : gDigestList)
{
InetAddress addr = gDigest.getEndpoint();
EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
if (localEpStatePtr != null)
deltaEpStateMap.put(addr, localEpStatePtr);
}
// The ACK2 is sent even when deltaEpStateMap is empty; there is no size check here.
MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
new GossipDigestAck2(deltaEpStateMap),
GossipDigestAck2.serializer);
MessagingService.instance().sendOneWay(gDigestAck2Message, from);
// The brace closing the ACK handler is followed on the same line by article text.
}1、epStateMap 是from节点想要告诉它的消息,调用applyStateLocally方法进行更新;Cassandra学习笔记之Gossip协议,布布扣,bubuko.com
原文:http://blog.csdn.net/aesop_wubo/article/details/20401431