首页 > 其他 > 详细

ZooKeeper源码分析(二)

时间:2015-06-28 02:08:23      阅读:394      评论:0      收藏:0      [点我收藏+]

?

??上一节分析了ZooKeeper的部分代码,下面我们看看客户端网络连接器的部分代码

?

?

/**
   这个类管理客户端的socket I/O。ClientCnxn维护一个可用服务器列表可以根据需要透明地切换服务器
 *
 */
public class ClientCnxn {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);

    private static final String ZK_SASL_CLIENT_USERNAME =
        "zookeeper.sasl.client.username";

    /** 客户端在会话重连接时自动复位监视器,这个操作允许客户端通过设置环境变量zookeeper.disableAutoWatchReset=true来关闭这个行为
	 */
    private static boolean disableAutoWatchReset;
    static {
        disableAutoWatchReset =
            Boolean.getBoolean("zookeeper.disableAutoWatchReset");
        if (LOG.isDebugEnabled()) {
            LOG.debug("zookeeper.disableAutoWatchReset is "
                    + disableAutoWatchReset);
        }
    }

    static class AuthData {
        AuthData(String scheme, byte data[]) {
            this.scheme = scheme;
            this.data = data;
        }

        String scheme;

        byte data[];
    }
	
    private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();

    /**
	 *哪些已经发送出去的目前正在等待响应的包
     */
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

    /**
     * 那些需要发送的包
     */
    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

	// 超时时间
    private int connectTimeout;

    /**
	 *客户端与服务器协商的超时时间,以毫秒为单位。这是真正的超时时间,而不是客户端的超时请求
     */
    private volatile int negotiatedSessionTimeout;
	
	// 读取超时时间
    private int readTimeout;

	// 会话超时时间
    private final int sessionTimeout;

	// ZooKeeper
    private final ZooKeeper zooKeeper;

	//客户端监视器管理器
    private final ClientWatchManager watcher;

	//会话ID
    private long sessionId;

	//会话密钥
    private byte sessionPasswd[] = new byte[16];

    // 是否只读
    private boolean readOnly;

    final String chrootPath;
   // 发送线程
    final SendThread sendThread;
	// 事件回调线程
    final EventThread eventThread;

    /**
     * Set to true when close is called. Latches the connection such that we
     * don‘t attempt to re-connect to the server if in the middle of closing the
     * connection (client sends session disconnect to server as part of close
     * operation)
     */
    private volatile boolean closing = false;
    
    /**
	  一组客户端可以连接的Zk主机
     */
    private final HostProvider hostProvider;

    /**
	 * 第一次和读写服务器建立连接时设置为true,之后不再改变。
	   这个值用来处理客户端没有sessionId连接只读模式服务器的场景.
	   客户端从只读服务器收到一个假的sessionId,这个sessionId对于其他服务器是无效的。所以
	   当客户端寻找一个读写服务器时,它在连接握手时发送0代替假的sessionId,建立一个新的,有效的会话
	   如果这个属性是false(这就意味着之前没有找到过读写服务器)则表示非0的sessionId是假的否则就是有效的 
     */
    volatile boolean seenRwServerBefore = false;


    public ZooKeeperSaslClient zooKeeperSaslClient;

    public long getSessionId() {
        return sessionId;
    }

    public byte[] getSessionPasswd() {
        return sessionPasswd;
    }

    public int getSessionTimeout() {
        return negotiatedSessionTimeout;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();

        SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
        SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
        sb
            .append("sessionid:0x").append(Long.toHexString(getSessionId()))
            .append(" local:").append(local)
            .append(" remoteserver:").append(remote)
            .append(" lastZxid:").append(lastZxid)
            .append(" xid:").append(xid)
            .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
            .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
            .append(" queuedpkts:").append(outgoingQueue.size())
            .append(" pendingresp:").append(pendingQueue.size())
            .append(" queuedevents:").append(eventThread.waitingEvents.size());

        return sb.toString();
    }

   

    /**
     * 创建一个连接对象。真正的网路连接直到需要的时候才建立。start()方法在执行构造方法后一定要调用
     * 这个构造方法在ZooKeeper的初始化时调用,用于初始化一个客户端网路管理器
     */
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
            throws IOException {
        this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
             clientCnxnSocket, 0, new byte[16], canBeReadOnly);
    }

    
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        //客户端实例
        this.zooKeeper = zooKeeper;
        // 客户端Watcher管理器
        this.watcher = watcher;
        //sessionId
        this.sessionId = sessionId;
        //会话密钥
        this.sessionPasswd = sessionPasswd;
        //会话超时时间
        this.sessionTimeout = sessionTimeout;
        //服务器地址列表管理器
        this.hostProvider = hostProvider;
         //根路径
        this.chrootPath = chrootPath;
       // 连接超时时间是会话超时时间和服务器数量的比值
        connectTimeout = sessionTimeout / hostProvider.size();
       // 读超时时间是会话超时时间的2/3
        readTimeout = sessionTimeout * 2 / 3;
		
        readOnly = canBeReadOnly;
        //创建发送和事件处理线程
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

    }

   
    public static boolean getDisableAutoResetWatch() {
        return disableAutoWatchReset;
    }
   
    public static void setDisableAutoResetWatch(boolean b) {
        disableAutoWatchReset = b;
    }
	//启动发送和事件处理线程
    public void start() {
        sendThread.start();
        eventThread.start();
    }

