前面提到 TcpConnection 的生存期比较模糊,主要是因为我们不能在 TcpServer 中直接 erase 掉 TcpConnection 对象:此时 Channel 的 handleEvent() 可能还在执行,如果这时析构 TcpConnection 对象,它的成员 channel_ 也会随之被析构,从而导致 core dump。也就是说,TcpConnection 对象的生存期必须长于 handleEvent() 函数,直到执行完 connectDestroyed() 之后才能析构。
class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this<TcpConnection> { public: /// Constructs a TcpConnection with a connected sockfd /// /// User should not create this object. TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection(); EventLoop* getLoop() const { return loop_; } const string& name() const { return name_; } const InetAddress& localAddress() { return localAddr_; } const InetAddress& peerAddress() { return peerAddr_; } bool connected() const { return state_ == kConnected; } void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } /// Internal use only. void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; } // called when TcpServer accepts a new connection void connectEstablished(); // should be called only once // called when TcpServer has removed me from its map void connectDestroyed(); // should be called only once private: enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; void handleRead(Timestamp receiveTime); void handleClose(); void handleError(); void setState(StateE s) { state_ = s; } EventLoop* loop_; // 所属EventLoop string name_; // 连接名 StateE state_; // FIXME: use atomic variable // we don't expose those classes to client. boost::scoped_ptr<Socket> socket_; boost::scoped_ptr<Channel> channel_; InetAddress localAddr_; InetAddress peerAddr_; ConnectionCallback connectionCallback_; MessageCallback messageCallback_; CloseCallback closeCallback_; }; typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr; }
TcpConnection::TcpConnection(EventLoop* loop, const string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), state_(kConnecting), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr)/*, highWaterMark_(64*1024*1024)*/ { // 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间 channel_->setReadCallback( boost::bind(&TcpConnection::handleRead, this, _1)); // 连接关闭,回调TcpConnection::handleClose channel_->setCloseCallback( boost::bind(&TcpConnection::handleClose, this)); // 发生错误,回调TcpConnection::handleError channel_->setErrorCallback( boost::bind(&TcpConnection::handleError, this)); LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd; socket_->setKeepAlive(true); } TcpConnection::~TcpConnection() { LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this << " fd=" << channel_->fd(); } void TcpConnection::connectEstablished() { loop_->assertInLoopThread(); assert(state_ == kConnecting); setState(kConnected); LOG_TRACE << "[3] usecount=" << shared_from_this().use_count(); channel_->tie(shared_from_this()); channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注 connectionCallback_(shared_from_this()); LOG_TRACE << "[4] usecount=" << shared_from_this().use_count(); } void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); if (state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); connectionCallback_(shared_from_this()); } channel_->remove(); } void TcpConnection::handleRead(Timestamp receiveTime) { /* loop_->assertInLoopThread(); int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } */ loop_->assertInLoopThread(); int savedErrno = 0; char 
buf[65536]; ssize_t n = ::read(channel_->fd(), buf, sizeof buf); if (n > 0) { messageCallback_(shared_from_this(), buf, n); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); }
/// Dispatches a poller event to handleEventWithGuard().
///
/// If the channel has been tied to an owner (tied_ is set), the tie is first
/// locked into a local shared_ptr that stays alive for the whole dispatch;
/// when the lock fails, the event is dropped. An untied channel dispatches
/// directly.
void Channel::handleEvent(Timestamp receiveTime)
{
  // Untied channel: nothing to keep alive, dispatch straight away.
  if (!tied_)
  {
    handleEventWithGuard(receiveTime);
    return;
  }

  // Tied channel: hold a strong reference until this function returns.
  boost::shared_ptr<void> guard(tie_.lock());
  if (!guard)
  {
    return;  // the tie could not be locked; skip the event
  }

  LOG_TRACE << "[6] usecount=" << guard.use_count();
  handleEventWithGuard(receiveTime);
  LOG_TRACE << "[12] usecount=" << guard.use_count();
}
Muduo网络库源码分析(六)TcpConnection 的生存期管理