/**
 * Abstract handle on a block of bytes. Subclasses decide how the bytes are stored and expose
 * them through the accessors declared here.
 */
public abstract class ManagedBuffer {

  /**
   * Returns the number of bytes of data held by this buffer.
   */
  public abstract long size();

  /**
   * Returns this buffer's data as an NIO {@link ByteBuffer}. Moving the position or limit of the
   * returned ByteBuffer should leave the content of this buffer untouched.
   *
   * @throws IOException if the data cannot be exposed as a ByteBuffer
   */
  // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
  public abstract ByteBuffer nioByteBuffer() throws IOException;

  /**
   * Returns this buffer's data as an {@link InputStream}. Implementations do not necessarily
   * track how many bytes have been read, so it is up to the caller to avoid reading past the
   * limit.
   *
   * @throws IOException if the stream cannot be created
   */
  public abstract InputStream createInputStream() throws IOException;

  /**
   * Increments the reference count by one, if reference counting applies to this buffer.
   *
   * @return the buffer, to allow call chaining
   */
  public abstract ManagedBuffer retain();

  /**
   * Decrements the reference count by one, if reference counting applies to this buffer, and
   * deallocates the buffer once the count reaches zero.
   *
   * @return the buffer, to allow call chaining
   */
  public abstract ManagedBuffer release();

  /**
   * Converts the buffer into a Netty object, used to write the data out.
   *
   * @throws IOException if the conversion fails
   */
  public abstract Object convertToNetty() throws IOException;
}
/**
 * A {@link ManagedBuffer} whose data is the {@code length} bytes starting at {@code offset}
 * in {@code file}. The file is opened on demand by each accessor; this class keeps no open
 * handle of its own and does no reference counting.
 */
public final class FileSegmentManagedBuffer extends ManagedBuffer {
  private final TransportConf conf;
  private final File file;
  private final long offset;
  private final long length;

  /**
   * @param conf transport configuration; consulted for the memory-map threshold and for whether
   *             file descriptors should be opened lazily
   * @param file file that contains the segment
   * @param offset position of the first byte of the segment within the file
   * @param length number of bytes in the segment
   */
  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    this.conf = conf;
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  /** Returns the segment length given at construction time. */
  @Override
  public long size() {
    return length;
  }

  /**
   * Reads the segment into a {@link ByteBuffer}. Segments shorter than
   * {@code conf.memoryMapBytes()} are copied into a heap buffer; larger ones are memory mapped
   * read-only. The channel used for reading is always closed before this method returns.
   *
   * @throws IOException if the file cannot be opened ("Error in opening ...") or the segment
   *         cannot be read ("Error in reading ...", including the actual file length when it
   *         can be determined); the original failure is attached as the cause
   */
  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
      if (length < conf.memoryMapBytes()) {
        // NOTE(review): the int cast assumes memoryMapBytes() never exceeds Integer.MAX_VALUE.
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      // Build the message first and throw exactly once. Throwing the detailed exception from
      // inside a try whose catch swallows IOException would discard the diagnostic.
      String errorMessage;
      if (channel == null) {
        errorMessage = "Error in opening " + this;
      } else {
        errorMessage = "Error in reading " + this;
        try {
          errorMessage += " (actual file length " + channel.size() + ")";
        } catch (IOException ignored) {
          // Best effort only: the file length is extra diagnostic detail.
        }
      }
      throw new IOException(errorMessage, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

  /**
   * Opens the file, skips to {@code offset} and returns a stream limited to {@code length}
   * bytes. On success the caller owns the returned stream; on any failure the underlying file
   * stream is closed before the exception propagates.
   *
   * @throws IOException if the file cannot be opened ("Error in opening ...") or the offset
   *         cannot be reached ("Error in reading ...", including the actual file length); the
   *         original failure is attached as the cause
   */
  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      String errorMessage;
      try {
        if (is == null) {
          errorMessage = "Error in opening " + this;
        } else {
          errorMessage = "Error in reading " + this + " (actual file length " + file.length() + ")";
        }
      } finally {
        // Close even if building the message fails, so the descriptor is never leaked.
        JavaUtils.closeQuietly(is);
      }
      throw new IOException(errorMessage, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

  /** No reference counting is performed for file segments; returns this buffer unchanged. */
  @Override
  public ManagedBuffer retain() {
    return this;
  }

  /** No reference counting is performed for file segments; returns this buffer unchanged. */
  @Override
  public ManagedBuffer release() {
    return this;
  }

  /**
   * Wraps the segment in a file region for Netty to write out: a {@code LazyFileRegion} when
   * {@code conf.lazyFileDescriptor()} is set, otherwise a {@code DefaultFileRegion} over a
   * freshly opened channel.
   *
   * @throws IOException if the file cannot be opened
   */
  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new LazyFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      try {
        // NOTE(review): presumably the region takes over closing the channel — confirm.
        return new DefaultFileRegion(fileChannel, offset, length);
      } catch (RuntimeException e) {
        // The region was never created, so nothing else would close the channel.
        JavaUtils.closeQuietly(fileChannel);
        throw e;
      }
    }
  }

  public File getFile() { return file; }

  public long getOffset() { return offset; }

  public long getLength() { return length; }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("file", file)
      .add("offset", offset)
      .add("length", length)
      .toString();
  }
}
/**
 * A {@link ManagedBuffer} that delegates to a Netty {@link ByteBuf}.
 */
