项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
-
- public class AddTask implements Callable<Integer> {
-
- private int a,b;
-
- public AddTask(int a, int b) {
- this.a = a;
- this.b = b;
- }
-
- @Override
- public Integer call() throws Exception {
- Integer result = a + b;
- return result;
- }
-
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- ExecutorService executor = Executors.newSingleThreadExecutor();
-
- Future<Integer> future = executor.submit(new AddTask(1, 2));
- Integer result = future.get();
- }
- }
虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:
- public interface Future<V> {
- boolean cancel(boolean mayInterruptIfRunning);
- boolean isCancelled();
- boolean isDone();
- V get() throws InterruptedException, ExecutionException;
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:
- package future;
-
- import java.util.concurrent.CancellationException;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
-
- public interface IFuture<V> extends Future<V> {
- boolean isSuccess();
- V getNow();
- Throwable cause();
- boolean isCancellable();
- IFuture<V> await() throws InterruptedException;
- boolean await(long timeoutMillis) throws InterruptedException;
- boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
- IFuture<V> awaitUninterruptibly();
- boolean awaitUninterruptibly(long timeoutMillis);<span style="line-height: 1.5;">
- boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
- IFuture<V> addListener(IFutureListener<V> l);
- IFuture<V> removeListener(IFutureListener<V> l);
-
- }
接下来就一起来实现这个IFuture,在这之前要说明下Object.wait(),Object.notifyAll()方法,因为整个Future实现的原理的核心就是这两个方法.看看JDK里面的解释:
- public class Object {
-
- public final void wait() throws InterruptedException {
- wait(0);
- }
-
-
- public final native void notifyAll();
- }
知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await()等一系列的方法时,如果Future还未完成,那么就调用future.wait() 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll()方法来唤醒之前因为调用过wait()方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):
- package future;
-
- import java.util.Collection;
- import java.util.concurrent.CancellationException;
- import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
-
- public class AbstractFuture<V> implements IFuture<V> {
-
- protected volatile Object result;
-
- protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>();
-
-
- private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal();
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) {
- return false;
- }
-
- synchronized (this) {
- if (isDone()) {
- return false;
- }
- result = new CauseHolder(new CancellationException());
- notifyAll();
- }
- notifyListeners();
- return true;
- }
-
- @Override
- public boolean isCancellable() {
- return result == null;
- }
-
- @Override
- public boolean isCancelled() {
- return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
- }
-
- @Override
- public boolean isDone() {
- return result != null;
- }
-
- @Override
- public V get() throws InterruptedException, ExecutionException {
- await();
-
- Throwable cause = cause();
- if (cause == null) {
- return getNow();
- }
- if (cause instanceof CancellationException) {
- throw (CancellationException) cause;
- }
- throw new ExecutionException(cause);
- }
-
- @Override
- public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (await(timeout, unit)) {
- Throwable cause = cause();
- if (cause == null) {
- return getNow();
- }
- if (cause instanceof CancellationException) {
- throw (CancellationException) cause;
- }
- throw new ExecutionException(cause);
- }
-
- throw new TimeoutException();
- }
-
- @Override
- public boolean isSuccess() {
- return result == null ? false : !(result instanceof CauseHolder);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public V getNow() {
- return (V) (result == SUCCESS_SIGNAL ? null : result);
- }
-
- @Override
- public Throwable cause() {
- if (result != null && result instanceof CauseHolder) {
- return ((CauseHolder) result).cause;
- }
- return null;
- }
-
- @Override
- public IFuture<V> addListener(IFutureListener<V> listener) {
- if (listener == null) {
- throw new NullPointerException("listener");
- }
- if (isDone()) {
- notifyListener(listener);
- return this;
- }
- synchronized (this) {
- if (!isDone()) {
- listeners.add(listener);
- return this;
- }
- }
- notifyListener(listener);
- return this;
- }
-
- @Override
- public IFuture<V> removeListener(IFutureListener<V> listener) {
- if (listener == null) {
- throw new NullPointerException("listener");
- }
-
- if (!isDone()) {
- listeners.remove(listener);
- }
-
- return this;
- }
-
- @Override
- public IFuture<V> await() throws InterruptedException {
- return await0(true);
- }
-
-
- private IFuture<V> await0(boolean interruptable) throws InterruptedException {
- if (!isDone()) {
-
- if (interruptable && Thread.interrupted()) {
- throw new InterruptedException("thread " + Thread.currentThread().getName() + " has been interrupted.");
- }
-
- boolean interrupted = false;
- synchronized (this) {
- while (!isDone()) {
- try {
- wait();
- } catch (InterruptedException e) {
- if (interruptable) {
- throw e;
- } else {
- interrupted = true;
- }
- }
- }
- }
- if (interrupted) {
-
-
- Thread.currentThread().interrupt();
- }
- }
- return this;
- }
-
- @Override
- public boolean await(long timeoutMillis) throws InterruptedException {
- return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
- }
-
- @Override
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- return await0(unit.toNanos(timeout), true);
- }
-
- private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
- if (isDone()) {
- return true;
- }
-
- if (timeoutNanos <= 0) {
- return isDone();
- }
-
- if (interruptable && Thread.interrupted()) {
- throw new InterruptedException(toString());
- }
-
- long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
- long waitTime = timeoutNanos;
- boolean interrupted = false;
-
- try {
- synchronized (this) {
- if (isDone()) {
- return true;
- }
-
- if (waitTime <= 0) {
- return isDone();
- }
-
- for (;;) {
- try {
- wait(waitTime / 1000000, (int) (waitTime % 1000000));
- } catch (InterruptedException e) {
- if (interruptable) {
- throw e;
- } else {
- interrupted = true;
- }
- }
-
- if (isDone()) {
- return true;
- } else {
- waitTime = timeoutNanos - (System.nanoTime() - startTime);
- if (waitTime <= 0) {
- return isDone();
- }
- }
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- @Override
- public IFuture<V> awaitUninterruptibly() {
- try {
- return await0(false);
- } catch (InterruptedException e) {
- throw new java.lang.InternalError();
- }
- }
-
- @Override
- public boolean awaitUninterruptibly(long timeoutMillis) {
- try {
- return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
- } catch (InterruptedException e) {
- throw new java.lang.InternalError();
- }
- }
-
- @Override
- public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
- try {
- return await0(unit.toNanos(timeout), false);
- } catch (InterruptedException e) {
- throw new java.lang.InternalError();
- }
- }
-
- protected IFuture<V> setFailure(Throwable cause) {
- if (setFailure0(cause)) {
- notifyListeners();
- return this;
- }
- throw new IllegalStateException("complete already: " + this);
- }
-
- private boolean setFailure0(Throwable cause) {
- if (isDone()) {
- return false;
- }
-
- synchronized (this) {
- if (isDone()) {
- return false;
- }
- result = new CauseHolder(cause);
- notifyAll();
- }
-
- return true;
- }
-
- protected IFuture<V> setSuccess(Object result) {
- if (setSuccess0(result)) {
- notifyListeners();
- return this;
- }
- throw new IllegalStateException("complete already: " + this);
- }
-
- private boolean setSuccess0(Object result) {
- if (isDone()) {
- return false;
- }
-
- synchronized (this) {
- if (isDone()) {
- return false;
- }
- if (result == null) {
- this.result = SUCCESS_SIGNAL;
- } else {
- this.result = result;
- }
- notifyAll();
- }
- return true;
- }
-
- private void notifyListeners() {
- for (IFutureListener<V> l : listeners) {
- notifyListener(l);
- }
- }
-
- private void notifyListener(IFutureListener<V> l) {
- try {
- l.operationCompleted(this);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private static class SuccessSignal {
-
- }
-
- private static final class CauseHolder {
- final Throwable cause;
-
- CauseHolder(Throwable cause) {
- this.cause = cause;
- }
- }
- }
监听接口
public interface IFutureListener<V> {
void operationCompleted(IFuture<V> future) throws Exception;
}
那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:
- package future.test;
-
- import future.IFuture;
- import future.IFutureListener;
-
- public class DelayAdder {
-
- public static void main(String[] args) {
- new DelayAdder().add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer>() {
-
- @Override
- public void operationCompleted(IFuture<Integer> future) throws Exception {
- System.out.println(future.getNow());
- }
-
- });
- }
-
- public DelayAdditionFuture add(long delay, int a, int b) {
- DelayAdditionFuture future = new DelayAdditionFuture();
- new Thread(new DelayAdditionTask(delay, a, b, future)).start();
- return future;
- }
-
- private class DelayAdditionTask implements Runnable {
-
- private long delay;
-
- private int a, b;
-
- private DelayAdditionFuture future;
-
- public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
- super();
- this.delay = delay;
- this.a = a;
- this.b = b;
- this.future = future;
- }
-
- @Override
- public void run() {
- try {
- Thread.sleep(delay);
- Integer i = a + b;
-
- future.setSuccess(i);
- } catch (InterruptedException e) {
-
- future.setFailure(e.getCause());
- }
- }
-
- }
- }
- package future.test;
-
- import future.AbstractFuture;
- import future.IFuture;
- public class DelayAdditionFuture extends AbstractFuture<Integer> {
-
- @Override
- public IFuture<Integer> setSuccess(Object result) {
- return super.setSuccess(result);
- }
-
- @Override
- public IFuture<Integer> setFailure(Throwable cause) {
- return super.setFailure(cause);
- }
-
- }
可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。
项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
-
- public class AddTask implements Callable<Integer> {
-
- private int a,b;
-
- public AddTask(int a, int b) {
- this.a = a;
- this.b = b;
- }
-
- @Override
- public Integer call() throws Exception {
- Integer result = a + b;
- return result;
- }
-
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- ExecutorService executor = Executors.newSingleThreadExecutor();
-
- Future<Integer> future = executor.submit(new AddTask(1, 2));
- Integer result = future.get();
- }
- }
虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:
- public interface Future<V> {
- boolean cancel(boolean mayInterruptIfRunning);
- boolean isCancelled();
- boolean isDone();
- V get() throws InterruptedException, ExecutionException;
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:
- package future;
-
- import java.util.concurrent.CancellationException;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
-
-
-
-
-
-
-
- public interface IFuture<V> extends Future<V> {
- boolean isSuccess();
- V getNow();
- Throwable cause();
- boolean isCancellable();
- IFuture<V> await() throws InterruptedException;
- boolean await(long timeoutMillis) throws InterruptedException;
- boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
- IFuture<V> awaitUninterruptibly();
- boolean awaitUninterruptibly(long timeoutMillis);<span style="line-height: 1.5;">
- boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
- IFuture<V> addListener(IFutureListener<V> l);
- IFuture<V> removeListener(IFutureListener<V> l);
-
- }
接下来就一起来实现这个IFuture,在这之前要说明下Object.wait(),Object.notifyAll()方法,因为整个Future实现的原理的核心就是这两个方法.看看JDK里面的解释:
- public class Object {
-
-
-
-
-
-
-
-
- public final void wait() throws InterruptedException {
- wait(0);
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
- public final native void notifyAll();
- }
知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await()等一系列的方法时,如果Future还未完成,那么就调用future.wait() 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll()方法来唤醒之前因为调用过wait()方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):
- package future;
-
- import java.util.Collection;
- import java.util.concurrent.CancellationException;
- import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- public class AbstractFuture<V> implements IFuture<V> {
-
- protected volatile Object result;
-
-
-
- protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>();
-
-
-
-
-
- private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal();
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (isDone()) {
- return false;
- }
-
- synchronized (this) {
- if (isDone()) {
- return false;
- }
- result = new CauseHolder(new CancellationException());
- notifyAll();
- }
- notifyListeners();
- return true;
- }
-
- @Override
- public boolean isCancellable() {
- return result == null;
- }
-
- @Override
- public boolean isCancelled() {
- return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
- }
-
- @Override
- public boolean isDone() {
- return result != null;
- }
-
- @Override
- public V get() throws InterruptedException, ExecutionException {
- await();
-
- Throwable cause = cause();
- if (cause == null) {
- return getNow();
- }
- if (cause instanceof CancellationException) {
- throw (CancellationException) cause;
- }
- throw new ExecutionException(cause);
- }
-
- @Override
- public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (await(timeout, unit)) {
- Throwable cause = cause();
- if (cause == null) {
- return getNow();
- }
- if (cause instanceof CancellationException) {
- throw (CancellationException) cause;
- }
- throw new ExecutionException(cause);
- }
-
- throw new TimeoutException();
- }
-
- @Override
- public boolean isSuccess() {
- return result == null ? false : !(result instanceof CauseHolder);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public V getNow() {
- return (V) (result == SUCCESS_SIGNAL ? null : result);
- }
-
- @Override
- public Throwable cause() {
- if (result != null && result instanceof CauseHolder) {
- return ((CauseHolder) result).cause;
- }
- return null;
- }
-
- @Override
- public IFuture<V> addListener(IFutureListener<V> listener) {
- if (listener == null) {
- throw new NullPointerException("listener");
- }
- if (isDone()) {
- notifyListener(listener);
- return this;
- }
- synchronized (this) {
- if (!isDone()) {
- listeners.add(listener);
- return this;
- }
- }
- notifyListener(listener);
- return this;
- }
-
- @Override
- public IFuture<V> removeListener(IFutureListener<V> listener) {
- if (listener == null) {
- throw new NullPointerException("listener");
- }
-
- if (!isDone()) {
- listeners.remove(listener);
- }
-
- return this;
- }
-
- @Override
- public IFuture<V> await() throws InterruptedException {
- return await0(true);
- }
-
-
- private IFuture<V> await0(boolean interruptable) throws InterruptedException {
- if (!isDone()) {
-
- if (interruptable && Thread.interrupted()) {
- throw new InterruptedException("thread " + Thread.currentThread().getName() + " has been interrupted.");
- }
-
- boolean interrupted = false;
- synchronized (this) {
- while (!isDone()) {
- try {
- wait();
- } catch (InterruptedException e) {
- if (interruptable) {
- throw e;
- } else {
- interrupted = true;
- }
- }
- }
- }
- if (interrupted) {
-
-
- Thread.currentThread().interrupt();
- }
- }
- return this;
- }
-
- @Override
- public boolean await(long timeoutMillis) throws InterruptedException {
- return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
- }
-
- @Override
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- return await0(unit.toNanos(timeout), true);
- }
-
- private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
- if (isDone()) {
- return true;
- }
-
- if (timeoutNanos <= 0) {
- return isDone();
- }
-
- if (interruptable && Thread.interrupted()) {
- throw new InterruptedException(toString());
- }
-
- long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
- long waitTime = timeoutNanos;
- boolean interrupted = false;
-
- try {
- synchronized (this) {
- if (isDone()) {
- return true;
- }
-
- if (waitTime <= 0) {
- return isDone();
- }
-
- for (;;) {
- try {
- wait(waitTime / 1000000, (int) (waitTime % 1000000));
- } catch (InterruptedException e) {
- if (interruptable) {
- throw e;
- } else {
- interrupted = true;
- }
- }
-
- if (isDone()) {
- return true;
- } else {
- waitTime = timeoutNanos - (System.nanoTime() - startTime);
- if (waitTime <= 0) {
- return isDone();
- }
- }
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- @Override
- public IFuture<V> awaitUninterruptibly() {
- try {
- return await0(false);
- } catch (InterruptedException e) {
- throw new java.lang.InternalError();
- }
- }
-
- @Override
- public boolean awaitUninterruptibly(long timeoutMillis) {
- try {
- return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
- } catch (InterruptedException e) {
- throw new java.lang.InternalError();
- }
- }
-
- @Override
- public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
- try {
- return await0(unit.toNanos(timeout), false);
- } catch (InterruptedException e) {
- throw new java.lang.InternalError();
- }
- }
-
- protected IFuture<V> setFailure(Throwable cause) {
- if (setFailure0(cause)) {
- notifyListeners();
- return this;
- }
- throw new IllegalStateException("complete already: " + this);
- }
-
- private boolean setFailure0(Throwable cause) {
- if (isDone()) {
- return false;
- }
-
- synchronized (this) {
- if (isDone()) {
- return false;
- }
- result = new CauseHolder(cause);
- notifyAll();
- }
-
- return true;
- }
-
- protected IFuture<V> setSuccess(Object result) {
- if (setSuccess0(result)) {
- notifyListeners();
- return this;
- }
- throw new IllegalStateException("complete already: " + this);
- }
-
- private boolean setSuccess0(Object result) {
- if (isDone()) {
- return false;
- }
-
- synchronized (this) {
- if (isDone()) {
- return false;
- }
- if (result == null) {
- this.result = SUCCESS_SIGNAL;
- } else {
- this.result = result;
- }
- notifyAll();
- }
- return true;
- }
-
- private void notifyListeners() {
- for (IFutureListener<V> l : listeners) {
- notifyListener(l);
- }
- }
-
- private void notifyListener(IFutureListener<V> l) {
- try {
- l.operationCompleted(this);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private static class SuccessSignal {
-
- }
-
- private static final class CauseHolder {
- final Throwable cause;
-
- CauseHolder(Throwable cause) {
- this.cause = cause;
- }
- }
- }
那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:
- package future.test;
-
- import future.IFuture;
- import future.IFutureListener;
-
-
-
-
-
-
- public class DelayAdder {
-
- public static void main(String[] args) {
- new DelayAdder().add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer>() {
-
- @Override
- public void operationCompleted(IFuture<Integer> future) throws Exception {
- System.out.println(future.getNow());
- }
-
- });
- }
-
-
-
-
-
-
-
- public DelayAdditionFuture add(long delay, int a, int b) {
- DelayAdditionFuture future = new DelayAdditionFuture();
- new Thread(new DelayAdditionTask(delay, a, b, future)).start();
- return future;
- }
-
- private class DelayAdditionTask implements Runnable {
-
- private long delay;
-
- private int a, b;
-
- private DelayAdditionFuture future;
-
- public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
- super();
- this.delay = delay;
- this.a = a;
- this.b = b;
- this.future = future;
- }
-
- @Override
- public void run() {
- try {
- Thread.sleep(delay);
- Integer i = a + b;
-
- future.setSuccess(i);
- } catch (InterruptedException e) {
-
- future.setFailure(e.getCause());
- }
- }
-
- }
- }
- package future.test;
-
- import future.AbstractFuture;
- import future.IFuture;
-
- public class DelayAdditionFuture extends AbstractFuture<Integer> {
-
- @Override
- public IFuture<Integer> setSuccess(Object result) {
- return super.setSuccess(result);
- }
-
- @Override
- public IFuture<Integer> setFailure(Throwable cause) {
- return super.setFailure(cause);
- }
-
- }
可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。
Java并发编程之异步Future机制的原理和实现
原文:https://www.cnblogs.com/cxhfuujust/p/12622039.html