首页 > 编程语言 > 详细

Netty章节十六:Java NIO

时间:2020-06-22 18:25:47      阅读:51      评论:0      收藏:0      [点我收藏+]

Java NIO

概念

  • java.io 最为核心的一个概念是流(Stream),面向流的编程。Java中一个流要么是输入流,要么是输出流不可能同时既是输入流又是输出流
  • java.nio 中拥有3个核心概念:SelectorChannelBuffer。在java.nio中是面向块(block)或是缓冲区(buffer)编程的。Buffer本身就是一块内存,底层实现上,它实际上是个数组,数据的读、写都是通过Buffer来实现的。

NIO

除了数组之外,Buffer还提供了对数据的结构化访问方式,并且可以追踪到系统的读写过程(读和写在底层都是通过相应的一些标示来判断读、写到什么位置,以及flip()翻转之后应该从什么位置开始读、可以读到什么地方,系统都可以准确的定位到)。

Java中的七种原声数据类型都有各自对应的Buffer类型,如IntBuffer、LongBuffer 等等,并没有BooleanBuffer类型

Channel指的是可以向其写入数据或是从中读取数据的对象,它类似于java.io中的Stream

所有数据的读写都是通过Buffer来进行的永远不会出现直接向Channel写入数据的情况,或是直接从Channel读取数据的情况。

与Stream不同的是,Channel是双向的,一个流只可能是InputStream或是OutputStream,Channel打开后则可以进行读取、写入或是读写

由于CHannel是双向的,因此它能更好地反映底层操作系统的真实情况,例如Linux系统中,底层操作系统的通道就是双向的

Nio Buffer中状态的关系

0 <= mark <= position <= limit <= capacity

通过Nio读取文件涉及到3个步骤:

  1. 从FileInputStream获取到FileChannel对象
  2. 创建Buffer对象
  3. 将数据从Channel读取到Buffer中

Buffer中状态详解

mark

mark 定义标记 默认为-1标记当前下标,然后可以调用reset()方法回到mark标记的位置,如果未设置mark还调用reset()方法时会抛出一个InvalidMarkException异常mark不能大于position,必须是position操作过的位置,也不能小于0

position

当前正在读或者写的位置
position 当前要操作的位置的下标(底层数组的下标,不同的实现有不同类型的数组名为 hb)

  • 在从通道读取时:您将所读取的数据放到底层的数组中。position 变量跟踪已经写了多少数据。 更准确地说,它指定了下一个字节将放到数组的哪一个元素中。因此,如果您从通道中读三个字节到缓冲区中,那么缓冲区的 position 将会设置为3,指向数组中第四个元素。

  • 在写入通道时:您是从缓冲区中获取数据。position 值跟踪从缓冲区中获取了多少数据。更准确地说,它指定下一个字节来自数组的哪一个元素。因此如果从缓冲区写了5个字节到通道中,那么缓冲区的 position 将被设置为5,指向数组的第六个元素,下一次要使用的元素的下标。

limit

当前可以读/写的最大的位置
limit Buffer的最大存储数量 永远不会为负
变量表明还有多少数据需要取出(在从缓冲区写入通道时),
或者还有多少空间可以放入数据(在从通道读入缓冲区时)。
从缓冲区写入通道时 limit 表示当前缓冲区内 可用数据/put的数量 的最大容量
从通道读入缓冲区时 limit 表示当前缓冲区可存入数据的最大容量,底层数组.length的值
position 总是小于或者等于 limit。

capacity

capacity
缓冲区的 capacity 表明可以储存在缓冲区中的最大数据容量。实际上,它指定了底层数组的大小 或者至少是指定了准许我们使用的底层数组的容量。
缓冲区初始化时定义的大小,永远不会变化,并且不会为负
limit 决不能大于 capacity。

最后一个元素的下一个元素

NIO 中 ByteBuffer的不同实现

DirectByteBuffer与HeapByteBuffer

DirectByteBuffer 直接内存模型

  • DirectByteBuffer是处于Java的内存模型中的(堆内存中),而存储数据的地方则是由底层的native方法向操作系统申请的一个堆外内存来使用

  • address 表示的是c/c++向操作系统申请的堆外内存当中真正分配的数据的地址,则可以通过address直接找到堆外的数据

如果使用HeapByteBuffer底层运转:
HeapByteBuffer以及它封装的底层数据(字节数组)都是存在于Java堆上的,(I/O需要与操作系统打交道)然而对与操作系统来说并不是直接操作HeapByteBuffer在堆上封装的字节数组(数据存放对象)。而是在Java内存模型外面重新开辟一块内存区域(存在于操作系统级别中),比如进行数据的写入 它会先将Java堆上的HeapByteBuffer里面的字节数组的内容拷贝到处于Java内存模型外的新开辟的内存空间中的某个区域,然后再把新拷贝到操作系统的内存空间中的内容拿出来,然后直接对I/O操作,进行数据的读写。

  • 如果是使用HeapByteBuffer(间接缓冲)进行I/O,则是多了一次数据拷贝过程

  • 如果使用的是DirectByteBuffer的话,在Java堆上就不会存在一个字节数组(用于数据存储)了,因为真正的数据本身就存在堆外了。如果进行读写的话,操作系统则会直接对堆外的内存区域进行操作进行I/O读写。(被称为 Zero-copy(零拷贝))

