# 可以把信号量理解为一种锁
# 相当于公共厕所,门口挂了5把钥匙,对应五个坑位,代表同时最多可以接纳五个人上厕所
from threading import Thread,Semaphore,current_thread
import time
import random
# 每个线程运行起来都会运行func
def func():
sm.acquire() # 运行func都会来抢这把锁
print(‘%s is wcing‘ %current_thread().getName())
time.sleep(random.randint(1,5)) # 模拟上厕所的时间
sm.release()
if __name__ == ‘__main__‘:
sm = Semaphore(5) # 创建信号量,自定义为5,相当于5把钥匙得到信号量对象
for i in range(23): # for循环了23次,开了23个线程
t = Thread(target=func)
t.start()
"""
ps:互斥锁只能acquire一次,再有人来执行acquire,如果没有释放,下一个来拿的人就只能阻在原地无法拿到acquire。而信号量一把锁可以acquire指定5次(Semaphore(5)),如果第6个来在
acquire的时候就没有了,相当于没有钥匙了,就只能在原地等着,只要5个人里面有人释放后面的人就
可以拿到钥匙
"""
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()
class Mythread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self) -> None:
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print("%s 抢到了A锁" %self.name)
mutexB.acquire()
print("%s 抢到了B锁" %self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print("%s 抢到了B锁" % self.name)
time.sleep(0.1)
mutexA.acquire()
print("%s 抢到了A锁" % self.name)
mutexA.release()
mutexB.release()
if __name__ == ‘__main__‘:
t1 = Mythread("线程1")
t2 = Mythread("线程2")
t3 = Mythread("线程3")
t4 = Mythread("线程4")
t1.start()
t2.start()
t3.start()
t4.start()
print("主线程")
from threading import Thread,RLock,Lock
import time
#一个线程拿到锁,计数加1,该线程内又碰到加锁的情况,则计数继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即计数递减到0为止
mutexA = mutexB = RLock()
class Mythread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self) -> None:
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print("%s 抢到了A锁" %self.name)
mutexB.acquire()
print("%s 抢到了B锁" %self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print("%s 抢到了B锁" % self.name)
time.sleep(0.1)
mutexA.acquire()
print("%s 抢到了A锁" % self.name)
mutexA.release()
mutexB.release()
if __name__ == ‘__main__‘:
t1 = Mythread("线程1")
t2 = Mythread("线程2")
t3 = Mythread("线程3")
t4 = Mythread("线程4")
t1.start()
t2.start()
t3.start()
t4.start()
print("主线程")
Event对象的方法:
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入
就绪状态, 等待操作系统 调度;
event.clear():恢复event的状态值为False。
Event对象的案例:
# 例1:一个线程运行完,另外一个线程才开始运行
from threading import Event,Thread,current_thread
import time
e = Event() # 全局变量 = False
def f1():
print(‘%s 运行‘ %current_thread().name)
time.sleep(3)
e.set() # 全局变量 = True
def f2():
e.wait() # 等到全局变量 变为 True
print(‘%s 运行‘ % current_thread().name)
if __name__ == ‘__main__‘:
t1 = Thread(target=f1)
t2 = Thread(target=f2)
t1.start()
t2.start()
"""
运行结果:
Thread-1 运行
Thread-2 运行
运行过程:
线程t1运行起来之后打印‘Thread-1 运行’,接着睡了3秒,这个时间足够线程t2
运行起来,线程t2起来后运行f2,接着开始等,e.wait()等到全局变量变为 True,
但是默认全局变量是False,线程t2上来后就开始等了,等3秒多一点的时间,等的这
个时间线程t1将全局变量e.set()为True,这个时候线程t2立马感觉到了全局变量为
True接着继续运行起来了打印‘Thread-2 运行’
"""
# 例2:模仿行人过红绿灯场景
from threading import Event,Thread,current_thread
import time
import random
e = Event() # 全局变量 = False
def task1(): # 任务1负责控制红绿灯
while True:
e.clear() # 全局变量 = False
print("红灯亮--->禁止通行")
time.sleep(2) # 红灯亮的时间
e.set() # 全局变量 = True
print(‘绿灯亮--->行人通行‘)
time.sleep(3) # 绿灯亮的时间
def task2(): # 任务2负责控制行人
while True:
if e.is_set(): # 如果ste过了代表路灯亮了
print(‘%s 走你‘ %current_thread().name)
break
else: # 如果没有set代表红灯亮了
print("%s 等灯" %current_thread().name)
e.wait() # 原地等,只要等灯亮了就等过去了,这个时间刚刚好
if __name__ == ‘__main__‘:
Thread(target=task1).start() # 开启红绿灯
while True:
time.sleep(random.randint(1,5)) # 1-5秒产生一个行人
Thread(target=task2).start()
"""
运行结果:
红灯亮--->禁止通行
绿灯亮--->行人通行
Thread-2 走你
红灯亮--->禁止通行
绿灯亮--->行人通行
Thread-3 走你
红灯亮--->禁止通行
Thread-4 等灯
Thread-5 等灯
"""
Timer(interval, function, args=[], kwargs={})
interval: 指定的时间
function: 要执行的方法
args/kwargs: 方法的参数
from threading import Timer
def hello(n): # n秒后执行
print("hello, world",n)
# 用timer类传时间 任务 任务参数
t = Timer(3, hello,args=(1111,))
t.start() # 3秒后,将打印“hello, world 1111”
线程queue基本方法:
put 往线程队列里防止,超过队列长度,直接阻塞
get 从队列中取值,如果获取不到,直接阻塞
put_nowait: 如果放入的值超过队列长度,直接报错(linux)
get_nowait: 如果获取的值已经没有了,直接报错
线程queue的用法
import queue
# 队列:先进先出
q = queue.Queue(3) # 指定队列的大小
q.put(111) # 整型
q.put("aaa") # 字符串
q.put((1,2,3)) # 元组
print(q.get())
print(q.get())
print(q.get())
‘‘‘
111
aaa
(1, 2, 3)
‘‘‘
# 堆栈:后进先出
q = queue.LifoQueue(3)
q.put(111)
q.put("aaa")
q.put((1,2,3))
print(q.get())
print(q.get())
print(q.get())
‘‘‘
(1, 2, 3)
aaa
111
‘‘‘
# 优先级队列:
# 1.默认按照数字大小排序,然后会按照ascii编码在从小到大排序
# 2.先写先排,后写后排
q = queue.PriorityQueue(3)
q.put((10,111)) # 第一个值是优先级,第二值才是要放的元素
q.put((11,"aaa"))
q.put((-1,(1,2,3)))
print(q.get())
print(q.get())
print(q.get())
‘‘‘
(-1, (1, 2, 3)) # 数越小优先级越高
(10, 111)
(11, ‘aaa‘)
‘‘‘
GIL既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。
只要在一个进程里就一定有GIL锁的存在,GIL锁不能保证python数据的安全,它保证的是解释器级别(内存管理)的安全,也可以说是背后存在的一种机制。可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。
GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图:
from threading import Thread,Lock
import time
mutex = Lock()
n = 100
def task():
global n
mutex.acquire()
temp = n
time.sleep(0.1)
n = temp - 1
mutex.release()
if __name__ == ‘__main__‘:
l = []
for i in range(100):
t = Thread(target=task)
l.append(t)
t.start()
for obj in l:
obj.join()
print(n) # 结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
‘‘‘
分析:
1. 100个线程去抢GIL锁,即抢执行权限
2. 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会mutex.acquire()
3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁 lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
4. 直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的
线程再重复2 3 4的过程
‘‘‘
有了GIL的存在,同一时刻同一进程中只有一个线程被执行
产生质问:进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,也就是说python没用了,php才是最牛逼的语言?
要解决这个问题,我们需要在几个点上达成一致:
1. cpu到底是用来做计算的,还是用来做I/O的?
2. 多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能
3. 每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处
结论:
对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用
当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地
分析:
我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程
单核情况下,分析结果:
如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
多核情况下,分析结果:
如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线
程执行用不上多核,方案一胜
如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多
大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i
time.sleep(5)
if __name__ == ‘__main__‘:
l=[]
# print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(4):
# p=Process(target=work) #多进程:耗时20s多
p=Thread(target=work) # 多线程:耗时31s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print(‘run time is %s‘ %(stop-start))
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
time.sleep(2)
if __name__ == ‘__main__‘:
l=[]
# print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(100):
# p=Process(target=work) # 耗时14s多,大部分时间耗费在创建进程上
p=Thread(target=work) # 耗时2s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print(‘run time is %s‘ %(stop-start))
28.并发编程之多线程:信号量、死锁与递归锁、时间Event、定时器Timer、线程队列、GIL锁
原文:https://www.cnblogs.com/gfeng/p/14315492.html