首页 > Web开发 > 详细

netty io.netty.channel 简介1

时间:2015-08-12 21:53:42      阅读:1886      评论:0      收藏:0      [点我收藏+]

Interface AddressedEnvelope<M,A extends SocketAddress>

    此接口将一个消息、发送地址和接收地址封装到了一起

Interface Channel

    此接口表示到网络socket或者组件(component)的一个连接,其提供了IO操作的一些功能,比如read, write, connect, and bind.一个channel可以给用户提供如下功能:1.当前channel的状态(open、connected等)。2.channel的配置参数(如receive buffer size)。3.该channel支持的所有IO操作(read, write, connect, and bind)。4.还可以提供与此channel关联的ChannelPipeline,此pipeline主要负责处理该channel的所有IO事件和请求。

  所有的IO操作都是异步的。在Netty中所有的IO操作都是异步的。这就意味着任何IO操作都调用之后都会立即返回,不能保证IO操作在调用结束的时候完成。调用IO操作之后会返回一个ChannelFuture对象,该对象会在IO操作成功失败取消的时候进行notify.

  通道是有层次关系的。根据channel的创建方式,channel可以有一个parent,例如,一个由ServerSocketChannel接受请求创建的SocketChannel,调用其parent()方法,会返回ServerSocketChannel。层次结构的语义取决于channel依赖的传输具体实现,例如,你可以创建一个channel实现类,其可以创建一个和其共享一个socket链接的子channel,就像BEEP和SSH

  向下转型访问特殊操作。有些传输实现会暴露一些该实现特有的操作,通过向下转型可以调用这些操作,例如,对于旧的数据报传输,我们可以讲channel转换成DatagramChannel,然后就可以调用其特有的multicast join / leave等操作。

  释放资源。对一个channel操作完毕之后,一定要调用close()close(ChannelPromise) 方法来释放资源。

Interface ChannelConfig

    此类封装了channel配置属性信息

    如果需要特殊的配置信息,需要做向下转换,具体代码如下:

Channel ch = ...; 
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setTcpNoDelay(false);


    选项map(Option map)。是一个动态只写的属性,其提供了另外一种方式来设置属性,而不需要向下进行转换。通过setOptions(Map).方法可以更新option map。比如上面的代码,我们可以不用将ch转换为具体的SocketChannelConfig 具体代码如下:

Channel ch = ...; 
cfg.setsetOption(ChannelOption.TCP_NODELAY,false);

Interface ChannelFactory<T extends Channel>

    创建channel的工厂

Interface ChannelFuture

    其封装了异步IO操作的结果

    Netty中所有的IO操作都是异步的。这就意味着任何IO调用都会立即返回,而且不保证IO操作在调用结束的时候完成,调用IO操作会返回一个ChannelFurniture对象,通过这个对象你可以得到IO操作的状态信息和结果。channelfuture对象要么是未完成状态(uncompleted),要么是完成状态(completed)。当一个IO操作开始的时候会创建一个channelfuture对象,初始的channelfuture对象是未完成状态,它既不是成功(succeeded),也不是失败(failed),更没有取消(cancelled),因为IO操作还没有完成(finished)。如果IO操作完成了,有可能成功(succeeded),失败(failed),或者是取消(cancelled),channelfuture对象会被标记为完成状态(completed,并会附有相信的信息,比如失败的原因,需要注意的是失败(failed)和取消(cancelled)都属于完成状态。未完成和完成 与成功、失败、取消是两个不同的维度。下面图表示channelfuture的状态,左边是初始未完成状态,右边是完成状态,可能有三种成功失败和取消:

                                      +---------------------------+
                                      | Completed successfully    |
                                      +---------------------------+
                                 +---->      isDone() = true      |
 +--------------------------+    |    |   isSuccess() = true      |
 |        Uncompleted       |    |    +===========================+
 +--------------------------+    |    | Completed with failure    |
 |      isDone() = false    |    |    +---------------------------+
 |   isSuccess() = false    |----+---->   isDone() = true         |
 | isCancelled() = false    |    |    |    cause() = non-null     |
 |       cause() = null     |    |    +===========================+
 +--------------------------+    |    | Completed by cancellation |
                                 |    +---------------------------+
                                 +---->      isDone() = true      |
                                      | isCancelled() = true      |
                                      +---------------------------+

    此接口提供了很多方法来帮助你检查IO操作状态比如是否已经完成或者获取IO操作的结果,你可以添加ChannelFutureListener来监听channelfuture对象,这样当IO操作完成的时候,你会被通知到。

  推荐使用addListener,不建议使用await。addListener方法实在channelfuture上监听事件,是非阻塞的方法,当IO调用结束的时候,你会收到通知,在这之前你可以做别的事情,可以提升效率。而await方法是阻塞的。一旦调用之后,当前线程会阻塞,直到IO操作完成,而且会增加死锁的风险。

  不要在ChannelHandler中调用channelfuture的await方法。ChannelHandler中的时间处理方法是由IO线程调用的,一旦await方法被IO线程调用,IO操作将会等待永远不会完成,因为await方法阻塞了他等待的IO操作,就造成了死锁,代码如下:

// BAD - NEVER DO THIS @Override//永远不要这样用
 public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {     ChannelFuture future = ctx.channel().close();
     future.awaitUninterruptibly();
     // Perform post-closure operation
     // ...
 }

 // GOOD @Override//正确的做法
 public void channelRead(ChannelHandlerContext ctx,  GoodByeMessage msg) {     ChannelFuture future = ctx.channel().close();
     future.addListener(new ChannelFutureListener() {
         public void operationComplete(ChannelFuture future) {
             // Perform post-closure operation
             // ...
         }
     });
 }

    尽管await方法有上述缺点,但是调用await显然更简便,如果一定要调用await方法,请记住不要再IO线程里调用channelfuture的await方法,否则系统为了防止死锁,会抛出BlockingOperationException

不要混淆IO超(IO timeout)和await超时(await timeout)。调用方法Future.await(long),Future.await(long, TimeUnit), Future.awaitUninterruptibly(long), 或者Future.awaitUninterruptibly(long, TimeUnit)的超时与IO超时没有任何关系。如果IO超时channelfuture对象会被标记为带失败的完成状态(completed with failure),IO超时的参数可以通过option设置,代码如下:

// BAD - NEVER DO THIS 永远不要这样做
 Bootstrap b = ...; 
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly(10, TimeUnit.SECONDS);//IO超时应该设置到channelconfig,而不是channelfuture
 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     // You might get a NullPointerException here because the future
     // might not be completed yet.
     f.cause().printStackTrace();
 } else {
     // Connection established successfully
 }

 // GOOD 正确的做法
 Bootstrap b = ...;
 // Configure the connect timeout option. 
 b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);//这里最终会通过channelconfig来配置
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly();

 // Now we are sure the future is completed.
 assert f.isDone();

 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     f.cause().printStackTrace();
 } else {
     // Connection established successfully
 }

