AIO实现非阻塞通信
java7 NIO2 提供了异步Channel支持,这种异步Channel可以提供更高效的IO,这种基于异步Channel的IO被称为异步IO(Asynchronous IO)
IO操作分为两步:1、程序发出IO请求 2、完成实际的IO操作
阻塞和非阻塞IO是根据第一步划分的:
发出IO请求如果阻塞线程则是阻塞IO,如果不阻塞线程,则是非阻塞IO。
同步IO和异步IO是根据第二步划分:
如果实际的IO操作是由操作系统完成,再将结果返回给应用程序,这就是异步IO。
如果实际的IO需要应用程序本身去执行,会阻塞线程,那就是同步IO。
(java传统的IO操作和基于Channel的非阻塞IO都是同步IO)
NIO2提供了一系列以Asynchronous开头的Channel接口和类。
其中AsynchronousSocketChannel、AsynchronousServerSocketChannel是支持TCP通信的异步Channel。
AsynchronousServerSocketChannel:负责监听的Channel,与ServerSocketChannel相似。
AsynchronousServerSocketChannel使用需要三步:
1)调用open()静态方法创建AsynchronousServerSocketChannel实例
2)调用AsynchronousServerSocketChannel的bind()方法让他在指定IP,端口监听。
3)调用AsynchronousServerSocketChannel的accept()方法接收连接请求。
AsynchronousSocketChannel:与SocketChannel类似,执行具体的IO操作
AsynchronousSocketChannel的用法也可以分为三步:
1)调用Open()静态方法创建AsynchronousSocketChannel实例
2)调用AsynchronousSocketChannel的connect()方法让他在指定IP,端口服务器。
3)调用AsynchronousSocketChannel的read()、write()方法进行读写。
AsynchronousServerSocketChannel、AsynchronousSocketChannel都允许使用线程池管理,open()方法创建对应实例时都可以传入AsynchronousChannelGroup。AsynchronousChannelGroup创建需要传入一个线程池ExecutorService。
AsynchronousServerSocketChannel的accept()方法、AsynchronousSocketChannel的read()、write()方法都有两个版本
1)返回Future对象版本:必须等到Future的get方法返回时IO操作才完成,get方法会阻塞线程的。
2)需要传入CompletionHandler版本:通过ComplctionHandler完成相关操作。
CompletionHandler是一个接口,该接口中定义了两个方法:
completed(V result,A attachment):当IO操作完成时触发该方法,第一参数表示IO操作返回的参数;第二个参数表示发起IO操作时传入的附加参数。
failed(Trowable exc,A attachment):当IO操作失败事触发该方法,第一参数表示异常信息,,第二个参数表示发起IO操作时传入的附加参数。
下面使用Future对象版本实现简单的AIO服务端、客户端通信:
package net; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.Future; public class SimpleAIOServer { static final int PORT = 30000; public static void main(String[] args) throws Exception { try (//创建AsynchronousServerSocketChannel实例 AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();) { serverSocketChannel.bind(new InetSocketAddress(PORT)); while(true) { //采用循环接收客户端的连接 Future<AsynchronousSocketChannel> future = serverSocketChannel.accept(); //获取连接后返回AsynchronousSocketChannel AsynchronousSocketChannel socketChannel = future.get(); //向客户端输出数据 Future<Integer> future1 =socketChannel.write(ByteBuffer.wrap("AIO HELLO". getBytes("UTF-8"))); future1.get(); } } catch (IOException e) { e.printStackTrace(); } } } package net; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.charset.Charset; import java.util.concurrent.Future; public class SimpleAIOClient { static final int PORT = 30000; public static void main(String[] args) throws Exception { //用户读取数据的Buffer ByteBuffer buff = ByteBuffer.allocate(1024); Charset charset = Charset.forName("UTF-8"); try(//创建AsynchronousSocketChannel实例 AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();) { //连接到远程服务器 Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT)); future.get(); buff.clear(); //socketChannel中读取数据 Future<Integer> future1 = socketChannel.read(buff); future1.get(); buff.flip(); String content = charset.decode(buff).toString(); System.out.println("服务器:" + content); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
结果:
服务器:AIO HELLO
AIO实现多人聊天
package net; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class AIOServer { static final int PORT = 30000; static List<AsynchronousSocketChannel> channelList = new ArrayList<>(); public void init() throws IOException { //创建一个线程池 ExecutorService executor = Executors.newFixedThreadPool(20); //以指定线程池创建分组管理器 AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor); //以线程池创建AsynchronousServerSocketChannel AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup); //绑定端口 serverSocketChannel.bind(new InetSocketAddress(PORT)); //使用CompletionHandler处理客户端连接请求,此处的Handler主要处理客户端连接请求 serverSocketChannel.accept(null, new AcceptHandler(serverSocketChannel)); } public static void main(String[] args) throws Exception { AIOServer aioServer = new AIOServer(); aioServer.init(); Thread.sleep(Integer.MAX_VALUE); //不让服务器停止 while(true) {} } }
package net; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; import java.util.concurrent.ExecutionException; public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object>{ private AsynchronousServerSocketChannel serverSocketChannel = null; public AcceptHandler(AsynchronousServerSocketChannel serverSocketChannel) { this.serverSocketChannel = serverSocketChannel ; } //定义一个Buffer准备读取数据 ByteBuffer buff = ByteBuffer.allocate(1024); Charset charset = Charset.forName("UTF-8"); //当IO操作完成时触发该方法 @Override public void completed(final AsynchronousSocketChannel socketChannel, Object attachment) { //记录新进来的Channel AIOServer.channelList.add(socketChannel); //准备接收客户端的下一次连接 serverSocketChannel.accept(null, this); //读取客户端数据,此处的Handler主要处理读取客户数据 socketChannel.read(buff, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { buff.flip(); //将Buffer中的数据转换成字符串 String content = charset.decode(buff).toString(); //将客户端发来的数据 发送到么每个客户端 for(AsynchronousSocketChannel asc : AIOServer.channelList) { try { asc.write(ByteBuffer.wrap(content.getBytes(charset))).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } //清空buff容器,用户读取下一次数据 buff.clear(); } //当IO操作失败事触发该方法 @Override public void failed(Throwable exc, Object attachment) { System.out.println("读取数据失败:"+ exc); //读取数据失败,客户端出问题,移除对应的channel AIOServer.channelList.remove(socketChannel); } }); } @Override public void failed(Throwable exc, Object attachment) { System.out.println("连接失败:"+exc); } }
package net; import java.awt.BorderLayout; import java.awt.event.ActionEvent; import java.awt.event.InputEvent; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.swing.AbstractAction; import javax.swing.Action; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.JTextField; import javax.swing.KeyStroke; public class AIOClient { static final int PORT = 30000; //与服务器通信的异步Channel AsynchronousSocketChannel socketChannel = null; JFrame mainWin = new JFrame("多人聊天"); JTextArea jta = new JTextArea(16,48); JTextField jtf = new JTextField(40); JButton sendBtn = new JButton("发送"); public void init() { mainWin.setLayout(new BorderLayout()); jta.setEditable(false); mainWin.add(new JScrollPane(jta),BorderLayout.CENTER); JPanel jp = new JPanel(); jp.add(jtf); jp.add(sendBtn); @SuppressWarnings("serial") Action sendAction = new AbstractAction() { @Override public void actionPerformed(ActionEvent e) { String content = jtf.getText(); if(content.trim().length() > 0) { //将输入内容写到channel中 try { socketChannel.write(ByteBuffer. wrap(content.getBytes(StandardCharsets.UTF_8))).get(); } catch (InterruptedException | ExecutionException e1) { e1.printStackTrace(); } } jtf.setText(""); } }; sendBtn.addActionListener(sendAction); jtf.getInputMap().put(KeyStroke.getKeyStroke(‘\n‘, InputEvent.CTRL_MASK), "send"); jtf.getActionMap().put("send", sendAction); mainWin.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); mainWin.add(jp, BorderLayout.SOUTH); mainWin.pack(); mainWin.setVisible(true); } public void connect() throws Exception { //用于读取数据的buffer ByteBuffer buff = ByteBuffer.allocate(1024); //创建一个线程池 ExecutorService executor = Executors.newFixedThreadPool(80); //以指定线程池创建分组管理器 AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor); //以分组管理器创建AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(channelGroup); //连接服务器 socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT)).get(); jta.append("***与服务器连接成功***\n"); socketChannel.read(buff, null, new CompletionHandler<Integer, Object>() { @Override public void completed(Integer result, Object attachment) { buff.flip(); //将Buffer转换成字符串 String content = StandardCharsets.UTF_8.decode(buff).toString(); //显示从服务器读取的数据 jta.append("某人说:"+content+"\n"); buff.clear(); //为下一次读取数据做准备 socketChannel.read(buff, null, this); } @Override public void failed(Throwable exc, Object attachment) { System.out.println("读取数据失败"+exc); } }); } public static void main(String[] args) throws Exception { AIOClient aioClient = new AIOClient(); aioClient.init(); aioClient.connect(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package net; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.charset.Charset; import java.util.concurrent.Future; public class SimpleAIOClient { static final int PORT = 30000 ; public static void main(String[] args) throws Exception { //用户读取数据的Buffer ByteBuffer buff = ByteBuffer.allocate( 1024 ); Charset charset = Charset.forName( "UTF-8" ); try ( //创建AsynchronousSocketChannel实例 AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();) { //连接到远程服务器 Future<Void> future = socketChannel.connect( new InetSocketAddress( "127.0.0.1" , PORT)); future.get(); buff.clear(); //socketChannel中读取数据 Future<Integer> future1 = socketChannel.read(buff); future1.get(); buff.flip(); String content = charset.decode(buff).toString(); System.out.println( "服务器:" + content); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
结果:
原文:https://www.cnblogs.com/jnba/p/10636828.html