本文围绕以下几点阐述:
package com.gupaoedu.vip.netty.io.nio.buffer;

import java.nio.IntBuffer;

/**
 * Minimal demonstration of the write / flip / read cycle of an {@link IntBuffer}.
 */
public class IntBufferDemo {

    public static void main(String[] args) {
        // Allocate a new int buffer of capacity 8. Its position starts at zero and
        // its limit equals its capacity; it is backed by an array with offset zero.
        IntBuffer buffer = IntBuffer.allocate(8);

        // Fill the buffer with 2, 4, ..., 16. Each put() writes at the current
        // position and then advances it.
        int value = 2;
        while (buffer.hasRemaining()) {
            buffer.put(value);
            value += 2;
        }

        // Switch to reading: limit := current position, position := 0.
        buffer.flip();

        // Drain every element between position and limit; each get() advances the position.
        for (int left = buffer.remaining(); left > 0; left--) {
            int element = buffer.get();
            System.out.print(element + " ");
        }
    }
}
hel.
package com.gupaoedu.vip.netty.io.nio.buffer;

import java.io.FileInputStream;
import java.nio.*;
import java.nio.channels.*;

/**
 * Walks a {@link ByteBuffer} through read / flip / get / clear against a file channel
 * and prints the buffer's capacity, position and limit after every step.
 */
public class BufferDemo {

    /**
     * Runs the demo against the file {@code E://test.txt}.
     *
     * @param args unused
     * @throws Exception if the file cannot be opened or read
     */
    public static void main(String args[]) throws Exception {
        // File-based I/O is used here. try-with-resources guarantees the stream
        // (and the channel obtained from it) is closed even when a step throws.
        try (FileInputStream fin = new FileInputStream("E://test.txt")) {
            // Channel used to operate on the file.
            FileChannel fc = fin.getChannel();

            // A buffer of 10 bytes, i.e. effectively a 10-element byte array.
            ByteBuffer buffer = ByteBuffer.allocate(10);
            output("初始化", buffer);

            // Read from the channel into the buffer.
            fc.read(buffer);
            output("调用 read()", buffer);

            // Fix the readable range before consuming: limit := position, position := 0.
            buffer.flip();
            output("调用 flip()", buffer);

            // Consume everything that is readable; the values themselves are not needed,
            // only the effect on the position.
            while (buffer.remaining() > 0) {
                buffer.get();
            }
            output("调用 get()", buffer);

            // Reset position and limit so the buffer can be filled again.
            buffer.clear();
            output("调用 clear()", buffer);
        }
    }

    /**
     * Prints the current state of a buffer.
     *
     * @param step   label of the step that was just performed
     * @param buffer buffer whose capacity, position and limit are printed
     */
    public static void output(String step, Buffer buffer) {
        System.out.println(step + " : ");
        // Capacity: the size of the backing storage.
        System.out.print("capacity: " + buffer.capacity() + ", ");
        // Position: index of the next element to read or write (the cursor).
        System.out.print("position: " + buffer.position() + ", ");
        // Limit: after flip(), data can only be accessed between position and limit.
        System.out.println("limit: " + buffer.limit());
        System.out.println();
    }
}
package com.gupaoedu.vip.netty.io.nio.buffer;

import java.nio.ByteBuffer;

/**
 * Shows the two ways of obtaining a heap {@link ByteBuffer} by hand.
 */
public class BufferWrap {

    /** Creates one buffer via allocation and one by wrapping an existing array. */
    public void myMethod() {
        // Option 1: let the buffer allocate storage of the requested size.
        ByteBuffer allocated = ByteBuffer.allocate(10);

        // Option 2: wrap an array that already exists.
        byte[] backing = new byte[10];
        ByteBuffer wrapped = ByteBuffer.wrap(backing);
    }
}
package com.gupaoedu.vip.netty.io.nio.buffer;

import java.nio.ByteBuffer;

/**
 * Buffer slicing: a slice is a window onto the parent buffer's data, so changes
 * made through the slice are visible in the parent.
 */
public class BufferSlice {

    static public void main( String args[] ) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(10);

        // Fill the buffer with the values 0-9.
        for (byte value = 0; buffer.hasRemaining(); value++) {
            buffer.put(value);
        }

        // Create a sub-buffer covering indices 3 (inclusive) to 7 (exclusive).
        buffer.position(3);
        buffer.limit(7);
        ByteBuffer slice = buffer.slice();

