void sentinelTimer(void) { //check是否需要进入TITL模式 sentinelCheckTiltCondition(); //执行定期操作(检查redis-server状态,和其他sentinel节点交互等) sentinelHandleDictOfRedisInstances(sentinel.masters); //运行等待执行的脚本 sentinelRunPendingScripts(); //清理已执行完毕脚本 sentinelCollectTerminatedScripts(); //杀死超时运行的脚本 sentinelKillTimedoutScripts(); //修改hz值(影响sentinel相关操作执行频率),引入随机值,尽量避免所有sentinel节点持续性的同一时间发起投票请求 server.hz = REDIS_DEFAULT_HZ + rand() % REDIS_DEFAULT_HZ; }
void sentinelCheckTiltCondition(void) { mstime_t now = mstime(); mstime_t delta = now - sentinel.previous_time; //两次执行时间<0或者大于2s,则进入TITL模式 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) { sentinel.tilt = 1; sentinel.tilt_start_time = mstime(); sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered"); } sentinel.previous_time = mstime(); }
void sentinelHandleDictOfRedisInstances(dict *instances) { dictIterator *di; dictEntry *de; sentinelRedisInstance *switch_to_promoted = NULL; //遍历获取所有master结点 di = dictGetIterator(instances); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); //执行结点的周期性操作 sentinelHandleRedisInstance(ri); // 如果被遍历的是master,则遍历和该master关联的所有slave&sentinel if (ri->flags & SRI_MASTER) { sentinelHandleDictOfRedisInstances(ri->slaves); sentinelHandleDictOfRedisInstances(ri->sentinels); //如果master的状态为SENTINEL_FAILOVER_STATE_UPDATE_CONFIG,则准备执行failover if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) { switch_to_promoted = ri; } } } //执行failover if (switch_to_promoted) sentinelFailoverSwitchToPromotedSlave(switch_to_promoted); dictReleaseIterator(di); }
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { /* 以下为所有实例都需要执行的操作 */ //连接及订阅管理 sentinelReconnectInstance(ri); //和instance交流(PING/INFO/PUBLISH) sentinelPingInstance(ri); //如果仍然处于TILT模式,啥也不干 if (sentinel.tilt) { if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return; sentinel.tilt = 0; sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited"); } //判断instance是否下线(sdown) sentinelCheckSubjectivelyDown(ri); ...... /* 以下操作只针对master instance*/ if (ri->flags & SRI_MASTER) { //check master是否为odown(满足用户配置的quorum节点数判断master为sdown) sentinelCheckObjectivelyDown(ri); //check是否需要做fail over,如果确认需要,则调用sentinelStartFailover修改自身状态 if (sentinelStartFailoverIfNeeded(ri)) //发送SENTINEL is-master-down-by-addr给其他的sentinel,并注册毁掉函数 sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED); //执行故障转移 sentinelFailoverStateMachine(ri); sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS); } }
void sentinelReconnectInstance(sentinelRedisInstance *ri) { if (!(ri->flags & SRI_DISCONNECTED)) return; //和master/slave/sentinel instance建立连接 if (ri->cc == NULL) { ...... } //针对master/slave,订阅其“__sentinel__:hello”频道 if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) { ...... retval = redisAsyncCommand(ri->pc, sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s", SENTINEL_HELLO_CHANNEL); ...... } ...... }
void sentinelPingInstance(sentinelRedisInstance *ri) { //假如instance处于不可连接状态或者过多的命令(100)还没有发送出去,直接返回 if (ri->flags & SRI_DISCONNECTED) return; if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return; //对于slave instance,如果其master处于异常状态(SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS),则向该slave发送info的频率从10s一发提高到1s一发 if ((ri->flags & SRI_SLAVE) && (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) { info_period = 1000; } else { info_period = SENTINEL_INFO_PERIOD; } //对于mastere/slave instance,每隔info_period时间,向其发送info命令,注册info命令的回调函数为sentinelInfoReplyCallback //sentinelInfoReplyCallback会根据从master/slave所得到的回复中分析出相关信息,并更新sentinelRedisInstance的当前状态 if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { /* Send INFO to masters and slaves, not sentinels. */ retval = redisAsyncCommand(ri->cc, sentinelInfoReplyCallback, NULL, "INFO"); if (retval != REDIS_OK) return; ri->pending_commands++; } //对于所有类型的instance,都定时向其发送PING命令(1s),注册ping命令的回调函数为sentinelPingReplyCallback //sentinelPingReplyCallback根据PING命令的返回值判断instance当前状态 else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) { retval = redisAsyncCommand(ri->cc, sentinelPingReplyCallback, NULL, "PING"); if (retval != REDIS_OK) return; ri->pending_commands++; //每隔2s向master/slave的“__sentinel__:hello”频道发布消息 //消息内容为:ip,port,runid,current_epoch, master->name,master->ip,master->port } else if ((ri->flags & SRI_SENTINEL) == 0 && (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { sentinelSendHello(ri); } }
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) { ...... //假如本身的状态为sdown,则开始判断是否可以判断为odown if (master->flags & SRI_S_DOWN) { quorum = 1; di = dictGetIterator(master->sentinels); //遍历sentinel字典,查看其是否将master状态职位sdown while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); if (ri->flags & SRI_MASTER_DOWN) quorum++; } dictReleaseIterator(di); //假如sentinel flag状态为SRI_MASTER_DOWN的sentinel个数达到用户定义的quorum个数,则将master状态置为odown if (quorum >= master->quorum) odown = 1; } ...... }
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) { //确认master状态为odown if (!(master->flags & SRI_O_DOWN)) return 0; //确认failover没有在运行 if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0; //确认在超时时间*2内没有failover在运行 if (mstime() - master->failover_start_time < master->failover_timeout*2) return 0; sentinelStartFailover(master); return 1; }
void sentinelStartFailover(sentinelRedisInstance *master) { redisAssert(master->flags & SRI_MASTER); // 设置 failover 状态 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; // 设置master当前状态 master->flags |= SRI_FAILOVER_IN_PROGRESS; // 设置failover_epoch master->failover_epoch = ++sentinel.current_epoch; // 设置fail over开始时间 master->failover_start_time = mstime()+rand()%s; master->failover_state_change_time = mstime(); } sentinelAskMasterStateToOtherSentinels是在检测到master状态为sdown后,sentinel向其它sentinel节点发送sentinel is-master-down-by-addr消息 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) { //遍历关注该master的sentinel节点 while((de = dictNext(di)) != NULL) { //向其它sentinle发送消息SENTINEL is-master-down-by-addr master_ip master_port current_epoch runid/* //如果本身已经开始了failover进程,则向其他sentinel节点发送自己的runid,否则发送* //注册回调函数sentinelReceiveIsMasterDownReply接受回复的信息 string(port,sizeof(port),master->addr->port); retval = redisAsyncCommand(ri->cc, sentinelReceiveIsMasterDownReply, NULL, "SENTINEL is-master-down-by-addr %s %s %llu %s", master->addr->ip, port, sentinel.current_epoch, (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ? server.runid : "*"); if (retval == REDIS_OK) ri->pending_commands++; } dictReleaseIterator(di); }
void sentinelCommand(redisClient *c) { ...... //处理sentinel is-master-down-by-addr消息 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) { /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>*/ ...... //根据其它sentinel传送过来的消息 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, c->argv[2]->ptr,port,NULL); /* It exists? Is actually a master? Is subjectively down? It‘s down. * Note: if we are in tilt mode we always reply with "0". */ if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && (ri->flags & SRI_MASTER)) isdown = 1; //假如发过来的信息中包含请求来源sentinel的runid,则开始进行投票 if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) { leader = sentinelVoteLeader(ri,(uint64_t)req_epoch, c->argv[5]->ptr, &leader_epoch); } //回复信息,包括isdown,leader,leader_epoch addReplyMultiBulkLen(c,3); addReply(c, isdown ? shared.cone : shared.czero); addReplyBulkCString(c, leader ? leader : "*"); addReplyLongLong(c, (long long)leader_epoch); if (leader) sdsfree(leader); }
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) { ...... //根据返回值,判断是否将对应sentinel的状态置为SRI_MASTER_DOWN if (r->element[0]->integer == 1) { ri->flags |= SRI_MASTER_DOWN; } else { ri->flags &= ~SRI_MASTER_DOWN; } //如果sentinel返回了其选举的leader,则更新自己的leader和leader_epoch if (strcmp(r->element[1]->str,"*")) { sdsfree(ri->leader); if (ri->leader_epoch != r->element[2]->integer) redisLog(REDIS_WARNING, "%s voted for %s %llu", ri->name, r->element[1]->str, (unsigned long long) r->element[2]->integer); ri->leader = sdsnew(r->element[1]->str); ri->leader_epoch = r->element[2]->integer; } }
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { //master节点&正处于failover状态则继续 redisAssert(ri->flags & SRI_MASTER); if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return; switch(ri->failover_state) { //等待故障转移开始,如果自己为leader,置状态为SENTINEL_FAILOVER_STATE_SELECT_SLAVE,开始下一步操作,否则,不变更状态,等待fail-over完成/超时 case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break; //从slave中选择一个master,置状态为SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: sentinelFailoverSelectSlave(ri); break; //升级被选中的从服务器为新主服务器,置状态为SENTINEL_FAILOVER_STATE_WAIT_PROMOTION case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: sentinelFailoverSendSlaveOfNoOne(ri); break; //等待fail over生效,info语句的回调函数sentinelRefreshInstanceInfo会更新当前状态SENTINEL_FAILOVER_STATE_RECONF_SLAVES case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: sentinelFailoverWaitPromotion(ri); break; //令其它从服务器同步新主服务器 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: sentinelFailoverReconfNextSlave(ri); break; } }
Redis Sentinel源码分析(二),布布扣,bubuko.com
原文:http://blog.csdn.net/yfkiss/article/details/22687771