1 public static final Cumulator MERGE_CUMULATOR(){...} 2 private Cumulator cumulator = MERGE_CUMULATOR;
1 ByteBuf cumulation;
1 public static final Cumulator MERGE_CUMULATOR = new Cumulator() { 2 @Override 3 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { 4 ByteBuf buffer; 5 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() 6 || cumulation.refCnt() > 1) {//容量不足,扩容 7 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); 8 } else { 9 buffer = cumulation; 10 } 11 buffer.writeBytes(in); 12 in.release(); 13 return buffer; 14 } 15 };
1 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { 2 ByteBuf oldCumulation = cumulation; 3 cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); 4 cumulation.writeBytes(oldCumulation); 5 oldCumulation.release(); 6 return cumulation; 7 }
1 do { 2 byteBuf = allocHandle.allocate(allocator); 3 allocHandle.lastBytesRead(doReadBytes(byteBuf)); 4 if (allocHandle.lastBytesRead() <= 0) { 5 // nothing was read. release the buffer. 6 byteBuf.release(); 7 byteBuf = null; 8 close = allocHandle.lastBytesRead() < 0; 9 break; 10 } 11 allocHandle.incMessagesRead(1); 12 readPending = false; 13 pipeline.fireChannelRead(byteBuf);//触发点 14 byteBuf = null; 15 } while (allocHandle.continueReading());
1 @Override 2 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 3 if (msg instanceof ByteBuf) { 4 CodecOutputList out = CodecOutputList.newInstance(); 5 try { 6 ByteBuf data = (ByteBuf) msg; 7 first = cumulation == null; 8 if (first) { 9 cumulation = data; 10 } else { 11 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);//累加数据 12 } 13 callDecode(ctx, cumulation, out);//拆包 14 } catch (DecoderException e) { 15 throw e; 16 } catch (Throwable t) { 17 throw new DecoderException(t); 18 } finally { 19 if (cumulation != null && !cumulation.isReadable()) {//ByteBuf中没有可以读取的数据,释放ByteBuf 20 numReads = 0; 21 cumulation.release(); 22 cumulation = null; 23 } else if (++ numReads >= discardAfterReads) {//读取了数据 24 numReads = 0; 25 discardSomeReadBytes();//将已经读取的数据丢弃 26 } 27 int size = out.size();//读取了几个数据包 28 decodeWasNull = !out.insertSinceRecycled();//是否拆到一个数据包,insertSinceRecycled() = true表示out有增加或修改 29 fireChannelRead(ctx, out, size);//将数据包传递给下一个handler 30 out.recycle(); 31 } 32 } else { 33 ctx.fireChannelRead(msg); 34 } 35 }
1 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 2 try { 3 while (in.isReadable()) { 4 int outSize = out.size(); 5 if (outSize > 0) { 6 fireChannelRead(ctx, out, outSize);//传递给下一个handler 7 out.clear(); 8 if (ctx.isRemoved()) { 9 break; 10 } 11 outSize = 0; 12 } 13 int oldInputLength = in.readableBytes(); 14 decode(ctx, in, out);//抽象方法,具体的子类实现 15 if (ctx.isRemoved()) { 16 break; 17 } 18 if (outSize == out.size()) { 19 if (oldInputLength == in.readableBytes()) { 20 break; 21 } else { 22 continue; 23 } 24 } 25 if (oldInputLength == in.readableBytes()) { 26 throw new DecoderException( 27 StringUtil.simpleClassName(getClass()) + 28 ".decode() did not read anything but decoded a message."); 29 } 30 if (isSingleDecode()) { 31 break; 32 } 33 } 34 } catch (DecoderException e) { 35 throw e; 36 } catch (Throwable cause) { 37 throw new DecoderException(cause); 38 } 39 }
1 @Override 2 protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 3 Object decoded = decode(ctx, in); 4 if (decoded != null) { 5 out.add(decoded); 6 } 7 } 8 9 protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { 10 final int eol = findEndOfLine(buffer);//找到了分割符就返回索引,没有返回-1 11 if (!discarding) {//不丢弃数据 12 if (eol >= 0) { 13 final ByteBuf frame; 14 final int length = eol - buffer.readerIndex();//数据包的长度 15 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1;//分割符的长度 16 if (length > maxLength) { 17 buffer.readerIndex(eol + delimLength); 18 fail(ctx, length); 19 return null; 20 } 21 if (stripDelimiter) {//是否带分隔符 22 frame = buffer.readRetainedSlice(length);//创建一个共享buffer对象的缓存区的一个子区域 23 buffer.skipBytes(delimLength); 24 } else { 25 frame = buffer.readRetainedSlice(length + delimLength); 26 } 27 return frame;//返回一个完整的数据包 28 } else { 29 final int length = buffer.readableBytes(); 30 if (length > maxLength) { 31 discardedBytes = length; 32 buffer.readerIndex(buffer.writerIndex()); 33 discarding = true;//后面读取到的数据丢弃 34 if (failFast) { 35 fail(ctx, "over " + discardedBytes); 36 } 37 } 38 return null; 39 } 40 } else {//丢弃掉这次读取的数据 41 if (eol >= 0) { 42 final int length = discardedBytes + eol - buffer.readerIndex(); 43 final int delimLength = buffer.getByte(eol) == ‘\r‘? 2 : 1; 44 buffer.readerIndex(eol + delimLength); 45 discardedBytes = 0; 46 discarding = false;//丢完一个完整包后,下一个包就不再丢弃 47 if (!failFast) { 48 fail(ctx, length); 49 } 50 } else { 51 discardedBytes += buffer.readableBytes(); 52 buffer.readerIndex(buffer.writerIndex()); 53 } 54 return null; 55 } 56 }
1 private static int findEndOfLine(final ByteBuf buffer) { 2 int i = buffer.forEachByte(ByteProcessor.FIND_LF);//FIND_LF=‘\n‘ 3 if (i > 0 && buffer.getByte(i - 1) == ‘\r‘) { 4 i--; 5 } 6 return i; 7 }
1 private boolean discarding; 2 3 if (length > maxLength) { 4 discarding = true; 5 } 6 7 if (discarding == true) { 8 if (eol >= 0) { 9 discarding = false; 10 } 11 }
1 private final boolean stripDelimiter; 2 3 public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) { 4 this.maxLength = maxLength; 5 this.failFast = failFast; 6 this.stripDelimiter = stripDelimiter;//whether the decoded frame should strip out the delimiter or not 7 }
原文:http://www.cnblogs.com/chenzl1024/p/7249631.html