疑问?

为什么操作系统不直接操作存在与Java堆上的字节数组,从而减少一次Java堆数据拷贝到操作系统中的操作?

  • 如果操作系统直接操作堆内的数据则需要通过JNI的方式进行操作(通过JNI访问的内存区域地址一定是已经确认的才能访问到那个内存区域),然而操作系统正在访问内存区域的时候突然这个内存区域发生了GC(垃圾回收)多种垃圾回收算法除了CMS并发的标记清除的算法之外,其它的垃圾回收算法都涉及到一个先标记再去压缩的过程(例如A B C D E F这几块内存区域例如A D区域是要被删除的其它的保留,那么A D被清空之后会执行一次压缩的操作,压缩会造成对象的移动。移动的目的是为了腾出一块更大的、完整的、连续的内存区域,使得这个内存区域可以容纳一个更大的Java对象,压缩时被保留的数据内存区域则会整体向左移动,如果native在操作正在操作这个数组时发生了数据移动,那么数据就会全乱)。
  • 这种问题有三种解决方案:
    原生代码操作Java堆上内存 不可用
  1. 固定对象,让对象不发生移动,然而单个对象不移动是不太现实的 (不可取)
  2. 不让它发生垃圾回收 (不可取)
  3. 先将字节数组拷贝到堆外内存空间中,拷贝过程不会产生GC

内存释放

  1. HeapByteBuffer 拷贝到堆外内存的数据使用结束就会被释放
  2. DirectByteBuffer 当DirectByteBuffer对象被回收掉时,会通过address找到堆外内存通过JNI进行回收

MappedByteBuffer

直接字节缓冲区,其内容是一个文件的内存映射区域。

Mapped的ByteBuffer是可以通过FileChannel的map()方法实现

MappedByteBuffer与它表示的文件映射会一直生效,直到Buffer本身被垃圾回收掉

内存映射文件就是这个文件把它的内容映射到内存中,我们只需要在内存中操作相关的信息,最终的信息会被写到文件当中。
换句话说是我们不需要与磁盘的文件打交道,只需要与内存打交道就可以了,我们对内存所做的任何修改都会被直接写到磁盘中

内存映射文件是一个允许Java程序直接从内存访问的一种特殊文件,我们可以将整个文件或者文件的一部分映射到内存当中,
接下来由操作系统来处理相关的页面请求,并且将内存的修改写入到磁盘当中,我们的应用程序只需要处理内存的数据这样可以实现非常迅速的I/O操作
用于内存映射文件的内存本身是处于Java堆外内存的,内存映射文件的内存本身是一个堆外内存

NIO重要组件的说明

Channel

Java NIO Channel通道和流非常相似,主要有以下几点区别:

  • 通道可以读也可以写,流一般来说是单向的(只能读或者写)。
  • 通道可以异步读写。
  • 通道总是基于缓冲区Buffer来读写。

技术分享图片

Buffer

Java NIO中的Buffer用于和NIO通道进行交互。如你所知,数据是从Channel读入Buffer,从Buffer写入到Channel中的。
Buffer本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。

Selector

它是一个双工多路传输的 SelectableChannel 对象
Selector是可以调用这个类的open()方法来创建,而这个open()本身又会使用系统默认的
选择器提供者(java.nio.channels.spi.SelectorProvider)来去创建一个新的选择器(selector)
选择器还可以通过调用java.nio.channels.spi.SelectorProvider的openSelector()方法并且提供了一个自定义的选择器提供者来去创建,一个选择器(Selector)会持续的保持打开状态,直到调用它的close()方法它才会真正的关闭。

一个可选择的通道(selectable channel‘s)向选择器(Selector)的注册结果由一个SelectionKey对象表示
一个selector会维护三种SelectionKey的集合:

  1. key set(键集(全集)) 包含代表此选择器当前通道注册的键。该集合由keys()方法返回。
  2. selected-key set(选定的键集) 是这样的一组key检测到key的通道已为至少一个操作做好准备,在先前的选择操作中在密钥的兴趣集中标识。此集合由link#selectedKeys()方法返回。它是key set的子集(例如key set有四种连接事件,已连接、连接断开、可读、可写,比如我们只对读和写感兴趣,那么selected-key set中就包含可读、可写的事件并不包含已连接、连接断开)
  3. cancelled-key(取消的键集) 是一组已取消(之前关注但是现在取消了),但其通道可能尚未注销,
    不如你不再关注这个动作了这个Channel本身是下一次再去Selector时进行关闭 它是key set的子集