        // Multiply every element of the sub-buffer by 10 using absolute access.
        for (int idx = 0; idx < slice.capacity(); idx++) {
            slice.put(idx, (byte) (slice.get(idx) * 10));
        }

        // Make the whole parent buffer readable again (position 0, limit = capacity)
        // and print it; the sliced region shows the modified values.
        buffer.clear();
        while (buffer.hasRemaining()) {
            System.out.println(buffer.get());
        }
    }
}
package com.gupaoedu.vip.netty.io.nio.buffer;

import java.nio.*;

/**
 * Read-only buffers: a read-only view shares content with the buffer it was
 * created from, so changes to the original show up in the view.
 */
public class ReadOnlyBuffer {

    static public void main( String args[] ) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(10);

        // Fill the buffer with the values 0-9.
        for (byte value = 0; buffer.hasRemaining(); value++) {
            buffer.put(value);
        }

        // Create the read-only view.
        ByteBuffer readonly = buffer.asReadOnlyBuffer();

        // Change the content of the ORIGINAL buffer (absolute access, position untouched).
        for (int idx = 0; idx < buffer.capacity(); idx++) {
            buffer.put(idx, (byte) (buffer.get(idx) * 10));
        }

        // Make the whole view readable (position 0, limit = capacity); this only
        // resets the indices and does not touch the content.
        readonly.clear();

        // The read-only view reflects the changes made through the original buffer.
        while (readonly.hasRemaining()) {
            System.out.println(readonly.get());
        }
    }
}
package com.gupaoedu.vip.netty.io.nio.buffer;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

/**
 * Direct buffers: copies a file through a buffer obtained from
 * {@link ByteBuffer#allocateDirect(int)} instead of {@link ByteBuffer#allocate(int)}.
 */
public class DirectBuffer {

    /**
     * Copies {@code E://test.txt} to {@code E://testcopy.txt}.
     *
     * @param args unused
     * @throws Exception if either file cannot be opened, read or written
     */
    static public void main( String args[] ) throws Exception {
        // Source: the file written by the earlier example.
        String infile = "E://test.txt";
        // Destination: a new file that receives the copied content.
        String outfile = "E://testcopy.txt";

        // try-with-resources closes both streams (and their channels) on every
        // path, so the copy is flushed and no file handle is leaked.
        try (FileInputStream fin = new FileInputStream(infile);
             FileOutputStream fout = new FileOutputStream(outfile)) {
            FileChannel fcin = fin.getChannel();
            FileChannel fcout = fout.getChannel();

            // Use allocateDirect rather than allocate.
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

            while (true) {
                buffer.clear();
                int r = fcin.read(buffer);
                if (r == -1) {
                    break; // end of file
                }
                buffer.flip();
                // A single write() may consume only part of the buffer; keep
                // writing until everything read in this round has been written.
                while (buffer.hasRemaining()) {
                    fcout.write(buffer);
                }
            }
        }
    }
}
package com.gupaoedu.vip.netty.io.nio.buffer;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

/**
 * Memory-mapped I/O: maps a region of a file into a buffer so that writing to
 * the buffer changes the file content.
 */
public class MappedBuffer {

    /** Offset in the file at which the mapped region begins. */
    static private final int start = 0;
    /** Length in bytes of the mapped region. */
    static private final int size = 1024;

