首页 > 其他 > 详细

Future

时间:2019-07-10 23:43:54      阅读:95      评论:0      收藏:0      [点我收藏+]

FutureTask类实现解析:

/**
 * A cancellable asynchronous computation; the base implementation of
 * {@code Future}.
 *
 * <p>It provides methods to start and cancel a computation, to query whether
 * the computation is complete, and to retrieve its result. The result can
 * only be retrieved once the computation has completed; {@code get} blocks
 * until then. Once the computation has completed it cannot be restarted or
 * cancelled (unless it is invoked through {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can wrap a {@link Callable} or a {@link Runnable}.
 * Because it implements {@code Runnable} (through {@link RunnableFuture}),
 * it can be submitted to an {@code Executor} for execution.
 *
 * <p>The protected methods are useful when writing customized subclasses.
 *
 * @param <V> the result type returned by this task's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {

    /**
     * The run state of this task, initially NEW. The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel. During completion, state may take on
     * the transient values COMPLETING (while the outcome is being set)
     * or INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate states to the
     * final states use cheaper ordered/lazy writes because the values
     * are unique and cannot be further modified. The field is volatile
     * so that updates are visible across threads.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running. */
    private Callable<V> callable;
    /** The result to return or the exception to throw from get(). */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run(). */
    private volatile Thread runner;
    /** Treiber stack of waiting threads. */
    private volatile WaitNode waiters;

    /**
     * Returns the result or throws the exception for a completed task.
     *
     * @param s the completed state value
     * @return the outcome, when {@code s} is NORMAL
     * @throws CancellationException if {@code s} is CANCELLED,
     *         INTERRUPTING or INTERRUPTED
     * @throws ExecutionException wrapping the stored throwable otherwise
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)        // completed normally
            return (V)x;
        if (s >= CANCELLED)     // cancelled, interrupting or interrupted
            throw new CancellationException();
        throw new ExecutionException((Throwable)x); // completed exceptionally
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Returns whether this task was cancelled.
     *
     * @return {@code true} if the state is CANCELLED, INTERRUPTING or
     *         INTERRUPTED
     */
    @Override
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * Returns whether this task has left its initial state.
     *
     * @return {@code true} if the state is anything other than NEW
     */
    @Override
    public boolean isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task. Succeeds only if the state can be
     * moved away from NEW by this call.
     *
     * @param mayInterruptIfRunning {@code true} if the thread recorded in
     *        {@code runner} should be interrupted
     * @return {@code false} if the task was not in state NEW (or lost the
     *         race to leave it); {@code true} otherwise
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && STATE.compareAndSet
              (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    STATE.setRelease(this, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Runs the callable on the calling thread and records its result or
     * the throwable it threw. Does nothing if the task is not in state
     * NEW or another thread has already claimed {@code runner}.
     */
    @Override
    public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is interrupted
     *         while the task is still in state NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!WAITERS.compareAndSet(this, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * @implSpec
     * The default implementation returns a string identifying this
     * FutureTask, as well as its completion state.  The state, in
     * brackets, contains one of the strings {@code "Completed normally"},
     * {@code "Completed exceptionally"}, {@code "Cancelled"}, or {@code
     * "Not completed"}.
     *
     * @return a string representation of this FutureTask
     */
    @Override
    public String toString() {
        final String status;
        switch (state) {
        case NORMAL:
            status = "[Completed normally]";
            break;
        case EXCEPTIONAL:
            status = "[Completed exceptionally: " + outcome + "]";
            break;
        case CANCELLED:
        case INTERRUPTING:
        case INTERRUPTED:
            status = "[Cancelled]";
            break;
        default:
            final Callable<?> callable = this.callable;
            status = (callable == null)
                ? "[Not completed]"
                : "[Not completed, task = " + callable + "]";
        }
        return super.toString() + status;
    }

    // VarHandle mechanics
    private static final VarHandle STATE;
    private static final VarHandle RUNNER;
    private static final VarHandle WAITERS;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(FutureTask.class, "state", int.class);
            RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
            WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }

}

 

Future

原文:https://www.cnblogs.com/gsanye/p/11167190.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!