收集医院设备信息时由于消息为较短的json字符串,所以我们使用netty的自定义协议进行简单的数据接收。
协议约定为:head(实际消息的字节长度)+body(实际需要解析的json数据)
解码器如下,分为两步骤,最后我们获取json字符串进行处理,如果想要使用byte[]直接解析,请忽略第二个解码处理器:
/** * Created by zzq on 2019/12/26. * <p> * ReplayingDecoder如果字节读取失败会抛出异常,则父类会自动捕获异常,重新调用decode直到能够重复调用 */ public class CustomDataFrameDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int bodyLength = in.readInt();//直接将头部数据获取,得知body的长度,此时ByteBuf中readIndex会向后移动 byte bodyTmp[] = new byte[bodyLength]; ByteBuf body = in.readBytes(bodyLength); body.readBytes(bodyTmp);//将body存入字节数组 CustomDataFrame customDataFrame = new CustomDataFrame(bodyLength, bodyTmp); out.add(customDataFrame); } }
/** * Created by zzq on 2019/12/26. */ public class CustomDataFrameToStrDecoder extends MessageToMessageDecoder<CustomDataFrame> { @Override protected void decode(ChannelHandlerContext ctx, CustomDataFrame msg, List out) throws Exception { if (msg != null) { String bodyStr = new String(msg.getBody(), Charset.forName("UTF-8")); out.add(bodyStr); } } }
服务端如下:
EventLoopGroup parent = new NioEventLoopGroup(1); EventLoopGroup children = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(parent, children) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new CustomDataFrameDecoder()); pipeline.addLast(new CustomDataFrameToStrDecoder()); pipeline.addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("收到客户端消息:" + msg); String res = msg + "你好"; ctx.channel().writeAndFlush(res); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { String clientAddress = ctx.channel().remoteAddress().toString(); System.out.println("*通信管道注销成功 :" + clientAddress); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String clientAddress = ctx.channel().remoteAddress().toString(); System.out.println("*通信管道关闭" + clientAddress); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {//客户端连接建立 System.out.println("客户端连接建立"); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//客户端失去连接 System.out.println("客户端失去连接"); } }); pipeline.addLast(new CustomDataFrameEncoder()); } }); ChannelFuture channelFuture = null; try { channelFuture = serverBootstrap.bind(9099).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { channelFuture.channel().close(); parent.shutdownGracefully(); children.shutdownGracefully(); }
为了测试服务端,编写了客户端代码如下:
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new CustomDataFrameDecoder()); pipeline.addLast(new CustomDataFrameToStrDecoder()); pipeline.addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("收到服务端返回的消息"); System.out.println(msg); } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); new Thread(new Runnable() { @Override public void run() { for (; ; ) { try { Thread.sleep(2000); } catch (Exception e) { } String msg = "{a:b,i:p}" + UUID.randomUUID().toString(); System.out.println("客户端发送消息:" + msg); ctx.channel().writeAndFlush(msg); } } }).start(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); } }); pipeline.addLast(new CustomDataFrameEncoder()); } }); try { bootstrap.connect("127.0.0.1", 9099).sync().channel().closeFuture().sync(); } catch (Exception e) { eventLoopGroup.shutdownGracefully(); }
数据接收和发送需要使用到解码器的同时,还需要使用编码器,编码器如下:
/** * Created by zzq on 2019/12/26. */ public class CustomDataFrameEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { byte body[] = msg.getBytes(Charset.forName("UTF-8")); int bodyLength = body.length; out.writeInt(bodyLength);//向输出参数中添加数据 out.writeBytes(body); } }
但实际我们的服务端是不需要像客户端响应数据的,我们直接在数据接收端使用自定义线程池异步处理数据即可。
原文:https://www.cnblogs.com/zzq-include/p/12102339.html