    /**
     * Maps the first {@code size} bytes of {@code E://test.txt} read-write and
     * overwrites the first and last byte of the mapped region.
     *
     * @param args unused
     * @throws Exception if the file cannot be opened or mapped
     */
    static public void main( String args[] ) throws Exception {
        // try-with-resources closes the file even if map() or put() throws.
        try (RandomAccessFile raf = new RandomAccessFile( "E://test.txt", "rw" )) {
            FileChannel fc = raf.getChannel();

            // Associate the buffer with the file: modifying the buffer content
            // modifies the file content as well.
            MappedByteBuffer mbb = fc.map( FileChannel.MapMode.READ_WRITE, start, size );

            mbb.put( 0, (byte)97 );
            // Last byte of the mapped region; derived from size instead of a
            // hard-coded 1023 so the two cannot drift apart.
            mbb.put( size - 1, (byte)122 );
        }
    }
}
/* * 注册事件 */ private Selector getSelector() throws IOException { // 创建 Selector 对象 Selector sel = Selector.open(); // 创建可选择通道,并配置为非阻塞模式 ServerSocketChannel server = ServerSocketChannel.open(); server.configureBlocking(false); // 绑定通道到指定端口 ServerSocket socket = server.socket(); InetSocketAddress address = new InetSocketAddress(port); socket.bind(address); // 向 Selector 中注册感兴趣的事件 server.register(sel, SelectionKey.OP_ACCEPT); return sel; }
/* * 开始监听 */ public void listen() { System.out.println("listen on " + port); try { while(true) { // 该调用会阻塞,直到至少有一个事件发生 selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = (SelectionKey) iter.next(); iter.remove(); process(key); } } } catch (IOException e) { e.printStackTrace(); } }
/* * 根据不同的事件做处理 */ private void process(SelectionKey key) throws IOException{ // 接收请求 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } // 读信息 else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); int len = channel.read(buffer); if (len > 0) { buffer.flip(); content = new String(buffer.array(),0,len); SelectionKey sKey = channel.register(selector, SelectionKey.OP_WRITE); sKey.attach(content); } else { channel.close(); } buffer.clear(); } // 写事件 else if (key.isWritable()) { SocketChannel channel = (SocketChannel) key.channel(); String content = (String) key.attachment(); ByteBuffer block = ByteBuffer.wrap(("输出内容:" + content).getBytes()); if(block != null){ channel.write(block); }else{ channel.close(); } } }
package com.gupaoedu.vip.netty.io.nio.channel;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

/**
 * Reads the beginning of a file through a {@link FileChannel} and prints it.
 */
public class FileInputDemo {

    /**
     * Prints up to the first 1024 bytes of {@code E://test.txt}, one char per byte.
     *
     * @param args unused
     * @throws Exception if the file cannot be opened or read
     */
    static public void main( String args[] ) throws Exception {
        // try-with-resources closes the stream (and its channel) even if reading throws.
        try (FileInputStream fin = new FileInputStream("E://test.txt")) {
            // Obtain the channel.
            FileChannel fc = fin.getChannel();

            // Create the buffer.
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            // Read data from the channel into the buffer.
            fc.read(buffer);

            // Switch the buffer from filling to draining and print what was read.
            buffer.flip();
            while (buffer.remaining() > 0) {
                byte b = buffer.get();
                System.out.print(((char)b));
            }
        }
    }
}
package com.gupaoedu.vip.netty.io.nio.channel;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;

/**
 * Writes a fixed byte sequence to a file through a {@link FileChannel}.
 */
public class FileOutputDemo {

    /** Bytes written to the file. */
    static private final byte message[] = { 83, 111, 109, 101, 32,
        98, 121, 116, 101, 115, 46 };

    /**
     * Writes {@code message} to {@code E://test.txt}, replacing any existing content.
     *
     * @param args unused
     * @throws Exception if the file cannot be opened or written
     */
    static public void main( String args[] ) throws Exception {
        // try-with-resources closes the stream (and its channel) even if writing throws.
        try (FileOutputStream fout = new FileOutputStream( "E://test.txt" )) {
            FileChannel fc = fout.getChannel();

            ByteBuffer buffer = ByteBuffer.allocate( 1024 );

            // Bulk put copies the whole array in one call.
            buffer.put( message );

            // Switch from filling to draining, then write until nothing is left;
            // a single write() is not guaranteed to consume the whole buffer.
            buffer.flip();
            while (buffer.hasRemaining()) {
                fc.write( buffer );
            }
        }
    }
}
IO模型 | 相对性能 | 关键思路 | 操作系统 | JAVA支持 |
select | 较高 | Reactor | windows/Linux |
支持,Reactor 模式(反应器设计模式)。Linux 操作
系统的 kernels 2.4 内核版本之前,默认使用
select;而目前 windows 下对同步 IO 的支持,都
是 select 模型
|
poll | 较高 | Reactor | Linux |
Linux 下的 JAVA NIO 框架,Linux kernels 2.6 内
核版本之前使用 poll 进行支持。也是使用的
Reactor 模式。
|
epoll | 高 | Reactor/Proactor | Linux |
Linux kernels 2.6 内核版本及以后使用 epoll 进行
支持;Linux kernels 2.6 内核版本之前使用 poll
进行支持;另外一定注意,由于 Linux 下没有
Windows 下的 IOCP 技术提供真正的 异步 IO 支
持,所以 Linux 下使用 epoll 模拟异步 IO。
|
kqueue | 高 | Proactor | Linux | 目前 JAVA 的版本不支持。 |
/**
 * Opens a selector by delegating to the selector provider returned by
 * {@code SelectorProvider.provider()}.
 *
 * @return the newly opened selector
 * @throws IOException if the provider fails to open a selector
 */
public static Selector open() throws IOException {
    SelectorProvider current = SelectorProvider.provider();
    return current.openSelector();
}
/**
 * Returns the selector provider, creating it on first use while holding {@code lock}.
 *
 * <p>On first use the provider is looked up in this order: from a property,
 * as a service, and finally the default provider.
 *
 * @return the selector provider
 */
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider == null) {
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        // Fall back to the default only when neither lookup supplied a provider.
                        if (!loadProviderFromProperty() && !loadProviderAsService()) {
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                        }
                        return provider;
                    }
                });
        }
        return provider;
    }
}
/**
 * Opens a new selector backed by {@code WindowsSelectorImpl}.
 *
 * @return the new selector
 * @throws IOException if the selector cannot be created
 */
