单线程libevent模式 项目里面是多线程版的,我先理解下单线程的。 //client 1.调用NGP::init() bool NGP::init(NGPcontext context) { _context = context; //_TcpLink = NEWSP(TcpLink); _TcpLink = NEWSP(TcpLinkEx); _TcpLink->Init(this); return true; } 2.初始化Libevent bool LibEvtServer::init(I_NetServerEvent* event, int start, int size) { m_ids = new ChannelIDGenerator(); m_ids->init(start, size); m_allChannels.resize(m_ids->getSize()); m_event = event; //使用windows模式的线程和锁 int hr = evthread_use_windows_threads();//当前是两个线程一个主线程一个派发线程 //一个线程只有一个event_base,对应的是一个struct event_base结构体和相应的事件管理器,这个事件管理器管理跟这个event_base绑定的事件 m_base = event_base_new();//创建event_base if (!m_base) { fprintf(stderr, "Could not initialize libevent!\n"); return false; } return true; } 3.监听,搞不懂客户端为什么要监听 bool LibEvtServer::listen(int* port) { struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; if(-1 == *port) sin.sin_port = htons(10000); else sin.sin_port = htons(*port); //如果libevent分配和绑定套接字就调用evconnlistener_new_bind,这个相当于原始的socket()和bind()和listen(),然后有连接会调用listener_cb //其中调用了evutil_make_socket_nonblocking,将socket设置成无阻塞的,bind绑定,evconnlistener_new创建监听器,然后返回监听器 //连接监听器使用event_base来得知什么时候在监听套接字上有TCP连接,当有连接时会调用回调函数 m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this,//evconnlistener_new_bind提供了监听和接受TCP的一个方法,这个是要libevent分配和绑定套接字 LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!m_listener) { return false; } if( -1 == *port) *port = ntohs(sin.sin_port); if (!m_listener) { fprintf(stderr, "Could not create a listener!\n"); return false; } m_spThread.reset(new std::thread([this] { //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST); //event_base_loop(m_base, EVLOOP_ONCE); //循环在epoll/kqueue系统调用上,当事件发生时自动调用绑定的事件,但这些事件需要绑定到这个event_base上面 event_base_dispatch(m_base);//启动event_base的循环,event_base_dispatch内部就是一个while循环,项目直接开了一个线程取做这个 if(WSAENOTSOCK == WSAGetLastError()) { Plug::PlugMessageBox(L"操作无效套接字啊!"); } Plug::PlugMessageBox(L"Libevent派发线程退出!"); })); return true; } 4.客户端连接 bool LibEvtServer::connet_to(int ip, int port) { //创建基于socket的bufferevent,并将其托管给event_base //buffevent从单词可以看出是在event上的buffer,即在read/write两个事件上有input和output缓冲区 //是文件描述符,决定哪个文件被写入写出,可以将socket看成文件描述符,但不允许设置成pipe(2),为安全起见可以设置成-1,然后 set it with bufferevent_setfd or bufferevent_socket_connect(). auto bev = bufferevent_socket_new(m_base, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); //创建一个bufferevent,关联sockfd,托管给event_base if (!bev){ std::cout << "bufferevent_socket_new error!" << std::endl; return false; } struct sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_addr.s_addr = ntohl(ip); sin.sin_port = htons(port); //连接 //If the bufferevent does not already have a socket set, we allocate a new socket here and make it nonblocking before we begin. int hr = bufferevent_socket_connect(bev, (struct sockaddr*)&sin, sizeof(sin)); if(0 != hr) return false; std::lock_guard<std::mutex> lock(m_offline_mtx);//跟申请释放channel id互锁 auto c2 = CreateChannel(bev); //设置bufferevent上的回调 bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, c2);//设置读写事件 //启动bufferevent bufferevent_enable(bev, EV_READ | EV_WRITE);//启动读写事件,将读写事件放入监听队列poll中 return true; } //-------------------------------------到此client方面就做好了libevent方面的准备------------------------- //server /* 成员函数 */ bool LibEvtServer::init(I_NetServerEvent* event, int start, int size) { m_ids = new ChannelIDGenerator(); m_ids->init(start, size); m_allChannels.resize(m_ids->getSize()); m_event = event; //event支持多线程的初始化函数 int hr = evthread_use_windows_threads(); m_base = event_base_new(); if (!m_base) { fprintf(stderr, "Could not initialize libevent!\n"); return false; } return true; } //侦听端口,-1表示向系统申请一个任意可用端口 bool LibEvtServer::listen(int* port) { struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; if(-1 == *port) sin.sin_port = htons(10000); else sin.sin_port = htons(*port); m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!m_listener) { return false; } if( -1 == *port) *port = ntohs(sin.sin_port); if (!m_listener) { fprintf(stderr, "Could not create a listener!\n"); return false; } m_spThread.reset(new std::thread([this] { //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST); //event_base_loop(m_base, EVLOOP_ONCE); event_base_dispatch(m_base); if(WSAENOTSOCK == WSAGetLastError()) { Plug::PlugMessageBox(L"操作无效套接字啊!"); } Plug::PlugMessageBox(L"Libevent派发线程退出!"); })); return true; } //然后在这个监听器里面的回调函数里面 /* 新连接事件处理 */ typedef void (*evconnlistener_cb)(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *);//这个是回调的原型 static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { auto ser = (LibEvtServer*)user_data; ser->listener_cb(listener, fd, sa, socklen, user_data); } void LibEvtServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { //获取listener相关的event_base auto base = evconnlistener_get_base(listener); //建立socket的bufferevent auto bev = bufferevent_socket_new(base, fd, BEV_OPT_THREADSAFE);//BEV_OPT_CLOSE_ON_FREE | if (!bev) { fprintf(stderr, "Error constructing bufferevent!"); event_base_loopbreak(base); return ; } auto c2 = CreateChannel(bev); //设置回调 bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2); //启动bufferevent bufferevent_enable(bev, EV_READ | EV_WRITE ); } //看来服务器过程和客户端差不多, //libevent过程,基于event_base的socket bufferevent,在event_base_dispath里面不知道用的是select还是iocp,然后发现socket状态发生变化可读,可写,或者 //错误都会调用相应的注册时间,建立的socket都是不阻塞的。还有个这个读写回调到底是什么时候调用的,我还不太明白,和那个读写缓冲区到底什么区别.
原文:http://www.cnblogs.com/zzyoucan/p/3955109.html