由于JDK中提供的ByteBuffer无法动态扩容,并且API使用复杂等原因,Netty中提供了ByteBuf。 Bytebuf的API操作更加便捷,可以动态扩容,提供了多种ByteBuf的实现,以及高效的零拷贝机制。
ByteBuf的操作
ByteBuf有三个重要的属性:capacity容量,readerIndex读取位置,writerIndex写入位置 提供了readerIndex和weiterIndex两个变量指针来支持顺序读和写操作
下图显示了一个缓冲区是如何被两个指针分割成三个区域的:
img
代码示例:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.Arrays;
public class ByteBufDemo {
public static void main(String[] args) {
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf为:" + buf.toString());
System.out.println("1.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
// 2.写入一段内容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("写入的bytes为:" + Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为:" + buf.toString());
System.out.println("2.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
// 3. 读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为:" + Arrays.toString(new byte[] {b1, b2}));
System.out.println("读取一段内容后ByteBuf为:" + buf.toString());
System.out.println("3.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
// 4.将读取的内容丢弃
buf.discardReadBytes();
System.out.println("将读取的内容丢弃后ByteBuf为:" + buf.toString());
System.out.println("4.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
// 5.清空读写指针
buf.clear();
System.out.println("清空读写指针后ByteBuf为:" + buf.toString());
System.out.println("5.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
// 6.再次写入一段内容,比第一段内容少
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("写入的bytes为:" + Arrays.toString(bytes2));
System.out.println("写入一段内容后ByteBuf为:" + buf.toString());
System.out.println("6.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
// 7.将ByteBuf清零
buf.setZero(0, buf.capacity());
System.out.println("清零后ByteBuf为:" + buf.toString());
System.out.println("7.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
// 8.再次写入一段超过容量的内容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
System.out.println("写入的bytes为:" + Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为:" + buf.toString());
System.out.println("8.ByteBuf中的内容为:" + Arrays.toString(buf.array()) + "\n");
//Load the data into IDataView.
//This dataset is used for detecting spikes or changes not for training.
IDataView dataView www.chaoyul.com = mlContext.Data.LoadFromTextFile<ProductSalesData>(path: DatasetPath, hasHeader: true, separatorChar: www.sangyuLpt.com‘,‘);
//Apply data transformation to create predictions.
IDataView transformedData = tansformedModel.Transform(dataView);
var predictions = mlcontext.Data.CreateEnumerable<ProductSalesPrediction>www.baiyytwg.com(transformedData, reuseRowObject: false);
Console.WriteLine(www.huichengtxgw.com"Alert\tScore\tP-Value");
foreach (var p in predictions)
{
if (p.Prediction[www.jintianxuesha.com] == 1)
{
Console.BackgroundColor = ConsoleColor.DarkYellow;
Console.ForegroundColor = ConsoleColor.Black;
}
Console.WriteLine("{0}\t{1:0.00}\www.qjljdgt.cn {2:0.00}www.xgjrfwsc.cn ", p.Prediction[0], p.Prediction[1], p.Prediction[2]);
Console.ResetColor();
ButeBuf动态扩容
capacity默认值:256字节,最大值:Integer.MAX_VALUE (2G)
writeXXX方法调用时,通过AbstractByteBuf.ensureWritable0()方法进行检查 容量计算方法:AbstractByteBufAllocator.calculateNewCapacity
根据capacity的最小值要求,对应有两套计算方法: 没超过4兆:从64字节开始,每次递增一倍,直至计算出来的newCapacity满足新容量最小要求 示例:当前大小256,已写250,继续写10字节的数据,需要的最小容量要求是261,则新容量为64x2x2x2=512
超过4兆:新容量=新容量最小要求/4兆x4兆+4兆 示例:当前大小为3兆,已写3兆,继续写2兆,需要的最小容量大小为5兆,则新容量是8兆(不能超过最大值)
4兆的来源:一个固定的阈值AbstractByteBufAllocator.CALCULATE_THRESHOLD
ByteBuf的实现
img
在使用中都是通过ByteBufAllocator分配器进行申请,同时具备有内存管理功能
PooledByteBuf对象,内存 复用
PooledThreadCache:PooledByteBufAllocator实例维护的一个线程变量 多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存Chuck。PoolChuck里面维护了内存引用,内存复用的做法就是把buf的memory指向chuck的memory PooledByteBufAllocator.ioBuffer运作过程梳理:
img
零拷贝机制
Netty的零拷贝机制,是一种应用层的实现,和底层JVM,操作系统内存机制并无过多关联。
CompositeByteBuf,将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝
img
wrapedBuffer()方法,将byte[]数组包装成ByteBuf对象
img
slice()方法,将一个ByteBuf对象切割成多个ByteBuf对象
img
代码示例:
public class ZeroCopyTest {
public static void main(String[] args) {
ByteBuf buffer1 = Unpooled.buffer(7);
buffer1.writeByte(7);
ByteBuf buffer2 = Unpooled.buffer(7);
buffer2.writeByte(13);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuf = compositeByteBuf.addComponents(true, buffer1, buffer2);
System.out.println("CompositeByteBuf:" + newBuf);
byte[] bytes = {1, 2, 3};
ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(bytes);
System.out.println("wrappedBuffer:" + wrappedBuffer.getByte(2));
bytes[2] = 7;
System.out.println("wrappedBuffer:" + wrappedBuffer.getByte(2));
ByteBuf buf = Unpooled.wrappedBuffer("Netty".getBytes());
ByteBuf slice = buf.slice(1, 2);
slice.unwrap();
System.out.println("slice:" + slice);
}
原文:https://www.cnblogs.com/dakunqq/p/11628481.html