public AbstractSelector openSelector() throws IOException {
    AbstractSelector created = new WindowsSelectorImpl(this);
    return created;
}
/**
 * Creates the selector: allocates the poll array wrapper, opens the wakeup
 * pipe, records the descriptor values of both pipe ends, and registers the
 * pipe's source end with the poll wrapper.
 *
 * @param sp the provider that created this selector; passed to the superclass
 * @throws IOException if the pipe cannot be opened or the socket option cannot be set
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    // Pipe used to wake the selector up: wakeup() signals the sink end, the
    // source end is the one being polled.
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    // Register the source end with the poll wrapper; the second argument (0)
    // is presumably its slot index in the poll array — confirm against PollArrayWrapper.
    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Opens a pipe by delegating to the selector provider returned by
 * {@code SelectorProvider.provider()}.
 *
 * @return the newly opened pipe
 * @throws IOException if the provider fails to open a pipe
 */
public static Pipe open() throws IOException {
    SelectorProvider current = SelectorProvider.provider();
    return current.openPipe();
}
/**
 * Opens a new pipe backed by {@code PipeImpl}.
 *
 * @return the new pipe
 * @throws IOException declared by the signature
 */
public Pipe openPipe() throws IOException {
    Pipe created = new PipeImpl(this);
    return created;
}
/**
 * Creates a pipe and wraps its two descriptors in a source and a sink channel.
 *
 * @param sp the provider handed to both channel implementations
 */
PipeImpl(SelectorProvider sp) {
    // Both descriptors come back packed into a single long.
    long packedFds = IOUtil.makePipe(true);

    // High 32 bits: read end, wrapped as the source channel.
    FileDescriptor readEnd = new FileDescriptor();
    IOUtil.setfdVal(readEnd, (int) (packedFds >>> 32));
    source = new SourceChannelImpl(sp, readEnd);

    // Low 32 bits: write end, wrapped as the sink channel.
    FileDescriptor writeEnd = new FileDescriptor();
    IOUtil.setfdVal(writeEnd, (int) packedFds);
    sink = new SinkChannelImpl(sp, writeEnd);
}
/**
 * Returns two file descriptors for a pipe encoded in a long.
 * The read end of the pipe is returned in the high 32 bits,
 * while the write end is returned in the low 32 bits.
 *
 * @param blocking when {@code false}, the native implementation switches both
 *                 ends of the pipe to non-blocking mode before returning
 * @return the two descriptors packed into one {@code long}
 */
static native long makePipe(boolean blocking);
/*
 * Native implementation of IOUtil.makePipe.
 *
 * Creates a pipe with pipe(2) and returns both descriptors packed into one
 * jlong: fd[0] in the high 32 bits, fd[1] in the low 32 bits. When `blocking`
 * is JNI_FALSE both ends are switched to non-blocking mode first. On failure
 * an IOException is raised in the JVM and 0 is returned.
 */
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];

    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            /* Raise the exception before close() so the reported error is the
             * one from the failed configuration, then release both ends. */
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    /* Pack: fd[0] -> high 32 bits, fd[1] -> low 32 bits. */
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}

/*
 * Sets or clears O_NONBLOCK on `fd`.
 *
 * Returns 0 without calling fcntl(F_SETFL) when the flags already have the
 * requested value; otherwise returns the result of fcntl(F_SETFL).
 */
