#1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务 #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 #shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 #result(timeout=None) 取得结果 #add_done_callback(fn) 回调函数
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor def func(i): print(‘thread‘, i) time.sleep(1) print(‘thread %s end‘ % i) # 线程池,提供异步调用 tp = ThreadPoolExecutor(5) tp.submit(func, 1) tp.shutdown() print(‘主线程‘) # 进程池,提供异步调用 if __name__ == ‘__main__‘: tp = ProcessPoolExecutor(5) tp.submit(func, 1) tp.shutdown() print(‘主线程‘)
import time from concurrent.futures import ThreadPoolExecutor from threading import current_thread def func(i): print(‘thread‘, i, current_thread().ident) time.sleep(1) print(‘thread %s end‘ % i) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func, i) tp.shutdown() print(‘主线程‘) # 获取返回值 import time from concurrent.futures import ThreadPoolExecutor from threading import current_thread def func(i): print(‘thread‘, i, current_thread().ident) time.sleep(1) print(‘thread %s end‘ % i) return i * ‘*‘ tp = ThreadPoolExecutor(5) ret_l = [] for i in range(20): ret = tp.submit(func, i) ret_l.append(ret) for ret in ret_l: print(ret.result()) print(‘主线程‘)
# map的用法 import time from concurrent.futures import ThreadPoolExecutor def func(i): print(‘thread‘, i) time.sleep(1) print(‘thread %s end‘ % i) return i * ‘*‘ tp = ThreadPoolExecutor(5) res = tp.map(func, range(20)) for i in res: print(i)
# 回调函数 import time import os from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from threading import current_thread def func(i): print(‘thread‘, i, current_thread().ident, os.getpid()) time.sleep(1) print(‘thread %s end‘ % i) return i * ‘*‘ def call_back(arg): print(‘call back : ‘, current_thread().ident, os.getpid()) print(‘ret : ‘, arg.result())
# 线程池的回调函数是由子线程完成的 tp = ThreadPoolExecutor(5) ret_l = [] for i in range(20): tp.submit(func, i).add_done_callback(call_back) print(‘主线程‘, current_thread().ident, os.getpid())
# 进程池的回调函数是由主进程完成的 if __name__ == ‘__main__‘: tp = ProcessPoolExecutor(5) ret_l = [] for i in range(20): tp.submit(func, i).add_done_callback(call_back) print(‘主线程‘, current_thread().ident, os.getpid())
# 1. yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级 # 2. send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
# 串行执行 import time def consumer(res): ‘‘‘任务1:接收数据,处理数据‘‘‘ pass def producer(): ‘‘‘任务2:生产数据‘‘‘ res=[] for i in range(10000000): res.append(i) return res start=time.time() # 串行执行 res=producer() consumer(res) # 写成consumer(producer())会降低执行效率 stop=time.time() print(stop-start) # 1.5536692142486572 #基于yield并发执行 import time def consumer(): ‘‘‘任务1:接收数据,处理数据‘‘‘ while True: x=yield def producer(): ‘‘‘任务2:生产数据‘‘‘ g=consumer() next(g) for i in range(10000000): g.send(i) start=time.time() # 基于yield保存状态,实现两个任务直接来回切换,即并发的效果 # PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的. producer() stop=time.time() print(stop-start) #2.0272178649902344
import time def consumer(): ‘‘‘任务1:接收数据,处理数据‘‘‘ while True: x=yield def producer(): ‘‘‘任务2:生产数据‘‘‘ g=consumer() next(g) for i in range(10000000): g.send(i) time.sleep(2) start=time.time() producer() # 并发执行,但是任务producer遇到io就会阻塞住,并不会切到该线程内的其他任务去执行 stop=time.time() print(stop-start)
# 1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。 # 2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换
#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行) #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级 #2. 单线程内就可以实现并发的效果,最大限度地利用cpu
#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程 #2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
安装 :pip3 install greenlet
# greenlet实现状态切换 import time from greenlet import greenlet def eat(): print(‘eating 1‘) g2.switch() time.sleep(1) print(‘eating 2‘) def play(): print(‘playing 1‘) time.sleep(1) print(‘playing 2‘) g1.switch() g1 = greenlet(eat) g2 = greenlet(play) g1.switch()
# 顺序执行 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start=time.time() f1() f2() stop=time.time() print(‘run time is %s‘ %(stop-start)) #10.985628366470337 # 切换 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print(‘run time is %s‘ %(stop-start)) # 52.763017892837524
安装:pip3 install gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
g1=gevent.spawn(func,1,,2,3,x=4,y=5) # 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的 g2=gevent.spawn(func2) g1.join() #等待g1结束 g2.join() #等待g2结束 #或者上述两步合作一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值