首页 > 其他 > 详细

ThreadPool原理分析

时间:2014-06-05 07:37:28      阅读:556      评论:0      收藏:0      [点我收藏+]

看下ThreadPoolImpl的构造函数:

/**
     * This constructor is used to create an unbounded threadpool
     */
    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
        inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
        maxWorkerThreads = Integer.MAX_VALUE;
        workQueue = new WorkQueueImpl(this);
	threadGroup = tg ;
	name = threadpoolName;
	initializeMonitoring();
    }
线程池,一开始感觉以为会有一个链表什么的数据结构来充当这个“池”,但是在实现中怎么找都没有找到。然后跟踪了下源码,才发现原来线程池是通过ThreadGroup来管理。

/**
     * To be called from the workqueue to create worker threads when none
     * available.
     */
    void createWorkerThread() {
        WorkerThread thread;
        
	synchronized (lock) {
	    if (boundedThreadPool) {
		if (currentThreadCount < maxWorkerThreads) {
		    thread = new WorkerThread(threadGroup, getName());
		    currentThreadCount++;
		} else {
		    // REVIST - Need to create a thread to monitor the
		    // the state for deadlock i.e. all threads waiting for
		    // something which can be got from the item in the 
		    // workqueue, but there is no thread available to
		    // process that work item - DEADLOCK !!
		    return;
		}
	    } else {
		thread = new WorkerThread(threadGroup, getName());
		currentThreadCount++;
	    }
	}

通过线程池创建新的工作线程的时候会把线程池维护的一个线程组初始化到新建线程中。
然后新的问题就来了,如果想要获得当前线程池中的所有对象怎么办呢?

这里可以简单把线程池ThreadPool想象成ThreadGroup的工具类,就相当于ThreadLocal之于Thread一样。然后可以通过ThreadPool间接的操作ThreadGroup中的Thread对象。这里看到了,线程在被创建的时候会初始化到一个线程组中。但是在什么时候加入到线程组呢?看源码:

public synchronized void start() {
        /**
	 * This method is not invoked for the main method thread or "system"
	 * group threads created/set up by the VM. Any new functionality added 
	 * to this method in the future may have to also be added to the VM.
	 *
	 * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
        group.add(this);
        start0();
        if (stopBeforeStart) {
	    stop0(throwableFromStop);
	}
    }

这里看到了,线程在运行的时候会把自己加入到线程组中。这样就明白线程池管理"池"中对象的基本手段了。

我们知道线程池一般是用来保活一些线程来避免重复创建线程的开销,那线程池是怎么保活线程的呢?

看源码:

private class WorkerThread extends Thread
    {
        private Work currentWork;
        private int threadId = 0; // unique id for the thread
        // thread pool this WorkerThread belongs too
        private String threadPoolName;
	// name seen by Thread.getName()
	private StringBuffer workerThreadName = new StringBuffer();

        WorkerThread(ThreadGroup tg, String threadPoolName) {
	    super(tg, "Idle");
	    this.threadId = ThreadPoolImpl.getUniqueThreadId();
            this.threadPoolName = threadPoolName;
	    setName(composeWorkerThreadName(threadPoolName, "Idle"));
        }
        
        public void run() {
            while (true) {
                try {

		    synchronized (lock) {
			availableWorkerThreads++;
		    }
                    
                    // Get some work to do
                    currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);

		    synchronized (lock) {
			availableWorkerThreads--;
			// It is possible in notifyForAvailableWork that the
			// check for availableWorkerThreads = 0 may return
			// false, because the availableWorkerThreads has not been
			// decremented to zero before the producer thread added 
			// work to the queue. This may create a deadlock, if the
			// executing thread needs information which is in the work
			// item queued in the workqueue, but has no thread to work
			// on it since none was created because availableWorkerThreads = 0
			// returned false.
			// The following code will ensure that a thread is always available
			// in those situations
			if  ((availableWorkerThreads == 0) && 
				(workQueue.workItemsInQueue() > 0)) {
			    createWorkerThread();
			}
		    }

                    // Set the thread name for debugging.
	            setName(composeWorkerThreadName(threadPoolName,
				      Integer.toString(this.threadId)));

                    long start = System.currentTimeMillis();
                    
		    try {
			// Do the work
			currentWork.doWork();
		    } catch (Throwable t) {
			// Ignore all errors.
			;
		    }
                    
                    long end = System.currentTimeMillis();
                    

		    synchronized (lock) {
			totalTimeTaken += (end - start);
			processedCount++;
		    }

		    // set currentWork to null so that the work item can be 
		    // garbage collected
		    currentWork = null;

	            setName(composeWorkerThreadName(threadPoolName, "Idle"));

                } catch (TimeoutException e) {
                    // This thread timed out waiting for something to do.

		    synchronized (lock) {
			availableWorkerThreads--;

			// This should for both bounded and unbounded case
			if (currentThreadCount > minWorkerThreads) {
			    currentThreadCount--;
			    // This thread can exit.
			    return;
			} else {
			    // Go back to waiting on workQueue
			    continue;
			}
		    }
                } catch (InterruptedException ie) {
                    // InterruptedExceptions are
                    // caught here.  Thus, threads can be forced out of
                    // requestWork and so they have to reacquire the lock.
                    // Other options include ignoring or
                    // letting this thread die.
                    // Ignoring for now. REVISIT
		    synchronized (lock) {
			availableWorkerThreads--;
		    }

                } catch (Throwable e) {

                    // Ignore any exceptions that currentWork.process
                    // accidently lets through, but let Errors pass.
                    // Add debugging output?  REVISIT
		    synchronized (lock) {
			availableWorkerThreads--;
		    }

                }
            }
        }

	private String composeWorkerThreadName(String poolName, String workerName) {
            workerThreadName.setLength(0);
	    workerThreadName.append("p: ").append(poolName);
	    workerThreadName.append("; w: ").append(workerName);
	    return workerThreadName.toString();
	}
    } // End of WorkerThread class

通过源码可以发现是通过在run中加了个while语句,这样线程就会不停的运行。但是到这里我就会想到,如果一直在while循环的话是不是有点浪费资源?

为了“稍微节省CPU”,第一这里把建立的所有工作线程都设置成了Daemon(守候)进程了,官方解释:

// The thread must be set to a daemon thread so the
// VM can exit if the only threads left are PooledThreads
// or other daemons.  We don't want to rely on the
// calling thread always being a daemon.
第二就是线程池中维护了一个任务列表:

private static int threadCounter = 0; // serial counter useful for debugging

    private WorkQueue workQueue;
    
    // Stores the number of available worker threads
    private int availableWorkerThreads = 0;

当工作线程创建的时候如果没有需要执行的任务那他就会阻塞在任务队列上:

 public void run() {
            while (true) {
                try {

		    synchronized (lock) {
			availableWorkerThreads++;
		    }
                    
                    // Get some work to do
                    currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);

		    synchronized (lock) {
			availableWorkerThreads--;
			// It is possible in notifyForAvailableWork that the
			// check for availableWorkerThreads = 0 may return
			// false, because the availableWorkerThreads has not been
			// decremented to zero before the producer thread added 
			// work to the queue. This may create a deadlock, if the
			// executing thread needs information which is in the work
			// item queued in the workqueue, but has no thread to work
			// on it since none was created because availableWorkerThreads = 0
			// returned false.
			// The following code will ensure that a thread is always available
			// in those situations
			if  ((availableWorkerThreads == 0) && 
				(workQueue.workItemsInQueue() > 0)) {
			    createWorkerThread();
			}
		    }

这里的requestWork就是获取queue中的可执行任务,然后再来看这个方法的具体实现:

Work requestWork(long waitTime) 
        throws TimeoutException, InterruptedException
    {
        Work workItem;
        synchronized (this) {
            if (theWorkQueue.size() != 0) {
                workItem = (Work)theWorkQueue.removeFirst();
                totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
                workItemsDequeued++;
                return workItem;
            }

            try {

                long remainingWaitTime = waitTime;
                long finishTime = System.currentTimeMillis() + waitTime;

                do {

                    this.wait(remainingWaitTime);

                    if (theWorkQueue.size() != 0) {
                        workItem = (Work)theWorkQueue.removeFirst();
                        totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
                        workItemsDequeued++;
                        return workItem;
                    }

                    remainingWaitTime = finishTime - System.currentTimeMillis();

                } while (remainingWaitTime > 0);

                throw new TimeoutException();

            } catch (InterruptedException ie) {
                throw ie;
            }
        }
    }

从上面的源代码可以知道,如果没有任务的时候(theWorkQueue.size() == 0),便会阻塞在队列上。this.wait中的this代表的是:
public class WorkQueueImpl implements WorkQueue的对象;

当queue中添加任务的时候,便会通知等待的工作线程:

public void addWork(Work work) {
        synchronized (this) {
            workItemsAdded++;
            work.setEnqueueTime(System.currentTimeMillis());
            theWorkQueue.addLast(work);
	    ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this);
        }
    }
这里看到是调用了ThreadPool中的notifyForAvailableWork方法,然后来看下这个方法的实现:

/**
     * To be called from the workqueue when work is added to the
     * workQueue. This method would create new threads if required
     * or notify waiting threads on the queue for available work
     */
    void notifyForAvailableWork(WorkQueue aWorkQueue) {
	synchronized (lock) {
	    if (availableWorkerThreads == 0) {
		createWorkerThread();
	    } else {
		aWorkQueue.notify();
	    }
	}
    }


aWorkQueue.notify();是唤醒等待线程中的一个线程来执行这个任务。

到这里线程池的大体实现与过程便很清晰了。

ThreadPool原理分析,布布扣,bubuko.com

ThreadPool原理分析

原文:http://blog.csdn.net/aigoogle/article/details/27342817

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!