首页 > 其他 > 详细

trafficserver的ICP相关源码注释

时间:2014-03-05 23:58:37      阅读:1029      评论:0      收藏:0      [点我收藏+]

icp.config配置文件说明
文件中的每一行描述一个icp peer的名字和配置信息,格式如下:
host : host_IP : peer_type : proxy_port : icp_port : MC_on : MC_IP : MC_TTL :
host:icp peer的主机名,可选项
host_IP:icp peer的ip地址,可选项,但是host和host_IP必须至少指定一个
ctype:1表示icp父节点缓存,2表示icp子节点缓存
proxy_port:icp peer用来代理通信的tcp端口
icp_port:icp peer用来icp通信的udp端口
MC_on:0表示关闭多播,1表示开启多播
MC_IP:多播的ip地址
MC_ttl:1只能在一个子网内广播,2可以在多个子网内广播

records.config中ICP相关的配置如下:
  proxy.config.icp.enabled
  proxy.config.icp.icp_port
  proxy.config.icp.icp_interface
  proxy.config.icp.multicast_enabled
  proxy.config.icp.query_timeout
  proxy.config.icp.lookup_local
  proxy.config.icp.stale_icp_enabled
  proxy.config.icp.reply_to_unknown_peer
  proxy.config.icp.default_reply_port

在分析ATS的ICP相关源码之前要看一下ICP协议,网上这方面的资料很少,建议直接查看RFC:http://www.cse.ohio-state.edu/cgi-bin/rfc/rfc2186.html

http://chenpiaoping.blog.51cto.com/5631143/1368653

   由于ICP协议本身比较简单,所以ATS和ICP相关的源码也比较简单(默认没有开启,也没用过,应该还有问题),今天花了一下午的时间看了一下源码,下面简单注释一下
和其他模块一样ICP相关处理的类是class ICPProcessorExt,其实例为icpProcessor

extern ICPProcessorExt icpProcessor

class ICPProcessorExt的定义如下:

class ICPProcessorExt
{
public:
 ICPProcessorExt(ICPProcessor *);
 ~ICPProcessorExt();
 void start();
 Action *ICPQuery(Continuation *, URL *);
private:
   ICPProcessor * _ICPpr;
};

从这个类可以看到,要分析ICP源码直接从该类提供的两个外部接口 start()和 ICPQuery()入手即可,
start()的功能是初始化ICP的相关配置并启动ICP模块
ICPQuery()是对外提供的唯一的ICP查询接口(怪不得这个类的名字中有Ext呢,Externet的缩写嘛)

代码的开始还是在Main.cc中
if (icp_enabled)
  icpProcessor.start();

main()--->ICPProcessorExt::start()
封装了一层直接调用ICPProcessor的start()
void
ICPProcessorExt::start()
{
 _ICPpr->start();
}
main()--->ICPProcessorExt::start()--->ICPProcessor::start()
该函数的功能是读取records.config中和ICP相关的配置,以及ICP的配置文件icp.config并初始化相关的数据结构,然后设置定时处理ICP的配置修改以及定时接收和处理ICP的查询请求
void
ICPProcessor::start()
{
 if (_Initialized)    
   return;
//创建锁实例
 _l = NEW(new AtomicLock());
//用于ICP分配内存的索引(ATS的内存分配机制)
 ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
//注册状态计数信息和回调方法
 InitICPStatCallbacks();
//创建 ICPConfiguration的实例,并读取ICP的配置文件来初始化相关的数据结构
 _ICPConfig = NEW(new ICPConfiguration());
//这个目前没有什么用
 _mcastCB_handler = NEW(new ICPHandlerCont(this));
 SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent);
//创建ICP的peer列表,并启动监听的socket
 if (_ICPConfig->globalConfig()->ICPconfigured()) {
   if (BuildPeerList() == 0) {
     if (SetupListenSockets() == 0) {
       _AllowIcpQueries = 1;   // allow receipt of queries
     }
   }
 }
//打印ICP的配置信息
 DumpICPConfig();
//创建ICP配置监控实例,并设置定时调用的函数来处理ICP配置的改变
 _ICPPeriodic = NEW(new ICPPeriodicCont(this));
 SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent);
 _PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
//创建ICP的Handler实例,并设置定时调用的函数来接收并处理ICP请求,一两句话说不清楚,往下看就知道了
 _ICPHandler = NEW(new ICPHandlerCont(this));
 SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent);
 _ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler,
                                                  HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
//创建http请求结构,作为参数传递给cache的查询接口,即查询cache要用该结构
 if (!gclient_request.valid()) {
   gclient_request.create(HTTP_TYPE_REQUEST);
 }
 _Initialized = 1;
}

main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPConfiguration::ICPConfiguration()
该函数的功能是读取records.config中和ICP相关的配置以及icp.config配置文件,并初始化相关的相应的数据结构

