1、当客户处理多个描述符时(一般是交互式输入和网络套接字),必须使用I/O复用。
2、当一个客户同时处理多个套接字时,而这种情况是可能的,但很少出现。
3、如果一个TCP服务器既要处理监听套接字,又要处理已连接套接字,一般也要用到I/O复用。
4、如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
5、如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | #!/usr/bin/python # -*- coding: utf-8 -*- import select import socket import Queue server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setblocking( False ) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1 ) server_address = ( ‘127.0.0.1‘ , 8888 ) server.bind(server_address) server.listen( 10 ) #select轮询等待读socket集合 inputs = [server] #select轮询等待写socket集合 outputs = [] message_queues = {} #select超时时间 timeout = 20 while True : print "等待活动连接......" readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout) if not (readable or writable or exceptional) : print "select超时无活动连接,重新select...... " continue ; #循环可读事件 for s in readable : #如果是server监听的socket if s is server: #同意连接 connection, client_address = s.accept() print "新连接: " , client_address connection.setblocking( 0 ) #将连接加入到select可读事件队列 inputs.append(connection) #新建连接为key的字典,写回读取到的消息 message_queues[connection] = Queue.Queue() else : #不是本机监听就是客户端发来的消息 data = s.recv( 1024 ) if data : print "收到数据:" , data , "客户端:" ,s.getpeername() message_queues[s].put(data) if s not in outputs: #将读取到的socket加入到可写事件队列 outputs.append(s) else : #空白消息,关闭连接 print "关闭连接:" , client_address if s in outputs : outputs.remove(s) inputs.remove(s) s.close() del message_queues[s] for s in writable: try : msg = message_queues[s].get_nowait() except Queue.Empty: print "连接:" , s.getpeername() , ‘消息队列为空‘ outputs.remove(s) else : print "发送数据:" , msg , "到" , s.getpeername() s.send(msg) for s in exceptional: print "异常连接:" , s.getpeername() inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s] |
client端:
1 2 3 4 5 6 7 8 9 10 11 12 | #!/usr/bin/env python # -*- coding:utf-8 -*- import socket ip_port = ( ‘127.0.0.1‘ ,8888
) sk = socket.socket() sk.connect(ip_port) while True : inp = raw_input ( ‘please input:‘ ) sk.sendall(inp) sk.close() |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | #!/usr/bin/python # -*- coding: utf-8 -*- import socket import select import Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking( False ) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) server_address = ( "127.0.0.1" , 8888
) server.bind(server_address) server.listen( 5 ) print "服务器启动成功,监听IP:" , server_address message_queues = {} #超时,毫秒 timeout = 5000 #监听哪些事件 READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) READ_WRITE = (READ_ONLY|select.POLLOUT) #新建轮询事件对象 poller = select.poll() #注册本机监听socket到等待可读事件事件集合 poller.register(server,READ_ONLY) #文件描述符到socket映射 fd_to_socket = {server.fileno():server,} while True : print "等待活动连接......" #轮询注册的事件集合 events = poller.poll(timeout) if not events: print "poll超时,无活动连接,重新poll......" continue print "有" , len (events), "个新事件,开始处理......" for fd ,flag in events: s = fd_to_socket[fd] #可读事件 if flag & (select.POLLIN | select.POLLPRI) : if s is server : #如果socket是监听的server代表有新连接 connection , client_address = s.accept() print "新连接:" , client_address connection.setblocking( False ) fd_to_socket[connection.fileno()] = connection #加入到等待读事件集合 poller.register(connection,READ_ONLY) message_queues[connection] = Queue.Queue() else : #接收客户端发送的数据 data = s.recv( 1024 ) if data: print "收到数据:" , data , "客户端:" , s.getpeername() message_queues[s].put(data) #修改读取到消息的连接到等待写事件集合 poller.modify(s,READ_WRITE) else : # Close the connection print " closing" , s.getpeername() # Stop listening for input on the connection poller.unregister(s) s.close() del message_queues[s] #连接关闭事件 elif flag & select.POLLHUP : print " Closing " , s.getpeername() , "(HUP)" poller.unregister(s) s.close() #可写事件 elif flag & select.POLLOUT : try : msg = message_queues[s].get_nowait() except Queue.Empty: print s.getpeername() , " queue empty" poller.modify(s,READ_ONLY) else : print "发送数据:" , data , "客户端:" , s.getpeername() s.send(msg) #异常事件 elif flag & select.POLLERR: print " exception on" , s.getpeername() poller.unregister(s) s.close() del message_queues[s] |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | #!/usr/bin/python # -*- coding: utf-8 -*- import socket, select import Queue serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) server_address = ( "127.0.0.1" , 8888
) serversocket.bind(server_address) serversocket.listen( 1 ) print "服务器启动成功,监听IP:" , server_address serversocket.setblocking( 0 ) timeout = 10 #新建epoll事件对象,后续要监控的事件添加到其中 epoll = select.epoll() #添加服务器监听fd到等待读事件集合 epoll.register(serversocket.fileno(), select.EPOLLIN) message_queues = {} fd_to_socket = {serversocket.fileno():serversocket,} while True : print "等待活动连接......" #轮询注册的事件集合 events = epoll.poll(timeout) if not events: print "epoll超时无活动连接,重新轮询......" continue print "有" , len (events), "个新事件,开始处理......" for fd, event in events: socket = fd_to_socket[fd] #可读事件 if event & select.EPOLLIN: #如果活动socket为服务器所监听,有新连接 if socket = = serversocket: connection, address = serversocket.accept() print "新连接:" , address connection.setblocking( 0 ) #注册新连接fd到待读事件集合 epoll.register(connection.fileno(), select.EPOLLIN) fd_to_socket[connection.fileno()] = connection message_queues[connection] = Queue.Queue() #否则为客户端发送的数据 else : data = socket.recv( 1024 ) if data: print "收到数据:" , data , "客户端:" , socket.getpeername() message_queues[socket].put(data) #修改读取到消息的连接到等待写事件集合 epoll.modify(fd, select.EPOLLOUT) #可写事件 elif event & select.EPOLLOUT: try : msg = message_queues[socket].get_nowait() except Queue.Empty: print socket.getpeername() , " queue empty" epoll.modify(fd, select.EPOLLIN) else : print "发送数据:" , data , "客户端:" , socket.getpeername() socket.send(msg) #关闭事件 elif event & select.EPOLLHUP: epoll.unregister(fd) fd_to_socket[fd].close() del fd_to_socket[fd] epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close() |
原文:http://www.cnblogs.com/daliangtou/p/5084176.html