首页 > 系统服务 > 详细

多进程(三)

时间:2019-04-27 11:06:15      阅读:137      评论:0      收藏:0      [点我收藏+]
进程同步(?使用Queue&JoinableQueue) #coding=utf-8 import multiprocessing import time class Consumer(multiprocessing.Process): def __init__(self,task_queue,result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue def run(self): proc_name = self.name while True: next_task =self.task_queue.get() if next_task is None: print("%s Exiting" %proc_name) self.task_queue.task_done() break print("%s:%s" %(proc_name,next_task)) answer = next_task() self.task_queue.task_done() self.result_queue.put(answer) return class Task(object): def __init__(self,a,b): self.a = a self.b = b def __call__(self): time.sleep(0.1) return "%s * %s = %s" %(self.a,self.b,self.a * self.b) def __str__(self): return "%s * %s " %(self.a,self.b) if __name__ == "__main__": tasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue() num_consumers = multiprocessing.cpu_count() print("Creating %d consumer" %num_consumers) consumers = [Consumer(tasks,results) for i in range(num_consumers)] for w in consumers: w.start() num_jobs = 10 for i in range(num_jobs): tasks.put(Task(i,i)) for i in range(num_consumers): tasks.put(None) tasks.join() while num_jobs: result = results.get() print("Result: %s" %result) num_jobs -= 1 进程同步(加锁-Lock) 不加锁 加锁 锁是为了确保数据一致性,比如读写锁,每个进程给一个变量增加?1?,但是如果在一个进程读取但还没有写入的时候,另外的进程也同时读取了,并写入该值,则最后写入的值是错误的,这时候就需要加锁来保持数据一致性 进程同步(加锁-Semaphore) Semaphore用于控制对共享资源的访问数量。Semaphore锁和Lock稍有不同,Semaphore相当于N把锁,获取其中一把就可以执行。可用锁的总数N在创建实例时传入,比如s = Semaphore(n)。与Lock一样,如果可用锁为0,进程将会阻塞,直到可用锁大于0。 #coding=utf-8 import multiprocessing import time def worker(s,i): ????s.acquire() ??? print(multiprocessing.current_process().name + "acquire") ??? time.sleep(i) ??? print(multiprocessing.current_process().name + "release") ????s.release() ? if __name__== "__main__": ????s?= multiprocessing.Semaphore(3) ??? for i in range(5): ??????? p = multiprocessing.Process(target=worker,args=(s,i*2)) ??????? p.start() 进程同步(信号传递-Event) Event提供一种简单的方法,可以在进程间传递状态信息,实现进程间同步通信。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。 #coding=utf-8 import multiprocessing import time def wait_for_event(e): print("wait_for_event: starting") e.wait() #等待收到能执行信号,如果一直未收到将一直阻塞,死等 print("wait_for_event: e.is_set()->",e.is_set()) print("可以继续做一些事情了。") def wait_for_event_timeout(e,t): print("wait_for_event_timeout: starting") e.wait(t)#等待t秒超时,此时Event的状态仍未未设置 print("wait_for_event_timeout: e.is_set()->",e.is_set()) e.set()#设置Event的状态 print("信号状态已被设置") if __name__ == "__main__": e = multiprocessing.Event() print("begin,e.is_set()",e.is_set()) p1 = multiprocessing.Process(name = "block",target=wait_for_event,args=(e,)) p1.start() p2 = multiprocessing.Process(name = "nonblock",target=wait_for_event_timeout,args=(e,2)) p2.start() time.sleep(3) print("main: event is set") 进程同步(使用管道-Pipe) Pipe是两个进程间通信的工具。Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。Pipe的每个端口同时最多一个进程读写,否则会出现各种问题,可能造成corruption异常。Pipe对象建立的时候,返回一个含有两个元素的元组对象,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。 #coding=utf-8 import multiprocessing as mp def proc_1(pipe): pipe.send("hello") print("proc_1 received: %s" %pipe.recv()) pipe.send("what is your name? ") print("proc_1 received: %s " %pipe.recv()) def proc_2(pipe): print("proc_2 received : %s" %pipe.recv()) pipe.send("hello ,too") print("proc_2 received: %s" %pipe.recv()) pipe.send("I don‘t tell you!") if __name__ == "__main__": pipe = mp.Pipe() print(len(pipe)) print(type(pipe)) p1 = mp.Process(target=proc_1,args=(pipe[0],)) p2 = mp.Process(target=proc_2,args=(pipe[1],)) p2.start() p1.start() p1.join() p2.join() #coding=utf-8 import multiprocessing as mp from multiprocessing import Process,Lock def write_file(content,lock,file_path="e:\\a.txt"): lock.acquire() #保证同时只有一个进程在写文件 with open(file_path,"a") as fp: fp.write(content + "\n") lock.release() def proc_1(pipe,lock): pipe.send("hello") info = pipe.recv() print("proc_1 recieved: %s" %info) write_file(info,lock) pipe.send("what is your name?") info = pipe.recv() print("proc_1 recieved: %s" %info) write_file(info,lock) def proc_2(pipe,lock): info = pipe.recv() print("proc_2 recieved: %s " %info) write_file(info,lock) pipe.send("hello too") info = pipe.recv() print("proc_2 recieved: %s" %info) write_file(info,lock) pipe.send("I don‘t tell you!") if __name__ == "__main__": pipe = mp.Pipe()#创建一个管道对象,为一个元组,包含了pipe的两端 lock = Lock()#创建一个锁对象 print(len(pipe)) print(type(pipe)) #创建两个子进程 p1 = mp.Process(target=proc_1,args=(pipe[0],lock)) p2 = mp.Process(target=proc_2,args=(pipe[1],lock)) p1.start() p2.start() p1.join() p2.join() #coding=utf-8 import multiprocessing as mp from multiprocessing import Process,Lock def write_file(content,lock,file_path="e:\\a.txt"): lock.acquire() with open(file_path,"a") as fp: fp.write(content + "\n") lock.release() def proc_1(pipe,lock): pipe.send("hello") info = pipe.recv() print("proc_1 recieved: %s" %info) write_file(info,lock) pipe.send("what is your name?") info = pipe.recv() print("proc_1 recieved: %s" %info) write_file(info,lock) def proc_2(pipe,lock): info = pipe.recv() print("proc_2 recieved: %s " %info) write_file(info,lock) pipe.send("hello too") info = pipe.recv() print("proc_2 recieved: %s" %info) write_file(info,lock) pipe.send("I don‘t tell you!") if __name__ == "__main__": p1_list = [] p2_list = [] pipe = mp.Pipe() lock = Lock() print(len(pipe)) print(type(pipe)) for i in range(5): p1_list.append(mp.Process(target=proc_1,args=(pipe[0],lock))) for i in range(5): p2_list.append(mp.Process(target=proc_2,args=(pipe[1],lock))) for i in range(5): p1_list[i].start() p2_list[i].start() for i in range(5): p1_list[i].join() p2_list[i].join() print("***********") 进程同步(使用Condition) 一个condition变量总是与某些类型的锁相联系,当几个condition变量必须共享同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和release() 会调用与锁相关联的相应方法。 wait()方法会释放锁,当另外一个进程使用notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,Condition类实现了一个conditon变量。这个condition变量允许一个或多个进程等待,直到他们被另一个进程通知。如果lock参数,被给定一个非空的值,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。 wait(timeout=None) :等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的进程没有得到锁,那么会抛出一个RuntimeError异常。 wait()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前会一直阻塞如果有等待的进程,notify()方法会唤醒一个在等待conditon变量的进程。 notify_all() 则会唤醒所有在等待conditon变量的进程。 注意: notify()和notify_all()不会释放锁,也就是说,进程被唤醒后不会立刻返回他们的wait() 调用。除非进程调用notify()和notify_all()之后放弃了锁的所有权。 在典型的设计风格里,利用condition变量加锁去允许访问一些共享状态,进程在获取到它想得到的状态前,会反复调用wait()。修改状态的进程在他们状态改变时 调用 notify() or notify_all(),用这种方式,进程会尽可能的获取到想要的一个等待者状态 #coding=utf-8 import multiprocessing as mp import time def consumer(cond): with cond:#此处获得与锁的联系 print("consumer before wait") cond.wait() #释放锁 #程序在此一直等待,等待notify()或notify_all()唤醒 print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notify_all()#唤醒所有等待锁的进程,此处为通知消费者可以消费了 print("producer after notifyAll") if __name__ == "__main__": condition = mp.Condition() p1 = mp.Process(name = "p1",target=consumer,args=(condition,)) p2 = mp.Process(name = "p2",target=consumer,args=(condition,)) p3 = mp.Process(name = "p3",target = producer,args = (condition,)) p1.start() time.sleep(2) p2.start() time.sleep(2) p3.start() #注意消费者进程的启动需要放在生产者启动之前,否则会出现生产者已经notify的情况下,消费者会无限等待的情况 注意消费者进程的启动需要放在生产者启动之前,否则会出现生产者已经notify的情况下,消费者会无限等待的情况 示例: #coding=utf-8 import multiprocessing as mp import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notify_all() print("producer after notifyAll") if __name__ == "__main__": condition = mp.Condition() p1 = mp.Process(name = "p1",target=consumer,args=(condition,)) p2 = mp.Process(name = "p2",target=consumer,args=(condition,)) p3 = mp.Process(name = "p3",target = producer,args = (condition,)) p1.start() time.sleep(2) p3.start() #p3#此处生产者进程放在了消费者进程p2的前面,注意查看输出 time.sleep(2) p2.start() 多进程间共享数字变量 未使用共享变量 #coding=utf-8 from multiprocessing import Process def f(n,a): n = 3.1415 for i in range(len(a)): a[i] = -a[i] if __name__ == "__main__": num = 0 #主进程的num arr = list(range(10))#主进程的arr p = Process(target = f,args=(num,arr))#修改了传入f函数的值,但是数据属于子进程,主进程不能访问 p.start() p.join() print(num) print(arr[:]) 结果为:主进程变量的值 注意: 此程序存在两个进程 主进程和创建的子进程p,两个进程的数据是相互独立的,if __name__ == ‘__main__‘:模块下的num,arr属于主进程,这两个参数被传入子进程执行的函数后,存在于子进程的空间内,和主进程不影响; 使用共享变量 在上例中,主进程、子进程间要共享变量的话,需要创建能在进程间共享的变量; #coding=utf-8 from multiprocessing import Process,Value,Array def f(n,a): n.value = 3.1415 for i in range(len(a)): a[i] = -a[i] if __name__ == "__main__": num = Value("d",0.0)#创建一个进程间共享的数字类型,默认值为0 arr = Array("i",range(10))#创建一个进程间共享的数组类型,初始值为range[10] p = Process(target=f,args=(num,arr)) p.start() p.join() print(num.value)#打印共享变量num的值 print(arr[:]) 共享变量、加锁 #coding=utf-8 from multiprocessing import Process,Value,Lock import time class Counter(object): def __init__(self,initval = 0): self.val = Value("i",initval) self.lock = Lock() def increment(self): with self.lock:#加锁,保证同一时刻只能有一个进程在操作共享变量 self.val.value += 1 def value(self): with self.lock: return self.val.value def func(counter): for i in range(50): time.sleep(0.01) counter.increment() if __name__ == "__main__": counter = Counter(0) procs = [Process(target=func,args=(counter,)) for i in range(10)] for p in procs: p.start() for p in procs: p.join() print(counter.value()) 多进程间共享字符串变量 #coding=utf-8 from multiprocessing import Process,Value,Manager from ctypes import c_char_p def greet(shareStr): shareStr.value = shareStr.value + ",World!" if __name__ == "__main__": manager = Manager() shareStr = manager.Value(c_char_p,"Hello")#主进程的变量 process = Process(target = greet,args=(shareStr,))#子进程共享主进程的字符串变量 process.start() process.join() print(shareStr.value) 多进程间共享不同类型的数据结构对象 #coding=utf-8 from multiprocessing import Process,Manager def f(shareDict,shareList): shareDict[1] = "1" shareDict["2"] = 2 shareDict[0.25] = None shareList.reverse() if __name__ == "__main__": manager = Manager() shareDict = manager.dict() #创建共享的字典 shareList = manager.list(range(10)) #创建共享的列表 p = Process(target=f,args=(shareDict,shareList)) p.start() p.join() print(shareDict) print(shareList) Manager()函数返回一个管理对象,它控制了一个服务端进程,用来保持Python对象,并允许其它进程使用代理来管理这些对象。Manager()返回的管理者,支持类型包括,list, dict, Namespace, Lock,RLock, Semaphore, BoundedSemaphore,Condition, Event, Queue, Value and Array。managers比使用共享内存对象更灵活,因为它支持任意对象类型。 同样的,单个的manager可以通过网络在不同机器上进程间共享。但是,会比共享内存慢。 进程间共享实例对象 #coding=utf-8 from multiprocessing import Pool,Value,Lock from multiprocessing.managers import BaseManager import os,time class MyManager(BaseManager): pass def Manager(): m = MyManager() m.start() return m class Counter(object): def __init__(self,initval=0): self.val = Value("i",initval) self.lock = Lock() def increment(self): with self.lock: self.val.value += 1 def value(self): with self.lock: return self.val.value MyManager.register("Counter",Counter) def long_time_task(name,counter): time.sleep(0.2) print("Run task %s (%s)...\n" %(name,os.getpid())) start = time.time() for i in range(50): time.sleep(0.01) counter.increment() end = time.time() print("Task %s runs %0.2f seconds." %(name,(end-start))) if __name__ == "__main__": manager = Manager() counter = manager.Counter(0) print("Parent process %s." %os.getpid()) p = Pool() for i in range(5): p.apply_async(long_time_task,args=(str(i),counter)) print("Waiting for all subprocesses done...") p.close() p.join() print("All subprocesses done.") print(counter.value()) 进程日志 #encoding=utf-8 import multiprocessing import logging import sys def worker(): print("I am working...") sys.stdout.flush() if __name__ == "__main__": #设置日志输出到控制台 multiprocessing.log_to_stderr() #获取日志对象 logger = multiprocessing.get_logger() #设置日志的级别 logger.setLevel(logging.INFO) p = multiprocessing.Process(target=worker) p.start() p.join() 守护进程 守护进程就是不阻挡主进程退出,会随着主进程退出而退出,如果要等待守护进程退出,需要加上join函数。 #encoding=utf-8 import multiprocessing import logging import sys,time def daemon(): p = multiprocessing.current_process() print("Starting: ",p.name,p.pid) #缓冲区数据显示到终端 sys.stdout.flush() print("Exiting: ",p.name,p.pid) sys.stdout.flush() def non_daemon(): p = multiprocessing.current_process() print("Starting:",p.name,p.pid) sys.stdout.flush() print("Exiting: ",p.name,p.pid) sys.stdout.flush() if __name__ == "__main__": #设置日志输出到控制台 multiprocessing.log_to_stderr() #获取日志对象 logger = multiprocessing.get_logger() #设置日志级别 logger.setLevel(logging.DEBUG) d = multiprocessing.Process(name="daemon",target=daemon) #设置进程d为守护进程 d.daemon = True n = multiprocessing.Process(name="nondaemon",target=daemon) #设置进程n为非守护进程 n.daemon = False d.start() #time.sleep(1)#不要sleep n.start() #d.join() #n.join() print("d.is_alive()",d.is_alive()) print("n.is_alive()",n.is_alive()) print("main Process end! ") 主进程首先执行完成,守护进程随着主进程退出直接退出了,根据没有执行守护进程的内容 import subprocess subprocess.run(args=[‘ls‘,‘-al‘]) >>> subprocess.run(["dir","test"],shell = True,cwd="e:\\") >>> subprocess.Popen(["dir","test"],shell = True,cwd="e:\\") <subprocess.Popen object at 0x00000000025DA7F0> subprocess.run(["ls","-al"],shell=True,cwd="/home") subprocess模块 从Python2.4版本以后,可以使用subprocess这个模块来产生子进程,并连接到子进程的标准输入/输出/错误中去,还可以得到子进程的返回值。看一下官方的解释: DESCRIPTION This module allows you to spawn processes, connect to their input/output/error pipes, and obtain their return codes. 即允许你去创建一个新的进程让其执行另外的程序,并与它进行通信,获取标准的输入、标准输出、标准错误以及返回码等。另外subprocess还提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。 创建subprocess进程 subprocess模块用subprocess.Popen类来创建进程,并与进程进行复杂的交互。其构造函 数如下: __init__(self, args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0) 主要参数说明: ? args: args should be a string, or a sequence of program arguments.也就是说必须是一个字符 串或者序列类型(如:字符串、list、元组),用于指定进程的可执行文件及其参数。如果是一个序列类型参数,则序列的第一个元素通常都必须是一个可执行文件的路径。当然也可以使用executeable参数来指定可执行文件的路径。 ? bufsize: 如果指定了bufsize参数作用就和内建函数open()一样:0表示不缓冲,1表示行缓冲,其他正 数表示近似的缓冲区字节数,负数表示使用系统默认值。默认是0。 ? stdin,stdout,stderr: 分别表示程序的标准输入、标准输出、标准错误。有效的值可以是PIPE,存在的文件描述符 ,存在的文件对象或None,如果为None需从父进程继承过来;stdout可以是PIPE,表示对 子进程创建一个管道;stderr可以是STDOUT,表示标准错误数据应该从应用程序中捕获并 作为标准输出流stdout的文件句柄。 ? shell: 如果这个参数被设置为True,程序将通过shell来执行。 ? env: 它描述的是子进程的环境变量。如果为None,子进程的环境变量将从父进程继承而来。 创建Popen类的实例对象: res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) cmd:向子进程传入需要执行的shell命令,如:ls –al subprocess.PIPE:在创建Popen对象时,subprocess.PIPE可以初始化为stdin, stdout或stderr的参数,表示与子进程通信的标准输入流,标准输出流以及标准错误,将多个子进程的输入和输出连接在一起,构成管道(pipe)。 subprocess.STDOUT:作为Popen对象的stderr的参数,表示将标准错误通过标准输出流输出。 示例1: 在目录e:\\test创建一个文件夹hhq2 >>> a = subprocess.Popen("mkdir hhq2",shell=True,cwd="e:\\test")#命令为一个字符串 >>> a <subprocess.Popen object at 0x000000000280A080> >>> a = subprocess.Popen(["mkdir", "hhq4"],shell=True,cwd="e:\\test")#命令为一个里列表 >>> a <subprocess.Popen object at 0x000000000255BFD0> >>> print(a) <subprocess.Popen object at 0x000000000255BFD0> 示例2: #encoding=utf-8 import subprocess #得到子进程obj,执行python命令,标准输入、输出都是管道 obj = subprocess.Popen (["python"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) #从标准输入写入内容 obj.stdin.write(b"print(1);")#需要写入bytes类型, obj.stdin.write(b"print(2);") obj.stdin.write(b"print(3);") obj.stdin.write(b"print(4);") obj.stdin.close() #读出标准输出的内容 cmd_out = obj.stdout.read() obj.stdout.close() cmd_error = obj.stderr.read() obj.stderr.close() print(cmd_out) print(cmd_error) 示例3: #encoding=utf-8 import subprocess obj = subprocess.Popen (["python"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) obj.stdin.write(b"print(1)\n") obj.stdin.write(b"print(2)\n") obj.stdin.write(b"print(3)\n") obj.stdin.write(b"print(4)\n") out_info,out_error = obj.communicate()#返回一个元组,返回标准输出和错误输出 print(out_info) print(out_error) 示例4,将一个子进程的输出,作为另一个子进程的输入: #encoding=utf-8 import subprocess child1 = subprocess.Popen(["ls","-al"],stdout=subprocess.PIPE) child2 = subprocess.Popen(["wc","-l"],stdin=child1.stdout,stdout=subprocess.PIPE) out = child2.communicate()#返回的是一个元组 print(out) Subprocess进程通信实例 打开一个只有ip地址的文本文件,读取其中的ip或域名,然后进行ping操作, 并将ping结果写入ping.txt文件中。ip.txt文件内容如下: www.baidu.com www.taobao.com 123.45.5.34 127.0.0.1 #encoding=utf-8 import subprocess import os class Shell(object): def runCmd(self,cmd): res = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT) #获取子进程的标准输出、标准错误信息 sout,serr = res.communicate() #返回进程指定的结果码、输出信息、错误信息,子进程的编号 return res.returncode,sout,serr,res.pid shell = Shell() fp = open("e:\\ip.txt",encoding="utf-8") ipList = fp.readlines() fp.close() fp = open("e:\\ping.txt","a",encoding="utf-8") print(ipList) for ip in ipList: ip = ip.strip() result = shell.runCmd("ping " + ip) if result[0] == 0: w = ip + ": 0" else: w = ip + ": 1" fp.write(w + "\n") print(result[1].decode("gbk")) fp.close() 标准输入、输出、错误输出 >>> import sys >>> a = sys.stdin.readline() dddd >>> a ‘dddd\n‘ >>> sys.stdout.write("bb") bb2#输出字符个数 >>> sys.stderr.write("ddd") 3 ddd>>>

多进程(三)

原文:https://blog.51cto.com/13496943/2385185

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!