所有这三个集合在新创建的selector中都是空的

我们可以通过Channel的register()方法将一个selector注册到Channel上面,同时也会将一个键(key)添加到选择器的键(key)集中,
selection选择操作期间,已取消的key将从key set中删除。key set 本身不能直接修改。

当它被取消的时候,一个key会被添加到cancelled-key这个集合当中,取消方式有 关闭了这个Channel或者是调用SelectionKey的cancel()方法
取消一个key会导致这个key所关联的Channel在下一次选择操作(selection operations指的就是select()方法)当中被取消注册,在这个时候这个key将会从cancelled-key集合中被移除掉

keys这个键会被添加到selected-key set这个集合 通过选择操作(selection operations),一个key可能直接从selected-key set中被移除了通过调用set(selected-key set,选定的键集)的remove()方法或者是通过这个集合的Iterator(迭代器)的remove()方法,这个Iterator是通过这个集合所获取的除了上面两种方法keys是绝对不会以其它方式被移除的,特别的它们并不会作为选择操作(selection operations)的副作用,keys是不可能直接被添加到selected-key set当中的

SelectionKey

它代表一件事情,表示Channel向一个Selector注册的令牌

每次当一个Channel被注册到一个selector到时候都会创建一个SelectionKey,一个key保持有效直到它被调用了cancel()方法取消,或关闭其Channel或关闭selector而被取消为止,取消一个键不会立即从Selector中移除而是将其添加到Selector的cancelled-key set(取消键集)中以便在下次selection operation.(选择操作)期间将其删除。

SelectionKey包括两个操作集合是通过整数表示的一个是ready set(准备集合) interest set(感兴趣的操作集合)

key set(键集)包含代表此选择器当前通道注册的键。该集合由keys()方法返回。
selected-key set(选定的键集)是这样的一组key检测到key的通道已为至少一个操作做好准备,在先前的选择操作中在密钥的兴趣集中标识。

此集合由link#selectedKeys()方法返回。它是key set的子集(例如key set有四种连接事件,已连接、连接断开、可读、可写,比如我们只对读和写感兴趣,那么selected-key set中就包含可读、可写的事件并不包含已连接、连接断开)cancelled-key(取消的键集)是一组已取消(之前关注但是现在取消了),但其通道可能尚未注销,不如你不再关注这个动作了这个Channel本身是下一次再去Selector时进行关闭 它是key set的子集

绝对方法与相对方法的含义

  • 相对方法:limit值与position值会在操作时被考虑到
  • 绝对方法:完全忽略掉limit与position值

注意

Buffer中put的是什么数据类型,get取的时候就需要以什么类型取,否则会因为数据类型所占据的字节大小不同读取的长度不同,多取或少取的情况,然后会导致数据读取错乱

程序案例

程序1 ByteBuffer的基本使用

生成随机数并put添加到Buffer中 然后再get获取打印

public class NioTest1 {
    public static void main(String[] args) {
        //beforeFixing();
        afterMdification();
    }

    private static void beforeFixing() {
        //获取一个int类型的Buffer  allocate(10)    设置最大存储容量为10
        IntBuffer intBuffer = IntBuffer.allocate(10);

        //IntBuffer.capacity()  获取最大存储容量
        for (int i = 0; i < intBuffer.capacity(); i++) {
            //SecureRandom()  生成一个更随机的随机数
            //nextInt(20)   int类型的 范围是0-20之间
            int randomNumber = new SecureRandom().nextInt(20);
            //往IntBuffer中put值,并将position自增
            intBuffer.put(randomNumber);
        }

        //反转此缓冲区  将最大存储容量(limit)设置为当前下表值,将当前下标(position)重置为0
        intBuffer.flip();

        //下面是否还有内容
        while (intBuffer.hasRemaining()){
            System.out.println(intBuffer.get());
        }
    }
    private static void afterMdification() {
        IntBuffer intBuffer = IntBuffer.allocate(10);

        System.out.println("capacity:" + intBuffer.capacity());

        for (int i = 0; i < 5; i++) {
            int randomNumber = new SecureRandom().nextInt(20);
            intBuffer.put(randomNumber);
        }
        //10
        System.out.println("before limit:" + intBuffer.limit());

        intBuffer.flip();

        //5
        System.out.println("after limit:" + intBuffer.limit());

        System.out.println("enter while loop");
        while (intBuffer.hasRemaining()){
            //0 - 4
            System.out.println("position:" + intBuffer.position());
            //5
            System.out.println("limit:" + intBuffer.limit());
            //10
            System.out.println("capacity:" + intBuffer.capacity());

            System.out.println(intBuffer.get());
        }
    }
}

