首页 > 编程语言 > 详细

python 多进程使用总结

时间:2020-04-24 13:49:44      阅读:56      评论:0      收藏:0      [点我收藏+]
 

  python中的多进程主要使用到 multiprocessing 这个库。这个库在使用 multiprocessing.Manager().Queue时会出问题,建议大家升级到高版本python,如2.7.11,可具体参考《python版本升级》。

  python使用线程池可参考《python线程池实现

一、多进程使用

1、linux下可使用 fork 函数

技术分享图片
#!/bin/env python
import os

print Process (%s) start... % os.getpid()
pid = os.fork()
if pid==0:
    print I am child process (%s) and my parent is %s. % (os.getpid(), os.getppid())
    os._exit(1)
else:
    print I (%s) just created a child process (%s). % (os.getpid(), pid)
技术分享图片

输出 

Process (22246) start...
I (22246) just created a child process (22247).
I am child process (22247) and my parent is 22246.

2、使用 multiprocessing

技术分享图片
#!/bin/env python
from multiprocessing import Process
import os
import time

def run_proc(name):
    time.sleep(3)
    print Run child process %s (%s)... % (name, os.getpid())

if __name__==__main__:
    print Parent process %s. % os.getpid()
    processes = list()
    for i in range(5):
        p = Process(target=run_proc, args=(test,))
        print Process will start.
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    print Process end.
技术分享图片

输出

技术分享图片
Parent process 38140.
Process will start.
Process will start.
Process will start.
Process will start.
Process will start.
Run child process test (38141)...
Run child process test (38142)...
Run child process test (38143)...
Run child process test (38145)...
Run child process test (38144)...
Process end.

real    0m3.028s
user    0m0.021s
sys     0m0.004s
技术分享图片

 

二、进程池

1、使用 multiprocessing.Pool 非阻塞

技术分享图片
#!/bin/env python

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(3):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()    # behind close() or terminate()
    print "Sub-process(es) done."
技术分享图片

运行结果

技术分享图片
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
Sub-process(es) done.

real    0m3.493s
user    0m0.056s
sys     0m0.022s
技术分享图片

2、使用 multiprocessing.Pool 阻塞版本

技术分享图片
#!/bin/env python

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(3):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))      

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()    # behind close() or terminate()
    print "Sub-process(es) done."
技术分享图片

运行结果

技术分享图片
msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

real    0m9.061s
user    0m0.036s
sys     0m0.019s
技术分享图片

区别主要是 apply_async和 apply函数,前者是非阻塞的,后者是阻塞。可以看出运行时间相差的倍数正是进程池数量

3、使用 multiprocessing.Pool 并关注结果

技术分享图片
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."
技术分享图片

运行结果

技术分享图片
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

real    0m3.526s
user    0m0.054s
sys     0m0.024s
技术分享图片

4、在类中使用 multiprocessing.Pool

类中使用进程池会一般会出现错误

PicklingError: Cant pickle <type instancemethod>: attribute lookup __builtin__.instancemethod failed

这个提示是因为 multiprocessing.Pool中使用了Queue通信,所有进入队列的数据必须可序列化(picklable),包括自定义类实例等。如下:

技术分享图片
#!/bin/env python

import multiprocessing

class SomeClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)
        #result = pool.apply_async(self.f, [10])     
        #print result.get(timeout=1)           
        print pool.map(self.f, range(10))

SomeClass().go()
技术分享图片

运行提示

技术分享图片
Traceback (most recent call last):
  File "4.py", line 18, in <module>
    SomeClass().go()
  File "4.py", line 16, in go
    print pool.map(self.f, range(10))
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
cPickle.PicklingError: Cant pickle <type instancemethod>: attribute lookup __builtin__.instancemethod failed
技术分享图片

解决如下:(1)

技术分享图片
#!/bin/env python
import multiprocessing

def func(x):
    return x*x

class SomeClass(object):
    def __init__(self,func):
        self.f = func

    def go(self):
        pool = multiprocessing.Pool(processes=4)
        #result = pool.apply_async(self.f, [10])
        #print result.get(timeout=1)
        print pool.map(self.f, range(10))

