首页 > 编程语言 > 详细

一个简单的线程池程序设计(消费者和生产者)

时间:2015-04-09 23:35:58      阅读:419      评论:0      收藏:0      [点我收藏+]

最近在学习linux下的编程,刚开始接触感觉有点复杂,今天把线程里比较重要的线程池程序重新理解梳理一下。

实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务。如果线程池中的线程都在忙,那么任务队列中的任务则等待。本程序较为简单,把任务定义为了两个数相加,输出它们的和。

采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全。

1.在linux环境下构建三个文件夹(include,src,bin)

include:包含该程序所需要的头文件。

src:包括程序的main函数,以及其他函数。

bin:程序编译,链接生成的可执行函数。

2.编写include下的头文件函数

vim pool_common.h

 1 #ifndef _COMM
 2 #define _COMM
 3 #include<stdio.h>
 4 #include<stdlib.h>
 5 #include<pthread.h>
 6 #include<string.h>
 7 #include<sys/stat.h>
 8 #include<sys/select.h>
 9 #include<sys/time.h>
10 #include<fcntl.h>
11 #include<unistd.h>
12 #include<sys/types.h>
13  struct tag
14 {
15     int left;
16     int right;
17 };
18 typedef struct tag  elem_t;  //定义数据成员,即要输入的两个相加的数
19 #endif

vim pool_que.h    

 1 #ifndef _QUE
 2 #define _QUE
 3 #include "pool_common.h"//有关任务队列的函数
 4 typedef enum
 5 {
 6     EMPTY,FULL,NEITHER
 7 }STATUS;                           //枚举类型,判断队列是否为空返回的状态
 8 typedef struct que
 9 {
10     elem_t *que_arr;   //任务队列中的任务数组,储存数据
11     int que_front;      //任务队列的队头
12     int que_tail;       //任务队列的队尾
13     int que_capacity;  //任务队列所容纳任务的数量
14     pthread_mutex_t que_lock;  //任务队列的互斥锁
15     pthread_cond_t que_pro;   //任务产生的信号
16     pthread_cond_t que_con;   //任务被执行的信号
17 }que_t,*pque_t;
18 void que_init(pque_t pq,int sizenum);//任务队列初始化
19 
20 STATUS que_full(pque_t pq);    //判断队列是否满了
21 STATUS que_empty(pque_t pq); //判断队列是否为空
22 void que_push(pque_t pq,elem_t val);  //将新的任务加入队列
23 void que_pop(pque_t pq);   //将任务从队列中删除
24 elem_t que_top(pque_t pq);  //获得任务队首的任务,和上个函数结合使用,即将任务交于线程执行
25 void que_destroy(pque_t pq); //销毁队列
26 #endif

vim pool_thread.h  //关于多线程的头文件

 1 #ifndef _POOL_THREAD
 2 #define _POOL_THREAD
 3 #include "pool_que.h"
 4 typedef void* (*consumer_handle)(void*); //定义线程所以执行的任务函数名称
 5 typedef struct threads
 6 {
 7     que_t pool_que;  //线程池中的任务队列
 8     int pool_cnt;   //线程池中线程个数
 9     pthread_t *pool_arr;  //线程数组
10     consumer_handle pool_handle; //线程索要处理的任务函数
11 }threadspool_t,*pthreadspool_t;
12 void pool_init(pthreadspool_t ppool,int quecapacity,int threadsnum,consumer_handle hd);//初始化多线程
13 void pool_on(pthreadspool_t ppool);//启动多线程
14 void pool_off(pthreadspool_t ppool);//关闭多线程
15 void pool_put(pthreadspool_t ppool,elem_t val);//将新的任务加入到线程池中
16 void pool_get(pthreadspool_t ppool,elem_t *val);//取出线程池中的任务
17 void pool_destroy(pthreadspool_t ppool);//销毁线程池
18 #endif

3.编写main函数

cd src

vim pool_main.c

