首页 > Web开发 > 详细

netty源码

时间:2019-11-10 21:42:04      阅读:108      评论:0      收藏:0      [点我收藏+]

服务端代码可以是这样的

//1.定义server启动类
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //2.定义工作组:boss分发请求给各个worker:boss负责监听端口请求,worker负责处理请求(读写)
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        //3.定义工作组
        serverBootstrap.group(boss,worker);

        //4.设置通道channel
        serverBootstrap.channel(NioServerSocketChannel.class);//A
        //serverBootstrap.channelFactory(new ReflectiveChannelFactory(NioServerSocketChannel.class));//旧版本的写法,但是此过程在A中有同样过程

        //5.添加handler,管道中的处理器,通过ChannelInitializer来构造
        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                //此方法每次客户端连接都会调用,是为通道初始化的方法

                //获得通道channel中的管道链(执行链、handler链)
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast(new StringDecoder());
//                pipeline.addLast("serverHandler1",new ServerHandler());
//                pipeline.addLast("serverHandler2",new ServerHandler2());
                pipeline.addLast(new StringEncoder());

                System.out.println("success to initHandler!");
            }
        });

        //6.设置参数
        //设置参数,TCP参数
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048);         //连接缓冲池的大小
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//维持链接的活跃,清除死链接
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);//关闭延迟发送

        //7.绑定ip和port
        try {
            ChannelFuture channelFuture = serverBootstrap.bind("0.0.0.0", 9099).sync();//Future模式的channel对象
            //7.5.监听关闭
            channelFuture.channel().closeFuture().sync();  //等待服务关闭,关闭后应该释放资源
        } catch (InterruptedException e) {
            System.out.println("server start got exception!");
            e.printStackTrace();
        }finally {
            //8.优雅的关闭资源
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }

 

启动类是ServerBootstrap,有两个属性:group和childGroup,对应上面代码里的boss和worker,是两个NioEventLoopGroup的实例,关键方法是上面try里的bind,点进去看,

1、initAndRegister方法,用channelFactory新建一个channel,这个channel是上面代码中channel()方法的参数NioServerSocketChannel的实例。

然后调用group的register方法,具体实现在NioEventLoopGroup的父类MultithreadEventLoopGroup中,调用的是next().register,这个next是按轮询的方式取出group的child数组的元素,数组中存

的是NioEventLoop,具体的初始化是在NioEventLoopGroup的构造方法中,而且每一个NioEventLoop对应一个selector。register的具体实现在NioEventLoop的父类SingleThreadEventLoop中,

调用的channel的unsafe属性的register,而NioServerSocketChannel的unsafe是其父类AbstractNioMessageChannel的newSafe方法的返回值NioMessageUnsafe,其register方法在父类

AbstractUnsafe中,点进去register0方法中的doRegister方法,具体实现是在AbstractNioChannel中,这个是内部类对外部类的引用,发现有这一句:

javaChannel().register(eventLoop().unwrappedSelector(), 0, this),netty的NioServerSocketChannel其实是对Java nio的ServerSocketChannel的封装,javaChannel.register就是

取出ServerSocketChannel,调用其register方法,eventLoop().unwrappedSelector()这个就是loopGroup的loop数组中,loop--线程--selector是一对一的关系,而这几个“一”和channel是一对多

或者一对一关系,可以发现netty底层其实还是Java的nio。

2、再往下看,doBind0方法,eventloop.execute方法中有个channel.bind,调用的是pipeline.bind,pipeline是channel的属性,掌管这一个双向链表head到tail,中间可以放channel自己的handler,

这是个责任链模式,channel现在的bind以及以后读取到数据都是从这个链表的正向或者逆向流动处理一遍。

3、eventloop.execute方法会保证对应的线程启动,线程的run方法会同时执行task,以及进行对应的selector的select,像nio那样,遍历所有key,检测可读,可写,有链接到来等情况,

具体方法在processSelectedKeys。由于现在我们讨论的是NioServerSocketChannel,所以看OP_ACCEPT也就是有连接到来的情况,调用的是unsafe.read()。上面说过这是个NioMessageUnsafe,

点进去看其read方法,核心在doReadMessages(readBuf),具体实现由回到NioServerSocketChannel,发现直接调用熟悉的SocketChannel ch = SocketUtils.accept(javaChannel()),也就是

Java NIO的ServerSocketChannel的accept方法,得到socketChannel,并放在readBuf中,在下面的pipeline.fireChannelRead方法中再进行遍历,上面说过,这个是在handler链中流动,

按照Java NIO的流程,这里应该是让新得到的socketChannel注册到某个selector上,以后selector就遍历这些channel,进行实际数据的读取。那么现在pipeline中除了tail和head这两个空handler,

具体的处理在一个ServerBootstrap的内部类ServerBootstrapAcceptor中,这个ServerBootstrapAcceptor之所以能存在于pipeline中,是因为刚才说的initAndRegister方法中,channel生成之后

有一个init(channel)方法,往pipeline中添加了一个ChannelInitializer,这也是一个ChannelHandler。而在上面说的doRegister方法之后,有一个pipeline.fireChannelRegistered(),把管道中的

所有handler都调用channelRegistered方法,而ChannelInitializer的channelRegistered调用了initChannel方法, 这样就把ServerBootstrapAcceptor放入NioServerSocketChannel

的管道中,点击去看其channelRead方法,有childGroup.register(child),这个和之前的group.register道理一样,调用的是NioSocketChannel的unsafe:NioByteUnsafe的父类,同样是

AbstractUnsafe的register方法,也是从childLoopGroup这个子线程组中取出一个loop,把NioSocketChannel注册到对应的selector上,线程的run方法for循环中读取请求数据,在其pipeline中

执行channelRead方法,我们只需要把我们自己的handler放入pipelin就可以进行我们自己的逻辑。

到这里,数据流动大体上是通了。

4、具体的解码编码,粘包的处理,对内存比如buffer的管理,待完善。

netty源码

原文:https://www.cnblogs.com/chuliang/p/11831610.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!