static int
configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}
/** * Returns two file descriptors for a pipe encoded in a long. * The read end of the pipe is returned in the high 32 bits, * while the write end is returned in the low 32 bits. */
// Hands the wakeup pipe's source descriptor to the poll wrapper; the second
// argument (0) is presumably the slot index in the poll array — confirm against PollArrayWrapper.
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
/**
 * Opens a server-socket channel by delegating to the selector provider
 * returned by {@code SelectorProvider.provider()}.
 *
 * @return the newly opened server-socket channel
 * @throws IOException if the provider fails to open the channel
 */
public static ServerSocketChannel open() throws IOException {
    SelectorProvider current = SelectorProvider.provider();
    return current.openServerSocketChannel();
}
/**
 * Opens a new server-socket channel backed by {@code ServerSocketChannelImpl}.
 *
 * @return the new channel
 * @throws IOException if the channel cannot be created
 */
public ServerSocketChannel openServerSocketChannel() throws IOException {
    ServerSocketChannel created = new ServerSocketChannelImpl(this);
    return created;
}
/**
 * Creates the channel: obtains a server socket descriptor, caches its
 * descriptor value and marks the channel as in use.
 *
 * @param sp the provider that created this channel; passed to the superclass
 * @throws IOException if the server socket cannot be created
 */
public ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    fd = Net.serverSocket(true);
    fdVal = IOUtil.fdVal(fd);
    state = ST_INUSE;
}
/**
 * Performs one selection operation: polls in the calling thread (and in helper
 * threads when needed), then updates the selected-key set.
 *
 * @param timeout the timeout stored for this selection and used by the poll
 * @return the value of {@code updateSelectedKeys()}, or 0 when a wakeup had
 *         already been triggered before polling started
 * @throws ClosedSelectorException if {@code channelArray} is null
 * @throws IOException if polling fails
 */
protected int doSelect(long timeout) throws IOException {
    if (channelArray == null)
        throw new ClosedSelectorException();
    this.timeout = timeout; // set selector timeout
    processDeregisterQueue();
    // A wakeup was requested before polling began: clear it and return at once.
    if (interruptTriggered) {
        resetWakeupSocket();
        return 0;
    }
    // Calculate number of helper threads needed for poll. If necessary
    // threads are created here and start waiting on startLock
    adjustThreadsCount();
    finishLock.reset(); // reset finishLock
    // Wakeup helper threads, waiting on startLock, so they start polling.
    // Redundant threads will exit here after wakeup.
    startLock.startThreads();
    // do polling in the main thread. Main thread is responsible for
    // first MAX_SELECTABLE_FDS entries in pollArray.
    try {
        begin();
        try {
            subSelector.poll();
        } catch (IOException e) {
            finishLock.setException(e); // Save this exception
        }
        // Main thread is out of poll(). Wakeup others and wait for them
        if (threads.size() > 0)
            finishLock.waitForHelperThreads();
    } finally {
        end();
    }
    // Done with poll(). Rethrow any exception saved while polling, then
    // process pending deregistrations and collect the ready keys.
    finishLock.checkForException();
    processDeregisterQueue();
    int updated = updateSelectedKeys();
    // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
    resetWakeupSocket();
    return updated;
}
/**
 * Polls on behalf of the main thread, covering at most the first
 * {@code MAX_SELECTABLE_FDS} channels of the poll array.
 *
 * @return the value returned by the native {@code poll0}
 * @throws IOException if the native poll fails
 */
private int poll() throws IOException{ // poll for the main thread
    return poll0(pollWrapper.pollArrayAddress,
                 Math.min(totalChannels, MAX_SELECTABLE_FDS),
                 readFds, writeFds, exceptFds, timeout);
}

/**
 * Native poll. The three arrays receive the result of the native select().
 */
private native int poll0(long pollAddress, int numfds,
     int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];   // FDs on which a read event occurred
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];  // FDs on which a write event occurred
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; // FDs on which an exceptional condition occurred
/**
 * Wakes the selector up by signaling the wakeup socket. Under
 * {@code interruptLock}, the socket is signaled only if no wakeup is already
 * pending ({@code interruptTriggered} is false), so repeated calls signal once.
 *
 * @return this selector
 */
public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }
    return this;
}

// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
    setWakeupSocket0(wakeupSinkFd);
}

// Native half of setWakeupSocket(); implemented by the JNI function below.
private native void setWakeupSocket0(int wakeupSinkFd);

/*
 * JNI implementation of setWakeupSocket0 (C code): sends a single byte on the
 * given descriptor, which is the sink end of the wakeup pipe.
 */
JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                     jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}
原文:https://www.cnblogs.com/qlsem/p/11521789.html