任务类CTask是一个抽象类,所有的任务都要从CTask类继承,并实现Run接口;Run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。
AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到达时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。
Cpp代码
************************************************
代码:
××××××××××××××××××××CThread.h
#ifndef
__CTHREAD
#define
__CTHREAD
#include
<vector>
#include
<string>
#include
<pthread.h>
using namespace
std;
/**
 * Abstract unit of work executed by the thread pool.
 *
 * Derive from this class and implement Run() with the task's logic.
 * The payload pointer set through SetData() is stored as-is; this class
 * never dereferences or frees it.
 **/
class CTask
{
protected:
    std::string m_strTaskName;  // name of the task
    void* m_ptrData;            // task-specific data; NULL until SetData() is called

public:
    // Start with no payload so derived classes can test m_ptrData against
    // NULL instead of reading an indeterminate pointer.
    CTask() : m_ptrData(NULL) {}

    CTask(std::string taskName)
        : m_strTaskName(taskName), m_ptrData(NULL)
    {
    }

    // Tasks are handled through CTask*, so destruction through a base
    // pointer must reach the derived destructor.
    virtual ~CTask() {}

    // Executes the task; the meaning of the return value is up to the
    // derived class.
    virtual int Run() = 0;

    // Stores the pointer to the task's data.
    void SetData(void* data);
};
/**
 * Thread pool: starts a fixed number of worker threads on construction
 * and hands queued CTask objects to them.
 *
 * The idle/busy thread lists, the mutex and the condition variable are
 * static, i.e. shared by every CThreadPool instance; the task list is
 * per instance.
 **/
class CTask;  // only pointers to CTask are used in this declaration

class CThreadPool
{
private:
    std::vector<CTask*> m_vecTaskList;              // pending tasks
    int m_iThreadNum;                               // number of threads started by the pool
    static std::vector<pthread_t> m_vecIdleThread;  // threads currently idle
    static std::vector<pthread_t> m_vecBusyThread;  // threads currently running a task
    static pthread_mutex_t m_pthreadMutex;          // lock used for thread synchronisation
    static pthread_cond_t m_pthreadCond;            // condition variable used to wake workers

    // Not copyable: the worker threads are handed the address of
    // m_vecTaskList, so a copy would own a task list no thread looks at.
    // Declared but intentionally not defined.
    CThreadPool(const CThreadPool&);
    CThreadPool& operator=(const CThreadPool&);

protected:
    static void* ThreadFunc(void* threadData);  // entry point of every worker thread
    static int MoveToIdle(pthread_t tid);       // move a thread from the busy list to the idle list
    static int MoveToBusy(pthread_t tid);       // move a thread from the idle list to the busy list
    int Create();                               // create all worker threads

public:
    // explicit: an int must not silently convert into a pool that spawns threads.
    explicit CThreadPool(int threadNum);
    int AddTask(CTask* task);  // queue a task and notify a worker
    int StopAll();             // cancel and join the worker threads
};
#endif
类的实现为:
××××××××××××××××××××CThread.cpp
#include "CThread.h"
#include
<string>
#include <iostream>
using namespace std;
void CTask::SetData(void *
data)
{
m_ptrData =
data;
}
// Storage for CThreadPool's static members.

// Threads currently running a task.
std::vector<pthread_t> CThreadPool::m_vecBusyThread;

// Threads currently waiting for a task.
std::vector<pthread_t> CThreadPool::m_vecIdleThread;

