特别声明:本文转载自 QING_____
?
NIO-Socket通讯,为我们解决了server端多线程设计方面的性能、吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端高并发问题.NIO并不能显著地提升Client-server的通讯性能(其中包括全局性耗时总和、Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应并发量的情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理地使用了平台(OS/VM/Http协议)的特性,并提供了高效、便捷的编程级别的API.
?
为了展示NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负责client的数据传输和处理.此程序实例并没有太多地关注异常处理和业务性处理,也没有使用线程池来管理server端的socket句柄,不过你可以简单地修改代码来实现它.
- TestMain.java:引导类
- ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄.
- Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性.
- ServerControllor.java:server端连接处理类,负责接收连接和数据处理
- ServerHandler.java:server端连接维护类.
TestMain.java:
?
- package?com.test.web;??
- ??
- ??
- public?class?TestMain?{??
- ??
- ?????
- ?
- ??
- ????public?static?void?main(String[]?args)?throws?Exception{??
- ????????int?port?=?30008;??
- ????????ServerControllor?sc?=?new?ServerControllor(port);??
- ????????sc.start();??
- ????????Thread.sleep(2000);??
- ????????ClientControllor?cc?=?new?ClientControllor("127.0.0.1",?port);??
- ????????cc.start();??
- ????????Packet?p1?=?Packet.wrap("Hello,I?am?first!");??
- ????????cc.put(p1);??
- ????????Packet?p2?=?Packet.wrap("Hello,I?am?second!");??
- ????????cc.put(p2);??
- ????????Packet?p3?=?Packet.wrap("Hello,I?am?thread!");??
- ????????cc.put(p3);??
- ??
- ????}??
- ??
- }??
?
?
ClientControllor.java
?
?
?
?
Handler.java(接口,面向设计):
?
- package?com.test.web;??
- ??
- import?java.nio.channels.SocketChannel;??
- ??
/**
 * Callback contract for code that takes over a socket channel.
 */
public interface Handler {

    /**
     * Handles the given channel.
     *
     * @param channel the socket channel to handle
     */
    void handle(SocketChannel channel);
}
?
?
Packet.java
?
- package?com.test.web;??
- ??
- import?java.io.Serializable;??
- import?java.nio.ByteBuffer;??
- import?java.nio.charset.Charset;??
- ??
/**
 * Thin wrapper around a {@link ByteBuffer} that adds convenience accessors for
 * reading the payload as bytes or as text.
 *
 * <p>Text is encoded and decoded with the platform default charset.
 *
 * <p>The wrapped buffer itself is not serializable, so the serialized form is
 * the buffer's remaining bytes (position to limit) at the time of writing.
 */
public class Packet implements Serializable {

    private static final long serialVersionUID = 7719389291885063462L;

    /** Charset shared by {@link #wrap(String)} and {@link #getDataAsString()}. */
    private static final Charset CHARSET = Charset.defaultCharset();

    /** Payload; transient because {@code ByteBuffer} is not {@code Serializable}. */
    private transient ByteBuffer buffer;

    private Packet(ByteBuffer buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        this.buffer = buffer;
    }

    /**
     * Decodes the remaining bytes of the payload as text.
     *
     * <p>Decoding works on a duplicate, so the wrapped buffer's position is
     * left untouched and repeated calls return the same string.
     *
     * @return the payload decoded with the platform default charset
     */
    public String getDataAsString() {
        return CHARSET.decode(buffer.duplicate()).toString();
    }

    /**
     * Copies the remaining bytes (position to limit) of the payload.
     *
     * <p>A copy is returned rather than the backing array: the backing array
     * may be larger than the payload, may start at a non-zero offset, and does
     * not exist at all for read-only or direct buffers.
     *
     * @return a fresh array holding exactly the payload bytes
     */
    public byte[] getData() {
        ByteBuffer view = buffer.duplicate();
        byte[] data = new byte[view.remaining()];
        view.get(data);
        return data;
    }

    /**
     * Returns the wrapped buffer itself (not a copy); reading from it moves
     * its position.
     *
     * @return the underlying buffer
     */
    public ByteBuffer getBuffer() {
        return this.buffer;
    }

    /**
     * Wraps an existing buffer without copying it.
     *
     * @param buffer the buffer to wrap; must not be {@code null}
     * @return a packet backed by {@code buffer}
     * @throws NullPointerException if {@code buffer} is {@code null}
     */
    public static Packet wrap(ByteBuffer buffer) {
        return new Packet(buffer);
    }

    /**
     * Encodes the text with the platform default charset and wraps the result.
     *
     * @param data the text to encode; must not be {@code null}
     * @return a packet holding the encoded bytes
     */
    public static Packet wrap(String data) {
        ByteBuffer source = CHARSET.encode(data);
        return new Packet(source);
    }

    /** Writes the payload as a length-prefixed byte array. */
    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
        out.defaultWriteObject();
        byte[] data = getData();
        out.writeInt(data.length);
        out.write(data);
    }

    /** Restores the payload into a new heap buffer. */
    private void readObject(java.io.ObjectInputStream in)
            throws java.io.IOException, ClassNotFoundException {
        in.defaultReadObject();
        int length = in.readInt();
        if (length < 0) {
            throw new java.io.InvalidObjectException("negative payload length: " + length);
        }
        byte[] data = new byte[length];
        in.readFully(data);
        this.buffer = ByteBuffer.wrap(data);
    }
}
?
?
ServerControllor.java
?
- package?com.test.web;??
- ??
- import?java.net.InetSocketAddress;??
- import?java.nio.channels.SelectionKey;??
- import?java.nio.channels.Selector;??
- import?java.nio.channels.ServerSocketChannel;??
- import?java.nio.channels.SocketChannel;??
- import?java.util.Iterator;??
- ??
- public?class?ServerControllor?{??
- ????private?int?port;??
- ????private?Thread?thread?=?new?ServerThread();;??
- ????private?Object?lock?=?new?Object();??
- ????public?ServerControllor(){??
- ????????this(0);??
- ????}??
- ????public?ServerControllor(int?port){??
- ????????this.port?=?port;??
- ????}??
- ??????
- ????public?void?start(){??
- ????????if(thread.isAlive()){??
- ????????????return;??
- ????????}??
- ????????synchronized?(lock)?{??
- ????????????thread.start();??
- ????????????System.out.println("Server?starting....");??
- ????????}??
- ????}??
- ??????
- ??????
- ????class?ServerThread?extends?Thread?{??
- ????????private?static?final?int?TIMEOUT?=?3000;??
- ????????private?ServerHandler?handler?=?new?ServerHandler();??
- ????????@Override??
- ????????public?void?run(){??
- ????????????try{??
- ????????????????ServerSocketChannel?channel?=?null;??
- ????????????????try{??
- ????????????????????channel?=?ServerSocketChannel.open();??
- ????????????????????channel.configureBlocking(false);??
- ????????????????????channel.socket().setReuseAddress(true);??
- ????????????????????channel.socket().bind(new?InetSocketAddress(port));??
- ????????????????????Selector?selector?=?Selector.open();??
- ????????????????????channel.register(selector,?SelectionKey.OP_ACCEPT);??
- ????????????????????while(selector.isOpen()){??
- ????????????????????????System.out.println("Server?is?running,port:"?+?channel.socket().getLocalPort());??
- ????????????????????????if(selector.select(TIMEOUT)?==?0){??
- ????????????????????????????continue;??
- ????????????????????????}??
- ????????????????????????Iterator<SelectionKey>?it?=?selector.selectedKeys().iterator();??
- ????????????????????????while(it.hasNext()){??
- ????????????????????????????SelectionKey?key?=?it.next();??
- ????????????????????????????it.remove();??
- ????????????????????????????if(!key.isValid()){??
- ????????????????????????????????continue;??
- ????????????????????????????}??
- ????????????????????????????if(key.isAcceptable()){??
- ????????????????????????????????accept(key);??
- ????????????????????????????}else?if(key.isReadable()){??
- ????????????????????????????????read(key);??
- ????????????????????????????}??
- ????????????????????????}??
- ????????????????????}??
- ????????????????}catch(Exception?e){??
- ????????????????????e.printStackTrace();??
- ????????????????}finally{??
- ????????????????????if(channel?!=?null){??
- ????????????????????????try{??
- ????????????????????????????channel.close();??
- ????????????????????????}catch(Exception?ex){??
- ????????????????????????????ex.printStackTrace();??
- ????????????????????????}??
- ????????????????????}??
- ????????????????}??
- ????????????}catch(Exception?e){??
- ????????????????e.printStackTrace();??
- ????????????}??
- ????????}??
- ??????????
- ????????private?void?accept(SelectionKey?key)?throws?Exception{??
- ????????????SocketChannel?socketChannel?=?((ServerSocketChannel)?key.channel()).accept();??
- ????????????socketChannel.configureBlocking(true);??
- ??????????????
- ????????????handler.handle(socketChannel);??
- ????????}??
- ??????????
- ????????private?void?read(SelectionKey?key)?throws?Exception{??
- ????????????SocketChannel?channel?=?(SocketChannel)key.channel();??
- ??????????????
- ????????}??
- ????}??
- }??
?
?
ServerHandler.java
?
?
- package?com.test.web;??
- ??
- import?java.nio.ByteBuffer;??
- import?java.nio.channels.SocketChannel;??
- import?java.util.HashMap;??
- import?java.util.Map;??
- import?java.util.concurrent.Semaphore;??
- import?java.util.zip.Adler32;??
- import?java.util.zip.Checksum;??
- ??
- class?ServerHandler?implements?Handler?{??
- ??
- ????private?static?Semaphore?semaphore?=?new?Semaphore(Runtime.getRuntime().availableProcessors()?+?1);??
- ??????
- ????private?static?Map<SocketChannel,Thread>?holder?=?new?HashMap<SocketChannel,Thread>(32);??
- ??????
- ????@Override??
- ????public?void?handle(SocketChannel?channel)?{??
- ????????synchronized?(holder)?{??
- ????????????if(holder.containsKey(channel)){??
- ????????????????return;??
- ????????????}??
- ????????????Thread?t?=?new?ReadThread(channel);??
- ????????????holder.put(channel,?t);??
- ????????????t.start();??
- ????????}??
- ????}??
- ??????
- ??????
- ????static?class?ReadThread?extends?Thread{??
- ????????SocketChannel?channel;??
- ????????ReadThread(SocketChannel?channel){??
- ????????????this.channel?=?channel;??
- ????????}??
- ????????@Override??
- ????????public?void?run(){??
- ????????????try{??
- ????????????????semaphore.acquire();??
- ????????????????boolean?eof?=?false;??
- ????????????????while(channel.isOpen()){??
- ??????????????????????
- ????????????????????ByteBuffer?head?=?ByteBuffer.allocate(4);??
- ????????????????????while(true){??
- ????????????????????????int?cb?=?channel.read(head);??
- ????????????????????????if(cb?==?-1){??
- ????????????????????????????throw?new?RuntimeException("EOF?error,data?lost!");??
- ????????????????????????}??
- ????????????????????????if(isFull(head)){??
- ????????????????????????????break;??
- ????????????????????????}??
- ????????????????????}??
- ????????????????????head.flip();??
- ????????????????????int?dataSize?=?head.getInt();??
- ????????????????????if(dataSize?<=?0){??
- ????????????????????????throw?new?RuntimeException("Data?format?error,something?lost???");??
- ????????????????????}??
- ????????????????????ByteBuffer?body?=?ByteBuffer.allocate(dataSize);??
- ????????????????????while(true){??
- ????????????????????????int?cb?=?channel.read(body);??
- ????????????????????????if(cb?==?-1){??
- ????????????????????????????throw?new?RuntimeException("EOF?error,data?lost!");??
- ????????????????????????}else?if(cb?==?0?&&?this.isFull(body)){??
- ????????????????????????????break;??
- ????????????????????????}??
- ????????????????????}??
- ????????????????????ByteBuffer?tail?=?ByteBuffer.allocate(8);??
- ????????????????????while(true){??
- ????????????????????????int?cb?=?channel.read(tail);??
- ????????????????????????if(cb?==?-1){??
- ????????????????????????????eof?=?true;??
- ????????????????????????}??
- ????????????????????????if(isFull(tail)){??
- ????????????????????????????break;??
- ????????????????????????}??
- ????????????????????}??
- ????????????????????tail.flip();??
- ????????????????????long?sck?=?tail.getLong();??
- ????????????????????Checksum?checksum?=?new?Adler32();??
- ????????????????????checksum.update(body.array(),?0,?dataSize);??
- ????????????????????long?cck?=?checksum.getValue();??
- ????????????????????if(sck?!=?cck){??
- ????????????????????????throw?new?RuntimeException("Sorry,some?data?lost?or?be?modified,please?check!");??
- ????????????????????}??
- ????????????????????body.flip();??
- ????????????????????Packet?packet?=?Packet.wrap(body);??
- ????????????????????System.out.println(packet.getDataAsString());??
- ????????????????????if(eof){??
- ????????????????????????break;??
- ????????????????????}??
- ????????????????}??
- ????????????}catch(Exception?e){??
- ????????????????e.printStackTrace();??
- ????????????}finally{??
- ????????????????if(channel?!=?null){??
- ????????????????????try{??
- ????????????????????????channel.close();??
- ????????????????????}catch(Exception?ex){??
- ????????????????????????ex.printStackTrace();??
- ????????????????????}??
- ????????????????}??
- ????????????????holder.remove(channel);??
- ????????????????semaphore.release();??
- ????????????}??
- ????????}??
- ??????????
- ????????private?boolean?isFull(ByteBuffer?byteBuffer){??
- ????????????return?byteBuffer.position()?==?byteBuffer.capacity()???true?:?false;??
- ????????}??
- ????}??
- ??
- }??
?
?
--End--
example for NIO
原文:http://ixhong.iteye.com/blog/2227865