1.信号量
1.Semaphore
2.内部通过锁的原理实现,内置计数器
3.同一时间,只能有指定数量的进程执行某一段被控制的代码
2.事件
1.Event
2.wait阻塞,受到事件状态控制的同步组件
3.is_set
clear() -->false 阻塞
set() -->true 不阻塞
3.队列
1.Queue,JoinableQueue
2.put() 队列满时,阻塞
get() 队列空时,阻塞
full() empty() 不准确
3.task_done() --get()
join() --put()
1.管道
队列又是基于(管道+锁)实现的
2.进程间的数据共享
3.进程池和回调函数
1.为什么会出现进程池?
开启进程,会开启属于这个进程的空间,过多的进程会降低效率
2.进程池
1.python中,启动时创建一个属于进程的池子
这个池子指定能存放多少个进程
1.队列
from multiprocessing import Pipe,Process def func(conn1,conn2): conn2.close() while True: try: msg = conn1.recv() print(msg) except: conn1.close() break if __name__ == ‘__main__‘: conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,conn2)) p.start() conn1.close() for i in range(10): conn2.send(‘hello!!!‘) conn2.close()
2.生产者消费者,队列,加锁
import time,random from multiprocessing import Process,Pipe,Lock def producer(con,pro,name,food): con.close() for i in range(2): time.sleep(random.randint(1,3)) f = ‘%s生产了%s%s‘%(name,food,i) print(f) pro.send(f) pro.close() def consumer(con,pro,name,lock): pro.close() while True: try: lock.acquire() c = con.recv() lock.release() print(‘%s消费了%s‘%(name,c)) time.sleep(random.randint(0,1)) except EOFError: print(‘%s我来啦‘%name) lock.release() con.close() break if __name__ == ‘__main__‘: con, pro = Pipe() lock = Lock() p = Process(target=producer,args=(con,pro,‘alex‘,‘包子‘)) p.start() c1 = Process(target=consumer,args=(con,pro,‘alex‘,lock)) c1.start() c2 = Process(target=consumer,args=(con,pro,‘小明‘,lock)) c2.start() con.close() pro.close()
3.数据共享
from multiprocessing import Manager,Process,Lock def main(dic,lock): lock.acquire() dic[‘count‘] -= 1 lock.release() if __name__ == ‘__main__‘: lock = Lock() m = Manager() dic = m.dict({‘count‘: 30}) p_lst = [] for i in range(20): p = Process(target=main, args=(dic,lock)) p.start() p_lst.append(p) for i in p_lst: i.join() print(‘main process:‘,dic)
4.进程池效率
import time from multiprocessing import Pool,Process def func(n): for i in range(10): print(n) if __name__ == ‘__main__‘: start = time.time() pool = Pool(5) pool.map(func,range(100)) t1 = time.time() - start start = time.time() p_lst = [] for u in range(100): p = Process(target=func,args=(u,)) p.start() p_lst.append(p) for i in p_lst: i.join() t2 = time.time() - start print(t1,t2)
5.进程池开启方式2
import os import time from multiprocessing import Pool def func(n): print(‘start %s‘% os.getpid()) time.sleep(1) print(‘end %s‘% os.getpid()) if __name__ == ‘__main__‘: p = Pool() for i in range(10): p.apply_async(func,args=(i,)) p.close() p.join()
6.进程池的返回值
import time from multiprocessing import Pool def func(i): time.sleep(1) return i*i if __name__ == ‘__main__‘: p = Pool() ret_l = [] for i in range(10): ret = p.apply_async(func,args=(i,)) ret_l.append(ret) for i in ret_l: print(i.get()) # map import time from multiprocessing import Pool def func(i): time.sleep(1) return i*i if __name__ == ‘__main__‘: p = Pool() ret_l = [] ret = p.map(func,range(10)) # map自带join和close,一次性把所有结果返回 print(map)
7.回调函数
import os from multiprocessing import Pool def func1(i): print(‘func1: ‘, os.getpid()) func2(i) return i*i def func2(i): print(‘func2: ‘, os.getpid()) def func3(i): print(‘func3: ‘, os.getpid()) print(i) if __name__ == ‘__main__‘: print(‘主进程:‘,os.getpid()) p = Pool() p.apply_async(func1,args=(10,),callback=func3) p.close() p.join()
原文:https://www.cnblogs.com/wan2-0/p/10888667.html