程序中运行中表现的状态:阻塞 运行、就绪(非阻塞)
阻塞:程序遇到IO阻塞,立刻停止(挂起),cpu马上切换,等到IO结束之后再执行
非阻塞:程序没有IO阻塞或者遇到IO阻塞通过某种手段让cpu去执行其他的任务,尽可能地占用cpu
同步,异步站在任务发布的角度:
同步:任务发出去之后,直到这个任务执行结束,再继续
from concurrent.futures import ProcessPoolExecutor
import os
import time
def task():
print(f'{os.getpid()} is running')
time.sleep(1)
return f'{os.getpid()} is finish'
if __name__ == '__main__':
p = ProcessPoolExecutor(4)
for i in range(10):
obj = p.submit(task,)
print(obj.result())
异步:所有的任务同时发出,继续执行下一步
from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def task():
print(f'{os.getpid()} is running')
time.sleep(random.randint(0,2))
return f'{os.getpid()} is finish'
if __name__ == '__main__':
p = ProcessPoolExecutor(4)
obj_l1 = []
for i in range(10):
obj = p.submit(task,) # 异步发出.
obj_l1.append(obj)
p.shutdown(wait=True)
# 1. 阻止在向进程池投放新任务
# 2. wait = True 十个任务是10,一个任务完成了-1,直至为零,再进行下一步
for i in obj_l1:
print(i.result())
异步回收任务的方式一: 我将所有的任务的结果统一收回
以爬虫为例:
爬虫:利用requests模块功能模拟浏览器封装头,给服务器发送一个请求,‘骗过’服务器之后,服务器返回一个文件,爬虫拿到文件后进行数据清洗获取到想要的信息
1.爬取服务器的文件(IO阻塞)
2.拿到文件,进行数据分析(非IO,IO极少)
版本一:
import requests
from concurrent.futures import ProcessPoolExecutor
import time
import random
import os
def get(url):
response = requests.get(url)
print(f'{os.getpid()} 正在爬取:{url}')
time.sleep(random.randint(1,3))
if response.status_code == 200:
return response.text
def parse(text):
print(f'{os.getpid()} 分析结果:{len(text)}')
if __name__ == '__main__':
url_list = [
'http://www.taobao.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.baidu.com',
'http://www.sina.com.cn',
'https://www.sohu.com',
'https://www.youku.com',
]
pool = ProcessPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(get, url)
obj_list.append(obj)
pool.shutdown(wait=True)
for obj in obj_list:
parse(obj.result())
问题:
1.分析的结果是串行,效率低
2.将所有的结果全部都爬取成功之后,先放在一个列表中再分析
版本二:将分析这一步加入到发布的任务中
import requests
from concurrent.futures import ProcessPoolExecutor
import time
import random
import os
def get(url):
response = requests.get(url)
print(f'{os.getpid()} 正在爬取:{url}')
time.sleep(random.randint(1,3))
if response.status_code == 200:
parse(response.text)
def parse(text):
print(f'{os.getpid()} 分析结果:{len(text)}')
if __name__ == '__main__':
url_list = [
'http://www.taobao.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.baidu.com',
'http://www.sina.com.cn',
'https://www.sohu.com',
'https://www.youku.com',
]
pool = ProcessPoolExecutor(4)
for url in url_list:
obj = pool.submit(get, url)
# pool.shutdown(wait=True)
print('主')
问题:两个函数的偶合性过强
版本三:通过回调函数对其解藕
import requests
from concurrent.futures import ProcessPoolExecutor
import time
import os
import random
def get(url):
response = requests.get(url)
print(f'{os.getpid()} 正在爬取:{url}')
# time.sleep(random.randint(1,3))
if response.status_code == 200:
return response.text
def parse(obj):
time.sleep(1)
print(f'{os.getpid()} 分析结果:{len(obj.result())}')
if __name__ == '__main__':
url_list = [
'http://www.taobao.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.baidu.com',
'http://www.sina.com.cn',
'https://www.sohu.com',
'https://www.youku.com',
]
start_time = time.time()
pool = ProcessPoolExecutor(4)
for url in url_list:
obj = pool.submit(get, url)
obj.add_done_callback(parse) # 增加一个回调函数,只存在一个所以是串行,并且存在于主进程中
# 进程完成的网络爬取的任务,拿到了返回值之后,结果丢给回调函数add_done_callback,回调函数分析结果
pool.shutdown(wait=True)
print(f'主: {time.time() - start_time}')
回调函数进行分析,明确了进程的任务只有网络爬取
极值情况:如果回调函数是IO任务(大多数情况下不是),由于回调函数存在于主进程中又是串行,所以极有可能影响效率,此时异步+回调机制是不好的,如要保证效率,只能牺牲开销再开一个线程进程池
多个任务下,多进程多线程处理IO任务的方法:
1.剩下的任务:非IO阻塞或阻塞很小 => 异步+回调机制
2.剩下的任务:IO阻塞>=多个任务的IO => 版本二或者异步+回调机制基础上再开线程进程池
import queue
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put('坤坤')
# q.put(666)
print(q.get())
print(q.get())
print(q.get()) # 用法基本与进程Queue相同
LIFO栈:先进后出
import queue
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put('坤坤')
print(q.get())
print(q.get())
print(q.get())
优先级队列:需要元组的形式 (int,数据) int代表优先级,数字越低,优先级越高
import queue
q = queue.PriorityQueue(3)
q.put((10, '垃圾消息'))
q.put((-9, '紧急消息'))
q.put((3, '一般消息'))
print(q.get())
print(q.get())
print(q.get())
并发地执行某个任务,一个线程在执行时通知另一个线程开始执行、
版本一(未用Event)
import time
from threading import Thread
from threading import current_thread
flag = False
def task():
print(f'{current_thread().name} 检测服务器是否正常开启....')
time.sleep(3)
global flag
flag = True
def task1():
while 1:
time.sleep(1)
print(f'{current_thread().name} 正在尝试连接服务器.....')
if flag:
print('连接成功')
return
if __name__ == '__main__':
t1 = Thread(target=task1,)
t2 = Thread(target=task1,)
t3 = Thread(target=task1,)
t = Thread(target=task)
t.start()
t1.start()
t2.start()
t3.start()
版本二(运用Event)
import time
from threading import Thread
from threading import current_thread
from threading import Event
event = Event() # 默认是False
def task():
print(f'{current_thread().name} 检测服务器是否正常开启....')
time.sleep(3)
event.set() # 改成了True
def task1():
print(f'{current_thread().name} 正在尝试连接服务器')
event.wait() # 轮循检测event是否为True,当其为True,继续下一行代码(阻塞)
# event.wait(1)
# 设置超时时间,如果1s中以内,event改成True,代码继续执行
# 设置超时时间,如果超过1s,event没做改变,代码继续执行
print(f'{current_thread().name} 连接成功')
if __name__ == '__main__':
t1 = Thread(target=task1,)
t2 = Thread(target=task1,)
t3 = Thread(target=task1,)
t = Thread(target=task)
t.start()
t1.start()
t2.start()
t3.start()
并发的本质:遇到IO阻塞或计算密集型执行世界过长,cpu切换;保持原来状态
多进程:操作系统控制 多个进程的多个任务切换 保持状态
多线程:操作系统控制 多个线程的多个任务切换 保持状态
协程:程序控制 一个线程的多个任务切换 保持状态(微并发,处理任务不宜过多)
协程会调度cpu,如果协程管控的任务中,遇到阻塞,它会快速的(比操作系统快)切换到另一个任务,并且能将上一个任务挂起(保持状态),让操作系统以为cpu一直在工作
yield就是一个协程,虽然可以实现两个任务来回切换,并且能够保存原来的状态,而且还是一个线程,但是其只能遇到yield切换,遇到Io还是阻塞
import gevent
import time
def eat(name):
print('%s eat 1' %name) # 1
gevent.sleep(2)
# time.sleep(300)
print('%s eat 2' %name) #4
def play(name):
print('%s play 1' %name) # 2
gevent.sleep(1)
# time.sleep(3)
print('%s play 2' %name) #3
g1 = gevent.spawn(eat, 'ikun')
g2 = gevent.spawn(play, '坤坤')
g1.join()
g2.join() # 防止线程发布完任务直接执行结束
# gevent.joinall([g1,g2])
print('主')
# 遇到IO阻塞不切换
解决方式:
import threading
from gevent import monkey
monkey.patch_all() # 将代码中的所有的IO都标识
import gevent
import time
def eat():
print(f'线程1:{threading.current_thread().getName()}')
print('eat food 1')
time.sleep(3) # 加上mokey就能够识别到time模块的sleep了
print('eat food 2')
def play():
print(f'线程2:{threading.current_thread().getName()}')
print('play 1')
time.sleep(1) # 来回切换,直到一个I/O的时间结束
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print(f'主:{threading.current_thread().getName()}')
协程的优点:
1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2.单线程被就可以实现并发地效果,最大限度地利用cpu
3.修改共享数据不需加锁
原文:https://www.cnblogs.com/wxl1025/p/11253130.html