PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信.一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道.PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据.这两个类主要用来完成线程之间的通信.一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据.
ps:使用这组I/O流必须在多线程环境下.
首先简单的介绍一下这两个类的实现原理,PipedInputStream和PipedOutputStream的实现原理类似于"生产者-消费者"原理,PipedOutputStream是生产者,PipedInputStream是消费者,在PipedInputStream中有一个buffer字节数组,默认大小为1024,作为缓冲区,存放"生产者"生产出来的东东.还有两个变量,in,out,in是用来记录"生产者"生产了多少,out是用来记录"消费者"消费了多少,in为-1表示消费完了,in==out表示生产满了.当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费.
因为生产和消费的方法都是synchronized的,所以肯定是生产者先生产出一定数量的东西,消费者才可以开始消费,所以在生产的时候发现in==out,那一定是满了,同理,在消费的时候发现in==out,那一定是消费完了,因为生产的东西永远要比消费来得早,消费者最多可以消费和生产的数量相等的东西,而不会超出.
好了,介绍完之后,看看SUN高手是怎么实现这些功能的.由于buffer(存放产品的通道)这个关键变量在PipedInputStream消费者这个类中,所以要想对buffer操作,只能通过PipedInputStream来操作,因此将产品放入通道的操作是在PipedInputStream中.
存放产品的行为:
- protected synchronized void receive(int b) throws IOException {
- checkStateForReceive();
- writeSide = Thread.currentThread();
- if (in == out)
- awaitSpace();
- if (in < 0) {
- in = 0;
- out = 0;
- }
- buffer[in++] = (byte) (b & 0xFF);
- if (in >= buffer.length) {
- in = 0;
- }
- }
-
- synchronized void receive(byte b[], int off, int len) throws IOException {
- checkStateForReceive();
- writeSide = Thread.currentThread();
- int bytesToTransfer = len;
- while (bytesToTransfer > 0) {
- if (in == out)
- awaitSpace();
- int nextTransferAmount = 0;
- if (out < in) {
- nextTransferAmount = buffer.length - in;
- } else if (in < out) {
- if (in == -1) {
- in = out = 0;
- nextTransferAmount = buffer.length - in;
- } else {
- nextTransferAmount = out - in;
- }
- }
- if (nextTransferAmount > bytesToTransfer)
- nextTransferAmount = bytesToTransfer;
- assert (nextTransferAmount > 0);
- System.arraycopy(b, off, buffer, in, nextTransferAmount);
- bytesToTransfer -= nextTransferAmount;
- off += nextTransferAmount;
- in += nextTransferAmount;
- if (in >= buffer.length) {
- in = 0;
- }
- }
- }
消费产品的行为:
- public synchronized int read() throws IOException {
- if (!connected) {
- throw new IOException("Pipe not connected");
- } else if (closedByReader) {
- throw new IOException("Pipe closed");
- } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
- && (in < 0)) {
- throw new IOException("Write end dead");
- }
-
- readSide = Thread.currentThread();
- int trials = 2;
- while (in < 0) {
- if (closedByWriter) {
-
- return -1;
- }
- if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
- throw new IOException("Pipe broken");
- }
-
- notifyAll();
- try {
- wait(1000);
- } catch (InterruptedException ex) {
- throw new java.io.InterruptedIOException();
- }
- }
- int ret = buffer[out++] & 0xFF;
- if (out >= buffer.length) {
- out = 0;
- }
- if (in == out) {
-
- in = -1;
- }
- return ret;
- }
-
- public synchronized int read(byte b[], int off, int len) throws IOException {
- if (b == null) {
- throw new NullPointerException();
- } else if ((off < 0) || (off > b.length) || (len < 0)
- || ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
-
-
- int c = read();
- if (c < 0) {
- return -1;
- }
- b[off] = (byte) c;
- int rlen = 1;
-
-
-
-
-
- while ((in >= 0) && (--len > 0)) {
- b[off + rlen] = buffer[out++];
- rlen++;
- if (out >= buffer.length) {
- out = 0;
- }
- if (in == out) {
-
- in = -1;
- }
- }
- return rlen;
- }
PipedInputStream/PipedOutputStream原理
原文:http://www.cnblogs.com/qiumingcheng/p/5336745.html