程序2 传统IO切换到NIO

传统io切换到Nio,使用FileInputStream流关联要读取的文件,然后使用nio的方式将文件的内容读取到程序当中

public class NioTest2 {
    public static void main(String[] args) throws IOException, InterruptedException {
        FileInputStream fileInputStream =
                new FileInputStream("NioTest2.txt");
        //getChannel()  不是InputStream中的方法 子类新增的
        FileChannel channel = fileInputStream.getChannel();

        //创建一个长度为512字节的ByteBuffer对象,实际512是底层数组的大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        //将channel对象中的数据读到ByteBuffer中,写到ByteBuffer中
        channel.read(byteBuffer);

        //操作的反转
        byteBuffer.flip();

        //hasRemaining()  是否还有值 底层就是判断当前下表是否小于最大范围 position < limit
        //remaining()   返回limit - position的值 大于0就说明还有值
        while (byteBuffer.remaining() > 0){
            System.out.println("Character:" + (char) byteBuffer.get());
        }

        channel.close();
    }
}

程序3 IO读文件转为NIO写文件

通过普通io读取文件内容,然后转换为nio再写入/输出到某个文件中

public class NioTest3 {
    public static void main(String[] args) throws IOException {
        FileOutputStream fileOutputStream =
                new FileOutputStream("NioTest3.txt");
        FileChannel fileChannel = fileOutputStream.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        //要向外写出的字节数组
        byte[] messages = "hello world rainbow sea".getBytes();

        for (int i = 0; i < messages.length; i++) {
            byteBuffer.put(messages[i]);
        }

        byteBuffer.flip();
        //将数据写入到文件中
        fileChannel.write(byteBuffer);

        fileChannel.close();
    }
}

程序4 ByteBuffer的文件读写

获取一个文件的内容,然后写到另一个文件中

public class NioTest4 {
    public static void main(String[] args) throws IOException {
        FileInputStream inputStream = new FileInputStream("input.txt");
        FileOutputStream outputStream = new FileOutputStream("output.txt");

        FileChannel inputChannel = inputStream.getChannel();
        FileChannel outputChannel = outputStream.getChannel();

        ByteBuffer buffer = ByteBuffer.allocate(512);

        while (true){
            //如果这行被删掉,将会出现,position = limit 没有数据可读问题  while永远不会跳出
            buffer.clear();

            int read = inputChannel.read(buffer);

            System.out.println(read);

            if(read == -1){
                break;
            }
            buffer.flip();

            outputChannel.write(buffer);
        }

        inputChannel.close();
        outputChannel.close();
    }
}

程序5 ByteBuffer类型化的put与get方法

ByteBuffer类型化的put与get方法。Buffer中put的是什么数据类型,get取的时候就需要以什么类型取,否则会因为数据类型所占据的字节大小不同读取的长度不同,多取或少取的情况,然后会导致数据读取错乱

public class NioTest5 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        buffer.putInt(18);
        buffer.putDouble(5000000L);
        buffer.putChar(‘彩‘);
        buffer.putShort((short)2);
        buffer.putChar(‘虹‘);

        buffer.flip();

        System.out.println(buffer.getInt());
        System.out.println(buffer.getDouble());
        System.out.println(buffer.getChar());
        System.out.println(buffer.getShort());
        System.out.println(buffer.getChar());


    }
}

程序6 分片/分割Buffer

分片/分割Buffer,分割一个Buffer(A)指定起点和终点(就是指定position与limit,下标),然后调用Buffer的slice()方法会返回一个新的Buffer(B)的引用,这个新的Buffer的元素就是被分割Buffer(A)起点到终点之间的元素(包含起点不包含终点),这些元素可以被认为是老Buffer的快照新的Buffer与原有的Buffer的底层数据是一份,并不会重新拷贝一份,无论是对新的Buffer还是老的Buffer的起点到终点(包含起点不包含终点)范围内的元素进行修改都会反映到另外一个Buffer上/数据是同步的因为底层引用的都是同一个,但是两个Buffer的 position, limit, and mark都是互不相干独立的

Slice Buffer 就相当于原有Buffer的快照。

Slice Buffer 与原有Buffer共享相同的底层数组(与原有的Buffer共同的底层数据就是一份)

通过Slice Buffer或者原有Buffer对数据(起-终的数据,不包含终)进行修改都会反映到另一个Buffer上

