简介
线程池(thread pool):一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的组成
1、线程池管理器
创建一定数量的线程,启动线程,调配任务,管理着线程池。
本篇线程池目前只需要启动(start()),停止方法(stop()),及任务添加方法(addTask).
start()创建一定数量的线程池,进行线程循环.
stop()停止所有线程循环,回收所有资源.
addTask()添加任务.
2、工作线程
线程池中线程,在线程池中等待并执行分配的任务.
本篇选用条件变量实现等待与通知机制.
3、任务接口,
添加任务的接口,以供工作线程调度任务的执行。
4、任务队列
用于存放没有处理的任务。提供一种缓冲机制
同时任务队列具有调度功能,高优先级的任务放在任务队列前面;本篇选用priority_queue 与pair的结合用作任务优先队列的结构.
代码实现:
ThreadPool.hpp:
#ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include <thread> #include <mutex> #include <atomic> #include <condition_variable> #include <functional> #include <vector> #include <queue> class ThreadPool { public: using Task = std::function<void()>; explicit ThreadPool(int num) : _thread_num(num), _is_running(false) {} ~ThreadPool() { if (_is_running) stop(); } void start() { _is_running = true; // start threads for (int i = 0; i < _thread_num; i++) _threads.emplace_back(std::thread(&ThreadPool::work, this)); } void stop() { { // stop thread pool, should notify all threads to wake std::unique_lock<std::mutex> lk(_mtx); _is_running = false; _cond.notify_all(); // must do this to avoid thread block } // terminate every thread job for (std::thread& t : _threads) { if (t.joinable()) t.join(); } } void appendTask(const Task& task) { if (_is_running) { std::unique_lock<std::mutex> lk(_mtx); _tasks.push(task); _cond.notify_one(); // wake a thread to to the task } } private: void work() { printf("begin work thread: %d\n", std::this_thread::get_id()); // every thread will compete to pick up task from the queue to do the task while (_is_running) { Task task; { std::unique_lock<std::mutex> lk(_mtx); if (!_tasks.empty()) { // if tasks not empty, // must finish the task whether thread pool is running or not task = _tasks.front(); _tasks.pop(); // remove the task } else if (_is_running && _tasks.empty()) _cond.wait(lk); } if (task) task(); // do the task } printf("end work thread: %d\n", std::this_thread::get_id()); } public: // disable copy and assign construct ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool& other) = delete; private: bool _is_running; // thread pool manager status std::mutex _mtx; std::condition_variable _cond; int _thread_num; std::vector<std::thread> _threads; std::queue<Task> _tasks; }; #endif // !_THREAD_POOL_H_
main.cpp
#include "stdafx.h" #include <iostream> #include <chrono> #include "ThreadPool.hpp" void fun1() { std::cout << "working in thread " << std::this_thread::get_id() << std::endl; } void fun2(int x) { std::cout << "task " << x << " working in thread " << std::this_thread::get_id() << std::endl; } int main(int argc, char* argv[]) { ThreadPool thread_pool(3); thread_pool.start(); std::this_thread::sleep_for(std::chrono::milliseconds(500)); for (int i = 0; i < 6; i++) { //thread_pool.appendTask(fun1); thread_pool.appendTask(std::bind(fun2, i)); //std::this_thread::sleep_for(std::chrono::milliseconds(500)); } thread_pool.stop(); getchar(); return 0; }
原文:https://www.cnblogs.com/zwj-199306231519/p/13648877.html