一、程序运行环境
操作系统:centos 7.0
编译器:gcc/g++ 4.8.3 ? ? cmake 2.8.11
mysql数据库:5.5.47
项目代码管理工具:VS2013
一、程序结构
该程序总共有17个线程,其中分为9个数据库工作线程D和一个日志线程L,6个普通工作线程W,一个主线程M。(以下会用这些字母来代指这些线程)
(一)、数据库工作线程的用途
?9个数据库工作线程在线程启动之初,与mysql建立连接,也就是说每个线程都与mysql保持一路连接,共9个数据库连接。
?每个数据库工作线程同时存在两个任务队列,第一个队列A存放需要执行数据库增删查改操作的任务sqlTask,第二个队列B存放sqlTask执行完成后的结果。sqlTask执行完成后立即放入结果队列中,因而结果队列中任务也是一个个的需要执行的任务。大致伪代码如下:
void db_thread_func()
{
while (!m_bExit)
{
if (NULL != (pTask = m_sqlTask.Pop()))
{
//从m_sqlTask中取出的任务先执行完成后,pTask将携带结果数据
pTask->Execute();
//得到结果后,立刻将该任务放入结果任务队列
m_resultTask.Push(pTask);
continue;
}
sleep(1000);
}
}
现在的问题来了:
任务队列A中的任务从何而来,目前只有消费者,没有生产者,那么生产者是谁?
这两个问题先放一会儿,等到后面我再来回答。
(二)工作线程和主线程
在介绍主线程和工作线程具体做什么时,我们介绍下服务器编程中常常抽象出来的几个概念(这里以tcp连接为例):
2.一个连接对应一个TcpConnection对象,TcpConnection对象管理着这个连接的一些信息:如连接状态、本端和对端的ip地址和端口号等。
3.数据通道对象Channel,Channel记录了socket的句柄,因而是一个连接上执行数据收发的真正执行者,Channel对象一般作为TcpConnection的成员变量。
归纳起来:一个TcpServer依靠TcpListener对新连接的侦听和处理,依靠TcpConnection对象对连接上的数据进行管理,TcpConnection实际依靠Channel对数据进行收发,依靠TcpSession对数据进行装包和解包。也就是说一个TcpServer存在一个TcpListener,对应多个TcpConnection,有几个TcpConnection就有几个TcpSession,同时也就有几个Channel。
以上说的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服务器框架的网络层。一个好的网络框架,应该做到与业务代码脱耦。即上层代码只需要拿到数据,执行业务逻辑,而不用关注数据的收发和网络数据包的封包和解包以及网络状态的变化(比如网络断开与重连)。
拿数据的发送来说:
当业务逻辑将数据交给TcpSession,TcpSession将数据装好包后(装包过程后可以有一些加密或压缩操作),交给TcpConnection::SendData(),而TcpConnection::SendData()实际是调用Channel::SendData(),因为Channel含有socket句柄,所以Channel::SendData()真正调用send()/sendto()/write()方法将数据发出去。
对于数据的接收,稍微有一点不同:
通过select()/poll()/epoll()等IO multiplex技术,确定好了哪些TcpConnection上有数据到来后,激活该TcpConnection的Channel对象去调用recv()/recvfrom()/read()来收取数据。数据收到以后,将数据交由TcpSession来处理,最终交给业务层。注意数据收取、解包乃至交给业务层是一定要分开的。我的意思是:最好不要解包并交给业务层和数据收取的逻辑放在一起。因为数据收取是IO操作,而解包和交给业务层是逻辑计算操作。IO操作一般比逻辑计算要慢。到底如何安排要根据服务器业务来取舍,也就是说你要想好你的服务器程序的性能瓶颈在网络IO还是逻辑计算,即使是网络IO,也可以分为上行操作和下行操作,上行操作即客户端发数据给服务器,下行即服务器发数据给客户端。有时候数据上行少,下行大。(如游戏服务器,一个npc移动了位置,上行是该客户端通知服务器自己最新位置,而下行确是服务器要告诉在场的每个客户端)。
在我的博文《服务器端编程心得(一)—— 主线程与工作线程的分工》中介绍了,工作线程的流程:
while (!m_bQuit)
{
epoll_or_select_func();
handle_io_events();
handle_other_things();
}
其中epoll_or_select_func()即是上文所说的通过select()/poll()/epoll()等IO multiplex技术,确定好了哪些TcpConnection上有数据到来。我的服务器代码中一般只会监测socket可读事件,而不会监测socket可写事件。至于如何发数据,文章后面会介绍。所以对于可读事件,以epoll为例,这里需要设置的标识位是:
EPOLLIN 普通可读事件(当连接正常时,产生这个事件,recv()/read()函数返回收到的字节数;当连接关闭,这两个函数返回0,也就是说我们设置这个标识已经可以监测到新来数据和对端关闭事件)
EPOLLRDHUP 对端关闭事件(linux man手册上说这个事件可以监测对端关闭,但我实际调试时发送即使对端关闭也没触发这个事件,仍然是EPOLLIN,只不过此时调用recv()/read()函数,返回值会为0,所以实际项目中是否可以通过设置这个标识来监测对端关闭,仍然待考证)
EPOLLPRI 带外数据
muduo里面将epoll_wait的超时事件设置为1毫秒,我的另一个项目将epoll_wait超时时间设置为10毫秒。这两个数值供大家参考。
这个项目中,工作线程和主线程都是上文代码中的逻辑,主线程监听侦听socket上的可读事件,也就是监测是否有新连接来了。主线程和每个工作线程上都存在一个epollfd。如果新连接来了,则在主线程的handle_io_events()中接收新连接。产生的新连接的socket句柄挂接到哪个线程的epollfd上呢?这里采取的做法是round-robin算法,即存在一个对象CWorkerThreadManager记录了各个工作线程上工作状态。伪码大致如下:
void attach_new_fd(int newsocketfd)
{
workerthread = get_next_worker_thread(next);
workerthread.attach_to_epollfd(newsocketfd);
++next;
if (next > max_worker_thread_num)
next = 0;
}
即先从第一个工作线程的epollfd开始挂接新来socket,接着累加索引,这样下次就是第二个工作线程了。如果所以超出工作线程数目,则从第一个工作重新开始。这里解决了新连接socket“负载均衡”的问题。在实际代码中还有个需要注意的细节就是:epoll_wait的函数中的struct epoll_event 数量开始到底要设置多少个才合理?存在的顾虑是,多了浪费,少了不够用,我在曾经一个项目中直接用的是4096:
const int EPOLL_MAX_EVENTS = 4096;
const int dwSelectTimeout = 10000;
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);
我在陈硕的muduo网络库中发现作者才用了一个比较好的思路,即动态扩张数量:开始是n个,当发现有事件的fd数量已经到达n个后,将struct epoll_event数量调整成2n个,下次如果还不够,则变成4n个,以此类推,作者巧妙地利用stl::vector在内存中的连续性来实现了这种思路:
//初始化代码
std::vector<struct epoll_event> events_(16);
//线程循环里面的代码
while (m_bExit)
{
int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), 1);
if (numEvents > 0)
{
if (static_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size() * 2);
}
}
}
读到这里,你可能觉得工作线程所做的工作也不过就是调用handle_io_events()来接收网络数据,其实不然,工作线程也可以做程序业务逻辑上的一些工作。也就是在handle_other_things()里面。那如何将这些工作加到handle_other_things()中去做呢?写一个队列,任务先放入队列,再让handle_other_things()从队列中取出来做?我在该项目中也借鉴了muduo库的做法。即handle_other_things()中调用一系列函数指针,伪码如下:
void do_other_things()
{
somefunc();
}
//m_functors是一个stl::vector,其中每一个元素为一个函数指针
void somefunc()
{
for (size_t i = 0; i < m_functors.size(); ++i)
{
m_functors[i]();
}
m_functors.clear();
}
当任务产生时,只要我们将执行任务的函数push_back到m_functors这个stl::vector对象中即可。但是问题来了,如果是其他线程产生的任务,两个线程同时操作m_functors,必然要加锁,这也会影响效率。muduo是这样做的:
void add_task(const Functor& cb)
{
std::unique_lock<std::mutex> lock(mutex_);
m_functors.push_back(cb);
}
void do_task()
{
std::vector<Functor> functors;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(m_functors);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
}
看到没有,利用一个栈变量functors将m_functors中的任务函数指针倒换(swap)过来了,这样大大减小了对m_functors操作时的加锁粒度。前后变化:变化前,相当于原来A给B多少东西,B消耗多少,A给的时候,B不能消耗;B消耗的时候A不能给。现在变成A将东西放到篮子里面去,B从篮子里面拿,B如果拿去一部分后,只有消耗完了才会来拿,或者A通知B去篮子里面拿,而B忙碌时,A是不会通知B来拿,这个时候A只管将东西放在篮子里面就可以了。
bool bBusy = false;
void add_task(const Functor& cb)
{
std::unique_lock<std::mutex> lock(mutex_);
m_functors_.push_back(cb);
//B不忙碌时只管往篮子里面加,不要通知B
if (!bBusy)
{
wakeup_to_do_task();
}
}
void do_task()
{
bBusy = true;
std::vector<Functor> functors;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
bBusy = false;
}
看,多巧妙的做法!
因为每个工作线程都存在一个m_functors,现在问题来了,如何将产生的任务均衡地分配给每个工作线程。这个做法类似上文中如何将新连接的socket句柄挂载到工作线程的epollfd上,也是round-robin算法。上文已经描述,此处不再赘述。
还有种情况,就是希望任务产生时,工作线程能够立马执行这些任务,而不是等epoll_wait超时返回之后。这个时候的做法,就是使用一些技巧唤醒epoll_wait,linux系统可以使用socketpair或timerevent、eventfd等技巧(这个细节在我的博文《服务器端编程心得(一)—— 主线程与工作线程的分工》已经详细介绍过了)。
上文中留下三个问题:
2.数据库线程任务队列B中的任务将去何方,目前只有生产者没有消费者。
3.业务层的数据如何发送出去?
问题1的答案是:业务层产生任务可能会交给数据库任务队列A,这里的业务层代码可能就是工作线程中do_other_things()函数执行体中的调用。至于交给这个9个数据库线程的哪一个的任务队列,同样采用了round-robin算法。所以就存在一个对象CDbThreadManager来管理这九个数据库线程。下面的伪码是向数据库工作线程中加入任务:
bool CDbThreadManager::AddTask(IMysqlTask* poTask )
{
if (m_index >= m_dwThreadsCount)
{
m_index = 0;
}
return m_aoMysqlThreads[m_index++].AddTask(poTask);
}
同理问题2中的消费者也可能就是do_other_things()函数执行体中的调用。
现在来说问题3,业务层的数据产生后,经过TcpSession装包后,需要发送的话,产生任务丢给工作线程的do_other_things(),然后在相关的Channel里面发送,因为没有监测该socket上的可写事件,所以该数据可能调用send()或者write()时会阻塞,没关系,sleep()一会儿,继续发送,一直尝试,到数据发出去。伪码如下:
bool Channel::Send()
{
int offset = 0;
while (true)
{
int n = ::send(socketfd, buf + offset, length - offset);
if (n == -1)
{
if (errno == EWOULDBLOCK)
{
::sleep(100);
continue;
}
}
//对方关闭了socket,这端建议也关闭
else if (n == 0)
{
close(socketfd);
return false;
}
offset += n;
if (offset >= length)
break;
}
return true;
}
最后,还有一个日志线程没有介绍,高性能的日志实现方案目前并不常见。限于文章篇幅,下次再介绍。
zhangyl 2016.12.02晚12:35
原文:http://blog.51cto.com/6956264/2125747