传统IO流程示意图
mmap优化流程示意图
sendFile Linux2.1版本优化流程示意图
sendFile Linux
服务端
package com.ronnie.nio.zeroCopy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class NewIOServer {
public static void main(String[] args) throws IOException {
InetSocketAddress address = new InetSocketAddress(8096);
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(address);
// 创建Buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
while (true){
SocketChannel socketChannel = serverSocketChannel.accept();
int readCount = 0;
while (-1 != readCount){
try {
readCount = socketChannel.read(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
// 倒带, position设为0, mark重置为-1
byteBuffer.rewind();
}
}
}
}
客户端
package com.ronnie.nio.zeroCopy;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class NewIOClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8096));
String filename = "flink-1.9.0-bin-scala_2.12.tgz";
// 得到一个文件channel
FileChannel fileChannel = new FileInputStream(filename).getChannel();
// 准备发送
long startTime = System.currentTimeMillis();
// 在Linux下一次transferTo方法就可以完成传输
// 在Windows下一次调用transferTo 只能发送 8M, 就需要分段传输文件, 而且主要传输时的位置需要记录
long transferCount = 0L;
if (fileChannel.size() <= 8){
// transferTo() 参数1: 从什么位置开始, 参数2: 截多少, 参数3: 可写的管道对象)
transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
} else {
int times = (int) (fileChannel.size() / 8 + 1);
for (int i = 1; i < times; i++){
transferCount += fileChannel.transferTo(8 * i, 8 * i + 8, socketChannel);
}
}
System.out.println("Total byte count: " + transferCount + " time consumed: " + (System.currentTimeMillis() - startTime));
// 关闭
fileChannel.close();
}
}
代码(这是fileChannelImpl中的反编译代码)
public long transferTo(long var1, long var3, WritableByteChannel var5) throws IOException {
// 确认当前管道已经开启, 检查到未开启会抛出异常
this.ensureOpen();
// 如果传入的管道未开启, 抛出异常
if (!var5.isOpen()) {
throw new ClosedChannelException();
// 如果当前管道不可读, 抛出异常
} else if (!this.readable) {
throw new NonReadableChannelException();
// 如果传入的管道是实现类 且 该管道不可写, 抛出异常
} else if (var5 instanceof FileChannelImpl && !((FileChannelImpl)var5).writable) {
throw new NonWritableChannelException();
// 如果 position >= 0 且 count >= 0
} else if (var1 >= 0L && var3 >= 0L) {
// 获取当前管道的长度
long var6 = this.size();
// 如果position已经超过当前管道末尾, 就返回0
if (var1 > var6) {
return 0L;
} else {
// 将count数与2147483647L比较并获取其中最小值, 再转换成int, 传给var8, 其实这里就是做了一个防止count越界的处理
int var8 = (int)Math.min(var3, 2147483647L);
// 如果管道末尾到position之间的长度小于var8
if (var6 - var1 < (long)var8) {
// 就把该值赋给var8
var8 = (int)(var6 - var1);
}
long var9;
// transferToDirectly 直接传输
if ((var9 = this.transferToDirectly(var1, var8, var5)) >= 0L) {
return var9;
} else {
// transferToTrustedChannel 传输到可靠的管道
// transferToArbitraryChannel 传输到任意的管道
// 其实就是先尝试传输到可靠的管道, 如果传输失败, 再用任意管道继续传输
return (var9 = this.transferToTrustedChannel(var1, (long)var8, var5)) >= 0L ? var9 : this.transferToArbitraryChannel(var1, var8, var5);
}
}
} else {
throw new IllegalArgumentException();
}
}
原文:https://www.cnblogs.com/ronnieyuan/p/12009692.html