/**************************************************************************************************
* File Name : ring_queue_nolock.h
* Created : 20 / 03 / 03
* Author : GYT
* Description : 线程池
* 1.需要预先定义任务的函数指针
* 2.构造函数接受容量参数,申请线程池空间,并将线程全部启动
* 3.初始状态任务队列tasks_为空,当队列为空,则阻塞线程——thread_loop()
* 4.当任务队列中有元素后,唤醒其中一个线程,执行这个任务——take()->task()
**************************************************************************************************/
#ifndef _thread_pool_HPP
#define _thread_pool_HPP
#include <vector>
#include <deque>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
namespace gyt {
class Thread_Pool{
public:
typedef std::function<void()> task_t;
/**************************************************************************************************
* Function Name : Thread_Pool
* Description : 构造函数
* Date : 20 / 03 / 03
* Parameter : int init_size:线程池大小
* Return Code : none
* Author : GYT
**************************************************************************************************/
Thread_Pool(int init_size);
/**************************************************************************************************
* Function Name : ~Thread_Pool
* Description : 析构函数
* Date : 20 / 03 / 0
* Parameter : none
* Return Code : none
* Author : GYT
**************************************************************************************************/
~Thread_Pool();
/**************************************************************************************************
* Function Name : stop
* Description : 停止函数
* Date : 20 / 03 / 03
* Parameter : none
* Return Code : none
* Author : GYT
**************************************************************************************************/
void stop();
/**************************************************************************************************
* Function Name : add_task
* Description : 添加任务函数
* Date : 20 / 03 / 03
* Parameter : const task_t& 需要添加的任务函数指针
* Return Code : none
* Author : GYT
**************************************************************************************************/
void add_task(const task_t&/*, type para*/);
private:
Thread_Pool(const Thread_Pool&);
const Thread_Pool& operator=(const Thread_Pool&);
/**************************************************************************************************
* Function Name : is_started
* Description : 判断是否启动
* Date : 20 / 03 / 03
* Parameter : none
* Return Code : bool
* Author : GYT
**************************************************************************************************/
bool is_started() { return is_started_; }
/**************************************************************************************************
* Function Name : start
* Description : 启动线程池
* Date : 20 / 03 / 03
* Parameter : none
* Return Code : none
* Author : GYT
**************************************************************************************************/
void start();
/**************************************************************************************************
* Function Name : thread_loop
* Description : 线程循环遍历任务队列,如果不为空,则取任务
* Date : 20 / 03 / 03
* Parameter : none
* Return Code : bool
* Author : GYT
**************************************************************************************************/
void thread_loop();
/**************************************************************************************************
* Function Name : take
* Description : 从任务队列中获取任务,返回函数指针
* Date : 20 / 03 / 03
* Parameter : none
* Return Code : task_t
* Author : GYT
**************************************************************************************************/
task_t take();
typedef std::vector<std::thread*> THREADS_T; //线程容器,大小由构造函数决定
typedef std::deque<task_t> TASKS_T; //任务队列,先入先出
int init_threads_size_; //初始化线程池数量
THREADS_T threads_; //线程容器本体
TASKS_T tasks_; //任务队列
std::mutex mutex_; //同步控制-锁
std::condition_variable cond_; //同步控制-条件变量
bool is_started_; //线程池是否正在运行
};
}
#endif
#include <assert.h>
#include <iostream>
#include <sstream>
#include "thread_pool.h"
namespace gyt {
Thread_Pool::Thread_Pool(int init_size)
:init_threads_size_(init_size),
mutex_(),
cond_(),
is_started_(false)
{
start();
}
Thread_Pool::~Thread_Pool()
{
if (is_started_)
{
stop();
}
}
void Thread_Pool::start()
{
assert(threads_.empty());
is_started_ = true;
threads_.reserve(init_threads_size_);
for (int i = 0; i < init_threads_size_; i++)
{
threads_.push_back(new std::thread(std::bind(&Thread_Pool::thread_loop, this)));
}
}
void Thread_Pool::stop()
{
std::unique_lock<std::mutex> lock(mutex_);
is_started_ = false;
cond_.notify_all();
for (THREADS_T::iterator it = threads_.begin(); it != threads_.end(); ++it)
{
(*it)->join();
delete *it;
}
threads_.clear();
}
void Thread_Pool::thread_loop()
{
while (is_started_)
{
task_t task = take();
if (task)
{
//std::unique_lock<std::mutex> lock(mutex_);
task();
}
}
}
void Thread_Pool::add_task(const task_t& task/*, type para*/)
{
std::unique_lock<std::mutex> lock(mutex_);
tasks_.push_back(task);
cond_.notify_one();
}
Thread_Pool::task_t Thread_Pool::take()
{
std::unique_lock<std::mutex> lock(mutex_);
while (tasks_.empty() && is_started_)
{
cond_.wait(lock);
}
task_t task;
TASKS_T::size_type size = tasks_.size();
if (!tasks_.empty() && is_started_)
{
task = tasks_.front();
tasks_.pop_front();
assert(size - 1 == tasks_.size());
}
return task;
}
}
测试代码:
#include <iostream>
#include <chrono>
#include <condition_variable>
#include "thread_pool.h"
std::mutex g_mutex;
void DoTest()
{
for (int i = 1; i < 4; ++i)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> lock(g_mutex);
std::cout << "testFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;
}
}
int main()
{
gyt::Thread_Pool thread_pool(7);
for (int i = 0; i < 7; i++){
thread_pool.add_task(DoTest);
}
system("pause");
return 0;
}
原文:https://www.cnblogs.com/fishily/p/12402247.html