17.进程线程
from multiprocessing import Process
import time
import os
def test(sub_p):
for j in range(20):
print(‘%s----pid=%s‘ % (sub_p, os.getpid()))
time.sleep(1)
for i in range(3):
p = Process(target=test, name=‘sub_%s‘ % i, args=(i,))
print(‘sub process %s‘ % p.name)
p.start()
p.join(5) # join表示延时时间,也就是等待子进程的时间,当10秒过了以后,则会运行主进程。
for i in range(60):
print(‘----‘)
time.sleep(1)
from multiprocessing import Process
import time
import os
class ProcessClass(Process):
def __init__(self, sub_p):
super(ProcessClass, self).__init__()
self.sub_p = sub_p
def run(self): # 重写run方法,当调用start方法时,则会默认调用run方法,所以不用再填写target参数。
for j in range(20):
print(‘%s----pid=%s‘ % (self.sub_p, os.getpid()))
time.sleep(1)
for i in range(3):
p = ProcessClass(i)
p.start()
p.join(5) # 这里将会等待子进程单独运行5秒。
for i in range(60): # 主进程,当join等待结束收,则会父子进程一起运行。但是如果当父进程运行完,子进程还没有结束,那么父进程会继续等子进程。
print(‘--main--‘)
time.sleep(1)
方法 | 描述 |
apply() | 以同步方式添加进程 |
apply_async() | 以异步方式添加进程 |
close() | 关闭Pool,使其不接受新任务(还可以使用) |
terminate() | 不管任务是否完成,立即终止 |
join() | 主进程阻塞,等待子进程的退出,必须在close和terminate后使用 |
from multiprocessing import Pool # 导入Pool模块类
import os
import time
def work(num):
print(‘进程的pid是%d,进程值是%d‘ % (os.getpid(), num))
time.sleep(2)
p = Pool(2) # 实例化对象,参数2表示创建2个子进程,就是说每次只能执行2个进程。
for i in range(6):
print(‘--%d--‘ % i)
# 向实例对象添加6次任务,就是6个进程,但是实例对象的进程池只有2个,所以每次往进程池中放2个进程,
# 当进程池中的2个进程执行完以后,再允许向进程池中添加进程。
p.apply_async(work, (i,))
p.close() # 关闭进程池,不再接收进程任务。
p.join() # 当子进程工作结束后,则会运行主进程。
方法 | 描述 |
put | q.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) |
put_nowait | q.put_nowati(数据),放入数据(如队列已满,则不等待队列取出后再放入,直接报错) |
get | q.get(数据),取出数据(如队列为空,阻塞等待队列放入数据后再取出) |
get_nowait | q.get_nowait(数据),取出数据(如队列为空,不等待队列取出之前数据,直接报错),放入数据后立马判断是否为空有时为True,原因是放入值和判断同时进行 |
qsize | q.qsize() 消息数量 |
empty | q.empty() 返回True或False,判断是否为空 |
full | q.full() 返回值为True或False,判断是否为满 |
from multiprocessing import Process, Pipe
def f(conn):
conn.send([1, ‘test‘, None])
conn.send([2, ‘test‘, None])
print(‘child get: %s‘ % conn.recv()) # 没数据时读阻塞
conn.close()
if __name__ == "__main__":
# Pipe(duplex=True)返回管道的两端,duplex=True时双向管道;False时单向parent_conn只读,child_conn只写
parent_conn, child_conn = Pipe(duplex=True)
p = Process(target=f, args=(child_conn,))
p.start()
res = parent_conn.recv()
print(‘parent get: %s, type=%s‘ % (res, type(res)))
print(‘parent get: %s‘ % parent_conn.recv())
parent_conn.send(‘father test‘)
p.join()
value
+
array, Manager
Python中进程间共享数据,处理基本的queue,pipe和value
+
array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
Manager支持的类型有
list
,
dict
,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
from multiprocessing import Manager, Process, Lock
def work(d, lock):
with lock: # 不加锁而操作共享的数据,肯定会出现数据错乱
d[‘count‘] -= 1
if __name__ == ‘__main__‘:
lock = Lock()
with Manager() as m:
dic = m.dict({‘count‘: 100})
p_l = []
for i in range(100):
p = Process(target=work, args=(dic, lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
# encoding=utf-8
‘‘‘
进程间通信
‘‘‘
from multiprocessing.managers import BaseManager
from multiprocessing import RLock
MANAGER_PORT = 6000
MANAGER_DOMAIN = ‘0.0.0.0‘
MANAGER_AUTH_KEY = ‘aaaaaaaaaaaaaaa‘
#定义一个Manager类
class InfoManager(BaseManager): pass
class DictItem():
def __init__(self, ):
self.items = dict()
def set(self, key, value):
self.items[key] = value
def get(self, key):
return self.items.get(key)
def __setitem__(self, key, value):
self.set(key, value)
#为这个manager类注册存储容器,也就是通过这个manager类实现的共享的变量,
#这个变量最好是一个类实例,自己定义的或者python自动的类的实例都可以
#这里不能把d改成dict(),因为Client那边执行d[‘keyi‘]=‘value‘的时候会报错:d这个变量不能修改
d = DictItem()
lock = RLock()
InfoManager.register(‘dict‘, callable=lambda: d)
InfoManager.register(‘open_qq_login_lock‘, callable=lambda: lock)
class ManagerServer():
‘‘‘
multiprocess Manager服务类
‘‘‘
def __init__(self, domain, port, auth_key):
self.domain = domain
self.port = port
self.auth_key = auth_key
def start_manager_server(self):
self.queue_manager = InfoManager(address=(‘‘, self.port), authkey=self.auth_key)
# self.dict = self.queue_manager.dict()
self.server = self.queue_manager.get_server()
def run(self):
self.start_manager_server()
self.server.serve_forever()
def stop(self):
self.server.shutdown()
self.is_stop = 1
class ManagerClient():
‘‘‘
访问mutiprocess Manager的类
‘‘‘
def __init__(self, domain, port, auth_key):
self.domain = domain
self.port = port
self.auth_key = auth_key
# self.get_share_dict()
self.info_manager = InfoManager(address=(self.domain, self.port), authkey=self.auth_key)
self.info_manager.connect()
def get_dict(self):
# self.dict = m.dict()
self.dict = self.info_manager.dict()
return self.dict
def get_open_qq_login_lock(self):
self.open_qq_login_lock = self.info_manager.open_qq_login_lock()
return self.open_qq_login_lock
if __name__ == ‘__main__‘:
pass
import manager
def run():
manager_server = manager.ManagerServer(manager.MANAGER_DOMAIN, manager.MANAGER_PORT, manager.MANAGER_AUTH_KEY)
manager_server.run()
if __name__ == ‘__main__‘:
run()
# 进程间共享变量
manager_client = manager.ManagerClient(manager.MANAGER_DOMAIN, manager.MANAGER_PORT, manager.MANAGER_AUTH_KEY)
share_dict = manager_client.get_dict()
open_qq_login_lock = manager_client.get_open_qq_login_lock()
注意:
使用Manager可以方便的进行多进程数据共享,但当使用Manager处理list、dict等可变数据类型时,需要非常注意一个陷阱。看下面的代码:
from multiprocessing import Process, Manager
manager = Manager()
m = manager.list()
m.append({‘id‘:1})
def test():
m[0][‘id‘] = 2
p = Process(target=test)
p.start()
p.join()
print(m[0])
{‘id‘: 1},
不是预期的:{‘id‘: 2}要达到预期的结果,代码应改为:
from multiprocessing import Process, Manager
manager = Manager()
m = manager.list()
m.append({‘id‘:1})
def test():
hack = m[0]
hack[‘id‘] = 2
m[0] = hack
p = Process(target=test)
p.start()
p.join()
print(m[0])
__setitem__
方法来让它获得通知 原文:https://www.cnblogs.com/aaron-agu/p/10872752.html