?

?

?

// 事件处理线程
class EventThread extends Thread {
	    //等待处理的事件
        private final LinkedBlockingQueue<Object> waitingEvents =
            new LinkedBlockingQueue<Object>();

		 /**
		  *这个是真正的排队会话的状态,知道事件处理线程真正处理事件并将其返回给监视器。
		  **/
        private volatile KeeperState sessionState = KeeperState.Disconnected;

       private volatile boolean wasKilled = false;
       private volatile boolean isRunning = false;

        EventThread() {
		   // 构造一个线程名
            super(makeThreadName("-EventThread"));
            setUncaughtExceptionHandler(uncaughtExceptionHandler);
			// 设置为守护线程
            setDaemon(true);
        }
        // 
        public void queueEvent(WatchedEvent event) {
			// 如果WatchedEvent的类型是None状态是sessionStat的值则不处理
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
			// 获取事件的状态
            sessionState = event.getState();

            // 构建一个基于事件的监视器
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // 排队pair,稍后处理
            waitingEvents.add(pair);
        }
		
		// 排队Packet
       public void queuePacket(Packet packet) {
          if (wasKilled) {
             synchronized (waitingEvents) {
                if (isRunning) waitingEvents.add(packet);
                else processEvent(packet);
             }
          } else {
             waitingEvents.add(packet);
          }
       }

        public void queueEventOfDeath() {
            waitingEvents.add(eventOfDeath);
        }

        @Override
        public void run() {
           try {
              isRunning = true;
              while (true) {
			     //从等待处理的事件队列中获取事件
                 Object event = waitingEvents.take();
				 
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down");
        }

		// 真正处理事件的入口,主要是回调处理
       private void processEvent(Object event) {
          try {
			// 如果事件是WatcherSetEventPair
              if (event instanceof WatcherSetEventPair) {
                  //每个监视器都会处理这个事件
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              } else {
                  Packet p = (Packet) event;
                  int rc = 0;
				  // 获取客户端路径
                  String clientPath = p.clientPath;
                  if (p.replyHeader.getErr() != 0) {
                      rc = p.replyHeader.getErr();
                  }
                  if (p.cb == null) {
                      LOG.warn("Somehow a null cb got to EventThread!");
                  } else if (p.response instanceof ExistsResponse
                          || p.response instanceof SetDataResponse
                          || p.response instanceof SetACLResponse) {
						 // 获取回调对象
                      StatCallback cb = (StatCallback) p.cb;
					  // 如果处理成功回调方法会传入响应状态,否则响应状态为null
                      if (rc == 0) {
                          if (p.response instanceof ExistsResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((ExistsResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetDataResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetDataResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetACLResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetACLResponse) p.response)
                                              .getStat());
                          }
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetDataResponse) {
                      DataCallback cb = (DataCallback) p.cb;
                      GetDataResponse rsp = (GetDataResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getData(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetACLResponse) {
                      ACLCallback cb = (ACLCallback) p.cb;
                      GetACLResponse rsp = (GetACLResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getAcl(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetChildrenResponse) {
                      ChildrenCallback cb = (ChildrenCallback) p.cb;
                      GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetChildren2Response) {
                      Children2Callback cb = (Children2Callback) p.cb;
                      GetChildren2Response rsp = (GetChildren2Response) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null, null);
                      }
                  } else if (p.response instanceof CreateResponse) {
                      StringCallback cb = (StringCallback) p.cb;
                      CreateResponse rsp = (CreateResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx,
                                  (chrootPath == null
                                          ? rsp.getPath()
                                          : rsp.getPath()
                                    .substring(chrootPath.length())));
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.cb instanceof VoidCallback) {
                      VoidCallback cb = (VoidCallback) p.cb;
                      cb.processResult(rc, clientPath, p.ctx);
                  }
              }
          } catch (Throwable t) {
              LOG.error("Caught unexpected throwable", t);
          }
       }
    }

?

ZooKeeper源码分析(二)

原文:http://zhangwei-david.iteye.com/blog/2222476

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!