嗯,今天其实在看HtttpProcessor的实现,但是突然想到了以前在看poller的时候看到了有闭锁,用于控制当前connector的连接数量,嗯,那就顺便把这部分来看了。。。
在Tomcat中,通过继承AbstractQueuedSynchronizer来实现了自己的同步工具,进而来实现了一个用于控制连接数量的闭锁。。LimitLatch。。
这里就需对AbstractQueuedSynchronizer有一些初步的了解。。。
首先它concurrent类库中提供的一个用于构建自己的同步工具的一个工具类。。可以通过继承他来快速的完成一个同步类的实现
(1)acquireSharedInterruptibly()方法,用于以共享的方式来获取锁,如果暂时无法获取,将会将线程挂起到队列,进行阻塞,对于这个方法是否最终能获取锁,是通过tryAcquireShared()方法的返回来定义的,这个方法需要自己实现。。。如果能获取锁,那么返回1,否则返回-1.。。
(2)releaseShared()方法。以共享的方法释放一个锁,这样前面提到的挂起的线程将会唤醒,进而重新尝试获取锁。。。
好啦,接下来就来看看LimitLatch的定义吧,直接上代码好了,。,。代码还是很简单的。。
//其实是通过AbstractQueuedSynchronizer来构建的 public class LimitLatch { private static final Log log = LogFactory.getLog(LimitLatch.class); //构建Sync类型,实现基本的同步,以及阻塞。。 private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } @Override //用于增加计数,如果计数增加之后小于最大的,那么返回1,不会阻塞,否则将会返回-1阻塞 protected int tryAcquireShared(int ignored) { //调用acquiredShared方法的时候会调用这个方法来返回状态,如果返回1,那么表示获取成功,返回-1表示获取失败,将会阻塞 long newCount = count.incrementAndGet(); //先增加计数 if (!released && newCount > limit) { //如果当前已经超过了最大的限制 // Limit exceeded count.decrementAndGet(); //减少计数 return -1; //返回-1,将阻塞当前线程 } else { return 1; } } @Override //用于减少计数 protected boolean tryReleaseShared(int arg) { count.decrementAndGet(); return true; } } private final Sync sync; //同步对象 private final AtomicLong count; //计数器 private volatile long limit; //最大的数量 private volatile boolean released = false; //是否全部释放 /** * Instantiates a LimitLatch object with an initial limit. * @param limit - maximum number of concurrent acquisitions of this latch */ public LimitLatch(long limit) { this.limit = limit; //最大限制 this.count = new AtomicLong(0); this.sync = new Sync(); //sync 对象 } /** * Returns the current count for the latch * @return the current count for latch */ public long getCount() { return count.get(); } /** * Obtain the current limit. */ public long getLimit() { return limit; } /** * Sets a new limit. If the limit is decreased there may be a period where * more shares of the latch are acquired than the limit. In this case no * more shares of the latch will be issued until sufficient shares have been * returned to reduce the number of acquired shares of the latch to below * the new limit. If the limit is increased, threads currently in the queue * may not be issued one of the newly available shares until the next * request is made for a latch. * * @param limit The new limit */ public void setLimit(long limit) { this.limit = limit; } /** * Acquires a shared latch if one is available or waits for one if no shared * latch is current available. */ //增加计数,如果太大,那么等等待 public void countUpOrAwait() throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount()); } sync.acquireSharedInterruptibly(1); } /** * Releases a shared latch, making it available for another thread to use. * @return the previous counter value */ //减少计数 public long countDown() { sync.releaseShared(0); //释放 long result = getCount(); if (log.isDebugEnabled()) { log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result); } return result; } /** * Releases all waiting threads and causes the {@link #limit} to be ignored * until {@link #reset()} is called. */ //通过将released设置为true,将会释放所有的线程,知道reset了 public boolean releaseAll() { released = true; return sync.releaseShared(0); } /** * Resets the latch and initializes the shared acquisition counter to zero. * @see #releaseAll() */ //重制 public void reset() { this.count.set(0); released = false; } /** * Returns <code>true</code> if there is at least one thread waiting to * acquire the shared lock, otherwise returns <code>false</code>. */ //当前是否有线程等待 public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * Provide access to the list of threads waiting to acquire this limited * shared latch. */ //获取所有等待的线程 public Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } }
代码应该还是很简单的吧,而且注释也算是说的比较清楚。。。其实是构建了一个继承自AbstractQueuedSynchronizer的Sync对象,通过它来进行真正的同步功能。。。然后通过一个原子的整数计数器,和一个最大值,来判断当前是否可以获取锁
好啦,这里来看看Tomcat是如何通过LimitLatch来控制连接数量的吧,先来看看NioEndpoint的启动方法:
//启动当前的endpoint public void startInternal() throws Exception { if (!running) { running = true; //设置表示为,表示已经看是运行了 paused = false; //没有暂停 // Create worker collection if ( getExecutor() == null ) { //如果没有executor,那么创建 createExecutor(); //创建executor } initializeConnectionLatch(); //初始化闭锁,用于控制连接的数量 // Start poller threads pollers = new Poller[getPollerThreadCount()]; //根据设置的poller数量来创建poller对象的数组 for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); // 创建poller对象 Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); // 创建相应的poller线程 pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); //启动poller } startAcceptorThreads(); //启动acceptor } }
这里调用了initializeConnectionLatch方法来初始化闭锁,来看看吧:
//初始化闭锁,用于控制连接的数量 protected LimitLatch initializeConnectionLatch() { if (maxConnections==-1) return null; //这个是无限的链接数量 if (connectionLimitLatch==null) { connectionLimitLatch = new LimitLatch(getMaxConnections()); //根据最大的链接数量来创建 } return connectionLimitLatch; }
我们知道在Connector的配置中可以设置最大的链接数量,其实这里也就是通过这个数量来构建LimitLatch对象的。。。
嗯,Tomcat是从哪里获取连接呢,这个就要从Accecptor看了。。。
public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { //如果暂停了 state = AcceptorState.PAUSED; //更改当前acceptor的状态 try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { //如果没有运行,那么这里直接跳过 break; } state = AcceptorState.RUNNING; //设置当前acceptor的状态是running try { //if we have reached max connections, wait countUpOrAwaitConnection(); //增减闭锁的计数,如果connection数量已经达到了最大,那么暂停一下,这里用到的是connectionLimitLatch锁,可以理解为一个闭锁吧 SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket socket = serverSock.accept(); //调用serversocket的accept方法 } catch (IOException ioe) { //we didn‘t get a socket countDownConnection(); //出了异常,并没有获取链接,那么这里减少闭锁的计数 // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // setSocketOptions() will add channel to the poller // if successful if (running && !paused) { if (!setSocketOptions(socket)) { //这里主要是将socket加入到poller对象上面去,而且还要设置参数 countDownConnection(); //加入poller对象失败了的话,那么将闭锁的计数减低 closeSocket(socket); //关闭刚刚 创建的这个socket } } else { countDownConnection(); closeSocket(socket); } } catch (SocketTimeoutException sx) { // Ignore: Normal condition } catch (IOException x) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (OutOfMemoryError oom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwable oomt ) { try { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } }catch (Throwable letsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; //设置acceptor的状态为ended }
这里读一下Accecptor,的run方法可以知道,每次在调用serverSocketChannel的accept方法之前都会调用countUpOrAwaitConnection方法来增加闭锁的计数,如果有问题,那就会调用countDownConnection方法来降低闭锁的计数。。。
其实这里通过这两个方法就知道他们是干嘛的了,先来看看countUpOrAwaitConnection吧:
//这里用于增加闭锁的计数 protected void countUpOrAwaitConnection() throws InterruptedException { if (maxConnections==-1) return; LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.countUpOrAwait(); //增加闭锁的counter }
没啥意思吧,就是调用刚刚创建的闭锁的countUpOrAwait方法,接下来来看看countDownConnection方法吧:
//用于减少闭锁的计数 protected long countDownConnection() { if (maxConnections==-1) return -1; LimitLatch latch = connectionLimitLatch; if (latch!=null) { long result = latch.countDown(); if (result<0) { getLog().warn("Incorrect connection count, multiple socket.close called on the same socket." ); } return result; } else return -1; }
这个也没啥意思吧。。。就是调用闭锁的countDown方法。。。
嗯,到这里整个Tomcat如何控制连接的数量就算是比较清楚了吧。。。
最后,我们知道是通过调用endpoint的cancelledKey方法来关闭一个连接的,来看看它的实现吧:
//取消一个注册 public void cancelledKey(SelectionKey key, SocketStatus status) { try { if ( key == null ) return;//nothing to do KeyAttachment ka = (KeyAttachment) key.attachment(); if (ka != null && ka.isComet() && status != null) { ka.setComet(false);//to avoid a loop if (status == SocketStatus.TIMEOUT ) { if (processSocket(ka.getChannel(), status, true)) { return; // don‘t close on comet timeout } } else { // Don‘t dispatch if the lines below are canceling the key processSocket(ka.getChannel(), status, false); } } key.attach(null); //将附件设置为null if (ka!=null) handler.release(ka); //可以取消这个attachment了 else handler.release((SocketChannel)key.channel()); if (key.isValid()) key.cancel(); //取消key if (key.channel().isOpen()) { //如果channel还是打开的,那么需要关闭channel try { key.channel().close(); } catch (Exception e) { if (log.isDebugEnabled()) { log.debug(sm.getString( "endpoint.debug.channelCloseFail"), e); } } } try { if (ka!=null) { ka.getSocket().close(true); //关闭sockt } } catch (Exception e){ if (log.isDebugEnabled()) { log.debug(sm.getString( "endpoint.debug.socketCloseFail"), e); } } try { if (ka != null && ka.getSendfileData() != null && ka.getSendfileData().fchannel != null && ka.getSendfileData().fchannel.isOpen()) { ka.getSendfileData().fchannel.close(); } } catch (Exception ignore) { } if (ka!=null) { ka.reset(); countDownConnection(); //降低用于维护连接数量的闭锁 } } catch (Throwable e) { ExceptionUtils.handleThrowable(e); if (log.isDebugEnabled()) log.error("",e); } }
这里可以看到调用了countDownConnection方法来降低闭锁的计数。。
最后总结:Tomcat通过在acceptor中对闭锁的获取来控制总连接的数量,如果连接数量达到了最大的限制,那么将会被阻塞。。直到有连接关闭为止。。。这样acceptor的线程就又被唤醒了。。。
Tomcat源码阅读之闭锁的实现与连接数量的控制,布布扣,bubuko.com
原文:http://blog.csdn.net/fjslovejhl/article/details/22090885