// Mutex and condition variable, statically initialised.
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
// Records the requested thread count and immediately starts the worker
// threads through Create().
CThreadPool::CThreadPool(int threadNum)
    : m_iThreadNum(threadNum)
{
    Create();
}
// Moves `tid` from the busy list to the idle list.
//
// Performs no locking itself; the lists are shared between threads, so
// the caller should hold m_pthreadMutex.
//
// Returns 0 when the thread was moved, -1 when `tid` is not in the busy
// list (both lists are left untouched in that case).
int CThreadPool::MoveToIdle(pthread_t tid)
{
    std::vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
    while (busyIter != m_vecBusyThread.end() && !(tid == *busyIter))
    {
        ++busyIter;
    }

    // Erasing end() is undefined behaviour, so an unknown id must be
    // rejected before touching the vector.
    if (busyIter == m_vecBusyThread.end())
    {
        return -1;
    }

    m_vecBusyThread.erase(busyIter);
    m_vecIdleThread.push_back(tid);
    return 0;
}
// Moves `tid` from the idle list to the busy list.
//
// Performs no locking itself; the lists are shared between threads, so
// the caller should hold m_pthreadMutex.
//
// Returns 0 when the thread was moved, -1 when `tid` is not in the idle
// list (for example because it is already marked busy); both lists are
// left untouched in that case.
int CThreadPool::MoveToBusy(pthread_t tid)
{
    std::vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
    while (idleIter != m_vecIdleThread.end() && !(tid == *idleIter))
    {
        ++idleIter;
    }

    // Erasing end() is undefined behaviour, so an unknown id must be
    // rejected before touching the vector.
    if (idleIter == m_vecIdleThread.end())
    {
        return -1;
    }

    m_vecIdleThread.erase(idleIter);
    m_vecBusyThread.push_back(tid);
    return 0;
}
void* CThreadPool::ThreadFunc(void *
threadData)
{
pthread_t
tid = pthread_self();
while
(1)
{
pthread_mutex_lock(&m_pthreadMutex);
pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
cout << "tid:" << tid << " run" <<
endl;
//get
task
vector<CTask*>* taskList =
(vector<CTask*>*)threadData;
vector<CTask*>::iterator iter =
taskList->begin();
while (iter !=
taskList->end())
{
MoveToBusy(tid);
break;
}
CTask* task =
*iter;
taskList->erase(iter);
pthread_mutex_unlock(&m_pthreadMutex);
cout << "idel thread number:" <<
CThreadPool::m_vecIdleThread.size() <<
endl;
cout << "busy thread number:" <<
CThreadPool::m_vecBusyThread.size() <<
endl;
//cout << "task to be run:" << taskList->size() <<
endl;
task->Run();
//cout << "CThread::thread work" <<
endl;
cout << "tid:" << tid << " idle" <<
endl;
}
return
(void*)0;
}
// Appends `task` to the pool's task list and signals the condition
// variable so that one waiting worker wakes up.
//
// The pool stores only the pointer; the task object must stay alive
// until it has been run.
//
// Returns 0 on success, -1 when `task` is NULL (nothing is queued).
int CThreadPool::AddTask(CTask* task)
{
    if (task == NULL)
    {
        return -1;
    }

    // Worker threads remove entries from this list while holding
    // m_pthreadMutex, so the insertion must take the same lock.
    pthread_mutex_lock(&m_pthreadMutex);
    this->m_vecTaskList.push_back(task);
    pthread_mutex_unlock(&m_pthreadMutex);

    pthread_cond_signal(&m_pthreadCond);
    return 0;
}
// Starts m_iThreadNum worker threads running ThreadFunc on this pool's
// task list and records each created thread as idle.
//
// Returns 0 when every thread was created, -1 when at least one
// pthread_create() call failed. Threads that could not be created are
// not recorded, so later cancel/join calls only see valid ids.
int CThreadPool::Create()
{
    int failures = 0;
    for (int i = 0; i < m_iThreadNum; ++i)
    {
        pthread_t tid;
        if (pthread_create(&tid, NULL, ThreadFunc, &m_vecTaskList) != 0)
        {
            // `tid` is not meaningful after a failed create.
            ++failures;
            continue;
        }
        m_vecIdleThread.push_back(tid);
    }
    return failures == 0 ? 0 : -1;
}
int
CThreadPool::StopAll()
{
vector<pthread_t>::iterator iter =
m_vecIdleThread.begin();
while
(iter !=
m_vecIdleThread.end())
{
pthread_cancel(*iter);
pthread_join(*iter,NULL);
iter++;
}
iter =
m_vecBusyThread.begin();
while(iter !=
m_vecBusyThread.end())
{
pthread_cancel(*iter);
pthread_join(*iter,
NULL);
iter++;
}
return
0;
}
// 简单示例:
××××××××test.cpp
#include "CThread.h"
#include <iostream>
#include <unistd.h>
using namespace std;
class CWorkTask: public
CTask
{
public:
CWorkTask()
{}
int
Run()
{
cout << (char*)this->m_ptrData <<
endl;
sleep(10);
return 0;
}
};
// Demo: creates a pool of 10 threads, queues the same task object 11
// times and then keeps the process alive indefinitely.
int main()
{
    CWorkTask taskObj;

    // The message must be a single-line literal; a raw line break inside
    // a string literal does not compile.
    char szTmp[] = "this is the first thread running,haha success";
    taskObj.SetData((void*)szTmp);

    CThreadPool threadPool(10);
    for (int i = 0; i < 11; i++)
    {
        threadPool.AddTask(&taskObj);
    }

    // Sleep forever so the process stays alive; the return below is
    // never reached.
    while (1)
    {
        sleep(120);
    }
    return 0;
}