使用 \muduo\examples\asio\chat\server.cc 作例子
首先需要知道:
EventLoop:一个事件分发器类,拥有 Poller 对象,事件处理函数是 loop(),在这里捕获注册的 Channel 事件,并调用相应的回调函数。
while (!quit_) { ... pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ... for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { currentActiveChannel_ = *it; // 通过 revents_ 来处理响应的事件 currentActiveChannel_->handleEvent(pollReturnTime_); } ... }
下面这几个类,都拥有一个 EventLoop * 对象,他们的事件注册,就注册到他们分配到的 EventLoop * 对象中。
Channel:一个封装 fd 的类,方便向 EventLoop 注册事件.比如,可读,可写事件。
Acceptor:负责接受连接的类,拥有一个Channel对象,向 EventLoop 注册为可读,则当有人来连接的时候,就能调用相应的回调。
TcpConnecton:也拥有一个 Channel 对象,已建立连接以后,再生成的对象,用来处理,连接之后的操作,发送,接受,断开。
然后开始来看吧,客户使用的代码,极其简单,就建立起一个 ChatServer . 这里的 loop 为 baseloop 处于主线程的条件下。
ChatServer server(&loop, serverAddr);
server.start();
loop.loop();
再来看看 server.start() 干了什么.先在构造函数中,初始化了几个成员,threadPool_,acceptor_。设置了acceptor_的回调.
1.threadPool_.start(),开启创建 N 个 EventLoopThread 个对象,即开了了 N 个 EventLoop 对象,并执行他们的 loop() 函数。
2.loop_runInLoop(..listen..),runInLoop 简单说明一下,就是保证你的函数执行的环境是跟你的 EventLoop 处于同一个线程中,保证同步。 最终,这个函数会调用 acceptor_.listen(),这个函数的本质就是向他的 loop 注册可读事件,可读的话就是有人来连接啦。可以自己跟踪下。
TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option) : loop_(CHECK_NOTNULL(loop)), hostport_(listenAddr.toIpPort()), name_(nameArg), acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), threadPool_(new EventLoopThreadPool(loop)), connectionCallback_(defaultConnectionCallback), messageCallback_(defaultMessageCallback), nextConnId_(1) { // Acceptor::handleRead函数中会回调用TcpServer::newConnection // _1对应的是socket文件描述符,_2对应的是对等方的地址(InetAddress) acceptor_->setNewConnectionCallback( boost::bind(&TcpServer::newConnection, this, _1, _2)); }
void TcpServer::start() { .... threadPool_->start(threadInitCallback_); loop_->runInLoop( boost::bind(&Acceptor::listen, get_pointer(acceptor_))); .... }
到这里为止,就已经注册了 accept fd 的可读事件,那么当有连接到来的时候,就会触发,在 loop() 函数中,然后去调用响应的回调函数,在 TcpServer 构造函数中可以看到,注册了 acceptor_ 可读回调是 TcpServer::newConnecton() ,所以,就来看看这个函数又看了什么。
1.可以看到,通过传过来的参数,创建一个 TcpConnecton 对象,并存储到 connections_ 中,这是一个 map 容器。threadPool_->getNextLoop() 就是在线程池中分配一个 loop 给他,负载均衡吧。如果之前设置的线程数为0,则大家共用主线程的 loop.
2.注册,响应的回调函数,可读,可写,有连接(就是调用用户设置的有连接来的回调函数),关闭。
3.最后调用,ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));这个函数里面,实际就是将 TcpConnection 中的 Channel 注册为可读,这样就能开始接收 client 的数据。然后也可以调用 TcpConnection 的 send 来发数据。
这里的接受和发送设计一个 Buffer 类,这是一个 缓存类,非阻塞I/O必备,这里就不说先。
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { loop_->assertInLoopThread(); // 按照轮叫的方式选择一个EventLoop EventLoop* ioLoop = threadPool_->getNextLoop(); // 记录那个 nextConnId_ 有什么意义吗? char buf[32]; snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_); ++nextConnId_; string connName = name_ + buf; LOG_INFO << "TcpServer::newConnection [" << name_ << "] - new connection [" << connName << "] from " << peerAddr.toIpPort(); // getLocalAddr listenAddr 不应该就是 localAddr 了吗? InetAddress localAddr(sockets::getLocalAddr(sockfd)); // FIXME poll with zero timeout to double confirm the new connection // FIXME use make_shared if necessary // 每个TcpConnection 里面都有一个 Channel // 每一个 channel 里面 有一个 loop // 每一个 loop 里面有一个 poller_(),每一个loop不同线程 // 每一个 loop 里面有一个 vector<Channel *> TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); // connections_ is a map<string,TcpConnectionPtr> connections_[connName] = conn; conn->setConnectionCallback(connectionCallback_);//改变connection 改为用户定义的,跟构造函数的不一样 conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback( boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe//跨线程的调用 // TcpConnection所对应的通道加入到Poller关注 // 嗯,这个时候这里调用 ioLoop->runInLoop() 的话,似乎就不是他那里的环境 // 没错,这里的 ioloop 是在其他线程中创建的。 ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//各自线程调用 }
当已经建立连接好以后,我们再来看看,在最初的 loop() 函数中的 Channel::handleEvent() 函数.可以看到,通过判断事件来种类,来调用响应的回调函数。
if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) { if (logHup_) { LOG_WARN << "Channel::handle_event() POLLHUP"; } if (closeCallback_) closeCallback_(); } if (revents_ & POLLNVAL) { LOG_WARN << "Channel::handle_event() POLLNVAL"; } if (revents_ & (POLLERR | POLLNVAL)) { if (errorCallback_) errorCallback_(); } if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) { if (readCallback_) readCallback_(receiveTime); } if (revents_ & POLLOUT) { if (writeCallback_) writeCallback_(); }
大致逻辑是这样,服务端的过程比客户端的过程要复杂一些,毕竟服务端要管理这么多个连接。客户端的过程大家可以自己去看一下。本文只是粗略的解释了服务端的过程,其中涉及的多线程应该注意的事项还无功力能总结。
原文:http://www.cnblogs.com/cycxtz/p/4940953.html