muduo/net/Buffer.h
muduo/net/Buffer.cc
为了让一个线程能服务于多个socket连接,IO线程只能阻塞在IO多路复用函数(如epoll_wait/poll),所以read/write等IO系统调用需设置为非阻塞non-blocking。因此每个TCP的socket连接都要有输入/输出缓冲区。
输入缓冲区input buffer: Tcpconnection从socket读数据,然后写入输入缓冲区,用户代码从输入缓冲区读取数据。
输出缓冲区output buffer: 用户把数据写入output buffer。TcpConnection从输出缓冲区读取数据并写入socket。
每个TcpConnection有一个输入缓冲区和一个输出缓冲区。
用户代码不会直接操作Buffer,只需要调用发送和取数据的接口函数,read/write由muduo库完成。
这里只放用户代码:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <boost/bind.hpp>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
class TestServer
{
public:
TestServer(EventLoop* loop,
const InetAddress& listenAddr)
: loop_(loop),
server_(loop, listenAddr, "TestServer")
{
server_.setConnectionCallback(
boost::bind(&TestServer::onConnection, this, _1));
server_.setMessageCallback(
boost::bind(&TestServer::onMessage, this, _1, _2, _3));
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
printf("onConnection(): new connection [%s] from %s\n",
conn->name().c_str(),
conn->peerAddress().toIpPort().c_str());
}
else
{
printf("onConnection(): connection [%s] is down\n",
conn->name().c_str());
}
}
//消息到来回调函数
void onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
{
string msg(buf->retrieveAllAsString());//从缓冲区取数据
printf("onMessage(): received %zd bytes from connection [%s] at %s\n",
msg.size(),
conn->name().c_str(),
receiveTime.toFormattedString().c_str());
conn->send(msg);//发送
}
EventLoop* loop_;
TcpServer server_; //包含TcpServer,基于对象编程
};
int main()
{
printf("main(): pid = %d\n", getpid());
InetAddress listenAddr(8888);
EventLoop loop;
TestServer server(&loop, listenAddr);
server.start();
loop.loop();
}
主要看onMessage()回调函数,使用buf->retrieveAllAsString()从缓冲区取数据,使用 conn->send(msg)发送数据。
实际当可读事件触发,先调用TcpConnection::handleRead(),其中调用inputBuffer_.readFd()和onMessage()回调函数。
当用户代码调用conn->send(msg),如果输出缓冲区没有数据,则直接write(msg),否则将msg添加到输出缓冲区,监听可写事件。当可写事件触发,调用TcpConnection::handleWrite()发送outputBuffer_中的数据,最后调用writeCompleteCallback_回调函数。
Buffer内部以vector
readIndex和writerIndex两个下标将数组分为三块
Buffer就像一个队列queue,从头部读数据,从尾部写数据。
prependable部分是头部预留字节,可方便地在头部追加数据长度等信息。
readable部分数有效数据部分,起始时readIndex等于writerIndex,有效载荷为空。
writable是空白可写部分。
这么做利用了临时栈上空间 ,避免每个连接的初始Buffer过大造成的内存浪费,也避免反复调用read()的系统开销(由于缓冲区足够大,通常一次readv()系统调用就能读完全部数据)。
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
// saved an ioctl()/FIONREAD call to tell how much to read
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
//第一块缓冲区
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
//第二块缓冲区
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// when there is enough space in this buffer, don‘t read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n;
}
else//当前缓冲区不够容纳,数据被接收到了第二块缓冲区extrabuf,将其append至buffer
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
// if (n == writable + sizeof extrabuf)
// {
// goto line_30;
// }
return n;
}
void append(const char* /*restrict*/ data, size_t len)
{
ensureWritableBytes(len); //确保剩余空间足够
std::copy(data, data+len, beginWrite());// 复制到buffer,beginWrite()返回writeIndex所在指针
hasWritten(len); //更新writeIndex位置 +len
}
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len); //空间不足,腾空间
}
assert(writableBytes() >= len);
}
void makeSpace(size_t len)
{
//有效数据区往前挪也不够空间,重新分配空间
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
// FIXME: move readable data
//自动增长
buffer_.resize(writerIndex_+len);
}
else
{
// move readable data to the front, make space inside buffer
//内部腾挪,往前挪
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
std::copy(begin()+readerIndex_,
begin()+writerIndex_,
begin()+kCheapPrepend);
//更新下标位置,readerIndex即回到初始位置
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}
上面的用户代码中,使用retrieveAllAsString()将缓冲区的数据作为string全部取出。
string retrieveAllAsString()
{
return retrieveAsString(readableBytes());
}
string retrieveAsString(size_t len)
{
assert(len <= readableBytes());
string result(peek(), len); //peek()返回readerIndex所在指针。
retrieve(len);
return result;
}
const char* peek() const
{ return begin() + readerIndex_; }
void retrieve(size_t len)
{
assert(len <= readableBytes());
if (len < readableBytes())
{
readerIndex_ += len; //更新readInder位置
}
else
{
retrieveAll();//重置到起始位置
}
}
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
原文:https://www.cnblogs.com/Lj-ming/p/14827891.html