技术分享
 1 #include "pool_pthread.h"
 2 void *handle(void * arg)//任务处理函数
 3 {
 4     pthreadspool_t ppool=(pthreadspool_t)arg;
 5     elem_t val;
 6     while(1)
 7     {
 8         pool_get(ppool,&val);    //从线程池中获得任务数据
 9         printf("%u:excute task!,%d +%d=%d\n",pthread_self(),val.left,val.right,val.left+val.right);//执行输出
10     }
11 
12 }
13 int main(int argc,char *argv[])
14 {
15     if(argc!=3)
16     {
17         printf("EXE QUE_CAPACITY THREADS_NUM!\n");
18         exit(1);
19     }//生成的可执行函数后面必须加上2个变量,否则出错
20     threadspool_t mypool;
21     pool_init(&mypool,atoi(argv[1]),atoi(argv[2]),handle);//线程池初始化
22     pool_on(&mypool);//启动线程池
23     char buf[1024];
24 //下面是一个select函数,其主要作用是每隔一秒去监听键盘上是否有数字输入,如果有,那么把数据流中的数据转化为任务数据添加到任务队列中去
25 /*    while(1)  
26     {
27         fd_set rds;
28         FD_ZERO(&rds); //初始化
29         FD_SET(0,&rds);//rds捆绑输入,监听
30 
31         struct timeval tm;
32         tm.tv_sec=1;
33         tm.tv_usec=0;//设置时间
34         if(0==select(1024,&rds,NULL,NULL,&tm))
35         {
36             continue;
37         }//如果没输入,继续监听
38         if(FD_ISSET(0,&rds))
39         {
40         memset(buf,0,sizeof(buf));
41         if(read(0,buf,127)==0)//将键盘上输入的字符流储存到数组中去
42         {
43             break;
44         }
45         elem_t val;
46         sscanf(buf,"%d%d",&val.left,&val.right);//将数组中的字符赋值两个数字到数据项
47         pool_put(&mypool,val);//加入任务队列
48         }
49     }*/
50 //第二种输入方法,不停的输入数字,并将这些数据添加到任务队列中去,不用select函数
51    int i,x,y;
52    elem_t val;
53    while(1)
54    {
55     scanf("%d%d",&x,&y);
56         val.left=x;
57         val.right=y;
58     pool_put(&mypool,val);
59    }
60     pool_off(&mypool);//关闭多线程
61     pool_destroy(&mypool);//销毁线程池
62     return 0;
63 }
View Code

4.实现多线程的具体功能函数pool_thread.c和队列操作的具体功能函数pool_que.c

vim pool_thread.c

技术分享
 1 #include<pool_pthread.h>
 2 void pool_init(pthreadspool_t ppool,int quecapacity,int threadsnum,consumer_handle hd)
 3 {
 4     que_init(&ppool->pool_que,quecapacity);
 5     ppool->pool_arr=(pthread_t *)calloc(threadsnum,sizeof(pthread_t));
 6     ppool->pool_cnt=threadsnum;//线程数
 7     ppool->pool_handle=hd;
 8 }
 9 void pool_on(pthreadspool_t ppool)
10 {
11     int i;
12     for(i=0;i<ppool->pool_cnt;i++)
13     {
14         if(0!=pthread_create(ppool->pool_arr+i,NULL,ppool->pool_handle,(void*)ppool))//创建线程,并且执行
15         {
16             printf("thread create fail!\n");
17             exit(1);
18         }
19     }
20 }
21 void pool_off(pthreadspool_t ppool)
22 {
23     int i;
24     for(i=0;i<ppool->pool_cnt;i++)
25     {
26     pthread_join(ppool->pool_arr[i],NULL);//关闭线程
27     }
28 }
29 void pool_put(pthreadspool_t ppool,elem_t val)
30 {
31     que_push(&ppool->pool_que,val);
32 }
33 void pool_get(pthreadspool_t ppool,elem_t* val)
34 {
35     *val=que_top(&ppool->pool_que);
36     que_pop(&ppool->pool_que);
37 }
38 void pool_destroy(pthreadspool_t ppool)
39 {
40 que_destroy(&ppool->pool_que);
41 free(ppool->pool_arr);
42 }
View Code

vim pool_que.c

