Let's look at the constructor of ThreadPoolImpl:
    /**
     * This constructor is used to create an unbounded threadpool
     */
    public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
        inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
        maxWorkerThreads = Integer.MAX_VALUE;
        workQueue = new WorkQueueImpl(this);
        threadGroup = tg ;
        name = threadpoolName;
        initializeMonitoring();
    }

When I first thought about a thread pool, I assumed some data structure such as a linked list would act as the "pool", but I could not find one anywhere in the implementation. Only after tracing through the source did I realize that the pool's threads are managed through a ThreadGroup.
    /**
     * To be called from the workqueue to create worker threads when none
     * available.
     */
    void createWorkerThread() {
        WorkerThread thread;

        synchronized (lock) {
            if (boundedThreadPool) {
                if (currentThreadCount < maxWorkerThreads) {
                    thread = new WorkerThread(threadGroup, getName());
                    currentThreadCount++;
                } else {
                    // REVIST - Need to create a thread to monitor the
                    // the state for deadlock i.e. all threads waiting for
                    // something which can be got from the item in the
                    // workqueue, but there is no thread available to
                    // process that work item - DEADLOCK !!
                    return;
                }
            } else {
                thread = new WorkerThread(threadGroup, getName());
                currentThreadCount++;
            }
        }
        // ... (the excerpt ends here; the rest of the method is not shown)
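Here you can loosely think of ThreadPool as a utility class for ThreadGroup, much as ThreadLocal relates to Thread: through the ThreadPool you indirectly operate on the Thread objects in the ThreadGroup. As the code above shows, a thread is assigned to a thread group when it is constructed. But when is it actually added to the group? Look at the source of Thread.start():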
    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
        group.add(this);
        start0();
        if (stopBeforeStart) {
            stop0(throwableFromStop);
        }
    }
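The effect of group.add(this) being called from start() can be observed from the outside. The following is a minimal sketch, not part of the ORB code: the class name GroupMembershipDemo and the method countsAroundStart are made up for illustration. It compares ThreadGroup.activeCount() (which the JDK documents as an estimate) before and after start() for a thread that was constructed with a fresh group.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; names are illustrative only.
class GroupMembershipDemo {

    /** Returns {activeCount before start(), activeCount after start()}. */
    static int[] countsAroundStart() {
        ThreadGroup group = new ThreadGroup("demo-group");
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // The group is passed to the constructor, but the thread is not started yet.
        Thread t = new Thread(group, () -> {
            running.countDown();
            try {
                release.await(); // stay alive until the measurement is done
            } catch (InterruptedException ignored) {
                // nothing to clean up in this demo
            }
        }, "demo-worker");
        t.setDaemon(true);

        int before = group.activeCount();
        t.start();
        try {
            running.await(); // make sure the thread is really running
            int after = group.activeCount();
            return new int[] { before, after };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let the worker finish
        }
    }

    public static void main(String[] args) {
        int[] counts = countsAroundStart();
        System.out.println("before start: " + counts[0] + ", after start: " + counts[1]);
    }
}
```

The thread is only counted as an active member of the group once it has been started, which matches the order of calls in start() above.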
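We know that a thread pool is generally used to keep some threads alive so that the cost of creating threads over and over is avoided. So how does this pool keep its threads alive?
Look at the source: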
    private class WorkerThread extends Thread {
        private Work currentWork;
        private int threadId = 0; // unique id for the thread
        // thread pool this WorkerThread belongs too
        private String threadPoolName;
        // name seen by Thread.getName()
        private StringBuffer workerThreadName = new StringBuffer();

        WorkerThread(ThreadGroup tg, String threadPoolName) {
            super(tg, "Idle");
            this.threadId = ThreadPoolImpl.getUniqueThreadId();
            this.threadPoolName = threadPoolName;
            setName(composeWorkerThreadName(threadPoolName, "Idle"));
        }

        public void run() {
            while (true) {
                try {
                    synchronized (lock) {
                        availableWorkerThreads++;
                    }

                    // Get some work to do
                    currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);

                    synchronized (lock) {
                        availableWorkerThreads--;
                        // It is possible in notifyForAvailableWork that the
                        // check for availableWorkerThreads = 0 may return
                        // false, because the availableWorkerThreads has not been
                        // decremented to zero before the producer thread added
                        // work to the queue. This may create a deadlock, if the
                        // executing thread needs information which is in the work
                        // item queued in the workqueue, but has no thread to work
                        // on it since none was created because availableWorkerThreads = 0
                        // returned false.
                        // The following code will ensure that a thread is always available
                        // in those situations
                        if ((availableWorkerThreads == 0) &&
                                (workQueue.workItemsInQueue() > 0)) {
                            createWorkerThread();
                        }
                    }

                    // Set the thread name for debugging.
                    setName(composeWorkerThreadName(threadPoolName,
                            Integer.toString(this.threadId)));

                    long start = System.currentTimeMillis();

                    try {
                        // Do the work
                        currentWork.doWork();
                    } catch (Throwable t) {
                        // Ignore all errors.
                        ;
                    }

                    long end = System.currentTimeMillis();

                    synchronized (lock) {
                        totalTimeTaken += (end - start);
                        processedCount++;
                    }

                    // set currentWork to null so that the work item can be
                    // garbage collected
                    currentWork = null;

                    setName(composeWorkerThreadName(threadPoolName, "Idle"));

                } catch (TimeoutException e) {
                    // This thread timed out waiting for something to do.
                    synchronized (lock) {
                        availableWorkerThreads--;

                        // This should for both bounded and unbounded case
                        if (currentThreadCount > minWorkerThreads) {
                            currentThreadCount--;
                            // This thread can exit.
                            return;
                        } else {
                            // Go back to waiting on workQueue
                            continue;
                        }
                    }
                } catch (InterruptedException ie) {
                    // InterruptedExceptions are
                    // caught here. Thus, threads can be forced out of
                    // requestWork and so they have to reacquire the lock.
                    // Other options include ignoring or
                    // letting this thread die.
                    // Ignoring for now. REVISIT
                    synchronized (lock) {
                        availableWorkerThreads--;
                    }
                } catch (Throwable e) {
                    // Ignore any exceptions that currentWork.process
                    // accidently lets through, but let Errors pass.
                    // Add debugging output? REVISIT
                    synchronized (lock) {
                        availableWorkerThreads--;
                    }
                }
            }
        }

        private String composeWorkerThreadName(String poolName, String workerName) {
            workerThreadName.setLength(0);
            workerThreadName.append("p: ").append(poolName);
            workerThreadName.append("; w: ").append(workerName);
            return workerThreadName.toString();
        }
    } // End of WorkerThread class
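Two details help to "save a little CPU" here. First, every worker thread that gets created is marked as a daemon thread. The comment in the source explains why: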
    // The thread must be set to a daemon thread so the
    // VM can exit if the only threads left are PooledThreads
    // or other daemons. We don't want to rely on the
    // calling thread always being a daemon.

Second, the thread pool maintains a work queue:
    private static int threadCounter = 0; // serial counter useful for debugging

    private WorkQueue workQueue;

    // Stores the number of available worker threads
    private int availableWorkerThreads = 0;
    public void run() {
        while (true) {
            try {
                synchronized (lock) {
                    availableWorkerThreads++;
                }

                // Get some work to do
                currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);

                synchronized (lock) {
                    availableWorkerThreads--;
                    // It is possible in notifyForAvailableWork that the
                    // check for availableWorkerThreads = 0 may return
                    // false, because the availableWorkerThreads has not been
                    // decremented to zero before the producer thread added
                    // work to the queue. This may create a deadlock, if the
                    // executing thread needs information which is in the work
                    // item queued in the workqueue, but has no thread to work
                    // on it since none was created because availableWorkerThreads = 0
                    // returned false.
                    // The following code will ensure that a thread is always available
                    // in those situations
                    if ((availableWorkerThreads == 0) &&
                            (workQueue.workItemsInQueue() > 0)) {
                        createWorkerThread();
                    }
                }
                // ... (excerpt of the run() method shown in full above)
    Work requestWork(long waitTime)
        throws TimeoutException, InterruptedException
    {
        Work workItem;
        synchronized (this) {
            if (theWorkQueue.size() != 0) {
                workItem = (Work)theWorkQueue.removeFirst();
                totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
                workItemsDequeued++;
                return workItem;
            }

            try {
                long remainingWaitTime = waitTime;
                long finishTime = System.currentTimeMillis() + waitTime;

                do {
                    this.wait(remainingWaitTime);

                    if (theWorkQueue.size() != 0) {
                        workItem = (Work)theWorkQueue.removeFirst();
                        totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
                        workItemsDequeued++;
                        return workItem;
                    }

                    remainingWaitTime = finishTime - System.currentTimeMillis();
                } while (remainingWaitTime > 0);

                throw new TimeoutException();
            } catch (InterruptedException ie) {
                throw ie;
            }
        }
    }
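The key to keeping a worker alive without burning CPU is this timed wait: the thread blocks on the queue's monitor and, after every wake-up, recomputes how much of its time budget is left. The pattern can be sketched in isolation as follows. This is only an illustration under assumptions: TimedWorkQueue, request and probe are hypothetical names, Runnable stands in for the Work interface, and java.util.concurrent.TimeoutException stands in for the exception type used in the excerpt. Unlike the do/while loop above, the sketch checks the deadline before waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified queue; not the ORB's WorkQueueImpl.
class TimedWorkQueue {
    private final Deque<Runnable> items = new ArrayDeque<>();

    /** Adds a work item and wakes up one waiting consumer, if any. */
    synchronized void add(Runnable work) {
        items.addLast(work);
        notify();
    }

    /** Blocks for at most waitMillis; throws TimeoutException if nothing arrives. */
    synchronized Runnable request(long waitMillis)
            throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + waitMillis;
        while (items.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException();
            }
            // Releases the monitor while waiting; may wake up early,
            // which is why the remaining time is recomputed in a loop.
            wait(remaining);
        }
        return items.removeFirst();
    }

    /** Demo helper: reports what a consumer sees with or without queued work. */
    static String probe(boolean addFirst) {
        TimedWorkQueue queue = new TimedWorkQueue();
        if (addFirst) {
            queue.add(() -> { });
        }
        try {
            queue.request(50);
            return "work";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

A worker that receives the timeout can then decide, as WorkerThread.run() does, whether to exit or to go back to waiting.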
The workQueue field holds an instance of public class WorkQueueImpl implements WorkQueue. Work is added to it through addWork:
    public void addWork(Work work) {
        synchronized (this) {
            workItemsAdded++;
            work.setEnqueueTime(System.currentTimeMillis());
            theWorkQueue.addLast(work);
            ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this);
        }
    }

Here we can see that it calls the notifyForAvailableWork method of the ThreadPool. Let's look at how that method is implemented:
    /**
     * To be called from the workqueue when work is added to the
     * workQueue. This method would create new threads if required
     * or notify waiting threads on the queue for available work
     */
    void notifyForAvailableWork(WorkQueue aWorkQueue) {
        synchronized (lock) {
            if (availableWorkerThreads == 0) {
                createWorkerThread();
            } else {
                aWorkQueue.notify();
            }
        }
    }
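To see the whole "notify an idle worker, otherwise create one" flow in a runnable form, here is a heavily simplified sketch. It is not the ORB implementation: MiniPool and all of its members are hypothetical, it uses a single lock instead of a separate pool lock and queue monitor, and it has no minimum or maximum thread count.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature pool, for illustration only.
class MiniPool {
    private final Object lock = new Object();
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private final ThreadGroup group = new ThreadGroup("mini-pool");
    private final long idleTimeoutMillis;
    private int idleWorkers = 0;
    private int workersCreated = 0;

    MiniPool(long idleTimeoutMillis) { this.idleTimeoutMillis = idleTimeoutMillis; }

    void submit(Runnable work) {
        synchronized (lock) {
            queue.addLast(work);
            if (idleWorkers == 0) {
                startWorker();   // nobody is waiting: create a thread
            } else {
                lock.notify();   // wake up one idle worker
            }
        }
    }

    int idleWorkers() { synchronized (lock) { return idleWorkers; } }
    int workersCreated() { synchronized (lock) { return workersCreated; } }

    // Must be called while holding lock.
    private void startWorker() {
        workersCreated++;
        Thread t = new Thread(group, this::workerLoop, "mini-worker-" + workersCreated);
        t.setDaemon(true);       // do not keep the VM alive for pooled threads
        t.start();
    }

    private void workerLoop() {
        while (true) {
            Runnable work;
            synchronized (lock) {
                idleWorkers++;
                try {
                    long deadline = System.currentTimeMillis() + idleTimeoutMillis;
                    while (queue.isEmpty()) {
                        long remaining = deadline - System.currentTimeMillis();
                        if (remaining <= 0) {
                            return; // idle for too long: let this thread exit
                        }
                        lock.wait(remaining);
                    }
                    work = queue.removeFirst();
                } catch (InterruptedException e) {
                    return;
                } finally {
                    idleWorkers--;
                }
                // Same safeguard as in WorkerThread.run(): if work is still
                // queued and nobody is idle, make sure a thread will pick it up.
                if (idleWorkers == 0 && !queue.isEmpty()) {
                    startWorker();
                }
            }
            try {
                work.run();
            } catch (Throwable ignored) {
                // a failing work item must not kill the worker
            }
        }
    }

    /** Demo helper: runs two tasks one after another, returns threads created. */
    static int demoReuse() {
        MiniPool pool = new MiniPool(5_000);
        try {
            for (int i = 0; i < 2; i++) {
                CountDownLatch done = new CountDownLatch(1);
                pool.submit(done::countDown);
                if (!done.await(2, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("task did not run");
                }
                while (pool.idleWorkers() == 0) {
                    Thread.sleep(1); // wait until the worker is idle again
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.workersCreated();
    }
}
```

In demoReuse the second task is submitted only after the worker has gone back to waiting, so it is handed to the existing thread via notify() instead of causing a new thread to be created.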
At this point the overall implementation and workflow of the thread pool should be quite clear.
Original post: http://blog.csdn.net/aigoogle/article/details/27342817