在读取数据时会调用io.netty.buffer.AbstractByteBuf.writeBytes(ScatteringByteChannel, int),然后调用io.netty.buffer.ByteBuf.setBytes(int, ScatteringByteChannel, int),setBytes方法调用nio.channel.read,如果当前连接已经意外中断,会收到JDK NIO层抛出的ClosedChannelException异常,setBytes方法捕获该异常之后直接返回-1,
//NioByteUnsafe.read方法 public void read() { ... boolean close = false; try { ... do { ... int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { ... close = localReadAmount < 0; break; } ... } while (...); ... if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { ... } finally { ... } } //NioSocketChannel.doReadBytes方法 protected int doReadBytes(ByteBuf byteBuf) throws Exception { return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); } //AbstractByteBuf.writeBytes方法 public int writeBytes(ScatteringByteChannel in, int length) throws IOException { ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; } return writtenBytes; } //UnpooledHeapByteBuf.setBytes方法 public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { ensureAccessible(); try { return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)); } catch (ClosedChannelException e) { return -1; } }
//AbstractTrafficShapingHandler.channelRead方法 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { long size = calculateSize(msg); long curtime = System.currentTimeMillis(); if (trafficCounter != null) { //增加字节累计数 trafficCounter.bytesRecvFlowControl(size); if (readLimit == 0) { // no action ctx.fireChannelRead(msg); return; } // compute the number of ms to wait before reopening the channel long wait = getTimeToWait(readLimit, trafficCounter.currentReadBytes(), trafficCounter.lastTime(), curtime); if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal // time in order to // try to limit the traffic if (!isSuspended(ctx)) { ctx.attr(READ_SUSPENDED).set(true); // Create a Runnable to reactive the read if needed. If one was create before it will just be // reused to limit object creation Attribute<Runnable> attr = ctx.attr(REOPEN_TASK); Runnable reopenTask = attr.get(); if (reopenTask == null) { reopenTask = new ReopenReadTimerTask(ctx); attr.set(reopenTask); } ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS); } else { // Create a Runnable to update the next handler in the chain. If one was create before it will // just be reused to limit object creation Runnable bufferUpdateTask = new Runnable() { @Override public void run() { ctx.fireChannelRead(msg); } }; ctx.executor().schedule(bufferUpdateTask, wait, TimeUnit.MILLISECONDS); return; } } } ctx.fireChannelRead(msg); } //AbstractTrafficShapingHandler.getTimeToWait方法 private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) { long interval = curtime - lastTime; if (interval <= 0) { // Time is too short, so just lets continue return 0; } return (bytes * 1000 / limit - interval) / 10 * 10; } private static class TrafficMonitoringTask implements Runnable { ... @Override public void run() { if (!counter.monitorActive.get()) { return; } long endTime = System.currentTimeMillis(); //还原累计字节数,lastTime等变量 counter.resetAccounting(endTime); if (trafficShapingHandler1 != null) { trafficShapingHandler1.doAccounting(counter); } counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(), TimeUnit.MILLISECONDS); } }