首页 > 编程语言 > 详细

Python 多进程 有简入繁

时间:2020-10-15 22:17:22      阅读:34      评论:0      收藏:0      [点我收藏+]

  在工作中经常要处理大数据,单进程有的时候是不够用的,所以常使用多进程或者进程池,所以今天在这里自己做一个小总结,方便自己以后的查阅,当然能帮去到朋友们就更好了。

 

一、如何开启一个进程?

  • 例子:
from multiprocessing import Process


def main(num):
    while (1):
        print num


if __name__ == __main__:
    num = 1
    m_process = Process(target=main, args=(num,))
    m_process.start()
    m_process.join()

 

  解释:引用python模块multiprocessing ,我们在这里开启一个进程来运行函数main,这里用一个死循环是为了更直观的看到进程开启了,target后面是函数名字args后面的元组是此函数的参数。(注:元组的最后一定要有逗号),由Process创建的实力对象有两个方法,start()是开启进程,join()函数会等待进程中函数完成任务后结束进程。

 

一、如何开启多个进程?

  • 例如:
from multiprocessing import Process


def main(num):
    while (1):
        print num


def func_many_process():
    process_list = []
    for num in range(10):
        m_process = Process(target=main, args=(num,))
        process_list.append(m_process)
    for tmp_process in process_list:
        tmp_process.start()
    for tmp_process in process_list:
        tmp_process.join()


if __name__ == __main__:
    func_many_process()

 

  解释:这是我自己研究的,通多一个list,暂时存储进程的对象,之后将所有的进程打开。如果要处理的特别多可以在三个for循环外再加个循环,遍历所有任务每次处理10个

  例如:

from multiprocessing import Process

NUMBER_OF_PROCESSES = 10


def main(num):
    print num


def func_many_process():
    process_list = []
    num1 = 0
    for num in range(1000):
        num1 += 1
        m_process = Process(target=main, args=(num,))
        process_list.append(m_process)

        if num1 == NUMBER_OF_PROCESSES:
            for tmp_process in process_list:
                tmp_process.start()
            for tmp_process in process_list:
                tmp_process.join()
            process_list = []
            num1 = 0


if __name__ == __main__:
    func_many_process()

 

  到这里大家一定会说,谁会用这么蠢的方法,进程池就可以解决了,当然,那是一定的,接下来我们就探讨一下进程池的问题

  首先我们用一下系统自带的进程是解决一下上述问题:

  例如:

from multiprocessing import Pool

NUMBER_OF_PROCESSES = 10


def main(num):
    print num


def func_many_process():
    tmp_pool = Pool(NUMBER_OF_PROCESSES)
    for num in range(1000):
        tmp_pool.apply_async(main, (num,))
    tmp_pool.close()
    tmp_pool.join()


if __name__ == __main__:
    func_many_process()

  解释:是不是感觉代码一下字清晰明了了许多,因为少了很多逻辑,主要还快了很多,不得不说这是超级棒的。刚开始的时候我也对其爱不释手,但是有一天,我发现了两个非常严峻的问题。

  问题一:这样写的进程无法获取到信号ctrl+c,换句话说我们除了killall在Linux中直接将其杀死没有其他办法,而且在杀死的过程中很容易造成许多僵尸进程。这真的很可怕,因为我就因为这个原因坑了我的队友。

      当然,人都是被逼出来的,我当时第一个想法就是如何才能获取到这个信号呢?经历了九九八十一难,我失败了,最后查到一个权威的说法,这是因为python本身存在的显现,当然这里如果有大神可以做到请教教我

  问题二:貌似在进程池中无法再创建子进程或者进程池。向大神们请教

      例如:

from multiprocessing import Pool
from multiprocessing import Process

NUMBER_OF_PROCESSES = 10

def func(num):
    print num+1000

def main(num):
    print num
    m_process = Process(target=func, args=(num,))
    m_process.start()
    m_process.join()

def func_many_process():
    tmp_pool = Pool(NUMBER_OF_PROCESSES)
    for num in range(10):
        tmp_pool.apply_async(main, (num,))
    tmp_pool.close()
    tmp_pool.join()


if __name__ == __main__:
    func_many_process()

 

  

from multiprocessing import Pool
from multiprocessing import Process

NUMBER_OF_PROCESSES = 10

def func(num):
    print num+1000

def main(num):
    print num
    tmp_pool1 = Pool(NUMBER_OF_PROCESSES)
    for num in range(100):
        tmp_pool1.apply_async(func, (num,))
    tmp_pool1.close()
    tmp_pool1.join()

def func_many_process():
    tmp_pool = Pool(NUMBER_OF_PROCESSES)
    for num in range(10):
        tmp_pool.apply_async(main, (num,))
    tmp_pool.close()
    tmp_pool.join()


if __name__ == __main__:
    func_many_process()

  解释:这两个代码中我都在尝试在main函数中再次开启进程或者进程池,但是都失败了,虽然没有报错,但是我没有拿到我想要的结果。

 

  最后为大家奉上我平时有的多进程框架,是从同事哪里学习而来,用起来很方便,代码也很好理解,我删减了一番,只留下了框架。

import multiprocessing
import traceback

CONSTANT_PROCESS_NUM = 10


class MyProcess(multiprocessing.Process):
    def __init__(self, queue, cond_lock):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.cond_lock = cond_lock
        self.start()

    def run(self):
        while True:
            try:
                if self.queue.qsize() == 0:
                    break
                func, args = self.queue.get(block=False)
                func(args)
                self.queue.task_done()
            except Exception, e:
                if self.queue.empty(): break
                self.queue.task_done()
                print e


class MyProcessPool():
    def __init__(self, queue, cond_lock, size):
        self.queue = queue
        self.pool = []
        for i in range(size):
            self.pool.append(MyProcess(queue, cond_lock))

    def joinAll(self):
        for thd in self.pool:
            if thd.is_alive():  thd.join()


def Fun_Process():
    m_queue = multiprocessing.Manager().Queue()
    cond_lock_obj = multiprocessing.Lock()

    for num in range(1000):
        try:
            m_queue.put((main, num))
        except Exception, e:
            print traceback.print_exc()
            continue
    my_process_pool = MyProcessPool(queue=m_queue, cond_lock=cond_lock_obj, size=CONSTANT_PROCESS_NUM)
    my_process_pool.joinAll()


def main(num):
    print num


if __name__ == __main__:
    Fun_Process()

相关知识点

  进程间通信中的对列:multiprocessing.Manager().Queue()

  文件锁:cond_lock_obj = multiprocessing.Lock()

    用法:

      锁:cond_lock_obj .acquire()

      开:cond_lock_obj .release()

  例如:

cond_lock_obj .acquire()
with open("./1.txt", "a+") as f:
    f.write(num)
cond_lock_obj .release()

解释:这里面利用了共享对列来实现进程池,先将所有的任务投放到队列中,之后开启用一定数量进程的进程池,在不多的获取队列中的任务来干活,这样就避免不断地开关进程导致资源和时间的浪费。

Python 多进程 有简入繁

原文:https://www.cnblogs.com/lijianping/p/13814834.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!