00、进程
所谓同步:就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列
。要么成功都成功,失败都失败,两个任务的状态可以保持一致。 所谓异步:是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了
。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列
。比如我去银行办理业务,可能会有两种方式:
第一种 :选择排队等候;
第二种 :选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了;
第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务情况;
第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)往往注册一个回调机制,在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。
阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,‘egon‘,)
4 kwargs表示调用对象的字典,kwargs={‘name‘:‘egon‘,‘age‘:18}
5 name为子进程的名称
# 在python进程中开启一个子进程
import time
from multiprocessing import Process
def f(name):
print("hello", name)
print("我是子进程")
if __name__ == ‘__main__‘:
p = Process(target=f, args=("pontoon", ))
p.start()
time.sleep(1)
print("执行主进程的内容了")
>>>hello pontoon
我是子进程
执行主进程的内容了
import os
import time
from multiprocessing import Process
def func():
print(54321)
time.sleep(1)
print("子进程:", os.getpid())
print("子进程的父进程:", os.getppid())
print(12345)
if __name__ == ‘__main__‘:
p = Process(target=func)
p.start()
print("*" * 20)
print("父进程:", os.getpid()) # 查看当前进程的进程号
print("父进程的父进程", os.getppid())
>>>********************
父进程: 2972
父进程的父进程 14060
54321
子进程: 13992
子进程的父进程: 2972
12345
import os
import time
from multiprocessing import Process
def func():
print(54321)
time.sleep(1)
print("子进程:", os.getpid())
print("子进程的父进程:", os.getppid())
print(12345)
if __name__ == ‘__main__‘:
p = Process(target=func)
p.start()
p.join() # 将异步的程序变成同步的
print("*" * 20)
print("进程结束了") # 查看当前进程的进程号
# 代码似乎这样执行才是正常的,这就是join()方法的作用
>>>54321
子进程: 10472
子进程的父进程: 8692
12345
********************
进程结束了
def func(a, b):
print(a)
time.sleep(3)
print(b)
if __name__ == ‘__main__‘:
p = Process(target=func, args=(10, 20))
p.start()
p = Process(target=func, args=(11, 21))
p.start()
p = Process(target=func, args=(12, 22))
p.start()
p = Process(target=func, args=(13, 23))
p.start()
print("*" * 20)
# join()之上的程序是异步的,join()之后的方法变成了同步
p.join()
print("进程结束了") # 查看当前进程的进程号
>>>********************
10
11
12
13
20
21
22
23
进程结束了
import os
import time
from multiprocessing import Process
def func(a, b):
print("*" * a)
time.sleep(3)
print("*" * b)
if __name__ == ‘__main__‘:
for i in range(10):
p = Process(target=func, args=(5, 10))
p.start()
print("*" * 20)
# join()之上的程序是异步的,join()之后的方法变成了同步
p.join()
print("进程结束了") # 查看当前进程的进程号
>>>*****
*****
*****
*****
*****
*****
*****
********************
*****
*****
*****
**********
**********
**********
**********
**********
**********
**********
**********
进程结束了 # 出问题了
**********
**********
import time
from multiprocessing import Process
def func(a, b):
print("*" * a)
time.sleep(6)
print("*" * b)
if __name__ == ‘__main__‘:
p_list = []
for i in range(5):
p = Process(target=func, args=(5, 10))
p_list.append(p)
p.start()
[p.join() for p in p_list]
print("进程结束了") # 查看当前进程的进程号
>>>*****
*****
*****
*****
*****
**********
**********
**********
**********
**********
进程结束了
# 需求想500个文件里面写数据,用异步的方式实现
import os
from multiprocessing import Process
def func(file_name, contents):
with open(file_name, ‘w‘) as f:
f.write(contents * ‘+‘)
if __name__ == ‘__main__‘:
p_list = []
for i in range(5):
p = Process(target=func, args=("info{0}".format(i), i))
p_list.append(p)
p.start()
[p.join() for p in p_list]
print([i for i in os.walk(r‘G:\进线程‘)])
import os
from multiprocessing import Process
# 开启多进程的第二种方式
class MyProcess(Process):
def run(self):
print(os.getpid())
if __name__ == ‘__main__‘:
print("主:", os.getpid())
p1 = MyProcess()
p1.start()
p2 = MyProcess()
p2.start()
>>>主: 7824
12324
14660
# Process 类中的属性一览
class Process(object):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
self.name = ‘‘ # 进程名
self.daemon = False #
self.authkey = None #
self.exitcode = None
self.ident = 0
self.pid = 0 # 进程号
self.sentinel = None
self.is_alive = ‘‘ # 判断子进程是否活着
self.terminate = ‘‘ # 结束一个子进程
import os
from multiprocessing import Process
# 开启多进程的第二种方式
class MyProcess(Process):
def __init__(self, args1, args2):
super(MyProcess, self).__init__()
self.arg1 = args1
self.arg2 = args2
def run(self):
print(self.pid)
print(self.name)
print(self.arg1)
print(self.arg2)
if __name__ == ‘__main__‘:
print("主:", os.getpid())
p1 = MyProcess(10, 20)
p1.start()
p2 = MyProcess(11, 22)
p2.start()
>>>主: 6916
9288
MyProcess-1
10
20
7412
MyProcess-2
11
22
from multiprocessing import Process
def work():
global n
n=0
print(‘子进程内: ‘,n)
if __name__ == ‘__main__‘:
n = 100
p=Process(target=work)
p.start()
p.join() # 执行玩子进程之后在执行父进程
print(‘主进程内: ‘,n)
>>>子进程内: 0
主进程内: 100
# 在子进程中设置了全局变量但是在主进程中的n的值并没有发生改变,得出结论:主进程与子进程之间的数据也是隔离的
# server端
from socket import *
from multiprocessing import Process
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)
def talk(conn,client_addr):
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == ‘__main__‘: #windows下start进程一定要写到这下面
while True:
conn,client_addr=server.accept()
p=Process(target=talk,args=(conn,client_addr))
p.start()
# client端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
client.send(msg.encode(‘utf-8‘))
msg=client.recv(1024)
print(msg.decode(‘utf-8‘))
import time
from multiprocessing import Process
# 守护进程
def func():
while 1:
time.sleep(0.5)
print(‘2222‘)
if __name__ == ‘__main__‘:
p = Process(target=func)
# p.deamon = True
p.start()
i = 0
while i < 4:
print(‘11111111‘)
time.sleep(1)
i += 1
# 查看输出的结果
>>>11111111
2222
11111111
2222
2222
11111111
2222
2222
11111111
2222
2222
2222
2222
2222
...
import time
from multiprocessing import Process
# 守护进程
def func():
while 1:
time.sleep(0.5)
print(‘2222‘)
if __name__ == ‘__main__‘:
p = Process(target=func)
p.daemon = True # 在start之前加
p.start()
i = 0
while i < 4:
print(‘11111111‘)
time.sleep(1)
i += 1
>>>11111111
2222
11111111
2222
2222
11111111
2222
2222
11111111
2222
2222
import time
from multiprocessing import Process
def func():
while 1:
time.sleep(0.5)
print(‘2222‘)
if __name__ == ‘__main__‘:
p = Process(target=func)
p.daemon = True # 在start之前加
p.start()
i = 0
while i < 4:
print(‘11111111‘)
time.sleep(1)
i += 1
>>>11111111
2222
11111111
2222
2222
11111111
2222
2222
11111111
2222
2222
#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
#并发运行,效率高,但竞争写同一文件,数据写入错乱
from multiprocessing import Process,Lock
import time,json,random
def search():
dic=json.load(open(‘db‘))
print(‘\033[43m剩余票数%s\033[0m‘ %dic[‘count‘])
def get():
dic=json.load(open(‘db‘))
time.sleep(0.1) #模拟读数据的网络延迟
if dic[‘count‘] >0:
dic[‘count‘]-=1
time.sleep(0.2) #模拟写数据的网络延迟
json.dump(dic,open(‘db‘,‘w‘))
print(‘\033[43m购票成功\033[0m‘)
def task():
search()
get()
if __name__ == ‘__main__‘:
for i in range(100): #模拟并发100个客户端抢票
p=Process(target=task)
p.start()
# 由并发变成了串行,牺牲了运行效率,但避免了竞争
import random
from multiprocessing import Process,Lock
def func(n, l):
l.acquire()
time.sleep(random.random())
print("为子线程加锁了%s" % n)
l.release()
if __name__ == ‘__main__‘:
l = Lock()
for i in range(3):
p = Process(target=func, args=(i, l))
p.start()
>>>为子线程加锁了0
为子线程加锁了1
为子线程加锁了2
# --------------------------------------------------------------
def func(n):
time.sleep(random.random())
print("为子线程加锁了%s" % n)
if __name__ == ‘__main__‘:
for i in range(3):
p = Process(target=func, args=(i))
p.start()
>>>为子线程加锁了1
为子线程加锁了2
为子线程加锁了0
# 对比两段代码的执行结果——加锁使得子进程变成了‘有序’的状态了
#文件db的内容为:{"count":5}
#注意一定要用双引号,不然json无法识别
#并发运行,效率高,但竞争写同一文件,数据写入错乱
from multiprocessing import Process,Lock
import time,json,random
def search():
dic=json.load(open(‘db‘))
print(‘\033[43m剩余票数%s\033[0m‘ %dic[‘count‘])
def get():
dic=json.load(open(‘db‘))
time.sleep(random.random()) #模拟读数据的网络延迟
if dic[‘count‘] >0:
dic[‘count‘]-=1
time.sleep(random.random()) #模拟写数据的网络延迟
json.dump(dic,open(‘db‘,‘w‘))
print(‘\033[32m购票成功\033[0m‘)
else:
print(‘\033[31m购票失败\033[0m‘)
def task(lock):
search()
lock.acquire()
get()
lock.release()
if __name__ == ‘__main__‘:
lock = Lock()
for i in range(100): #模拟并发100个客户端抢票
p=Process(target=task,args=(lock,))
p.start()
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理
#因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore
# 信号量
def ktv(i, sem):
sem.acquire() # 获取钥匙
print(‘%s走进KTV‘ % i)
time.sleep(random.randint(1, 5))
print(‘%s走出了KTV‘ % i)
sem.release()
if __name__ == ‘__main__‘:
sem = Semaphore(4) # 设置4个锁
for i in range(20):
p = Process(target=ktv, args=(i, sem))
p.start()
>>>0走进KTV
1走进KTV
2走进KTV
3走进KTV
1走出了KTV
4走进KTV
2走出了KTV
6走进KTV
0走出了KTV
...
from multiprocessing import Event
# 一个信号可以使所有的进程都进入阻塞状态,也可以控制所有的进程接触阻塞。
# 一个事件被创建之后默认是阻塞状态
e = Event() # 创建了一个事件
print(e.is_set()) # 查看是否阻塞,默认是阻塞状态的
e.set() # 设置阻塞
print(e.is_set())
e.wait() # 根据e.is_set()的值决定是否阻塞
print("wahaha")
e.clear() # 清除阻塞
print(e.is_set())
e.wait()
print(‘shuangwaiwai‘) # 阻塞了就不会在打印了
from multiprocessing import Event
def cars(e, i):
if not e.is_set():
print(‘car%i在等待‘ % i)
e.wait()
print("car%i通过" % i)
# 红绿灯事件
def light(e):
while True:
if e.is_set():
e.clear()
print("红灯亮了")
else:
e.set()
print(‘绿灯亮了‘)
time.sleep(2)
if __name__ == ‘__main__‘:
e = Event()
traffic = Process(target=light, args=(e, ))
traffic.start()
for i in range(20):
car = Process(target=cars, args=(e, i))
car.start()
time.sleep(random.random())
from multiprocessing import Queue, Process
def produce(q):
q.put("hello") # 向对列中放入数据
def consume(q):
print(q.get()) # 向队列中取数据
if __name__ == ‘__main__‘:
q = Queue()
p = Process(target=produce, args=(q,))
p.start()
c = Process(target=consume, args=(q, ))
c.start()
from multiprocessing import Process, Queue
import random
import time
def consumer(q, name):
# 定义消费者
while True:
food = q.get()
if food is None:
print("%s获取到了一个空" % name)
break
print("%s消费了%s" % (name, food))
time.sleep(random.randint(1, 3))
def produce(name, food, q):
# 定义生产者
for i in range(10):
time.sleep(random.randint(1, 3))
f = "{0}生产了{1}{2}".format(name, food, i)
print(f)
q.put(f)
if __name__ == ‘__main__‘:
q = Queue(10)
p1 = Process(target=produce, args=("lee", "包子", q))
p2 = Process(target=produce, args=("dan", "泔水", q))
c1 = Process(target=consumer, args=(q, ‘pon‘))
c2 = Process(target=consumer, args=(q, ‘toon‘))
p1.start()
p2.start()
c1.start()
c2.start()
p1.join()
p2.join()
q.put(None)
q.put(None)
# 升级版
from multiprocessing import Process, JoinableQueue
import random
import time
def consumer(q, name):
while True:
food = q.get()
print("%s消费了%s" % (name, food))
time.sleep(random.randint(1, 3))
q.task_done() # 使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。
def produce(name, food, q):
# 定义生产者
for i in range(10):
time.sleep(random.randint(1, 3))
f = "{0}生产了{1}{2}".format(name, food, i)
print(f)
q.put(f)
q.join() # 当队列中所有的包子都被消费玩了,程序执行完
if __name__ == ‘__main__‘:
q = JoinableQueue(20)
p1 = Process(target=produce, args=("lee", "包子", q))
p2 = Process(target=produce, args=("dan", "泔水", q))
c1 = Process(target=consumer, args=(q, ‘pon‘))
c2 = Process(target=consumer, args=(q, ‘toon‘))
p1.start()
p2.start()
c1.daemon = True # 创建一个守护进程 循环的案例中,随着主进程的结束,子进程也会跟着结束 所以他的作用是判断 主进程是否结束
c2.daemon = True # 创建一个守护进程 循环的案例中,随着主进程的结束,子进程也会跟着结束
c1.start()
c2.start()
p1.join() # join执行完,那么主进程就执行完
p2.join() # join执行完,那么主进程就执行完
# 管道
from multiprocessing import Pipe, Process
def func(conn1, conn2):
conn2.close()
while True:
try:
msg = conn1.recv()
print(msg)
except EOFError:
conn1.close()
break
if __name__ == ‘__main__‘:
conn1, conn2 = Pipe()
Process(target=func, args=(conn1, conn2)).start()
conn1.close()
for i in range(20):
conn2.send("吃了么")
conn2.close()
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
# 进程池
import time
from multiprocessing import Pool, Process
def func(n):
for i in range(5):
print(n+1)
if __name__ == ‘__main__‘:
start = time.time()
pool = Pool(5)
pool.map(func, range(100))
t1 = time.time() - start
start = time.time()
p_List = []
for i in range(100):
p = Process(target=func, args=(i, ))
p_List.append(p)
p.start()
for p in p_List:
p.join()
t2 = time.time() - start
print(t1, t2)
>>>0.22115707397460938 6.861854076385498 # 差距是相当的明显
原文:https://www.cnblogs.com/pontoon/p/10296271.html