进程:
python的进程没有全局解释器锁GIL,可以在多核系统上同时运行,用来处理cpu操作密集的任务,但是进程之间数据不能共享。
linix系统上(windows可能也是),每个进程均由父进程启动。
1 import multiprocessing #导入多进程模块 2 #使用语法与多线程基本类似 3 import time 4 import threading,random 5 def run1(name,x,y): 6 time.sleep(1) 7 print(‘%s:‘%name,x,y,x**2+2*y**2) 8 print(‘threading %s ID:%s‘%(name,threading.get_ident()))#threading.get_ident()返回当前线程的ID 9 def run(name): 10 time.sleep(random.randrange(2)) 11 print(‘hello‘,name) 12 t=threading.Thread(target=run1,args=(name,random.randint(3,5),random.randint(0,2)))#进程里启动一个新线程 13 t.start() 14 p_obj=[] 15 if __name__==‘__main__‘:#一定要加这一句 16 for i in range(20): 17 p=multiprocessing.Process(target=run,kwargs={‘name‘:‘wulihui%s‘%i,}) 18 p.start() 19 p_obj.append(p) 20 for p in p_obj: 21 p.join()
进程之间的通讯:
不同进程内存是不共享的,实现进程间的通讯可以使用以下方法(共同之处在于都需要一个中间代理):
1、进程queue队列,使用方法和线程差不多,但是线程里的queue出了进程便不能访问,因此引入进程queue(包括multiprocessing.Queue, multiprocessing.LifoQueue, multiprocessing.PriorityQueue)。
大致原理为:父进程在创建子进程时,将multiprocessing.Queue克隆到子进程,两个进程间的multiprocessing.Queue通过pickle序列化和反序列化实现数据共享。queue只能实现数据的传递。
1 from multiprocessing import Process, Queue 2 3 4 def f(q): 5 q.put([42, None, ‘hello‘])#在子进程中给队列放东西 6 print(‘子进程get:‘,q.get())#在子进程中取出父进程存的数据 7 8 9 if __name__ == ‘__main__‘: 10 q = Queue() 11 p = Process(target=f, args=(q,)) 12 p.start() 13 print(‘父进程get:‘,q.get()) #从父进程中取q里的数据 14 q.put(‘wulihuang‘)#父进程里给q存数据 15 p.join()
2、multiprocessing.pipe()管道,pipe()函数的作用是return一系列有pipe连接的对象,默认为双向连接(return两个对象)。
pipe()也只能实现数据的传递。
1 import multiprocessing 2 3 def f(conn): 4 conn.send([42, None, ‘hello‘]) 5 print(‘子进程recv:‘,conn.recv())#recv会堵塞,直到接收到数据 6 conn.close() 7 8 if __name__ == ‘__main__‘: 9 parent_conn, child_conn = multiprocessing.Pipe() 10 p = multiprocessing.Process(target=f, args=(child_conn,)) 11 p.start() 12 print(‘父进程recv:‘,parent_conn.recv()) # prints "[42, None, ‘hello‘]" 13 parent_conn.send(‘hello‘) 14 p.join()
3、manager()管理器
manager可以实现数据的传递和修改。
1 from multiprocessing import Process, Manager 2 import os 3 4 5 def f(d, l): 6 d[1] = ‘1‘ 7 d[‘2‘] = 2 8 d[0.25] = None 9 l.append(os.getpid()) 10 print(os.getpid()) 11 12 13 if __name__ == ‘__main__‘: 14 with Manager() as manager:#等价于manager=Manager(),不用加锁 15 d = manager.dict()#生成一个字典,这个字典可在多个进程之间传递和共享 16 #manager支持的type包括list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array 17 18 l = manager.list(range(5))#生成一个列表,这个列表可在多个进程之间传递和共享 19 p_list = [] 20 for i in range(10): 21 p = Process(target=f, args=(d, l)) 22 p.start() 23 p_list.append(p) 24 for res in p_list: 25 res.join() 26 27 print(d) 28 print(l)
获取进程ID
from multiprocessing import Process import os def info(title): print(title) print(‘module name:‘, __name__)#打印模块名 print(‘parent process:‘, os.getppid())#获得父进程ID,此处父进程为pycharm。 print(‘process id:‘, os.getpid())#获得自己的ID,进程每启动一次ID号会变 print("\n\n") def f(name): info(‘\033[31;1mfunction f\033[0m‘) print(‘hello‘, name) if __name__ == ‘__main__‘: info(‘\033[32;1mmain process line\033[0m‘) p = Process(target=f, args=(‘bob‘,))#此时父进程ID变为__main__的ID, p.start() p.join()
进程lock
进程lock的作用:保证进程同步,保证屏幕打印数据时不会被其他进程插入导致数据打印混乱。
1 #进程同步,lock的作用:保证屏幕上打印数据时,不会被其他进程插入,导致数据打印混乱 2 from multiprocessing import Process, Lock 3 4 5 def f(l, i): 6 l.acquire() 7 print(‘hello world‘, i) 8 l.release() 9 10 11 if __name__ == ‘__main__‘: 12 lock = Lock() 13 14 for num in range(10): 15 Process(target=f, args=(lock, num)).start()
进程池
进程池内部维护一个进程序列,同一时间允许多少个进程在CPU上运行。使用进程池时,如果进程池序列中没有可供使用的进程,那么进程会进入等待,直到进程池中有可用进程位为止。
进程池中有两个方法:
apply同步执行,进程间串行。
apply_async异步执行,并行。
1 from multiprocessing import Process, Pool 2 import time,os 3 4 5 def Foo(i): 6 time.sleep(1) 7 print(‘in process‘,os.getpid()) 8 return i+100#return的值会作为callback函数的参数 9 10 11 def Bar(arg): 12 print(‘-->exec done:‘, arg) 13 14 if __name__ == ‘__main__‘:#这一句不能少。作用是为了区分是主动执行一个脚本,还是当成一个模块来调用。如果主动执行这个脚本, 15 #后面的代码执行,如果导入模块,则不执行后面的代码。在本脚本中__name__=‘__main__‘,从别处导入时__name__等于模块名。 16 pool = Pool(5)#pool=Pool(processes=5)#允许进程池里放入5个进程 17 18 for i in range(10): 19 pool.apply_async(func=Foo, args=(i,), callback=Bar)#往pool里放入进程,并行。callback=回调,执行完Foo后执行Bar,回调由主进程执行。 20 #pool.apply(func=Foo, args=(i,))#往pool里放入进程,串行 21 print(i) 22 23 print(‘end‘) 24 pool.close() 25 pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。pool.join()等所有进程结束。必须先close,再join
协程:
协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的定义:
1、必须在只有一个单线程里实现并发
2、修改共享数据不需加锁
3、用户程序里自己保存多个控制流的上下文栈
4、一个协程遇到IO操作自动切换到其它协程
1 def consumer(name): 2 print("--->starting eating baozi...") 3 while True: 4 new_baozi = yield#堵塞程序 5 print("[%s] is eating baozi %s" % (name, new_baozi)) 6 # time.sleep(1) 7 8 9 def producer(): 10 r = con.__next__()#启动生成器,生成数据 11 r = con2.__next__() 12 n = 0 13 while n < 5: 14 n += 1 15 con.send(n) 16 con2.send(n) 17 print("\033[32;1m[producer]\033[0m is making baozi %s" % n) 18 19 20 if __name__ == ‘__main__‘: 21 con = consumer("c1")#定义生成器con 22 con2 = consumer("c2")#定义生成器con2 23 p = producer()
协程的优点:
1、无需线程上下文切换的开销(不需要线程切换)
2、无需原子操作锁定及同步的开销(不需要线程锁lock)
"原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作:这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
3、方便切换控制流,简化编程模型
4、高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
1、无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
2、进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。(为了避免这个情况,协程遇到I/O操作便切换上下文)
gevent模块实现协程
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
gevent不能检测出urllib的I\O操作,需要打上补丁,gevent.monkey.patch_all()。
1 #greenlet是已封装好的协程 2 from greenlet import greenlet 3 4 5 def test1(): 6 print(12) 7 gr2.switch() 8 print(34) 9 gr2.switch() 10 11 12 def test2(): 13 print(56) 14 gr1.switch() 15 print(78) 16 17 18 gr1 = greenlet(test1)#启动协程1 19 gr2 = greenlet(test2)#启动协程2 20 gr1.switch()#手动切换
import gevent #自动切换 def func1(): print(‘\033[31;1m李闯在跟海涛搞...\033[0m‘) gevent.sleep(2)#卡住程序2s并切换协程 print(‘\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m‘) def func2(): print(‘\033[32;1m李闯切换到了跟海龙搞...\033[0m‘) gevent.sleep(1)#卡住程序1s并切换协程 print(‘\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m‘) def func3(): print(‘\033[33;1m小四进来了...\033[0m‘) gevent.sleep(0)#切换一次协程但不卡住 print(‘\033[33;1m小四报警了...\033[0m‘) gevent.joinall([ gevent.spawn(func3), gevent.spawn(func2), gevent.spawn(func1),#加参数gevent.spawn(func1,args) ])#gevent.joinall传入一个列表,gevent.spawn生成一个协程
from gevent import monkey#打入补丁,使gevent能够检测出url的I/O操作 monkey.patch_all()#把当前程序的所有I/O操作单独坐上标记 import gevent,time from urllib.request import urlopen def f(url): print(‘GET: %s‘ % url) resp = urlopen(url) data = resp.read() print(‘%d bytes received from %s.‘ % (len(data), url)) synch_start_time=time.time() urls=[‘https://www.python.org/‘,‘https://www.yahoo.com/‘,‘https://github.com/‘] for url in urls: f(url) print(‘同步时间:%s秒‘%(time.time()-synch_start_time)) async_start_time=time.time() gevent.joinall([ gevent.spawn(f, ‘https://www.python.org/‘),#加参数gevent.spawn(func1,args) gevent.spawn(f, ‘https://www.yahoo.com/‘), gevent.spawn(f, ‘https://github.com/‘), ]) print(‘异步耗时:%s秒‘%(time.time()-async_start_time))
# import socket#直接导入socket和从gevent导入socket区别不大 import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind((‘0.0.0.0‘, port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == ‘__main__‘: server(8001)
1 import socket 2 3 HOST = ‘localhost‘ # The remote host 4 PORT = 8001 # The same port as used by the server 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 s.connect((HOST, PORT)) 7 while True: 8 msg = bytes(input(">>:"), encoding="utf8") 9 s.sendall(msg) 10 data = s.recv(1024) 11 # print(data) 12 13 print(‘Received‘, repr(data)) 14 s.close()
import socket import threading count = 0 lock=threading.Lock() def sock_conn(): lock.acquire() global count count += 1 lock.release() client = socket.socket() client.connect(("localhost",8001)) #while True: #msg = input(">>:").strip() #if len(msg) == 0:continue client.send( ("hello %s" %count).encode("utf-8")) data = client.recv(1024) print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果 client.close() for i in range(100): t = threading.Thread(target=sock_conn) t.start()
事件驱动与异步IO
事件驱动模型:目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。通过回调通知主进程I/O操作完毕。
I/O 多路复用
I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。
对于一次I/O访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO):服务端等数据和数据从内核拷贝到用户都被堵塞
- 非阻塞 I/O(nonblocking IO):
- I/O 多路复用( IO multiplexing):IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO(事件驱动IO)。
信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)
注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。
import select,socket import queue,sys #select.select(rlist, wlist, xlist)等到一个或多个文件描述符为某种I / O做好准备。windows只支持socket。 # rlist - 等待直到准备好去读,socket里使用是想让内核帮忙检测的链接 # wlist - 等待直到准备好去写 # xlist - 等待直到出现某一种异常情况,socket里使用是想让内核帮忙检测的链接,会返回断开的链接 sever=socket.socket() sever.bind((‘localhost‘,9000)) sever.listen(100) msg_dic={} inputs=[sever,]#检测的socket列表 outputs=[] while True: readable,writeable,exceptional=select.select(inputs,outputs,inputs) #返回三个列表,第一个为可以读数据的列表,第二个为,第三个为断开出异常的列表 print(readable,writeable,exceptional) for r in readable: if r is sever:#代表进来一个新链接 conn,conn_addr=sever.accept()#没有链接即报错,BlockingIOError print(‘进来新链接‘,conn_addr) sever.setblocking(False) # 将socketserver设置为非堵塞,accept和recv不会堵塞 # print(‘>>:‘,conn.getpeername())#返回远程链接端的地址,当是IP时返回(addr,port) # print(conn.getsockname())#获得当前服务端的地址,当是IP时返回(addr,port) inputs.append(conn)#添加新的socket到select里 msg_dic[conn]=queue.Queue()#为每一个链接生成一个队列,存放要返回给客户端的数据 #不堵塞情况下,recv收不到数据会报错 else: try: data=r.recv(1024) if data: print(data.decode(‘utf-8‘)) print(sys.stderr, ‘received "%s" from %s‘ % (data, r.getpeername()))#返回远程链接端的地址,当是IP时返回(addr,port) #r.send(data.upper()) msg_dic[r].put(data.upper()) if r not in outputs: outputs.append(r)#放入返回的链接队列里 except ConnectionResetError as e: print(‘client 中断‘,e) print(‘closing‘, conn_addr, ‘after reading no data‘) if r in outputs: outputs.remove(r) # 既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉 inputs.remove(r) # inputs中也删除掉 r.close() # 把这个连接关闭掉 del msg_dic[r] for w in writeable: if not msg_dic[w].empty(): data_to_client=msg_dic[w].get() print(data_to_client.decode()) w.send(data_to_client) outputs.remove(w)#确保下次循环writeable里没有之前加入的socket for e in exceptional: inputs.remove(e) if e in readable: readable.remove(e) if e in outputs: outputs.remove(e) if e in writeable: writeable.remove(e) e.close() del msg_dic[e] sever.close()
import socket client=socket.socket() client.connect((‘localhost‘,10000)) while True: msg=input(‘>>:‘) if len(msg)==0:continue client.send(msg.encode(‘utf-8‘)) data=client.recv(1024) print(data.decode()) client.close()
selector模块
import selectors import socket sel = selectors.DefaultSelector()#生成一个select对象 def accept(sock, mask): conn, addr = sock.accept() # Should be ready print(‘accepted‘, conn, ‘from‘, addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read)#将conn注册到sel里,回调函数变为read def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print(‘echoing‘, repr(data), ‘to‘, conn) conn.send(data) else: print(‘closing‘, conn) sel.unregister(conn)#取消注册 conn.close() sock = socket.socket() sock.bind((‘localhost‘, 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept)#第一个参数用于监听的socket,第三个参数每来一个新链接调用accept函数 while True: events = sel.select()#可能调用epoll或select,优先调用epoll,windows上只能调用selector,默认堵塞, #有活动的链接,则返回活动的链接列表events for key, mask in events: callback = key.data#相当于callback=accept callback(key.fileobj, mask)#key.fillobj=文件句柄
原文:https://www.cnblogs.com/wulihui/p/9503029.html