19年时,写了一篇线程池的博客,那篇文章介绍得比较详细(是什么?为什么?怎么做?),但也比较啰嗦,看着比较费劲,其实我现在看着也烦,那时候左大括号还换行来着,后面被go语言教育了。。。。。。。。哈哈哈哈哈哈哈哈哈。。。。
现在用尽量简洁清晰的方式再写一遍,同时加上任务取消功能,并做一些优化
constexpr bool ThreadExit = true;
class ThreadTask final{
public:
enum ThreadTaskType{
Exec,
Exit
};
ThreadTask() : TaskID(-1), TaskType(ThreadTaskType::Exit), CB(nullptr){
}
ThreadTask(int iTaskID, const std::function<void()>& f) : TaskID(iTaskID),
TaskType(ThreadTaskType::Exec),CB(f){
}
ThreadTask(int iTaskID, std::function<void()>&& f) : TaskID(iTaskID), TaskType(ThreadTaskType::Exec),
CB(std::move(f)){
}
ThreadTask(ThreadTask&& rhs): TaskID(rhs.TaskID), TaskType(rhs.TaskType), CB(std::move(rhs.CB)){
}
ThreadTask& operator=(ThreadTask& rhs){ //为了移动赋值CB、未加Const
if(this != &rhs){
TaskID = rhs.TaskID;
TaskType = rhs.TaskType;
CB = std::move(rhs.CB);
}
return *this;
}
bool operator()(){
bool bExit = false;
switch(TaskType){
case ThreadTaskType::Exec:
CB();
break;
case ThreadTaskType::Exit:
default:
bExit = ThreadExit;
break;
}
return bExit;
}
bool operator<(int iTaskID){
return TaskID < iTaskID;
}
private:
int TaskID;
ThreadTaskType TaskType;
std::function<void()> CB;
};
function(function&& _Right)
{ // construct holding moved copy of _Right
this->_Reset_move(_STD move(_Right));
}
function(const function& _Right)
{ // construct holding copy of _Right
this->_Reset_copy(_Right);
}
template<typename _Tp>
constexpr typename std::remove_reference<_Tp>::type&&
move(_Tp&& __t) noexcept
{ return static_cast<typename std::remove_reference<_Tp>::type&&>(__t); }
CB(std::forward<std::function<void()>>(f))
两个新需求
使用前面升级的同步队列来达成需求1,配合线程池新代码,实现需求2
互斥、同步操作由同步队列控制,线程池仅需从里面取出任务执行。
与之前的线程池相比:
//线程的各种个数最好由配置文件决定,配置文件根据具体场景配置
class ThreadPool{
public:
//线程池可能需要多个不同实例(如线程池隔离的情况,不同的线程池做不同类型的事情),所以为非单例
ThreadPool(int iInitThreadCount, int iHoldThreadCount, int iMaxThreadCount, int iMaxTaskCount) :
m_iInitThreadCount(iInitThreadCount), m_aiHoldThreadCount(iHoldThreadCount),
m_aiMaxThreadCount(iMaxThreadCount), m_aiCurThreadCount(0), m_abStopFlag(true),
m_aiTaskID(0){
}
ThreadPool(int iMaxTaskCount) : ThreadPool(4, 4, 4, iMaxTaskCount){
}
ThreadPool() : ThreadPool(4, 4, 4, 100){
}
~ThreadPool(){
Stop();
}
int AddTask(const std::function<void()>& Task){
return AppendTask(Task);
}
int AddTask(std::function<void()>&& Task){
return AppendTask(std::forward<std::function<void()>>(Task));
}
int TryAddTask(const std::function<void()>& Task){
return TryAppendTask(Task);
}
int TryAddTask(std::function<void()>&& Task){
return TryAppendTask(std::forward<std::function<void()>>(Task));
}
bool CancelTask(int iTaskID){
return m_TaskQueue.Delete(std::mem_fn(&ThreadTask::operator<), iTaskID);
}
void Start(){
m_abStopFlag = false;
for(int i = 0; i < m_iInitThreadCount; ++i){
AddThread();
}
}
void Stop(){
do{
SetStop();
} while(0 != GetCurThreadCount());
}
int GetCurTaskCount(){
return m_TaskQueue.Size();
}
int GetCurThreadCount(){
return m_aiCurThreadCount;
}
void SetHoldThreadCount(int iCount){
m_aiHoldThreadCount = iCount;
}
void SetMaxThreadCount(int iCount){
m_aiMaxThreadCount = iCount;
}
private:
template<typename U>
int AppendTask(U&& Task){
if(!Task){
return -1;
}
int iTaskID = m_aiTaskID.fetch_add(1, std::memory_order_relaxed);
m_TaskQueue.Enqueue(iTaskID, std::forward<U>(Task));
AddThreadSuitably();
return iTaskID;
}
template<typename U>
int TryAppendTask(U&& Task){
if(!Task){
return -1;
}
int iTaskID = m_aiTaskID.fetch_add(1, std::memory_order_relaxed);
if(!m_TaskQueue.TryEnqueue(iTaskID, std::forward<U>(Task))){
return -1;
}
AddThreadSuitably();
return iTaskID;
}
void AddThread(){
std::thread th(&ThreadPool::Run, this);
th.detach();
m_aiCurThreadCount++;
}
//带条件添加线程:当前线程数未达最大,并且任务数多于线程数时,添加线程,以便尽快处理任务
void AddThreadSuitably(){
if(!m_abStopFlag && m_aiCurThreadCount < m_aiMaxThreadCount && m_aiCurThreadCount <= GetCurTaskCount()){
AddThread();
}
}
//退出一个线程
void ReduceThread(){
m_TaskQueue.Enqueue(ThreadTask());
}
//线程池主循环
void Run(){
bool bExit = false;
do{
ThreadTask Task;
m_TaskQueue.Dequeue(Task);
bExit = Task();
} while(!m_abStopFlag && !bExit);
m_aiCurThreadCount--;
}
//监视空闲线程,当前线程数多于保持线程数,并空闲一定时间时,将当前线程数减少到保持线程数
//后续发布定时器时再添加处理逻辑
void MontorIdle(){
if(m_aiHoldThreadCount < m_aiCurThreadCount){
//利用定时器控制线程池里空闲线程的个数
ReduceThread();
}
}
void SetStop(){
if(!m_abStopFlag){
m_abStopFlag = true;
m_TaskQueue.Stop();
}
}
ThreadPool(const ThreadPool& rhs) = delete;
ThreadPool(ThreadPool&& rhs) = delete;
ThreadPool& operator=(const ThreadPool& rhs) = delete;
ThreadPool& operator=(ThreadPool&& rhs) = delete;
private:
int m_iInitThreadCount; //初始线程个数
std::atomic<int> m_aiHoldThreadCount; //保持线程个数
std::atomic<int> m_aiMaxThreadCount; //最大线程个数
std::atomic<int> m_aiCurThreadCount; //当前线程个数
std::atomic<bool> m_abStopFlag;
std::atomic<int> m_aiTaskID;
SyncQueue<ThreadTask> m_TaskQueue;
};
原文:https://www.cnblogs.com/Keeping-Fit/p/15154744.html