public final class NettyManagedBuffer extends ManagedBuffer {
  private final ByteBuf buf;

  /**
   * @param buf the Netty buffer that every operation of this class is forwarded to
   */
  public NettyManagedBuffer(ByteBuf buf) {
    this.buf = buf;
  }

  /** Returns the number of readable bytes reported by the wrapped buffer. */
  @Override
  public long size() {
    final int readable = this.buf.readableBytes();
    return readable;
  }

  /** Returns the result of {@code nioBuffer()} on the wrapped buffer. */
  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    final ByteBuffer nioView = this.buf.nioBuffer();
    return nioView;
  }

  /** Returns a {@code ByteBufInputStream} constructed directly over the wrapped buffer. */
  @Override
  public InputStream createInputStream() throws IOException {
    final InputStream stream = new ByteBufInputStream(this.buf);
    return stream;
  }

  /** Calls {@code retain()} on the wrapped buffer and returns this object. */
  @Override
  public ManagedBuffer retain() {
    this.buf.retain();
    return this;
  }

  /** Calls {@code release()} on the wrapped buffer and returns this object. */
  @Override
  public ManagedBuffer release() {
    this.buf.release();
    return this;
  }

  /** Returns the result of {@code duplicate()} on the wrapped buffer. */
  @Override
  public Object convertToNetty() throws IOException {
    final ByteBuf copyOfIndices = this.buf.duplicate();
    return copyOfIndices;
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this).add("buf", this.buf).toString();
  }
}
/**
 * A {@link ManagedBuffer} that wraps an NIO {@link ByteBuffer}. No reference counting is
 * performed: {@link #retain()} and {@link #release()} simply return this object.
 */
public final class NioManagedBuffer extends ManagedBuffer {
  private final ByteBuffer buf;

  /**
   * @param buf the NIO buffer whose data this object exposes
   */
  public NioManagedBuffer(ByteBuffer buf) {
    this.buf = buf;
  }

  /** Returns the number of bytes remaining in the wrapped buffer. */
  @Override
  public long size() {
    final int bytesLeft = this.buf.remaining();
    return bytesLeft;
  }

  /** Returns a duplicate of the wrapped buffer rather than the wrapped buffer itself. */
  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    final ByteBuffer twin = this.buf.duplicate();
    return twin;
  }

  /** Returns a {@code ByteBufInputStream} over {@code Unpooled.wrappedBuffer} of the data. */
  @Override
  public InputStream createInputStream() throws IOException {
    final InputStream stream = new ByteBufInputStream(Unpooled.wrappedBuffer(this.buf));
    return stream;
  }

  /** Returns this object unchanged. */
  @Override
  public ManagedBuffer retain() {
    return this;
  }

  /** Returns this object unchanged. */
  @Override
  public ManagedBuffer release() {
    return this;
  }

  /** Returns the wrapped buffer's data as produced by {@code Unpooled.wrappedBuffer}. */
  @Override
  public Object convertToNetty() throws IOException {
    final Object nettyView = Unpooled.wrappedBuffer(this.buf);
    return nettyView;
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this).add("buf", this.buf).toString();
  }
}
// Original article: http://www.cnblogs.com/gaoxing/p/4985558.html