技术分享
 1 #include  "pool_que.h"
 2 void que_init(pque_t pq,int sizenum)
 3 {
 4     pq->que_arr=(elem_t *)calloc(sizenum,sizeof(elem_t));
 5     pq->que_capacity=sizenum;
 6     pq->que_front =0;
 7     pq->que_tail=0;
 8     pthread_mutex_init(&pq->que_lock,NULL);//初始化互斥锁
 9     pthread_cond_init(&pq->que_pro,NULL);//初始化信号量
10     pthread_cond_init(&pq->que_con,NULL);
11 }
12 
13 STATUS que_empty(pque_t pq)
14 {
15     if(pq->que_front==pq->que_tail)//队首和队尾相等则为空,该队列是一个循环队列
16     {
17         return EMPTY;
18     }
19     else
20         return NEITHER;
21 }
22 STATUS que_full(pque_t pq)
23 {
24     if((pq->que_tail+1)%pq->que_capacity==pq->que_front)
25     {
26         return FULL;
27     }
28     else
29         return NEITHER;
30 }
31 
32 void que_push(pque_t pq,elem_t val)
33 {
34     pthread_mutex_lock(&pq->que_lock);//关闭互斥锁
35     while(que_full(pq)==FULL)
36     {
37    pthread_cond_wait(&pq->que_pro,&pq->que_lock); //当任务队列为满时,等待执行任务的信号,并且将互斥锁打开
38     }
39     pq->que_arr[pq->que_tail]=val;
40     pq->que_tail=(pq->que_tail+1)%pq->que_capacity;
41     pthread_cond_signal(&pq->que_con);//通知线程有新任务出现
42     pthread_mutex_unlock(&pq->que_lock);//打开互斥锁
43 }
44 elem_t que_top(pque_t pq)
45 {
46     pthread_mutex_lock(&pq->que_lock);//上锁
47     while(EMPTY==que_empty(pq))
48     {
49         pthread_cond_wait(&pq->que_con,&pq->que_lock);
50     }
51     return pq->que_arr[pq->que_front];
52 }
53 void que_pop(pque_t pq)
54 {
55     pq->que_front=(pq->que_front+1)%pq->que_capacity;
56     pthread_cond_signal(&pq->que_pro);
57     pthread_mutex_unlock(&pq->que_lock);
58 }
59 void que_destroy(pque_t pq)
60 {
61     free(pq->que_arr);
62     pthread_mutex_destroy(&pq->que_lock);//销毁锁和信号
63     pthread_cond_destroy(&pq->que_pro);
64     pthread_cond_destroy(&pq->que_con);
65 }
View Code

5.Makefile函数

由于程序涉及的函数较多,为了编译执行方便,可以使用Makefile函数,用make命令来执行

1 EXE_DIR:=./bin   //生成可执行函数的地址
2 INC_DIR:=./include  //头文件
3 SRC_DIR:=./src  //源文件
4 
5 OBJECTS:=./src/pool_main.c ./src/pool_thread.c ./src/pool_que.c
6 
7 $(EXE_DIR)/main:$(OBJECTS) $(INCLUDES)
8     gcc -g -o $@ $(OBJECTS) -I$(INC_DIR) -lpthread//编译命令

6.执行

 

技术分享

7.总结

这个程序只是实现了一个简单的加法运算,当然也可以把多线程执行的任务换成其他的任务。只是通过这一个实例来说明多线程的意义和构建过程。

线程池:创建线程池以后,首先创建若干个线程。当任务队列中有任务出现时,指定一个线程为这个任务服务,若每个线程都在执行,那么任务队列中的任务只能等待。反之,如果任务队列为空,线程也只能等待。在客户端向服务器请求过程中,虽然线程不像进程那样占用很多资源,但是线程本身的创建和销毁在线程数量多的情况下仍然是很大的工作量。线程池实现了线程的复用,每个线程执行完一个任务后,可以继续执行另一个任务,这大大提高了线程处理任务的效率。

一个简单的线程池程序设计(消费者和生产者)

原文:http://www.cnblogs.com/zhaoheng/p/4412342.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!