首页 > 其他 > 详细

example for NIO

时间:2015-07-17 02:15:05      阅读:230      评论:0      收藏:0      [点我收藏+]

特别声明:本文转载自 QING_____

?

NIO-Socket通讯为我们解决了server端多线程设计方面的性能、吞吐量等多方面的问题,它以非阻塞模式 + 线程池的方式来解决Server端高并发问题.NIO并不能显著地提升Client-Server的通讯性能(其中包括全局性耗时总和、Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应并发量的情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理地使用了平台(OS/VM/Http协议)的特性,并提供了高效、便捷的编程级别API.

?

为了展示NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负责client数据的接收和处理.此程序实例并没有过多关注异常处理和业务性处理,也没有使用线程池来管理server端的socket句柄,不过你可以简单地修改代码来实现它.

  1. TestMain.java:引导类
  2. ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄.
  3. Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性.
  4. ServerControllor.java:server端连接处理类,负责接收连接和数据处理
  5. ServerHandler.java:server端连接维护类.

TestMain.java:

?

Java代码??bubuko.com,布布扣
  1. package?com.test.web;??
  2. ??
  3. ??
  4. public?class?TestMain?{??
  5. ??
  6. ????/**?
  7. ?????*?@param?args?
  8. ?????*/??
  9. ????public?static?void?main(String[]?args)?throws?Exception{??
  10. ????????int?port?=?30008;??
  11. ????????ServerControllor?sc?=?new?ServerControllor(port);??
  12. ????????sc.start();??
  13. ????????Thread.sleep(2000);??
  14. ????????ClientControllor?cc?=?new?ClientControllor("127.0.0.1",?port);??
  15. ????????cc.start();??
  16. ????????Packet?p1?=?Packet.wrap("Hello,I?am?first!");??
  17. ????????cc.put(p1);??
  18. ????????Packet?p2?=?Packet.wrap("Hello,I?am?second!");??
  19. ????????cc.put(p2);??
  20. ????????Packet?p3?=?Packet.wrap("Hello,I?am?thread!");??
  21. ????????cc.put(p3);??
  22. ??
  23. ????}??
  24. ??
  25. }??

?

?

ClientControllor.java

?

?

Java代码??bubuko.com,布布扣
  1. package?com.test.web;??
  2. ??
  3. import?java.net.InetSocketAddress;??
  4. import?java.net.SocketAddress;??
  5. import?java.nio.ByteBuffer;??
  6. import?java.nio.channels.SocketChannel;??
  7. import?java.util.concurrent.BlockingQueue;??
  8. import?java.util.concurrent.LinkedBlockingQueue;??
  9. import?java.util.zip.Adler32;??
  10. import?java.util.zip.Checksum;??
  11. ??
  12. public?class?ClientControllor?{??
  13. ??
  14. ????private?BlockingQueue<Packet>?inner?=?new?LinkedBlockingQueue<Packet>(100);//no?any?more??
  15. ????private?Object?lock?=?new?Object();??
  16. ????private?InetSocketAddress?remote;??
  17. ????private?Thread?thread?=?new?ClientThread(remote);??
  18. ????public?ClientControllor(String?host,int?port){??
  19. ????????remote?=?new?InetSocketAddress(host,?port);??
  20. ????}??
  21. ??????
  22. ????public?void?start(){??
  23. ????????if(thread.isAlive()?||?remote?==?null){??
  24. ????????????return;??
  25. ????????}??
  26. ????????synchronized?(lock)?{??
  27. ????????????thread.start();??
  28. ????????}??
  29. ??????????????
  30. ??????????
  31. ????}??
  32. ????public?boolean?put(Packet?packet){??
  33. ????????return?inner.offer(packet);??
  34. ????}??
  35. ??????
  36. ????public?void?clear(){??
  37. ????????inner.clear();??
  38. ????}??
  39. ??????
  40. ????class?ClientThread?extends?Thread?{??
  41. ????????SocketAddress?remote;??
  42. ????????SocketChannel?channel;??
  43. ????????ClientThread(SocketAddress?remote){??
  44. ????????????this.remote?=?remote;??
  45. ????????}??
  46. ????????@Override??
  47. ????????public?void?run(){??
  48. ????????????try{??
  49. ????????????????try{??
  50. ????????????????????channel?=?SocketChannel.open();??
  51. ????????????????????channel.configureBlocking(true);??
  52. ????????????????????boolean?isSuccess?=?channel.connect(new?InetSocketAddress(30008));??
  53. ????????????????????if(!isSuccess){??
  54. ????????????????????????while(!channel.finishConnect()){??
  55. ????????????????????????????System.out.println("Client?is?connecting...");??
  56. ????????????????????????}??
  57. ????????????????????}??
  58. ????????????????????System.out.println("Client?is?connected.");??
  59. //??????????????????Selector?selector?=?Selector.open();??
  60. //??????????????????channel.register(selector,?SelectionKey.OP_WRITE);??
  61. //??????????????????while(selector.isOpen()){??
  62. //??????????????????????selector.select();??
  63. //??????????????????????Iterator<SelectionKey>?it?=?selector.selectedKeys().iterator();??
  64. //??????????????????????while(it.hasNext()){??
  65. //??????????????????????????SelectionKey?key?=?it.next();??
  66. //??????????????????????????it.remove();??
  67. //??????????????????????????if(!key.isValid()){??
  68. //??????????????????????????????continue;??
  69. //??????????????????????????}??
  70. //??????????????????????????if(key.isWritable()){??
  71. //??????????????????????????????write();??
  72. //??????????????????????????}??
  73. //??????????????????????}??
  74. //??????????????????}??
  75. ????????????????????while(channel.isOpen()){??
  76. ????????????????????????write();??
  77. ????????????????????}??
  78. ????????????????}catch(Exception?e){??
  79. ????????????????????e.printStackTrace();??
  80. ????????????????}finally{??
  81. ????????????????????if(channel?!=?null){??
  82. ????????????????????????try{??
  83. ????????????????????????????channel.close();??
  84. ????????????????????????}catch(Exception?ex){??
  85. ????????????????????????????ex.printStackTrace();??
  86. ????????????????????????}??
  87. ????????????????????}??
  88. ????????????????}??
  89. ????????????}catch(Exception?e){??
  90. ????????????????e.printStackTrace();??
  91. ????????????????inner.clear();??
  92. ????????????}??
  93. ????????}??
  94. ??????????
  95. ????????private?void?write()?throws?Exception{??
  96. ????????????Packet?packet?=?inner.take();??
  97. ????????????synchronized?(lock)?{??
  98. ????????????????ByteBuffer?body?=?packet.getBuffer();//??
  99. ????????????????ByteBuffer?head?=?ByteBuffer.allocate(4);??
  100. ????????????????head.putInt(body.limit());??
  101. ????????????????head.flip();??
  102. ????????????????while(head.hasRemaining()){??
  103. ????????????????????channel.write(head);??
  104. ????????????????}??
  105. ????????????????Checksum?checksum?=?new?Adler32();??
  106. ????????????????while(body.hasRemaining()){??
  107. ????????????????????checksum.update(body.get());??
  108. ????????????????}??
  109. ????????????????body.rewind();??
  110. ????????????????while(body.hasRemaining()){??
  111. ????????????????????channel.write(body);??
  112. ????????????????}??
  113. ????????????????long?cks?=?checksum.getValue();??
  114. ????????????????ByteBuffer?tail?=?ByteBuffer.allocate(8);??
  115. ????????????????tail.putLong(cks);??
  116. ????????????????tail.flip();??
  117. ????????????????while(tail.hasRemaining()){??
  118. ????????????????????channel.write(tail);??
  119. ????????????????}??
  120. ????????????}??
  121. ??????????????
  122. ????????}??
  123. ????}??
  124. }??

?

?

Handler.java(接口,面向设计):

?

Java代码??bubuko.com,布布扣
  1. package?com.test.web;??
  2. ??
  3. import?java.nio.channels.SocketChannel;??
  4. ??
  /**
   * Callback that takes over a socket channel handed to it by the caller.
   */
  public interface Handler {

      /**
       * Handles the given channel.
       *
       * @param channel the channel to handle
       */
      void handle(SocketChannel channel);
  }

?

?

Packet.java

?

Java代码??bubuko.com,布布扣
  1. package?com.test.web;??
  2. ??
  3. import?java.io.Serializable;??
  4. import?java.nio.ByteBuffer;??
  5. import?java.nio.charset.Charset;??
  6. ??
  /**
   * Thin wrapper around a {@link ByteBuffer} holding one message body, with
   * helpers to view the content as text or as a byte array.
   *
   * <p>The accessors read through a duplicate of the buffer, so calling them
   * never moves the wrapped buffer's position or limit.
   */
  public class Packet implements Serializable {

      private static final long serialVersionUID = 7719389291885063462L;

      // NOTE(review): ByteBuffer does not implement Serializable, so Java
      // serialization of a Packet holding a buffer fails at runtime; confirm
      // whether Serializable is really needed here.
      private ByteBuffer buffer;

      /** Charset used for both encoding and decoding: the platform default. */
      private static final Charset charset = Charset.defaultCharset();

      private Packet(ByteBuffer buffer) {
          this.buffer = buffer;
      }

      /**
       * Decodes the bytes between the buffer's position and limit.
       *
       * @return the content as text; repeated calls return the same value
       */
      public String getDataAsString() {
          // decode() advances the position of the buffer it is given, so hand
          // it a duplicate and keep the packet re-readable.
          return charset.decode(buffer.duplicate()).toString();
      }

      /**
       * Copies the bytes between the buffer's position and limit.
       *
       * @return a new array containing exactly the packet content
       */
      public byte[] getData() {
          // buffer.array() would expose the whole backing array (which may be
          // larger than the content) and fails for read-only/direct buffers.
          ByteBuffer view = buffer.duplicate();
          byte[] data = new byte[view.remaining()];
          view.get(data);
          return data;
      }

      /**
       * @return the wrapped buffer itself (not a copy); reading from it moves
       *         its position
       */
      public ByteBuffer getBuffer() {
          return this.buffer;
      }

      /**
       * Wraps an existing buffer without copying it.
       *
       * @param buffer buffer positioned at the start of the content
       * @return a packet backed by {@code buffer}
       */
      public static Packet wrap(ByteBuffer buffer) {
          return new Packet(buffer);
      }

      /**
       * Encodes text with the platform default charset and wraps the result.
       *
       * @param data the text to encode
       * @return a packet holding the encoded bytes
       */
      public static Packet wrap(String data) {
          ByteBuffer source = charset.encode(data);
          return new Packet(source);
      }
  }

?

?

ServerControllor.java

?

Java代码??bubuko.com,布布扣
  1. package?com.test.web;??
  2. ??
  3. import?java.net.InetSocketAddress;??
  4. import?java.nio.channels.SelectionKey;??
  5. import?java.nio.channels.Selector;??
  6. import?java.nio.channels.ServerSocketChannel;??
  7. import?java.nio.channels.SocketChannel;??
  8. import?java.util.Iterator;??
  9. ??
  10. public?class?ServerControllor?{??
  11. ????private?int?port;??
  12. ????private?Thread?thread?=?new?ServerThread();;??
  13. ????private?Object?lock?=?new?Object();??
  14. ????public?ServerControllor(){??
  15. ????????this(0);??
  16. ????}??
  17. ????public?ServerControllor(int?port){??
  18. ????????this.port?=?port;??
  19. ????}??
  20. ??????
  21. ????public?void?start(){??
  22. ????????if(thread.isAlive()){??
  23. ????????????return;??
  24. ????????}??
  25. ????????synchronized?(lock)?{??
  26. ????????????thread.start();??
  27. ????????????System.out.println("Server?starting....");??
  28. ????????}??
  29. ????}??
  30. ??????
  31. ??????
  32. ????class?ServerThread?extends?Thread?{??
  33. ????????private?static?final?int?TIMEOUT?=?3000;??
  34. ????????private?ServerHandler?handler?=?new?ServerHandler();??
  35. ????????@Override??
  36. ????????public?void?run(){??
  37. ????????????try{??
  38. ????????????????ServerSocketChannel?channel?=?null;??
  39. ????????????????try{??
  40. ????????????????????channel?=?ServerSocketChannel.open();??
  41. ????????????????????channel.configureBlocking(false);??
  42. ????????????????????channel.socket().setReuseAddress(true);??
  43. ????????????????????channel.socket().bind(new?InetSocketAddress(port));??
  44. ????????????????????Selector?selector?=?Selector.open();??
  45. ????????????????????channel.register(selector,?SelectionKey.OP_ACCEPT);??
  46. ????????????????????while(selector.isOpen()){??
  47. ????????????????????????System.out.println("Server?is?running,port:"?+?channel.socket().getLocalPort());??
  48. ????????????????????????if(selector.select(TIMEOUT)?==?0){??
  49. ????????????????????????????continue;??
  50. ????????????????????????}??
  51. ????????????????????????Iterator<SelectionKey>?it?=?selector.selectedKeys().iterator();??
  52. ????????????????????????while(it.hasNext()){??
  53. ????????????????????????????SelectionKey?key?=?it.next();??
  54. ????????????????????????????it.remove();??
  55. ????????????????????????????if(!key.isValid()){??
  56. ????????????????????????????????continue;??
  57. ????????????????????????????}??
  58. ????????????????????????????if(key.isAcceptable()){??
  59. ????????????????????????????????accept(key);??
  60. ????????????????????????????}else?if(key.isReadable()){??
  61. ????????????????????????????????read(key);??
  62. ????????????????????????????}??
  63. ????????????????????????}??
  64. ????????????????????}??
  65. ????????????????}catch(Exception?e){??
  66. ????????????????????e.printStackTrace();??
  67. ????????????????}finally{??
  68. ????????????????????if(channel?!=?null){??
  69. ????????????????????????try{??
  70. ????????????????????????????channel.close();??
  71. ????????????????????????}catch(Exception?ex){??
  72. ????????????????????????????ex.printStackTrace();??
  73. ????????????????????????}??
  74. ????????????????????}??
  75. ????????????????}??
  76. ????????????}catch(Exception?e){??
  77. ????????????????e.printStackTrace();??
  78. ????????????}??
  79. ????????}??
  80. ??????????
  81. ????????private?void?accept(SelectionKey?key)?throws?Exception{??
  82. ????????????SocketChannel?socketChannel?=?((ServerSocketChannel)?key.channel()).accept();??
  83. ????????????socketChannel.configureBlocking(true);??
  84. ????????????//socketChannel.register(key.selector(),?SelectionKey.OP_READ);??
  85. ????????????handler.handle(socketChannel);??
  86. ????????}??
  87. ??????????
  88. ????????private?void?read(SelectionKey?key)?throws?Exception{??
  89. ????????????SocketChannel?channel?=?(SocketChannel)key.channel();??
  90. ????????????//handler.handle(channel);??
  91. ????????}??
  92. ????}??
  93. }??

?

?

ServerHandler.java

?

?

Java代码??bubuko.com,布布扣
  1. package?com.test.web;??
  2. ??
  3. import?java.nio.ByteBuffer;??
  4. import?java.nio.channels.SocketChannel;??
  5. import?java.util.HashMap;??
  6. import?java.util.Map;??
  7. import?java.util.concurrent.Semaphore;??
  8. import?java.util.zip.Adler32;??
  9. import?java.util.zip.Checksum;??
  10. ??
  11. class?ServerHandler?implements?Handler?{??
  12. ??
  13. ????private?static?Semaphore?semaphore?=?new?Semaphore(Runtime.getRuntime().availableProcessors()?+?1);??
  14. ??????
  15. ????private?static?Map<SocketChannel,Thread>?holder?=?new?HashMap<SocketChannel,Thread>(32);??
  16. ??????
  17. ????@Override??
  18. ????public?void?handle(SocketChannel?channel)?{??
  19. ????????synchronized?(holder)?{??
  20. ????????????if(holder.containsKey(channel)){??
  21. ????????????????return;??
  22. ????????????}??
  23. ????????????Thread?t?=?new?ReadThread(channel);??
  24. ????????????holder.put(channel,?t);??
  25. ????????????t.start();??
  26. ????????}??
  27. ????}??
  28. ??????
  29. ??????
  30. ????static?class?ReadThread?extends?Thread{??
  31. ????????SocketChannel?channel;??
  32. ????????ReadThread(SocketChannel?channel){??
  33. ????????????this.channel?=?channel;??
  34. ????????}??
  35. ????????@Override??
  36. ????????public?void?run(){??
  37. ????????????try{??
  38. ????????????????semaphore.acquire();??
  39. ????????????????boolean?eof?=?false;??
  40. ????????????????while(channel.isOpen()){??
  41. ????????????????????//ByteBuffer?byteBuffer?=?new?ByteBuffer(1024);??
  42. ????????????????????ByteBuffer?head?=?ByteBuffer.allocate(4);//int?for?data-size??
  43. ????????????????????while(true){??
  44. ????????????????????????int?cb?=?channel.read(head);??
  45. ????????????????????????if(cb?==?-1){??
  46. ????????????????????????????throw?new?RuntimeException("EOF?error,data?lost!");??
  47. ????????????????????????}??
  48. ????????????????????????if(isFull(head)){??
  49. ????????????????????????????break;??
  50. ????????????????????????}??
  51. ????????????????????}??
  52. ????????????????????head.flip();??
  53. ????????????????????int?dataSize?=?head.getInt();??
  54. ????????????????????if(dataSize?<=?0){??
  55. ????????????????????????throw?new?RuntimeException("Data?format?error,something?lost???");??
  56. ????????????????????}??
  57. ????????????????????ByteBuffer?body?=?ByteBuffer.allocate(dataSize);??
  58. ????????????????????while(true){??
  59. ????????????????????????int?cb?=?channel.read(body);??
  60. ????????????????????????if(cb?==?-1){??
  61. ????????????????????????????throw?new?RuntimeException("EOF?error,data?lost!");??
  62. ????????????????????????}else?if(cb?==?0?&&?this.isFull(body)){??
  63. ????????????????????????????break;??
  64. ????????????????????????}??
  65. ????????????????????}??
  66. ????????????????????ByteBuffer?tail?=?ByteBuffer.allocate(8);//int?for?data-size??
  67. ????????????????????while(true){??
  68. ????????????????????????int?cb?=?channel.read(tail);??
  69. ????????????????????????if(cb?==?-1){??
  70. ????????????????????????????eof?=?true;??
  71. ????????????????????????}??
  72. ????????????????????????if(isFull(tail)){??
  73. ????????????????????????????break;??
  74. ????????????????????????}??
  75. ????????????????????}??
  76. ????????????????????tail.flip();??
  77. ????????????????????long?sck?=?tail.getLong();??
  78. ????????????????????Checksum?checksum?=?new?Adler32();??
  79. ????????????????????checksum.update(body.array(),?0,?dataSize);??
  80. ????????????????????long?cck?=?checksum.getValue();??
  81. ????????????????????if(sck?!=?cck){??
  82. ????????????????????????throw?new?RuntimeException("Sorry,some?data?lost?or?be?modified,please?check!");??
  83. ????????????????????}??
  84. ????????????????????body.flip();??
  85. ????????????????????Packet?packet?=?Packet.wrap(body);??
  86. ????????????????????System.out.println(packet.getDataAsString());??
  87. ????????????????????if(eof){??
  88. ????????????????????????break;??
  89. ????????????????????}??
  90. ????????????????}??
  91. ????????????}catch(Exception?e){??
  92. ????????????????e.printStackTrace();??
  93. ????????????}finally{??
  94. ????????????????if(channel?!=?null){??
  95. ????????????????????try{??
  96. ????????????????????????channel.close();??
  97. ????????????????????}catch(Exception?ex){??
  98. ????????????????????????ex.printStackTrace();??
  99. ????????????????????}??
  100. ????????????????}??
  101. ????????????????holder.remove(channel);??
  102. ????????????????semaphore.release();??
  103. ????????????}??
  104. ????????}??
  105. ??????????
  106. ????????private?boolean?isFull(ByteBuffer?byteBuffer){??
  107. ????????????return?byteBuffer.position()?==?byteBuffer.capacity()???true?:?false;??
  108. ????????}??
  109. ????}??
  110. ??
  111. }??

?

?

--End--

example for NIO

原文:http://ixhong.iteye.com/blog/2227865

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!