知识点
一个thread + 队列 == 一个单线程线程池 =====> 线程安全的,任务是线性串行执行的
线程安全,不会产生阻塞效应,使用对象组
线程不安全,会产生阻塞效应, 使用对象池
Netty5服务端接收客户端的消息,需要继承SimpleChannelInboundHandler类;同理,客户端接收服务端消息,也需要继承该类
针对单客户端多连接、断开重连的场景,可以看第五点
1、Netty5Server.java
package com.example.netty.lesson6; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * @author yangwj * @date 2020/4/4 21:29 */ public class Netty5Server { public static void main(String[] args) { //服务类 ServerBootstrap bootstrap = new ServerBootstrap(); //boss worker----其实就是线程池 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { //设置线程池 bootstrap.group(boss,worker); //设置socket工厂 bootstrap.channel(NioServerSocketChannel.class); //设置管道工厂 bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new StringEncoder()); channel.pipeline().addLast(new ServerHandler()); } }); //设置参数 bootstrap.option(ChannelOption.SO_BACKLOG,2048);//表示可以接受2048连接(连接缓冲池设置) bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);//维持连接的活跃,清除死连接 bootstrap.childOption(ChannelOption.TCP_NODELAY,true);//关闭延迟发送 //绑定端口 ChannelFuture future = bootstrap.bind(51503); //等待服务端关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //释放资源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
2、ServerHandler.java
package com.example.netty.lesson6; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @author yangwj * @date 2020/4/4 21:41 */ public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("接受的消息:"+msg); //返回给客户端信息 ctx.channel().writeAndFlush("服务端返回给你:你好"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exceptionCaught"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //连接时触发 System.out.println("channelActive"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //断开时触发 System.out.println("channelInactive"); } }
3、Netty5Client.java
package com.example.netty.lesson6; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; /** * @author yangwj * @date 2020/4/4 21:59 */ public class Netty5Client { public static void main(String[] args) { //服务类 Bootstrap bootstrap = new Bootstrap(); //woker EventLoopGroup worker = new NioEventLoopGroup(); try { //设置线程池 bootstrap.group(worker); //设置socket工厂 bootstrap.channel(NioSocketChannel.class); //设置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new StringEncoder()); channel.pipeline().addLast(new ClientHandler()); } }); //连接服务端 ChannelFuture connect = bootstrap.connect("127.0.0.1",51503); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); while (true){ System.out.println("请输入:"); String msg = null; msg = bufferedReader.readLine(); connect.channel().writeAndFlush(msg); } } catch (IOException e) { e.printStackTrace(); }finally { worker.shutdownGracefully(); } } }
4、ClientHandler.java
package com.example.netty.lesson6;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side inbound handler that prints every string received from the server.
 *
 * @author yangwj
 * @date 2020/4/4 22:08
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message to standard output with a fixed prefix.
     *
     * @param channelHandlerContext handler context of the receiving channel (unused)
     * @param msg                   decoded message text
     */
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        final String line = "客户端收到的消息:" + msg;
        System.out.println(line);
    }
}
##########以上存在连接断开,无法重连的问题,下面是解决方案#############
5、Netty5MutilClient.java
package com.example.netty.lesson6; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** * @author yangwj * @date 2020/4/4 22:33 */ public class Netty5MutilClient { //服务类 private Bootstrap bootstrap = new Bootstrap(); //会话 private List<Channel> channels = new ArrayList<>(); //引用计数器 private final AtomicInteger index = new AtomicInteger(); //初始化 public void init(int count){ EventLoopGroup worker = new NioEventLoopGroup(); //设置线程池 bootstrap.group(worker); //设置socket工厂 bootstrap.channel(NioSocketChannel.class); //设置管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new StringEncoder()); channel.pipeline().addLast(new ClientHandler()); } }); for (int i =1; i<count; i++){ //连接服务端 ChannelFuture connect = bootstrap.connect("127.0.0.1",51503); channels.add(connect.channel()); } } /** * 获取会话 * @return */ public Channel nextChannel(){ return getFirstActiveChannel(0); } private Channel getFirstActiveChannel(int count){ Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size())); if(!channel.isActive()){ //重连 reconnect(channel); if(count >= channels.size()){ throw new RuntimeException("no can use channel"); } return getFirstActiveChannel(count + 1); } return channel; } /** * 重连 * @param channel */ private void reconnect(Channel channel){ synchronized(channel){ if(channels.indexOf(channel) == -1){ return ; } Channel newChannel = bootstrap.connect("127.0.0.1", 
51503).channel(); channels.set(channels.indexOf(channel), newChannel); } } }
6、启动客户端Netty5MutilClientStart.java
package com.example.netty.lesson6;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Single client with multiple connections: forwards console input to the
 * server through {@link Netty5MutilClient}.
 *
 * @author yangwj
 * @date 2020/4/4 22:49
 */
public class Netty5MutilClientStart {

    /**
     * Initialises the multi-connection client and sends each stdin line over
     * the next available channel until input ends.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Netty5MutilClient client = new Netty5MutilClient();
        client.init(5);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            try {
                System.out.println("请输入:");
                String msg = bufferedReader.readLine();
                if (msg == null) {
                    // End of input: stop, otherwise the loop would spin forever
                    // failing on a null message.
                    break;
                }
                client.nextChannel().writeAndFlush(msg);
            } catch (IOException e) {
                // Standard input is unusable; retrying would loop forever.
                e.printStackTrace();
                break;
            } catch (Exception e) {
                // Send failures (e.g. no usable channel) are reported and the loop continues.
                e.printStackTrace();
            }
        }
        // NOTE(review): Netty5MutilClient exposes no shutdown method, so its event
        // loop threads may keep the JVM alive after this point; confirm and add one.
    }
}
完毕!
原文:https://www.cnblogs.com/ywjfx/p/12811688.html