封装了eventloop线程也就是IO线程
有一个好处,就是使用这个线程可以保证one eventloop one thread
eventloopthread会启动自己的线程,并在里面运行eventloop::loop()
流程:
主线程内创建eventloopthread对象
主线程调用eventloopthread::startLoop()获取创建好的eventloop对象指针
startLoop()内会新建一个线程执行eventloopthread::threadFunc()函数,函数内会创建好一个
eventloop对象,并让m_loop指向这个对象,用条件变量通知startLoop()创建完成,返回eventloop对象指针
threadFunc()内部会执行eventloop::loop()
#ifndef eventloopTHREAD_H #define eventloopTHREAD_H #include"base/condition.h" #include"base/mutex.h" #include"base/thread.h" namespace mymuduo { namespace net { class eventloop; class eventloopthread { public: typedef std::function<void(eventloop*)> ThreadInitCallback; //构造函数,负责成员变量的初始化注意m_callback设置为cb //m_thread会被用eventloopthread::threadFunc()初始化,执行threadFunc() eventloopthread(const ThreadInitCallback& cb = ThreadInitCallback(), const string& name = string()); //析构函数,负责停止eventloop,回收m_thread ~eventloopthread(); //返回一个创建好的eventloop*,负责启动线程调用threadFunc() eventloop* startLoop(); private: //eventloopthread线程函数,在sratLoop中被调用, //负责创建一个eventloop对象,设置m_loop,执行用户传入的回调ThreadInitCallback等 void threadFunc(); eventloop* m_loop; //线程内部的eventloop* bool m_exiting; //线程是否退出 thread m_thread; //线程 mutexlock m_mutex; //互斥锁 condition m_cond; //条件变量 ThreadInitCallback m_callback; //线程初始化回调函数 }; }//namespace net }//namespace mymuduo #endif // eventloopTHREAD_H
#include "eventloopthread.h" #include"net/eventloop.h" namespace mymuduo { namespace net { //构造函数,负责初始化数据成员,注意m_thread线程函数是threadFunc eventloopthread::eventloopthread(const ThreadInitCallback& cb,const string& name) :m_loop(NULL),m_exiting(false), m_thread(std::bind(&eventloopthread::threadFunc,this),name), m_mutex(),m_cond(m_mutex),m_callback(cb) { } //析构函数,eventloop::quit,thread::join() eventloopthread::~eventloopthread() { m_exiting=true; if(m_loop!=NULL) { m_loop->quit(); m_thread.join(); } } //获取内部eventloop对象的地址 eventloop* eventloopthread::startLoop() { assert(!m_thread.started()); m_thread.start(); //创建线程调用threadFunc()函数 eventloop* loop=NULL; { //条件变量保证eventloop创建完成,m_loop被赋值 mutexlockguard mlg(m_mutex); while(m_loop==NULL) m_cond.wait(); loop=m_loop; } return loop; } //线程函数体,负责创建eventloop对象,以及调用eventloop::loop() void eventloopthread::threadFunc() { eventloop loop; //可调用回调函数 if(m_callback) m_callback(&loop); { //通知startLoop()eventloop创建完成 mutexlockguard mlg(m_mutex); m_loop=&loop; m_cond.notify(); } m_loop->loop(); //循环体结束,eventloop析构,把m_loop设为空 mutexlockguard mlg(m_mutex); m_loop=NULL; } }//namespace net }//namespace mymuduo
#include "net/eventloop.h" #include "net/eventloopthread.h" #include "base/thread.h" #include <stdio.h> #include <unistd.h> using namespace mymuduo; using namespace mymuduo::net; int cnt = 0; eventloop* g_loop; void print(const char* msg) { printf("msg %s %s\n", timestamp::now().toString().c_str(), msg); if (++cnt == 20) { g_loop->quit(); } } void init(eventloop*) { printf("eventloopthread init done...\n"); } void sum500() { int count=0; for(int i=1;i<=500;i++) count+=i; printf("%d\n",count); } int main() { { eventloopthread loopthread(init,"loopthread1"); g_loop=loopthread.startLoop(); //启动loopthread1,返回创建的eventloop* g_loop->runAfter(4,std::bind(print,"loop run after 4s done...")); currentthread::sleepUsec(5000*1000); //sleep 5s } { eventloopthread loopthread(init,"loopthread2"); g_loop=loopthread.startLoop(); //启动loopthread2,返回创建的eventloop* g_loop->runInLoop(sum500); currentthread::sleepUsec(500*1000); //sleep 0.5s } }
打印结果:
eventloopthread init done...
msg 1598977533.789526 loop run after 4s done...
eventloopthread init done...
125250
为便于线程池使用,同样封装一个eventloopthreadpool类,即负责IO事件处理的线程池
整个线程池本身拥有一个eventloop* m_baseLoop;
线程池内部每个线程都是eventloopthread,因此每个eventloopthread都有一个eventloop*
用m_loops存储每个线程的eventloop*,用m_threads存储每个eventloopthread对象指针
#ifndef EVENTLOOPTHREADPOOL_H #define EVENTLOOPTHREADPOOL_H #include"base/noncopyable.h" #include"base/types.h" #include<functional> #include<memory> #include<vector> namespace mymuduo { namespace net { class eventloop; class eventloopthread; class eventloopthreadpool { public: //回调函数类型,当线程池线程数量>0时,在eventloopthread中调用该回调函数 // 当线程池数量<=0时,直接(在线程池所在线程)调用该回调函数 typedef std::function<void(eventloop*)> ThreadInitCallback; //构造函数,初始化成员变量 eventloopthreadpool(eventloop* baseloop,const string& name); ~eventloopthreadpool(); //啥也不做的析构函数 //设置线程池内线程数量 void setThreadNum(int numThreads){m_numThreads=numThreads;} //启动线程池,实际上创建numThreads个线程,并让每个eventloopthread调用startLoop() void start(const ThreadInitCallback& cb=ThreadInitCallback()); //下面三个函数是与m_loops相关的操作,获得指定线程的eventloop* eventloop* getNextLoop(); eventloop* getLoopForHash(size_t hashCode); std::vector<eventloop*> getAllLoops(); bool started() const{return m_started;} const string& name() const{return m_name;} private: eventloop* m_baseLoop; //属于eventloopthreadpool的eventloop对象指针 string m_name; //线程池名字 bool m_started; //线程池是否启动 int m_numThreads; //线程池内线程数量 int m_next; // std::vector<std::unique_ptr<eventloopthread>> m_threads; //eventloopthread集合 std::vector<eventloop*> m_loops; //各个eventloopthread拥有的eventloop指针 }; }//namespace net }//namespace mymuduo #endif // EVENTLOOPTHREADPOOL_H
#include "eventloopthreadpool.h" #include"net/eventloopthread.h" #include"net/eventloop.h" #include<stdio.h> namespace mymuduo { namespace net { //构造函数 eventloopthreadpool::eventloopthreadpool(eventloop* baseloop,const string& name) :m_baseLoop(baseloop),m_name(name),m_started(false),m_numThreads(0),m_next(0) { } //析构函数 eventloopthreadpool::~eventloopthreadpool() { //不必删除任何eventloop,因为eventloop都是在栈内创建的 } //创建numThreads个线程,并且填充m_threads集合和m_loops集合 void eventloopthreadpool::start(const ThreadInitCallback &cb) { assert(!m_started); m_baseLoop->assertInLoopThread(); m_started=true; //在线程池内创建numThreads个线程 for(int i=0;i<m_numThreads;i++) { char buf[m_name.size() + 32]; //线程池内各个线程的名字 snprintf(buf, sizeof buf, "%s%d", m_name.c_str(), i); eventloopthread* t = new eventloopthread(cb, buf); //创建eventloopthread m_threads.push_back(std::unique_ptr<eventloopthread>(t));//线程池内线程集合 m_loops.push_back(t->startLoop()); //各个线程相对应的eventloop*集合 } if(m_numThreads==0 && cb) //没有任何线程的话,直接调用callback cb(m_baseLoop); } //获取m_loops[m_next]的那个eventloop* eventloop* eventloopthreadpool::getNextLoop() { m_baseLoop->assertInLoopThread(); assert(m_started); eventloop* loop=m_baseLoop; if(!m_loops.empty()) { loop=m_loops[m_next]; m_next++; if(implicit_cast<size_t>(m_next)>=m_loops.size()) m_next=0; } return loop; } //获取第hashCode个eventloop* eventloop* eventloopthreadpool::getLoopForHash(size_t hashCode) { m_baseLoop->assertInLoopThread(); eventloop* loop=m_baseLoop; if(!m_loops.empty()) loop=m_loops[hashCode%m_loops.size()]; return loop; } //获取所有的eventloop* std::vector<eventloop*> eventloopthreadpool::getAllLoops() { m_baseLoop->assertInLoopThread(); assert(m_started); if(m_loops.empty()) return std::vector<eventloop*>(1,m_baseLoop); else return m_loops; } }//namespace net }//namespace mymuduo
#include "net/eventloopthreadpool.h" #include "net/eventloop.h" #include "base/thread.h" #include <stdio.h> #include <unistd.h> using namespace mymuduo; using namespace mymuduo::net; void print(eventloop* p = NULL) { printf("main(): pid = %d, tid = %d, loop = %p\n", getpid(), currentthread::tid(), p); } void init(eventloop* p) { printf("init(): pid = %d, tid = %d, loop = %p\n", getpid(), currentthread::tid(), p); } int main() { print(); eventloop loop; loop.runAfter(11, std::bind(&eventloop::quit, &loop)); { printf("Single thread %p:\n", &loop); eventloopthreadpool model(&loop, "single"); //注意一个现场都没有的话直接在当前线程调用线程池初始化回调也就是init model.setThreadNum(0); model.start(init); assert(model.getNextLoop() == &loop); assert(model.getNextLoop() == &loop); assert(model.getNextLoop() == &loop); } { printf("Another thread:\n"); eventloopthreadpool model(&loop, "another"); model.setThreadNum(1); model.start(init); eventloop* nextLoop = model.getNextLoop(); nextLoop->runAfter(2, std::bind(print, nextLoop)); assert(nextLoop != &loop); assert(nextLoop == model.getNextLoop()); assert(nextLoop == model.getNextLoop()); ::sleep(3); } { printf("Three threads:\n"); eventloopthreadpool model(&loop, "three"); model.setThreadNum(3); model.start(init); eventloop* nextLoop = model.getNextLoop(); nextLoop->runInLoop(std::bind(print, nextLoop)); assert(nextLoop != &loop); assert(nextLoop != model.getNextLoop()); assert(nextLoop != model.getNextLoop()); assert(nextLoop == model.getNextLoop()); } loop.loop(); }
打印结果:
main(): pid = 45231, tid = 45231, loop = (nil)
Single thread 0x7ffd548af220:
init(): pid = 45231, tid = 45231, loop = 0x7ffd548af220
Another thread:
init(): pid = 45231, tid = 45242, loop = 0x7fa1ab015a20
main(): pid = 45231, tid = 45242, loop = 0x7fa1ab015a20
Three threads:
init(): pid = 45231, tid = 45261, loop = 0x7fa1ab015a20
init(): pid = 45231, tid = 45262, loop = 0x7fa1aa814a20
init(): pid = 45231, tid = 45263, loop = 0x7fa1aa013a20
main(): pid = 45231, tid = 45261, loop = 0x7fa1ab015a20
muduo源码解析27-网络库5:eventloopthread类和eventloopthreadpool类
原文:https://www.cnblogs.com/woodineast/p/13599602.html