一个buffer类,400行的一个类要看吐了,不过还好有很多相似功能的成员函数
参考:https://blog.csdn.net/Shreck66/article/details/49618331
先说一下buffer的目的作用:
在这里仅考虑 非阻塞 套接字模式,阻塞模式在高并发服务器情况下严重影响并发性,不考虑.
通常非阻塞套接字模式要搭配IO复用一起使用,否则为实现逻辑,也需要不断循环来进行IO
(这里以epoll为例子)我们知道epoll_wait会返回发生了网络事件的套接字集合以及相应的网络事件名字
正常的情况是:
epoll_wait返回,
EPOLLIN:调用read读取sockfd上的数据或者接受一个连接
EPOLLOUT:在sockfd上调用write发送数据
在高并发情况下会带来一些问题:
问题1.当我们要发送较大的数据时,我们正常想法是方法一,一直write直到所有数据都发了过去,可是事实情况可能是
操作系统内核非常繁忙无法处理write调用,导致一直进行write循环,
解决方法一:
epoll_wait() //返回发生网络事件的套接字
|
EPOLLOUT //处理写事件
|
while(unfinished)write() //内核繁忙,write一直失败,死循环,其他的套接字得不到响应
解决方法二:利用写缓冲区
epoll_wait()
|
EPOLLOUT
|
write to buffer //把内核无法接受的数据先放在buffer中
|
注册EPOLLOUT //注册EPOLLOUT事件,下次epoll_wait()返回时从buffer中拿数据write
很明显方法二不需要我们关系write能不能成功,我们只需要把数据放到buffer中,在注册一个EPOLLOUT事件
操作系统可write时,直接从buffer中拿数据write即可.
优点:并发性优于方法一
buffer用于应用层向套接字进行读写数据所使用的中间的缓冲区
不使用缓冲区会有什么问题?
问题2:我们在处理可读事件时,必须要一次把数据读完,否则epoll_wait()会一直触发返回(LT模式),
造成主循环busy-loop,但是当我们一次把数据全部读取完毕,无法保证消息的完整性(粘包/不完整问题).
方案一:粘包/包不完整:
epoll_wait()
|
EPOLLIN
|
read to buf //buf中第一个数据包和第二个数据包会连在一起,形成粘包,也有可能包不完整
|
process //包有问题,无法处理
方案二:利用读缓冲区
epoll_wait()
|
EPOLLIN
|
read to buffer //先全部读到buffer中
|
buffer notify() //buffer中有完整的一个包了,就通知业务程序处理
|
process //处理数据
因此可以看出来,buffer主要分为 写缓冲区 和 读缓冲区
首先来看一下buffer设计的要点
.对外表现为一块连续的内存
.其大小可自动增长,以适应不断增长的消息,它不能是固定大小的buffer
.内部以std::vector来保存数据,并提供相应的访问函数
.input buffer,是程序从socket中读取数据然后写入input buffer,客户代码从input buffer读取数据
.output buffer,客户代码会把数据写入output buffer中,然后再从output buffer中读取数据并写入socket中
buffer.h:
#ifndef BUFFER_H #define BUFFER_H #include"base/copyable.h" #include"base/types.h" #include"net/endian.h" #include<algorithm> #include<vector> #include<assert.h> #include<string.h> //#include<unistd.h> //ssize_t namespace mymuduo { namespace net { ///缓冲区类模仿 org.jboss.netty.buffer.ChannelBuffer /// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer /// /// @code /// +-------------------+------------------+------------------+ /// | prependable bytes | readable bytes | writable bytes | /// | 预留的空间 | 数据(CONTENT) | 可写的空间 | /// +-------------------+------------------+------------------+ /// | | | | /// 0 <= readerIndex <= writerIndex <= size /// 开头 可读位置 可写位置 结尾 /// @endcode /// /// 注意添加数据时,如果剩余空间可用,直接添加,否则若是剩余空间+预留空间可用, /// 把数据挪动到kCheapPrepend位置即可,否则就需要额外扩容了 /// /// class buffer:copyable { public: //总空间1024+8=1032 static const size_t kCheapPrepend=8; //一开始预留空间为8字节 static const size_t kInitialSize=1024; //一开始初始空间1024 //explicit 禁止隐式转换 //readIndex和writerIndex初始都在kCheaperPrepend(8)处,m_buffer大小(1032) explicit buffer(size_t initialSize=kInitialSize) :m_buffer(kCheapPrepend+initialSize), m_readerIndex(kCheapPrepend), m_writerIndex(kCheapPrepend) { assert(readableBytes()==0); assert(writableBytes()==initialSize); assert(prependableBytes()==kCheapPrepend); } //交换两个buffer类,实际上完成m_buffer,m_readerIndex,m_writerbuffer的交换 void swap(buffer& rhs) { m_buffer.swap(rhs.m_buffer); std::swap(m_readerIndex,rhs.m_readerIndex); std::swap(m_writerIndex,rhs.m_writerIndex); } //当前buffer可读的字节数,从可读位置到可写位置 size_t readableBytes() const { return m_writerIndex-m_readerIndex; } //当前buffer可写的字节数,从可写位置到尾处 size_t writableBytes() const { return m_buffer.size()-m_writerIndex; } //当前预留的字节数,从0到可读位置 size_t prependableBytes() const { return m_readerIndex; } //得到可读位置的地址 const char* peek() const { return begin() + m_readerIndex; } //peek()是可读位置m_readerIndex的地址,beginWrite()是可写位置m_writerIndex的地址 //左闭右开区间[peek(),beginWrite()),在此区间中找,得到[kCRLF,kCRLF+2)的地址 const char* findCRLF() const { //const char Buffer::kCRLF[] = "\r\n"; const char* crlf=std::search(peek(),beginWrite(),kCRLF,kCRLF+2); return crlf==beginWrite()?NULL:crlf; } //在[start,beginWrite())中找到第一个"\r\n"的位置 const char* findCRLF(const char* start) const { assert(peek()<=start); assert(start<=beginWrite()); const char* crlf=std::search(start,beginWrite(),kCRLF,kCRLF+2); return crlf==begin()?NULL:crlf; } //找到换行符‘\n‘的地址 const char* findEOL() const { //从peek()所指区域的前readbleBytes()个字符中找到‘\n‘; const void* eol=memchr(peek(),‘\n‘,readableBytes()); return static_cast<const char*>(eol); } //从start位置开始,找到换行符‘\n‘的地址 const char* findEOL(const char* start) const { assert(peek()<=start); assert(start<=beginWrite()); const void* eol=memchr(start,‘\n‘,beginWrite()-start); return static_cast<const char*>(eol); } //从peek()可读位置开始,让m_readerIndex移动len个位置 void retrieve(size_t len) { assert(len<=readableBytes()); if(len<readableBytes()) m_readerIndex+=len; else retrieveAll(); } //从peek()可读位置开始,让m_readerIndex移动知道end为止 void retrieveUntil(const char* end) { assert(peek()<=end); assert(end<=beginWrite()); retrieve(end-peek()); } //让m_readerIndex移动sizeof(int_*)个位置 void retrieveInt64() { retrieve(sizeof(int64_t)); } void retrieveInt32() { retrieve(sizeof(int32_t)); } void retrieveInt16() { retrieve(sizeof(int16_t)); } void retrieveInt8() { retrieve(sizeof(int8_t)); } //让读位置和写位置都回到kCheapPrepend void retrieveAll() { m_readerIndex=kCheapPrepend; m_writerIndex=kCheapPrepend; } //读取len个字节的数据并移动m_readerIndex string retrieveAsString(size_t len) { assert(len<=readableBytes()); string result(peek(),len); retrieve(len); return result; } //读取剩余所有数据并移动m_readerIndex string retrieveAllAsString() { return retrieveAsString(readableBytes()); } //本来是StringPiece toStringPiece() const,我放弃了StringPiece //读取剩余所有的数据并移动m_readerIndex string toString() const { return string(peek(),static_cast<int>(readableBytes())); } void append(const char* data,size_t len) { //扩容/挪动数据,保证剩余空间足够len字节 ensureWritableBytes(len); //添加数据 std::copy(data,data+len,beginWrite()); //更新m_writerIndex hasWritten(len); } // void append(const string& str) { append(str.data(),str.size()); } void append(const void* data,size_t len) { append(static_cast<const char*>(data),len); } //剩余空间不足len时,对m_buffer进行扩容或者挪动数据,保证能写len字节的数据, void ensureWritableBytes(size_t len) { //扩容/挪动数据 if(writableBytes()<len) makeSpace(len); assert(writableBytes()>=len); } //返回可写位置的地址 char* beginWrite() { return begin()+m_writerIndex; } const char* beginWrite() const { return begin()+m_writerIndex; } //更新m_writerIndex + len void hasWritten(size_t len) { assert(len<=writableBytes()); m_writerIndex+=len; } //m_writerIndex往前移动len个字节 void unwrite(size_t len) { assert(len<=readableBytes()); m_writerIndex-=len; } //向m_buffer中写网络字节序int_*,改变m_writerIndex位置 void appendInt64(int64_t x) { int64_t be64=sockets::hostToNetwork64(x); append(&be64,sizeof(be64)); } void appendInt32(int32_t x) { int32_t be32 = sockets::hostToNetwork32(x); append(&be32, sizeof be32); } void appendInt16(int16_t x) { int16_t be16 = sockets::hostToNetwork16(x); append(&be16, sizeof be16); } void appendInt8(int8_t x) { append(&x, sizeof x); } //从m_buffer中可读位置返回第一个in64_*的网络字节序,不会改变m_readerIndex位置 int64_t peekInt64() const { assert(readableBytes() >= sizeof(int64_t)); int64_t be64 = 0; ::memcpy(&be64, peek(), sizeof be64); return sockets::networkToHost64(be64); } int32_t peekInt32() const { assert(readableBytes() >= sizeof(int32_t)); int32_t be32 = 0; ::memcpy(&be32, peek(), sizeof be32); return sockets::networkToHost32(be32); } int16_t peekInt16() const { assert(readableBytes() >= sizeof(int16_t)); int16_t be16 = 0; ::memcpy(&be16, peek(), sizeof be16); return sockets::networkToHost16(be16); } int8_t peekInt8() const { assert(readableBytes() >= sizeof(int8_t)); int8_t x = *peek(); return x; } //从m_buffer中读取一个int64_t,返回主机字节序,会移动m_readerIndex int64_t readInt64() { //获取8个字节的数据,返回主机字节序 int64_t result=peekInt64(); retrieveInt64(); //移动m_readerIndex; return result; } int32_t readInt32() { //获取4个字节的数据,返回主机字节序 int32_t result=peekInt32(); retrieveInt32(); //移动m_readerIndex; return result; } int16_t readInt16() { int16_t result=peekInt16(); retrieveInt16(); return result; } int8_t readInt8() { int8_t result=peekInt8(); retrieveInt8(); return result; } //向预留空间中写入数据,从m_readerIndex向前也就是向左拷贝data void prepend(const void* data,size_t len) { assert(len<=prependableBytes()); m_readerIndex-=len; const char* d=static_cast<const char*>(data); std::copy(d,d+len,begin()+m_readerIndex); } void prependInt64(int64_t x) { //获取网络字节序 int64_t be64=sockets::hostToNetwork64(x); prepend(&be64,sizeof(be64)); //向预留空间中写入8个字节的数据 } //同上,只是字节数稍有变化 void prependInt32(int32_t x) { int32_t be32 = sockets::hostToNetwork32(x); prepend(&be32, sizeof be32); } void prependInt16(int16_t x) { int16_t be16 = sockets::hostToNetwork16(x); prepend(&be16, sizeof be16); } void prependInt8(int8_t x) { prepend(&x, sizeof x); } //把数据读到另一个m_buffer里,并进行交换 void shrink(size_t reserve) { buffer other; //保证可用空间足够,扩容/挪动数据 other.ensureWritableBytes(readableBytes()+reserve); other.append(toString()); swap(other); } //得到m_buffer的已分配空间大小 size_t internalCapacity() const { return m_buffer.capacity(); } ssize_t readFd(int fd,int* savedErrno); private: //初始位置的地址 char* begin() { return &*m_buffer.begin(); } //初始位置的地址 const char* begin() const { return &*m_buffer.begin(); } //向m_buffer中添加数据时,可能会引起m_buffer扩容/数据挪动操作 void makeSpace(size_t len) { //预留空间+剩余可写空间 还是不够 len+8,就扩容 if(writableBytes()+prependableBytes()<len+kCheapPrepend) m_buffer.resize(m_writerIndex+len); else { //只需要整理空间即可,把数据往前挪动预留空间个位置 assert(kCheapPrepend<m_readerIndex); size_t readable=readableBytes(); //把peek()到beginWrite()之间的数据移动到begin()+kCheapPrepend位置处 std::copy(begin()+m_readerIndex, begin()+m_writerIndex, begin()+kCheapPrepend); //更新可读可写下标 m_readerIndex=kCheapPrepend; m_writerIndex=m_readerIndex+readable; assert(readable==readableBytes()); } } private: std::vector<char> m_buffer; //保存缓冲区数据 size_t m_readerIndex; //读下标 size_t m_writerIndex; //写下标 static const char kCRLF[]; }; }//namespace net }//namespace mymuduo #endif // BUFFER_H
buffer.cpp
#include "buffer.h" #include"net/socketsops.h" #include<errno.h> #include<sys/uio.h> namespace mymuduo { namespace net{ const char buffer::kCRLF[]="\r\n"; const size_t buffer::kInitialSize; const size_t buffer::kCheapPrepend; //在m_buffer中获取 ssize_t buffer::readFd(int fd, int *savedErrno) { //栈空间,用于从套接字中读数据时,m_buffer不够用时暂存数据,等到m_buffer足够时拷贝到m_buffer char extrabuf[65536]; struct iovec vec[2]; const size_t writable=writableBytes(); vec[0].iov_base=begin()+m_writerIndex; vec[0].iov_len=writable; vec[1].iov_base=extrabuf; vec[1].iov_len=sizeof(extrabuf); //m_buffer不够用时用extrabuf const int iovcnt=(writable<sizeof(extrabuf))?2:1; const ssize_t n= sockets::readv(fd,vec,iovcnt); if(n<0) *savedErrno=errno; else if(implicit_cast<size_t>(n)<=writable) m_writerIndex+=n; else { m_writerIndex=m_buffer.size(); //把额外的extra部分添加到m_buffer中 append(extrabuf,n-writable); } return n; } }//namespace net }//namespace mymuduo
测试:
#include "net/buffer.h" #include<iostream> using namespace std; using mymuduo::string; using mymuduo::net::buffer; /* 简单测试一下buffer类对与字符串和int*类型的读写操作把 */ void test_string() { //初始时,m_readerIndex为8(kCheapPrepend),m_writerIndex为8 buffer buf; assert(buf.readableBytes()==0); assert(buf.writableBytes()==buffer::kInitialSize); assert(buf.prependableBytes()==buffer::kCheapPrepend); //写200字节进去,m_readerIndex为8,m_writerIndex为208 const string str(200,‘x‘); buf.append(str); assert(buf.readableBytes()==200); assert(buf.writableBytes()==1024-200); assert(buf.prependableBytes()==8); //读50字节出来,readIndex:8+50,writeIndex:208 string str2=buf.retrieveAsString(50); assert(str2.size()==50); assert(buf.readableBytes()==150); assert(buf.writableBytes()==1024-200); assert(buf.prependableBytes()==8+50); //添加1000字节进去,肯定超出空间大小,肯定会扩容m_buffer buf.append(string(1000,‘y‘)); std::cout<<buf.readableBytes()<<" "<<buf.writableBytes()<<" " <<buf.prependableBytes()<<std::endl; } void test_int() { //初始时,m_readerIndex为8(kCheapPrepend),m_writerIndex为8 buffer buf; assert(buf.readableBytes()==0); assert(buf.writableBytes()==buffer::kInitialSize); assert(buf.prependableBytes()==buffer::kCheapPrepend); //在m_buffer内部int*用网络字节序存储 buf.appendInt64(static_cast<int64_t>(123456)); buf.appendInt32(static_cast<int32_t>(12345)); buf.appendInt16(static_cast<int16_t>(1234)); buf.appendInt8(static_cast<int8_t>(123)); assert(buf.readableBytes()==8+4+2+1); //取出int*返回主机字节序 int64_t a=buf.readInt64(); int64_t b=buf.readInt32(); int64_t c=buf.readInt16(); int64_t d=buf.readInt8(); assert(buf.readableBytes()==0); cout<<a<<" "<<b<<" "<<c<<" "<<d<<endl; } int main() { test_int(); }
测试结果:
123456 12345 1234 123
例子很简单,但是buffer内部实现需要注意,int*类型再m_buffer内部存储方式是网络字节序(大端法),而读出的int*是主机字节序(小端法)
整个buffer围绕着内部成员m_buffer进行操作,重要的成员就是两个下标,读下标和写下标,只要搞明白了这两个下标,基本上就是网m_buffer里面拷贝/删除数据的问题了。
原文:https://www.cnblogs.com/woodineast/p/13582483.html