ICPConfiguration::ICPConfiguration():_icp_config_callouts(0)
{
//ICPConfigData是ICP的配置封装类,简单的理解就是records.condig里读取的和ICP相关的配置都放在这个结构里,这里有两个主要是用来实现ICP配置的修改和更新的
 _icp_cdata = NEW(new ICPConfigData());
 _icp_cdata_current = NEW(new ICPConfigData());
//读取records.config中的配置到_icp_cdata_current中
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_enabled, "proxy.config.icp.enabled");
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_port, "proxy.config.icp.icp_port");
 ICP_EstablishStaticConfigStringAlloc(_icp_cdata_current->_icp_interface, "proxy.config.icp.icp_interface");
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_multicast_enabled, "proxy.config.icp.multicast_enabled");
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_query_timeout, "proxy.config.icp.query_timeout");
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_cache_lookup_local, "proxy.config.icp.lookup_local");
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_stale_lookup, "proxy.config.icp.stale_icp_enabled");
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_reply_to_unknown_peer,
                                  "proxy.config.icp.reply_to_unknown_peer");
 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_default_reply_port, "proxy.config.icp.default_reply_port");
//看,更新的时候就是把icp_cdata_current赋给_icp_cdata
 UpdateGlobalConfig();      
//PeerConfigData是一个peer的配置信息的封装类,最多有MAX_DEFINED_PEERS(64个
 for (int n = 0; n <= MAX_DEFINED_PEERS; ++n) {
   _peer_cdata[n] = NEW(new PeerConfigData);
   _peer_cdata_current[n] = NEW(new PeerConfigData);
 }
 char
   icp_config_filename[PATH_NAME_MAX] = "";
 ICP_ReadConfigString(icp_config_filename, "proxy.config.icp.icp_configuration", sizeof(icp_config_filename) – 1);
//读取并解析icp.config的配置信息,并把所有peer的配置信息放到数组_peer_cdata_current[]中
//解析的过程很简单,把每行的每个字段取出来赋给PeerConfigData相应的字段即可
 (void) icp_config_change_callback((void *) this, (void *) icp_config_filename, 1);
//_peer_cdata_current[n]赋给_peer_cdata[n]
 UpdatePeerConfig();      
//设置ICP配置更新的回调函数  
 ICP_RegisterConfigUpdateFunc("proxy.config.icp.icp_configuration", mgr_icp_config_change_callback, (void *) this);
}

main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPProcessor::BuildPeerList()


该函数的功能是遍历数组_peer_cdata[n]中的每个peer,按照以下规则加入到相应的list(数组)中
所有peer的list:存放所有peer
父peer的list:存放所有类型为PARENT的peer
发送peer的list:初始化时存放所有peer
接收peer的list:初始化时如果多播开启则存放所有peer,否则为空
在这之前会创建一个表示本地的peer加到所有peer的list和接收peer的list中

int
ICPProcessor::BuildPeerList()
{
 PeerConfigData *Pcfg;
 Peer *P;
 Peer *mcP;
 int index;
 int status;
 PeerType_t type;
//创建本地peer,_peer_cdata[n]数组中的第0个元素类给本地peer用
 Pcfg = _ICPConfig->indexToPeerConfigData(0);
//本地peer的名字:localhost
 ink_strlcpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
//本地peer的类型: CTYPE_LOCAL
 Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
 IpEndpoint tmp_ip;
//根据配置的interface获取本地peer的ip地址
 if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &tmp_ip.sa)) {
   Pcfg->_ip_addr._family = AF_UNSPEC;
   Warning("ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
   REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface has no IP address");
 } else {
   Pcfg->_my_ip_addr = Pcfg->_ip_addr = tmp_ip;
 }
//获取配置的TCP和UDP端口
 Pcfg->_proxy_port = 0;
 Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
//不开启多播(或组播)
 Pcfg->_mc_member = 0;
//地址族: AF_UNSPEC
 Pcfg->_mc_ip_addr._family = AF_UNSPEC;
//只能在一个子网内组播
 Pcfg->_mc_ttl = 0;
 P = NEW(new ParentSiblingPeer(PEER_LOCAL, Pcfg, this));
//加入所有peer的list中
 status = AddPeer(P);
 ink_release_assert(status);
//加入接收peer的list
 status = AddPeerToRecvList(P);
 ink_release_assert(status);
 _LocalPeer = P;
//遍历数组_peer_cdata[n]中的每个peer,按照以下规则加入到相应的list中
所有peer的list:存放所有peer
父peer的list:存放所有类型为PARENT的peer
发送peer的list:存放所有peer
接收peer的list:如果多播开启则存放所有peer,否则为空

 for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
   Pcfg = _ICPConfig->indexToPeerConfigData(index);
   type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());

   if (Pcfg->GetIPAddr() == _LocalPeer->GetIP())
     continue;                 // ignore
   if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {

     if (Pcfg->MultiCastMember()) {
       mcP = FindPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort());
       if (!mcP) {
         mcP = NEW(new MultiCastPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this));
         status = AddPeer(mcP);
         ink_assert(status);
         status = AddPeerToSendList(mcP);
         ink_assert(status);
         status = AddPeerToRecvList(mcP);
         ink_assert(status);
       }
       P = NEW(new ParentSiblingPeer(type, Pcfg, this));
       status = AddPeer(P);
       ink_assert(status);
       status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P);
       ink_assert(status);

     } else {
       P = NEW(new ParentSiblingPeer(type, Pcfg, this));
       status = AddPeer(P);
       ink_assert(status);
       status = AddPeerToSendList(P);
       ink_assert(status);
     }
     if (type == PEER_PARENT) {
       status = AddPeerToParentList(P);
       ink_assert(status);
     }
   }
 }
 return 0;                     // Success
}

