1、netty在进行字节数组传输的时候,会出现粘包和分包的情况。当个数据还好,如果数据量很大。并且不间断的发送给服务器,这个时候就会出现粘包和分包的情况。
2、简单来说:channelBuffer在接收包的时候,会在当时进行处理,但是当数据量一大,这个时候数据的分隔就不是很明显了。这个时候会出现数据多了或者少了的情况
3、处理的方式,一般就是编解码。自己定义数据:数据长度+数据。这种简单的方式来实现。在server进行解密操作。
4、具体的实现过程
a、客户端进行编码
package com.troy.application.buffer; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class Client { public static void main(String[] args) { try { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("localhost",9000)); //需要发送的数据 String message = "hello"; //这里的4代表message的长度所占字节 ByteBuffer byteBuffer = ByteBuffer.allocate(4+message.length()); //设置数据 byteBuffer.putInt(message.length()); byteBuffer.put(message.getBytes()); //写出数据 socketChannel.write(ByteBuffer.wrap(byteBuffer.array())); } catch (IOException e) { e.printStackTrace(); } } }
b、服务端
package com.troy.application.buffer; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Server { public static void main(String[] args) { //声明服务类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //设定线程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService work = Executors.newCachedThreadPool(); //设置工厂 serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,work)); //设置管道流 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); //添加处理方式 channelPipeline.addLast("decode",new PackageDecoder()); channelPipeline.addLast("hello",new HelloHandler()); return channelPipeline; } }); //设置端口 serverBootstrap.bind(new InetSocketAddress(9000)); } }
c、解码
package com.troy.application.buffer; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; public class PackageDecoder extends OneToOneDecoder { @Override protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, Object o) throws Exception { ChannelBuffer channelBuffer = (ChannelBuffer) o; if (channelBuffer.readableBytes() > 4) { //标记读取位置 channelBuffer.markReaderIndex(); //读取数据长度 int n = channelBuffer.readInt(); if (channelBuffer.readableBytes() < n) { //如果数据长度小于设定的数据,则处于缓存状态 channelBuffer.resetReaderIndex(); //缓存当前数据,等待数据接入 return null; } byte[] bytes = new byte[n]; channelBuffer.readBytes(bytes); return new String(bytes); } //缓存当前数据,等待数据接入 return null; } }
d、数据接收处理
package com.troy.application.buffer; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class HelloHandler extends SimpleChannelHandler{ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { System.out.println(e.getMessage()); } }