SomeClass(func).go()
技术分享图片

输出结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

(2)一般情况下我们如果在类中写好了处理逻辑,想要尽可能减少代码变动则可以使用下面方法

技术分享图片
#!/bin/env python

import multiprocessing

class SomeClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        result = list()
        pool = multiprocessing.Pool(processes=4)
        for i in range(10):
            result.append(pool.apply_async(func, [self, i]))
        pool.close()
        pool.join()
        for res in result:
            print res.get(timeout=1)   

def func(client, x):
    return client.f(x)

SomeClass().go()
技术分享图片

输出结果:

技术分享图片
0
1
4
9
16
25
36
49
64
81
技术分享图片

使用(2)的解决方法需要注意,如果SomeClass实例中有包含任何不可序列化的数据则会一直报错,一般是到res.get()报错,这时候你就要重新查看代码是否有不可序列化的变量了。如果有的话可以更改成全局变量解决。

 

三、多进程中使用线程池

有一种情景下需要使用到多进程和多线程:在CPU密集型的情况下一个ip的处理速度是0.04秒前后,单线程运行的时间大概是3m32s,单个CPU使用率100%;使用进程池(size=10)时间大概是6m50s,其中只有1个进程的CPU使用率达到90%,其他均是在30%左右;使用线程池(size=10)时间大概是4m39s,单个CPU使用率100%

可以看出使用多进程在这时候并不占优势,反而更慢。因为进程间的切换消耗了大部分资源和时间,而一个ip只需要0.04秒。而使用线程池由于只能利用单核CPU,则再怎么加大线程数量都没法提升速度,所以这时候应该使用多进程加多线程结合。

技术分享图片
def run(self):
    self.getData()
    ipNums = len(self.ipInfo)
    step = ipNums / multiprocessing.cpu_count()
    ipList = list()
    i = 0
    j = 1
    processList = list()
    for ip in self.ipInfo:
        ipList.append(ip)
        i += 1
        if i == step * j or i == ipNums:
            j += 1
            def innerRun():
                wm = Pool.ThreadPool(CONF.POOL_SIZE)
                for myIp in ipList:
                    wm.addJob(self.handleOne, myIp)
                wm.waitForComplete()
            process = multiprocessing.Process(target=innerRun)
            process.start()
            processList.append(process)
            ipList = list()
    for process in processList:
        process.join()
技术分享图片

机器有8个CPU,则使用8个进程加线程池,速度提升到35s,8个CPU的利用率均在50%左右,机器平均CPU75%左右。

 

四、多进程间通信

个人使用的比较多的是队列和共享内存。需要注意的是队列中Queue.Queue是线程安全的,但并不是进程安全,所以多进程一般使用线程、进程安全的multiprocessing.Queue(),而使用这个Queue如果数据量太大会导致进程莫名卡住(绝壁大坑来的),需要不断地消费。

The Queue class is a near clone of Queue.Queue. For example:
技术分享图片
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, hello])

if __name__ == __main__:
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, ‘hello‘]"
    p.join()
技术分享图片
Queues are thread and process safe.

测试卡住的程序如下:

技术分享图片
#!/bin/env python
from multiprocessing import Process, Queue


class A(object):
    def __init__(self):
        pass
    def r(self):
        def f(q):
            import time
            time.sleep(1)
            s = 2000 * ss‘        # 不卡不卡不卡
            # s = 20000 * ‘ss‘          # 卡住卡住卡住              
            q.put([hello, s])
            print "q.put([‘hello‘, s])"
        q = Queue(maxsize=0)
        pL = list()
        for i in range(10):
            p = Process(target=f, args=(q,))
            p.start()
            pL.append(p)
        for p in pL:
            p.join()
        print len(q.get())


if __name__ == __main__:
    A().r()
技术分享图片

 

共享内存使用的一般是multiprocessing.Manager().Array/list/value/dict等。

其他的通信方式特别是分布式多进程可学习 廖雪峰官方网站 http://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/001386832973658c780d8bfa4c6406f83b2b3097aed5df6000

 

python 多进程使用总结

原文:https://www.cnblogs.com/xiondun/p/12766537.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!