NIO:非阻塞I/O
ServerSocketChannel和SocketChannel
Buffer类:缓存区;在NIO库汇总,所有数据都用缓存区处理。在读取数据时,它是直接读到缓冲区中;在写入数据时,它是先写到缓冲区中。任何时候访问NIO的数据,都是通过缓冲区进行操作。
缓冲区实质上是一个数组。通常它是一个自己数组
------
通道:Channel(t?æn?)
网络数据通过channel读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向移动(一个流必须是inputStream或者outputStream的子类);
------
多路复用器selector
多路复用器提供选择已经就绪的任务的能力。简单来说,selector会不断的轮询注册在其上的channel,如果某个channel上面发送读或写事件,这个channel就处于就绪状态,会被selector轮询出来,通过selectionkeykey获取就绪channel的集合,进行后续的I/O操作。
一个多路复用器selector可以同时轮询多个channel,由于jdk使用epoll()方法代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责selector的轮询,就可以接入成千上万的客户端。
----------
NIO服务端通信序列图
NioServer第一步:
1、打开serversocketChannel
2、绑定监听地址InetSocketAddress
3、创建selector,启动线程
4、将serversocketChannel注册到selector,监听
5、selector轮询就绪的key;
6、handleaccept()处理新的客户端接入
7、设置新建客户端连接的socket参数
8、向selector注册监听读操作,selectionkey.op_read
9、handleRead()异步读请求消息到byteBuffer
10、decode请求消息
11、异步写byteBuffer到sockechannel
使用java.nio开发,需要复杂的步骤;使用netty框架,简化开发步骤,netty框架封装了相关操作步骤。
注:nio的非阻塞开发在java网络编程中有例子
这里描述的是使用netty框架编写的服务端与客户端
未考虑拆包和粘包(未加入编码器)
服务端
public class TimeServer {
public void bind(int port) throws Exception{
//配置服务端的NIO线程组,ChildChannelHandler专门用于处理网络事件
//一个用于服务端接受客户端的连接,一个用于进行socketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//用于启动NIO服务端的辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)//功能是对应与JDKNIO中的ServerSocketChannel
.option(ChannelOption.SO_BACKLOG, 1024)//设置NioServerSocketChannel的TCP参数
.childHandler(new ChildChannelHandler());//绑定IO事件处理类,主要用于处理网络IO事件,例如记录日志、对消息进行编码等
//绑定端口,同步等待成功,sync()同步阻塞方法
ChannelFuture f = b.bind(port).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
//优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 8088;
if(args != null && args.length != 0){
try {
port = Integer.valueOf(0);
} catch (Exception e) {
}
}
new TimeServer().bind(port);
}
}
服务端处理TimeServerHandler
public class TimeServerHandler extends ChannelHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//类型转换,ByteBuf相当于java.nio.ByteBuffer
ByteBuf buf = (ByteBuf) msg;
//buf.readableBytes()获取缓冲区中的字节数
byte[] req = new byte[buf.readableBytes()];
//将缓冲区中的字节复制到字节数组中
buf.readBytes(req);
//通过string构造方法,获取请求消息
String body = new String(req,"UTF-8");
System.out.println("the time server receive order:"+body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new
java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
//将消息放入缓冲区中,再调用flush方法发送,
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发送异常,关闭ChannelHandlerContext
ctx.close();
}
}
客户端
public class TimeClient {
public void connect (int port,String host) throws Exception{
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//启动客户端辅助类
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)//Tcop参数
.handler(new ChannelInitializer<SocketChannel>() {
@Override//匿名内部类,处理IO网络事件TimeClientHandler
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host,port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
}finally{
group.shutdownGracefully();
}
}
public static void main(String[] args){
int port = 8088;
if(args != null && args.length > 0 ){
port = Integer.valueOf(0);
}
try {
new TimeClient().connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端处理类
public class TimeClientHandler extends ChannelHandlerAdapter{
private final ByteBuf firstMessage;
private byte[] req;
private int counter;
public TimeClientHandler(){
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
//req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//当客户端和服务端TCP链路建立成功之之后,Netty会调用channelActive方法
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("Now is:" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放资源
ctx.close();
}
}
原文:https://www.cnblogs.com/lazyli/p/10816428.html