public class NioTest6 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);

        for (int i = 0; i < buffer.capacity(); i++) {
            buffer.put((byte)i);
        }

        buffer.position(2);
        buffer.limit(6);

        //分割出一个新的buffer
        ByteBuffer sliceBuffer = buffer.slice();

        buffer.clear();

        /*while (buffer.hasRemaining()) {
            System.out.println(buffer.get());
        }
        System.out.println("------------");*/

        //更改分割出来的buffer的数据
        for (int i = 0; i < sliceBuffer.capacity(); i++) {
            byte b = sliceBuffer.get(i);
            sliceBuffer.put(i,(b *= 2));
        }


        /*方式1
        buffer.position(0);
        buffer.limit(buffer.capacity());*/
        //方式2
        buffer.clear();

        //打印原buffer的内容,查看是否被影响
        while (buffer.hasRemaining()) {
            System.out.println(buffer.get());
        }

    }
}

程序7 只读Buffer

只读Buffer,我们可以随时将一个普通Buffer(可读写Buffer)调用asReadOnlyBuffer()方法返回一个只读Buffer但是不能将一个只读Buffer转换为读写Buffer

public class NioTest7 {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);

        for (int i = 0; i < buffer.capacity(); i++) {
            buffer.put((byte)i);
        }

        //返回一个只读Buffer,不允许写入
        ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();

        //查看Buffer的类型
        System.out.println(buffer.getClass());
        System.out.println(readOnlyBuffer.getClass());

        readOnlyBuffer.position(0);
        //readOnlyBuffer.put((byte)2);
    }
}

程序8 DirectByteBuffer

获取一个文件的内容,然后写到另一个文件中 使用 DirectByteBuffer 测试

public class NioTest8 {
    public static void main(String[] args) throws IOException {
        FileInputStream inputStream = new FileInputStream("input.txt");
        FileOutputStream outputStream = new FileOutputStream("output2.txt");

        FileChannel inputChannel = inputStream.getChannel();
        FileChannel outputChannel = outputStream.getChannel();

        ByteBuffer buffer = ByteBuffer.allocateDirect(512);

        while (true){
            //如果这行被删掉,将会出现,position = limit 没有数据可读问题  while永远不会跳出
            buffer.clear();

            int read = inputChannel.read(buffer);

            System.out.println(read);

            if(read == -1){
                break;
            }
            buffer.flip();

            outputChannel.write(buffer);
        }

        inputChannel.close();
        outputChannel.close();
    }
}

程序9 MappedByteBuffer

内存映射文件,修改一个文件

public class NioTest9 {
    public static void main(String[] args) throws IOException {
        //RandomAccessFile(文件的名字,读写状态(rw表示能读能写))
        RandomAccessFile randomAccessFile = new RandomAccessFile("NioTest9.txt", "rw");
        FileChannel fileChannel = randomAccessFile.getChannel();

        /*
            map(映射模式,映射的起始位置,映射多少位)
                映射模式:表示是读模式、写模式还是读写模式
                FileChannel.MapMode.READ_WRITE  能读能写
                FileChannel.MapMode.READ_ONLY  只读
                FileChannel.MapMode.PRIVATE  专用(写时复制)映射的模式
         */
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,5);

        //修改文件
        mappedByteBuffer.put(0,(byte)‘1‘);
        mappedByteBuffer.put(3,(byte)‘3‘);

        randomAccessFile.close();
    }
}

程序10 文件锁

文件锁,可以锁一个文件的整体或部分
锁有两种:

  1. 共享锁 都可以读
  2. 排查锁 只能写 只有一个程序来进行写,读多个程序都可以对锁住的部分进行读
public class NioTest10 {
    public static void main(String[] args) throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile("NioTest10.txt", "rw");
        FileChannel fileChannel = randomAccessFile.getChannel();

        /*
            lock(从那个位置开始锁,锁的长度,true表示共享锁false表示排查锁)
         */
        FileLock fileLock = fileChannel.lock(3,6,true);

        //isValid()查看是否有效
        System.out.println("valid:" + fileLock.isValid());
        //查看锁是否是共享锁
        System.out.println("lock type:" + fileLock.isShared());

        fileLock.release();
        randomAccessFile.close();
    }
}

程序11 Buffer的Scattering与Gathering 网络程序

关于Buffer的Scattering(散开,分散成多个)与Gathering(收集,将多个汇成一个)

Scattering在读的时候我们不仅可以传递一个buffer还可以传递一个buffer数组,例如Scattering将来自于Channel的内容读到多个Buffer中,只有前面的Buffer读满了并且Channel中还有数据,那么他就会将数据读到下一个Buffer中。

Gathering我们往外写的时候也可以传递一个数组,写的时候从第一个开始,将Buffer数组中的Buffer一个一个的写出去 (只有将前面的buffer读完才会读取后面的,按顺序进行)

此程序可以使用curl,nc等等 进行测试

public class NioTest11 {

    public static void main(String[] args)throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(8899);
        serverSocketChannel.socket().bind(address);

        int messageLength = 2 + 3 + 4;

