通道channel
FileChannel
- FileChannel是一个连接到文件的通道,可以对文件进行读写
- FileChannel是阻塞的,不能运行在非阻塞模式下
- FileChannel对象是线程安全的,多个线程可以在同一个实例上并发调用方法而不会引起任何问题
- FileChannel的初始化
public static void main(String[] args) throws IOException {
File file = new File("D:\\TEXT.txt");
// 只能用于读的通道
FileChannel readCh = new FileInputStream(file).getChannel();
// 只能用于写的通道
FileChannel writCh = new FileOutputStream(file).getChannel();
// 可以读也可以写的通道
FileChannel rwCh = new RandomAccessFile(file, "rw").getChannel();
// 在Java SE1.7中提供了新的初始化方法
// 这个Path后面会介绍
Path path = Paths.get("D:", "TEXT.txt");
// 只能用于读的通道
FileChannel nReadCh = FileChannel.open(path, StandardOpenOption.READ);
// 只能用于写的通道
FileChannel nWriteCh = FileChannel.open(path, StandardOpenOption.WRITE);
// 可以读也可以写的通道,第二个参数可以是个数组
FileChannel nReadWrite = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ);
}
5. 基本方法
public static void main(String[] args) throws IOException {
File file = new File("D:\\t2.txt");
// 如果文件不存,就会直接创建一个空文件
FileChannel rw = new RandomAccessFile(file, "rw").getChannel();
// 初始化一个缓冲区
ByteBuffer bb = ByteBuffer.wrap("temp".getBytes());
// 注意 通道也有一个读写的位置,而且是从底层的文件描述符获得的
// 这也就意味着一个对象对该position的更新可以被另一个对象看到
rw.position();
// 向通道道写入数据
while (bb.hasRemaining()) {
// position放在末尾,write会自动对文件进行扩容
rw.write(bb);
}
// 先清空缓冲区
bb.clear();
// 通道的读写文件位置在末尾,设置到0位置
rw.position(0);
// 读数据到缓冲区
rw.read(bb);
// 通道关联文件的大小(字节)
rw.size();
// 截断文件,只保留前三个字节,其他删掉
rw.truncate(3);
// 所有的现代文件系统都会缓存数据和延迟磁盘文件更新以提高性能。调用force()方法要求文件的所有待定修改立即同步到磁盘
// boolean参数设置 元数据 是否要写到磁盘
// 元数据:指文件所有者、访问权限、最后一次修改时间等信息
// 同步元数据要求操作系统至少一次的I/O操作,为了提高性能,可以不同步元数据,同时也不会牺牲数据完整性
rw.force(false);
// 关闭通道
if (rw.isOpen())
rw.close();
}
6. Channel-to-Channel传输
public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel {
// 这里仅列出部分API
public abstract long transferTo (long position, long count, WritableByteChannel target)
public abstract long transferFrom (ReadableByteChannel src, long position, long count)
}
transferTo()和transferFrom()方法允许将一个通道交叉连接到另一个通道,而不需要通过一个中间缓冲区来传递数据。只有FileChannel类有这两个方法,因此Channel-to-Channel传输中通道之一必须是FileChannel。不能在socket通道之间直接传输数据,不过socket通道实现WritableByteChannel和ReadableByteChannel接口,因此文件的内容可以用transferTo()方法传输给一个socket通道,或者也可以用transferFrom()方法将数据从一个socket通道直接读取到一个文件中。
public static void main(String[] args) throws IOException {
// 将两个通道相连传输数据
File inFile = new File("D:" + File.separator + "temp.png");
File outFile = new File("E:" + File.separator + "temp.png");
FileChannel in = new FileInputStream(inFile).getChannel();
FileChannel out = new FileOutputStream(outFile).getChannel();
// 将inFile文件数据拷贝到outFile
out.transferFrom(in, 0, in.size());
in.transferTo(0, in.size(), out);
}
7. 通道可以向缓冲区数组写入数据,并按顺序填充每个缓冲区直到所有缓冲区满或者没有数据可读为止。聚集写也是以类似的方式完成,数据从列表中的每个缓冲区中顺序取出来发送到通道就好像顺序写入一样
文件锁定
- 文件锁定是依赖于操作系统的,而且锁的对象是文件内部区域而不是通道或线程
- 文件锁是进程级的,不是线程级的,一个java线程获得了文件区域a的锁,那这个锁是归JVM进程所持有的,表示这个JVM进程的所有线程对区域a有同等权限。如果这个JVM的另一个线程再次申请区域a或者区域b(包含区域a)的锁,会抛出一个异常OverlappingFileLockException重叠的文件锁。
- 独占锁:锁定的区域用于读写操作。该区域属于进程独享。
- 共享锁:允许多个进程从该区域读取内容,阻止任何进程修改该区域并阻止任何进程获得包含该区域的独占的锁,
- 现在让我们来看下与文件锁定有关的FileChannel API方法
// 如果请求的锁定范围是有效的,阻塞直至获取锁
public final FileLock lock()
// 尝试获取锁非阻塞,立刻返回结果
public final FileLock tryLock()
// 第一个参数:要锁定区域的起始位置
// 第二个参数:要锁定区域的尺寸,
// 第三个参数:true为共享锁,false为独占锁
public abstract FileLock lock (long position, long size, boolean shared)
public abstract FileLock tryLock (long position, long size, boolean shared)</span>
- 锁定区域的范围不一定要限制在文件的size值以内,锁可以扩展从而超出文件尾。因此,我们可以提前把待写入数据的区域锁定,我们也可以锁定一个不包含任何文件内容的区域,比如文件最后一个字节以外的区域。如果之后文件增长到达那块区域,那么您的文件锁就可以保护该区域的文件内容了。相反地,如果您锁定了文件的某一块区域,然后文件增长超出了那块区域,那么新增加 的文件内容将不会受到您的文件锁的保护。
- 不带参数的简单形式的lock()方法是一种在整个文件上请求独占锁的便捷方法,锁定区域等于它能达到的最大范围。该方法等价于
fileChannel.lock(0L,Long.MAX_VALUE,false)
- 并非所有的操作系统都支持共享锁,可能在请求共享锁时获得独占锁,可以调用FileLock类的isShared方法查询获得的锁类型。
public static void main(String[] args) throws IOException {
File file = new File("D:\\t2.txt");
// 如果文件不存,就会直接创建一个空文件
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
// 写入4个字节
fileChannel.write(ByteBuffer.wrap("abcd".getBytes()));
// 将前2个字节区域锁定(共享锁)
FileLock lock1 = fileChannel.lock(0, 2, true);
// 当前锁持有锁的类型(共享锁/独占锁)
lock1.isShared();
// IOException 不能修改只读的共享区域
// fileChannel.write(ByteBuffer.wrap("a".getBytes()));
// 可以修改共享锁之外的区域,从第三个字节开始写入
fileChannel.write(ByteBuffer.wrap("ef".getBytes()), 2);
// OverlappingFileLockException 重叠的文件锁异常
// FileLock lock2 = fileChannel.lock(0, 3, true);
// FileLock lock3 = fileChannel.lock(0, 3, false);
//得到创建锁的通道
lock1.channel();
//锁的起始位置
lock1.position();
//锁的范围
lock1.size();
//判断锁是否与指定文件区域有重叠
lock1.overlaps(position, size)
// 记得用try/catch/finally{release()}方法释放锁
lock1.release();
}
socket通道
- socket通道可以在非阻塞模式下运行,一个线程可以通过Selector选择器管理多个socket连接,并不需要为每个socket连接分配一个线程,也避免了大量线程上下文切换的开销。具有很好的可伸缩性和灵活性。
- 全部socket的通道类(SocketChannel、DatagramChannel、ServerSocketChannel)都继承了AbstractSelectableChannel这个抽象类,这个抽象类有一个注册方法register()可以将通道类注册到Selector选择器中,一个通道可以注册到多个选择器中。任何一个通道和选择器的关系都被封装在SelectionKey中。
// 通过静态open()工厂方法初始化ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 监听端口1100 SE1.7中新增的方法,之前需要ssc.socket().bind(endpoint);
ssc.bind(new InetSocketAddress(1100));
// 初始化选择器
Selector s = Selector.open();
// 必须设置Socket通道为非阻塞才能注册到Selector中
if (ssc.isBlocking())
ssc.configureBlocking(false);
// 非阻塞模式accept会立即返回,如果没有连接传入,直接返回null
// SocketChannel sc = ssc.accept();
// 注册到选择器中,并指定该通道对什么事件感兴趣
SelectionKey sk = ssc.register(s, SelectionKey.OP_ACCEPT);
sk.attach(new Object());//如果有需要可以添加一个附件
SocketChannel和DatagramChannel都实现了读写功能的接口,而ServerSocketChannel并没有实现。ServerSocketChannel只负责监听传入的连接和创建SocketChannel对象。
真正的就绪选择必须由操作系统来做。操作系统的一项最重要的功能就是处理I/O请求并通知各个线程它们的数据已经准备好了。选择器类提供了这种抽象,使用Java代码能够以可移植的方式,请求底层的操作系统提供就绪选择服务。
可以只用一个线程监控通道的就绪状态并使用一个协调好的工作线程池来处理共接收到的数据。根据部署的条件,线程池的大小是可以调整的(或者它自己进行动态的调整)。
package i.io.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
public class SelectSockets {
/**
* 服务器端
*/
public static class ServerSocketListen implements Runnable {
@Override
public void run() {
try {
server(1234);
} catch (Exception e) {
e.printStackTrace();
}
}
public void server(int... port) throws Exception {
// 初始化一个选择器
Selector selector = Selector.open();
// 监听多个端口
for (int pt : port) {
System.out.println("Listening on port " + pt);
// 初始化一个服务器套接字通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置端口服务器通道监听的端口
serverChannel.bind(new InetSocketAddress(pt));
// 设置监听套接字的非阻塞模式
serverChannel.configureBlocking(false);
// 注册ServerSocketChannel选择器
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
while (true) {
// 这个可能阻塞很长时间,返回后选择集包含准备好的通道键
if (selector.select() == 0)
continue;
// 处理准备好的通道
handleChannel(selector);
}
}
/**
* 注册通道和通道感兴趣的业务到选择器
*/
protected void handleChannel(Selector selector) throws Exception {
// 得到选择键的迭代器
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 有新的连接
if (key.isAcceptable()) {
// 得到服务器通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
// 设置通道为非阻塞
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
// 有可读取数据的通道
if (key.isReadable()) {
readDataFromSocket(key);
}
it.remove();
}
}
/**
* 读取通道的数据
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 初始化缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 将数据读到缓冲区
while (socketChannel.read(buffer) > 0) {
// 切换到缓冲区到读模式
buffer.flip();
// 以字符视图打开缓冲
CharBuffer cb = buffer.asCharBuffer();
StringBuilder messClient = new StringBuilder();
while (cb.hasRemaining()) {
messClient.append(cb.get());
}
System.err.println("2-服务器端接收客户端数据:" + messClient.toString());
buffer.clear();
}
// 回写
System.err.println("3-反馈数据到客户端:go");
Charset charset = Charset.forName("gbk");
socketChannel.write(charset.encode("还可以吧"));
}
}
/**
* 客户端类
*/
public static class ClientSocketListen {
public void client() throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT);
sc.connect(new InetSocketAddress(1234));
while (true) {
if (selector.select() == 0)
continue;
handleChannel(selector);
}
}
protected void handleChannel(Selector selector) throws ClosedChannelException, IOException {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
int op = key.readyOps();
SocketChannel channel = (SocketChannel) key.channel();
// 监听这个通道,用于接收服务器端的反馈数据
if ((op & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) {
if (channel.finishConnect()) {
//一个选择器只能有一个当前通道的实例,
// channel.register(selector, SelectionKey.OP_READ);
key.interestOps(SelectionKey.OP_READ);
System.out.println("1-客户端发数据到服务器:go");
ByteBuffer bb = ByteBuffer.allocate(8);
bb.asCharBuffer().put("4个字符");
channel.write(bb);
bb.clear();
}
}
// 接收服务器端反馈数据
if ((op & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
ByteBuffer bb = ByteBuffer.allocate(1024);
while (channel.read(bb) > 0) {
bb.flip();
Charset charset = Charset.forName("gbk");
System.out.println("4-客户端接收服务器反馈数据:" + charset.decode(bb));
}
}
it.remove();
}
}
}
public static void main(String[] argv) throws Exception {
// 先用一个线程启动服务器端
new Thread(new SelectSockets.ServerSocketListen()).start();
// 客户端调用
new SelectSockets.ClientSocketListen().client();
}
}
DatagramChannel是面向UDP的,DatagramChannel对象既可以充当服务器(监听者)也可以充当客户端(发送者), 不同于SocketChannel(必须连接了才有用并且只能连接一次),DatagramChannel对象可以任意次数地进行连接或断开连接。每次连接都可以到一个不同的远程地址。调用disconnect()方法可以配置通道,以便它能再次接收来自安全管理器(如果已安装)所允许的任意远程地址的数据或发送数据到这些地址上。
列出几种使用数据包的情况
程序可以承受数据丢失或无序的数据。
希望「发射后不管」(fire and forget)而不需要知道您发送的包是否已接收。
数据吞吐量比可靠性更重要。
您需要同时发送数据给多个接受者(多播或者广播)。
包隐喻比流隐喻更适合手边的任务。