package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<String> submit = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
String result = submit.get();
System.out.println(result);
}
提交一个Callable对象或者Runnable对象到ExecutorService里,能够返回一个Future对象,并且可以通过Future对象获取Callable对象或者Runnable对象 的运行结果
接口关系
public interface RunnableFuture<V> extends Runnable, Future<V>
public class FutureTask<V> implements RunnableFuture<V>
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
总结:维护一个阻塞队列,当一批任务执行时,每个任务执行完成后都会把Future对象放进队列,因此队列take到的都是执行完成的任务。该线程池的优点是不用自己阻塞每个Future,而是提交一批任务后,优先处理先完成的任务就好了。依赖这个特性实现invokeAny语义,只需要调用take方法,有返回就结束执行即可。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
如果当前线程数量小于corePoolSize,直接创建Worker工作线程,执行任务
如果当前线程数量大于corePoolSize,任务进阻塞队列
如果阻塞队列满,线程数量小于maximumPoolSize,创建Worker执行任务
如果线程数达到maximumPoolSize,执行拒绝策略
Worker工作线程启动后,从阻塞队列阻塞获取任务。
如果线程数大于corePoolSize,那么阻塞获取任务时会调用poll(keepAliveTime, TimeUnit.NANOSECONDS);如果阻塞超时没获取到任务,此时线程数大于corePoolSize会销毁当前线程
如果线程数不大于corePoolSize并且没有设置核心线程超时,一直take。可以通过allowCoreThreadTimeOut方法让核心线程超时
可以通过prestartCoreThread预启动核心线程
可以通过继承当前类重写beforeExecute,afterExecute方法进行任务执行拦截
拒绝策略:AbortPolicy抛异常;DiscardPolicy丢弃当前任务;DiscardOldestPolicy丢弃最老的任务;CallerRunsPolicy任务提交者执行
CallerRunsPolicy比较有用,可以通过拉低提交速率,减缓线程池压力,达到背压的效果
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V>{
//省略其他方法
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
原文:https://www.cnblogs.com/zby9527/p/13151379.html