首页 > Web开发 > 详细

【netty这点事儿】ByteBuf 的使用模式

时间:2018-03-01 11:03:21      阅读:287      评论:0      收藏:0      [点我收藏+]

堆缓冲区

最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。 这种模式被称为支撑数组
(backing array), 它能在没有使用池化的情况下提供快速的分配和释放。

直接缓冲区

直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外。直接缓冲区对于网络数据传输是理想的选择。因为如果你的数据包含在一个在堆上分配的缓冲区中,那么事实上,在通过套接字发送它之前,JVM将会在内部把你的缓冲区复制到一个直接缓冲区中。
直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。所以该类型内存只在收发数据包内存使用效率较高,在netty应用层中不合适。

复合缓冲区

让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议传输的消息。这两部分由应用程序的不同模块产生, 将会在消息被发送的时候组装。该应用程序可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新的头部。
因为我们不想为每个消息都重新分配这两个缓冲区,所以使用 CompositeByteBuf 是一个
完美的选择。
需要注意的是, Netty使用了CompositeByteBuf来优化套接字的I/O操作,尽可能地消除了
由JDK的缓冲区实现所导致的性能以及内存使用率的惩罚。这种优化发生在Netty的核心代码中,因此不会被暴露出来,但是你应该知道它所带来的影响。

ByteBuf 的分配方式

池化的分配 PooledByteBufAllocator是ByteBufAllocator的默认方式

可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到
ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。
池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。此实现使用了一种称为jemalloc的已被大量现代操作系统所采用的高效方法来分配内存。
该方式在netty中是默认方式。

非池化的分配 UnpooledByteBufAllocator

可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提供了一个简单的称为 Unpooled 的工具类, 它提供了静态的辅助方法来创建未池化的 ByteBuf实例。

依托http进行性能测试

netty http 代码

HttpServer.java
package http.server;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class HttpServer {

    private static Log log = LogFactory.getLog(HttpServer.class);
    
    public void start(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel ch) throws Exception {
                                    // server端发送的是httpResponse,所以要使用HttpResponseEncoder进行编码
                                    ch.pipeline().addLast(new HttpResponseEncoder());
                                    // server端接收到的是httpRequest,所以要使用HttpRequestDecoder进行解码
                                    ch.pipeline().addLast(new HttpRequestDecoder());
                                    ch.pipeline().addLast(new HttpServerInboundHandler());
                                }
                            }).option(ChannelOption.SO_BACKLOG, 128) 
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = new HttpServer();
        log.info("Http Server listening on 5656 ...");
        server.start(5656);
    }
}

HttpServerInboundHandler.java

package http.server;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpVersion;

public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter {
    private static Log log = LogFactory.getLog(HttpServerInboundHandler.class);

    // private HttpRequest request;

    // static ByteBuf buf = Unpooled.wrappedBuffer("hello world".getBytes());
    byte[] bs = "hello world".getBytes();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // if (msg instanceof HttpRequest) {
        // request = (HttpRequest) msg;
        //
        // String uri = request.uri();
        // // System.out.println("Uri:" + uri);
        // }
        // if (msg instanceof HttpContent) {
        // HttpContent content = (HttpContent) msg;
        // ByteBuf buf = content.content();
        // // System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
        // buf.release();

        // String res = "hello world.";
        // ByteBuf buf = Unpooled.wrappedBuffer(bs);
        // ByteBuf buf = Unpooled.directBuffer();
        // ByteBuf buf = Unpooled.buffer();
        // ByteBuf buf = ctx.alloc().heapBuffer();// 池化堆内存
        // ByteBuf buf = ctx.alloc().directBuffer(); // 池化直接内存
        // ByteBuf buf = Unpooled.buffer();// 非池化堆内存
        ByteBuf buf = Unpooled.directBuffer();// 非池化堆内存
        buf.writeBytes(bs);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK, buf);

        // response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        /*
         * if (HttpHeaders.isKeepAlive(request)) { response.headers().set(CONNECTION, Values.KEEP_ALIVE); }
         */
        ctx.write(response);
        // ctx.flush();
        // }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage());
        ctx.close();
    }

}

池化堆内存 ctx.alloc().heapBuffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.76ms    9.31ms 180.34ms   92.96%
    Req/Sec   138.20k    43.16k  210.22k    66.50%
  10957832 requests in 20.09s, 522.51MB read
Requests/sec: 545332.08
Transfer/sec:     26.00MB
real    0m20.104s
user    0m10.441s
sys 0m44.703s

池化直接内存 ctx.alloc().directBuffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     4.47ms    9.99ms 149.70ms   91.76%
    Req/Sec   138.51k    41.31k  209.94k    63.38%
  10981466 requests in 20.09s, 523.64MB read
Requests/sec: 546684.37
Transfer/sec:     26.07MB
real    0m20.098s
user    0m10.890s
sys 0m45.081s

非池化堆内存 Unpooled.buffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     4.00ms    8.72ms 150.05ms   91.52%
    Req/Sec   138.84k    42.05k  209.72k    63.81%
  11017442 requests in 20.09s, 525.35MB read
Requests/sec: 548379.99
Transfer/sec:     26.15MB
real    0m20.101s
user    0m10.639s
sys 0m45.191s

非池化直接内存 Unpooled.directBuffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.64ms    9.36ms 156.79ms   92.71%
    Req/Sec   124.55k    33.90k  191.90k    71.61%
  9890536 requests in 20.07s, 471.62MB read
Requests/sec: 492854.62
Transfer/sec:     23.50MB
real    0m20.076s
user    0m9.774s
sys 0m41.801s

【netty这点事儿】ByteBuf 的使用模式

原文:https://www.cnblogs.com/xxxuwentao/p/8487236.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!