linux线程池thrmgr源码解析
thrmgr线程池的作用是提高程序的并发处理能力,在多CPU的服务器上运行程序,可以并发执行多个任务。
thrmgr并非像常规线程池那样,创建线程池时,创建固定数量的线程,线程一直存在,直到线程池被销毁。Thrmgr创建时只是分配线程池对象的变量,并初始化锁、条件变量等变量,并没有创建线程。当向线程池加入第一个任务时,开始创建第一个线程,处理任务,如果一直有任务,则会一直处理任务,如果加入的任务太多太快,线程池内线程都在忙碌,没有空闲等待线程,而且线程池中存在的线程数量不超过最大值,则会创建新的线程去执行任务。线程数量会增多,等到任务数量减少时,有些线程没有任务了,处于等待状态,而且等待超时,这些等待超时的线程就会自动返回,自动退出线程,线程池内活动线程数量减少。这样线程池内线程的数量会随着任务的数量动态调整。即避免了任务量大时,线程池处理不过来,又避免了任务少时,线程池内部存在大量空闲线程的缺陷。从实现了一种根据任务量动态调整的线程池。
typedef struct
{
int detachstate; 线程的分离状态
int schedpolicy; 线程调度策略
structsched_param schedparam; 线程的调度参数
int inheritsched; 线程的继承性
int scope; 线程的作用域
size_t guardsize; 线程栈末尾的警戒缓冲区大小
int stackaddr_set;
void* stackaddr; 线程栈的位置
size_t stacksize; 线程栈的大小
}pthread_attr_t;
线程具有属性,用pthread_attr_t表示,在对该结构进行处理之前必须进行初始化,在使用后需要对其去除初始化。我们用pthread_attr_init函数对其初始化,用pthread_attr_destroy对其去除初始化。
名称:: |
pthread_attr_init/pthread_attr_destroy |
功能: |
对线程属性初始化/去除初始化 |
头文件: |
#include<pthread.h> |
函数原形: |
int pthread_attr_init(pthread_attr_t*attr); int pthread_attr_destroy(pthread_attr_t*attr); |
参数: |
Attr 线程属性变量 |
返回值: |
若成功返回0,若失败返回-1。 |
线程的分离状态决定一个线程以什么样的方式来终止自己。在默认情况下线程是非分离状态的,这种情况下,原有的线程等待创建的线程结束。只有当pthread_join()函数返回时,创建的线程才算终止,才能释放自己占用的系统资源。而分离线程不是这样子的,它没有被其他的线程所等待,自己运行结束了,线程也就终止了,马上释放系统资源。线程池中需要的就是分离状态的线程。则可以设置pthread_attr_t结构中的detachstate线程属性为PTHREAD_CREATE_DETACHED,让线程以分离状态启动。
名称:: |
pthread_attr_getdetachstate/pthread_attr_setdetachstate |
功能: |
获取/修改线程的分离状态属性 |
头文件: |
#include<pthread.h> |
函数原形: |
int pthread_attr_getdetachstate(const pthread_attr_t *attr,int *detachstate); int pthread_attr_setdetachstate(pthread_attr_t *attr,intdetachstate); |
参数: |
Attr 线程属性变量 Detachstate 线程的分离状态属性 |
返回值: |
若成功返回0,若失败返回-1。 |
pthread_mutex_t在线程池中的作用是为了避免多线程对公共变量同时进行访问。在访问变量前,先锁住,不让别人访问,访问结束后在释放锁,让别人访问。保证原子性操作,避免多个线程同时修改公共变量。pthread_mutex_lock(&threadpool->pool_mutex);上锁。pthread_mutex_unlock(&threadpool->pool_mutex);进行解锁,两个函数要配对使用。
条件变量是利用线程间共享的全局变量进行同步的一种机制,通常是要和互斥量pthread_mutex_t配合使用。pthread_cond_t在线程池中的作用是无任务时,让线程处于等待状态,等待条件变量有信号,挂起线程。当有任务时,发出信号,让线程脱离等待状态,开始执行任务。线程等待超时,则自动退出,结束线程。
(1)初始化和反初始化
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
(2)等待信号函数
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);//一直等待
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);//超时等待
因为采用的是绝对时间,所以先要获取系统时间,在加上超时时间,在传入参数。
timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
timeout.tv_nsec = 0;
pthread_cond_wait函数需要传入mutex变量,是因为在函数内部需要对mutex进行操作,内部流程是:更新条件等待队列->释放锁->等待信号。。。->信号到来->上锁->更新条件等待队列->执行pthread_cond_wait后面的代码。在pthread_cond_wait内部有一个先解锁再加锁的过程,所以pthread_cond_wait要和pthread_mutex_t配合使用,而且,pthread_cond_wait需要在加锁和解锁之间,总的流程就是外部加锁->内部更新条件等待队列->内部释放锁->内部等待信号。。。->信号到来->内部上锁->内部更新条件等待队列->执行pthread_cond_wait后面的代码->外部解锁。
(3)发送信号
pthread_cond_signal();发出一个信号,激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;
pthread_cond_broadcast();广播方式发送信号,所有pthread_cond_wait获取到信号返回,激活所有等待线程。
/* * Copyright (C) 2004 Trog <trog@clamav.net> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, * MA 02110-1301, USA. */ #ifndef __THRMGR_H__ #define __THRMGR_H__ #include <pthread.h> #ifndef C_WINDOWS #include <sys/time.h> #endif typedef struct work_item_tag { struct work_item_tag *next; void *data; struct timeval time_queued;// } work_item_t; typedef struct work_queue_tag { work_item_t *head; work_item_t *tail; int item_count; } work_queue_t; typedef enum { POOL_INVALID, POOL_VALID, POOL_EXIT } pool_state_t; typedef struct threadpool_tag { pthread_mutex_t pool_mutex;//mutex锁,用于限制同时只有一个线程对公共资源(条件变量,线程池内部变量)访问修改 pthread_cond_t pool_cond;//条件变量,用于多线程间等待任务,有任务的信号控制 pthread_attr_t pool_attr;//线程的属性,主要为了设置分离线程属性,即线程循环退出自动结束线程,释放资源,不用调用函数去释放资源 //具体可参考https://www.cnblogs.com/lidabo/p/5514222.html对属性的解释 pool_state_t state;//线程池的状态 int thr_max;//线程池最大线程数量 int thr_alive;//活着的线程数量,包括正在执行任务的线程和空闲等待线程 int thr_idle;//空闲等待的线程。 int idle_timeout;//线程等待超时时间,超时结束后,将结束本线程 void (*handler)(void *);//任务操作函数,有用户传入函数指针 work_queue_t *queue;//任务队列,以单向链表的方式存储任务 } threadpool_t; /* 功能:新建线程池 参数: int max_threads, 最大线程数 int idle_timeout,超时时间 void (*handler)(void *)函数操作句柄 */ threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *)); /* 功能:销毁线程池 参数: threadpool_t *threadpool线程池指针 */ void thrmgr_destroy(threadpool_t *threadpool); /* 功能:给线程池下发任务 参数: threadpool_t *threadpool线程池指针 void *user_data 用户要处理的数据 */ int thrmgr_dispatch(threadpool_t *threadpool, void *user_data); #endif
/* * Copyright (C) 2004 Trog <trog@clamav.net> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, * MA 02110-1301, USA. */ #if HAVE_CONFIG_H #include "clamav-config.h" #endif #include <stdio.h> #include <pthread.h> #include <time.h> #include <errno.h> #include "shared/output.h" #include "thrmgr.h" #include "others.h" #define FALSE (0) #define TRUE (1) /*新建任务队列,初始化队列参数*/ static work_queue_t *work_queue_new(void) { work_queue_t *work_q; work_q = (work_queue_t *) malloc(sizeof(work_queue_t)); if (!work_q) { return NULL; } work_q->head = work_q->tail = NULL; work_q->item_count = 0; return work_q; } /*向任务队列中加入任务*/ static int work_queue_add(work_queue_t *work_q, void *data) { work_item_t *work_item; if (!work_q) { return FALSE; } //申请任务内存 work_item = (work_item_t *) malloc(sizeof(work_item_t)); if (!work_item) { return FALSE; } //next指针设为空,将用户数据赋值给任务 work_item->next = NULL; work_item->data = data; //设置任务接收时间,好像没有什么用 gettimeofday(&(work_item->time_queued), NULL); //第一次插入任务首尾指针都为空,所以同时指向这个任务 if (work_q->head == NULL) { work_q->head = work_q->tail = work_item; work_q->item_count = 1; } else {//以后插入时,将结尾next指针指向插入任务, //然后将位置指针指向最后插入的任务, //相当于将任务加入单向链表的末尾,然后在将尾指针指向最后一个 work_q->tail->next = work_item; work_q->tail = work_item; work_q->item_count++;//任务数量加1 } return TRUE; } static void *work_queue_pop(work_queue_t *work_q) { work_item_t *work_item; void *data; //头指针为空,无数据返回 if (!work_q || !work_q->head) { return NULL; } //获取链表中第一个任务 work_item = work_q->head; //获取用户数据 data = work_item->data; //将头指针向后移动一位 work_q->head = work_item->next; //如果头指针是空,说明刚刚去除的任务已经是最后一个任务,需要把尾指针也置为空 if (work_q->head == NULL) { work_q->tail = NULL; } //销毁任务框架内容,返回用户数据 free(work_item); return data; } void thrmgr_destroy(threadpool_t *threadpool) { if (!threadpool || (threadpool->state != POOL_VALID)) { return; } //上锁 if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) { logg("!Mutex lock failed\n"); exit(-1); } //设置线程池状态为退出 threadpool->state = POOL_EXIT; /* wait for threads to exit */ //线程池中有活的线程,广播信号变量,让所有线程都获取到信号,自动返回结束线程 if (threadpool->thr_alive > 0) { if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) { pthread_mutex_unlock(&threadpool->pool_mutex); return; } } while (threadpool->thr_alive > 0) {//等待最后一个线程结束时,即thr_alive==0时,会广播一个信号,告诉所有线程都结束了 if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) { pthread_mutex_unlock(&threadpool->pool_mutex); return; } } if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) { logg("!Mutex unlock failed\n"); exit(-1); } //释放资源 pthread_mutex_destroy(&(threadpool->pool_mutex)); pthread_cond_destroy(&(threadpool->pool_cond)); pthread_attr_destroy(&(threadpool->pool_attr)); free(threadpool->queue); free(threadpool); return; } threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *)) { threadpool_t *threadpool; #if defined(C_BIGSTACK) || defined(C_BSD) size_t stacksize; #endif if (max_threads <= 0) { return NULL; } //创建线程池对象 threadpool = (threadpool_t *) malloc(sizeof(threadpool_t)); if (!threadpool) { return NULL; } //创建任务队列 threadpool->queue = work_queue_new(); if (!threadpool->queue) { free(threadpool); return NULL; } //线程池创建只是创建对象,没有启动任何线程,线程是在接收到任务才开始创建线程 threadpool->thr_max = max_threads; threadpool->thr_alive = 0; threadpool->thr_idle = 0; threadpool->idle_timeout = idle_timeout; threadpool->handler = handler; //初始化锁 pthread_mutex_init(&(threadpool->pool_mutex), NULL); if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) { pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->queue); free(threadpool); return NULL; } //初始化线程属性对象 if (pthread_attr_init(&(threadpool->pool_attr)) != 0) { pthread_cond_destroy(&(threadpool->pool_cond)); pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->queue); free(threadpool); return NULL; } //将线程属性对象参数设置为分离的线程属性(PTHREAD_CREATE_DETACHED),即线程执行到末尾,自动回收资源,不用调用回收函数来回收 if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) { pthread_attr_destroy(&(threadpool->pool_attr)); pthread_cond_destroy(&(threadpool->pool_cond)); pthread_mutex_destroy(&(threadpool->pool_mutex)); free(threadpool->queue); free(threadpool); return NULL; } //设置线程堆栈大小 #if defined(C_BIGSTACK) || defined(C_BSD) pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize); stacksize = stacksize + 64 * 1024; if (stacksize < 1048576) stacksize = 1048576; /* at least 1MB please */ logg("Set stacksize to %u\n", stacksize); pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize); #endif threadpool->state = POOL_VALID; //线程池状态设置为可用状态 return threadpool; } static void *thrmgr_worker(void *arg) { threadpool_t *threadpool = (threadpool_t *) arg; void *job_data; int retval, must_exit = FALSE; struct timespec timeout; /* loop looking for work */ for (;;) {//锁住,要修改公共变量了threadpool,如果这里锁住,pthread_cond_timedwait等待状态,后面的释放锁不执行,加入任务的函数thrmgr_dispatch中pthread_cond_signal信号也是在锁内部, //不就一直发不出去,那么这边在等,不释放锁,那边在争锁,又发不出去,不是死锁了吗。其实不然,pthread_cond_timedwait内部会先释放锁,然后等待信号,然后thrmgr_dispatch就可以进入锁发送信号了。 //然后这边等到信号后再锁住,修改信号状态值,然后返回函数。pthread_cond_timedwait内部有一个释放锁,再锁住的过程。 if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { /* Fatal error */ logg("!Fatal: mutex lock failed\n"); exit(-2); } timeout.tv_sec = time(NULL) + threadpool->idle_timeout; timeout.tv_nsec = 0; //等待线程数量加1 threadpool->thr_idle++; //获取到任务退出循环,否则进入等待 while (((job_data=work_queue_pop(threadpool->queue)) == NULL)&& (threadpool->state != POOL_EXIT)) { /* Sleep, awaiting wakeup */ //接收到第一个任务时,才会 创建第一个线程,所以第一次会接收到信号并处理,第二次循环如果没有任务, //则会在这里等待,线程被挂起,超时后往下执行 retval = pthread_cond_timedwait(&(threadpool->pool_cond),&(threadpool->pool_mutex), &timeout); //如果是超时的,说明空闲超时,需要结束该线程,设置变量,让线程自动结束 if (retval == ETIMEDOUT) { must_exit = TRUE; break;//跳出等待 } }//等待结束,等待线程数量减1 threadpool->thr_idle--; //如果线程状态为退出,也将must_exit置为true if (threadpool->state == POOL_EXIT) { must_exit = TRUE; } //threadpool变量修改结束,释放锁 if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { /* Fatal error */ logg("!Fatal: mutex unlock failed\n"); exit(-2); }//如果任务不为空,执行任务 if (job_data) { threadpool->handler(job_data); } else if (must_exit)//如果是线程空闲太久或者线程池状态为退出,则退出线程 { break; } } //又要操作公共变量threadpool->thr_alive了,锁住,不让别人来打扰 if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { /* Fatal error */ logg("!Fatal: mutex lock failed\n"); exit(-2); } //线程即将推出,将活着的线程数量减1 threadpool->thr_alive--; //最后一个线程了,临死前发出最后一个广播信号,告诉destory函数最后一个线程已经阵亡,可以结束了 if (threadpool->thr_alive == 0) { /* signal that all threads are finished */ pthread_cond_broadcast(&threadpool->pool_cond); } //修改公共变量结束,释放锁 if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { /* Fatal error */ logg("!Fatal: mutex unlock failed\n"); exit(-2); } return NULL; } /*向线程池下发任务*/ int thrmgr_dispatch(threadpool_t *threadpool, void *user_data) { pthread_t thr_id; if (!threadpool) { return FALSE; } /* Lock the threadpool */ //需要访问公共对象任务队列了,开始锁住 if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) { logg("!Mutex lock failed\n"); return FALSE; } if (threadpool->state != POOL_VALID) { if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { logg("!Mutex unlock failed\n"); return FALSE; } return FALSE; }//向任务队列中加入任务 if (!work_queue_add(threadpool->queue, user_data)) { if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { logg("!Mutex unlock failed\n"); return FALSE; } return FALSE; } //空闲等待线程数量为0,要么任务太多,要么第一次添加任务,线程池内线程数量为空 if ((threadpool->thr_idle == 0) &&(threadpool->thr_alive < threadpool->thr_max)) { /* Start a new thread */ //启动线程 if (pthread_create(&thr_id, &(threadpool->pool_attr),thrmgr_worker, threadpool) != 0) { logg("!pthread_create failed\n"); } else { threadpool->thr_alive++;//成功创建,活线程数量加1 } } //发送一个信号,告诉空闲等待线程,有任务了 pthread_cond_signal(&(threadpool->pool_cond)); if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) { logg("!Mutex unlock failed\n"); return FALSE; } return TRUE; }
参考文献
(1)https://www.cnblogs.com/secondtonone1/p/5580203.html 条件变量
(2)https://www.cnblogs.com/lidabo/p/5514222.html 线程属性
(3)http://en.verysource.com/clamav_sourcecode_rar-download-121916.html 线程池源码
自己编了一个股票监控软件,有如下功能,有兴趣的朋友可以下载;
(1) 个股监测。监测个股实时变化,可以监测个股大单交易、急速拉升和下降、主力入场和出场、股票最高点和最低点提醒。检测到最高点、最低点、主力进场点、主力退场点、急速拉升点、急速下跌点,给出语音或者声音提醒,不用再时刻看着大盘了,给你更多自由的时间;
(2) 大盘监测。监测大盘的走势,采用上证、深证、创业三大指数的综合指数作为大盘走势。并实时监测大盘的最高点和最低点、中间的转折点。
(3) 股票推荐。还能根据历史数据长期或短期走势进行分析,对股市3千多个股票进行分析对比,选出涨势良好的股票,按照增长速度从大到小排序,推荐给你涨势良好的股票;
下载地址:
1.0.3版本(修复大盘指数崩溃缺陷)下载地址:
链接:https://pan.baidu.com/s/1BJcTp-kdniM7VE9K5Kd3vg 提取码:003h
更新链接:
https://www.cnblogs.com/bclshuai/p/10621613.html
原文:https://www.cnblogs.com/bclshuai/p/9851435.html