基于IO模型的并发 |
一、IO模型介绍
本文所讨论的是Linux环境下的network IO,IO模型可以分为5种,分别是阻塞IO(blocking IO)、非阻塞IO(non-blocking IO)、多路复用IO(IO multiplexing)、异步IO(asynchronous IO)、信号驱动IO(signal IO)。其中信号驱动IO(signal IO)在实际中并不常用,所有主要介绍其余四种IO Model。
总结一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的进程 (线程),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:
#1)等待数据准备 (Waiting for the data to be ready) #2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
这两个阶段很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。
#1、输入操作:read、readv、recv、recvfrom、recvmsg共5个函数,如果会阻塞状态,则会经历wait data和copy data两个阶段,如果设置为非阻塞则在wait 不到data时抛出异常 #2、输出操作:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,如果设置为非阻塞,则会抛出异常 #3、接收外来链接:accept,与输入操作类似 #4、发起外出链接:connect,与输出操作类似
二、阻塞IO
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。
三、非阻塞IO
Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
下面我们来做一个非阻塞IO的示例:
import socket import time sk=socket.socket(socket.AF_INET,socket.SOCK_STREAM) sk.bind((‘127.0.0.1‘,6667)) sk.listen(5) sk.setblocking(False) #把阻塞状态设置成非阻塞状态 print(‘waiting client connection...‘) while True: try: connection,address = sk.accept() print(‘+++‘,address) client_messge = connection.recv(1024) print(str(client_messge,‘utf8‘)) connection.close() except Exception as e: print(e) time.sleep(4)
import socket,time sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) while True: sk.connect((‘127.0.0.1‘,6667)) print(‘hello‘) sk.sendall(bytes(‘hello‘,‘utf8‘)) time.sleep(2) break
非阻塞IO的优点是能够在等待任务完成的时间里干其他活了,但是并不推荐。原因是它的缺点也十分明显:
#1. 循环调用recv()将大幅度推高CPU占用率; #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
四、多路复用IO
多路复用IO的好处,就在于单个进程就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。 这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个连接。
下面我们来做一个多路复用IO的示例:
import socket import select sk = socket.socket() sk.bind((‘127.0.0.1‘,8801)) sk.listen(5) inputs = [sk,] while True: r,w,e = select.select(inputs,[],[],5) #第一个输入列表、第二个输出列表、第三个错误列表、第四个每隔5秒钟 print(len(r)) for obj in r: if obj == sk: conn,addr = obj.accept() print(conn) inputs.append(conn) else: data_bype = obj.recv(1024) print(str(data_bype,‘utf8‘)) inp = input(‘回答%s号客户>>>‘%inputs.index(obj)) obj.sendall(bytes(inp,‘utf8‘)) print(‘>>‘,r)
import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8801)) while True: inp = input(‘>>>‘) sk.sendall(bytes(inp,‘utf8‘)) data = sk.recv(1024) print(str(data,‘utf8‘))
多路复用IO的优点就是相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。
五、异步IO
linux下的asynchronous IO其实用得不多,它的流程图如下:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
异步IO最大特点就是全程无阻塞。
六、selector模块
它的功能与select模块类似,实现高效的多路复用IO,常用于非阻塞的socket编程中。
下面我们利用selector模块来实现socket编程:
import selectors import socket sel = selectors.DefaultSelector() def accept(sock,mask): conn,addr = sock.accept() print(‘accepted‘,conn,‘from‘,addr) conn.setblocking(False) sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask): try: data = conn.recv(1000) if not data: raise Exception print(‘echoing‘,repr(data),‘to‘,conn) conn.send(data) except Exception as e: print(‘closing‘,conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind((‘localhost‘,8090)) sock.listen(100) sock.setblocking(False) sel.register(sock,selectors.EVENT_READ,accept) #注册,sock绑定accept函数 while True: events = sel.select() #监听 for key,mask in events: callback = key.data #第一次key.data返回一个函数 callback(key.fileobj,mask) #第一次key.fileobj返回sock
import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8090)) while 1: inp = input(‘>>>‘) sk.send(inp.encode(‘utf8‘)) data = sk.recv(1024) print(data.decode(‘utf8‘))
原文:https://www.cnblogs.com/lzc69/p/11638349.html