最近做了一个项目,产品提了个很无语的需求。异步去调用其他系统的接口,又不让我们用消息队列。当时我就觉得很扯淡,以来这个接口的网络IO不会消耗太多时间,而来我们业务系统对性能要求不高。
思前想后使用了Dubbo的异步调用方式去调接口。Dubbo 会返回一个Future,我们根据返回的future获得调用结果再判断是否重试。
当然这篇博客的重点不在于此,而是我突然发现我甚至不知道为什么异步调用会给我返回一个Future;瞬间觉得自己好扣脚,于是我决定看下jdk源码看看到底是怎么回事,
我打开了java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)方法
源码如下:
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
可知返回的是个RunableFuture对象;接着点开:
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
发现该接口实现了Runable和Future,突然有点眉目了。
接着回到java.util.concurrent.AbstractExecutorService#newTaskFor(java.util.concurrent.Callable<T>)
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);// RunableFuture的具体实现 }
好了我点开看看到底是怎么具体实现的,
public class FutureTask<V> implements RunnableFuture<V> { // 省略 /** The underlying callable; nulled out after running */ private Callable<V> callable;// 维护了一个callable对象 /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
看到这里应该是差不多了,一个FutureTask维护了一个callable,但是我们还是看看具体实现
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } // 写入返回值的方法 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
由以上代码可知,outcome字段是我们线程执行的最终结果,我们看看具体实在哪里写入的
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
可以看到是在覆写了Runable的run方法中去执行callable.call,并将结果写入outcome字段。即当我们把线程提交到线程池,我们先立即返回一个Future,然后线程池经过一系列判断,将提交的线程放入工作池,工作池去开启新的线程去跑我们的任务。从而实现我们的异步操作。
线程池的具体代码如下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread;// 利用提交的callble创建线程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);// 将线程放入工作池 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();// 添加成功就执行线程。 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
很多时候我们在开发中都淡忘了底层的一些实现(做业务做久了就这样),然后去看实现,才能有所理解。也能帮助我们在开发中写出更好更合理的代码!
原文:https://www.cnblogs.com/neverendingDreaming/p/12755717.html