一、线程池
大多数的网络服务器,包括Web服务器都具有一个特点,就是单位时间内必须处理数目巨大的连接请求,但是处理时间却是比较短的。在传统的多线程服务器模型中是这样实现的:一旦有个请求到达,就创建一个新的线程,由该线程执行任务,任务执行完毕之后,线程就退出。这就是"即时创建,即时销毁"的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数非常频繁,那么服务器就将处于一个不停的创建线程和销毁线程的状态。这笔开销是不可忽略的,尤其是线程执行的时间非常非常短的情况。
线程池就是为了解决上述问题的,它的实现原理是这样的:在应用程序启动之后,就马上创建一定数量的线程,放入空闲的队列中。这些线程都是处于阻塞状态,这些线程只占一点内存,不占用CPU。当任务到来后,线程池将选择一个空闲的线程,将任务传入此线程中运行。当所有的线程都处在处理任务的时候,线程池将自动创建一定的数量的新线程,用于处理更多的任务。执行任务完成之后线程并不退出,而是继续在线程池中等待下一次任务。当大部分线程处于阻塞状态时,线程池将自动销毁一部分的线程,回收系统资源。
下面是一个简单线程池的实现,这个线程池的代码是我参考网上的一个例子实现的,由于找不到出处了,就没办法注明参考自哪里了。它的方案是这样的:程序启动之前,初始化线程池,启动线程池中的线程,由于还没有任务到来,线程池中的所有线程都处在阻塞状态,当一有任务到达就从线程池中取出一个空闲线程处理,如果所有的线程都处于工作状态,就添加到队列,进行排队。如果队列中的任务个数大于队列的所能容纳的最大数量,那就不能添加任务到队列中,只能等待队列不满才能添加任务到队列中。
主要由两个文件组成一个threadpool.h头文件和一个threadpool.c源文件组成。源码中已有重要的注释,就不加以分析了。
threadpool.h文件:
- struct job
- {
- void* (*callback_function)(void *arg);
- void *arg;
- struct job *next;
- };
-
- struct threadpool
- {
- int thread_num;
- int queue_max_num;
- struct job *head;
- struct job *tail;
- pthread_t *pthreads;
- pthread_mutex_t mutex;
- pthread_cond_t queue_empty;
- pthread_cond_t queue_not_empty;
- pthread_cond_t queue_not_full;
- int queue_cur_num;
- int queue_close;
- int pool_close;
- };
-
-
-
-
-
-
-
-
-
- struct threadpool* threadpool_init(int thread_num, int queue_max_num);
-
-
-
-
-
-
-
-
-
-
- int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);
-
-
-
-
-
-
-
-
- int threadpool_destroy(struct threadpool *pool);
-
-
-
-
-
-
-
-
- void* threadpool_function(void* arg);
threadpool.c文件:
- #include "threadpool.h"
-
- struct threadpool* threadpool_init(int thread_num, int queue_max_num)
- {
- struct threadpool *pool = NULL;
- do
- {
- pool = malloc(sizeof(struct threadpool));
- if (NULL == pool)
- {
- printf("failed to malloc threadpool!\n");
- break;
- }
- pool->thread_num = thread_num;
- pool->queue_max_num = queue_max_num;
- pool->queue_cur_num = 0;
- pool->head = NULL;
- pool->tail = NULL;
- if (pthread_mutex_init(&(pool->mutex), NULL))
- {
- printf("failed to init mutex!\n");
- break;
- }
- if (pthread_cond_init(&(pool->queue_empty), NULL))
- {
- printf("failed to init queue_empty!\n");
- break;
- }
- if (pthread_cond_init(&(pool->queue_not_empty), NULL))
- {
- printf("failed to init queue_not_empty!\n");
- break;
- }
- if (pthread_cond_init(&(pool->queue_not_full), NULL))
- {
- printf("failed to init queue_not_full!\n");
- break;
- }
- pool->pthreads = malloc(sizeof(pthread_t) * thread_num);
- if (NULL == pool->pthreads)
- {
- printf("failed to malloc pthreads!\n");
- break;
- }
- pool->queue_close = 0;
- pool->pool_close = 0;
- int i;
- for (i = 0; i < pool->thread_num; ++i)
- {
- pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool);
- }
-
- return pool;
- } while (0);
-
- return NULL;
- }
-
- int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg)
- {
- assert(pool != NULL);
- assert(callback_function != NULL);
- assert(arg != NULL);
-
- pthread_mutex_lock(&(pool->mutex));
- while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close))
- {
- pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex));
- }
- if (pool->queue_close || pool->pool_close)
- {
- pthread_mutex_unlock(&(pool->mutex));
- return -1;
- }
- struct job *pjob =(struct job*) malloc(sizeof(struct job));
- if (NULL == pjob)
- {
- pthread_mutex_unlock(&(pool->mutex));
- return -1;
- }
- pjob->callback_function = callback_function;
- pjob->arg = arg;
- pjob->next = NULL;
- if (pool->head == NULL)
- {
- pool->head = pool->tail = pjob;
- pthread_cond_broadcast(&(pool->queue_not_empty));
- }
- else
- {
- pool->tail->next = pjob;
- pool->tail = pjob;
- }
- pool->queue_cur_num++;
- pthread_mutex_unlock(&(pool->mutex));
- return 0;
- }
-
- void* threadpool_function(void* arg)
- {
- struct threadpool *pool = (struct threadpool*)arg;
- struct job *pjob = NULL;
- while (1)
- {
- pthread_mutex_lock(&(pool->mutex));
- while ((pool->queue_cur_num == 0) && !pool->pool_close)
- {
- pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
- }
- if (pool->pool_close)
- {
- pthread_mutex_unlock(&(pool->mutex));
- pthread_exit(NULL);
- }
- pool->queue_cur_num--;
- pjob = pool->head;
- if (pool->queue_cur_num == 0)
- {
- pool->head = pool->tail = NULL;
- }
- else
- {
- pool->head = pjob->next;
- }
- if (pool->queue_cur_num == 0)
- {
- pthread_cond_signal(&(pool->queue_empty));
- }
- if (pool->queue_cur_num == pool->queue_max_num - 1)
- {
- pthread_cond_broadcast(&(pool->queue_not_full));
- }
- pthread_mutex_unlock(&(pool->mutex));
-
- (*(pjob->callback_function))(pjob->arg);
- free(pjob);
- pjob = NULL;
- }
- }
- int threadpool_destroy(struct threadpool *pool)
- {
- assert(pool != NULL);
- pthread_mutex_lock(&(pool->mutex));
- if (pool->queue_close || pool->pool_close)
- {
- pthread_mutex_unlock(&(pool->mutex));
- return -1;
- }
-
- pool->queue_close = 1;
- while (pool->queue_cur_num != 0)
- {
- pthread_cond_wait(&(pool->queue_empty), &(pool->mutex));
- }
-
- pool->pool_close = 1;
- pthread_mutex_unlock(&(pool->mutex));
- pthread_cond_broadcast(&(pool->queue_not_empty));
- pthread_cond_broadcast(&(pool->queue_not_full));
- int i;
- for (i = 0; i < pool->thread_num; ++i)
- {
- pthread_join(pool->pthreads[i], NULL);
- }
-
- pthread_mutex_destroy(&(pool->mutex));
- pthread_cond_destroy(&(pool->queue_empty));
- pthread_cond_destroy(&(pool->queue_not_empty));
- pthread_cond_destroy(&(pool->queue_not_full));
- free(pool->pthreads);
- struct job *p;
- while (pool->head != NULL)
- {
- p = pool->head;
- pool->head = p->next;
- free(p);
- }
- free(pool);
- return 0;
- }
测试文件main.c文件:
- #include "threadpool.h"
-
- void* work(void* arg)
- {
- char *p = (char*) arg;
- printf("threadpool callback fuction : %s.\n", p);
- sleep(1);
- }
-
- int main(void)
- {
- struct threadpool *pool = threadpool_init(10, 20);
- threadpool_add_job(pool, work, "1");
- threadpool_add_job(pool, work, "2");
- threadpool_add_job(pool, work, "3");
- threadpool_add_job(pool, work, "4");
- threadpool_add_job(pool, work, "5");
- threadpool_add_job(pool, work, "6");
- threadpool_add_job(pool, work, "7");
- threadpool_add_job(pool, work, "8");
- threadpool_add_job(pool, work, "9");
- threadpool_add_job(pool, work, "10");
- threadpool_add_job(pool, work, "11");
- threadpool_add_job(pool, work, "12");
- threadpool_add_job(pool, work, "13");
- threadpool_add_job(pool, work, "14");
- threadpool_add_job(pool, work, "15");
- threadpool_add_job(pool, work, "16");
- threadpool_add_job(pool, work, "17");
- threadpool_add_job(pool, work, "18");
- threadpool_add_job(pool, work, "19");
- threadpool_add_job(pool, work, "20");
- threadpool_add_job(pool, work, "21");
- threadpool_add_job(pool, work, "22");
- threadpool_add_job(pool, work, "23");
- threadpool_add_job(pool, work, "24");
- threadpool_add_job(pool, work, "25");
- threadpool_add_job(pool, work, "26");
- threadpool_add_job(pool, work, "27");
- threadpool_add_job(pool, work, "28");
- threadpool_add_job(pool, work, "29");
- threadpool_add_job(pool, work, "30");
- threadpool_add_job(pool, work, "31");
- threadpool_add_job(pool, work, "32");
- threadpool_add_job(pool, work, "33");
- threadpool_add_job(pool, work, "34");
- threadpool_add_job(pool, work, "35");
- threadpool_add_job(pool, work, "36");
- threadpool_add_job(pool, work, "37");
- threadpool_add_job(pool, work, "38");
- threadpool_add_job(pool, work, "39");
- threadpool_add_job(pool, work, "40");
-
- sleep(5);
- threadpool_destroy(pool);
- return 0;
- }
二、线程池补充
上面的文章介绍了线程池的原理及意义。
下面,介绍的这个线程池与上面提到的那个线程池有一部分相似的地方。
主要区别为:
1、线程池中的每个线程都有自己的互斥量和条件变量,而不是线程池共享一个。
2、线程池中的线程在程序结束时,等待线程池中线程停止的机制不同。
该程序主要由两个文件构成,分别为ThreadPool.h和ThreadPool.cpp文件。
ThreadPool.h文件:
- #define MAXT_IN_POOL 200
- #define BUSY_THRESHOlD 0.5
- #define MANAGE_INTREVAL 2
-
- class ThreadPool;
-
- typedef void (*dispatch_fn)(void*);
-
-
- typedef struct tagThread
- {
- pthread_t thread_id;
- pthread_mutex_t thread_mutex;
- pthread_cond_t thread_cond;
- dispatch_fn do_job;
- void* args;
- ThreadPool *parent;
- }_thread;
-
-
- class ThreadPool
- {
- public:
-
-
-
-
-
-
-
-
- ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool = 2);
- ~ThreadPool();
-
-
-
-
-
-
-
-
-
- void dispatch_threadpool(dispatch_fn dispatch_me, void* dispatch_me);
- private:
- pthread_mutex_t tp_mutex;
- pthread_cond_t tp_idle;
- pthread_cond_t tp_full;
- pthread_cond_t tp_empty;
- int tp_min;
- int tp_max;
- int tp_avail;
- int tp_total;
- _thread** tp_list;
- bool tp_stop;
-
-
-
-
-
-
-
-
- bool add_avail(_thread* avail);
-
-
-
-
-
-
-
-
- static void* work_thread(void* args);
-
-
-
-
-
-
-
-
-
- bool add_thread(dispatch_fn dispatch_me, void* args);
-
-
-
-
-
-
-
-
- void syn_all();
- };
ThreadPool.cpp文件:
- ThreadPool::ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool)
- {
- pthread_t manage_id;
-
- if (min_threads_in_pool <= 0 || max_threads_in_pool < 0 || min_threads_in_pool > max_threads_in_pool || max_threads_in_pool > MAXT_IN_POOL)
- {
- return ;
- }
-
- tp_avail = 0;
- tp_total = 0;
- tp_min = min_threads_in_pool;
- tp_max = max_threads_in_pool;
- tp_stop = false;
- tp_list = (_thread * *) malloc(sizeof(void *) * max_threads_in_pool);
- if (NULL == tp_list)
- {
- return;
- }
- memset(tp_list, 0, sizeof(void *) * max_threads_in_pool);
-
- pthread_mutex_init(&tp_mutex, NULL);
- pthread_cond_init(&tp_idle, NULL);
- pthread_cond_init(&tp_full, NULL);
- pthread_cond_init(&tp_empty, NULL);
- }
-
- bool ThreadPool::add_avail(_thread* avail)
- {
- bool ret = false;
-
- pthread_mutex_lock(&tp_mutex);
- if (tp_avail < tp_max)
- {
- tp_list[tp_avail] = avail;
- tp_avail++;
- pthread_cond_signal(&tp_idle);
- if (tp_avail >= tp_total)
- {
- pthread_cond_signal(&tp_full);
- }
- ret = true;
- }
- pthread_mutex_unlock(&tp_mutex);
-
- return ret;
- }
-
- void* ThreadPool::work_thread(void* args)
- {
- _thread* thread = (_thread*) args;
- ThreadPool *pool = thread->parent;
- while (pool->tp_stop == false)
- {
- thread->do_job(thread->args);
- pthread_mutex_lock(&thread->thread_mutex);
- if (pool->add_avail(thread))
- {
- pthread_cond_wait(&thread->thread_cond, &thread->thread_mutex);
- pthread_mutex_unlock(&thread->thread_mutex);
- }
- else
- {
- pthread_mutex_unlock(&thread->thread_mutex);
- pthread_mutex_destroy(&thread->thread_mutex);
- pthread_cond_destroy(&thread->thread_cond);
- free(thread);
- break;
- }
- }
-
- pthread_mutex_lock(&pool->tp_mutex);
- pool->tp_total--;
- if (pool->tp_total <= 0)
- {
- pthread_cond_signal(&pool->tp_empty);
- }
- pthread_mutex_unlock(&pool->tp_mutex);
-
- return NULL;
- }
-
- bool ThreadPool::add_thread(dispatch_fn dispatch_me, void* args)
- {
- _thread* thread = NULL;
- pthread_attr_t attr;
-
- thread = (_thread *) malloc(sizeof(_thread));
- if (NULL == thread)
- {
- return false;
- }
-
- pthread_mutex_init(&thread->thread_mutex, NULL);
- pthread_cond_init(&thread->thread_cond, NULL);
- thread->do_job = dispatch_me;
- thread->args = args;
- thread->parent = this;
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-
- if (pthread_create(&thread->thread_id, &attr, work_thread, (void *) thread) != 0)
- {
- pthread_mutex_destroy(&thread->thread_mutex);
- pthread_cond_destroy(&thread->thread_cond);
- pthread_attr_destroy(&attr);
- free(thread);
- return false;
- }
- tp_total++;
-
- return true;
- }
-
- void ThreadPool::dispatch_threadpool(dispatch_fn dispatch_me, void* args)
- {
- _thread* thread = NULL;
-
- pthread_mutex_lock(&tp_mutex);
-
- if (tp_avail <= 0 && tp_total >= tp_max)
- {
- pthread_cond_wait(&tp_idle, &tp_mutex);
- }
-
- if (tp_avail <= 0)
- {
- if (!add_thread(dispatch_me, args))
- {
- return;
- }
- }
- else
- {
- tp_avail--;
- thread = tp_list[tp_avail];
- tp_list[tp_avail] = NULL;
- thread->do_job = dispatch_me;
- thread->args = args;
-
- pthread_mutex_lock(&thread->thread_mutex);
- pthread_cond_signal(&thread->thread_cond);
- pthread_mutex_unlock(&thread->thread_mutex);
- }
-
- pthread_mutex_unlock(&tp_mutex);
- }
-
-
- void ThreadPool::syn_all()
- {
- if (tp_avail < tp_total)
- {
- pthread_cond_wait(&tp_full, &tp_mutex);
- }
-
- tp_stop = true;
- int i = 0;
- for (i = 0; i < tp_avail; i++)
- {
- _thread *thread = tp_list[i];
- pthread_mutex_lock(&thread->thread_mutex);
- pthread_cond_signal(&thread->thread_cond);
- pthread_mutex_unlock(&thread->thread_mutex);
- }
- if (tp_total > 0)
- {
- pthread_cond_wait(&tp_empty, &tp_mutex);
- }
- }
-
- ThreadPool::~ThreadPool()
- {
- sleep(MANAGE_INTREVAL);
- pthread_mutex_lock(&tp_mutex);
- syn_all();
- int i = 0;
- for (i = 0; i < tp_total; i++)
- {
- free(tp_list[i]);
- tp_list[i] = NULL;
- }
- pthread_mutex_unlock(&tp_mutex);
- pthread_mutex_destroy(&tp_mutex);
- pthread_cond_destroy(&tp_idle);
- pthread_cond_destroy(&tp_full);
- pthread_cond_destroy(&tp_empty);
- free(tp_list);
- }
linux中的线程池
原文:http://blog.csdn.net/yusiguyuan/article/details/18401277