首页 > 其他 > 详细

探究socketChannel原理

时间:2021-01-27 22:16:41      阅读:19      评论:0      收藏:0      [点我收藏+]

我们前面介绍了Selector是对操作系统IO多路复用的简单封装,有了IO多路复用之后,我们还需要非阻塞socket读写操作。因为内核告诉你A连接有数据可读,你想要读1k,事实上只读到了0.5k,如果使用传统的socket API那么线程就会阻塞在这里,而使用了非阻塞Socket就能在没有读满之前立刻返回,数据先放到内存里,然后可以继续读下一个B连接的数据。 SocketChannel就是NIO对于非阻塞socket操作的支持的组件,其在socket上封装了一层。

socket

1982年,BSD那帮人发布了socket和TCP/IP协议栈和select系统调用,当时unix系统严重缺乏进程间通信的手段,全靠fork大法在维持,所以在Socket发布的时候,是用来做进程间通信(IPC)的。放在37年后的今天,这个定义也不过时,网络通信其实就是不同机器的不同操作系统上的socket之间的通信。

 
技术分享图片
image.png

上图是典型的socket通信的流程。

  1. 通过socket()函数创建一个socket fd,代表通信端点
  2. 绑定端口,协议栈、socket类型 (eg: tcp就是流式socket)
  3. 监听, 就可以接受客户端的tcp连接,这个时候建立的tcp连接会存在内核的某个队列里,长度由SO_BACKLOG指定。
  4. 接受连接,通信。愉快的交换数据。
  5. 关闭连接。

TCP/IP协议栈将网络分成四层,分别是:

  • 应用层: 通常就是指你自己写的程序
  • 传输层(TCP): 其实传输层协议还有UDP,只是我们平时用得少。TCP是一种面向流的可靠传输
  • 网络层(IP): 基本就是靠IP协议,知道地址和端口来寻找目标机器
  • 物理层: 就是光纤,网线这种东西


     
    技术分享图片
    image.png

粗略的说,iP协议保证网络包通过路由器能投递给目标机器网卡,经过网卡驱动会触发一个中断给内核,内核会根据TCP/IP协议栈做CRC校验,然后层层解包还原用户数据,然后复制数据到socket的读写缓冲区,这些操作都是在内核空间完成。


 
技术分享图片
image.png

如果socket fd被加入到了多路复用的监听队列里,如epoll_ctl 加入的fd,那么下次epoll_wait的时候,将会返回该socket有数据可读可写。这样就完成一次完整的网络IO事件通知。这个时候用户空间的应用进程直接调用socket的read方法,内核就会将数据从socket的读缓冲区复制到应用进程的缓冲区了(“epoll2.6直接mmap,通知用户进程的时候,数据应该已经在用户空间了”)

SocketChannel详解

SocketChannel是对传统Java Socket API的改进,主要是支持了非阻塞的读写。同时改进了传统的单向流API, Channel同时支持读写(其实就是加了个中间层Buffer)。

创建socketChannel做了什么?

通过SocketChannel.open()可以打开一个SocketChannel, 最后还是委托给SelectorProvider的openSocketChannel方法

// sun.nio.ch.SelectorProvider
public SocketChannel openSocketChannel() throws IOException {
    // 调用SocketChannelImpl的构造器
    return new SocketChannelImpl(this);
}

// sun.nio.ch.SocketChannelImpl
SocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    // 创建socket fd
    this.fd = Net.socket(true);
    // 获取socket fd的值
    this.fdVal = IOUtil.fdVal(fd);
    // 初始化SocketChannel状态, 状态不多,总共就6个
    // 未初始化,未连接,正在连接,已连接,断开连接中,已断开
    this.state = ST_UNCONNECTED;
}

// sun.nio.ch.Net
static FileDescriptor socket(ProtocolFamily family, boolean stream)
    throws IOException {
    boolean preferIPv6 = isIPv6Available() &&
        (family != StandardProtocolFamily.INET);
    // 最后调用的是socket0
    return IOUtil.newFD(socket0(preferIPv6, stream, false));
}

// Due to oddities SO_REUSEADDR on windows reuse is ignored
private static native int socket0(boolean preferIPv6, boolean stream, boolean reuse);

 

可以看到,最后还是靠一个native方法socket0来创建socket fd,打开jdk/src/solaris/native/sun/nio/ch/Net.c

JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse)
{
    int fd;
    int type = (stream ? SOCK_STREAM : SOCK_DGRAM);

    // 老朋友socket函数
    fd = socket(domain, type, 0);
    if (fd < 0) {
        return handleSocketError(env, errno);
    }

    ....省略非关键代码

    // 设置是否重用地址,如果打开的是ServerSocketChannel
    // 默认是重用的,其他普通SocketChannel默认不重用
    // 重用和不重用的区别在于,就算你关掉了程序,你绑定的
    // 本地端口也在一定时间内是已使用的(address already in use)
    if (reuse) {
        int arg = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set SO_REUSEADDR");
            close(fd);
            return -1;
        }
    }
    ...
    return fd;
}

 

