在工作中经常要处理大数据,单进程有的时候是不够用的,所以常使用多进程或者进程池,所以今天在这里自己做一个小总结,方便自己以后的查阅,当然能帮去到朋友们就更好了。
一、如何开启一个进程?
from multiprocessing import Process def main(num): while (1): print num if __name__ == ‘__main__‘: num = 1 m_process = Process(target=main, args=(num,)) m_process.start() m_process.join()
解释:引用python模块multiprocessing ,我们在这里开启一个进程来运行函数main,这里用一个死循环是为了更直观的看到进程开启了,target后面是函数名字,args后面的元组是此函数的参数。(注:元组的最后一定要有逗号),由Process创建的实力对象有两个方法,start()是开启进程,join()函数会等待进程中函数完成任务后结束进程。
一、如何开启多个进程?
from multiprocessing import Process def main(num): while (1): print num def func_many_process(): process_list = [] for num in range(10): m_process = Process(target=main, args=(num,)) process_list.append(m_process) for tmp_process in process_list: tmp_process.start() for tmp_process in process_list: tmp_process.join() if __name__ == ‘__main__‘: func_many_process()
解释:这是我自己研究的,通多一个list,暂时存储进程的对象,之后将所有的进程打开。如果要处理的特别多可以在三个for循环外再加个循环,遍历所有任务每次处理10个
例如:
from multiprocessing import Process NUMBER_OF_PROCESSES = 10 def main(num): print num def func_many_process(): process_list = [] num1 = 0 for num in range(1000): num1 += 1 m_process = Process(target=main, args=(num,)) process_list.append(m_process) if num1 == NUMBER_OF_PROCESSES: for tmp_process in process_list: tmp_process.start() for tmp_process in process_list: tmp_process.join() process_list = [] num1 = 0 if __name__ == ‘__main__‘: func_many_process()
到这里大家一定会说,谁会用这么蠢的方法,进程池就可以解决了,当然,那是一定的,接下来我们就探讨一下进程池的问题
首先我们用一下系统自带的进程是解决一下上述问题:
例如:
from multiprocessing import Pool NUMBER_OF_PROCESSES = 10 def main(num): print num def func_many_process(): tmp_pool = Pool(NUMBER_OF_PROCESSES) for num in range(1000): tmp_pool.apply_async(main, (num,)) tmp_pool.close() tmp_pool.join() if __name__ == ‘__main__‘: func_many_process()
解释:是不是感觉代码一下字清晰明了了许多,因为少了很多逻辑,主要还快了很多,不得不说这是超级棒的。刚开始的时候我也对其爱不释手,但是有一天,我发现了两个非常严峻的问题。
问题一:这样写的进程无法获取到信号ctrl+c,换句话说我们除了killall在Linux中直接将其杀死没有其他办法,而且在杀死的过程中很容易造成许多僵尸进程。这真的很可怕,因为我就因为这个原因坑了我的队友。
当然,人都是被逼出来的,我当时第一个想法就是如何才能获取到这个信号呢?经历了九九八十一难,我失败了,最后查到一个权威的说法,这是因为python本身存在的显现,当然这里如果有大神可以做到,请教教我。
问题二:貌似在进程池中无法再创建子进程或者进程池。向大神们请教。
例如:
from multiprocessing import Pool from multiprocessing import Process NUMBER_OF_PROCESSES = 10 def func(num): print num+1000 def main(num): print num m_process = Process(target=func, args=(num,)) m_process.start() m_process.join() def func_many_process(): tmp_pool = Pool(NUMBER_OF_PROCESSES) for num in range(10): tmp_pool.apply_async(main, (num,)) tmp_pool.close() tmp_pool.join() if __name__ == ‘__main__‘: func_many_process()
from multiprocessing import Pool from multiprocessing import Process NUMBER_OF_PROCESSES = 10 def func(num): print num+1000 def main(num): print num tmp_pool1 = Pool(NUMBER_OF_PROCESSES) for num in range(100): tmp_pool1.apply_async(func, (num,)) tmp_pool1.close() tmp_pool1.join() def func_many_process(): tmp_pool = Pool(NUMBER_OF_PROCESSES) for num in range(10): tmp_pool.apply_async(main, (num,)) tmp_pool.close() tmp_pool.join() if __name__ == ‘__main__‘: func_many_process()
解释:这两个代码中我都在尝试在main函数中再次开启进程或者进程池,但是都失败了,虽然没有报错,但是我没有拿到我想要的结果。
最后为大家奉上我平时有的多进程框架,是从同事哪里学习而来,用起来很方便,代码也很好理解,我删减了一番,只留下了框架。
import multiprocessing import traceback CONSTANT_PROCESS_NUM = 10 class MyProcess(multiprocessing.Process): def __init__(self, queue, cond_lock): multiprocessing.Process.__init__(self) self.queue = queue self.cond_lock = cond_lock self.start() def run(self): while True: try: if self.queue.qsize() == 0: break func, args = self.queue.get(block=False) func(args) self.queue.task_done() except Exception, e: if self.queue.empty(): break self.queue.task_done() print e class MyProcessPool(): def __init__(self, queue, cond_lock, size): self.queue = queue self.pool = [] for i in range(size): self.pool.append(MyProcess(queue, cond_lock)) def joinAll(self): for thd in self.pool: if thd.is_alive(): thd.join() def Fun_Process(): m_queue = multiprocessing.Manager().Queue() cond_lock_obj = multiprocessing.Lock() for num in range(1000): try: m_queue.put((main, num)) except Exception, e: print traceback.print_exc() continue my_process_pool = MyProcessPool(queue=m_queue, cond_lock=cond_lock_obj, size=CONSTANT_PROCESS_NUM) my_process_pool.joinAll() def main(num): print num if __name__ == ‘__main__‘: Fun_Process()
相关知识点
进程间通信中的对列:multiprocessing.Manager().Queue()
文件锁:cond_lock_obj = multiprocessing.Lock()
用法:
锁:cond_lock_obj .acquire()
开:cond_lock_obj .release()
例如:
cond_lock_obj .acquire() with open("./1.txt", "a+") as f: f.write(num) cond_lock_obj .release()
解释:这里面利用了共享对列来实现进程池,先将所有的任务投放到队列中,之后开启用一定数量进程的进程池,在不多的获取队列中的任务来干活,这样就避免不断地开关进程导致资源和时间的浪费。
原文:https://www.cnblogs.com/lijianping/p/13814834.html