任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。在Java中,Runnable对象代表一个任务,Thread对象负责创建一个线程执行这个任务。
前提:1. 程序需要处理大量任务
2. 任务的执行时间相对创建线程和销毁的时间较短
方法1:
while (ture) {
Socket connection = socket.accept();
handleTask(connection); //单线程处理所用任务
方法2:
while (true) {
final Socket connection = socket.accept();
new Thread(() -> {handleTask(connection);}).start(); 每个任务一个线程处理
}
两种方法的缺点:1. 串行执行的问题在于响应慢,在多核处理器上吞吐量小。
2. 无限创建线程导致资源消耗大,可创建线程数有限制,恶意提交任务会导致程序负载过高。
Java的第3种解决方案是线程池,它是兼顾资源和并发的处理方案。
线程池的关键是任务的执行策略。
执行策略定义了:
~ 任务在哪个线程上运行
~ 任务按照什么顺序执行(FIFO, LIFO, 优先级)
~ 有多少个任务可以并发执行
~ 队列中有多少个任务等待执行
~ 系统由于过载而需要拒绝一个任务时,选择哪个任务,如何通过程序有任务被拒绝
~ 任务执行前后,应该进行哪些动作。
线程池通过将任务的提交和执行分离,可以根据硬件资源选择最佳的执行策略。
线程池的优势:1. 可以实现重现线程,减少了创建和销毁线程的开销。
2. 某些情况下,任务到达时,如果有空闲线程,可以立即执行任务,而不需要等待创建新线程,提高响应速度。
3. 线程池的大小可以调节,以便处理器保持忙碌状态,提高效率。
在java.util.concurrent包中,ThreadPoolExecutor是一个线程池实现。
图中的三个接口:
Executor:一个运行新任务的简单接口;
public interface Executor { /** * 在未来的某个时间执行任务command. 这个任务可能在一个新的线程中执行 , * 或者在线程池中的线程执行,或者一个正在运行的线程中执行。具体取决于Executor * 的实现 * @param 需要执行的任务 * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
ExecutorService:扩展Executor接口
public interface ExecutorService extends Executor { void shutdown(); // 停止接受任务。 List<Runnable> shutdownNow(); // 尝试停止所有活动的任务 boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。
ThreadPoolExceutor:继承AbstractExecutorService(JDK9)
1. 关键field
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 储存运行状态和线程数 private static final int COUNT_BITS = Integer.SIZE - 3; // Integer.SIZE = 32 COUNT_BITS = 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 0x0000 1111 1111 1111 1111 1111 1111 1110 // 运行状态储存在高位中
private static final int RUNNING = -1 << COUNT_BITS; // 0x1110 0000 0000 0000 0000 0000 0000 0000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0x0000 0000 0000 0000 0000 0000 0000 0000 private static final int STOP = 1 << COUNT_BITS; // 0x0010 0000 0000 0000 0000 0000 0000 0000 private static final int TIDYING = 2 << COUNT_BITS; // 0x0100 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS; // 0x0110 0000 0000 0000 0000 0000 0000 0000
private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 获取线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取运行状态和线程数
线程池的五种状态:
RUNNING: 运行状态,接受新提交的任务,并且可以处理队列中的任务。
SHUTDOWN: 关闭状态,不再接受新提交的任务,可以处理队列中的任务。在线程池处于
RUNNING状态时,调用shutdown()方法会使线程池进入到此状态。
STOP:停止状态,不再接受新提交的任务,也不处理队列中的任务,并且中断运行中的任务。
在RUNNING或SHUTDOWN状态时,调用shutdownNow()方法使线程池进入到该状态。
TIDYING: 所有任务都己终止,workerCount为0, 转换到TIDYING状态的线程池将运行terminate()方法。
TERMINATE: 终止状态,terminated()方法调用后进入此状态。
private final BlockingQueue<Runnable> workQueue; private final HashSet<Worker> workers = new HashSet<>(); private volatile ThreadFactory threadFactory;
workQueue用于保存任务
workers用于保存线程
threadFactory用于生产线程。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler
) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize: 线程池核心线程数量,作用:
* 当新任务在execute()提交时,根据corePoolSize去判断任务如何执行:
-> 如果线程池中线程小于corePoolSize,创建新的线程处理此任务,即使线程池中有空闲线程
-> 如果线程池中线程数大于等于corePoolSize且workQueue未满时,任务添加到workQueue中。
-> 如果线程池中线程数大于等于corePoolSize且workQueue己满时,创建新线程处理任务。
maximumPoolSize:最大线程数量
workQueue:未处理的任务队列
keepAliveTime:当线程池中的线程数量大于corePoolSize的时候,多余的线程不会立即销毁,而是会等待,
直到等待的时间超过了 keepAliveTime
threadFactory: 用于创建新线程
handler:当线程池的的线程数等于maximumPoolSize且workQueue己满时,使用handler处理新提交的线程
execute方法,实现执行任务的逻辑。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 分3步处理任务: * 1.线程数小于corePoolSize时,创建新线程执行此任务。 * addWorker 方法自动检查runState和workerCount * addWorker因为不能添加线程时,返回false * * 2. 如果可以添加到队列, 我们仍要再次检查线程池的状态
* (因为线程池中可能没有线程 或者在进入此方法时,线程池被关闭了。 * 如果线程池不处于RUNNING,清除刚才添加的任务
* 如果处于RUNNING且workerCount=0,创建新线程。
* * 3.如果不能将任务添加到队列, 就尝试创建一个新的线程。如果创建失败,拒绝任务
*/ int c = ctl.get(); // 获取线程池的运行状态和线程数 if (workerCountOf(c) < corePoolSize) { // 如果线程数小于corePoolSize时
// 第二个参数表示,限制添加线程的数量由谁决定
// true 由corePoolSize
// false 由maximumPoolSize
if (addWorker(command, true)) return; // 添加成功 c = ctl.get(); // 添加线程失败时,重新获取运行状态和线程数 }
// 线程池处于RUNNING状态且任务成功添加到workQueue if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 重新获取运行状态和线程数,
// 重新判断运行状态,如果不是处于运行状态,移除上面调用workQueue.offer(command)时,添加的任务。
// 并且拒绝此任务
if (! isRunning(recheck) && remove(command)) reject(command);
// 如果处于运行状态,且线程数为0时 else if (workerCountOf(recheck) == 0) addWorker(null, false); // 在线程中添加线程去执行此任务。 }
// 如果线程池不是运行状态或者添加任务失败,且创建线程的失败时, else if (!addWorker(command, false)) reject(command); // 拒绝此任务 }
addWrok方法,实现增加线程的逻辑
检查是否可以根据当前线程池状态和给定边界(核心或最大线程数)添加新工作线程。 如果是,则相应地调整工作线程的计数,并且如果可能,创建并启动新工作程序,将firstTask作为其第一个任务运行。 如果线程池已停止或可以关闭,则此方法返回false。 如果线程工厂在询问时无法创建线程,它也会返回false。 如果线程创建失败,或者由于线程工厂返回null,或者由于异常(通常是Thread.start()中的OutOfMemoryError)会回滚
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //获取运行状态 // 运行状态为RUNNING时,可以执行任务,跳出if
// 处于SHUTDOWN状态,fisrtTask为null,workQueue不为空时,跳出if
if (rs >= SHUTDOWN && // 不处于RUNNING,继续判断,否则返回fasle ! (rs == SHUTDOWN && // 如果处于SHUTDOWN状态,继续判断,否则返回false firstTask == null && // fisrtTask为空,继续判断,否则返回false ! workQueue.isEmpty())) // workQueue为空时,返回false,否则跳出if return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || // 工作线程数大于CAPACITY时,返回false wc >= (core ? corePoolSize : maximumPoolSize)) // 小于CAPACITY,大于core或max时,返回false return false; if (compareAndIncrementWorkerCount(c)) // 如果添加线程成功,跳出第一个for循环 break retry; c = ctl.get(); // Re-read ctl 添加失败,重新读取状态 if (runStateOf(c) != rs) // 不是RUNNING状态, 重新跑到第一个for循环。 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 用firstTask创建一个新的Worker final Thread t = w.thread; // Worker利用ThreadFactory创建一个线程对象储存在其实例thread中 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 如果处于运行状态,或者 处于关闭状态但任务为null时 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 判断线程是否处于已经调用start(),如果是,抛出异常 throw new IllegalThreadStateException(); workers.add(w); // 将工作线程添加到Hashset中 int s = workers.size(); if (s > largestPoolSize) // 记录线程池中出现过最大的线程数 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 启动线程, t.start()调用的是Worker的run方法,见worker的构造器。 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
什么时候使用线程池
以服务器为例,请求数量很大时,如果每个请求使用一个线程来处理并且线程运行的时间很短,会导致线程频繁创建和销毁,这两个操作会使得服务器变得低效。另一方面,线程过多,内存资源也会被占用过多。
线程池是一种可以使线程处理完一个任务后,接着处理其他任务的技术。
线程池是怎么复用线程
如果将所有Runnable对象存放在一个Queue中,重写Thread的run方法为
public void run() { while(true) { Runnable task = Queue.get(); if(task != null){ task.run(); } } }
显然这个线程就可以重复执行任务
线程池如何复用一个线程-- ThreadPoolExecutor的实现(未完)
原文:https://www.cnblogs.com/yvkm/p/10621903.html