果然,底层还是socket函数,这样一个socket fd就创建好了。
其实创建个socket fd操作系统内核做了很多事情的,要判断一大堆东西,还要创建和初始化读写缓冲区,加自旋锁等.

** 如何实现非阻塞**
正常在c里我们实现非阻塞是靠fcntl这个函数,这个函数全称就是file control,
通过它可以管理fd的各种属性,比如设置fd的阻塞与否。

fcntl的函数签名为:

#include <fcntl.h>

int fcntl(int fildes, int cmd, ...);

 

第一个参数是传入的fd, 第二个参数是操作类型,后面是flag
要设置非阻塞,操作类型是F_SETFL和F_GETFL,flag是O_NONBLOCK

那么JVM是怎么做的呢,在SocketChannel上有一个configureBlocking函数,这个函数是设置当前SocketChannel是否是阻塞的,和selector一起用的时候一定要设置成非阻塞才有意义, 阻塞的话就不需要IO多路复用的事件通知了。

// java.nio.channels.spi.AbstractSelectableChannel
public final SelectableChannel configureBlocking(boolean block)
    throws IOException
{
    ...
    // 模板方法模式,调用子类的实现
    implConfigureBlocking(block);
    ...
    return this;
}

 

在SocketChannelImpl里看

protected void implConfigureBlocking(boolean block) throws IOException {
    IOUtil.configureBlocking(fd, block);
}

将这个操作又交给了IOUtil的configureBlocking, 同时还传入了我们上面创建的socket fd. 打开IOUtil一看

public static native void configureBlocking(FileDescriptor fd,
                                            boolean blocking)
    throws IOException;

 

还是要找c的实现,打开IOUtil.c

JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_configureBlocking(JNIEnv *env, jclass clazz,
                                         jobject fdo, jboolean blocking)
{
    if (configureBlocking(fdval(env, fdo), blocking) < 0)
        JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
}

static int
configureBlocking(int fd, jboolean blocking)
{
    // 所以还是靠file control
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}

 

可以看到,JVM也是靠fcntl来实现非阻塞的,所以服务端编程知道一些底层的API还是有价值的和有必要的。

SocketChannel的读写
那么SocketChannel是如何读写呢,打开SocketChannelImpl

public int read(ByteBuffer buf) throws IOException {
  ...
  // n表示读到的数据长度
  int n = 0;
  for (;;) {
      // 从socket fd里读数据,长度由buf决定
      n = IOUtil.read(fd, buf, -1, nd);
      if ((n == IOStatus.INTERRUPTED) && isOpen()) {
          // The system call was interrupted but the channel
          // is still open, so retry
          continue;
      }
      return IOStatus.normalize(n);
  }
  ...
}

 

读交给了IOUtil的read方法

static int read(FileDescriptor fd, ByteBuffer dst, long position,
                NativeDispatcher nd)
    throws IOException
{
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    // 判断是不是DirectBuffer,是直接读进去
    // DirectBuffer是有名的冰山对象,其后可能关联着一堆直接内存
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, nd);

    // 如果传入的不是DirectBuffer,那么使用临时的DirectBuffer
    // Substitute a native buffer
    ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
    try {
        int n = readIntoNativeBuffer(fd, bb, position, nd);
        bb.flip();
        if (n > 0)
            dst.put(bb);
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                        long position, NativeDispatcher nd)
    throws IOException
{
    int pos = bb.position();
    int lim = bb.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);

    if (rem == 0)
        return 0;
    int n = 0;

    // 调用本地方法去读
    // 要读socket fd一定要知道起始地址
    // 感兴趣可以看看https://stackoverflow.com/questions/11981474/pread-and-lseek-not-working-on-socket-file-descriptor
    // 调用完毕bb的那个DirectBuffer的直接内存里就有数据了
    if (position != -1) {
        n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
                     rem, position);
    } else {
        n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    if (n > 0)
        bb.position(pos + n);
    return n;
}

static native int pread0(FileDescriptor fd, long address, int len,
                         long position) throws IOException;

 

为啥一定要用DirectBuffer?, 在JVM里是有GC的,但在调用Socket Api进行读写通信的时候,需传入的是一个固定的内存地址,假如数据使用的是堆内地址,GC之后对象地址就变了,这时socket读写就会崩。

上面还有最后一个pread0本地方法,这个是文件IO函数,第一个参数传入socket fd的时候,将会从socket的读缓冲区复制数据到目标地址.

Java NIO 由以下几个核心部分组成:

1 、Buffer
2、Channel
3、Selector

传统的IO操作面向数据流,意味着每次从流中读一个或多个字节,直至完成,数据没有被缓存在任何地方。

NIO操作面向缓冲区,数据从Channel读取到Buffer缓冲区,随后在Buffer中处理数据。

Buffer

A buffer is a linear, finite sequence of elements of a specific primitive type.

