这块内容是个关键性流程。
tars调用在调用层,一般只会简单的
PatchPrx proxy = Application::getCommunicator()->stringToProxy<PatchPrx>(_patchRequest.patchobj);
proxy->timeout(60000);
proxy只是一个ServantProxy,他只是作为一个本地与服务器组中某个服务器名的几个服务进程打交道的代表,这些具体干活的服务进程,会分别部署在几个不同的服务器物理机中,一般一次请求也只会这几个服务器的某一个执行。
ServantProxy的用法只会类似于proxy->timeout()这样就完事了,那么具体会是哪个服务器去执行?可否指定某一个?负载均衡怎么实现?一致性hash怎么实现?set分组怎么实现?这些都在接下来介绍的内容中。
selectAdapterProxy函数的流程
refreshReg刷新主控.
3种模式的选择:
{
最优先 判断是否是哈希模式.哈希模式 返回对应的AdapterProxy.看后续getHashProxy()流程
再次 判断是否权重模式.权重模式 返回对应的AdapterProxy.getWeightedProxy()流程
最后 普通轮询模式. 查看getNextValidProxy流程
}
完全看不懂。。据tars 大佬所说,可以:通过看tc_endpoint这个类 配置实现权重, -w -v。。但我在代码中看权重模式只有静态权重一种。。啥情况?
此处很容易就让人产生3个疑问:
疑问1、此函数的流程,在servant调用rpc时,必然会调用的函数,而在此流程中,会调用registry服务的几个函数。这就有点尴尬了,有一种死循环的感觉?那么对于调用registry服务时,到底是怎么处理的呢?另外registry可能会有很多台,到底调用的是哪一台呢?
疑问2、当客户端服务与被调用服务之间链接断掉时,怎样切到另一个服务?多久切过去?
疑问3、在取得servant的activity节点信息后,本进程与目标activity通信,肯定要建立个链接,此链接是长链接还是短链接,如何管理链接状态呢?
这些疑问,都在后面部分解答。
refreshReg的流程
这个函数是刷新主控。直连的模式,忽略此函数.
refreshReg函数目的简介:
1、根据传入的GetEndpointType类型,和sName。查找对应的vector<tars::EndpointF>节点分组,并按存活节点和非存活节点分组好。出异常时,走doEndpointsExp()函数,更改请求状态。注意,对于同步接口的请求,这里会按下面流程走;对于异步请求,这里就结束了,等异步回调那边过来再调用第2步的内容。
2、返回结果后调用 doEndpoints。将结果转行下类型存入set<EndpointInfo>中,并调用notifyEndpoints(),更新内存中缓存的对应AdapterProxy* 这些ip列表信息,如果不存在对应的AdapterProxy 则new一个,AdapterProxy这个类的对象都是在此被创建出来
3、如果有必要,调用doNotify(),这里会执行通知_objectProxy->doInvoke() 这个函数的具体内容参考《完整的tars调用流程详解部分》
流程大致是上面3个步骤。这里有些细节值得探讨一二。
因为请求可能没那么快返回;也可能registry死机了,压根就不会返回。
对这些情况,必须要有一个合理的控制,限定后续重复的请求次数,才能避免在registry负载本来就很大的时候,因为服务节点拉不到servant的数据 狂请求registry把整套系统搞挂了。
处理细节大致如下:
1、每次请求到来时,会把当前状态置成请求中状态。并设置5s超时时间_requestTimeout,这个5s是代码中写死的。
2、如果在请求状态中,再次收到refreshReg。如果此时_requestTimeout未超时,忽略此次请求;如果_requestTimeout超时,表示此次请求失败,执行doEndpointsExp(0)。这个函数就分两种情况了:
首先要把 请求状态置 为 未请求中,请求失败次数 累加1
1]如果失败次数未超过3次,则将 下一次请求时间 设置为2s后(也就是2s内的请求忽略掉,参考下面 下一次请求时间 的用法)
2]如果失败次数超过3次,则将 下一次请求时间 设置为30s后(也就是30s内的请求忽略掉)。并且如果数据无效检测是false。则 连续失败 强制设成数据是有效的,并调用doNotify()。。这个啥原因?完全没看懂了。
3]继续走下面的代码.
3、如果不在请求状态中,并且 下一次请求时间 到了,则走上面 refreshReg函数目的简介 部分的流程。否则忽略此次请求。注意,2和3 不是if-else关系,而是前后的流程
4、在结果调用回来后,doEndpoints函数中。将失败次数赋0,请求状态置为未请求。
1]如果活跃节点数组节点为空。下一次请求时间 设为10s后.
2]否则,下一次请求时间设为2s后
请求失败或超时时的处理流程保护基本如上所示。
如果仔细看代码,会发现,此处是重要的流程,也是让人产生疑惑的地方。
在每次ObjectProxy::invoke()和ObjectProxy::doInvoke()执行发送时,都会
->selectAdapterProxy();
->refreshReg();
->rpc请求RegistryServer.
如果每次请求都走RegistryServer,那服务铁定是性能极其低下,这个设计是不是有点奇怪?refreshReg中是怎么解决此问题呢?
关键点还在于_requestRegistry,_requestTimeout,_refreshTime。这三个变量上。
1、当第一次请求过来时,会走 正常请求逻辑
_requestRegistry=true;_requestTimeout = iNow + 5s;
->rpc请求RegistryServer.....
如果请求正常,会走到doEndpoints()。这里会将
_requestRegistry = false;
if(返回activity节点组为空) {_refreshTime = iNow + 10s;}
else {_refreshTime = iNow + 60s;}
失败会走doEndpointsExp()
2、第二次请求进来时候,会执行两个判断.
if(_requestRegistry && _requestTimeout < iNow) //如果上次请求,超时了
{
doEndpointsExp(0);此处将执行 ->_requestRegistry = false;_refreshTime = iNow + 2;
}
if( (!_requestRegistry) && (_refreshTime <= iNow))
{
执行1处的 正常请求逻辑.
}
重点是if( (!_requestRegistry) && (_refreshTime <= iNow))。
试想,如果在第一次请求还在请求中时,第二次请求过来了,_requestRegistry==true。那肯定不会走到下面的 正常请求逻辑,此次调用就被忽略掉;
另外,如果前次请求已完成,此时_requestRegistry==false,如果新请求与前次间隔在_refreshTime内,也是忽略掉本次调用。这样就可以把请求频率控制在:
成功返回,并且有Activity节点组,1分钟一次;成功返回并没有Activity节点组,10s内一次;请求失败,2s后再试,如果还是失败,并走上面介绍的超时保护流程。
这么设计有啥好处:
可以控制请求的频率,又可以保证之前请求的节点挂掉后,可以很平滑的切到另一个节点。保证请求端的高可用。
到此处,就可以回答疑问2了。当客户端服务与被调用服务之间链接断掉时,怎样切到另一个服务?多久切过去?会影响多久的调用请求?上面这部分就是答案。
对RegistryServer请求的函数具体返回的内容, 参考框架部分的 分控registry章节的 《对象查询接口类的实现(QueryF.tars的实现)》部分。
与目标Servant的activity节点的链接管理
有个很重要的函数AdapterProxy::checkActive(bForceConnect)。
在此函数中会调用Transceiver::checkTimeout()判断本链接端口是否失效(端口失效包括,根本连不上,以及链接超时这些),并且会判断下 当前节点是否被屏蔽,被限定重试时间(查看下面CheckTimeoutInfo 超时保护逻辑的解释)。
如果端口失效或者是bForceConnect=true,并且未被限定不可重试中,则会执行重连。执行重连操作分两步:
//连接没有建立或者连接无效, 重新建立连接
if(!Transceiver->isValid())
{
Transceiver->reconnect();
}
reconnect其实就做两件事 close();connect();
那么这个流程的关键点在于,怎么判断是否建立链接,链接无效,端口失效这些。
先解释下connect的过程,伪代码如下:
int connect() { isValid();_connStatus == eConnecting || _connStatus == eConnected;判断是否已链接 //每次连接前都重新解析一下地址, 避免dns变了! _ep.parseAddress(); if (_ep.type()==UDP) { fd=NetworkUtil::createsocket(...); _connStatus = eConnected;//udp直接设置成链接上 } else { fd=NetworkUtil::createsocket(...); bool bConnected = NetworkUtil::doConnect(fd,..); if(bConnected) { setConnected(); //这里会_connStatus = eConnected; } else { _connStatus = Transceiver::eConnecting; _conTimeoutTime = TNOWMS + _adapterProxy->getConTimeout(); } } vector<SocketOpt> &socketOpts = _adapterProxy->getObjProxy()->getSocketOpt(); setsockopt(_fd,socketOpts ....); _adapterProxy->getObjProxy()->getCommunicatorEpoll()->addFd(fd, &_fdInfo, EPOLLIN|EPOLLOUT); }
connect超时时间为多少呢?配置在<connect-timeout>中,默认是1500ms.代码中限定在 100-5000ms之间.
其中特别重要的变量是_connStatus 。如果是connect==eConnected。。connect()成功那没话说,如果connect()返回值不成功。但也不一定此链接就没链接上,所以就设了个超时,设置成eConnecting状态。那这个状态是啥时候被改掉呢?
分两种情况: 在CommunicatorEpoll::handleInputImp和CommunicatorEpoll::handleOutputImp收发包的时候,如果是eConnecting,会改成eConnected;如果一直没收到包也没发包,超时了,在Transceiver::checkTimeout()时候,会将此Transceiver::close()。并_connStatus=eUnconnected.
话说这种超时的做法是为了防止DDOS?
这里还有个重要点,保护逻辑,eConnecting超时一定比率后进行节点屏蔽。看CheckTimeoutInfo的注释。
/**
* 超时一定比率后进行节点屏蔽
* 设置超时检查参数
* 计算到某台服务器的超时情况, 如果连续超时次数或者超时比例超过阀值
* 默认60s内, 超时调用次数>=2, 超时比率0.5,或者5s内连续超时次数>5
* 或者连接异常连续超过5次
* 则失效,进行屏蔽
* 服务屏蔽后, 请求将尽可能的切换到其他可能的服务器, 并每隔tryTimeInterval(默认是30s)尝试一次, 如果成功则认为恢复
* 如果其他服务器都屏蔽, 则随机选择一台尝试
*/
因为我们服务与服务之间调用基本上都是TCP,所以只用关注框架中对tcp的调用处理即可。
对于已经eConnected的链接。。那么怎么判断链接异常呢?
通常,在TcpTransceiver的recv/send/readv/doResponse..或者AdapterProxy::checkActive(),Epoll收到EPOLLERR,等操作失败后(完全看不懂系列,话说对tcp而言,失败后多久底层会返回失败呢?),会TcpTransceiver::close().将链接关掉,并从epoll中删掉。
这里有细节还需确认,tcp对链接断开的处理。说法很多,有的说是tcp一断开就能够感知到,有的说法是感知不到。
至此,疑问3就得到解答
AdapterProxy::checkActive()的流程大致如上。那么此函数在哪些场景中会去执行呢?
基本上都是在EndpointManager类的选取节点时候,各种hash,轮询中获取对应节点时。详情查看下面这块部分的介绍
还遗留一个疑问1待回答:
在servant调用rpc时,必然会调用的函数,而在此流程中,会调用registry服务的几个函数。这就有点尴尬了,有一种死循环的感觉?那么对于调用registry服务时,到底是怎么处理的呢?另外registry可能会有很多台,到底调用的是哪一台呢?
通过看配置和代码,貌似对于RegistryServer的连接,是通过模板中的配置
<client>
locator=tars.tarsregistry.QueryObj@tcp -h registry.tars.com -p 17890
</client>
这里写死的。。是直接直连的。所以不存在调用死循环的问题。完全看不懂系列。。此处需要找大佬确认
getHashProxy 根据hash值选取一个节点
分4种类别:
静态权重模式下的 一致性hash getConHashProxyForWeight()函数实现
这个函数总结来说就是:
1检查此ServantProxy的全部AdapterProxy的一致性hash静态权重 如果一致性hash静态权重有改变,则更新
2如果压根就没有一致性hash静态权重组数据,调用getHashProxyForNormal()并返回结果;
3否则,通过一致性hash取得对应节点。。如果节点不存活(通过AdapterProxy::isActiveInReg()和AdapterProxy::checkActive()判断是否存活),则在剩余存活节点中,再次哈希个,直到找到可用的存活节点为止;
如果所有节点都有问题,随机选择一个没有connect超时或者链接异常的节点重连一次,并拿他来;
如果前面步骤都没找到可返回的节点,从activity组中,随便选一个节点重连(完全看不懂系列,这么做目的是啥?),返回NULL。
静态权重模式下的hash取模 getHashProxyForWeight()
这个函数总结来说就是:
跟getConHashProxyForWeight()基本上流程一致。不过用来算hash的内存数据不同,一个是一致性hash静态权重数组,一个是内存中的activity节点组静态权重数组。再此不多写了
普通模式下的一致性hash getConHashProxyForNormal()
这个函数总结来说就是:
跟getConHashProxyForWeight()基本上流程一致。不过用来算hash的内存变量数据不同,在静态权重一致性hash模式中,其参考hash权重值是静态配置的值,而在轮询一致性hash模式中,其hash权重值都为100 。其它流程基本一样,再此不多写了
普通模式下的hash取模。这种是最普通的模式 getHashProxyForNormal()
这个函数总结来说就是:
通过hashcode取得对应节点。。如果节点不存活(通过AdapterProxy::isActiveInReg()和AdapterProxy::checkActive()判断是否存活),则在剩余存活节点中,再次哈希个,直到找到可用的存活节点为止;
如果所有节点都有问题,随机选择一个没有connect超时或者链接异常的节点重连一次,并拿他来;
如果前面步骤都没找到可返回的节点,从activity组中,随便选一个节点重连(完全看不懂系列,这么做目的是啥?),返回NULL。
getWeightedProxy 根据后端服务的权重值选取一个结点
1更新静态权重活跃节点信息。更新频率是 第一次是65s,非第一次是1分钟
2如果存在静态权重的活跃节点。并且存在 静态权重对应的节点路由缓存(这个缓存也会在更新静态权重活跃节点信息时被更新),
按节点权重排列好的顺序轮询的方式遍历Activity节点(有个类成员变量记录之前轮询到哪个位置),找到存活节点则返回;
如果一个存活节点都没有, 随机选择一个没有connect超时或者链接异常的节点重连,并返回此节点。
如果所有节点都有问题,所有adapter都有问题 那就是选不到结点,随机找一个重连,并返回NULL
3如果不存在 静态权重的活跃节点。按节点获得的顺序,轮询活跃节点,后面流程跟上面2的类似
getNextValidProxy 普通轮询选取节点
这个流程就简单了。跟getWeightedProxy 第3步基本一致
对于set和分组的实现.
基本上,每次按set方式去请求节点时,返回的activity和inactivity节点组,都是对应set和分组的内容。所以这块的实现在返回的节点组里就注定了。没啥稀奇可介绍的了。
还有个EndpointThread线程。这个是做啥用的?完全看不懂系列。
至此,客户端部分的流程,基本上了解清楚。。
tars framework 源码解读(四) servant部分章节。客户端部分。selectAdapterProxy实现流程 选取合适的AdapterProxy
原文:https://www.cnblogs.com/yylingyao/p/12198146.html