        ByteBuffer[] buffers = new ByteBuffer[3];
        buffers[0] = ByteBuffer.allocate(2);
        buffers[1] = ByteBuffer.allocate(3);
        buffers[2] = ByteBuffer.allocate(4);

        SocketChannel socketChannel = serverSocketChannel.accept();

        while (true){
            //Scattering
            int bytesRead = 0;
            while (bytesRead < messageLength){
                long r = socketChannel.read(buffers);
                bytesRead += r;

                System.out.println("bytesRead:" + bytesRead);
                Arrays.asList(buffers).stream()
                        .map(buffer -> "position:" + buffer.position() + ", limit:" + buffer.limit())
                        .forEach(System.out::println);
            }

            Arrays.asList(buffers).forEach(buffer ->{
                buffer.flip();
            });
            
			//Gathering
            long bytesWritten = 0;
            while (bytesWritten < messageLength){
                long w = socketChannel.write(buffers);
                bytesWritten += w;
            }

            Arrays.asList(buffers).forEach(buffer -> {
                buffer.clear();
            });

            System.out.println("byteRead:" + bytesRead + ", bytesWritten:" + bytesWritten +
                    ", messageLength:" + messageLength);
        }
    }
}

应用场景

比如说我们进行网络操作的时候,我们自定义一个协议要求第一个传递过来的请求的数据它的第1个headler长度是10个字节、第2个headler长度是3个字节、第3个可能是body(消息体)长度是可变的。这种情况下就可以使用buffer的Scattering,将第一个headler的10字节读取到第一个buffer中将第二个headler的3个字节读取到第二个buffer中,最后将body读取到第三个buffer中。这样就天然的实现了数据的分门别类,而不是只传递一个buffer将headler与body的信息都读到这一个buffer中然后再去解析这个buffer。

程序12 监听多个端口,打印客户端发来的数据

网络程序,监听5000, 5001, 5002, 5003, 5004 5个端口号,然后打印连接到这5个端口号上的客户端发来的信息

public class NioTest12 {
    public static void main(String[] args)throws IOException {
        int[] ports = {5000, 5001, 5002, 5003, 5004};

        Selector selector = Selector.open();

        //将一个selector对象注册到系统的多个端口号上,使得一个selector监听多个端口
        for (int i = 0; i < ports.length; i++) {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //configureBlocking()   配置是否阻塞 false:不阻塞 true:阻塞
            serverSocketChannel.configureBlocking(false);
            //获取到ServerSocket对象
            ServerSocket serverSocket = serverSocketChannel.socket();
            //创建端口绑定对象
            InetSocketAddress address = new InetSocketAddress(ports[i]);
            //端口绑定
            serverSocket.bind(address);
            /*
                将一个Channel注册到一个selector上,并返回selection key
                register(待注册的selector,对什么key感兴趣)

                SelectionKey.OP_ACCEPT   接受   SelectionKey.OP_CONNECT 连接
                SelectionKey.OP_READ    读   SelectionKey.OP_WRITE   写

                本程序register()的意思是:
                    将当前这个Channel注册到selector上面,并且注册的感兴趣的key是接受连接,
                    表示当有客户端向服务器端发起一个连接的时候,服务器端就会获取到这个连接,然后与之建立真正的连接
            */
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("监听端口:" + ports[i]);
        }

        while (true){
            //返回的是一个key的数量,可能是0
            //进行一个阻塞,等待感兴趣的事件被触发
            int numbers = selector.select();
            System.out.println("numbers = " + numbers);

            //通过SelectionKey不仅能获取事件的集合,还可以反过来获取到与之关联的(发送这个事件的Channel对象)
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            System.out.println("selectionKeys = " + selectionKeys);
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                //如果selectionKey是一个ACCEPT事件   连接事件
                if(selectionKey.isAcceptable()){
                    ServerSocketChannel serverSocketChannel1 =
                            (ServerSocketChannel) selectionKey.channel();
                    //接受连接 服务端通过返回的SocketChannel与客户端进行交互
                    ServerSocket socket = serverSocketChannel1.socket();
                    //接受与此通道的socket建立的连接。
                    SocketChannel socketChannel = serverSocketChannel1.accept();
                    socketChannel.configureBlocking(false);
                    //这个表示真正连接的对象连接的Channel,将它注册到selector中,关注的事件是读事件
                    socketChannel.register(selector,SelectionKey.OP_READ);
                    //将其从SelectionKey中移除,不移除则还会监听这个已经创建的连接
                    iterator.remove();
                    System.out.println("获得客户端连接:" + socketChannel);
                }else if(selectionKey.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    int bytesRead = 0;

                    while (true){
                        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                        byteBuffer.clear();

                        int read = socketChannel.read(byteBuffer);
                        if (read <= 0){
                            break;
                        }

                        byteBuffer.flip();

                        socketChannel.write(byteBuffer);

                        bytesRead += read;
                    }
                    //打印消息
                    System.out.println("读取:" + bytesRead + ", 来自于:" + socketChannel);
                    //将这个channel关注的事件从这个selector中移除
                    iterator.remove();
                }
            }
        }

    }
}