main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPProcessor::SetupListenSockets()
该函数的主要功能是开启组播的peer创建并初始化发送和接收的socket,其他不开启组播的peer(包括本地peer)只在_chan上设置了自己的ip地址,用于填充到ICP报文里
int
ICPProcessor::SetupListenSockets()
{
 int allow_null_configuration;
//ICP的模式:0表示禁止,1表示只接收ICP查询,2表示接收和发送ICP查询
 if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY)
     && _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
   allow_null_configuration = 1;
 } else {
   allow_null_configuration = 0;
 }
 if (!_LocalPeer) {
   Warning("ICP setup, no defined local Peer");
   REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
   return 1;    
 }
 if (GetSendPeers() == 0) {
   if (!allow_null_configuration) {
     Warning("ICP setup, no defined send Peer(s)");
     REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
     return 1;                
   }
 }
 if (GetRecvPeers() == 0) {
   if (!allow_null_configuration) {
     Warning("ICP setup, no defined receive Peer(s)");
     REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
     return 1;                
   }
 }
 Peer *P;
 int status;
 int index;
 for (index = 0; index < (_nPeerList + 1); ++index) {
   ip_port_text_buffer ipb, ipb2;
   if ((P = _PeerList[index])) {
//没有开启组播的peer只要设置一下_chan的ip地址就好了
     if ((P->GetType() == PEER_PARENT)
         || (P->GetType() == PEER_SIBLING)) {
       ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
   pPS->GetChan()->setRemote(pPS->GetIP());
//开启组播的peer要分别创建并初始化发送和接收的socket
     } else if (P->GetType() == PEER_MULTICAST) {
       MultiCastPeer *pMC = (MultiCastPeer *) P;
       ink_assert(_mcastCB_handler != NULL);
       status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler);
       if (status) {
         Warning("ICP MC send setup failed, res=%d, ip=%s bind_ip=%s",
           status,
           ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)),
           ats_ip_nptop(_LocalPeer->GetIP(), ipb2, sizeof(ipb2))
         );
         REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed");
         return 1;      
       }
       status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP(),
                                                     NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler);
       if (status) {
         Warning("ICP MC recv setup failed, res=%d, ip=%s",
           status, ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)));
         REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed");
         return 1;    
       }
     }
   }
 }
//没有开启组播的peer只要设置一下_chan的ip地址就好了
 ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer);
 pPS->GetChan()->setRemote(pPS->GetIP());
 return 0;                  
}

main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPPeriodicCont::PeriodicEvent()

这个函数的功能是当ICP的配置改变时重新初始化相应的数据结构,相当于又重走了一下以上的流程,这里不是主要的而且流程和前面差不多,就不深入地注释了
int
ICPPeriodicCont::PeriodicEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
 int do_reconfig = 0;
 ICPConfiguration *C = _ICPpr->GetConfig();
 if (C->GlobalConfigChange())
   do_reconfig = 1;
 int configcallouts = C->ICPConfigCallouts();
 if (_last_icp_config_callouts != configcallouts) {
   _last_icp_config_callouts = configcallouts;
   do_reconfig = 1;
 }
 if (do_reconfig) {
   ICPPeriodicCont *rc = NEW(new ICPPeriodicCont(_ICPpr));
   SET_CONTINUATION_HANDLER(rc, (ICPPeriodicContHandler) & ICPPeriodicCont::DoReconfigAction);
   eventProcessor.schedule_imm(rc);
 }
 return EVENT_CONT;
}

main()--->ICPProcessorExt::start()--->ICPProcessor::start()--->ICPHandlerCont::PeriodicEvent()
kao,这么快就下班了,明天再分析吧

int
ICPHandlerCont::PeriodicEvent(int event, Event * /* e ATS_UNUSED */)
{
 int n_peer, valid_peers;
 Peer *P;
 valid_peers = _ICPpr->GetRecvPeers();
 switch (event) {
 case EVENT_POLL:
 case EVENT_INTERVAL:
   {
     for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
       P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
       if (!P || (P && !P->IsOnline()))
         continue;
       if (P->shouldStartRead()) {
         P->startingRead();
         ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
         int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
         s->init(_ICPpr, P, local_lookup);
         RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
         s->handleEvent(EVENT_INTERVAL, (Event *) 0);
       }
     }
     break;
   }
 default:
   {
     ink_release_assert(!"unexpected event");
     break;
   }
 }                          
 return EVENT_CONT;
}

本文出自 “陈漂评的博客” 博客,请务必保留此出处http://chenpiaoping.blog.51cto.com/5631143/1368770

trafficserver的ICP相关源码注释,布布扣,bubuko.com

trafficserver的ICP相关源码注释

原文:http://chenpiaoping.blog.51cto.com/5631143/1368770

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!