实现场景: 聊天
服务端,客户端A,客户端B,客户端C。当客户端发送消息给服务端后,服务端在将这条消息广播个所有客户端户端A,客户端B,客户端C。
需求1: 客户端上线后,会通知所有客户端上线。
如客户端A先建立连接,不需要通知。
当客户端B与服务端建立连接,服务端告诉A,客户端B上线。
A和B建立连接后,客户端C和服务端建立连接。服务端广播一条信息给A和B。
需求2: A、B、C都已经建立连接,当A发送一条给服务端,服务端广播这条消息给所有客户端,客户端A会提示这条消息是自己发送的。
一、服务端程序的编写
1、MyChartServer 类
public class MyChartServer { public static void main(String[] args) throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup worderGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,worderGroup).channel(NioServerSocketChannel.class) .childHandler(new MyChatInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); worderGroup.shutdownGracefully(); } } }
2、MyChatInitializer 类
public class MyChatInitializer extends ChannelInitializer<SocketChannel>{ protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyChartServerHandle()); } }
3、MyChartServerHandle 类
public class MyChartServerHandle extends SimpleChannelInboundHandler<String>{ //用于保存所有Channel对象 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //channel为当前客户端发给服务端的channel Channel channel = ctx.channel(); channelGroup.forEach(ch -> { if(channel != ch){ ch.writeAndFlush(channel.remoteAddress() + " 发送的消息:" + msg + "\n"); } else{ ch.writeAndFlush("[自己]" + msg + "\n"); } }); CommonUtil.println(ctx.channel().remoteAddress() + ", " + msg); ctx.channel().writeAndFlush("from server: " + UUID.randomUUID()); } //表示连接建立 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //chanel可以理解成Connection Channel channel = ctx.channel(); //广播消息给所有的客户端 channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 加入\n"); channelGroup.add(channel); } //表示连接断掉了 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //广播消息给所有的客户端 channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 离开\n"); //下面这行代码Netty会自动调用 //channelGroup.remove(channel); } //表示连接时活动状态 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //广播消息给所有的客户端 channelGroup.writeAndFlush( channel.remoteAddress() + " 上线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //广播消息给所有的客户端 channelGroup.writeAndFlush( channel.remoteAddress() + " 下线"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace();; ctx.close(); } }
原文:https://www.cnblogs.com/linlf03/p/11296379.html