共享内存有两个结构,一个是 Value
, 一个是 Array
,这两个结构内部都实现了锁机制,因此是多进程安全的。
Value 和 Array 都需要设置其中存放值的类型,d 是 double 类型,i 是 int 类型,具体的对应关系在Python 标准库的 sharedctypes 模块中查看。
1 from multiprocessing import Value, Array, Process 2 3 def woker(arr): 4 for i in range(len(arr)): 5 arr[i] = -arr[i] 6 7 if __name__ == ‘__main__‘: 8 arr = Array(‘i‘, [x for x in range(10)]) 9 print(arr) 10 print(arr[:]) 11 p = Process(target=woker, args=(arr,)) 12 p.start() 13 p.join() # 等子进程先执行完,否则两次print结果相同 14 print(arr[:])
<SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_10 object at 0x102c76510>> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
上面的共享内存支持两种结构 Value 和 Array。 Python 中还有一个强大的Manager,专门用来做数据共享。
其支持的类型非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array。
一个 Manager 对象是一个服务进程,推荐多进程程序中,数据共享就用一个 manager 管理。
1 from multiprocessing import Manager, Process 2 3 def worker(dt, lt): 4 for i in range(10): 5 dt[i] = i*i 6 lt += [x for x in range(11, 20)] 7 8 if __name__ == ‘__main__‘: 9 manager = Manager() 10 dt = manager.dict() 11 lt = manager.list() 12 p = Process(target=worker, args=(dt, lt)) 13 p.start() 14 p.join(timeout=3) 15 print(type(dt)) 16 print(dt) 17 print(type(lt)) 18 print(lt)
<class ‘multiprocessing.managers.DictProxy‘> {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81} <class ‘multiprocessing.managers.ListProxy‘> [11, 12, 13, 14, 15, 16, 17, 18, 19]
如果有50个任务要执行, 但是 CPU 只有4核, 你可以创建50个进程来做这个事情。但是大可不必,徒增管理开销。如果你只想创建4个进程,让他们轮流替你完成任务,不用自己去管理具体的进程的创建销毁,那 Pool 是非常有用的。
Pool 是进程池,进程池能够管理一定的进程,当有空闲进程时,则利用空闲进程完成任务,直到所有任务完成为止。
Pool 进程池创建4个进程,不管有没有任务,都一直在进程池中等候,等到有数据的时候就开始执行。
Pool 的 API 列表如下:
注意:在调用pool.join()之前,必须先调用pool.close(),否则会出错。执行完close()后不会有新的进程加入到pool中,join()会等待所有子进程结束。
pool.apply_async 非阻塞(异步执行),定义的进程池最大进程数可同时执行
pool.apply 阻塞,一个进程结束后释放回池,下一个进程才开始执行
1 import multiprocessing 2 import time 3 4 5 def fun(msg): 6 print("#########start#### {0}".format(msg)) 7 time.sleep(3) 8 print("#########end###### {0}".format(msg)) 9 10 11 if __name__ == ‘__main__‘: 12 print("start main") 13 pool = multiprocessing.Pool(processes=3) 14 for i in range(1, 7): 15 msg = "hello {0}".format(i) 16 # pool.apply_async(fun, (msg,))# 执行时间 6s+ 17 pool.apply(fun, (msg,)) #执行时间 6*3=18+ 18 pool.close()#在调用join之前,要先调用close,否则会报错,close执行完不会有新的进程加入到pool 19 pool.join()#join 是等待所有的子进程结束 20 print("end main")
start main #########start#### hello 1 #########end###### hello 1 #########start#### hello 2 #########end###### hello 2 #########start#### hello 3 #########end###### hello 3 #########start#### hello 4 #########end###### hello 4 #########start#### hello 5 #########end###### hello 5 #########start#### hello 6 #########end###### hello 6 end main
1 import multiprocessing 2 import time 3 4 5 def fun(msg): 6 print("#########start#### {0}".format(msg)) 7 time.sleep(3) 8 print("#########end###### {0}".format(msg)) 9 10 11 if __name__ == ‘__main__‘: 12 print("start main") 13 pool = multiprocessing.Pool(processes=3) 14 for i in range(1, 7): 15 msg = "hello {0}".format(i) 16 pool.apply_async(fun, (msg,))# 执行时间 6s+ 17 # pool.apply(fun, (msg,)) #执行时间 6*3=18+ 18 pool.close()#在调用join之前,要先调用close,否则会报错,close执行完不会有新的进程加入到pool 19 pool.join()#join 是等待所有的子进程结束 20 print("end main")
start main #########start#### hello 1 #########start#### hello 2 #########start#### hello 3 #########end###### hello 1 #########end###### hello 2 #########end###### hello 3 #########start#### hello 4 #########start#### hello 5 #########start#### hello 6 #########end###### hello 5 #########end###### hello 6 #########end###### hello 4 end main
多线程实现方式有两种:
方法一:将要执行的方法作为参数传给Thread的构造方法(和多进程类似)
t = threading.Thread(target=func, args=(i,))
方法二:从Thread()继承,并重写run()
线程 < 进程,1个父进程中含多个线程。
多线程和多进程的不同之处在于:多线程本身是可以和父进程共享内存的。这也是为什么其中一个线程挂掉会导致其他线程也死掉的道理。
1 import threading 2 import time 3 4 def worker(args): 5 print("开始子进程 {0}".format(args)) 6 time.sleep(args) 7 print("结束子进程 {0}".format(args)) 8 9 if __name__ == ‘__main__‘: 10 11 print("start main") 12 t1 = threading.Thread(target=worker, args=(1,)) 13 t2 = threading.Thread(target=worker, args=(2,)) 14 t1.start() 15 t2.start() 16 t1.join() 17 t2.join() 18 print("end main")
start main 开始子进程 1 开始子进程 2 结束子进程 1 结束子进程 2 end main
1 import threading 2 import time 3 4 class Hello(threading.Thread): 5 def __init__(self, args): 6 super(Hello, self).__init__() 7 self.args = args 8 9 def run(self): 10 print("开始子进程 {0}".format(self.args)) 11 time.sleep(1) 12 print("结束子进程 {0}".format(self.args)) 13 14 if __name__ == ‘__main__‘: 15 16 a = 1 17 print("start main") 18 t1 = Hello(1) 19 t2 = Hello(2) 20 t1.start() 21 t2.start() 22 print("end main")
start main 开始子进程 1 开始子进程 2 end main 结束子进程 1 结束子进程 2
1 import threading 2 import time 3 4 class Hello(threading.Thread): 5 def __init__(self, args): 6 super(Hello, self).__init__() 7 self.args = args 8 global a 9 print("a = {0}".format(a)) 10 a += 1 11 12 def run(self): 13 print("开始子进程 {0}".format(self.args)) 14 print("结束子进程 {0}".format(self.args)) 15 16 if __name__ == ‘__main__‘: 17 a = 1 18 print("start main") 19 t1 = Hello(5) 20 time.sleep(3) 21 t2 = Hello(5) 22 t1.start() 23 t2.start() 24 print("#####a = {0}####".format(a)) 25 print("end main")
start main a = 1 a = 2 开始子进程 5 结束子进程 5 开始子进程 5 结束子进程 5 #####a = 3#### end main
1 import threadpool 2 3 def hello(m, n, o): 4 print("m = {0} n={1} o={2}".format(m, n, o)) 5 6 if __name__ == ‘__main__‘: 7 # 方法1 8 lst_vars_1 = [‘1‘, ‘2‘, ‘3‘] 9 lst_vars_2 = [‘4‘, ‘5‘, ‘6‘] 10 func_var = [(lst_vars_1, None), (lst_vars_2, None)] 11 # 方法2 12 # dict_vars_1 = {‘m‘: ‘1‘, ‘n‘: ‘2‘, ‘o‘: ‘3‘} 13 # dict_vars_2 = {‘m‘: ‘4‘, ‘n‘: ‘5‘, ‘o‘: ‘6‘} 14 # func_var = [(None, dict_vars_1), (None, dict_vars_2)] 15 16 pool = threadpool.ThreadPool(2) 17 requests = threadpool.makeRequests(hello, func_var) 18 [pool.putRequest(req) for req in requests] 19 pool.wait()
m = 1 n=2 o=3
m = 4 n=5 o=6
原文:https://www.cnblogs.com/karl-python/p/9080421.html