<pre>
      +-------------------+------------------+------------------+
      | discardable bytes |  readable bytes  |  writable bytes  |
      |                   |     (CONTENT)    |                  |
      +-------------------+------------------+------------------+
      |                   |                  |                  |
      0      <=      readerIndex   <=   writerIndex    <=    capacity
</pre>
// Logger
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractByteBuf.class);
private static final String PROP_MODE = "io.netty.buffer.bytebuf.checkAccessible";
private static final boolean checkAccessible;
static {
    checkAccessible = SystemPropertyUtil.getBoolean(PROP_MODE, true);
    if (logger.isDebugEnabled()) {
        logger.debug("-D{}: {}", PROP_MODE, checkAccessible);
    }
}
// A single static instance, shared by all subclasses
static final ResourceLeakDetector<ByteBuf> leakDetector =
        ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
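As the fields show, AbstractByteBuf only tracks indexes and limits; the buffer storage itself is left to subclasses to implement.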
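Read flow: [ensureAccessible() checks that the memory reference is still valid -- bounds check] --> fetch the bytes --> advance readerIndex
Write flow: [check the argument on its own -- ensureAccessible() checks that the memory reference is still valid -- check the argument against the current indexes -- call the allocator's calculateNewCapacity to decide whether to expand -- adjust capacity] --> write the bytes --> advance writerIndex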
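Here is a minimal sketch of the two check orders above. It assumes a toy heap-backed buffer and is not Netty's AbstractByteBuf. `ToyByteBuf`, its `released` flag (standing in for a zero reference count) and the simple doubling growth are hypothetical simplifications.

```java
import java.util.Arrays;

// Toy model of the read/write check order; NOT Netty's AbstractByteBuf.
final class ToyByteBuf {
    private byte[] array;
    private int readerIndex;
    private int writerIndex;
    private final int maxCapacity;
    private boolean released; // stands in for "refCnt == 0" in ensureAccessible()

    ToyByteBuf(int initialCapacity, int maxCapacity) {
        this.array = new byte[initialCapacity];
        this.maxCapacity = maxCapacity;
    }

    int capacity() { return array.length; }

    int readableBytes() { return writerIndex - readerIndex; }

    void release() { released = true; }

    private void ensureAccessible() {
        if (released) {
            throw new IllegalStateException("refCnt: 0");
        }
    }

    byte readByte() {
        ensureAccessible();                          // 1. still a valid memory reference?
        if (readableBytes() < 1) {                   // 2. bounds check
            throw new IndexOutOfBoundsException("no readable bytes");
        }
        byte b = array[readerIndex];                 // 3. fetch the byte
        readerIndex++;                               // 4. advance readerIndex
        return b;
    }

    ToyByteBuf writeBytes(byte[] src) {
        if (src == null) {                           // 1. check the argument on its own
            throw new NullPointerException("src");
        }
        ensureAccessible();                          // 2. still a valid memory reference?
        int length = src.length;
        if (length > maxCapacity - writerIndex) {    // 3. check against writerIndex / maxCapacity
            throw new IndexOutOfBoundsException("exceeds maxCapacity(" + maxCapacity + ")");
        }
        int minNewCapacity = writerIndex + length;
        if (minNewCapacity > array.length) {
            // 4. choose a new capacity (simplified doubling; Netty asks the allocator's
            //    calculateNewCapacity(minNewCapacity, maxCapacity) instead)
            int newCapacity = Math.min(Math.max(array.length * 2, minNewCapacity), maxCapacity);
            array = Arrays.copyOf(array, newCapacity); // 5. adjust capacity
        }
        System.arraycopy(src, 0, array, writerIndex, length); // 6. write the bytes
        writerIndex += length;                       // 7. advance writerIndex
        return this;
    }
}
```

For example, writing 3 bytes into a 2-byte buffer grows it to 4 bytes. A read after `release()` throws before any byte is touched.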
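calculateNewCapacity takes the minimum capacity currently needed and the maximum allowed capacity, and returns the adjusted capacity: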
public int calculateNewCapacity(int minNewCapacity, int maxCapacity)
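The growth strategy uses a threshold, an empirical value chosen by Netty (4 MiB in AbstractByteBufAllocator). If the required capacity is below the threshold, the capacity starts at 64 bytes and doubles; at these sizes doubling wastes little memory. Above the threshold, doubling is no longer a good fit: a much larger buffer would likely leave a larger share of its space unused. So the capacity instead grows smoothly in multiples of the threshold.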
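For reference, here is a sketch of this growth rule, modeled on `calculateNewCapacity` in Netty 4.1's AbstractByteBufAllocator. The holder class `CapacityCalc` is hypothetical, and the exact code differs between Netty versions.

```java
// Sketch modeled on Netty 4.1's AbstractByteBufAllocator.calculateNewCapacity;
// CapacityCalc is a hypothetical holder class.
final class CapacityCalc {
    static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB

    static int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity < 0 || minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(
                    "minNewCapacity: " + minNewCapacity + " (expected: 0-" + maxCapacity + ")");
        }
        final int threshold = CALCULATE_THRESHOLD;
        if (minNewCapacity == threshold) {
            return threshold;
        }
        if (minNewCapacity > threshold) {
            // Above the threshold: round down to a multiple of it, then add one more step
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;   // one more step would exceed maxCapacity
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }
        // Below the threshold: start at 64 bytes and double until it fits
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        return Math.min(newCapacity, maxCapacity);
    }
}
```

With these rules, a request for 65 bytes yields 128. A request for 9 MiB yields 12 MiB rather than the 16 MiB that plain doubling would produce.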
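mark and reset save and restore the corresponding index (markReaderIndex/resetReaderIndex, markWriterIndex/resetWriterIndex); they are straightforward.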
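Since mark and reset are plain index bookkeeping, a small model is enough to show them. `IndexModel` is hypothetical, though its method names mirror ByteBuf's `markReaderIndex()`/`resetReaderIndex()` and `markWriterIndex()`/`resetWriterIndex()`. In this sketch, reset goes through the validating setter, so a stale mark is rejected.

```java
// Hypothetical stand-alone model of the mark/reset bookkeeping.
final class IndexModel {
    private final int capacity;
    int readerIndex;
    int writerIndex;
    private int markedReaderIndex;
    private int markedWriterIndex;

    IndexModel(int capacity) { this.capacity = capacity; }

    void readerIndex(int readerIndex) {
        if (readerIndex < 0 || readerIndex > writerIndex) {
            throw new IndexOutOfBoundsException("readerIndex: " + readerIndex);
        }
        this.readerIndex = readerIndex;
    }

    void writerIndex(int writerIndex) {
        if (writerIndex < readerIndex || writerIndex > capacity) {
            throw new IndexOutOfBoundsException("writerIndex: " + writerIndex);
        }
        this.writerIndex = writerIndex;
    }

    // mark: remember the current index
    void markReaderIndex() { markedReaderIndex = readerIndex; }
    void markWriterIndex() { markedWriterIndex = writerIndex; }

    // reset: restore it through the validating setter
    void resetReaderIndex() { readerIndex(markedReaderIndex); }
    void resetWriterIndex() { writerIndex(markedWriterIndex); }
}
```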
Discard-related functions (discardReadBytes):
<pre>
 BEFORE discardReadBytes()

     +-------------------+------------------+------------------+
     | discardable bytes |  readable bytes  |  writable bytes  |
     +-------------------+------------------+------------------+
     |                   |                  |                  |
     0      <=      readerIndex   <=   writerIndex    <=    capacity

 AFTER discardReadBytes()

     +------------------+--------------------------------------+
     |  readable bytes  |    writable bytes (got more space)   |
     +------------------+--------------------------------------+
     |                  |                                      |
readerIndex (0) <= writerIndex (decreased)        <=        capacity
</pre>
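The diagram corresponds roughly to the following sketch, which assumes a plain `byte[]` backing store. `DiscardModel` is hypothetical. Its marker adjustment simply clamps at 0, a simplification of what Netty's `adjustMarkers` does.

```java
// Hypothetical model of discardReadBytes() on a heap array; not Netty's code.
final class DiscardModel {
    final byte[] array;
    int readerIndex;
    int writerIndex;
    int markedReaderIndex;
    int markedWriterIndex;

    DiscardModel(int capacity) { array = new byte[capacity]; }

    void discardReadBytes() {
        if (readerIndex == 0) {
            return; // nothing has been read, so nothing to discard
        }
        if (readerIndex != writerIndex) {
            // move the readable bytes [readerIndex, writerIndex) to the front
            System.arraycopy(array, readerIndex, array, 0, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
            readerIndex = 0;
        } else {
            // everything was read: just reset both indexes, no copy needed
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
        }
    }

    // Shift the marks by the number of discarded bytes (simplified: clamp at 0)
    private void adjustMarkers(int decrement) {
        markedReaderIndex = Math.max(markedReaderIndex - decrement, 0);
        markedWriterIndex = Math.max(markedWriterIndex - decrement, 0);
    }
}
```

After the call, the readable bytes start at index 0. The space they used to occupy beyond the new writerIndex becomes writable.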
// As the name suggests: atomically updates one specific volatile field (refCnt)
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
        AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
private volatile int refCnt;
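retain and release increment and decrement the reference count, respectively. refCnt is updated atomically with a CAS + loop: read the current value, compute the new one, and retry until compareAndSet succeeds.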
private volatile int refCnt;

private ByteBuf retain0(int increment) {
    for (;;) {
        int refCnt = this.refCnt;
        final int nextCnt = refCnt + increment;
        // Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
        if (nextCnt <= increment) {
            throw new IllegalReferenceCountException(refCnt, increment);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
            break;
        }
    }
    return this;
}

private boolean release0(int decrement) {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt < decrement) {
            throw new IllegalReferenceCountException(refCnt, -decrement);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
            if (refCnt == decrement) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}
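The CAS loop is the more pessimistic approach: validate, then compareAndSet, and retry on failure. The optimistic alternative applies the update unconditionally and rolls it back if it turns out to be invalid, since in the vast majority of cases the increment is valid.
The change replaces compareAndSet with getAndAdd, which maps to a different CPU instruction on x86 (CMPXCHG vs. XADD). XADD performs the read-modify-write as a single atomic operation that always succeeds, so threads no longer spin on failed CAS attempts.
Under high contention this is roughly twice as fast.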
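The downside of the new approach is that the reference count can temporarily be in an invalid state. Under concurrency, that can make other callers fail. For example: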
Time 1 Thread 1: obj.retain(INT_MAX - 1)
Time 2 Thread 1: obj.retain(2)
Time 2 Thread 2: obj.retain(1)
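With the old approach, thread 1's retain(2) always fails (overflow) and thread 2's retain(1) always succeeds. Now thread 2 may run after thread 1 has applied its overflowing add but before thread 1 rolls it back, in which case thread 2 fails as well.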
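The interleaving can be replayed deterministically on one thread. In this hypothetical sketch, `RaceReplay` is illustrative and `java.util.concurrent.atomic.AtomicInteger` stands in for `refCntUpdater`. The count starts at `Integer.MAX_VALUE - 1`, the state the example needs after Time 1. With a fresh count of 1, retain(INT_MAX - 1) would already reach `Integer.MAX_VALUE`, and thread 2's retain(1) would fail under either scheme.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded replay of the interleaving above.
final class RaceReplay {
    // Old scheme: validate, then CAS; nothing is written unless the new value is valid.
    static boolean casRetain(AtomicInteger refCnt, int increment) {
        for (;;) {
            int current = refCnt.get();
            int next = current + increment;
            if (next <= increment) {   // resurrect (current was 0) or overflow
                return false;
            }
            if (refCnt.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    // New scheme, first half: add unconditionally, then report whether the
    // retain is invalid (true = caller must roll back and fail).
    static boolean getAndAddRetainFails(AtomicInteger refCnt, int increment) {
        int oldRef = refCnt.getAndAdd(increment);
        return oldRef <= 0 || oldRef + increment < oldRef;
    }

    // New scheme, second half: undo the add after a failed check.
    static void rollback(AtomicInteger refCnt, int increment) {
        refCnt.getAndAdd(-increment);
    }
}
```

In the replay, the new scheme's thread 2 reads thread 1's wrapped (negative) value and fails. Both rollbacks then restore the count.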
This trade-off is considered acceptable.
Reference: Faster refcounting (compareAndSet to getAndAdd)
private ByteBuf retain0(final int increment) {
    int oldRef = refCntUpdater.getAndAdd(this, increment);
    if (oldRef <= 0 || oldRef + increment < oldRef) {
        // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
        refCntUpdater.getAndAdd(this, -increment);
        throw new IllegalReferenceCountException(oldRef, increment);
    }
    return this;
}

private boolean release0(int decrement) {
    int oldRef = refCntUpdater.getAndAdd(this, -decrement);
    if (oldRef == decrement) {
        deallocate();
        return true;
    } else if (oldRef < decrement || oldRef - decrement > oldRef) {
        // Ensure we don't over-release, and avoid underflow.
        refCntUpdater.getAndAdd(this, decrement);
        throw new IllegalReferenceCountException(oldRef, decrement);
    }
    return false;
}
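ensureAccessible() sits on a hot path: the get/set/read/write methods call it on essentially every access. It used to read refCnt with a volatile read, which adds overhead on every call and limits what the JIT can optimize. Switching to a non-volatile read reduces that overhead and makes inlining easier.
The key point is that this check is only best-effort anyway. Even with a volatile read, another thread could invalidate the buffer's reference count right after the check passes, so the race cannot be ruled out. Given that, it is better to make the check cheap, especially since most buffers are only used from a single thread.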
Big performance win when multiple ByteBuf methods are called from a method.
// internalRefCnt() reads refCnt through the field's offset within the object (a non-volatile read)
protected final void ensureAccessible() {
    if (checkAccessible && internalRefCnt() == 0) {
        throw new IllegalReferenceCountException(0);
    }
}
In fact, the switch to the optimistic getAndAdd scheme introduced a possible concurrency edge case, reported as:
Possible race condition on AbstractReferenceCounted
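The problem is that every update is applied first and validated afterwards. Suppose refCnt has just dropped to 0 and the buffer has been deallocated. A concurrent retain() then bumps the count back to 1 before it notices the problem and rolls back. During that window the count reads 1, so nothing records that the buffer was already released. The checks, which only look for a non-positive old value or an overflow, pass. A third thread calling release() sees 1 and deallocates the buffer a second time.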
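Here is a hypothetical sequential replay of that window, assuming the getAndAdd-based retain0/release0 shown earlier. `AtomicInteger` stands in for `refCntUpdater`, and the `deallocations` counter replaces the real `deallocate()`. The retain is split into its add and its rollback so the replay can pause between them.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sequential replay of the double-release window in the getAndAdd scheme.
final class DoubleReleaseReplay {
    final AtomicInteger refCnt = new AtomicInteger(1);
    int deallocations; // stands in for deallocate()

    // retain0, first half: add, then report whether a rollback is required
    boolean retainAddFails(int increment) {
        int oldRef = refCnt.getAndAdd(increment);
        return oldRef <= 0 || oldRef + increment < oldRef;
    }

    // retain0, second half: the rollback
    void retainRollback(int increment) {
        refCnt.getAndAdd(-increment);
    }

    // release0 from the getAndAdd version
    boolean release(int decrement) {
        int oldRef = refCnt.getAndAdd(-decrement);
        if (oldRef == decrement) {
            deallocations++;
            return true;
        } else if (oldRef < decrement || oldRef - decrement > oldRef) {
            refCnt.getAndAdd(decrement);
            throw new IllegalStateException("refCnt: " + oldRef + ", decrement: " + decrement);
        }
        return false;
    }
}
```

Thread A's release deallocates. Thread B's retain transiently lifts the count from 0 to 1. Thread C's release then sees 1 and deallocates again. Once B rolls back, the count ends at -1.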
The subsequent fix therefore reserves the lowest bit as a "released" flag, which guarantees that the buffer is released only once.
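This PR provides stronger concurrency semantics. Races are still possible but extremely unlikely (for example, a retain overflowing at the same moment as a release), and some guarantees always hold. Once a release() returns true, any subsequent retain() or release() call is guaranteed to throw, and deallocate() is executed exactly once.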
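A final release sets the raw refCnt to an odd value (1), which means "released", just as a count of 0 did before. retain uses getAndAdd, while release uses a CAS loop for stronger concurrency semantics. Under high contention release is somewhat slower, but it is still about twice as fast as the earlier logic.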
// even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0
@SuppressWarnings("unused")
private volatile int refCnt = 2;

// Returns the real count, since the raw field stores it doubled (even values)
private static int realRefCnt(int refCnt) {
    return (refCnt & 1) != 0 ? 0 : refCnt >>> 1;
}
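Using 0 as the "invalid" boundary does not work under races. Unless every update goes through a CAS, a value of 0 cannot tell you whether a release has already happened.
This scheme instead uses the lowest bit to mark whether the buffer has been released. Since that bit can no longer hold count data, every count is stored multiplied by two.
This change also splits many conditions into small private methods, which helps inlining. Note the consistent naming as well: rawCnt for the stored value, realCnt for the logical count.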
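A few lines are enough to check the encoding. `RawRefCnt`, `toRaw` and `RELEASED` are hypothetical names, while `realRefCnt` follows the helper above.

```java
// Hypothetical helpers illustrating the doubled (even/odd) raw count.
final class RawRefCnt {
    // Value a final release() stores: odd, so it reads as "released"
    static final int RELEASED = 1;

    // Real count -> raw value (always even)
    static int toRaw(int realCnt) {
        return realCnt << 1;
    }

    // Raw value -> real count: odd means released (0); even is read as unsigned and halved
    static int realRefCnt(int rawCnt) {
        return (rawCnt & 1) != 0 ? 0 : rawCnt >>> 1;
    }
}
```

Because retain always adds an even amount, a released (odd) raw value stays odd even if a late retain races in. The `>>> 1` also treats the raw value as unsigned.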
private ByteBuf retain0(final int increment) {
    // all changes to the raw count are 2x the "real" change
    int adjustedIncrement = increment << 1; // overflow OK here, since the sign bit can also be used for storage
    int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement);
    if ((oldRef & 1) != 0) {
        throw new IllegalReferenceCountException(0, increment);
    }
    // don't pass 0!
    if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
            || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
        // overflow case
        refCntUpdater.getAndAdd(this, -adjustedIncrement);
        throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
    }
    return this;
}

private boolean release0(int decrement) {
    // realCnt is the externally visible count; toLiveRealCnt throws if rawCnt is odd
    int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
    // First attempt: CAS using the value from a non-volatile read
    if (decrement == realCnt) {
        if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
            deallocate();
            return true;
        }
        return retryRelease0(decrement);
    }
    return releaseNonFinal0(decrement, rawCnt, realCnt);
}

private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) {
    if (decrement < realCnt
            // all changes to the raw count are 2x the "real" change
            && refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
        return false;
    }
    return retryRelease0(decrement);
}

private boolean retryRelease0(int decrement) {
    for (;;) {
        int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement);
        if (decrement == realCnt) {
            if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                deallocate();
                return true;
            }
        } else if (decrement < realCnt) {
            // all changes to the raw count are 2x the "real" change
            if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                return false;
            }
        } else {
            throw new IllegalReferenceCountException(realCnt, -decrement);
        }
        Thread.yield(); // this benefits throughput under high contention
    }
}

/**
 * Like {@link #realRefCnt(int)} but throws if refCnt == 0
 */
private static int toLiveRealCnt(int rawCnt, int decrement) {
    if ((rawCnt & 1) == 0) {
        return rawCnt >>> 1;
    }
    // odd rawCnt => already deallocated
    throw new IllegalReferenceCountException(0, -decrement);
}
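Putting the pieces together, here is a condensed, runnable sketch of the even/odd scheme. It makes several substitutions. It throws `IllegalStateException` instead of Netty's `IllegalReferenceCountException`. It folds the non-volatile first attempt into a single CAS loop. It counts deallocations instead of freeing memory. `EvenOddRefCounted` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Condensed sketch of the even/odd reference counting scheme; not Netty's class.
class EvenOddRefCounted {
    private static final AtomicIntegerFieldUpdater<EvenOddRefCounted> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(EvenOddRefCounted.class, "refCnt");

    // even => real count is refCnt >>> 1; odd => released (real count 0)
    private volatile int refCnt = 2;
    int deallocations; // stands in for deallocate()

    int refCnt() {
        int raw = refCnt;
        return (raw & 1) != 0 ? 0 : raw >>> 1;
    }

    EvenOddRefCounted retain(int increment) {
        int adjustedIncrement = increment << 1;           // raw changes are 2x the real change
        int oldRef = UPDATER.getAndAdd(this, adjustedIncrement);
        if ((oldRef & 1) != 0) {                          // released: adding an even value keeps it odd
            throw new IllegalStateException("refCnt: 0, increment: " + increment);
        }
        if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
                || (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
            UPDATER.getAndAdd(this, -adjustedIncrement);  // overflow: roll back
            throw new IllegalStateException("overflow, increment: " + increment);
        }
        return this;
    }

    boolean release(int decrement) {
        for (;;) {
            int rawCnt = refCnt;
            if ((rawCnt & 1) != 0) {                      // odd => already deallocated
                throw new IllegalStateException("refCnt: 0, decrement: " + decrement);
            }
            int realCnt = rawCnt >>> 1;
            if (decrement == realCnt) {
                if (UPDATER.compareAndSet(this, rawCnt, 1)) { // 1 = odd = released
                    deallocations++;
                    return true;
                }
            } else if (decrement < realCnt) {
                if (UPDATER.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
                    return false;
                }
            } else {
                throw new IllegalStateException("refCnt: " + realCnt + ", decrement: " + decrement);
            }
            Thread.yield(); // CAS lost a race: back off briefly, then retry
        }
    }
}
```

After the final `release()` returns true, a late `retain()` still adds 2 to the raw value, but the parity stays odd. Every later call therefore throws, and the deallocation happens once.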
Big endian Java heap buffer implementation. It is recommended to use UnpooledByteBufAllocator.heapBuffer(int, int), Unpooled.buffer(int) and Unpooled.wrappedBuffer(byte[]) instead of calling the constructor explicitly.
private final ByteBufAllocator alloc;
byte[] array;
// Internal ByteBuffer view, created via ByteBuffer.wrap(array)
private ByteBuffer tmpNioBuf;

// Arguments are checked for null first, then validated
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
    // Call the superclass constructor; the reference count starts at 1
    super(maxCapacity);
    checkNotNull(alloc, "alloc");
    checkNotNull(initialArray, "initialArray");
    if (initialArray.length > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
    }
    this.alloc = alloc;
    // Assign the passed-in array reference directly (no copy)
    setArray(initialArray);
    setIndex(0, initialArray.length);
}

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    ...
    this.alloc = alloc;
    // Assign array; tmpNioBuf is still null at this point
    setArray(allocateArray(initialCapacity));
    setIndex(0, 0);
}
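Array-related functions allocateArray, freeArray and setArray: since the buffer lives directly on the Java heap, they are, respectively, a plain new byte[], a no-op, and a direct field assignment.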
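Basic information methods: order, isDirect, capacity, hasArray, array, arrayOffset and hasMemoryAddress return, respectively, big endian, false, array.length, true (there is a backing array), the array field, 0, and false.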
// Resizes the backing array to newCapacity:
// - if the new capacity is larger, everything is copied
// - if it is smaller, only [0, newCapacity) is kept: just the unread part is copied
//   (already-read bytes are not), and the indexes are updated
@Override
public ByteBuf capacity(int newCapacity) {
    checkNewCapacity(newCapacity);
    int oldCapacity = array.length;
    byte[] oldArray = array;
    // New capacity larger than the old one: allocate a new array and copy the old contents into it
    if (newCapacity > oldCapacity) {
        byte[] newArray = allocateArray(newCapacity);
        System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
        setArray(newArray);
        freeArray(oldArray);
    } else if (newCapacity < oldCapacity) {
        // New capacity smaller: only [0, newCapacity) is kept; unread bytes in that
        // range are copied back at their original readerIndex position.
        // If everything in that range has already been read, nothing is copied.
        byte[] newArray = allocateArray(newCapacity);
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
        freeArray(oldArray);
    }
    return this;
}
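To see the shrink rules on concrete numbers, here is a hypothetical stand-alone model of the method. `HeapResize` is illustrative. It replaces allocateArray/freeArray with plain array allocation and `Arrays.copyOf`, and replaces `checkNewCapacity` with a simple negativity check.

```java
import java.util.Arrays;

// Hypothetical stand-alone model of UnpooledHeapByteBuf.capacity(int).
final class HeapResize {
    byte[] array;
    int readerIndex;
    int writerIndex;

    HeapResize(byte[] array, int readerIndex, int writerIndex) {
        this.array = array;
        this.readerIndex = readerIndex;
        this.writerIndex = writerIndex;
    }

    void capacity(int newCapacity) {
        if (newCapacity < 0) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }
        byte[] oldArray = array;
        int oldCapacity = oldArray.length;
        if (newCapacity > oldCapacity) {
            // grow: copy everything, indexes unchanged
            array = Arrays.copyOf(oldArray, newCapacity);
        } else if (newCapacity < oldCapacity) {
            byte[] newArray = new byte[newCapacity];
            if (readerIndex < newCapacity) {
                // keep only the unread bytes that still fit, at the same offsets;
                // bytes before readerIndex are NOT copied (they stay zero)
                if (writerIndex > newCapacity) {
                    writerIndex = newCapacity;
                }
                System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
            } else {
                // everything that fits was already read: nothing to copy
                readerIndex = writerIndex = newCapacity;
            }
            array = newArray;
        }
    }
}
```

Take an 8-byte buffer with readerIndex 2 and writerIndex 6 and shrink it to 4 bytes. Bytes 3 and 4 stay at their original offsets, and the two already-read slots are left zeroed.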
The getBytes, setBytes, nioBuffer and getXxx... families of functions. The following example shows the difference between big-endian and little-endian reads:
@Override
public short getShort(int index) {
    ensureAccessible();
    return _getShort(index);
}

@Override
protected short _getShort(int index) {
    return HeapByteBufUtil.getShort(array, index);
}

@Override
public short getShortLE(int index) {
    ensureAccessible();
    return _getShortLE(index);
}

// HeapByteBufUtil
static short getShort(byte[] memory, int index) {
    return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
}

static short getShortLE(byte[] memory, int index) {
    return (short) (memory[index] & 0xff | memory[index + 1] << 8);
}
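The two helpers can be checked against `java.nio.ByteBuffer`, which is big-endian by default. `EndianDemo` is a hypothetical wrapper around copies of the two methods. The comments explain why only the low-order byte needs the `0xFF` mask.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Copies of the two helpers, plus a cross-check against java.nio.ByteBuffer.
final class EndianDemo {
    static short getShort(byte[] memory, int index) {
        // high byte first; the (short) cast drops its sign-extended upper bits,
        // but the low byte must be masked or a negative value would clobber the high byte
        return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
    }

    static short getShortLE(byte[] memory, int index) {
        // low byte first (at index), masked for the same reason
        return (short) (memory[index] & 0xFF | memory[index + 1] << 8);
    }

    static boolean matchesByteBuffer(byte[] memory, int index) {
        ByteBuffer be = ByteBuffer.wrap(memory); // big-endian by default
        ByteBuffer le = ByteBuffer.wrap(memory).order(ByteOrder.LITTLE_ENDIAN);
        return getShort(memory, index) == be.getShort(index)
                && getShortLE(memory, index) == le.getShort(index);
    }
}
```

For the bytes `0x12, 0x34`, the big-endian read gives `0x1234` and the little-endian read gives `0x3412`.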
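Other supporting functions: copy roughly copies the array and then calls the constructor. internalNioBuffer returns the member field, initializing it with ByteBuffer.wrap(array) on first use. deallocate just replaces array with an empty array. unwrap returns null.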
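Here is a sketch of the lazy initialization described above. `LazyNioView` is a hypothetical name, and the sketch assumes that replacing the array also clears the cached view. `ByteBuffer.wrap` shares the array instead of copying it, so writes through the view land in the backing array.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a lazily created ByteBuffer view over a heap array.
final class LazyNioView {
    private byte[] array;
    private ByteBuffer tmpNioBuf; // null until first requested

    LazyNioView(byte[] array) {
        this.array = array;
    }

    ByteBuffer internalNioBuffer() {
        ByteBuffer tmp = tmpNioBuf;
        if (tmp == null) {
            // wrap() shares the array: no copy, writes go straight through
            tmpNioBuf = tmp = ByteBuffer.wrap(array);
        }
        return tmp;
    }

    void setArray(byte[] newArray) {
        array = newArray;
        tmpNioBuf = null; // assumption: the cached view must be rebuilt for the new array
    }
}
```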
The if .. else if … statements could be rewritten as separate if .. if .. statements for better readability.
ByteBuf & AbstractReferenceCountedByteBuf & UnpooledHeapByteBuf
Original article: https://www.cnblogs.com/GrimReaper/p/10386412.html