多道技术:单核实现并发的效果,即程序一遇到I/O操作就切换CUP使用者
并发:看起来像同时运行的就可以称之为并发
并行:真正意义上的同时进行
空间复用:多个进程复用内存空间
时间复用:多个进程复用CPU时间
进程三状态:当前进程退出运行状态的原因(CPU切换)
1.遇到IO操作时,就要让出CPU让其他进程执行
2.进程占用CPU时间过长,或者优先级更高的进程需要调用
同步和异步:描述的是任务的提交方式
同步:任务提交之后,原地等待任务的返回结果,等待过程中不做任何事情
异步:任务提交之后,不等待任务的返回结果,直接去做其他事情,任务的返回结果会有一个异步回调机制自动处理
阻塞和非阻塞:描述程序的运行状态
阻塞:阻塞态
非阻塞:就绪态和运行态
两组概念组合效率最高的是异步非阻塞
‘‘‘第一种方法‘‘‘
from multiprocessing import Process
import time
def task(name):
print(f‘{name}‘)
time.sleep(3)
print(‘end‘)
‘‘‘
windows操作系统下,创建进程一定要在main内创建
‘‘‘
if __name__ == "__main__":
# 实例化一个进程对象
p = Process(target=task,args=(‘zhao‘,))
# 开启进程
p.start()
p.join() # join方法是让主进程等待子进程p运行结束之后再继续往后执行,其他子进程不受影响
print(‘mmm‘)
‘‘‘第二种方法‘‘‘
from multiprocessing import Process
import time
class MyProcess(Process):
def run(self): # 必须包含一个run方法
print(‘start‘)
time.sleep(1)
print(‘end‘)
if __name__ == "__main__":
p = MyProcess()
p.start()
print(‘end2‘)
# 创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去
# 一个进程对应在内存中就是一块独立的内存空间
# 多个进程对应在内存中就是多块独立的内存空间
# 进程之间数据默认隔离,若要交互需要借助第三方模块
# 容器类型的数据如列表,字典,元组等,最好在最后一个元素的后面也加上逗号
from multiprocessing import Process, current_process
import time
import os
def task():
print(f‘{current_process().pid} is running‘) # 查看当前进程的进程号
print(f‘{os.getppid()} 子进程的父进程id‘)
time.sleep(3)
if __name__ == "__main__":
p = Process(target=task)
p.start()
p.terminate() # 杀死该进程
print(p.is_alive()) # 判断进程是否存活
print(f‘{os.getpid()} is running‘) # 查看当前进程号
print(f‘{os.getppid()} 父进程id‘) # 查看父进程id
# 一般情况下我们会将存储bool值的变量名和返回的结果是bool值的方法名,用is_作为开头
僵尸进程:当某个进程开设子进程后,该进程死后不会立刻释放占用的资源,所有的子进程都会步入僵尸进程
孤儿进程:子进程存活,父进程意外死亡
# 被守护的子进程会随着主进程的结束而结束,不管其是否运行完
from multiprocessing import Process
import time
import os
def task():
print(‘start‘)
time.sleep(3)
print(‘end‘)
if __name__ == "__main__":
p = Process(target=task)
p.daemon = True # 设置守护进程
p.start()
print(‘go‘)
线程互斥锁
# 多个线程在操作同一份数据的时候可能会造成数据的混乱,这个时候为了保证数据的安全我们通常会加锁处理
from threading import Thread, Lock
import time
l = Lock() # 实例化一把锁
money = 100
def task():
global money
l.acquire() # 获取锁
tem = money
time.sleep(0.1)
money = tem - 1
l.release() # 释放锁
if __name__ == ‘__main__‘:
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
for i in t_list:
i.join()
print(money)
GIL全局解释器锁
# GIL不是python的特点,而是CPython解释器的特点
# GIL在CPython解释器中是一把互斥锁,会导致同一个进程下的多个线程无法同时执行,即无法利用多核优势
# GIL是保证解释器级别的数据安全
import threading
global_num = 0
lock = threading.Lock()
def test1():
global global_num
lock.acquire()
for i in range(1000000):
global_num += 1
lock.release()
print("test1", global_num)
def test2():
global global_num
lock.acquire()
for i in range(1000000):
global_num += 1
lock.release()
print("test2", global_num)
t1 = threading.Thread(target=test1)
t2 = threading.Thread(target=test2)
t1.start()
t2.start()
# 在Cpython解释器中,当我们的python代码有一个线程开始访问解释器的时候,GIL会把这个大锁给锁上,此时此刻其他的线程只能干等着,无法对解释器的资源进行访问,这一点就跟我们的互斥锁相似。而只是这个过程发生在我们的Cpython中,同时也需要等这个线程分配的时间到了,这个线程把gil释放掉,类似我们互斥锁的lock.release()一样,另外的线程才开始跑起来,说白了,这无疑也是一个单线程。
‘‘‘
执行过程:
1.thread1拿到全局变量count
2.thread1申请到python解释器的gil
3.解释器调用系统原生线程
4.在cpu1上执行规定的时间
5.执行时间到了,要求释放gil等下一次得到gil的时候,程序从这里接着这一次开始执行
6.thread2拿到了全局变量,此时thread1对全局count的操作并未完成,所以thread1拿到的和thread2拿到的count其实是相同的,这样也很好解释为什么结果不是200万 而是少于200万
7.thread2申请到了gil锁
8.调用原生的线程
9.如果是单核cpu则会在cup1上执行,(不是重点),如图在cpu3上执行
10.执行规定的时间,此时完成了对count的加一操作
11.执行时间还未到,线程2执行完对count操作,并给count加一,并且释放了gil锁
12.线程1又申请到了gil锁,重复之前的操作。
13.线程1执行对count的操作,执行时间到,释放gil锁
哪些情况适合用多线程呢:
只要在进行耗时的IO操作的时候,能释放GIL,所以只要在IO密集型的代码里,用多线程就很合适,相比于多进程,多线程更节省资源,时间上面差不了多少
哪些情况适合用多进程呢:
用于计算密集型,比如计算某一个文件夹的大小。
‘‘‘
# 同一个进程内的多个线程数据是共享的
# 进程是资源单位,线程是执行单位
死锁与递归锁
‘‘‘死锁‘‘‘
from threading import Thread, Lock
import time
lock1 = Lock()
lock2 = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
lock1.acquire()
print(f‘{self.name} 获取到1锁‘)
lock2.acquire() # 此时1锁在func1上,并且需要获得2锁
print(f‘{self.name} 获取到2锁‘)
lock2.release()
lock1.release()
def func2(self):
lock2.acquire()
print(f‘{self.name} 获取到2锁‘)
time.sleep(2) # 此时2锁在func2上,并且func2要获取1锁
lock1.acquire()
print(f‘{self.name} 获取到1锁‘)
lock1.release()
lock2.release()
if __name__ == ‘__main__‘:
for i in range(10):
t = MyThread()
t.start()
# 整个程序卡住,产生死锁现象
‘‘‘递归锁‘‘‘
# 可以被连续的acquire和release
# 但是只能被第一个抢到这把锁的执行上述操作
# 它的内部有一个计数器,每acquire一次计数加一,每release一次技术减一
# 只要计数不为0,那么其他人都无法抢到该锁
from threading import Thread, RLock
import time
lock1 = lock2 = RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
lock1.acquire()
print(f‘{self.name} 获取到1锁‘)
lock2.acquire()
print(f‘{self.name} 获取到2锁‘)
lock2.release()
lock1.release()
def func2(self):
lock2.acquire()
print(f‘{self.name} 获取到2锁‘)
time.sleep(2)
lock1.acquire()
print(f‘{self.name} 获取到1锁‘)
lock1.release()
lock2.release()
if __name__ == ‘__main__‘:
for i in range(10):
t = MyThread()
t.start()
信号量
# 在并发编程中信号量指的是锁
# 信号量在不同的阶段可能对应不同的技术点
from threading import Thread, Semaphore
import time
import random
sm = Semaphore(5) # 设置五把锁,同时可以运行五个线程获取到锁
def task(name):
sm.acquire()
print(f‘{name}正在吃屎‘)
time.sleep(random.randint(1,5))
sm.release()
if __name__ == ‘__main__‘:
for i in range(20):
t = Thread(target=task,args=(f‘毛芋头{i}号‘, ))
t.start()
Event事件
# 让某些进程/线程,等待另一些进程/线程运行完毕,再运行
from threading import Thread, Event
import time
event = Event() # 创建一个Event事件
def light():
print(‘red‘)
time.sleep(3)
print(‘green‘)
event.set() # 触发事件
def car():
print(‘stop‘)
event.wait() # 等待事件,等待触发事件被触发
print(‘run‘)
if __name__ == ‘__main__‘:
t = Thread(target=light)
t.start()
for i in range(20):
t = Thread(target=car)
t.start()
线程Q
import queue
# 队列 = 管道 + 锁
# 使用队列还是为了数据安全
# 先进先出队列
q = queue.Queue(3)
# 后进先出队列
q1 = queue.LifoQueue()
# 优先级队列
q2 = queue.PriorityQueue(3)
q2.put((10, ‘zhao‘)) # 参数是一个元组第一个数字表示优先级(数字越小优先级越高),第二个是放入的值
进程池与线程池
# 池是用来保证计算机硬件安全的情况下最大限度的利用计算机
# 它降低了程序的运行效率,但是保证了你的程序能够正常运行
‘‘‘线程池/进程池‘‘‘
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# 函数可以传参数数字,如果不传,默认开启 os.cpu_count() * 5 数量的线程(os.cpu_count()数量的进程)
pool = ThreadPoolExecutor(5) # 池子里面开设五个线程
# pool = ProcessPoolExecutor() # 开设进程池
def task(name):
print(f‘{name} start‘)
time.sleep(2)
return f‘{name} end‘
def call_back(n):
print(‘回调函数执行‘, n.result())
for i in range(20):
res = pool.submit(task, ‘zhao‘) # 提交任务,异步非阻塞
res.add_done_callback(call_back) # 绑定回调函数,只要res对象获取了数据,回调函数就立即执行
# 这种提交方式,也可用res = pool.map(task,range(20)),不过这个方法只适合传递简单的参数
# pool.shutdown() # 关闭线程池,等待线程池中所有的线程执行完毕
# res.result() # 获取执行函数返回值,类似join函数,会等待task函数执行结束
# 池中的线程是异步执行的
# 单线程实现并发
# 代码层面上:遇到IO自动切换,能检测的io是十分有限的,比较常见的是时间模块的io和socket模块的io
from gevent import spawn
from gevent import monkey
import time
monkey.patch_all() # 猴子补丁,添加IO类型
def func1():
print(‘func1 start‘)
time.sleep(2)
print(‘func1 end‘)
def func2():
print(‘func2 start‘)
time.sleep(3)
print(‘func2 end‘)
g1 = spawn(func1) # 监控IO,func1为监控对象,可以传参,直接在后面加上,spawn(func1, ‘zhao‘)
g2 = spawn(func2)
g1.join() # 阻塞,直到g1结束
g2.join()
# gevent.joinall([g1,g2])
# 协程是代码层面上的规避IO操作
# 本质就是一条线程,多个任务在一条线程上来回切换,来规避IO操作
import asyncio
async def func(): # async修饰运行的函数
print(‘start‘)
# await可能会发生阻塞的方法
await asyncio.sleep(1)
print(‘end‘)
loop = asyncio.get_event_loop()
# loop.run_until_complete(func())
loop.run_until_complete(asyncio.wait([func(),func()])) # 启动协程
# 进程之间数据隔离
# 进程之间通信(IPC):基于文件,基于网络(memcache,redis,rabbitmq,kafka)
from multiprocessing import Queue,Process
def son(q):
q.put(‘hello‘) # 向队列q里面丢内容
def func(q):
x = q.get() # 从队列q里面取出内容
print(x)
if __name__ == ‘__main__‘:
q = Queue() # 实例化一个队列对象,可携带参数,表示队列大小
p = Process(target=son,args=(q,))
p.start()
p1 = Process(target=func,args=(q,))
p1.start()
p.join() # 让生产者生产完所有的数据
q.put(None) # 给队列最后放上一个None,作为消费者判断结束的标志
# 生产者消费者模型
# 本质就是让生产数据和消费数据的效率达到平衡并且最大化效率
# put和get的速度比
原文:https://www.cnblogs.com/zhaoxulu/p/14722972.html