# 1、介绍
concurrent.futures模块是用来创建并行的任务,提供了高度封装的异步调用接口
concurent.future这个模块用起来非常方便,它的接口也封装的非常简单,既可以实现进程池,也可以实现线程池
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
两者都实现了同一个接口,这个接口是由抽象Executor类定义的。
# 2、基本方法
submit(fn, *args, **kwargs)
异步提交任务
map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作
shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
result(timeout=None)
取得结果
add_done_callback(fn)
回调函数
"""
# 介绍:
ProcessPoolExecutor类是Executor的子类,它使用一个进程池来异步执行调用。ProcessPoolExecutor
使用多处理模块,这允许它避免全局解释器锁,但也意味着只能执行和返回可pickle的对象。
类concurrent.futures。ProcessPoolExecutor (max_workers = None, mp_context =没有)
使用最多max_workers进程池异步执行调用的Executor子类。如果max_workers为None或未给出,则默认值为
机器上的处理器数。如果max_workers小于或等于0,则会引发ValueError。
"""
# 用法:异步执行
from concurrent.futures import ProcessPoolExecutor
from threading import current_thread
import os,time,random
def task(n): # 定一个任务
print(‘%s is runing‘ %os.getpid()) # 任务启动先打印任务的进程pid
# I/O密集型的,一般用线程,用进程开销大耗时长
time.sleep(random.randint(1,3)) # 随机睡1-3秒
return n**2 # 返回值
def handle(futrue): # 处理任务的函数,拿到futrue对象
res = futrue.result() # 拿到返回结果,一个任务运行完就会触发回调函数,所以不会阻塞
print("%s 正在处理结果:%s" %(os.getpid(),res))
time.sleep(2)
if __name__ == ‘__main__‘:
pool = ProcessPoolExecutor(max_workers=4) # 对于进程池如果不写max_works:默认的是cpu的数量是4个
for i in range(19): # 现在开了19个任务,如果是上百个任务,就不能无限开进程,就要考虑控制
pool.submit(task,i).add_done_callback(handle) # 异步的方式提交任务
pool.shutdown(wait=True)
‘‘‘
解析:
pool.submit(task,i)会返回一个futrue对象,这个任务对象可以调出add_done_callback()方法,
叫回调函数,里面就一个参数handle,也就是说每提交一个任务捆绑一个函数,一旦一个任务运行完就会立
马触发这个回调函数的运行,并且会自动的把任务对象当做第一个参数传给回调函数。
在回调函数里处理任务,先拿到结果,一个任务运行完就会触发这个回调函数,所以不会阻塞在原地。打印
一边在运行一边就会有人在处理结果,一边在运行着一边结果正在被处理,这个运行效率并不慢,一直都是
主进程在处理任务,这就是回调函数的概念。
‘‘‘
"""
# 介绍:
ThreadPoolExecutor是Executor的子类,它使用一个线程池来异步执行调用。
类concurrent.futures。ThreadPoolExecutor (max_workers = None, thread_name_prefix = ")
一个Executor子类,使用最多max_workers线程池来异步执行调用。
3.5版本的变化:如果max_workers没有或没有,它将默认为处理器的机器上,乘以5,假设ThreadPoolExecutor通常
用于重叠I / O而不是CPU工作和工人的数量应该为ProcessPoolExecutor高于工人的数量。
3.6新版功能:添加了thread_name_prefix参数,允许用户控制线程。由池创建的工作线程的线程名,以便于调试。
"""
# 用法:
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import os,time,random
def task(n):
print(‘%s is runing‘ %current_thread().name)
time.sleep(random.randint(1,3))
return n**2
def handle(futrue):
res = futrue.result()
print("%s 正在处理结果:%s" %(current_thread().name,res))
time.sleep(2)
if __name__ == ‘__main__‘:
pool = ThreadPoolExecutor(max_workers=10) # 对于线程池如果不写max_works:默认的是cpu的数目*5
for i in range(19): # 同样是19个任务,线程池效率高了
pool.submit(task,i).add_done_callback(handle)
pool.shutdown(wait=True)
yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
Gevent内部会用到greenlet这个模块,这个模块就是多个任务之间来回的切,切走之前把一个任务的状态保留下来,它们的底层都会用到yield,其实就是层层帮我们封装好了。greenlet内部会封装yield,Gevent就是对greenlet进行了进一步的封装,封装后greenlet会帮忙检测I/O,实现遇到I/O切换,这个才是我们所追求的协程
用法:
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的g2=gevent.spawn(func2)
g1.join() 等待g1结束
g2.join() 等待g2结束
或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
import gevent
def eat(name):
print(‘%s eat 1‘ %name) # 1.吃了一口饭
gevent.sleep(2) # 2.原地睡了2秒,相当于模拟遇到I/O了
print(‘%s eat 2‘ %name) # 6.接着打印又回来吃了一口饭
def play(name):
print(‘%s play 1‘ %name) # 3.遇到I/O以后就切到了另外一个任务,玩了一下
gevent.sleep(1) # 4.又遇到I/O了,睡了1秒,它先睡完
print(‘%s play 2‘ %name) # 5.接着又玩了一下,原本应该切到eat 2,但是仍在阻塞中
g1=gevent.spawn(eat,‘egon‘) # spawn提交eat任务,然后提交一个人名。协程1
g2=gevent.spawn(play,name=‘egon‘)# spawn提交playt任务。协程2
g1.join() # 等着协程对象g1结束
g2.join() # 等着协程对象g2结束
#或者gevent.joinall([g1,g2])
print(‘主‘)
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print(‘eat food 1‘)
time.sleep(2)
print(‘eat food 2‘)
def play():
print(‘play 1‘)
time.sleep(1)
print(‘play 2‘)
g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print(‘主‘)
"""
单线程下能抗住的并发已经非常非常高了,因为现在接触的软件大部分都是I/O密集型的
其实单线程下完全可以一个任务运行完以后(它真正运行完花的时间是非常短的,大量时间都在做I/O)
可以利用运行一段时间遇到I/O操作了就快速切换另一个任务再运行,在多任务之间快速的切
"""
# 首先导了猴子补丁,打了补丁保证下面所有模块的I/O行为都能监测到
from gevent import monkey;monkey.patch_all()
from socket import * # 然后导了socket模块,准备写套接字
import gevent # 最后导入gevent模块, 用来单线程下实现并发
def server(server_ip,port): # 套接字服务端任务1:建链接
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port)) # 绑定ip和端口
s.listen(5) # 监听
while True:
conn,addr=s.accept() # 等待链接请求
# 每建成一个链接,就提交一个协程对象进行通信,异步提交
gevent.spawn(talk,conn,addr)
def talk(conn,addr): # 套接字服务端任务2:建通信
try:
while True:
res=conn.recv(1024) # 收消息
print(‘client %s:%s msg: %s‘ %(addr[0],addr[1],res))
conn.send(res.upper()) # 回消息,大写回
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == ‘__main__‘:
server(‘127.0.0.1‘,8080) # 把ip和端口传进去
# 注:没必要join在原地等了,因为服务端在启动运行起来后,服务端函数是一个死循环,
# 不会结束,既然主进程不会结束那就不用再等了
"""
整体逻辑:就一个线程server,没有多线程也没有多进程,这个线程每建成一个链接就提交
一个协程对象,gevent会帮你在多个任务之间遇到I/O来回快速的切换,从而实现并发效果
如何证明并发的效果?
服务端启动起来后,同时多个客户端连接过去,如果多个客户端能同时得到结果,并发效果
就实现了
"""
# 可同时开多个客户端(客户端1、客户端2、客户端3)
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))
while True:
client.send("hello".encode(‘utf-8‘)) # 在不停的向服务端发送“hello”
msg=client.recv(1024) # 收消息,在不停的收HELLO
print(msg.decode(‘utf-8‘))
"""
解析:
三个客户端都能同时不停的发消息和收消息,都有并发效果,但服务端没有开多线程,事实上
就是服务端在多个任务之间来回的切换
其实就是给第一个客户端执行一个seed来发送I/O请求,只要seed发出之后运行完就是操作
系统的任务了,seed负责发消息,操作系统负责做I/O。gevent模块会利用你seed的过程
直接切到下一个任务,再切到下下一个任务,一直往下切,给客户端的感觉就是每一个客户端
都能被服务,并发就实现了
"""
原文:https://www.cnblogs.com/gfeng/p/14332501.html