首页 > Web开发 > 详细

Netty FixedChannelPool

时间:2018-07-06 00:13:31      阅读:616      评论:0      收藏:0      [点我收藏+]

FixedChannelPool是netty的连接池,除了这个以外netty还有另外一个连接池SimpleChannelPool,它们俩的关系其实就是儿子与爸爸,FixedChannelPool继承了SimpleChannelPool,这篇文章里主要是讲FixedChannelPool的故事。 
注意上面讲的是连接池不是线程池喔。

  • 使用场景 
    作为客户端想要连接服务器,但是并不想像传统的那样一个连接一个线程的来,线程资源有限就不说了,当连接很多的时候可要 怎 么 办!这时候想到一个办法,那就是连接池了。使用连接池咱们可以把所有的连接都放入连接池,当需要的时候拿出来,使用完再放回去。

1.初始化连接池

public void connect(int port, String host, int maxChannel) {

        EventLoopGroup group = new NioEventLoopGroup(1);

        Bootstrap bootstrap = new Bootstrap();

// 连接池每次初始化一个连接的时候都会根据这个值去连接服务器
        InetSocketAddress remoteaddress = InetSocketAddress.createUnresolved(host, port);// 连接地址
        bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                .remoteAddress(remoteaddress);

// 初始化连接池
// 这个值可要好好保管好了,后面拿连接放连接都要通过它啦
        FixedChannelPool channelPool = new FixedChannelPool(bootstrap, new SHChannelPoolHandler(), maxChannel);

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

细心的朋友可能看到SHChannelPoolHandler这个类并不是netty自身的,没错,这是咱们第二步要做的: 
2.连接池操作类:

public class SHChannelPoolHandler implements ChannelPoolHandler {

    /**
     * 使用完channel需要释放才能放入连接池
     */
    @Override
    public void channelReleased(Channel ch) throws Exception {
        // TODO Auto-generated method stub

        // 刷新管道里的数据
        ch.writeAndFlush(Unpooled.EMPTY_BUFFER); //flush掉所有写回的数据  

        // LoggerFactory.getLogger(SHChannelPoolHandler.class).info("释放channel,channel
        // released " + ch.toString());
    }

    /**
     * 获取连接池中的channel
     */
    @Override
    public void channelAcquired(Channel ch) throws Exception {
        // TODO Auto-generated method stub

        // LoggerFactory.getLogger(SHChannelPoolHandler.class).info("获取channel,channel
        // acquired " + ch.toString());
    }

    /**
     * 当channel不足时会创建,但不会超过限制的最大channel数
     */
    @Override
    public void channelCreated(Channel ch) throws Exception {
        // TODO Auto-generated method stub

        // LoggerFactory.getLogger(SHChannelPoolHandler.class).info("创建新channel,channel
        // created " + ch.toString());

        NioSocketChannel channel = (NioSocketChannel) ch;

        // 客户端逻辑处理   ClientHandler这个也是咱们自己编写的,继承ChannelInboundHandlerAdapter,实现你自己的逻辑
        channel.pipeline().addLast(new ClientHandler());

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

至此就完成啦 
3.使用

// 从连接池拿到连接
    Channel channel = this.channelPool.acquire().get();

        // 写出数据
        channel.write("xxxx");

        // 连接放回连接池,这里一定记得放回去
        this.channelPool.release(channel);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

其中FixedChannelPool还有很多构造方法,包括获取连接的时候也有很多重载,详细的使用还是多看看官方文档

 

【连接池】

忘记自己借鉴了谁的代码,客户端连接池采用Netty的ChannelPoolMap接口,用网络连接地址做Key,用FixedChannelPool实例化value,即不同的连接服务地址对应不同的连接池。FixedChannelPool的理论连接数上限是Integer.MAX_VALUE,并且使用ChannelHealthChecker接口来判断channel被取出池的时候是否是活的,如果是活的才向应用层吐出去。这样一来保活问题就不用自己操心了。

 

  1.  
    public class TcpClientPool {
  2.  
    final EventLoopGroup group = new NioEventLoopGroup();
  3.  
    final Bootstrap bootstrap = new Bootstrap();
  4.  
    private static final int thread_num = Runtime.getRuntime().availableProcessors();
  5.  
    // key 是地址, value是pool,即一个地址一个pool
  6.  
    private AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap;
  7.  
     
  8.  
    public void build(ChannelPoolHandler poolHandler) throws Exception {
  9.  
    bootstrap.group(group)
  10.  
    .channel(NioSocketChannel.class)
  11.  
    .option(ChannelOption.TCP_NODELAY, true)
  12.  
    .option(ChannelOption.SO_KEEPALIVE, true);
  13.  
     
  14.  
    poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
  15.  
    @Override
  16.  
    protected SimpleChannelPool newPool(InetSocketAddress key) {
  17.  
    return new FixedChannelPool(bootstrap.remoteAddress(key), poolHandler, thread_num);
  18.  
    }
  19.  
    };
  20.  
    }
  21.  
    /* 下面的代码省略 */
  22.  
    }

 

构造方法需要传入一个实现了ChannelPoolHandler接口处理Handler,这个Handler需要实现三个方法

 

void channelReleased(Channel chthrows Exception;

void channelAcquired(Channel chthrows Exception;

void channelCreated(Channel chthrows Exception;

在处理类中最重要的事情是给channel加载业务协议的编解码处理器

 

  1.  
    public abstract class BaseChannelPoolHandler implements ChannelPoolHandler {
  2.  
     
  3.  
    private HandlerConfiguratorInterface configurator;
  4.  
    public BaseChannelPoolHandler(HandlerConfiguratorInterface configurator) {
  5.  
    this.configurator = configurator;
  6.  
    }
  7.  
    /**
  8.  
    * 因为是裸的channel,所以需要给他配置上编解码器
  9.  
    * 只需要配置一次就可以,因为channel会被还回到池里
  10.  
    */
  11.  
    @Override
  12.  
    public void channelCreated(Channel ch) throws Exception {
  13.  
    configurator.configChannel(ch);
  14.  
    }
  15.  
     
  16.  
    @Override
  17.  
    public void channelReleased(Channel ch) throws Exception {}
  18.  
     
  19.  
    @Override
  20.  
    public void channelAcquired(Channel ch) throws Exception {}
  21.  
    }


其中实现HandlerConfiguratorInterface接口(自定义的接口,只有一个方法public void configChannel(Channel channel);)的类,需要通过configChannel方法给Channel对象装配编解码器

 

对于Http的实现

 

  1.  
    public class HttpConfigurator implements HandlerConfiguratorInterface {
  2.  
    private static final int HTTP_AGGREGATE_SIZE = 8192;
  3.  
     
  4.  
    @Override
  5.  
    public void configChannel(Channel ch) {
  6.  
    SocketChannel channel = (SocketChannel)ch;
  7.  
    channel.config().setKeepAlive(true);
  8.  
    channel.config().setTcpNoDelay(true);
  9.  
    channel.pipeline()
  10.  
    .addLast(new HttpClientCodec())
  11.  
    .addLast(new HttpObjectAggregator(HTTP_AGGREGATE_SIZE))
  12.  
    .addLast(new HttpResponseHandler());
  13.  
    }
  14.  
    }

这一步和常见的Netty处理器挂载方式是一致的。最后一个HttpResponseHandler是处理应答的Handler。

 

【关闭连接池】

客户端池还需要提供关闭的能力,否则程序无法正常退出

 

  1.  
    public void close() {
  2.  
    poolMap.close();
  3.  
    group.shutdownGracefully();
  4.  
    }


【发消息】

 

客户端池封装了异步和同步发消息的方法

异步方法

 

  1.  
    public void asyncWriteMessage(InetSocketAddress address, Object message) {
  2.  
    SimpleChannelPool pool = getPool(address);
  3.  
    Future<Channel> future = pool.acquire();
  4.  
    // 获取到实例后发消息
  5.  
    future.addListener((FutureListener<Channel>)f -> {
  6.  
    if (f.isSuccess()) {
  7.  
    Channel ch = f.getNow();
  8.  
    if (ch.isWritable()) {
  9.  
    ch.writeAndFlush(message);
  10.  
    }
  11.  
    // 归还实例
  12.  
    pool.release(ch);
  13.  
    }
  14.  
    });
  15.  
    }


同步方法

 

 

  1.  
    public boolean syncWriteMessage(InetSocketAddress address, Object message) {
  2.  
    SimpleChannelPool pool = getPool(address);
  3.  
    Future<Channel> future = pool.acquire();
  4.  
    try {
  5.  
    Channel channel = future.get();
  6.  
    if (channel.isWritable()) {
  7.  
    channel.writeAndFlush(message);
  8.  
    pool.release(channel);
  9.  
    return true;
  10.  
    }
  11.  
    pool.release(channel);
  12.  
    } catch (Exception e) {
  13.  
    e.printStackTrace();
  14.  
    }
  15.  
    return false;
  16.  
    }


【其他】

 

如果要发HTTP消息,需要自己封装Http消息体,否则Netty编码器会扔掉

 

  1.  
    public class HttpMsgComposer {
  2.  
    public static Object compose(URI uri, String msg) throws Exception {
  3.  
    DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
  4.  
    uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
  5.  
     
  6.  
    // 构建http请求
  7.  
    request.headers().set(HttpHeaderNames.HOST, uri.getHost());
  8.  
    request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
  9.  
    request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
  10.  
    request.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
  11.  
    return request;
  12.  
    }
  13.  
    }


【调用方式】

 

 

  1.  
    public class App {
  2.  
    public static void main(String[] args) throws Exception {
  3.  
    TcpClientPool pool = new TcpClientPool();
  4.  
    pool.build(new HttpChannelPoolHandler(new HttpConfigurator()));
  5.  
     
  6.  
    String url = "http://163.com";
  7.  
    URI uri = new URI(url);
  8.  
    String host = uri.getHost();
  9.  
    int port = uri.getPort() == -1 ? 80 : uri.getPort();
  10.  
    InetSocketAddress address = new InetSocketAddress(host, port);
  11.  
     
  12.  
    for (int i = 0; i < 10; i++) {
  13.  
    pool.asyncWriteMessage(address, HttpMsgComposer.compose(uri, "Hello World"));
  14.  
    }
  15.  
     
  16.  
    while (true) {
  17.  
    Thread.sleep(1000L);
  18.  
    pool.close();
  19.  
    break;
  20.  
    }
  21.  
    }
  22.  

Netty FixedChannelPool

原文:https://www.cnblogs.com/heapStark/p/9270847.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!