我们都知道FutureTask类能返回异步执行结果、能取消任务执行、能查询线程是否执行完成,但是它背后是怎样实现的呢?下面我会基于JDK1.8的源码分析FutureTask类,讲解它的实现原理。
PS:分析类图要从面向对象的角度分析
如下图类图:
分析完FutureTask类相关的接口,下面会从FutureTask的源码出发,分析它的具体实现。
平时开发中,一个FutureTask任务交给线程执行后,一般只有一个线程关系它的执行结果,调用get方法后,获取执行结果的线程会被阻塞直到结果返回。但它作为一个对象,其实是允许被多线程并发调用执行(允许多个线程关心它的执行结果),那么你得记录下有哪些线程被阻塞了。所以你就会理解为什么FutureTask类会存在CAS、等待队列、执行状态等属性。
就等待队列而言,常规的实现:类似ReentrantLock(可重入锁)它背后的等待队列Sync类是通过AQS实现的,但是FutureTask是自己实现了一个栈结构的等待队列。
为什么FutureTask的等待队列不用AQS呢?
其实因为AQS的等待队列中的线程会时刻在自旋,判断自己是否是首节点(首节点能获取到锁),但是FutureTask的场景是多个线程来获取执行结果,把它们全部阻塞起来,等到执行完毕后再全部唤醒就可以,不需要线程一直自旋。总的来说,用AQS会浪费CPU资源,不要杀鸡用牛刀。
//任务的执行状态,可能存在的状态切换有:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
private volatile int state; private static final int NEW = 0; //初始状态 private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。 private static final int NORMAL = 2; //任务正常完成,结果被set private static final int EXCEPTIONAL = 3; //任务抛出异常 private static final int CANCELLED = 4; //任务被取消 private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断 private static final int INTERRUPTED = 6; //线程已经被中断 //构造函数传进来的、需要被执行的回调任务 private Callable<V> callable; //get()或者异常返回的执行结果 private Object outcome; // non-volatile, protected by state reads/writes //执行callable的线程,会在执行run()的时候,通过CAS初始化 private volatile Thread runner; //栈结构的等待队列,该变量是栈顶 private volatile WaitNode waiters; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE;
//state变量在内存地址的偏移量 private static final long stateOffset;
//runner变量在内存地址的偏移量 private static final long runnerOffset;
//waiters变量在内存地址的偏移量 private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }
FutureTask可以通过callable实例对象初始化或者通过runnable实例对象初始化。如果是通过runnable实例对象初始化的话,会被适配器封装成callable。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } //下面的Executors类的callable方法 /** * 通过适配器模式返回一个Callable */ public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
FutureTask任务的执行,涉及到另外的知识,这里简单介绍下。我们都知道FutureTask实现了Runnable接口,Runnable实例是被线程或者线程池执行的。我们都知道调用Thread.start()就能启动线程。查看start()的源码,里面也没有调用到Runnable的run()。其实Runnable的run()是在jvm底层代码调用了,c语言层面吧。这里的run()是重写了Runnable的run(),所以我们把FutureTask任务传递给线程或者线程池,在线程启动后就会执行到该方法。下面看看FutureTask的run()是怎么实现的。
public void run() {
//如果任务不是初始状态,或者CAS当前线程到runner变量失败的话,直接返回。确保任务只能被"同时"执行一次。
//注意:runner变量在这里被通过CAS的方式直接操作内存地址进行了赋值!!! if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try {
//执行callable任务,得到result结构 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran)
//线程执行完毕后的操作:设置返回结果、修改futureTask的状态等 set(result); } } finally { // runner must be non-null until state is settled to
// prevent concurrent calls to run() // runner属性在state属性被重新赋值前,不能为空。防止被多线程同时调用run方法。(看该方法开头的CAS算法就明白) runner = null; // state must be re-read after nulling runner to prevent
// leaked interrupts //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
下面继续分析上面run()方法里面提到的set()方法。
protected void set(V v) {
//通过CAS方法,直接操作state的内存地址,更新state的状态为计算完成状态(COMPLETING) if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//cas成功,则把返回结果赋值给outcome;并且更新更新state的状态为任务执行完成(NORMAL) outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } /** * 1、清空等待栈 * 2、唤醒所有等待线程
* 3、清空callable */ private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) {
//把futureTask的waters属性设置为null,清空等待栈 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//遍历栈,依次唤醒等待的线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null;
//唤醒等待线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //任务执行完成后的预留接口,给子类拓展(大佬代码的拓展性杠杠的) done(); callable = null; // to reduce footprint }
总结:从以上分析能看出,任务被执行完毕后,会把结果赋值到outcome变量。并且会重置futureTask的属性,唤醒等待队列中的线程。
get()方法是阻塞获取等待结果,直到结果返回。看看它是怎么实现的。(get的超时获取方法就不展开介绍了)
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING)
//任务还没执行完成,把线程加到等待栈 s = awaitDone(false, 0L);
//返回执行结果 return report(s); }
//timed为true代表是超时get,超过时间还没返回结果就throw exception private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false;
//下面是个死循环,死循环会在最后被挂起。在唤醒后会继续执行死循环,直到跳出。跳出循环有三种情况
//1、获取结果的线程被中断了
//2、任务状态是完成/取消/中断/异常,则跳出循环
//3、超时退出 for (;;) { if (Thread.interrupted()) {
//如果获取结果的线程被中断了,从等待栈移除它,并抛出中断异常 removeWaiter(q); throw new InterruptedException(); } int s = state;
//如果任务状态是完成或者取消、中断、异常等,返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet //COMPLETING状态是任务执行完成的状态,会很快设置成NORMAL(看set()方法)
//所以这里通过yield让出cpu资源,比挂起更快
Thread.yield(); else if (q == null)
//任务还在执行,创建节点。该节点会在下一次循环加入到等待栈的栈顶 q = new WaitNode(); else if (!queued)
//线程节点加到栈顶 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) {
//超时式挂起 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else //线程执行到这里会被挂起。被唤醒的时候还会进入死循环,但是唤醒一般伴随着任务状态修改,这个可以跳出死循环。
LockSupport.park(this); } }
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
//mayInterruptIfRunning 如果正在运行,是否中断允许。
public boolean cancel(boolean mayInterruptIfRunning) {
//如果任务刚刚初始化,还没执行,直接中断或者取消。 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try {
//如果为null,说明任务没有在执行。已经完成或者其他状态 Thread t = runner; if (t != null)
//只有线程在允许才允许中断线程 t.interrupt(); } finally { // final state
//设置任务的状态为中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally {
//唤醒所有在get()方法等待的线程 finishCompletion(); } return true; }
futuretask源码分析下来还是非常有趣,细心看看难度不大。
原文:https://www.cnblogs.com/akid1994/p/12747053.html