程序13 Java编解码问题

Java编解码问题,将一个文件的内容拷贝到另一个文件,源文件的文件编码是utf-8

使用ISO-8859-1解码然后再输出并没有出现乱码这是为什么?

源文件

hello
miki
出发目标彩虹海

Java代码

public class NioTest13 {
    public static void main(String[] args) throws Exception {
        String inputFile = "NioTest13_In.txt";
        String outputFile = "NioTest13_Out.txt";

        //RandomAccessFile(文件位置,可进行什么操作) 随机存取文件
        //"r" 可进行读操作 "w" 可进行写操作 "rw" 可进行读写操作
        RandomAccessFile inputRandomAccessFile = new RandomAccessFile(inputFile, "r");
        RandomAccessFile outputRandomAccessFile = new RandomAccessFile(outputFile, "rw");

        long inputLength = new File(inputFile).length();

        FileChannel inputFileChannel = inputRandomAccessFile.getChannel();
        FileChannel outputFileChannel = outputRandomAccessFile.getChannel();

        /*
            内存映射文件
            map(映射模式,映射的起始位置,映射多少位)
                映射模式:表示是读模式、写模式还是读写模式
                FileChannel.MapMode.READ_WRITE  能读能写
                FileChannel.MapMode.READ_ONLY  只读
                FileChannel.MapMode.PRIVATE  专用(写时复制)映射的模式
         */
        MappedByteBuffer inputData = inputFileChannel.map(FileChannel.MapMode.READ_ONLY, 0, inputLength);

        System.out.println("----------------------");

        //输出系统的所有编码方式
        Charset.availableCharsets().forEach((k,v) -> {
            System.out.println(k + "  ,  " + v);
        });

        System.out.println("----------------------");

        //创建一个字符集对象
        Charset charset = Charset.forName("ISO-8859-1");
        //解码器 将字节数组转换为字符串
        CharsetDecoder decoder = charset.newDecoder();
        //编码器 将字符串转换为字节数组
        CharsetEncoder encoder = charset.newEncoder();

        CharBuffer charBuffer = decoder.decode(inputData);
        ByteBuffer byteBuffer = encoder.encode(charBuffer);

        //文件写入
        outputFileChannel.write(byteBuffer);

        inputFileChannel.close();
        outputFileChannel.close();
    }
}

解答

1.为什么这个程序使用ISO-8859-1的编码方式还可以实现中文的正确输出,中文却不会乱码?
因为本程序中使用ISO-8859-1的解码与编码的过程中并没有更改原字节的值以及多字节的配对(因为ISO-8859-1就只支持一个字节表示一个字符,并不会组合字符),程序中charBuffer中乱码的原因是因为Charset的编码方式是ISO-8859-1在生成CharBuffer时,是以一个字节一个字符的规则进行解析/转换的。然而中文在UTF-8中可能是以3个字节或更多字节表示所以在解析本应该以三个字节组合一起的字节进行了逐个单一的解析,然后将每 一个字节的十六进制数对应在ISO-8859-1编码表上的字符,逐一赋给了charBuffer所以charBuffer的中文部分是乱码的。当程序将ByteBuffer的内容写入到目标文件中后,因为idea默认文件使用的就是UTF-8所以在解析程序写到目标文件的字节码时是使用UTF-8的规则
进行的解析(与原文件解析规则相同),则不会出现乱码。

如果使用解析规则大于一个字节或者中途会改变字节配对的编码方式则会出现乱码。

程序14 多人聊天程序

使用NIO实现多人聊天功能

Server

/**聊天室功能,Nio server 用一个通道,服务器端只会有一个线程*/
public class NioServer {

