1背景
网络通讯框架是任何业务平台高效、稳定运行的基础。而在今天盛行的分布式存储与计算集群里,高效、可靠的网络通讯框架更是保证各个集群节点间进行数据同步与消息沟通的重要基础。开源的流行网络框架有libevent、netty等,今天看看taobao的tfs-common里面基于epoll通讯模型的网络框架:
2 EPoll通讯接口
Epoll的常用的接口,函数如下:
a int epoll_create(int size);
生成一个epoll专用的文件描述符,其实是申请一个内核空间,用来存放你想关注的socket fd上是否发生以及发生了什么事件。size就是你在这个Epoll fd上能关注的最大socket fd数,大小自定,只要内存足够。
b int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
控制某个Epoll文件描述符上的事件:注册、修改、删除。其中参数epfd是epoll_create()创建Epoll专用的文件描述符。相对于select模型中的FD_SET和FD_CLR宏。
c int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待I/O事件的发生;参数说明:
a epfd:由epoll_create()生成的Epoll专用的文件描述符; b epoll_event:用于回传代处理事件的数组; c maxevents:每次能处理的事件数; d timeout:等待I/O事件发生的超时值; e 返回发生事件数。 |
3 类关系图
3.1类说明:
Transport: 总管理类,负责网络线程、定时器线程的启动;可读写事件的注册、监听;所有监听事件的管理。 EPollSocketEvent: epoll通讯接口epoll_wait、epoll_ctl的管理,负责读写事件的监听、状态修改。 TcpComponent: 负责异步监听到的一个可读、写事件的简单封装,一个TcpComponent关联着一个TcpConnection连接。 TcpConnection: 维护着一个连接,管理该连接上的读、写二进制包集合及其处理操作 Socket: 负责网络字节流接口层的读写操作。
DataBuffer: 一个完整网络二进制包由两部分组成:a 包头DataHeader b 包内容DataBuffer。包头包含消息类型与包二进制流的长度。 OutputQueue/InputQueue: 待写入网络二进制包集合与待读取的网络二进制包集合,方便批处理(长任务)或者及时处理(短任务)。
IPacketStreamer: 负责网络二进制包与业务数据结构之间的编码与解码转换。 IPacketHandler: 业务处理接口类,抽象了业务逻辑的处理,其处理函数handlePacket(Packet *packet),具体的业务逻辑各自不同,可以各自实现接口,扩展其函数的实现。 |
3.2网络监听线程
基于事件驱动的方式处理读写网络事件,代码如下:
void Transport::eventLoop(SocketEvent *socketEvent) { IOEvent events[MAX_SOCKET_EVENTS]; while (!_stop) { // 检查是否有事件发生 int cnt = socketEvent->getEvents(1000, events, MAX_SOCKET_EVENTS); if (cnt < 0) { TBSYS_LOG(INFO, "得到events出错了: %s(%d)\n", strerror(errno), errno); } for (int i = 0; i < cnt; i++) { IOComponent *ioc = events[i]._ioc; if (ioc == NULL) { continue; } if (events[i]._errorOccurred) { // 错误发生了 removeComponent(ioc); continue; } ioc->addRef(); // 读写 bool rc = true; if (events[i]._readOccurred) { rc = ioc->handleReadEvent(); } if (rc && events[i]._writeOccurred) { rc = ioc->handleWriteEvent(); } ioc->subRef(); if (!rc) { removeComponent(ioc); } } } }
3.3定时器线程
定期心跳去清理过期的连接,代码如下:
void Transport::timeoutLoop() { IOComponent *mydelHead = NULL; IOComponent *mydelTail = NULL; std::vector<IOComponent*> mylist; while (!_stop) { // 先写复制到list中 _iocsMutex.lock(); if (_iocListChanged) { mylist.clear(); IOComponent *iocList = _iocListHead; while (iocList) { mylist.push_back(iocList); iocList = iocList->_next; } _iocListChanged = false; } // 加入到mydel中 if (_delListHead != NULL && _delListTail != NULL) { if (mydelTail == NULL) { mydelHead = _delListHead; } else { mydelTail->_next = _delListHead; _delListHead->_prev = mydelTail; } mydelTail = _delListTail; // 清空delList _delListHead = _delListTail = NULL; } _iocsMutex.unlock(); // 对每个iocomponent进行检查 for (int i=0; i<(int)mylist.size(); i++) { IOComponent *ioc = mylist[i]; ioc->checkTimeout(tbsys::CTimeUtil::getTime()); } // 删除掉 IOComponent *tmpList = mydelHead; int64_t nowTime = tbsys::CTimeUtil::getTime() - static_cast<int64_t>(900000000); // 15min while (tmpList) { if (tmpList->getRef() <= 0) { tmpList->subRef(); } if (tmpList->getRef() <= -10 || tmpList->getLastUseTime() < nowTime) { // 从链中删除 if (tmpList == mydelHead) { // head mydelHead = tmpList->_next; } if (tmpList == mydelTail) { // tail mydelTail = tmpList->_prev; } if (tmpList->_prev != NULL) tmpList->_prev->_next = tmpList->_next; if (tmpList->_next != NULL) tmpList->_next->_prev = tmpList->_prev; IOComponent *ioc = tmpList; tmpList = tmpList->_next; TBSYS_LOG(INFO, "DELIOC, %s, IOCount:%d, IOC:%p\n", ioc->getSocket()->getAddr().c_str(), _iocListCount, ioc); delete ioc; } else { tmpList = tmpList->_next; } } usleep(500000); } // 写回到_delList上,让destroy销毁 _iocsMutex.lock(); if (mydelHead != NULL) { if (_delListTail == NULL) { _delListHead = mydelHead; } else { _delListTail->_next = mydelHead; mydelHead->_prev = _delListTail; } _delListTail = mydelTail; } _iocsMutex.unlock(); }
参考:
http://code.taobao.org/p/tfs/src/
原文:http://www.cnblogs.com/gisorange/p/4891204.html