Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)
发送数据和接收数据比较类似,可以将这两部分结合起来学习。 接收数据:自适应缓冲区和连接读是为了解决什么问题
和读数据一样,写数据我们也会碰到以下问题:
我们再看一下,Netty 是如何解决这几个问题的。这部分才是本小节内容的核心。
自适应写(maxBytesPerGatheringWrite):
Netty 批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整 maxBytesPerGatheringWrite)。
连续写(writeSpinCount):
同连接读一样,每个连接默认最多连续写 16 次,即使还有数据也暂时不处理了,先处理下一个连接。
注册 OP_WRITE 事件
如果 socket sendbuf 已经写不动,那就注册 OP_WRITE 事件。当触发 OP_WRITE 事件时,则取消 OP_WRITE 事件,并继续写。
高低水位线(writeBufferWaterMark)
Netty 待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成
false ,让应用自己做决定要不要发送数据了。
Netty 中将发送数据分为两步,write 只写到缓冲区,只有调用 flush 才会真正发送数据。
发送数据核心步骤有三步:addMessage、addFlush、doWrite。如下图所示:
(1)写的本质
(2)OP_WRITE vs OP_READ
(3)写优化
ctx#write
-> HeadContext#write
-> AbstractChannel.AbstractUnsafe#write
-> ChannelOutboundBuffer#addMessage # √ 添加到缓冲区
ctx#flush
-> HeadContext#flush
-> AbstractChannel.AbstractUnsafe#flush
-> ChannelOutboundBuffer#addFlush # √ 将缓冲区的数据状态改成待发送状态
-> flush0
-> NioSocketChannel#doWrite # √ 核心逻辑,真正发送数据
ChannelOutboundBuffer 缓冲区本质是一个单向链表,addMessage、addFlush、doWrite 会更新链表的状态。
private Entry flushedEntry; // 相当于队头。doWrite时将发送数据后并更新
private Entry unflushedEntry; // 指向第一个未刷新的数据。addFlush时更新
private Entry tailEntry; // 队尾。addMessage 添加到队尾并更新
private int flushed; // 表示flushedEntry~unflushedEntry中未刷新结点的个数
说明: addFlush 更新 unflushedEntry 后,并不表示链表之前的缓冲区数据已经发送。只有调用 doWrite 才真正发送数据,并更新 flushedEntry,将结点从缓冲区中剔除。
ctx#write 调用 addMessage 方法将 msg 添加到缓冲区。addMessage 完成了二件事:
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 1. 添加到队尾,更新tailEntry
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// 2. 如果超过高水位线,将可写标志设置为false
incrementPendingOutboundBytes(entry.pendingSize, false);
}
说明: addMessage 的参数 size 为 msg 的大小,Netty 默认使用 DefaultMessageSizeEstimator,直接调用 ((ByteBuf) msg).readableBytes() 获取数据包的大小。高低水位线最后再来统一分析。
ctx#flush 先调用 addFlush 更新 unflushedEntry,然后调用 unsafe.flush0() 将 flushedEntry ~ unflushedEntry 之间的数据刷新出去。addFlush 同样做了两件事:
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null)
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
unsafe.flush0() 实际上是调用 NioSocketChannel#doWrite 方法,将 flushedEntry ~ unflushedEntry 之间的数据刷新出去,同时更新 flushedEntry。这也是最复杂的一部分。doWrite 方法:
NioSocketChannel#doWrite
-> ChannelOutboundBuffer#nioBuffers # 将in中待刷新的数据转换成ByteBuffer[]
-> SocketChannel#write # 真正刷新数据
-> adjustMaxBytesPerGatheringWrite # 动态调整下一次刷新的数据
-> ChannelOutboundBuffer#removeBytes # 将in中已经刷新的结点移除
-> AbstractNioByteChannel#incompleteWrite # 处理未全部刷新完成的情况
-> AbstractNioByteChannel#clearOpWrite # 清除 OP_WRITE 事件
doWrite 代码如下(有删减)
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// 1. 将缓冲区in中的数据转换成ByteBuffer[]
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// do...while 刷新缓冲区数据
long attemptedBytes = in.nioBufferSize();
// 2. socket api 刷新数据
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
// 3. socket sendbuf 刷不动后,调用incompleteWrite,并结点刷新
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// 4. 动态调整每次最大刷新的数据量
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
// 5. 删除in中已经刷新的数据结点
in.removeBytes(localWrittenBytes);
}
说明: in.nioBuffers 和 in.removeBytes 都是 ChannelOutboundBuffer 的操作,先不去管它。我们看一下,adjustMaxBytesPerGatheringWrite 和 incompleteWrite 这两个方法都做了些什么事。
(1)adjustMaxBytesPerGatheringWrite
// maxBytesPerGatheringWrite:默认是SO_SNDBUF大小
// attempted表示尝试刷新的数据大小,而written表示真实刷新的数据大小。
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
// 1. 扩大2倍。全部写满了,可能还有更多的数据需要写。
if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
}
// 2. 缩小2倍。attempted>4KB且真实写入的数据不到attempted的一半
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
(2)incompleteWrite
incompleteWrite 表示数据没有写完,又分两种情况:true 表示 socket sendbuf 写不了;false 表示连续写次数超过16次,提交flushTask,空闲时继续写。
protected final void incompleteWrite(boolean setOpWrite) {
// 1. socket sendbuf 写不了
if (setOpWrite) {
setOpWrite();
// 2. 连续写次数超过16次,提交flushTask,空闲时继续写
} else {
clearOpWrite();
eventLoop().execute(flushTask);
}
}
WriteBufferWaterMark 中设置了默认的高低水位线,高水位线默认为 32KB,低水位线默认为 64KB。
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
高低水位线设置代码如下:
// 超过高水位线,可写标志设置成false
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
// 超过低水位线,可写标志设置成true
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
每天用心记录一点点。内容也许不重要,但习惯很重要!
原文:https://www.cnblogs.com/binarylei/p/12642938.html