    /**
     * 保存连接的客户端的集合
     */
    private static Map<String, SocketChannel> clientMap = new HashMap<>();

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置为非阻塞的
        serverSocketChannel.configureBlocking(false);
        ServerSocket socket = serverSocketChannel.socket();
        //绑定端口号
        socket.bind(new InetSocketAddress(8899));
        //获取选择器
        Selector selector = Selector.open();
        //注册ServerSocketChannel到selector中,并设置感兴趣的事件为 连接接受时触发
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            try {
                //阻塞,等待感兴趣的事件触发
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                selectionKeys.forEach(selectionKey -> {
                    final SocketChannel client;

                    try {
                        //判断是否是Accep是事件
                        if (selectionKey.isAcceptable()) {
                            //这里向下类型转换为ServerSocketChannel是因为36行注册到Accep事件的Channel就是这个类型的
                            //当前selector只有一个Channel并且感兴趣的事件还是Accept,所以可以断定就是36注册的ServerSocketChannel
                            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                            client = server.accept();
                            client.configureBlocking(false);
                            //注册一个SocketChannel到selector中,感兴趣的事件为读事件
                            client.register(selector,SelectionKey.OP_READ);

                            String key = "[" + UUID.randomUUID().toString() + "]";
                            clientMap.put(key,client);
                        }else if(selectionKey.isReadable()){
                            //这个类型转换和上面的一样,都是确定触发这个事件的Channel类型
                            client = (SocketChannel) selectionKey.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

                            //将Channel对象中的数据写入Buffer中
                            int count = client.read(byteBuffer);
                            if (count > 0){
                                //反转Buffer
                                byteBuffer.flip();

                                Charset charset = Charset.forName("UTF-8");
                                //通过Charset.decode()解码ByteBuffer并调用array()返回一个char[]数组,String可通过valueOf()
                                //将char[]数组转换为一个String
                                String receivedMessage = String.valueOf(charset.decode(byteBuffer).array()).replace("\u0000","");

                                System.out.println(client + ":" + receivedMessage);

                                //发送者在clientMap中的key,也就是生成的uuid
                                String senderKey = null;
                                //循环遍历clientMap通过value得到发送者的key
                                for (Map.Entry<String,SocketChannel> entry : clientMap.entrySet()){
                                    if(entry.getValue() == client){
                                        senderKey = entry.getKey();
                                        break;
                                    }
                                }

                                for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
                                    SocketChannel value = entry.getValue();

                                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                    //将数据读入Buffer
                                    writeBuffer.put((senderKey + ":" + receivedMessage).getBytes());
                                    writeBuffer.flip();
                                    //向客户端写入数据
                                    value.write(writeBuffer);
                                }
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
                //清空selectionKeys,以免出现已经处理过的selectionKey对象还存在与感兴趣的selectionKeys集合中
                selectionKeys.clear();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Client

public class NioClient {
    public static void main(String[] args) {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);

            Selector selector = Selector.open();
            socketChannel.connect(new InetSocketAddress("localhost", 8899));
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true) {
                selector.select();

                Set<SelectionKey> keySet = selector.selectedKeys();
                for (SelectionKey selectionKey : keySet) {
                    if (selectionKey.isConnectable()) {
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        //指示此通道上是否正在进行连接操作。如果是则可以进行连接成功标示
                        if (client.isConnectionPending()) {
                            //表示连接建立成功,完成连接
                            client.finishConnect();

                            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                            writeBuffer.put((LocalDateTime.now() + " 连接成功").getBytes());
                            writeBuffer.flip();
                            //向服务器端写入数据
                            client.write(writeBuffer);
                            //获取一个线程池

                            //不推荐被使用
                            //ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
                            //及建议使用下面的

                            /*
                                corePoolSize - 线程池核心池的大小。
                                maximumPoolSize - 线程池的最大线程数。
                                keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
                                unit - keepAliveTime 的时间单位。
                                workQueue - 用来储存等待执行任务的队列。
                                threadFactory - 线程工厂。
                                handler - 拒绝策略。
                            */
                            ExecutorService executorService = new ThreadPoolExecutor(10, 100, 2000,
                                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), Executors.defaultThreadFactory());

                            executorService.submit(() -> {
                                while (true) {
                                    try {
                                        //清除Buffer
                                        writeBuffer.clear();
                                        //获取键盘输入
                                        InputStreamReader in = new InputStreamReader(System.in);
                                        BufferedReader bufferedReader = new BufferedReader(in);
                                        //只读取一行
                                        String sendMessage = bufferedReader.readLine();
                                        //添加进buffer
                                        writeBuffer.put(sendMessage.getBytes());
                                        writeBuffer.flip();
                                        //向服务器端发送
                                        client.write(writeBuffer);
                                    } catch (Exception ex) {
                                        ex.printStackTrace();
                                    }
                                }
                            });
                            client.register(selector, SelectionKey.OP_READ);
                        }
                        /*isReadable()  测试此键的通道是否已准备好进行读取。 如果此键的通道不支持读取操作,则此方法始终返回false。*/
                    } else if (selectionKey.isReadable()) {
                        SocketChannel serverWrite = (SocketChannel) selectionKey.channel();

                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int read = serverWrite.read(byteBuffer);
                        if (read > 0) {
                            byteBuffer.flip();

                            Charset charset = Charset.forName("UTF-8");
                            String receivedMessage = String.valueOf(charset.decode(byteBuffer).array()).replace("\u0000", "");
                            //打印服务器端发来的数据
                            System.out.println(receivedMessage);
                        }
                    }
                }
                //清除selectedKeys集合
                keySet.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Netty章节十六:Java NIO

原文:https://www.cnblogs.com/mikisakura/p/13177424.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!