FixedChannelPool是netty的连接池,除了这个以外netty还有另外一个连接池SimpleChannelPool,它们俩的关系其实就是儿子与爸爸,FixedChannelPool继承了SimpleChannelPool,这篇文章里主要是讲FixedChannelPool的故事。
注意上面讲的是连接池不是线程池喔。
1.初始化连接池
public void connect(int port, String host, int maxChannel) {
EventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
// 连接池每次初始化一个连接的时候都会根据这个值去连接服务器
InetSocketAddress remoteaddress = InetSocketAddress.createUnresolved(host, port);// 连接地址
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.remoteAddress(remoteaddress);
// 初始化连接池
// 这个值可要好好保管好了,后面拿连接放连接都要通过它啦
FixedChannelPool channelPool = new FixedChannelPool(bootstrap, new SHChannelPoolHandler(), maxChannel);
}
细心的朋友可能看到SHChannelPoolHandler这个类并不是netty自身的,没错,这是咱们第二步要做的:
2.连接池操作类:
public class SHChannelPoolHandler implements ChannelPoolHandler {
/**
* 使用完channel需要释放才能放入连接池
*/
@Override
public void channelReleased(Channel ch) throws Exception {
// TODO Auto-generated method stub
// 刷新管道里的数据
ch.writeAndFlush(Unpooled.EMPTY_BUFFER); //flush掉所有写回的数据
// LoggerFactory.getLogger(SHChannelPoolHandler.class).info("释放channel,channel
// released " + ch.toString());
}
/**
* 获取连接池中的channel
*/
@Override
public void channelAcquired(Channel ch) throws Exception {
// TODO Auto-generated method stub
// LoggerFactory.getLogger(SHChannelPoolHandler.class).info("获取channel,channel
// acquired " + ch.toString());
}
/**
* 当channel不足时会创建,但不会超过限制的最大channel数
*/
@Override
public void channelCreated(Channel ch) throws Exception {
// TODO Auto-generated method stub
// LoggerFactory.getLogger(SHChannelPoolHandler.class).info("创建新channel,channel
// created " + ch.toString());
NioSocketChannel channel = (NioSocketChannel) ch;
// 客户端逻辑处理 ClientHandler这个也是咱们自己编写的,继承ChannelInboundHandlerAdapter,实现你自己的逻辑
channel.pipeline().addLast(new ClientHandler());
}
}
至此就完成啦
3.使用
// 从连接池拿到连接
Channel channel = this.channelPool.acquire().get();
// 写出数据
channel.write("xxxx");
// 连接放回连接池,这里一定记得放回去
this.channelPool.release(channel);
其中FixedChannelPool还有很多构造方法,包括获取连接的时候也有很多重载,详细的使用还是多看看官方文档吧
【连接池】
忘记自己借鉴了谁的代码,客户端连接池采用Netty的ChannelPoolMap接口,用网络连接地址做Key,用FixedChannelPool实例化value,即不同的连接服务地址对应不同的连接池。FixedChannelPool的理论连接数上限是Integer.MAX_VALUE,并且使用ChannelHealthChecker接口来判断channel被取出池的时候是否是活的,如果是活的才向应用层吐出去。这样一来保活问题就不用自己操心了。
构造方法需要传入一个实现了ChannelPoolHandler接口处理Handler,这个Handler需要实现三个方法
void channelReleased(Channel ch) throws Exception;
void channelAcquired(Channel ch) throws Exception;
void channelCreated(Channel ch) throws Exception;
在处理类中最重要的事情是给channel加载业务协议的编解码处理器
其中实现HandlerConfiguratorInterface接口(自定义的接口,只有一个方法public void configChannel(Channel channel);)的类,需要通过configChannel方法给Channel对象装配编解码器
对于Http的实现
这一步和常见的Netty处理器挂载方式是一致的。最后一个HttpResponseHandler是处理应答的Handler。
【关闭连接池】
客户端池还需要提供关闭的能力,否则程序无法正常退出
【发消息】
客户端池封装了异步和同步发消息的方法
异步方法
同步方法
【其他】
如果要发HTTP消息,需要自己封装Http消息体,否则Netty编码器会扔掉
【调用方式】
原文:https://www.cnblogs.com/heapStark/p/9270847.html