Python虚拟机使用GIL(Global Interpreter Lock,全局解释器锁定)来互斥线程对共享资源的访问,暂时无法利用多处理器的优势。虽然python解释器可以“运行”多个线程,但在任意时刻,不管有多少的处理器,任何时候都总是只有一个线程在执行。对于I/O密集型任务,使用线程一般是没有问题的,而对于涉及大量CPU计算的应用程序而言,使用线程来细分工作没有任何好处,用户最好使用子进程和消息传递。
python的threading模块提供Thread类和各种同步原语,用于编写多线程的程序。
2.1. Thread(target=None,name=None,args=(),kwargs={})
此函数创建一个Thread实例。target是一个可调用函数,线程启动时,run()方法将调用此对象。name是线程的名称,默认是‘Thread-N‘的格式。args是传递给target函数的参数元组,kwargs是传递给target函数的关键字参数的字典。
Thread实例t支持以下方法和属性:
创建线程有两种方法:
1
2
3
4
5
6
7
8
9 |
import
threading import
time def
clock(nsec): whhile True : print
‘Now is %s‘ % time.ctime() time.sleep(nsec) t = threading.Thread(target = clock,args = ( 5 ,)) t.daemon = True
#设置为守护线程 t.start() |
2. 从Thread派生出一个子类,然后创建一个子类的实例
1
2
3
4
5
6
7
8
9
10
11
12
13 |
import
threading import
time class
ClockThread(threading.Thread): def
__init__( self ,nsec): threading.Thread.__init__( self ) self .daemon = True
#设置为守护线程 self .nsec = nsec def
run(): while
True : print
‘Now is s%‘ % time.ctime() time.sleep( self .nsec) t = ClockThread( 5 ) t.start() |
后一种方法比较python一点。
由于线程会无限循环,所以设置daemon为True,这样当进程结束时,线程也将被销毁。
例如有个数数程序,一个线程从1数到9,另一个线程从a数到j,每个线程都要耗费9s,如果要顺序执行的话需耗费18s。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 |
import
threading import
time class
CountThread(threading.Thread): def
__init__( self ,func,name): threading.Thread.__init__( self ) self .name = str (name) self .func = func def
run( self ): apply ( self .func) def
numcount(): print
threading.currentThread().name, ‘start at : ‘ ,time.ctime() for
i in
range ( 10 ): print
i time.sleep( 1 ) print
threading.currentThread().name, ‘done at : ‘ ,time.ctime() def
alphacount(): print
threading.currentThread().name, ‘start at : ‘ ,time.ctime() for
i in
range ( 97 , 107 ): print
chr (i) time.sleep( 1 ) print
threading.currentThread().getName(), ‘done at : ‘ ,time.ctime() def
main(): funclist = [numcount,alphacount] threads = [] for
i in
funclist: t = CountThread(i,i.__name__) threads.append(t) for
t in
threads: t.start() for
t in
threads: t.join() print
‘All done at :‘ ,time.ctime() if
__name__ = = ‘__main__‘ : main() |
结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 |
numcount start at : Fri Feb 07
12 : 19 : 28 alphacount start at : Fri Feb 07
12 : 19 : 28
2014 a0 b1 2c 3d 4 e 5f 6g 7 h 8 i 9j alphacount numcount done at : done at : Fri Feb 07
12 : 19 : 38
2014 Fri Feb 07
12 : 19 : 38
2014 All
done at : Fri Feb 07
12 : 19 : 38
2014 |
10s就完成了。
举一个更清晰的看t.join()作用的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 |
import
threading import
time def
join(): print
‘in Threadjoin‘ time.sleep( 1 ) print
‘out Threadjoin‘ Threadjoin = threading.Thread(target = join,name = ‘Threadjoin‘ ) def
context(Threadjoin): print
‘in Threadcontext‘ Threadjoin.start() Threadjoin.join() #Threadjoin线程开始阻塞,等待Threadjoin完成 print
‘out Threadcontext‘ Threadcontext = threading.Thread(target = context,name = ‘Threadcontext‘ ,args = (Threadjoin,)) Threadcontext.start() |
结果:
1
2
3
4
5 |
>>> in
Threadcontext in
Threadjoin out Threadjoin out Threadcontext |
线程运行在创建它的进程内部,共享所有的数据和资源,但都有自己独立的栈和堆。编写并发编程的难点在于同步和访问共享数据。多个任务同时更新一个数据结构可能导致数据损坏和程序状态不一致(也就是竞争条件)。要解决这个问题,必须找出程序的关键代码段,并使用互斥锁和其它类似的同步手段保护他们。
2.2.1 Lock
原语锁定(互斥锁定)是一个同步原语,状态是"已锁定"或"未锁定"之一。两个方法acquire()和release()用于修改锁定的状态。如果有多个线程在等待获取锁定,当锁定释放时,只有一个线程能获得它。
构造方法:
Lock() :创建新的Lock对象,初始状态为未锁定
实例方法:
Lock.acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
成功获取锁定返回True,无法获取锁定返回False。
Lock.release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
Python多线程分块读取大文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 |
import
threading import
os seekposition = 0 blocksize = 1000000 filesize = 0 def
getFilesize(filename): f = open (filename) f.seek( 0 ,os.SSEK_END) filesize = f.tell() f.close() return
filesize def
parsefile(filename): global
seekposition,filesize f = open (filename) while
True : lock.acquire() #seekposition是线程共享的,修改时需要锁定 startposition = seekposition endposition = (startposition + blocksize) if
(startposition + blocksize)<filesize else
filesize seekposition = endposition lock.release() if
startposition = = filesize: break elif
startposition> 0 : f.seek(startposition) f.readline() #分成的block第一行可能不是完整的一行,略掉不处理,而是作为上一个block的最后一行处理 position = f.tell() outfile = open ( str (endposition) + ‘.txt‘ , ‘w‘ ) while
position< = endposition: line = f.readline() outfile.write(line) position = f.tell() outfile.close() f.close() def
main(filename): global
seekposition,filesize filesize = getFilesize(filename) lock = threading.Lock() threads = [] for
i in
range ( 4 ): t = threading.Thread(target = parsefile,args = (filename,)) threads.append(t) for
t in
threads: t.start() for
t in
threads: t.join() if
__name__ = = ‘__main__‘ : filename = ‘‘ main(filename) |
2.2.2 RLock
多重锁定是一个类似于Lock对象的同步原语,但同一个线程可以多次获取它。这允许拥有锁定的线程执行嵌套的acquire()和release()操作。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 |
import
threading import
time rlock = threading.RLock() count = 0 class
MyThread(threading.Thread): def
__init__( self ): threading.Thread.__init__( self ) def
run( self ): global
count if
rlock.acquire(): count + = 1 print
‘%s set count : %d‘ % ( self .name,count) time.sleep( 1 ) if
rlock.acquire(): count + = 1 print
‘%s set count : %d‘ % ( self .name,count) time.sleep( 1 ) rlock.release() rlock.release() if
__name__ = = ‘__main__‘ : for
i in
range ( 5 ): t = MyThread() t.start() |
2.2.3 信号量Semaphore
信号量是一个基于计数器的同步原语,调用acquire()方法时此计数器减1,调用release()方法时此计数器加1.如果计数器为0,acquire()方法会被阻塞,直到其他线程调用release()为止。
下面是一个说明信号量的好例子,引用自http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 |
import
threading import
time semaphore =
threading.Semaphore( 2 ) # 计数器初值为2 def
func(): # 请求Semaphore,成功后计数器-1;计数器为0时阻塞 print
‘%s acquire semaphore...‘ %
threading.currentThread().getName() if
semaphore.acquire(): print
‘%s get semaphore‘ %
threading.currentThread().getName() time.sleep( 4 ) # 释放Semaphore,计数器+1 print
‘%s release semaphore‘ %
threading.currentThread().getName() semaphore.release() t1 =
threading.Thread(target = func) t2 =
threading.Thread(target = func) t3 =
threading.Thread(target = func) t4 =
threading.Thread(target = func) t1.start() t2.start() t3.start() t4.start() time.sleep( 2 ) # 没有获得semaphore的主线程也可以调用release # 若使用BoundedSemaphore,t4释放semaphore时将抛出异常 print
‘MainThread release semaphore without acquire‘ semaphore.release() |
2.2.4 Condition
条件变量是构建在另一个锁定上的同步原语。典型的用法是生产者-使用者问题,其中一个线程生产的数据供另一个线程使用。
构造方法:
Condition([lock/rlock])
实例方法:
acquire([timeout])/release(): 调用关联的锁的相应方法。
wait([timeout]):
调用这个方法将使线程进入Condition的等待池等待通知,并释放锁,直到另一个线程在条件变量上执行notify()或notify_all()方法将其唤醒为止。使用前线程必须已获得锁定,否则将抛出异常。
notify():
调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notify_all(): 唤醒所有等待此条件的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 |
import
threading cv = threading.Condition() alist = [] def
producer(): global
alist cv.acquire() for
i in
range ( 10 ): alist.append(i) cv.notify() cv.release() def
consumer(): cv.acquire() while
alist is
None : cv.wait() cv.release() print
alist tproducer =
threading.Thread(target = producer) tconsumer =
threading.Thread(target = consumer) tconsumer.start() tproducer.start() |
2.3 local()
返回local对象,用于保存线程的数据,管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。 可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 |
import
threading mydata = threading.local() mydata.number = 42 mydata.color = ‘red‘ print
mydata.__dict__ log = [] def
foo(): items = mydata.__dict__.items() #在此线程中mydata属性字典为空,无number与color属性 items.sort() log.append(items) mydata.number = 11 log.append(mydata.number) t = threading.Thread(target = foo) t.start() t.join() print
log print
mydata.number #仍为42 |
尽管在Python中可以使用各种锁定和同步原语的组合编写非常传统的多线程程序,但有一种更优的编程方式——即将多线程程序组织为多个独立任务的集合,这些线程之间通过消息队列进行通讯。Queue模块可以用于线程间通讯,让各个线程共享数据。
构造方法:
Queue():创建一个FIFO队列
LifoQueue():创建一个LIFO栈
实例方法:
q.put(item):将item放入队列
q.get():从队列中删除一项,然后返回这个项目
q.task_done():队列中数据的使用者用来指示对于项目的处理已结束。从队列中删除的每一项都应该调用一次。
q.join():阻塞直到队列中的所有项目均被处理为止。
python核心编程中有关于多线程编程和Queue结合使用的思路:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 |
import
threading import
Queue q = Queue.Queue() class
MyThread(threading.Thread): def
__init__( self ): threading.Thread.__init__( self ) self .daemon = True def
run( self ): while
True : item = q.get() print
threading.current_thread().name, ‘get‘ ,item q.task_done() for
i in
range ( 4 ): t = MyThread() t.start() for
i in
range ( 100 ): q.put(i) q.join() |
下面选自《Python参考手册》
线程没有任何方法可用于强制终止或挂起。由于在设计上,如果某个线程获取了锁定,在它释放之前强制终止线程,将导致整个应用程序出现死锁。
可以自己构建终止功能:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 |
import
threading class
MyThread(threading.Thread): def
__init__( self ): threading.Thread.__init__( self ) self ._terminate = False
#设置终止标志 self .lock = threading.Lock() def
terminal( self ): self ._terminal = True def
acquire( self ): self .lock.acquire() def
release( self ): self .lock.release() def
run( self ): while
True : if
self ._terminal: #标志为True,则终止线程 break self .lock.acquire() statements self .lock.release() statements |
也可以利用Queue传递终止信号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 |
import
threading import
Queue class
MyThread(threading.Thread): def
__init__( self ): threading.Thread.__init__( self ) self .queue = Queue.Queue() def
send( self ,item): self .queue.put(item) def
close( self ): self .queue.put( None ) self .queue.join() def
run( self ): while
True : item = self .queue.get() if
item is
None : break print
item self .queue.task_done() self .queue.task_done() t = MyThread() t.start() t.send( ‘hello‘ ) t.send( ‘world‘ ) t.close() |
原文:http://www.cnblogs.com/linxiyue/p/3538913.html