现在使用Callable+FutureTask既可以有返回值,也可以捕获异常
当有了返回值后,我们就可以不用一直等着线程的结果,而是可以先干点别的事情,最后凭future获取结果,例如星期六你去蛋糕店做蛋糕,店员给你一张小票,你这时候可以先去看一部电影,看完回到蛋糕店凭小票领取蛋糕即可,这样就可以省去你等待做蛋糕花费的时间,这里的future就相当于小票
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> call = () -> {
System.out.println("正在计算结果。。。");
Thread.sleep(1000);
return 1;
};
FutureTask<Integer> futureTask = new FutureTask<>(call);
new Thread(futureTask).start();
// 做点别的事情
System.out.println("do something...");
System.out.println("拿到的结果为 " + futureTask.get());
}
}
在new FutureTask的时候需要传入Callable接口,那么先去看看FutureTask的构造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; // 这里可以看到,FutureTask自己维护了一个Callable变量
this.state = NEW; // ensure visibility of callable
}
然后我们需要将FutureTask对象传入Thread类中,那么在Thread类中的run()方法里面又发生了什么呢?
public void run() {
if (target != null) {
target.run(); // 执行了target的run()方法
}
}
其实在Thread类中的run()方法很简单,只是执行了target的run()方法,这个target就是我们在构建Thread对象时传入的Runnable对象,在这里就是FutureTask对象(FutureTask继承了Runnable)
因此直接去FutureTask的run()方法查看
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; // 将自己维护的Callable对象赋值给c
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 在这里执行了Callable的run()方法,也就是我们自己写的run()方法,并且拿到了返回结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); // 在这里将返回的结果设置供set()方法获取调用
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
首先,需要先了解FutureTask类里面定义的几个状态,当FutureTask被new出来的时候,就会把state设置为NEW状态,表示这是新创建的任务
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
下面可以看set()跟get()方法了
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 将状态修改为COMPLETING
outcome = v; // 将执行结果设置给FutureTask维护的outcome变量
// 如果一切设置成功,就将任务状态设置为NORMAL状态,这便是已经完成了
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
// 当设置成功后,就会执行这个方法,将所有先前执行了FutureTask.get()方法的线程唤醒(因为在之前代码还未执行完,没有得到结果,所以执行get()的线程都会被等待,一知道结果出来为止)
finishCompletion();
}
}
private void finishCompletion() {
// 这里的WaitNode里面存储了对应的线程,这是一个链式结构,在这一步会一个一个遍历唤醒
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 主要是在这一步对线程进行唤醒
}
// 获取到下一个节点
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
// 这个done方法并未在FutureTask中有任何有效的代码,如下。我猜测是一个钩子函数,等set执行完就会立即执行
// 这个done可以自己实现,可以自定义一个类继承FutureTask或者用匿名内部类实现
done();
callable = null;
}
protected void done() { }
// 实现如下
FutureTask<Integer> futureTask = new FutureTask<Integer>(call) {
@Override
protected void done() {
System.out.println("=======完成了");
}
};
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 假设状态值小于COMPLETING,也就是说任务还没有完成,就执行下面的等待方法
s = awaitDone(false, 0L); // 如果任务没有执行完,那么会将当前线程等待在这里
return report(s); // 返回处理结果,方法在下面展示
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q); // 如果线程被中断了,就把所有的WaitNode移除
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // 如果任务完成,直接返回状态
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 如果已经完成了任务的执行,就没要继续占用cpu的执行权,让出即可
// yield()方法会通知线程调度器放弃对处理器的占用,但调度器可以忽视这个通知。yield()方法主要是为了保障线程间调度的连续性,防止某个线程一直长时间占用cpu资源。
Thread.yield();
else if (q == null)
// 如果当前访问get()方法的线程还没有对应的WaitNode,就创建
// 而且会把当前线程放入WaitNode中
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 如果设置了超时时间,进行检测,如果超时了就一处所有的WaitNode
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 如果任务状态小于COMPLETING,也就是还没有执行完,那么就把当前线程阻塞,这也是这个方法的关键
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
// 这个outcome是FutureTask维护的变量,在先前的set()方法中已经将结果设置给outcome
Object x = outcome;
if (s == NORMAL)
return (V)x; // 正常执行完,返回结果
if (s >= CANCELLED) // 如果非正常执行完,抛出异常
throw new CancellationException();
// 此处会将异常转为Throwable,是所有异常的基类
throw new ExecutionException((Throwable)x);
}
// 这是WaitNode的实现
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
// 每次创建,都会把当前正在执行的线程存进来
WaitNode() { thread = Thread.currentThread(); }
}
以上就是FutureTask基本的一些api的源码解读
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Integer> {
private Integer begin;
private Integer end;
public ForkJoinDemo(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - begin <= 2) {
// 计算
for (int i = begin; i <= end; i++) {
sum += i;
}
} else {
// 拆分任务
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(begin, (begin + end) / 2);
ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo((begin + end) / 2 + 1, end);
// 执行子任务
forkJoinDemo.fork();
forkJoinDemo2.fork();
Integer join = forkJoinDemo.join(); // 获取执行结果
Integer join1 = forkJoinDemo2.join();
sum = join + join1;
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> future = pool.submit(new ForkJoinDemo(1, 100));
System.out.println("。。。。");
System.out.println("计算的结果:" + future.get());
}
}
原文:https://www.cnblogs.com/Myarticles/p/12046072.html