进程与线程
为什么对于大多数合作性任务,多线程比多个独立的进程更优越呢?这是因为,线程共享相同的内存空间。不同的线程可以存取内存中的同一个变量。所以,程序中的所有线程都可以读或写声明过的全局变量。如果曾用fork() 编写过重要代码,就会认识到这个工具的重要性。为什么呢?虽然fork() 允许创建多个进程,但它还会带来以下通信问题:如何让多个进程相互通信,这里每个进程都有各自独立的内存空间。对这个问题没有一个简单的答案。虽然有许多不同种类的本地IPC (进程间通信),但它们都遇到两个重要障碍:
双重坏事: 开销和复杂性都非好事。如果曾经为了支持 IPC而对程序大动干戈过,那么您就会真正欣赏线程提供的简单共享内存机制。由于所有的线程都驻留在同一内存空间,POSIX线程无需进行开销大而复杂的长距离调用。只要利用简单的同步机制,程序中所有的线程都可以读取和修改已有的数据结构。而无需将数据经由文件描述符转储或挤入紧窄的共享内存空间。仅此一个原因,就足以让您考虑应该采用单进程/多线程模式而非多进程/单线程模式。
为什么要用线程?
与标准 fork()相比,线程带来的开销很小。内核无需单独复制进程的内存空间或文件描述符等等。这就节省了大量的CPU时间,使得线程创建比新进程创建快上十到一百倍。因为这一点,可以大量使用线程而无需太过于担心带来的CPU 或内存不足。使用 fork() 时导致的大量 CPU占用也不复存在。这表示只要在程序中有意义,通常就可以创建线程。
当然,和进程一样,线程将利用多CPU。如果软件是针对多处理器系统设计的,这就真的是一大特性(如果软件是开放源码,则最终可能在不少平台上运行)。特定类型线程程序(尤其是CPU密集型程序)的性能将随系统中处理器的数目几乎线性地提高。如果正在编写CPU非常密集型的程序,则绝对想设法在代码中使用多线程。一旦掌握了线程编码,无需使用繁琐的IPC和其它复杂的通信机制,就能够以全新和创造性的方法解决编码难题。所有这些特性配合在一起使得多线程编程更有趣、快速和灵活。
什么是线程?
- 进程ID,进程群组ID,用户ID,群组ID
- 环境
- 工作目录
- 程序指令
- 寄存器
- 栈
- 堆
- 文件描述符
- 信号动作
- 共享库
- 进程间通信工具(例如消息队列,管道,信号量,共享内存)
Unix进程 Unix进程内部的线程
- 栈指针
- 寄存器
- 调度属性(例如规则和优先级)
- 等待序列和阻塞信号
- 线程拥有的数据
- 它生存在进程中,并使用进程资源;
- 拥有它自己独立的控制流,前提是只要它的父进程还存在,并且OS支持它;
- 它仅仅复制可以使它自己调度的必要的资源;
- 它可能会同其它与之同等独立的线程分享进程资源;
- 如果父进程死掉那么它也会死掉——或者类似的事情;
- 它是轻量级的,因为大部分的开支已经在它的进程创建时完成了。
- 一个线程对共享的系统资源做出的改变(例如关闭一个文件)会被所有的其它线程看到;
- 指向同一地址的两个指针的数据是相同的;
- 对同一块内存进行读写操作是可行的,但需要程序员作明确的同步处理操作。
什么是Pthreads?
为什么要用Pthreads?
Platform | fork() | pthread_create() | ||||
---|---|---|---|---|---|---|
real | user | sys | real | user | sys | |
AMD 2.3 GHz Opteron (16cpus/node) | 12.5 | 1.0 | 12.5 | 1.2 | 0.2 | 1.3 |
AMD 2.4 GHz Opteron (8cpus/node) | 17.6 | 2.2 | 15.7 | 1.4 | 0.3 | 1.3 |
IBM 4.0 GHz POWER6 (8cpus/node) | 9.5 | 0.6 | 8.8 | 1.6 | 0.1 | 0.4 |
IBM 1.9 GHz POWER5 p5-575 (8cpus/node) | 64.2 | 30.7 | 27.6 | 1.7 | 0.6 | 1.1 |
IBM 1.5 GHz POWER4 (8cpus/node) | 104.5 | 48.6 | 47.2 | 2.1 | 1.0 | 1.5 |
INTEL 2.4 GHz Xeon (2 cpus/node) | 54.9 | 1.5 | 20.8 | 1.6 | 0.7 | 0.9 |
INTEL 1.4 GHz Itanium2 (4 cpus/node) | 54.5 | 1.1 | 22.2 | 2.0 | 1.2 | 0.6 |
1 测试代码: 2 3 ============================================================================== 4 C Code for fork() creation test 5 ============================================================================== 6 #include <stdio.h> 7 #include <stdlib.h> 8 #define NFORKS 50000 9 10 void do_nothing() { 11 int i; 12 i= 0; 13 } 14 15 int main(int argc, char *argv[]) { 16 int pid, j, status; 17 18 for (j=0; j<NFORKS; j++) { 19 20 /*** error handling ***/ 21 if ((pid = fork()) < 0 ) { 22 printf ("fork failed with error code= %d\n", pid); 23 exit(0); 24 } 25 26 /*** this is the child of the fork ***/ 27 else if (pid ==0) { 28 do_nothing(); 29 exit(0); 30 } 31 32 /*** this is the parent of the fork ***/ 33 else { 34 waitpid(pid, status, 0); 35 } 36 } 37 } 38 39 ============================================================================== 40 C Code for pthread_create() test 41 ============================================================================== 42 #include <pthread.h> 43 #include <stdio.h> 44 #include <stdlib.h> 45 46 #define NTHREADS 50000 47 48 void *do_nothing(void *null) { 49 int i; 50 i=0; 51 pthread_exit(NULL); 52 } 53 54 int main(int argc, char *argv[]) { 55 int rc, i, j, detachstate; 56 pthread_t tid; 57 pthread_attr_t attr; 58 59 pthread_attr_init(&attr); 60 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 61 62 for (j=0; j<NTHREADS; j++) { 63 rc = pthread_create(&tid, &attr, do_nothing, NULL); 64 if (rc) { 65 printf("ERROR; return code from pthread_create() is %d\n", rc); 66 exit(-1); 67 } 68 69 /* Wait for the thread */ 70 rc = pthread_join(tid, NULL); 71 if (rc) { 72 printf("ERROR; return code from pthread_join() is %d\n", rc); 73 exit(-1); 74 } 75 } 76 77 pthread_attr_destroy(&attr); 78 pthread_exit(NULL); 79 80 }
- CPU与I/O的重叠协作:例如,一个程序可能分多段对I/O进行长操作,当一个线程正在等待一个I/O系统调用完成时,CPU可以用其它线程进行现有的密集工作;
- 优先级/实时调度:可预定更重要的任务取代或者中断低优先级的任务;
- 异步事件处理:一些不确定次数和持续时间的服务事件是交叉执行的任务。例如,Web服务器可以在应答前一个传输数据的请求时候,处理新的数据请求。
- MPI库经常通过共享内存来实现on-node任务通讯,这样,就必须至少调用一次内存拷贝操作(进程对进程的);
- 对于Pthreads,不存在中间(intermediate)的内存拷贝,因为在一个进程中,线程共享相同的地址空间。本身也没有数据传输,它成为了一个从高速缓存到CPU或是内存到CPU带宽式的传输情况,它们的速度更快;
- 以下是比较列表:
Platform MPI Shared Memory Bandwidth
(GB/sec)Pthreads Worst Case
Memory-to-CPU Bandwidth
(GB/sec)AMD 2.3 GHz Opteron 1.8 5.3 AMD 2.4 GHz Opteron 1.2 5.3 IBM 1.9 GHz POWER5 p5-575 4.1 16 IBM 1.5 GHz POWER4 2.1 4 Intel 2.4 GHz Xeon 0.3 4.3 Intel 1.4 GHz Itanium 2 1.8 6.4
附注:
SMP——Symmetrical Multi-Processing,对称多处理系统;
MPI——Message Passing Interface,参照http://www-unix.mcs.anl.gov/mpi/mpich/。
创建线程函数——pthread_create()
#include <pthread.h>
int pthread_create( pthread_t* thread, const pthread_attr_t* attr, void* (*start_routine)(void* ), void* arg );
描述:
pthread_create()函数创建一个新的线程,通过线程属性对象attr指定属性。
被创建的线程继承了父线程的信号掩码,且它的等待信号是空的。
参数:
thread:NULL,或者指向一个pthread_t对象的指针,指向此函数执行后存储的一个新线程的线程ID。
attr:
指向 pthread_attr_t结构体的指针,此结构体指定了线程的执行属性。
如果attr为NULL,则被设置为默认的属性,pthread_attr_init()可以设置attr属性。
注意:如果在创建线程后编辑attr属性,线程的属性不受影响。
start_routine:
线程的执行函数,arg为函数的参数。
如果start_routine()有返回值,调用pthread_exit(),start_routine()的返回值作为线程的退出状态。
在main()中的线程被调用有所不同。当它从main()返回,调用exit(),用main()的返回值作为退出状态。
arg:
传给start_routine的参数。
Linux线程在核内是以轻量级进程的形式存在的,拥有独立的进程表项,而所有的创建、同步、删除等操作都在核外pthread库中进行。pthread库使用一个管理线程(__pthread_manager(),每个进程独立且唯一)来管理线程的创建和终止,为线程分配线程ID,发送线程相关的信号(比如Cancel),而主线程(pthread_create())的调用者则通过管道将请求信息传给管理线程。
取消线程pthread_cancel()
#include <pthread.h>
int pthread_cancel( pthread_t thread );
描述:
pthread_cancel()函数请求目标线程被取消(终止)。
取消类型(type)和状态(state)决定了取消函数是否会生效。
当取消动作被执行,目标线程的取消清除操作会被调用。
当最后的取消清除操作返回,目标线程的线程的析构函数会被调用。
当最后的析构函数返回,目标线程会被终止 (terminated)。
线程的取消过程同调用线程会异步执行。
参数:
thread: 欲取消的线程ID,可以通过pthread_create()或者pthread_self()函数取得。
pthread_testcancel()
#include <pthread.h>
void pthread_testcancel( void );
描述:函数在运行的线程中创建一个取消点,如果cancellation无效则此函数不起作用。
pthread_setcancelstate()
#include <pthread.h>
int pthread_setcancelstate( int state, int* oldstate );
描述:
pthread_setcancelstate() f函数设置线程取消状态为state,并且返回前一个取消点状态oldstate。
取消点有如下状态值:
PTHREAD_CANCEL_DISABLE:取消请求保持等待,默认值。
PTHREAD_CANCEL_ENABLE:取消请求依据取消类型执行;参考pthread_setcanceltype()。
参数:
state: 新取消状态。
oldstate: 指向本函数所存储的原取消状态的指针。
pthread_setcanceltype()
#include <pthread.h>
int pthread_setcanceltype( int type, int* oldtype );
描述:
pthread_setcanceltype()函数设置运行线程的取消类型为type,并且返回原取消类型与oldtype。
取消类型值:
PTHREAD_CANCEL_ASYNCHRONOUS:如果取消有效,新的或者是等待的取消请求会立即执行。
PTHREAD_CANCEL_DEFERRED:如果取消有效,在遇到下一个取消点之前,取消请求会保持等待,默认值。
注意:标注POSIX和C库的调用不是异步取消安全地。
参数:
type: 新取消类型
oldtype: 指向该函数所存储的原取消类型的指针。
什么是取消点(cancelation point)?
资料中说,根据POSIX标准,pthread_join()、pthread_testcancel()、 pthread_cond_wait()、pthread_cond_timedwait()、sem_wait()、sigwait()等函数以及 read()、write()等会引起阻塞的系统调用都是Cancelation-point。而其他pthread函数都不会引起 Cancelation动作。但是pthread_cancel的手册页声称,由于LinuxThread库与C库结合得不好,因而目前C库函数都不是 Cancelation-point;但CANCEL信号会使线程从阻塞的系统调用中退出,并置EINTR错误码,因此可以在需要作为 Cancelation-point的系统调用前后调用pthread_testcancel(),从而达到POSIX标准所要求的目标,即如下代码段:
pthread_testcancel();
retcode = read(fd, buffer, length);
pthread_testcancel();
我发现,对于C库函数来说,几乎可以使线程挂起的函数都会响应CANCEL信号,终止线程,包括sleep、delay等延时函数,下面的例子对此会进行详细分析。
实例探讨
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 void cleanup(void *parm) {
6 printf("Inside cancellation cleanup handler\n");
7 }
8 void * child1(void *arg) {
9 int oldstate, oldtype;
10 int i = 0;
11 pthread_cleanup_push(cleanup,NULL);
12 printf("\nPTHREAD_CANCEL_DISABLE before\n");
13 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
14 // pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldtype);//comment 1
15 while (1) {
16 i++;
17 printf("child1: I am running. \n");
18 sleep(2);
19 if (i == 5) {
20 printf("\nPTHREAD_CANCEL_ENABLE\n");
21 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
22 //pthread_testcancel();//comment 2
23 }
24
25 }
26 pthread_cleanup_pop(0);
27 }
28
29 int main(int argc, char *argv[]) {
30 int tid1, rc;
31 void * status = NULL;
32 printf("hello, condition variable test\n");
33 pthread_create(&tid1, NULL, child1, NULL);
34 sleep(2);
35 printf("\npthread_cancel \n");
36 pthread_cancel(tid1);
37
38 rc = pthread_join(tid1, &status);
39 if (status != PTHREAD_CANCELED) {
40 printf("\npthread_join failed,status=%d,rc=%d\n", status, rc);
41 return -1;
42 } else
43 printf("\npthread_join succ,status=%d,rc=%d\n", status, rc);
44 printf("\nMain completed!~~\n");
45
46 return EXIT_SUCCESS;
47 }
运行结果:
hello, condition variable test
PTHREAD_CANCEL_DISABLE before
child1: I am running.
pthread_cancel
child1: I am running.
child1: I am running.
child1: I am running.
child1: I am running.
PTHREAD_CANCEL_ENABLE
child1: I am running.
Inside cancellation cleanup handler
pthread_join succ,status=-1,rc=0
Main completed!~~
代码分析:
那么,取消点在哪儿呢,也就是哪一句终止的线程呢?是sleep(2),它可以阻塞线程相应了取消信号。
如果你将sleep拿掉,会发现线程会无休止的运行下去,如果将sleep换成pthread_testcancel(),会发现线程也不会终止,这个函数不是有检测取消点,如果存在就取消线程的功用吗?懵了一会儿,索性将child1函数的while(1)内所有打印去掉,发现线程可以终止了。原来,无延时的频繁打印,会使pthread_testcancel()无法响应取消信号,解决方法是要么取消打印,要么加上延时。
最后是第13行:
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldtype);//comment 1
这句的作用是将取消类型设置为PTHREAD_CANCEL_ASYNCHRONOUS,即取消请求会被立即响应,加上这句,那么运行结果会变为:
hello, condition variable test
PTHREAD_CANCEL_DISABLE before
child1: I am running.
pthread_cancel
child1: I am running.
child1: I am running.
child1: I am running.
child1: I am running.
PTHREAD_CANCEL_ENABLE
Inside cancellation cleanup handler
pthread_join succ,status=-1,rc=0
Main completed!~~
由结果看到,比上次的执行结果在【PTHREAD_CANCEL_ENABLE】后少了一次【child1: I am running. 】打印,可以看出,在线程取消设为有效后,直接终止线程,并不进入下一轮循环。
总结:
创建线程
终止线程
线程连接(joining)和分离(detaching)函数:
pthread_join(threadid,status)——等待线程终止
语法:
#include <pthread.h>
int pthread_join(pthread_t thread, void ** value_ptr);
描述:
pthread_join()将挂起调用线程的执行直到目标线程终止,除非目标线程已经终止了。
在一次成功调用pthread_join()并有非NULL的参数value_ptr,传给pthread_exit()终止线程的这个值以value_ptr作为引用是可用的。
当pthread_join()成功返回,目标线程就会终止。
对同一目标线程多次同时调用pthread_join()的结果是不确定的。
如果调用pthread_join()的线程被取消,那么目标线程将不会被分离。
一个线程是否退出或者保持为连接状态不明确会与{PTHREAD_THREADS_MAX}相背。
(It is unspecified whether a thread that has exited but remains unjoined counts against {PTHREAD_THREADS_MAX}.)
返回值
如果执行成功,pthread_join()会返回0;否则,会返回错误代码以表明错误类型。
原理
pthread_join()函数在多线程应用程序中的便捷性是令人满意的。如果没有其他状态作为参数传给start_routine(),程序员可以模拟这个函数。
pthread_detach——分离线程
语法:
#include <pthread.h>
int pthread_detach(pthread_t thread);
描述:
pthread_detach()函数在所指出线程终止时,该线程的内存空间可以被回收。
如果线程没有终止,pthread_detach()函数也不会令其终止。
对同一目标线程多次调用pthread_datach()的结果是不确定的。
返回值:
如果调用成功,pthread_detach()返回0;反之,会返回错误代码表明错误。
原理:
每一个被创建的线程,最终都需要调用pthread_join()函数或者pthread_detach()函数,因为与线程相关联的内存空间需要回收。
我们建议调用分离函数是不必要的;把线程创建时的属性置为分离状态就足够了,因为线程永远不需要动态分离。但是,需要的话仅限于如下两个原因:
连接:
“连接”是线程间完成同步的一种方法,例如:
pthread_join()过程阻塞调用线程,直到被指定的threadid线程终止;
程序员能够获得目标线程终止的返回状态,如果目标线程调用pthread_exit()时指定;
一个连接线程跟一个pthread_join()调用对应。如果对同一个线程进行多次连接会发生逻辑错误;
还有两个同步方法:互斥锁和条件变量。
是否连接?
当一个线程被创建了,它有一个属性表明了它是否可以被连接和分离。只有线程是可连接的(joinable)才可以被连接。如果一个线程作为分离来创建的,那么它永远不能被连接。
POSIX标准的最终方案线程被设计为可连接被创建;
为了明确创建线程的连接性和分离性,pthread_create()函数可使用attr参数。典型的4个步骤:
分离
pthread_detach()函数能够明确的分离一个线程,即使被创建为可连接的。这个过程是不可逆的。
建议
Mutex Variables(互斥量)
Thread 1 | Thread 2 | Balance |
---|---|---|
Read balance: $1000 | $1000 | |
Read balance: $1000 | $1000 | |
Deposit $200 | $1000 | |
Deposit $200 | $1000 | |
Update balance $1000+$200 | $1200 | |
Update balance $1000+$200 | $1200 |
- 创建并初始化互斥量;
- 多个线程试图锁定此互斥量;
- 有且只有一个线程成功的拥有了这个互斥量;
- 拥有互斥量的线程执行了一系列的操作;
- 拥有互斥量的线程解锁互斥量;
- 另一个线程获得此互斥量,并重复上面的过程;
- 最终互斥量被销毁。
创建并销毁互斥量
函数:
pthread_mutex_init (mutex,attr)
pthread_mutex_destroy (mutex)
pthread_mutexattr_init (attr)
pthread_mutexattr_destroy (attr)
用法:
- 静态初始化,例如:pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
- 动态初始化,使用pthread_mutex_init()过程,此函数可以设置互斥量属性对象attr;
注意,互斥量初始状态是未被锁定的(unlocked)。
- Protocol:为互斥量指定用来防止优先级倒置的规则的;
- Prioceiling:为互斥量指定优先级上限的;
- Process-shared:为互斥量指定进程共享的。
注意,不是所有的实现都要提供这三个可选的互斥量属性的。
锁定和解锁互斥量
函数:
pthread_mutex_lock (mutex)
pthread_mutex_trylock (mutex) pthread_mutex_unlock (mutex) |
用法:
- 如果互斥量已经被解锁;
- 如果互斥量被另一个线程拥有。
案例分析
/*****************************************************************************
* 描述:
* 这个例子阐明了互斥变量在线程编程中的用法
* main数据域定义一个全局的可访问结构体,所有线程均可访问,
* 每个线程都为此数据的一部分做运算工作,主线程等待所有线程完成运算,
* 并最后打印出计算结果。
******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
/*下面的结构体包含了dotprod函数要处理的所需要的信息,
* 访问输入数据并把输出写入结构体
* */
typedef struct {
double*a;
double*b;
double sum;//ab的和
int veclen;//结构体个数
} DOTDATA;
/*全局变量和一个互斥量*/
#define NUMTHRDS 4
#define VECLEN 100
DOTDATA dotstr;
pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum;
/*dotprod函数在线程创建时被激活,从DOTDATA结构体内取出数据,之后把运算结果又写入到结构体中。
* 这种处理方法的好处很明显是多线程编程:线程创建时,我们给被激活的函数传递了一个参数——参数值是线程的序号。
* 函数需要的其它信息是来自全局的结构体。
* */
void*dotprod(void*arg) {
int i, start, end, len;
long offset;
double mysum, *x, *y;
offset = (long) arg;
len = dotstr.veclen;
start = offset * len;
end = start + len;
x = dotstr.a;
y = dotstr.b;
//计算
mysum =0;
for (i = start; i < end; i++) {
mysum += (x[i] * y[i]);
}
/*锁定互斥量,更改共享结构的值,解锁 */
pthread_mutex_lock(&mutexsum);
dotstr.sum += mysum;
printf("Thread%ld: mysum=%f,dotstr.sum=%f\n",offset,mysum,dotstr.sum);
pthread_mutex_unlock(&mutexsum);
pthread_exit((void*) 0);
}
/*主程序创建线程,线程计算数据,然后打印出结果。在创建线程前,创建输入数据。
* 因为所有线程更改同一个结构体,所以我们需要一个互斥量。
* 主线程需要等待所有线程完成,它等待这些线程中的一个,我们为线程赋予属性,允许主线程连接(join)其它被
* 创建的线程。注意,当不再需要它们是要记得释放它们。
* */
int main(int argc, char*argv[]) {
long i;
double*a, *b;
void*status;
pthread_attr_t attr;
/* 初始化变量 */
a = (double*) malloc(NUMTHRDS * VECLEN *sizeof(double));
b = (double*) malloc(NUMTHRDS * VECLEN *sizeof(double));
for (i =0; i < VECLEN * NUMTHRDS; i++) {
a[i] =1;
b[i] = a[i];
}
dotstr.veclen = VECLEN;
dotstr.a = a;
dotstr.b = b;
dotstr.sum =0;
pthread_mutex_init(&mutexsum, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (i =0; i < NUMTHRDS; i++) {
/*创建线程,激活dotpod函数,传入参数线程序号 */
printf("i=%d\n",i);
pthread_create(&callThd[i], &attr, dotprod, (void*) i);
}
pthread_attr_destroy(&attr);
/* 等待其它线程 */
for (i =0; i < NUMTHRDS; i++) {
pthread_join(callThd[i], &status);
}
/* 线程join后,释放内存,销毁互斥量*/
printf("Sum = %f \n", dotstr.sum);
free(a);
free(b);
pthread_mutex_destroy(&mutexsum);
pthread_exit(NULL);
}
1 # Pthreads 2 i=0 3 Thread0: mysum=100.000000,dotstr.sum=100.000000 4 i=1 5 Thread1: mysum=100.000000,dotstr.sum=200.000000 6 i=2 7 Thread2: mysum=100.000000,dotstr.sum=300.000000 8 i=3 9 Thread3: mysum=100.000000,dotstr.sum=400.000000 10 Sum = 400.000000
指针和内存的图像化
主程序申请了两块内存,各分为四块,上图标明了各个指针的指向,程序共创建四个线程,把两块4*100的内存区域分成四块,做连加运算得到mysum,之后再把所有的运算结果加起来赋值于dostr.sum,由于它是多线程共享的全局变量这里得利用互斥变量排队做加法,才能保证该值结果的正常。
条件变量(Condition Variables)
条件变量是什么?
主线程
|
|
线程A
|
Thread B
|
主线程
|
创建和销毁条件变量
函数:
pthread_cond_init (condition,attr)
pthread_cond_destroy (condition) pthread_condattr_init (attr) pthread_condattr_destroy (attr) |
用法:
- 静态初始化,像这样声明:pthread_con_t myconvar = PTHREAD_CON_INITIALIZER;
- 动态初始化,使用pthread_cond_init()函数。用创建条件变量的ID作为件参数传给线程,这种方法允许设置条件变量对象属性attr。
注意,不是所有的实现都用得着process-shared属性。
条件变量的等待和信号发送
函数:
pthread_cond_wait (condition,mutex)
pthread_cond_signal (condition) pthread_cond_broadcast (condition) |
使用:
- 在调用pthread_cond_wait()之前锁定互斥量失败,可致使其无法阻塞;
- 在调用pthread_cond_signal()之后解锁互斥量失败,则致使与之对应的pthread_cond_wait()函数无法完成,并仍保持阻塞状态。
实例分析
看到下面的一汪代码不要挠头,99行而已,之后会抽丝剥茧,目的是对条件变量的运行机制了解个大概:
/******************************************************************************
* 描述:
* 应用Pthreads条件变量的实例代码,主线程创建三个线程,其中两个为“count”变量做
* 加法运算,第三个线程监视“count”的值。当“count”达到一个限定值,等待线程准备接收来
* 自于两个加法线程中一个的信号,等待 线程唤醒后更改“count”的值。程序继续运行直到加法
* 线程达到TCOUNT的值。最后,主程序打印出count的值。
******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define NUM_THREADS 3
#define TCOUNT 5 //单线程轮询次数
#define COUNT_LIMIT 7 //发送信号的次数
int count =0; //全局的累加量
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;
void*inc_count(void*t) {
int i;
long my_id = (long) t;
for (i =0; i < TCOUNT; i++) {
pthread_mutex_lock(&count_mutex);
count++;
/*
* 检查count的值,如果条件满足就发信号给等待线程
* 注意,此处是用信号量锁定的。
* */
if (count < COUNT_LIMIT) {
printf("inc_count(): thread %ld, count = %d Threshold reached. ",
my_id, count);
pthread_cond_signal(&count_threshold_cv);
printf("Just sent signal.\n");
}
printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id,
count);
pthread_mutex_unlock(&count_mutex);
/*为线程轮询互斥锁增加延时*/
sleep(1);
}
pthread_exit(NULL);
}
void*watch_count(void*t) {
long my_id = (long) t;
printf("Starting watch_count(): thread %ld\n", my_id);
/*锁定互斥量并等待信号,注意,pthread_cond_wait函数在等待时将自动以自动原子方式
* 解锁互斥量。还有,请注意,如果等待线程运行到等待函数之前已经满足COUNT_LIMIT的
* 条件判断,轮询会忽略掉等待函数,
* */
while (count < COUNT_LIMIT) {
pthread_mutex_lock(&count_mutex);
printf("watch_count(): thread %ld going into wait...\n", my_id);
pthread_cond_wait(&count_threshold_cv, &count_mutex);
printf("watch_count(): thread %ld Condition signal received.\n", my_id);
printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
pthread_mutex_unlock(&count_mutex);
}
pthread_exit(NULL);
}
int main(int argc, char*argv[]) {
int i;
long t1 =1, t2 =2, t3 =3;
pthread_t threads[3];
pthread_attr_t attr;
/*初始化互斥量和条件变量对象*/
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init(&count_threshold_cv, NULL);
/*创建线程时设为可连接状态,便于移植*/
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threads[0], &attr, watch_count, (void*) t1);
pthread_create(&threads[1], &attr, inc_count, (void*) t2);
pthread_create(&threads[2], &attr, inc_count, (void*) t3);
/* 等待所有线程完成*/
for (i =1; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
/*发送信号给监听线程*/
pthread_cond_signal(&count_threshold_cv);
pthread_join(threads[0],NULL);
printf("Main(): Waited on %d threads. Final value of count = %d. Done.\n",
NUM_THREADS, count);
/*清除并退出 */
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_threshold_cv);
pthread_exit(NULL);
}
- 两个线程利用互斥量为count做加法运算,两个线程一起做了(2*TCOUNT=)10次运算;
- count值小于COUNT_LIMIT时,发送信号给监听线程;
# Pthreads
Starting watch_count(): thread 1
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =1 Threshold reached. Just sent signal.
inc_count(): thread 2, count =1, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =1.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count =2 Threshold reached. Just sent signal.
inc_count(): thread 3, count =2, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =2.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =3 Threshold reached. Just sent signal.
inc_count(): thread 2, count =3, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =3.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count =4 Threshold reached. Just sent signal.
inc_count(): thread 3, count =4, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =4.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =5 Threshold reached. Just sent signal.
inc_count(): thread 2, count =5, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =5.
watch_count(): thread 1 going into wait...
inc_count(): thread 3, count =6 Threshold reached. Just sent signal.
inc_count(): thread 3, count =6, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =6.
watch_count(): thread 1 going into wait...
inc_count(): thread 2, count =7, unlocking mutex
inc_count(): thread 3, count =8, unlocking mutex
inc_count(): thread 2, count =9, unlocking mutex
inc_count(): thread 3, count =10, unlocking mutex
watch_count(): thread 1 Condition signal received.
watch_count(): thread 1 count now =10.
Main(): Waited on 3 threads. Final value of count =10. Done.
POSIX Threads Programming:https://computing.llnl.gov/tutorials/pthreads/
原文:http://www.cnblogs.com/linsanshu/p/3913250.html