首页 > 编程语言 > 详细

C++实现线程池(二)

时间:2021-08-18 10:37:12      阅读:34      评论:0      收藏:0      [点我收藏+]

toc

背景

19年时,写了一篇线程池的博客,那篇文章介绍得比较详细(是什么?为什么?怎么做?),但也比较啰嗦,看着比较费劲,其实我现在看着也烦,那时候左大括号还换行来着,后面被go语言教育了。。。。。。。。哈哈哈哈哈哈哈哈哈。。。。
现在用尽量简洁清晰的方式再写一遍,同时加上任务取消功能,并做一些优化

实现

再次简要介绍

  • 线程池内部存在一个线程安全队列
    • 线程池内的数个消费线程,不断取出其中的任务去执行,队列为空时阻塞,直到有新任务进来(阻塞线程得不到调度,不耗CPU)
    • 外部生产线程往其中放入数据,达到异步执行的目的
      • 若队列存在上限,生产线程可发生阻塞,或立即得到失败通知

需注意点

  • 生命周期问题,保证线程池执行时,访问到的对象仍处于生命周期内,涉及对象包括
    • 传入的任务访问到的对象——生产线程保证其生命周期
    • 线程池本身的成员对象——线程池对象析构前所有线程必须停止

任务取消

  • 为每个可执行对象分配对应的ID,添加操作将返回ID,通过ID将还未执行的任务删除

代码

任务对象

  • 任务对象内部封装了任务ID、任务类型与行为,为了满足任务取消及退出单个线程的需求,需要使用此任务对象
  • 如果没有此需求,直接使用std::function<>即可,std::function<>本身就是可调用对象
  • 相对于上一任务对象,去掉了考虑不周的成员
    • 去掉了带参数版本:A线程申请,B线程释放,内存释放不清晰;跨chip,对numa架构不友好
    • 去掉了命令模式版本:成员为指针,也存在怎么释放命令成员的问题(上篇文章存在内存泄露taskcommand)
    • 二者存在的目的是处理参数问题,然而,std::function<>、lambda、std::bind相互配合也能带参,二者存在意义不大
constexpr bool ThreadExit = true;

class ThreadTask final{
public:
    enum ThreadTaskType{
        Exec,
        Exit
    };

    ThreadTask() : TaskID(-1), TaskType(ThreadTaskType::Exit), CB(nullptr){
    }
    ThreadTask(int iTaskID, const std::function<void()>& f) : TaskID(iTaskID), 
        TaskType(ThreadTaskType::Exec),CB(f){
    }
    ThreadTask(int iTaskID, std::function<void()>&& f) : TaskID(iTaskID), TaskType(ThreadTaskType::Exec),
        CB(std::move(f)){
    }
    ThreadTask(ThreadTask&& rhs): TaskID(rhs.TaskID), TaskType(rhs.TaskType), CB(std::move(rhs.CB)){
    }
    ThreadTask& operator=(ThreadTask& rhs){    //为了移动赋值CB、未加Const
        if(this != &rhs){
            TaskID = rhs.TaskID;
            TaskType = rhs.TaskType;
            CB = std::move(rhs.CB);
        }
        return *this;
    }
    bool operator()(){
        bool bExit = false;
        switch(TaskType){
        case ThreadTaskType::Exec:
            CB();
            break;
        case ThreadTaskType::Exit:
        default:
            bExit = ThreadExit;
            break;
        }
        return bExit;
    }
    bool operator<(int iTaskID){
        return TaskID < iTaskID;
    }


private:
    int TaskID;
    ThreadTaskType TaskType;
    std::function<void()> CB; 
};
  • 默认构造函数会构造一个携带退出消息的任务对象,执行该任务的线程会退出
  • 增加了两个参数构造函数,一个移动std::function<>,一个拷贝std::function<>
function(function&& _Right)
    {    // construct holding moved copy of _Right
    this->_Reset_move(_STD move(_Right));
    }
function(const function& _Right)
    {    // construct holding copy of _Right
    this->_Reset_copy(_Right);
    }
  • 增加的移动构造函数,使用到了std::move,std::move只是单纯做右值类型转换,由具体的移动构造函数实现具体的移动语义
  template<typename _Tp>
    constexpr typename std::remove_reference<_Tp>::type&&
    move(_Tp&& __t) noexcept
    { return static_cast<typename std::remove_reference<_Tp>::type&&>(__t); }
  • 原来的执行函数变为了重载函数调用符,看起来更像一个函数
  • 该移动构造可以用std::forward等价: 缺点是没有std::move简洁
CB(std::forward<std::function<void()>>(f))
  • 重载了赋值操作符,并以移动语义替换拷贝语义,来得到更好的性能
  • 为了能查找任务,重载了小于比较操作符

任务存储

两个新需求

  1. 为线程池增加任务上限,达到上限阻塞或拒掉新任务
  2. 添加任务成功时,返回任务ID,需要时,根据任务ID取消还未执行的任务

使用前面升级的同步队列来达成需求1,配合线程池新代码,实现需求2
互斥、同步操作由同步队列控制,线程池仅需从里面取出任务执行。

线程池

与之前的线程池相比:

  • 仅保留了支持std::function的添加方法
  • 使用了同步队列替代原队列
  • 改为headonly方式实现
  • 增加了任务取消功能
