? ? ? ? 在传统的多线程实现方式中(继承Thread和实现Runnable)无法直接获取线程执行的返回结果,如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。
? ? ? ? 从Java 1.5开始,java.util.concurrent包中提供了 Callable和 Future两个接口,通过它们就可以在任务执行完毕之后得到任务执行结果。
?
? ? ? ? 一、Callable
? ? ? ??返回可能抛出异常的任务的结果。实现者定义了一个不带任何参数的叫做 call 的方法。?
?
? ? ? ? 1.Callable与?Runnable
? ? ? ? Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常。?
? ? ? ? Callable是?java.util.concurrent中的接口,而?Runnable是 java.lang包中的接口。它们都只有一个方法。
? ? ? ? 以下是?Runnable接口的源代码:
public interface Runnable {
/**
* 使用实现接口 Runnable 的对象创建一个线程时,
* 启动该线程将导致在独立执行的线程中调用对象的 run 方法。
*/
public abstract void run();
}
? ? ? ? 由于 run方法返回类型是 void,所以在执行完任务之后无法获得任何返回结果。
? ? ? ? 接下来是?Callable接口的源代码:
public interface Callable<V> {
/**
* 计算结果,如果无法计算结果,则抛出一个异常。
*/
V call() throws Exception;
}
? ? ? ? 可以看到 call方法返回了一个泛型V结果,所以我们就可以通过结果来进一步判断处理。
?
? ? ? ? 2.Callable转换?
? ? ? ? Executors 类包含一些从其他普通形式(Runnable、PrivilegedAction等)转换成 Callable 类的实用方法。?
? ? ? ? 以下是?Executors类中将?Runnable转换成?Callable的相关方法:
/** * 返回 Callable 对象,调用它时可运行给定的任务并返回 null */ static Callable<Object> callable(Runnable task) /** * 返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果 */ static <T> Callable<T> callable(Runnable task, T result)
? ? ? ? 我们可以通过?Executors类的callable方法将?Runnable 转换成?Callable,其中 result为返回结果,以下是callable方法的源代码:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
? ? ? ? 当 task为空时会抛出空指针异常,然后通过构造方法创建一个?RunnableAdapter实例。
? ? ? ??RunnableAdapter类是一个内部类,其实?RunnableAdapter是一个?Runnable的适配器,它的作用很简单,就是将?Runnable转换成?Callable,所以这里就用到了适配器模式,面试的时候你可以拿出去装装B。
? ? ? ? 以下是?RunnableAdapter类的源代码:
/**
* Runnable适配器,具有返回结果的可运行任务
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
? ? ? ? 代码即简洁有清晰,通过调用构造方法创建一个?RunnableAdapter实例,然后通过执行call方法运行线程,最后返回结果。
?
? ? ? ? 3.Callable?的使用
? ? ? ? 因为?Callable?接口并没有相关实现类,所以我们无法直接使用它,所以使用?Callable?需要调用?ExecutorService接口的?submit方法:
<T> Future<T> submit(Callable<T> task)
? ? ? ? submit提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。?
? ? ? ? 如果想立即阻塞任务的等待,则可以使用:
result = executorService.submit(aCallable).get();
? ? ? ? 注意:Executors 类包括了一组方法,可以转换某些其他常见的类似于闭包的对象,例如,将 PrivilegedAction 转换为 Callable 形式,这样就可以提交它们了。?
? ? ? ? 后面的 Future章节还会更深入介绍?Callable?的相关使用。
?
? ? ? ? 二、Future
? ? ? ? Future 用以表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
?
? ? ? ? 1.继承关系

?
? ? ? ? 2.Future?源代码
? ? ? ? 以下是 Future接口的源代码:
public interface Future<V> {
/**
* 试图取消对此任务的执行
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 如果在任务正常完成前将其取消,则返回 true
*/
boolean isCancelled();
/**
* 如果任务已完成,则返回 true
*/
boolean isDone();
/**
* 如有必要,等待计算完成,然后获取其结果
*/
V get() throws InterruptedException, ExecutionException;
/**
* 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
?
? ? ? ? 3.Future 相关方法
? ? ? ? Future包含了5个方法。
? ? ? ? 1)get():获取其结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。
? ? ? ? 2)get(long timeout,TimeUnit unit):与get()方法不同的是,在指定时间内等待结果的返回,如果指定时间已到仍没有结果,则返回null。
? ? ? ? 3)isDone():如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法也都将返回 true。
? ? ? ? 4)cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、已取消或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。?
? ? ? ? 此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。?
? ? ? ? 5)isCancelled():如果在任务正常完成前将其取消,则返回 true。
? ? ? ? 通过 Future可以:
? ? ? ? 1)判断任务是否完成;
? ? ? ? 2)能够中断任务;
? ? ? ? 3)能够获取任务执行结果。
?
? ? ? ? 4.Future的使用
? ? ? ? 1)执行Future
? ? ? ? 使用 ExecutorService的?submit(Callable<T> task)方法。
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 创建FutureThread实例
FutureThread ft = new FutureThread();
// 调用ExecutorService的 submit(Callable<T> task)方法,执行ft
Future<Integer> f = es.submit(ft);
// 关闭线程池
es.shutdown();
// 获取结果
Integer result = f.get();
// 判断结果
if (result != null) {
System.out.println("执行成功,结果为:" + result);
} else {
System.out.println("执行失败");
}
}
}
class FutureThread implements Callable<Integer> {
// 执行函数
public Integer call() throws Exception {
System.out.println("开始执行");
return new Random().nextInt(100);
}
}
//结果:
开始执行
执行成功,结果为:51
? ? ? ? 首先,创建一个?Callable实现类,其中 call方法中是线程要执行的方法,这与 run方法类似。
? ? ? ? 然后,创建一个?ExecutorService实例,也就是线程池,然后调用submit方法将之前的?Callable实现传递执行。
? ? ? ? 最终,通过 Future的get方法可以获取到线程的执行结果,进一步处理。
?
? ? ? ? 2)取消 Future
? ? ? ? 使用 Future的 cancel方法尝试取消Future:
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 创建FutureThread实例
FutureThread ft = new FutureThread();
// 调用ExecutorService的 submit(Callable<T> task)方法,执行ft
Future<Integer> f = es.submit(ft);
// 取消
f.cancel(true);
if (!f.isCancelled()) {
// 获取结果
Integer result = f.get();
// 判断结果
if (result != null) {
System.out.println("执行成功,结果为:" + result);
} else {
System.out.println("执行失败");
}
} else {
System.out.println("执行已被取消");
}
// 关闭线程池
es.shutdown();
}
}
class FutureThread implements Callable<Integer> {
// 执行函数
public Integer call() throws Exception {
System.out.println("开始执行");
return new Random().nextInt(100);
}
}
//结果:
执行已被取消
?
? ? ? ? 3)判断Future是否完成
? ? ? ? 可以通过isDone()方法判断 Future是否已经完成:
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 创建FutureThread实例
FutureThread ft = new FutureThread();
// 调用ExecutorService的 submit(Callable<T> task)方法,执行ft
Future<Integer> f = es.submit(ft);
// 取消
// f.cancel(true);
if (!f.isCancelled()) {
for (;;) {
if (f.isDone()) {
// 获取结果
Integer result = f.get();
// 判断结果
if (result != null) {
System.out.println("执行成功,结果为:" + result);
} else {
System.out.println("执行失败");
}
break;
} else {
System.out.println("执行未完成");
}
}
} else {
System.out.println("执行已被取消");
}
// 关闭线程池
es.shutdown();
}
}
class FutureThread implements Callable<Integer> {
// 执行函数
public Integer call() throws Exception {
System.out.println("开始执行");
return new Random().nextInt(100);
}
}
//结果:
执行未完成
执行未完成
开始执行
...
...
执行未完成
执行未完成
执行成功,结果为:72
? ? ? ? 当然我们并不需要这样来循环判断,因为 get方法可以阻塞等待结果。
?
? ? ? ? 5.RunnableFuture
? ? ? ? RunnableFuture作为 Runnable 的 Future。成功执行 run 方法可以完成 Future 并允许访问其结果。RunnableFuture接口在?Future接口的基础上增加了一个run方法:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* 在未被取消的情况下,将此 Future 设置为计算的结果
*/
void run();
}
? ? ? ? 注意:在 Java中class是无法多继承的,但接口可以。
?
? ? ? ? 6.ScheduledFuture
? ? ? ? ScheduledFuture是一个延迟的、结果可接受的操作,可将其取消 Future。通常已安排的 future 是用 ScheduledExecutorService 安排任务的结果
? ? ? ? ScheduledFuture接口继承自Delayed与 Future,其中没有任何独立方法:
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
? ? ? ? 7.RunnableScheduledFuture
? ? ? ? RunnableScheduledFuture继承自 Runnable 的 ScheduledFuture。成功执行 run 方法可以完成 Future 并允许访问其结果。
? ? ? ? RunnableScheduledFuture?增加了一个isPeriodic()方法,用来判断是否是一个定期任务:
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
/**
* 如果这是一个定期任务,则返回 true。定期任务可以根据计划重新运行。非定期任务只能运行一次
*/
boolean isPeriodic();
}
?
? ? ? ? 三、FutureTask
? ? ? ? 可取消的异步计算任务。FutureTask提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。?
? ? ? ? 可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。?
? ? ? ? 除了作为一个独立的类外,FutureTask还提供了 protected 功能(done()、set(V v)等方法),这在创建自定义任务类时可能很有用。?
?
? ? ? ? 1.构造方法
? ? ? ? 有两个构造方法:
/**
* 创建一个 FutureTask,一旦运行就执行给定的 Callable
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
/**
* 创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果
*/
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
? ? ? ? 一种是直接指定?Callable为参数。另一种则是指定了?Runnable与 result。
? ? ? ? 两个构造方法都是创建了一个 Sync实例,最终传递的参数都为?callable,Sync内部类我们已经接触过很多次了,虽然不同类中 Sync的细节实现各不相同,但实现思想基本一致。
?
? ? ? ? 2.Sync内部类
? ? ? ? Sync作为?FutureTask的内部类,用以控制?FutureTask的同步。
? ? ? ? 1)构造方法
? ? ? ? Sync只有一个构造方法。
Sync(Callable<V> callable) {
this.callable = callable;
}
? ? ? ? 2)成员变量
/** 表示任务正在运行的状态值 */ private static final int RUNNING = 1; /** 表示任务运行完成的状态值 */ private static final int RAN = 2; /** 表示任务已被取消的状态值 */ private static final int CANCELLED = 4; /** 底层 callable */ private final Callable<V> callable; /** 返回结果,通过get方法获取 */ private V result; /** 从get方法中抛出的异常 */ private Throwable exception; /** * 运行任务的线程 */ private volatile Thread runner;
? ? ? ? 从成员变量中可以看到?FutureTask拥有四种状态:未运行(0)、运行中(1),已运行(2)和已取消(4)。 其中未运行是默认状态,状态值为0,其他三种已经在类中定义。其他几个成员变量分别是运行相关的声明。
? ? ? ? 3)核心方法
? ? ? (1)innerGet方法
? ? ? ? innerGet方法是Sync中获取执行结果的方法,FutureTask的 get方法也是调用的本方法。
? ? ? ? 以下是 innerGet方法的源代码:
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
? ? ? ? innerGet方法首先调用的是AQS(AbstractQueuedSynchronizer)的 acquireSharedInterruptibly方法,最终调用的是 doAcquireSharedInterruptibly方法。然后判断任务状态,如果已经取消则抛出?CancellationException异常,然后判断异常信息,如果有则抛出。最后返回结果。
? ? ? ? doAcquireSharedInterruptibly的核心代码为:
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
? ? ? (2)innerGet(long nanosTimeout)方法
? ? ? ? 与?innerGet()方法类似,只是innerGet(long nanosTimeout)会在指定时间内未得到结果时返回null。
? ? ? (3)innerRun()方法
? ? ? ? FutureTask运行任务时会调用 Sync的?innerRun方法。
void innerRun() {
//首先设置状态,未运行-->运行中
if (!compareAndSetState(0, RUNNING))
return;
try {
//获取当前线程
runner = Thread.currentThread();
//判断状态
if (getState() == RUNNING)
//开始执行
innerSet(callable.call());
else
//释放共享锁
releaseShared(0);
} catch (Throwable ex) {
//处理异常
innerSetException(ex);
}
}
? ? ? ? 首先会将未运行状态设置成运行中,也就是判断该任务是否已经执行过,然后再设置状态。其中会调用?innerSet方法来执行任务。调用?releaseShared方法来取消任务。
? ? ? ? (4)innerSet(V v)方法
void innerSet(V v) {
for (;;) {
//获取状态
int s = getState();
//如果s==2,即已运行过,则返回
if (s == RAN)
return;
//如果s==4,即已取消
if (s == CANCELLED) {
//释放共享锁
releaseShared(0);
return;
}
//设置状态为RAN,值为2
if (compareAndSetState(s, RAN)) {
//设置结果
result = v;
//释放共享锁
releaseShared(0);
//调用done方法,done方法需要使用者自行实现
done();
return;
}
}
}
? ? ? ?(5)innerSetException
? ? ? ? innerSetException方法与innerSet方法流程基本一致,只是在其中添加了异常信息的设置。
void innerSetException(Throwable t) {
for (;;) {
//获取状态
int s = getState();
//如果已运行则返回
if (s == RAN)
return;
//如果为取消状态
if (s == CANCELLED) {
//释放共享锁
releaseShared(0);
return;
}
//设置状态为RAN
if (compareAndSetState(s, RAN)) {
//设置异常信息
exception = t;
result = null;
//释放共享锁
releaseShared(0);
done();
return;
}
}
}
? ? ? ?(6)innerCancel
? ? ? ? innerCancel方法用于取消任务执行。
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
// 获取状态
int s = getState();
// 判断状态是否为 RAN或 CANCELLED
if (ranOrCancelled(s))
return false;
// 设置状态值为 CANCELLED
if (compareAndSetState(s, CANCELLED))
break;
}
// 如果可以中断
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
// 中断线程
r.interrupt();
}
// 释放共享锁
releaseShared(0);
//调用done方法
done();
return true;
}
? ? ? ?(7)innerRunAndReset
? ? ? ? innerRunAndReset与innerRun不同之处在于,innerRunAndReset只运行却不返回结果,所以innerRunAndReset更适合一些重置恢复的操作。
boolean innerRunAndReset() {
// 将状态为未运行设置为运行中
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
// 再次判断状态
if (getState() == RUNNING)
// 执行call
callable.call(); // 不设置结果
runner = null;
// 将状态重置为未运行
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
// 设置异常
innerSetException(ex);
return false;
}
}
?
? ? ? ? 4)其他方法
? ? ? ?(1)ranOrCancelled
? ? ? ? 判断状态是否为 RAN或?CANCELLED状态。
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}
? ? ? ? 可以看到?ranOrCancelled是通过状态值与 RAN或?CANCELLED值做与运算。
? ? ? ? 假设此时state值为0,与运算后结果为0,则最终结果就为false,所以即不为 RAN也不为?CANCELLED。
? ? ? ?(2)innerIsDone
? ? ? ? 判断内部执行是否完成。
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
? ? ? ? 首先判断状态是否为?RAN或?CANCELLED,然后再判断?runner是否为空。
? ? ? ?(3)tryReleaseShared
? ? ? ? 尝试释放共享锁。
/**
* AQS的基本实现,当状态为已完成和runner为空时释放共享锁
*/
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
? ? ? ? 可以看到?tryReleaseShared为?protected 方法,而且方法体中也没有太具体的实现,那是因为这种?protected 修饰的方法需要使用者自行实现。
? ? ? ? 还有几个其他类似的方法,这里就不一一说明了,它们的原理跟这几个方法类似。
?
? ? ? ? 3.FutureTask方法
? ? ? ? 1)get()方法
? ? ? ? 等待计算完成,然后获取其结果,如果未返回结果则阻塞。
? ? ? ? 以下是get方法的源代码:
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
? ? ? ? get方法调用的是Sync内部类的?innerGet方法,innerGet方法在上面已经分析过,这里就不赘述了。
? ? ? ? 2)get(long timeout, TimeUnit unit)方法
? ? ? ? 在指定时间内等待,如果时间已到无结果则返回null。
? ? ? ? 3)set(V v)方法
? ? ? ? 除非已经设置了此 Future 或已将其取消,否则将其结果设置为给定的值。在计算成功完成时通过 run 方法内部调用此方法。
? ? ? ? set方法调用的是Sync内部类的innerSet方法。
protected void set(V v) {
sync.innerSet(v);
}
? ? ? ? 4)run()方法
? ? ? ? 除非已将此 Future 取消,否则将其设置为其计算的结果。
? ? ? ? run方法调用的是Sync内部类的 innerRun方法。
public void run() {
sync.innerRun();
}
? ? ? ? 5)runAndReset()方法
? ? ? ? 执行计算而不设置其结果,然后将此 Future 重置为初始状态,如果计算遇到异常或已取消,则该操作失败。本操作被设计用于那些本质上要执行多次的任务。
? ? ? ? runAndReset方法调用的是Sync内部类的 runAndReset方法。
?
protected boolean runAndReset() {
return sync.innerRunAndReset();
}
? ? ? ? 6)cancel(boolean mayInterruptIfRunning)方法
? ? ? ? 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。?
? ? ? ? cancel方法调用的也是Sync内部类的?innerCancel方法。
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
? ? ? ? 可以看到 FutureTask的几个方法其实调用的都是Sync内部类的相关方法,所以?FutureTask的其他方法就不介绍了,只需要把 Sync的相关方法理解就可以了。
?
? ? ? ? 4.FutureTask使用
? ? ? ? FutureTask的各个方法我们基本已经了解了,接下来就是通过实例来学习如何使用?FutureTask。
? ? ? ??FutureTask与 Future的使用方式类似:
public class FutureTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 创建FutureThread实例
FutureThread ft = new FutureThread();
FutureTask<Integer> futureTask = new FutureTask<Integer>(ft);
// 第一种方式,使用方式与 Future相同
es.submit(futureTask);
// 第二种方式,使用 Thread运行 Runnable实现,同样可以得到结果
//Thread thread = new Thread(futureTask);
//thread.start();
// 关闭线程池
es.shutdown();
// 获取结果
Integer result = futureTask.get();
// 判断结果
if (result != null) {
System.out.println("执行成功,结果为:" + result);
} else {
System.out.println("执行失败");
}
}
}
class FutureThread implements Callable<Integer> {
// 执行函数
public Integer call() throws Exception {
System.out.println("开始执行");
return new Random().nextInt(100);
}
}
? ? ? ? 除了之上创建FutureTask实例的方式外,我们还可以这样来写:
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
public Integer call() throws Exception {
System.out.println("开始执行");
return new Random().nextInt(100);
}
});Java并发之Callable,Future,FutureTask
原文:http://286.iteye.com/blog/2299701