AIO:异步非阻塞IO,当系统中有IO操作后,会产生一个单独的线程,由它将所有需要处理的事情交由本地操作系统来完成,操作系统处理完成后,再将结果返回即可。
以常用的烧水问题来举例,服务器让A去烧水,A在水壶上装了一个开关,这个开关当水烧开时会通知A水已经烧好,再由A来处理,这个开关主要就是通过回调函数来实现。
在AIO中,主要通过使用CompletionHandler来完成,CompletionHandler是用于消费(处理)异步I/O操作结果的处理程序。
CompletionHandler有两个接口方法:completed与failed。
void completed(V result, A attachment)
result - I/O操作的结果。
attachment - 启动时附加到I/O操作的对象。
void failed(Throwable exc, A attachment)
exc - 表示I/O操作失败原因的异常。
attachment - 启动时附加到I/O操作的对象。
以下是学习过程中找的AIO相关实现代码,在此做个记录:
服务端:
package com.learn.aio.server; import com.study.info.HostInfo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; class EchoHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private boolean exit; public EchoHandler(AsynchronousSocketChannel clientChannel) { this.clientChannel = clientChannel; } @Override public void completed(Integer result, ByteBuffer byteBuffer) { byteBuffer.flip(); // 读取之前先重置 String read = new String(byteBuffer.array(), 0, byteBuffer.remaining()).trim(); String outMsg = "【Echo】" + read; // 回应的信息 if ("byebye".equalsIgnoreCase(read)) { outMsg = "服务已断开,拜拜"; this.exit = true; } this.echoWrite(outMsg); } private void echoWrite(String content){ ByteBuffer buffer = ByteBuffer.allocate(100); buffer.put(content.getBytes()); buffer.flip(); this.clientChannel.write(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){ @Override public void completed(Integer result, ByteBuffer buf) { if (buf.hasRemaining()){ EchoHandler.this.clientChannel.write(buffer, buffer, this); }else { if(EchoHandler.this.exit == false){ ByteBuffer readBuffer = ByteBuffer.allocate(100); EchoHandler.this.clientChannel.read(readBuffer, readBuffer, new EchoHandler(EchoHandler.this.clientChannel)); } } } @Override public void failed(Throwable exc, ByteBuffer buf) { try { EchoHandler.this.clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer byteBuffer) { try { this.clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } // 连接接收的回调处理 class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServerThread> { @Override public void completed(AsynchronousSocketChannel channel, 
AIOServerThread aioServerThread) { aioServerThread.getServerChannel().accept(aioServerThread, this); // 接收连接 ByteBuffer byteBuffer = ByteBuffer.allocate(100); channel.read(byteBuffer, byteBuffer, new EchoHandler(channel)); } @Override public void failed(Throwable exc, AIOServerThread aioServerThread) { System.out.println("客户端连接创建失败...."); aioServerThread.getLatch().countDown(); } } //设置服务器处理线程 class AIOServerThread implements Runnable { private AsynchronousServerSocketChannel serverChannel = null; //服务器通道 private CountDownLatch latch = null; //同步处理操作层 public AIOServerThread() throws Exception { this.latch = new CountDownLatch(1); this.serverChannel = AsynchronousServerSocketChannel.open(); //打开服务器通道 this.serverChannel.bind(new InetSocketAddress(HostInfo.PORT)); System.out.println("服务已启动,监听端口为:" + HostInfo.PORT); } public AsynchronousServerSocketChannel getServerChannel() { return serverChannel; } public CountDownLatch getLatch() { return latch; } @Override public void run() { this.serverChannel.accept(this, new AcceptHandler()); try { this.wait(); // 线程等待 } catch (InterruptedException e) { e.printStackTrace(); } } } public class AIOEchoServer { public static void main(String[] args) throws Exception{ new Thread(new AIOServerThread()).start(); } }
客户端:
package com.learn.aio.client;

import com.study.info.HostInfo;
import com.study.util.InputUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * Completion handler for reading the server's reply: prints it, and ends the session when
 * the reply is the last one expected or the server has closed the connection.
 */
class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel clientChannel;
    private final CountDownLatch latch;
    private final boolean lastReply;

    /**
     * Creates a handler for a reply that is not the last one of the session.
     *
     * @param clientChannel the connection to the server
     * @param latch latch released when the session ends
     */
    public ClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
        this(clientChannel, latch, false);
    }

    /**
     * @param clientChannel the connection to the server
     * @param latch latch released when the session ends
     * @param lastReply {@code true} if the session should end once this reply is printed
     */
    ClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch, boolean lastReply) {
        this.clientChannel = clientChannel;
        this.latch = latch;
        this.lastReply = lastReply;
    }

    /**
     * Prints the received reply.
     *
     * @param result number of bytes read, or -1 when the server has closed the connection
     * @param buffer the buffer the read filled (still in write mode on entry)
     */
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (result < 0) {
            // End of stream: there is no reply to print and no further one will arrive.
            this.finish();
            return;
        }
        buffer.flip();
        String readMsg = new String(buffer.array(), 0, buffer.remaining());
        System.out.println(readMsg);
        if (this.lastReply) {
            this.finish();
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        this.finish();
    }

    /** Closes the connection and releases the latch so the waiting client thread can end. */
    private void finish() {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.latch.countDown();
    }
}

