multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
import multiprocessing
import os

def run_proc(name):
    print('Child process {0} {1} Running'.format(name, os.getpid()))

if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        print('process start')
        p.start()
    p.join()
    print('Process close')
Result:
Parent process 5408 is Running
process start
process start
process start
process start
process start
Child process 0 1044 Running
Child process 1 1120 Running
Child process 3 10824 Running
Child process 2 9292 Running
Child process 4 10528 Running
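The Process constructor at the top of this section also accepts name and daemon arguments, which the example above does not use. The following is a minimal sketch (the worker function and process name are arbitrary, chosen only for illustration) showing both: a daemon process is terminated automatically when the parent exits, so the parent must join it explicitly if its work has to finish.

import multiprocessing
import os
import time

def worker():
    print('Worker {0} (pid {1}) started'.format(
        multiprocessing.current_process().name, os.getpid()))
    time.sleep(1)
    print('Worker finished')

if __name__ == '__main__':
    # name labels the process; daemon=True means it is killed
    # automatically when the parent process exits.
    p = multiprocessing.Process(target=worker, name='demo-worker', daemon=True)
    p.start()
    p.join()   # without this join the daemon could be killed before finishing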
pool = Pool(processes, initializer, initargs)
processes: the number of worker processes to create; if omitted, the value of cpu_count(), i.e. the number of CPUs on the system, is used.
initializer: a callable that is invoked when each worker process starts.
initargs: the arguments passed to initializer (see the sketch below).
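As an illustration of initializer and initargs, a minimal sketch (init_worker, work and log_prefix are hypothetical names used only for this example) in which every worker process receives some per-process state before it handles tasks:

import multiprocessing

def init_worker(prefix):
    # Runs once in every worker process when it starts.
    global log_prefix
    log_prefix = prefix

def work(x):
    return '{0}: {1}'.format(log_prefix, x * x)

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2,
                              initializer=init_worker,
                              initargs=('worker',)) as pool:
        print(pool.map(work, range(4)))
    # ['worker: 0', 'worker: 1', 'worker: 4', 'worker: 9']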
apply_async(func, args, kwds): call func in a non-blocking way; submitted tasks run in parallel (at the same time).
apply(func, args, kwds): call func in a blocking way; the next task does not start until the previous one has finished.
close(): stop accepting new tasks into the pool; tasks already submitted will still run to completion.
terminate(): stop the worker processes immediately, whether or not their tasks have finished.
join(): block the main process (statements after join are not executed) until all worker processes have exited; note that this method must be called after close() or terminate().
pool.map(func, iterable, chunksize): apply func to every item of iterable and return the results as a list.
The iterable is split into chunks that are dispatched to the worker processes, so the items are processed in parallel. chunksize sets the number of items per chunk;
for large data sets, increasing chunksize can improve performance.
pool.map_async(func, iterable, chunksize, callback): differs from map() in that the result is returned asynchronously;
if callback is given, it is called with the result once the result is available.
pool.imap(func, iterable, chunksize): differs from map() in that it returns an iterator instead of a list.
pool.imap_unordered(func, iterable, chunksize): differs from imap() in that the order of the results is determined by the order in which they are received from the worker processes (see the sketch below).
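A minimal sketch contrasting the map variants (the square function is only an illustration): map() blocks and returns an ordered list, while imap_unordered() yields results in whatever order the workers finish them.

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        print(pool.map(square, range(6), chunksize=2))   # [0, 1, 4, 9, 16, 25]
        for result in pool.imap_unordered(square, range(6)):
            print(result)   # order depends on which worker finishes first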
The following methods belong to the AsyncResult object returned by apply_async() and map_async():
result.get(timeout): if timeout is not set, wait indefinitely for the result;
if timeout is set and the result is not ready within it, a multiprocessing.TimeoutError is raised.
result.ready(): return True if the call has completed.
result.successful(): return True if the call completed without raising an exception; if called before the result is ready, an AssertionError is raised.
result.wait(timeout): wait until the result is available, for at most timeout seconds.
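A minimal sketch of these AsyncResult methods (slow_add is a hypothetical task used only for illustration):

import multiprocessing
import time

def slow_add(a, b):
    time.sleep(0.5)
    return a + b

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        result = pool.apply_async(slow_add, (2, 3))
        print(result.ready())         # False: the call has not completed yet
        print(result.get(timeout=2))  # 5, or multiprocessing.TimeoutError on timeout
        print(result.ready())         # True
        print(result.successful())    # True: completed without raising an exception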
import multiprocessing
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    # set the number of worker processes in the pool
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
        p.apply_async(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')
current process 562
Task 0 pid 778 is running, parent id is 562
Task 1 pid 779 is running, parent id is 562
Task 2 pid 780 is running, parent id is 562
Waiting for all subprocesses done...
Task 1 end.
Task 0 end.
Task 2 end.
Task 3 pid 779 is running, parent id is 562
Task 4 pid 778 is running, parent id is 562
Task 5 pid 780 is running, parent id is 562
Task 4 end.
Task 5 end.
Task 3 end.
All processes done!
apply(func[, args[, kwds]])
import multiprocessing
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
        p.apply(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')
current process 562
Task 0 pid 785 is running, parent id is 562
Task 0 end.
Task 1 pid 786 is running, parent id is 562
Task 1 end.
Task 2 pid 787 is running, parent id is 562
Task 2 end.
Task 3 pid 785 is running, parent id is 562
Task 3 end.
Task 4 pid 786 is running, parent id is 562
Task 4 end.
Task 5 pid 787 is running, parent id is 562
Task 5 end.
Waiting for all subprocesses done...
All processes done!
from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer processes:
def proc_write(q, urls):
    print('Process(%s) is writing...' % os.getpid())
    for url in urls:
        q.put(url)
        print('Put %s to queue...' % url)
        time.sleep(random.random())

# Code executed by the reader process:
def proc_read(q):
    print('Process(%s) is reading...' % os.getpid())
    while True:
        url = q.get(True)
        print('Get %s from queue.' % url)

if __name__ == '__main__':
    # The parent process creates the Queue and passes it to each child process:
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q, ['url_1', 'url_2', 'url_3']))
    proc_writer2 = Process(target=proc_write, args=(q, ['url_4', 'url_5', 'url_6']))
    proc_reader = Process(target=proc_read, args=(q,))
    # Start the writer processes:
    proc_writer1.start()
    proc_writer2.start()
    # Start the reader process:
    proc_reader.start()
    # Wait for the writers to finish:
    proc_writer1.join()
    proc_writer2.join()
    # proc_read loops forever, so it cannot be joined; terminate it instead:
    proc_reader.terminate()
multiprocessing.Pipe([duplex])
from multiprocessing import Process, Pipe

def send(pipe):
    pipe.send(['spam', 42, 'egg'])   # send a list through the pipe
    pipe.close()

if __name__ == '__main__':
    (con1, con2) = Pipe()   # create the two connection ends of a Pipe
    sender = Process(target=send, args=(con1,))   # args must be an already-created connection object, not an inline Pipe()
    sender.start()   # start the process
    print("con2 got: %s" % con2.recv())   # the other end, con2, receives the message from send
    con2.close()
Result:
con2 got: ['spam', 42, 'egg']
from multiprocessing import Process, Pipe

def talk(pipe):
    pipe.send(dict(name='Bob', spam=42))   # send a dict through the pipe
    reply = pipe.recv()                    # receive the data sent back
    print('talker got:', reply)

if __name__ == '__main__':
    (parentEnd, childEnd) = Pipe()   # create the two ends of a Pipe (any variable names will do)
    child = Process(target=talk, args=(childEnd,))   # create a Process named child
    child.start()   # start the process
    print('parent got:', parentEnd.recv())   # parentEnd receives the data sent by the child process
    parentEnd.send({x * 2 for x in 'spam'})  # parentEnd sends data back through the pipe
    child.join()   # the data sent above is received by pipe.recv() inside talk and assigned to reply
    print('parent exit')
Result:
parent got: {'name': 'Bob', 'spam': 42}
talker got: {'pp', 'mm', 'aa', 'ss'}
parent exit
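Both Pipe examples above use the default two-way pipe. The duplex argument in the signature shown earlier makes it one-way: with duplex=False the first connection can only receive and the second can only send. A minimal sketch (producer, recv_end and send_end are arbitrary names chosen for illustration):

from multiprocessing import Process, Pipe

def producer(conn):
    conn.send('one-way message')   # this end can only send
    conn.close()

if __name__ == '__main__':
    recv_end, send_end = Pipe(duplex=False)   # recv_end can only receive, send_end can only send
    p = Process(target=producer, args=(send_end,))
    p.start()
    print(recv_end.recv())   # one-way message
    p.join()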
Original article: https://www.cnblogs.com/IMWU/p/10855709.html