一块缓存区,内部使用字节数组存储数据,并维护几个特殊变量,实现数据的反复利用。
1、mark:初始值为-1,用于备份当前的position;
2、position:初始值为0,position表示当前可以写入或读取数据的位置,当写入或读取一个数据后,position向前移动到下一个位置;
3、limit:写模式下,limit表示最多能往Buffer里写多少数据,等于capacity值;读模式下,limit表示最多可以读取多少数据。
4、capacity:缓存数组大小


 
技术分享图片
image.png

clear():一旦读完Buffer中的数据,需要让Buffer准备好再次被写入,clear会恢复状态值,但不会擦除数据。

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

 

mark():把当前的position赋值给mark

public final Buffer mark() {
    mark = position;
    return this;
}

 

reset():把mark值还原给position

flip():Buffer有两种模式,写模式和读模式,flip后Buffer从写模式变成读模式。

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

 

rewind():重置position为0,从头读写数据。

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

 

目前Buffer的实现类有以下几种:

ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
MappedByteBuffer
 
技术分享图片
image.png

ByteBuffer

A byte buffer,extend from Buffer

ByteBuffer的实现类包括"HeapByteBuffer"和"DirectByteBuffer"两种。

HeapByteBuffer

public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}
HeapByteBuffer(int cap, int lim) {  
    super(-1, 0, lim, cap, new byte[cap], 0);
}

 

HeapByteBuffer通过初始化字节数组hd,在虚拟机堆上申请内存空间。

DirectByteBuffer

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}
DirectByteBuffer(int cap) {
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        // Round up to page boundary
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;
}

 

DirectByteBuffer通过unsafe.allocateMemory申请堆外内存,并在ByteBuffer的address变量中维护指向该内存的地址。
unsafe.setMemory(base, size, (byte) 0)方法把新申请的内存数据清零。

Channel

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

NIO把它支持的I/O对象抽象为Channel,Channel又称“通道”,类似于原I/O中的流(Stream),但有所区别:
1、流是单向的,通道是双向的,可读可写。
2、流读写是阻塞的,通道可以异步读写。
3、流中的数据可以选择性的先读到缓存中,通道的数据总是要先读到一个缓存中,或从缓存中写入,如下所示:


 
技术分享图片
image.png

目前已知Channel的实现类有:

FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel

FileChannel

A channel for reading, writing, mapping, and manipulating a file.
一个用来写、读、映射和操作文件的通道。

FileChannel的read、write和map通过其实现类FileChannelImpl实现。

read实现

public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

 

FileChannelImpl的read方法通过IOUtil的read实现:

static int read(FileDescriptor fd, ByteBuffer dst, long position,
                NativeDispatcher nd) IOException {
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, nd);

    // Substitute a native buffer
    ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
    try {
        int n = readIntoNativeBuffer(fd, bb, position, nd);
        bb.flip();
        if (n > 0)
            dst.put(bb);
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

 

通过上述实现可以看出,基于channel的文件数据读取步骤如下:
1、申请一块和缓存同大小的DirectByteBuffer bb。
2、读取数据到缓存bb,底层由NativeDispatcher的read实现。
3、把bb的数据读取到dst(用户定义的缓存,在jvm中分配内存)。
read方法导致数据复制了两次

write实现

public int write(ByteBuffer src) throws IOException {
    ensureOpen();
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.write(fd, src, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

 

和read实现一样,FileChannelImpl的write方法通过IOUtil的write实现:

static int write(FileDescriptor fd, ByteBuffer src, long position,
                 NativeDispatcher nd) throws IOException {
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);
    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);
        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

 

通过上述实现可以看出,基于channel的文件数据写入步骤如下:
1、申请一块DirectByteBuffer,bb大小为byteBuffer中的limit - position。
2、复制byteBuffer中的数据到bb中。
3、把数据从bb中写入到文件,底层由NativeDispatcher的write实现,具体如下:

private static int writeFromNativeBuffer(FileDescriptor fd, 
        ByteBuffer bb, long position, NativeDispatcher nd)
    throws IOException {
    int pos = bb.position();
    int lim = bb.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);

    int written = 0;
    if (rem == 0)
        return 0;
    if (position != -1) {
        written = nd.pwrite(fd,
                            ((DirectBuffer)bb).address() + pos,
                            rem, position);
    } else {
        written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    if (written > 0)
        bb.position(pos + written);
    return written;
}

 

write方法也导致了数据复制了两次

Channel和Buffer示例

File file = new RandomAccessFile("data.txt", "rw");
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(48);

int bytesRead = channel.read(buffer);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    buffer.flip();
    while(buffer.hasRemaining()){
        System.out.print((char) buffer.get());
    }
    buffer.clear();
    bytesRead = channel.read(buffer);
}
file.close();

 

注意buffer.flip() 的调用,首先将数据写入到buffer,然后变成读模式,再从buffer中读取数据。



作者:tracy_668
链接:https://www.jianshu.com/p/ba3bf3c62018
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

探究socketChannel原理

原文:https://www.cnblogs.com/stop-Word/p/14336766.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!