Interface ChannelFutureListener

    用来监听channelfuture的结果,调用ChannelFuture.addListener(GenericFutureListener)方法之后,异步IO的操作完成之后会通知channelfuturelistener

    GenericFutureListener.operationComplete(Future)是直接被IO线程调用的,因此如果在该方法中调用耗时任务或者是阻塞的操作会导致意外停顿。如果你确实需要执行一个耗时操作或耗时操作,请用线程池另起一个线程来执行耗时操作。

Interface ChannelHandler

    此接口负责处理一个IO事件,或者拦截一个IO操作。并将事件或操作转发给ChannelPipeline中的下一个channelhandler对象。

  建议继承ChannelHandlerAdapter代替实现channelHandler接口。因为channelhandler接口有很多方法需要实现,而ChannelhandlerAdaptor默认实现了一些方法,大部分情况下你只需要实现一些必要的方法就可以了。

  上下文对象(The context object)。channelhandler需要ChannelHandlerContext对象。channelhandler对象通过channelhandlercontext对象与channelhandler的所属的channelpipeline交互。通过context对象,channelhandler可以将事件转发给他的上游和下游,或者动态修改pipeline,对于特殊的handler可以存储信息(通过AttributeKeys)。

  状态管理。channelhandler经常需要存储一些状态信息,最简单的推荐的方法是使用成员变量,代码如下:

 public interface Message {
     // your methods here
 }

 public class DataServerHandler extends SimpleChannelInboundHandler<Message> {     
     private boolean loggedIn;

     @Override
     protected void messageReceived(ChannelHandlerContext ctx, Message message) {         
         Channel ch = e.getChannel();
         if (message instanceof LoginMessage) {
             authenticate((LoginMessage) message);             
             loggedIn = true;
         } else (message instanceof GetDataMessage) {
             if (loggedIn) {
                 ch.write(fetchSecret((GetDataMessage) message));
             } else {
                 fail();
             }
         }
     }
     ...
 }

 上面代码中因为channelhandler实例中有一个变量来专门表示一个链接的状态,即一个连接有一个状态,所以你必须为每一个新channel创建一个新的channelhandler实例,避免竞争条件下一个未经授权的客户端获取重要信息。正确代码如下:

 // Create a new handler instance per channel.
 // See ChannelInitializer.initChannel(Channel).
 public class DataServerInitializer extends ChannelInitializer<Channel> {    
     @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("handler", new DataServerHandler());
     }
 }

  Using AttributeKey虽然建议使用成员变量来存储channelhandler的状态,但是为了考虑安全问题需要为每个channel创建一个channelhandler实例,有些情况下你可能不想创建那么多实例,在这种情况下,你需要用到AttributeKeys,他可以附着到(attached)channelhandlercontext上,代码如下:

 public interface Message {
     // your methods here
 } 
 @Sharable //这个注解很重要后面会介绍
 public class DataServerHandler extends SimpleChannelInboundHandler<Message> {
     private final AttributeKey<Boolean> auth =           
         AttributeKey.valueOf("auth");     
     @Override
     protected void messageReceived(ChannelHandlerContext ctx, Message message) {         
         Attribute<Boolean> attr = ctx.attr(auth);         
         Channel ch = ctx.channel();

         if (message instanceof LoginMessage) {
             authenticate((LoginMessage) o);             
             attr.set(true);
         } else (message instanceof GetDataMessage) {
             if (Boolean.TRUE.equals(attr.get())) {
                 ch.write(fetchSecret((GetDataMessage) o));
             } else {
                 fail();
             }
         }
     }
     ...
 }

    通过上面的代码可以将channelhandler的状态attach到channelhandlercontext上,你可以将这个channelhandler实例添加到不同的pipeline,代码如下:

 public class DataServerInitializer extends ChannelInitializer<Channel> {

     private static final DataServerHandler SHARED = new DataServerHandler();     
     @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("handler", SHARED);
     }
 }


  @Sharable注解 上面的用attributekey实例代码中用到了@Sharable注解,如果channelhandler加上了@sharable注解,意味着你可以只创建一个实例,然后你可以将该实例放到任意不同的pipeline中,而不必考虑竞争条件。如果不加这个注解,你每次向pipeline中添加channelhandler,都需要创建一个新的实例,否则会有并发问题。


netty io.netty.channel 简介1

原文:http://my.oschina.net/u/2376392/blog/491554

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!