muduo/net/Poller.h
muduo/net/Poller.cc
muduo/net/poller/*
Poller是对IO多路复用的封装,在muduo中支持poll和epoll两种IO multiplexing机制。
Poller是EventLoop的间接成员,EventLoop调用其poll()函数获得当前活动的IO事件。
前面说到在muduo中支持poll和epoll两种IO multiplexing机制。
在muduo中Poller是抽象基类(含有纯虚函数,不能直接实例化),也是muduo中唯一使用面向对象编程方法的地方。
在muduo/net/poller/文件夹下,PollPoller是poll的封装,EpollPoller是epoll的封装,都继承了Poller。
Poller.h
/// Base class for IO Multiplexing
///
/// This class doesn‘t own the Channel objects.
class Poller : noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;
Poller(EventLoop* loop);
virtual ~Poller();
/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;
/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;
virtual bool hasChannel(Channel* channel) const;
static Poller* newDefaultPoller(EventLoop* loop);
void assertInLoopThread() const
{
ownerLoop_->assertInLoopThread();
}
protected:
typedef std::map<int, Channel*> ChannelMap; //fd,channel
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
};
PollPoller.h
/// Poller implementation that keeps a vector of `struct pollfd` entries
/// (the record type used by poll(2)).
class PollPoller : public Poller
{
 public:
  PollPoller(EventLoop* loop);
  ~PollPoller() override;

  Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
  void updateChannel(Channel* channel) override;
  void removeChannel(Channel* channel) override;

 private:
  /// Appends the channels whose pollfd entry reported events to
  /// `activeChannels`; `numEvents` is the count returned by ::poll().
  void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;

  /// The pollfd array handed to ::poll().
  using PollFdList = std::vector<struct pollfd>;
  PollFdList pollfds_;
};
PollPoller调用poll监听pollfds的事件
imestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
// XXX pollfds_ shouldn‘t change
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now()); //时间戳
if (numEvents > 0)//有活跃事件
{
LOG_TRACE << numEvents << " events happened";
fillActiveChannels(numEvents, activeChannels);//添加到活跃通道
}
else if (numEvents == 0)
{
LOG_TRACE << " nothing happened";
}
else
{
if (savedErrno != EINTR)
{
errno = savedErrno;
LOG_SYSERR << "PollPoller::poll()";
}
}
return now;
}
承接上面的poll函数:监听到活跃事件后,fillActiveChannels将有活跃事件的通道添加到EventLoop的活跃通道列表。
void PollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
//遍历文件描述符
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
//如果有活跃事件
if (pfd->revents > 0)
{
--numEvents;
//找到fd对应的channel
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second; //channels_的key是fd, value是Channel*
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents); //设置通道的
// pfd->revents = 0;
activeChannels->push_back(channel);//添加到EventLoop的活跃通道列表
}
}
}
先看一个通道更新的传递链:
Channel::enableReading()->Channel::update()->EventLoop::updateChannel()
->PollPoller::updateChannel()
一个通道修改了自身监听的event后,调用update让Poller更新监听的事件。
Poller则根据Channel的events修改pollfd中的events。
/// Registers a new channel or refreshes the pollfd entry of a known one.
///
/// A channel with a negative index is treated as new: a pollfd is appended
/// for it, its index is set to that position, and it is added to channels_.
/// Otherwise the pollfd at the channel's index is rewritten from the
/// channel's current events.
///
/// @param channel  The channel whose interested events changed.
void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();

  const int slot = channel->index();
  if (slot >= 0)
  {
    // Known channel: rewrite its existing pollfd entry in place.
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    assert(0 <= slot && slot < static_cast<int>(pollfds_.size()));
    struct pollfd& entry = pollfds_[slot];
    assert(entry.fd == channel->fd() || entry.fd == -channel->fd()-1);
    entry.events = static_cast<short>(channel->events());
    entry.revents = 0;
    // A channel with no events of interest gets a negative fd so this pollfd
    // is ignored; -fd-1 (rather than -fd) keeps fd 0 negative as well.
    entry.fd = channel->isNoneEvent() ? -channel->fd()-1 : channel->fd();
    return;
  }

  // New channel: append a pollfd for it and remember the position.
  assert(channels_.find(channel->fd()) == channels_.end());
  struct pollfd fresh;
  fresh.fd = channel->fd();
  fresh.events = static_cast<short>(channel->events());
  fresh.revents = 0;
  const int newSlot = static_cast<int>(pollfds_.size());
  pollfds_.push_back(fresh);
  channel->set_index(newSlot);
  channels_[fresh.fd] = channel;
}
/// Removes a channel from channels_ and drops its pollfd entry.
///
/// Asserted preconditions: the channel is registered, has no events of
/// interest, and its pollfd holds the negated fd (-fd-1).
///
/// The entry is removed without shifting the vector: unless it is already
/// last, it is swapped with the last entry, and the channel that owned the
/// last entry gets its index updated to the vacated position.
///
/// @param channel  The channel being removed.
void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  assert(channel->isNoneEvent());

  const int slot = channel->index();
  assert(0 <= slot && slot < static_cast<int>(pollfds_.size()));
  const struct pollfd& entry = pollfds_[slot]; (void)entry;
  assert(entry.fd == -channel->fd()-1 && entry.events == channel->events());

  size_t erased = channels_.erase(channel->fd());
  assert(erased == 1); (void)erased;

  if (implicit_cast<size_t>(slot) != pollfds_.size()-1)
  {
    // Move the last pollfd into the vacated slot.
    int fdAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+slot, pollfds_.end()-1);
    if (fdAtEnd < 0)
    {
      // The last entry was stored as -fd-1 (ignored); recover the real fd.
      fdAtEnd = -fdAtEnd-1;
    }
    channels_[fdAtEnd]->set_index(slot);
  }
  // In both cases the entry to discard is now the last one.
  pollfds_.pop_back();
}
原文:https://www.cnblogs.com/Lj-ming/p/14774611.html