icp.config配置文件说明
文件中的每一行描述一个icp peer的名字和配置信息,格式如下:
host : host_IP : peer_type : proxy_port : icp_port : MC_on : MC_IP : MC_TTL :
host:icp peer的主机名,可选项
host_IP:icp peer的ip地址,可选项,但是host和host_IP必须至少指定一个
ctype:1表示icp父节点缓存,2表示icp子节点缓存
proxy_port:icp peer用来代理通信的tcp端口
icp_port:icp peer用来icp通信的udp端口
MC_on:0表示关闭多播,1表示开启多播
MC_IP:多播的ip地址
MC_ttl:1只能在一个子网内广播,2可以在多个子网内广播
records.config中ICP相关的配置如下:
proxy.config.icp.enabled
proxy.config.icp.icp_port
proxy.config.icp.icp_interface
proxy.config.icp.multicast_enabled
proxy.config.icp.query_timeout
proxy.config.icp.lookup_local
proxy.config.icp.stale_icp_enabled
proxy.config.icp.reply_to_unknown_peer
proxy.config.icp.default_reply_port
在分析ATS的ICP相关源码之前要看一下ICP协议,网上这方面的资料很少,建议直接查看RFC:http://www.cse.ohio-state.edu/cgi-bin/rfc/rfc2186.html
http://chenpiaoping.blog.51cto.com/5631143/1368653
由于ICP协议本身比较简单,所以ATS和ICP相关的源码也比较简单(默认没有开启,也没用过,应该还有问题),今天花了一下午的时间看了一下源码,下面简单注释一下
和其他模块一样ICP相关处理的类是class ICPProcessorExt,其实例为icpProcessor
extern ICPProcessorExt icpProcessor
class ICPProcessorExt的定义如下:
class ICPProcessorExt
{
public:
ICPProcessorExt(ICPProcessor *);
~ICPProcessorExt();
void start();
Action *ICPQuery(Continuation *, URL *);
private:
ICPProcessor * _ICPpr;
};
从这个类可以看到,要分析ICP源码直接从该类提供的两个外部接口 start()和 ICPQuery()入手即可,
start()的功能是初始化ICP的相关配置并启动ICP模块
ICPQuery()是对外提供的唯一的ICP查询接口(怪不得这个类的名字中有Ext呢,Externet的缩写嘛)
代码的开始还是在Main.cc中
if (icp_enabled)
icpProcessor.start();
main()--->ICPProcessorExt::start()
封装了一层直接调用ICPProcessor的start()
void
ICPProcessorExt::start()
{
_ICPpr->start();
}
main()--->ICPProcessorExt::start()--->ICPProcessor::start()
该函数的功能是读取records.config中和ICP相关的配置,以及ICP的配置文件icp.config并初始化相关的数据结构,然后设置定时处理ICP的配置修改以及定时接收和处理ICP的查询请求
void
ICPProcessor::start()
{
if (_Initialized)
return;
//创建锁实例
_l = NEW(new AtomicLock());
//用于ICP分配内存的索引(ATS的内存分配机制)
ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
//注册状态计数信息和回调方法
InitICPStatCallbacks();
//创建 ICPConfiguration的实例,并读取ICP的配置文件来初始化相关的数据结构
_ICPConfig = NEW(new ICPConfiguration());
//这个目前没有什么用
_mcastCB_handler = NEW(new ICPHandlerCont(this));
SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent);
//创建ICP的peer列表,并启动监听的socket
if (_ICPConfig->globalConfig()->ICPconfigured()) {
if (BuildPeerList() == 0) {
if (SetupListenSockets() == 0) {
_AllowIcpQueries = 1; // allow receipt of queries
}
}
}
//打印ICP的配置信息
DumpICPConfig();
//创建ICP配置监控实例,并设置定时调用的函数来处理ICP配置的改变
_ICPPeriodic = NEW(new ICPPeriodicCont(this));
SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent);
_PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
//创建ICP的Handler实例,并设置定时调用的函数来接收并处理ICP请求,一两句话说不清楚,往下看就知道了
_ICPHandler = NEW(new ICPHandlerCont(this));
SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent);
_ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler,
HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
//创建http请求结构,作为参数传递给cache的查询接口,即查询cache要用该结构
if (!gclient_request.valid()) {
gclient_request.create(HTTP_TYPE_REQUEST);
}
_Initialized = 1;
}
main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPConfiguration::ICPConfiguration()
该函数的功能是读取records.config中和ICP相关的配置以及icp.config配置文件,并初始化相关的相应的数据结构
ICPConfiguration::ICPConfiguration():_icp_config_callouts(0)
{
//ICPConfigData是ICP的配置封装类,简单的理解就是records.condig里读取的和ICP相关的配置都放在这个结构里,这里有两个主要是用来实现ICP配置的修改和更新的
_icp_cdata = NEW(new ICPConfigData());
_icp_cdata_current = NEW(new ICPConfigData());
//读取records.config中的配置到_icp_cdata_current中
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_enabled, "proxy.config.icp.enabled");
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_port, "proxy.config.icp.icp_port");
ICP_EstablishStaticConfigStringAlloc(_icp_cdata_current->_icp_interface, "proxy.config.icp.icp_interface");
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_multicast_enabled, "proxy.config.icp.multicast_enabled");
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_query_timeout, "proxy.config.icp.query_timeout");
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_cache_lookup_local, "proxy.config.icp.lookup_local");
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_stale_lookup, "proxy.config.icp.stale_icp_enabled");
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_reply_to_unknown_peer,
"proxy.config.icp.reply_to_unknown_peer");
ICP_EstablishStaticConfigInteger(_icp_cdata_current->_default_reply_port, "proxy.config.icp.default_reply_port");
//看,更新的时候就是把icp_cdata_current赋给_icp_cdata
UpdateGlobalConfig();
//PeerConfigData是一个peer的配置信息的封装类,最多有MAX_DEFINED_PEERS(64个
for (int n = 0; n <= MAX_DEFINED_PEERS; ++n) {
_peer_cdata[n] = NEW(new PeerConfigData);
_peer_cdata_current[n] = NEW(new PeerConfigData);
}
char
icp_config_filename[PATH_NAME_MAX] = "";
ICP_ReadConfigString(icp_config_filename, "proxy.config.icp.icp_configuration", sizeof(icp_config_filename) – 1);
//读取并解析icp.config的配置信息,并把所有peer的配置信息放到数组_peer_cdata_current[]中
//解析的过程很简单,把每行的每个字段取出来赋给PeerConfigData相应的字段即可
(void) icp_config_change_callback((void *) this, (void *) icp_config_filename, 1);
//_peer_cdata_current[n]赋给_peer_cdata[n]
UpdatePeerConfig();
//设置ICP配置更新的回调函数
ICP_RegisterConfigUpdateFunc("proxy.config.icp.icp_configuration", mgr_icp_config_change_callback, (void *) this);
}
main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPProcessor::BuildPeerList()
该函数的功能是遍历数组_peer_cdata[n]中的每个peer,按照以下规则加入到相应的list(数组)中
所有peer的list:存放所有peer
父peer的list:存放所有类型为PARENT的peer
发送peer的list:初始化时存放所有peer
接收peer的list:初始化时如果多播开启则存放所有peer,否则为空
在这之前会创建一个表示本地的peer加到所有peer的list和接收peer的list中
int
ICPProcessor::BuildPeerList()
{
PeerConfigData *Pcfg;
Peer *P;
Peer *mcP;
int index;
int status;
PeerType_t type;
//创建本地peer,_peer_cdata[n]数组中的第0个元素类给本地peer用
Pcfg = _ICPConfig->indexToPeerConfigData(0);
//本地peer的名字:localhost
ink_strlcpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
//本地peer的类型: CTYPE_LOCAL
Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
IpEndpoint tmp_ip;
//根据配置的interface获取本地peer的ip地址
if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &tmp_ip.sa)) {
Pcfg->_ip_addr._family = AF_UNSPEC;
Warning("ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface has no IP address");
} else {
Pcfg->_my_ip_addr = Pcfg->_ip_addr = tmp_ip;
}
//获取配置的TCP和UDP端口
Pcfg->_proxy_port = 0;
Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
//不开启多播(或组播)
Pcfg->_mc_member = 0;
//地址族: AF_UNSPEC
Pcfg->_mc_ip_addr._family = AF_UNSPEC;
//只能在一个子网内组播
Pcfg->_mc_ttl = 0;
P = NEW(new ParentSiblingPeer(PEER_LOCAL, Pcfg, this));
//加入所有peer的list中
status = AddPeer(P);
ink_release_assert(status);
//加入接收peer的list
status = AddPeerToRecvList(P);
ink_release_assert(status);
_LocalPeer = P;
//遍历数组_peer_cdata[n]中的每个peer,按照以下规则加入到相应的list中
所有peer的list:存放所有peer
父peer的list:存放所有类型为PARENT的peer
发送peer的list:存放所有peer
接收peer的list:如果多播开启则存放所有peer,否则为空
for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
Pcfg = _ICPConfig->indexToPeerConfigData(index);
type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());
if (Pcfg->GetIPAddr() == _LocalPeer->GetIP())
continue; // ignore
if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {
if (Pcfg->MultiCastMember()) {
mcP = FindPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort());
if (!mcP) {
mcP = NEW(new MultiCastPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this));
status = AddPeer(mcP);
ink_assert(status);
status = AddPeerToSendList(mcP);
ink_assert(status);
status = AddPeerToRecvList(mcP);
ink_assert(status);
}
P = NEW(new ParentSiblingPeer(type, Pcfg, this));
status = AddPeer(P);
ink_assert(status);
status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P);
ink_assert(status);
} else {
P = NEW(new ParentSiblingPeer(type, Pcfg, this));
status = AddPeer(P);
ink_assert(status);
status = AddPeerToSendList(P);
ink_assert(status);
}
if (type == PEER_PARENT) {
status = AddPeerToParentList(P);
ink_assert(status);
}
}
}
return 0; // Success
}
main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPProcessor::SetupListenSockets()
该函数的主要功能是开启组播的peer创建并初始化发送和接收的socket,其他不开启组播的peer(包括本地peer)只在_chan上设置了自己的ip地址,用于填充到ICP报文里
int
ICPProcessor::SetupListenSockets()
{
int allow_null_configuration;
//ICP的模式:0表示禁止,1表示只接收ICP查询,2表示接收和发送ICP查询
if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY)
&& _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
allow_null_configuration = 1;
} else {
allow_null_configuration = 0;
}
if (!_LocalPeer) {
Warning("ICP setup, no defined local Peer");
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
return 1;
}
if (GetSendPeers() == 0) {
if (!allow_null_configuration) {
Warning("ICP setup, no defined send Peer(s)");
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
return 1;
}
}
if (GetRecvPeers() == 0) {
if (!allow_null_configuration) {
Warning("ICP setup, no defined receive Peer(s)");
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
return 1;
}
}
Peer *P;
int status;
int index;
for (index = 0; index < (_nPeerList + 1); ++index) {
ip_port_text_buffer ipb, ipb2;
if ((P = _PeerList[index])) {
//没有开启组播的peer只要设置一下_chan的ip地址就好了
if ((P->GetType() == PEER_PARENT)
|| (P->GetType() == PEER_SIBLING)) {
ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
pPS->GetChan()->setRemote(pPS->GetIP());
//开启组播的peer要分别创建并初始化发送和接收的socket
} else if (P->GetType() == PEER_MULTICAST) {
MultiCastPeer *pMC = (MultiCastPeer *) P;
ink_assert(_mcastCB_handler != NULL);
status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler);
if (status) {
Warning("ICP MC send setup failed, res=%d, ip=%s bind_ip=%s",
status,
ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)),
ats_ip_nptop(_LocalPeer->GetIP(), ipb2, sizeof(ipb2))
);
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed");
return 1;
}
status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP(),
NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler);
if (status) {
Warning("ICP MC recv setup failed, res=%d, ip=%s",
status, ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)));
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed");
return 1;
}
}
}
}
//没有开启组播的peer只要设置一下_chan的ip地址就好了
ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer);
pPS->GetChan()->setRemote(pPS->GetIP());
return 0;
}
main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPPeriodicCont::PeriodicEvent()
这个函数的功能是当ICP的配置改变时重新初始化相应的数据结构,相当于又重走了一下以上的流程,这里不是主要的而且流程和前面差不多,就不深入地注释了
int
ICPPeriodicCont::PeriodicEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
int do_reconfig = 0;
ICPConfiguration *C = _ICPpr->GetConfig();
if (C->GlobalConfigChange())
do_reconfig = 1;
int configcallouts = C->ICPConfigCallouts();
if (_last_icp_config_callouts != configcallouts) {
_last_icp_config_callouts = configcallouts;
do_reconfig = 1;
}
if (do_reconfig) {
ICPPeriodicCont *rc = NEW(new ICPPeriodicCont(_ICPpr));
SET_CONTINUATION_HANDLER(rc, (ICPPeriodicContHandler) & ICPPeriodicCont::DoReconfigAction);
eventProcessor.schedule_imm(rc);
}
return EVENT_CONT;
}
main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPHandlerCont::PeriodicEvent()
kao,这么快就下班了,明天再分析吧
int
ICPHandlerCont::PeriodicEvent(int event, Event * /* e ATS_UNUSED */)
{
int n_peer, valid_peers;
Peer *P;
valid_peers = _ICPpr->GetRecvPeers();
switch (event) {
case EVENT_POLL:
case EVENT_INTERVAL:
{
for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
if (!P || (P && !P->IsOnline()))
continue;
if (P->shouldStartRead()) {
P->startingRead();
ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
s->init(_ICPpr, P, local_lookup);
RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
s->handleEvent(EVENT_INTERVAL, (Event *) 0);
}
}
break;
}
default:
{
ink_release_assert(!"unexpected event");
break;
}
}
return EVENT_CONT;
}
本文出自 “陈漂评的博客” 博客,请务必保留此出处http://chenpiaoping.blog.51cto.com/5631143/1368770
trafficserver的ICP相关源码注释,布布扣,bubuko.com
原文:http://chenpiaoping.blog.51cto.com/5631143/1368770