/**
 * Completion handler for sending a message: drains the buffer, then reads the reply.
 */
class ClientWriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    /** Size in bytes of the buffer used to receive the server's reply. */
    private static final int REPLY_BUFFER_SIZE = 100;

    private final AsynchronousSocketChannel clientChannel;
    private final CountDownLatch latch;
    private final boolean lastMessage;

    /**
     * Creates a handler for a message that is not the last one of the session.
     *
     * @param clientChannel the connection to the server
     * @param latch latch released when the session ends
     */
    public ClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
        this(clientChannel, latch, false);
    }

    /**
     * @param clientChannel the connection to the server
     * @param latch latch released when the session ends
     * @param lastMessage {@code true} if the reply to this message ends the session
     */
    ClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch, boolean lastMessage) {
        this.clientChannel = clientChannel;
        this.latch = latch;
        this.lastMessage = lastMessage;
    }

    /**
     * Re-issues the write while bytes remain; once everything is sent, starts reading the reply.
     *
     * @param result number of bytes written by the completed operation
     * @param buffer the buffer being sent
     */
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (buffer.hasRemaining()) {
            this.clientChannel.write(buffer, buffer, this);
            return;
        }
        ByteBuffer readBuffer = ByteBuffer.allocate(REPLY_BUFFER_SIZE);
        this.clientChannel.read(
                readBuffer, readBuffer, new ClientReadHandler(this.clientChannel, this.latch, this.lastMessage));
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.latch.countDown();
    }
}

/**
 * Client task: owns the connection and parks its thread until the session ends.
 */
class AIOClientThread implements Runnable {
    private final AsynchronousSocketChannel clientChannel;
    private final CountDownLatch latch;

    /**
     * Opens the channel and connects to {@code HostInfo.HOST_NAME:HostInfo.PORT}, returning
     * only after the connection is established.
     *
     * @throws Exception if the channel cannot be opened or the connection attempt fails
     */
    public AIOClientThread() throws Exception {
        this.clientChannel = AsynchronousSocketChannel.open();
        this.latch = new CountDownLatch(1);
        try {
            // connect() only initiates the connection; wait on the returned Future so a
            // write issued right after construction does not hit an unconnected channel.
            this.clientChannel.connect(new InetSocketAddress(HostInfo.HOST_NAME, HostInfo.PORT)).get();
        } catch (InterruptedException | ExecutionException | RuntimeException e) {
            this.clientChannel.close();
            throw e;
        }
    }

    /** Blocks until the latch is released by a handler or by {@link #shutdown()}. */
    @Override
    public void run() {
        try {
            // Object.wait() on the latch needs the caller to own its monitor and threw
            // IllegalMonitorStateException; await() is the latch's blocking call.
            this.latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
    }

    /**
     * Sends {@code msg} to the server asynchronously.
     *
     * @param msg the text to send
     * @return {@code false} if the connection is already closed or {@code msg} is the
     *     farewell {@code byebye} (case-insensitive, surrounding whitespace ignored),
     *     {@code true} otherwise
     */
    public boolean sendMessage(String msg) {
        if (!this.clientChannel.isOpen()) {
            return false; // a handler already ended the session
        }
        // The server trims input before comparing, so trim here too.
        boolean farewell = "byebye".equalsIgnoreCase(msg.trim());
        // wrap() sizes the buffer to the message; a fixed 100-byte buffer overflowed on long input.
        ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
        this.clientChannel.write(
                byteBuffer, byteBuffer, new ClientWriteHandler(this.clientChannel, this.latch, farewell));
        return !farewell;
    }

    /** Closes the connection and releases the latch so {@link #run()} returns. */
    void shutdown() {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.latch.countDown();
    }
}

/**
 * Entry point of the AIO echo client: sends each line typed by the user until the farewell.
 */
public class AIOEchoClient {
    public static void main(String[] args) throws Exception {
        AIOClientThread clientThread = new AIOClientThread();
        new Thread(clientThread).start();
        try {
            while (clientThread.sendMessage(InputUtil.getString("请输入要发送的内容:"))) {
                // keep prompting until sendMessage reports the end of the session
            }
        } catch (RuntimeException e) {
            // Without this the parked client thread would keep the JVM alive.
            clientThread.shutdown();
            throw e;
        }
    }
}
其他:
package com.study.util; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; public class InputUtil { private static final BufferedReader KEYBOARD_INPUT = new BufferedReader(new InputStreamReader(System.in)); private InputUtil(){ } public static String getString(String prompt){ boolean flag = true; //数据接受标记 String str = null; while (flag){ System.out.println(prompt); try { str = KEYBOARD_INPUT.readLine(); // 读取一行数据 if(str == null || "".equals(str)){ System.out.println("数据输入错误,不允许为空!"); }else { flag = false; } } catch (IOException e) { e.printStackTrace(); } } return str; } }
package com.study.info; public interface HostInfo { public static final String HOST_NAME = "localhost"; public static final int PORT = 9999; }
原文:https://www.cnblogs.com/shlearn/p/11706208.html