//线程的各种个数最好由配置文件决定,配置文件根据具体场景配置
class ThreadPool{
public:
    //线程池可能需要多个不同实例(如线程池隔离的情况,不同的线程池做不同类型的事情),所以为非单例
    ThreadPool(int iInitThreadCount, int iHoldThreadCount, int iMaxThreadCount, int iMaxTaskCount) :
        m_iInitThreadCount(iInitThreadCount), m_aiHoldThreadCount(iHoldThreadCount),
        m_aiMaxThreadCount(iMaxThreadCount), m_aiCurThreadCount(0), m_abStopFlag(true),
        m_aiTaskID(0){
    }
    ThreadPool(int iMaxTaskCount) : ThreadPool(4, 4, 4, iMaxTaskCount){
    }
    ThreadPool() : ThreadPool(4, 4, 4, 100){
    }
    ~ThreadPool(){
        Stop();
    }

    int AddTask(const std::function<void()>& Task){
        return AppendTask(Task);
    }

    int AddTask(std::function<void()>&& Task){
        return AppendTask(std::forward<std::function<void()>>(Task));
    }

    int TryAddTask(const std::function<void()>& Task){
        return TryAppendTask(Task);
    }

    int TryAddTask(std::function<void()>&& Task){
        return TryAppendTask(std::forward<std::function<void()>>(Task));
    }

    bool CancelTask(int iTaskID){
        return m_TaskQueue.Delete(std::mem_fn(&ThreadTask::operator<), iTaskID);
    }

    void Start(){
        m_abStopFlag = false;
        for(int i = 0; i < m_iInitThreadCount; ++i){
            AddThread();
        }
    }

    void Stop(){
        do{
            SetStop();
        } while(0 != GetCurThreadCount());
    }

    int GetCurTaskCount(){
        return m_TaskQueue.Size();
    }

    int GetCurThreadCount(){
        return m_aiCurThreadCount;
    }

    void SetHoldThreadCount(int iCount){
        m_aiHoldThreadCount = iCount;
    }

    void SetMaxThreadCount(int iCount){
        m_aiMaxThreadCount = iCount;
    }

private:
    template<typename U>
    int AppendTask(U&& Task){
        if(!Task){
            return -1;
        }
        int iTaskID = m_aiTaskID.fetch_add(1, std::memory_order_relaxed);
        m_TaskQueue.Enqueue(iTaskID, std::forward<U>(Task));
        AddThreadSuitably();
        return iTaskID;
    }

    template<typename U>
    int TryAppendTask(U&& Task){
        if(!Task){
            return -1;
        }
        int iTaskID = m_aiTaskID.fetch_add(1, std::memory_order_relaxed);
        if(!m_TaskQueue.TryEnqueue(iTaskID, std::forward<U>(Task))){
            return -1;
        }
        AddThreadSuitably();
        return iTaskID;
    }

    void AddThread(){
        std::thread th(&ThreadPool::Run, this);
        th.detach();
        m_aiCurThreadCount++;
    }
    //带条件添加线程:当前线程数未达最大,并且任务数多于线程数时,添加线程,以便尽快处理任务
    void AddThreadSuitably(){
        if(!m_abStopFlag && m_aiCurThreadCount < m_aiMaxThreadCount && m_aiCurThreadCount <= GetCurTaskCount()){
            AddThread();
        }
    }
    //退出一个线程
    void ReduceThread(){
        m_TaskQueue.Enqueue(ThreadTask());
    }
    //线程池主循环
    void Run(){
        bool bExit = false;
        do{
            ThreadTask Task;
            m_TaskQueue.Dequeue(Task);
            bExit = Task();
        } while(!m_abStopFlag && !bExit);
        m_aiCurThreadCount--;
    }
    //监视空闲线程,当前线程数多于保持线程数,并空闲一定时间时,将当前线程数减少到保持线程数
    //后续发布定时器时再添加处理逻辑
    void MontorIdle(){
        if(m_aiHoldThreadCount < m_aiCurThreadCount){
            //利用定时器控制线程池里空闲线程的个数
            ReduceThread();
        }
    }

    void SetStop(){
        if(!m_abStopFlag){
            m_abStopFlag = true;
            m_TaskQueue.Stop();
        }
    }

    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool(ThreadPool&& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(ThreadPool&& rhs) = delete;

private:
    int m_iInitThreadCount;                    //初始线程个数
    std::atomic<int> m_aiHoldThreadCount;      //保持线程个数
    std::atomic<int> m_aiMaxThreadCount;       //最大线程个数
    std::atomic<int> m_aiCurThreadCount;       //当前线程个数

    std::atomic<bool> m_abStopFlag;

    std::atomic<int> m_aiTaskID;
    SyncQueue<ThreadTask> m_TaskQueue;
};
  • 类中有三个构造函数,其初始化逻辑都是一样的,仅仅是参数列表不同,使用委托构造函数的方式,来避免重复代码
  • AddTask添加方式,在队列满时,添加操作会被阻塞,TryAddTask方式满时任务会被拒掉,并的到添加失败信息,两种添加方式均支持任务拷贝或移动,任务添加成功后返回ID,返回的ID是取消任务的依据
  • CancelTask为取消任务接口,其依赖同步队列删除方法实现,传入成员函数包装器与任务ID,此处包装的是小于操作符,因为需要根据任务ID成员与目标ID大小,来确定具体任务,确定后执行删除操作
  • Start后线程池才开始创建取消费任务的线程,个数为初始个数
  • Stop必须等到线程池内没有正在运行的线程才返回,为先停线程后析构做保证(升级后的线程池,消费线程退出很快,并且std::this_thread::sleep_for将引发调度,性价比不高,并且睡眠时长会收系统时间影响,存在风险,所以直接选择循环)
  • AppendTask与TryAppendTask为添加任务核心逻辑




C++实现线程池(二)

原文:https://www.cnblogs.com/Keeping-Fit/p/15154744.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!